Wan2.1-UMT5模型服务化使用RESTful API对外提供视频生成能力你是不是已经用Wan2.1-UMT5的WebUI界面玩得不亦乐乎了自己输入描述看着它生成一段段有趣的视频确实很有成就感。但有没有想过如果能把这种能力开放出去让其他程序、网站或者移动应用也能调用它来生成视频那会是什么场景比如你的内容创作平台想给用户提供“文字变视频”的快捷功能或者你的电商后台需要批量生成商品展示短视频。每次都让用户或运营人员手动打开WebUI去操作显然不现实。这时候就需要一个标准的、程序能直接调用的接口——也就是我们常说的API。这篇文章我就来手把手带你把那个部署好的Wan2.1-UMT5 WebUI包装成一个专业的RESTful API服务。我们会从最基础的接口设计开始一步步实现任务提交、结果查询再加上必不可少的API密钥认证和访问频率控制最后还会教你如何生成一份机器和人都能看懂的API文档。学完这篇你就能让任何应用都轻松拥有视频生成的能力了。1. 准备工作与环境确认在开始动手写代码之前我们得先确保“地基”是稳固的。这里的地基就是你之前已经部署好的Wan2.1-UMT5 WebUI服务。首先打开你的终端或命令行确认你的WebUI服务正在健康运行。通常你启动它的命令可能是这样的python webui.py --port 7860你应该能在终端看到服务正常启动的日志没有报错。然后打开浏览器访问http://你的服务器IP:7860如果是本地就是http://localhost:7860。如果熟悉的WebUI界面能正常加载并且你可以成功用它生成一段视频那么恭喜你准备工作就完成了一大半。接下来我们需要思考API层和WebUI层的关系。我们不打算直接修改WebUI本身的代码那样可能会引入不必要的复杂性。一个更清晰、更稳定的架构是构建一个全新的API服务这个服务作为“中间人”或“代理”。当它收到外部的API请求时它再去“模拟”用户操作调用后端的WebUI服务来完成视频生成任务。这种解耦的设计让API服务的迭代和WebUI的升级可以互不影响。为了实现这个架构我们需要安装几个关键的Python库。新建一个干净的目录作为你的API项目文件夹然后在里面创建一个requirements.txt文件内容如下fastapi0.104.1 uvicorn[standard]0.24.0 pydantic2.5.0 requests2.31.0 python-jose[cryptography]3.3.0 passlib[bcrypt]1.7.4 slowapi0.1.8 python-multipart0.0.6这些库各自扮演着重要角色FastAPI和Uvicorn是我们构建现代、高性能API的框架和服务器。Pydantic用于数据验证和设置管理确保接口传入传出的数据格式都是正确的。Requests让我们的API服务能够去调用后端的WebUI服务。python-jose和passlib用来实现JWTJSON Web Token令牌认证管理API密钥。slowapi帮我们轻松实现接口限流防止被过度调用。python-multipart处理文件上传虽然本文主要讲文本生成视频但为扩展预留。在项目目录下运行pip install -r requirements.txt来安装它们。环境准备好我们就可以开始设计最核心的接口了。2. 核心接口设计与实现异步任务处理视频生成是个耗时的过程不可能让调用方一直等着。所以我们采用“异步任务”的模式。这就像你去打印店打印一份厚文件店员不会让你干等着而是给你一个取件号你可以先去忙别的过会儿再来凭号取件。我们的API也将遵循这个模式设计两个核心接口POST /api/v1/generate提交一个视频生成任务。接口立即返回一个唯一的task_id而不是视频本身。GET /api/v1/result/{task_id}通过任务ID来查询这个任务的执行状态和结果。2.1 定义数据模型与任务状态机我们先在项目里创建一个models.py文件用Pydantic来定义清晰的数据结构这能让代码更健壮FastAPI还能自动基于它生成文档。from pydantic import BaseModel, Field from typing import Optional, Literal from enum import Enum class TaskStatus(str, Enum): 任务状态枚举 PENDING pending # 排队中 PROCESSING processing # 生成中 SUCCESS success # 成功 FAILED failed # 失败 class VideoGenerateRequest(BaseModel): 视频生成请求体 prompt: str Field(..., min_length5, max_length500, description视频描述文本至少5个字符) negative_prompt: Optional[str] Field(None, description不希望出现在视频中的内容描述) duration: int Field(default5, ge2, le30, description视频时长秒范围2-30) # 这里可以添加更多Wan2.1-UMT5支持的参数如尺寸、帧率等 # width: int Field(default512, ge256, le1024) # height: int Field(default512, ge256, le1024) class Config: schema_extra { example: { prompt: 一只可爱的猫咪在草地上追逐蝴蝶阳光明媚, negative_prompt: 模糊丑陋多只猫, duration: 8 } } class TaskResponse(BaseModel): 任务提交响应 task_id: str status: TaskStatus message: str 任务已提交请使用task_id查询结果 estimated_wait_time: Optional[int] Field(None, description预计等待时间秒) class TaskResultResponse(BaseModel): 任务结果查询响应 task_id: str status: TaskStatus message: Optional[str] None video_url: Optional[str] Field(None, description视频文件访问URL仅当status为success时存在) error_detail: Optional[str] Field(None, description如果失败错误详情) created_at: str finished_at: Optional[str] None2.2 实现任务队列与处理器由于可能有多个任务同时提交我们需要一个简单的内存队列来管理它们对于生产环境你可能需要考虑使用Redis或RabbitMQ这样的专业队列。同时我们需要一个“工人”在后台不断地从队列中取任务并调用真正的WebUI服务。创建一个task_manager.py文件import asyncio import uuid import time import logging from typing import Dict from models import TaskStatus, VideoGenerateRequest, TaskResultResponse import requests import json logging.basicConfig(levellogging.INFO) logger logging.getLogger(__name__) class TaskManager: def __init__(self, webui_base_url: str http://localhost:7860): self.webui_base_url webui_base_url.rstrip(/) self.tasks: Dict[str, dict] {} # 内存中存储任务状态和结果 self.task_queue asyncio.Queue() self._stop_event asyncio.Event() # 启动后台任务处理器 asyncio.create_task(self._process_task_queue()) async def submit_task(self, request: VideoGenerateRequest) - str: 提交一个新任务返回task_id task_id str(uuid.uuid4()) now time.strftime(%Y-%m-%d %H:%M:%S) self.tasks[task_id] { status: TaskStatus.PENDING, request: request.dict(), created_at: now, finished_at: None, result: None, error: None } # 将任务放入队列 await self.task_queue.put(task_id) logger.info(f任务 {task_id} 已提交并进入队列。) return task_id async def _process_task_queue(self): 后台任务处理循环 while not self._stop_event.is_set(): try: task_id await asyncio.wait_for(self.task_queue.get(), timeout1.0) await self._execute_single_task(task_id) except asyncio.TimeoutError: continue except Exception as e: logger.error(f任务处理器发生错误: {e}) async def _execute_single_task(self, task_id: str): 执行单个任务调用WebUI API生成视频 task_info self.tasks.get(task_id) if not task_info: return task_info[status] TaskStatus.PROCESSING request_data task_info[request] try: logger.info(f开始处理任务 {task_id}: {request_data.get(prompt)[:50]}...) # 这里是关键模拟WebUI的调用。 # 你需要根据Wan2.1-UMT5 WebUI实际提供的内部API或自动化方式来调用。 # 这里是一个假设的示例实际情况可能需要分析WebUI的网络请求。 # 方法一如果WebUI有内置的API如--api启动参数 # api_url f{self.webui_base_url}/run/predict # payload { # data: [ # request_data.get(prompt), # request_data.get(negative_prompt, ), # request_data.get(duration, 5) # ] # } # response requests.post(api_url, jsonpayload) # 方法二更通用的使用requests模拟表单提交如果WebUI是Gradio # 你需要找到Gradio应用对应的API端点通常是 /api/predict/ payload { data: json.dumps([ request_data.get(prompt), request_data.get(negative_prompt, ), request_data.get(duration, 5) ]) } # 注意Gradio的API调用可能需要session或特定的headers请根据实际情况调整。 response requests.post(f{self.webui_base_url}/api/predict/, datapayload, timeout300) # 设置长超时 if response.status_code 200: result response.json() # 假设返回的data里包含视频文件路径或Base64数据 # 这里需要你解析Wan2.1-UMT5的实际返回格式 # 例如它可能返回一个临时文件URL或Base64字符串 video_data result.get(data, [])[0] if result.get(data) else None if video_data and isinstance(video_data, str) and video_data.startswith(‘http’): video_url video_data else: # 如果不是URL你可能需要将Base64数据保存为文件并生成一个可访问的URL # 这里简化处理假设我们有一个文件服务地址 video_url f/api/v1/videos/{task_id}.mp4 # 实际应将视频文件保存到磁盘并记录路径 task_info[status] TaskStatus.SUCCESS task_info[result] {video_url: video_url} task_info[finished_at] time.strftime(%Y-%m-%d %H:%M:%S) logger.info(f任务 {task_id} 处理成功。) else: raise Exception(fWebUI调用失败状态码: {response.status_code}, 响应: {response.text}) except Exception as e: logger.error(f处理任务 {task_id} 时出错: {e}) task_info[status] TaskStatus.FAILED task_info[error] str(e) task_info[finished_at] time.strftime(%Y-%m-%d %H:%M:%S) def get_task_result(self, task_id: str) - Optional[TaskResultResponse]: 获取任务结果 task_info self.tasks.get(task_id) if not task_info: return None return TaskResultResponse( task_idtask_id, statustask_info[status], message任务已完成 if task_info[status] in [TaskStatus.SUCCESS, TaskStatus.FAILED] else 任务处理中, video_urltask_info.get(result, {}).get(video_url) if task_info[status] TaskStatus.SUCCESS else None, error_detailtask_info.get(error), created_attask_info[created_at], finished_attask_info[finished_at] ) async def stop(self): 停止任务处理器 self._stop_event.set() # 全局任务管理器实例 task_manager TaskManager()重要提示上面代码中_execute_single_task方法里调用WebUI的部分是关键也是最需要你根据Wan2.1-UMT5 WebUI的实际接口进行调整的地方。你可能需要查阅Wan2.1-UMT5的文档看是否提供了直接的API。打开浏览器开发者工具F12在WebUI界面上操作一次生成观察它向服务器发送了什么样的网络请求然后模仿这个请求。2.3 构建FastAPI主应用与核心接口现在让我们创建主应用文件main.py把刚才设计的接口实现出来。from fastapi import FastAPI, HTTPException, Depends, status from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager import asyncio from models import VideoGenerateRequest, TaskResponse, TaskResultResponse, TaskStatus from task_manager import task_manager import time # 应用生命周期管理 asynccontextmanager async def lifespan(app: FastAPI): # 启动时 print(API服务启动...) yield # 关闭时 print(API服务关闭清理任务队列...) await task_manager.stop() await asyncio.sleep(1) # 等待任务处理器结束 app FastAPI( titleWan2.1-UMT5 视频生成API服务, description提供异步视频生成能力基于Wan2.1-UMT5模型。, version1.0.0, lifespanlifespan ) # 添加CORS中间件允许前端应用调用 app.add_middleware( CORSMiddleware, allow_origins[*], # 生产环境应指定具体域名 allow_credentialsTrue, allow_methods[*], allow_headers[*], ) app.post(/api/v1/generate, response_modelTaskResponse, status_codestatus.HTTP_202_ACCEPTED, # 202表示已接受处理 summary提交视频生成任务, tags[视频生成]) async def submit_generation_task(request: VideoGenerateRequest): 提交一个视频生成任务。 由于视频生成需要时间此接口会立即返回一个任务ID (task_id)。 你需要使用这个task_id去查询任务状态和获取结果。 try: task_id await task_manager.submit_task(request) # 简单估算等待时间例如队列中任务数 * 平均处理时间 queue_size task_manager.task_queue.qsize() estimated_wait queue_size * 30 # 假设每个任务平均30秒 return TaskResponse( task_idtask_id, statusTaskStatus.PENDING, message视频生成任务已加入处理队列。, estimated_wait_timeestimated_wait if estimated_wait 0 else None ) except Exception as e: raise HTTPException(status_code500, detailf提交任务失败: {str(e)}) app.get(/api/v1/result/{task_id}, response_modelTaskResultResponse, summary查询任务结果, tags[任务查询]) async def get_task_result(task_id: str): 根据任务ID查询视频生成任务的状态和结果。 - **task_id**: 提交任务时返回的唯一任务标识符。 result task_manager.get_task_result(task_id) if not result: raise HTTPException(status_code404, detail任务ID不存在) return result if __name__ __main__: import uvicorn uvicorn.run(app, host0.0.0.0, port8000)现在你可以运行python main.py启动你的API服务了默认在8000端口。用工具如curl或Postman测试一下提交任务curl -X POST http://localhost:8000/api/v1/generate \ -H Content-Type: application/json \ -d { prompt: 星空下的宁静湖泊有流星划过, duration: 10 }你会得到一个包含task_id的响应。查询结果curl http://localhost:8000/api/v1/result/刚才得到的task_id根据任务状态你会看到pending,processing,success或failed的信息。核心功能已经跑通了但一个真正能对外提供的服务还需要安全和控制措施。3. 加固服务认证、限流与文件服务任何人都能随意调用你的API生成视频可能会产生不必要的成本和安全风险。我们来加上两道“锁”。3.1 API密钥认证我们采用常见的Bearer TokenJWT方式。创建一个auth.py文件。from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from fastapi import HTTPException, status, Depends from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials import secrets # 用于演示生产环境应从安全的环境变量或配置中心读取 SECRET_KEY secrets.token_urlsafe(32) # 生成一个随机的密钥 ALGORITHM HS256 ACCESS_TOKEN_EXPIRE_MINUTES 60 * 24 * 7 # 令牌有效期例如7天 # 模拟一个用户/API密钥数据库。生产环境应使用真实数据库。 fake_users_db { client_app_01: { api_key: sk_test_123456789abcdef, # 模拟的API Key hashed_key: None, # 这里我们直接对比明文生产环境应哈希存储 is_active: True, } } pwd_context CryptContext(schemes[bcrypt], deprecatedauto) security HTTPBearer() def verify_api_key(api_key: str) - bool: 验证API Key是否有效 for user, info in fake_users_db.items(): if info[is_active] and info[api_key] api_key: return True return False def create_access_token(data: dict, expires_delta: Optional[timedelta] None): 创建JWT访问令牌 to_encode data.copy() if expires_delta: expire datetime.utcnow() expires_delta else: expire datetime.utcnow() timedelta(minutes15) to_encode.update({exp: expire}) encoded_jwt jwt.encode(to_encode, SECRET_KEY, algorithmALGORITHM) return encoded_jwt async def get_current_user(credentials: HTTPAuthorizationCredentials Depends(security)): 依赖项验证请求中的Bearer Token credentials_exception HTTPException( status_codestatus.HTTP_401_UNAUTHORIZED, detail无效的认证凭证, headers{WWW-Authenticate: Bearer}, ) token credentials.credentials try: payload jwt.decode(token, SECRET_KEY, algorithms[ALGORITHM]) api_key: str payload.get(sub) if api_key is None: raise credentials_exception except JWTError: raise credentials_exception if not verify_api_key(api_key): raise credentials_exception return api_key # 一个简单的登录端点用于交换API Key为JWT Token可选 app.post(/api/v1/auth/token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm Depends()): # 这里form_data.username可以当作api_key传入 api_key form_data.username if not verify_api_key(api_key): raise HTTPException( status_codestatus.HTTP_401_UNAUTHORIZED, detail无效的API Key, ) access_token_expires timedelta(minutesACCESS_TOKEN_EXPIRE_MINUTES) access_token create_access_token( data{sub: api_key}, expires_deltaaccess_token_expires ) return {access_token: access_token, token_type: bearer}然后修改main.py中的接口添加依赖项from auth import get_current_user app.post(/api/v1/generate, ...) async def submit_generation_task( request: VideoGenerateRequest, current_user: str Depends(get_current_user) # 添加认证依赖 ): # 现在只有携带有效Token的请求才能调用 # 你可以用current_user记录是谁调用的 print(f用户 {current_user} 提交了任务) # ... 其余代码不变 ...3.2 接口访问限流防止同一个API密钥在短时间内疯狂调用拖垮服务。我们使用slowapi和redis可选来实现。这里先用内存限流演示。在main.py中新增from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded # 初始化限流器 limiter Limiter(key_funcget_remote_address) # 根据IP限流也可用lambda: current_user根据用户限流 app.state.limiter limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) # 然后将限流装饰器加到需要限流的接口上 app.post(/api/v1/generate) limiter.limit(5/minute) # 限制每分钟最多5次调用 async def submit_generation_task(request: VideoGenerateRequest, current_user: str Depends(get_current_user)): # ... 函数体不变 ...3.3 提供生成的视频文件任务成功后我们返回了一个video_url。你需要实现这个文件服务端点让用户能真正下载到视频。在main.py中添加from fastapi.responses import FileResponse import os # 假设视频文件保存在这个目录 VIDEO_STORAGE_PATH ./generated_videos app.get(/api/v1/videos/{filename}) async def get_video_file(filename: str): 提供生成的视频文件下载 file_path os.path.join(VIDEO_STORAGE_PATH, filename) if not os.path.exists(file_path): raise HTTPException(status_code404, detail视频文件未找到) return FileResponse(file_path, media_typevideo/mp4, filenamefilename)同时记得在task_manager.py的_execute_single_task方法中成功生成视频后将文件保存到VIDEO_STORAGE_PATH目录下并以task_id或其他唯一名称命名。4. 生成清晰的API文档与总结FastAPI的一个巨大优势就是能自动生成交互式API文档。你启动服务后访问以下两个地址就能看到http://localhost:8000/docsSwagger UI提供的交互式文档可以在这里直接尝试调用接口。http://localhost:8000/redocReDoc提供的另一种风格的文档更简洁美观。为了让文档更专业我们还可以导出OpenAPI规范文件用于导入到其他API管理平台。在你的项目根目录创建一个generate_openapi.py脚本import json from main import app # 生成OpenAPI规范 openapi_schema app.openapi() # 保存为JSON文件 with open(openapi.json, w) as f: json.dump(openapi_schema, f, indent2) print(OpenAPI规范已保存到 openapi.json)运行这个脚本你就会得到一个标准的openapi.json文件它可以被Postman、Apifox等工具直接导入。整套流程走下来我们从一个只能手动操作的WebUI构建出了一个具备生产级雏形的API服务。它具备了异步任务处理、认证鉴权、访问控制等关键特性。实际部署时你还需要考虑更多比如使用Nginx做反向代理和负载均衡、用Redis作为任务队列和限流存储、添加更详细的日志监控、以及制定清晰的API使用计费策略等。开发过程中最关键的步骤是厘清你的API服务如何与底层的Wan2.1-UMT5 WebUI进行通信这需要你仔细研究WebUI的交互方式。一旦这个桥梁打通剩下的就是标准的服务化工程实践了。希望这篇教程能为你打开一扇门让你强大的模型能力可以更灵活、更广泛地服务于各种应用场景。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。
Wan2.1-UMT5模型服务化:使用RESTful API对外提供视频生成能力
Wan2.1-UMT5模型服务化使用RESTful API对外提供视频生成能力你是不是已经用Wan2.1-UMT5的WebUI界面玩得不亦乐乎了自己输入描述看着它生成一段段有趣的视频确实很有成就感。但有没有想过如果能把这种能力开放出去让其他程序、网站或者移动应用也能调用它来生成视频那会是什么场景比如你的内容创作平台想给用户提供“文字变视频”的快捷功能或者你的电商后台需要批量生成商品展示短视频。每次都让用户或运营人员手动打开WebUI去操作显然不现实。这时候就需要一个标准的、程序能直接调用的接口——也就是我们常说的API。这篇文章我就来手把手带你把那个部署好的Wan2.1-UMT5 WebUI包装成一个专业的RESTful API服务。我们会从最基础的接口设计开始一步步实现任务提交、结果查询再加上必不可少的API密钥认证和访问频率控制最后还会教你如何生成一份机器和人都能看懂的API文档。学完这篇你就能让任何应用都轻松拥有视频生成的能力了。1. 准备工作与环境确认在开始动手写代码之前我们得先确保“地基”是稳固的。这里的地基就是你之前已经部署好的Wan2.1-UMT5 WebUI服务。首先打开你的终端或命令行确认你的WebUI服务正在健康运行。通常你启动它的命令可能是这样的python webui.py --port 7860你应该能在终端看到服务正常启动的日志没有报错。然后打开浏览器访问http://你的服务器IP:7860如果是本地就是http://localhost:7860。如果熟悉的WebUI界面能正常加载并且你可以成功用它生成一段视频那么恭喜你准备工作就完成了一大半。接下来我们需要思考API层和WebUI层的关系。我们不打算直接修改WebUI本身的代码那样可能会引入不必要的复杂性。一个更清晰、更稳定的架构是构建一个全新的API服务这个服务作为“中间人”或“代理”。当它收到外部的API请求时它再去“模拟”用户操作调用后端的WebUI服务来完成视频生成任务。这种解耦的设计让API服务的迭代和WebUI的升级可以互不影响。为了实现这个架构我们需要安装几个关键的Python库。新建一个干净的目录作为你的API项目文件夹然后在里面创建一个requirements.txt文件内容如下fastapi0.104.1 uvicorn[standard]0.24.0 pydantic2.5.0 requests2.31.0 python-jose[cryptography]3.3.0 passlib[bcrypt]1.7.4 slowapi0.1.8 python-multipart0.0.6这些库各自扮演着重要角色FastAPI和Uvicorn是我们构建现代、高性能API的框架和服务器。Pydantic用于数据验证和设置管理确保接口传入传出的数据格式都是正确的。Requests让我们的API服务能够去调用后端的WebUI服务。python-jose和passlib用来实现JWTJSON Web Token令牌认证管理API密钥。slowapi帮我们轻松实现接口限流防止被过度调用。python-multipart处理文件上传虽然本文主要讲文本生成视频但为扩展预留。在项目目录下运行pip install -r requirements.txt来安装它们。环境准备好我们就可以开始设计最核心的接口了。2. 核心接口设计与实现异步任务处理视频生成是个耗时的过程不可能让调用方一直等着。所以我们采用“异步任务”的模式。这就像你去打印店打印一份厚文件店员不会让你干等着而是给你一个取件号你可以先去忙别的过会儿再来凭号取件。我们的API也将遵循这个模式设计两个核心接口POST /api/v1/generate提交一个视频生成任务。接口立即返回一个唯一的task_id而不是视频本身。GET /api/v1/result/{task_id}通过任务ID来查询这个任务的执行状态和结果。2.1 定义数据模型与任务状态机我们先在项目里创建一个models.py文件用Pydantic来定义清晰的数据结构这能让代码更健壮FastAPI还能自动基于它生成文档。from pydantic import BaseModel, Field from typing import Optional, Literal from enum import Enum class TaskStatus(str, Enum): 任务状态枚举 PENDING pending # 排队中 PROCESSING processing # 生成中 SUCCESS success # 成功 FAILED failed # 失败 class VideoGenerateRequest(BaseModel): 视频生成请求体 prompt: str Field(..., min_length5, max_length500, description视频描述文本至少5个字符) negative_prompt: Optional[str] Field(None, description不希望出现在视频中的内容描述) duration: int Field(default5, ge2, le30, description视频时长秒范围2-30) # 这里可以添加更多Wan2.1-UMT5支持的参数如尺寸、帧率等 # width: int Field(default512, ge256, le1024) # height: int Field(default512, ge256, le1024) class Config: schema_extra { example: { prompt: 一只可爱的猫咪在草地上追逐蝴蝶阳光明媚, negative_prompt: 模糊丑陋多只猫, duration: 8 } } class TaskResponse(BaseModel): 任务提交响应 task_id: str status: TaskStatus message: str 任务已提交请使用task_id查询结果 estimated_wait_time: Optional[int] Field(None, description预计等待时间秒) class TaskResultResponse(BaseModel): 任务结果查询响应 task_id: str status: TaskStatus message: Optional[str] None video_url: Optional[str] Field(None, description视频文件访问URL仅当status为success时存在) error_detail: Optional[str] Field(None, description如果失败错误详情) created_at: str finished_at: Optional[str] None2.2 实现任务队列与处理器由于可能有多个任务同时提交我们需要一个简单的内存队列来管理它们对于生产环境你可能需要考虑使用Redis或RabbitMQ这样的专业队列。同时我们需要一个“工人”在后台不断地从队列中取任务并调用真正的WebUI服务。创建一个task_manager.py文件import asyncio import uuid import time import logging from typing import Dict from models import TaskStatus, VideoGenerateRequest, TaskResultResponse import requests import json logging.basicConfig(levellogging.INFO) logger logging.getLogger(__name__) class TaskManager: def __init__(self, webui_base_url: str http://localhost:7860): self.webui_base_url webui_base_url.rstrip(/) self.tasks: Dict[str, dict] {} # 内存中存储任务状态和结果 self.task_queue asyncio.Queue() self._stop_event asyncio.Event() # 启动后台任务处理器 asyncio.create_task(self._process_task_queue()) async def submit_task(self, request: VideoGenerateRequest) - str: 提交一个新任务返回task_id task_id str(uuid.uuid4()) now time.strftime(%Y-%m-%d %H:%M:%S) self.tasks[task_id] { status: TaskStatus.PENDING, request: request.dict(), created_at: now, finished_at: None, result: None, error: None } # 将任务放入队列 await self.task_queue.put(task_id) logger.info(f任务 {task_id} 已提交并进入队列。) return task_id async def _process_task_queue(self): 后台任务处理循环 while not self._stop_event.is_set(): try: task_id await asyncio.wait_for(self.task_queue.get(), timeout1.0) await self._execute_single_task(task_id) except asyncio.TimeoutError: continue except Exception as e: logger.error(f任务处理器发生错误: {e}) async def _execute_single_task(self, task_id: str): 执行单个任务调用WebUI API生成视频 task_info self.tasks.get(task_id) if not task_info: return task_info[status] TaskStatus.PROCESSING request_data task_info[request] try: logger.info(f开始处理任务 {task_id}: {request_data.get(prompt)[:50]}...) # 这里是关键模拟WebUI的调用。 # 你需要根据Wan2.1-UMT5 WebUI实际提供的内部API或自动化方式来调用。 # 这里是一个假设的示例实际情况可能需要分析WebUI的网络请求。 # 方法一如果WebUI有内置的API如--api启动参数 # api_url f{self.webui_base_url}/run/predict # payload { # data: [ # request_data.get(prompt), # request_data.get(negative_prompt, ), # request_data.get(duration, 5) # ] # } # response requests.post(api_url, jsonpayload) # 方法二更通用的使用requests模拟表单提交如果WebUI是Gradio # 你需要找到Gradio应用对应的API端点通常是 /api/predict/ payload { data: json.dumps([ request_data.get(prompt), request_data.get(negative_prompt, ), request_data.get(duration, 5) ]) } # 注意Gradio的API调用可能需要session或特定的headers请根据实际情况调整。 response requests.post(f{self.webui_base_url}/api/predict/, datapayload, timeout300) # 设置长超时 if response.status_code 200: result response.json() # 假设返回的data里包含视频文件路径或Base64数据 # 这里需要你解析Wan2.1-UMT5的实际返回格式 # 例如它可能返回一个临时文件URL或Base64字符串 video_data result.get(data, [])[0] if result.get(data) else None if video_data and isinstance(video_data, str) and video_data.startswith(‘http’): video_url video_data else: # 如果不是URL你可能需要将Base64数据保存为文件并生成一个可访问的URL # 这里简化处理假设我们有一个文件服务地址 video_url f/api/v1/videos/{task_id}.mp4 # 实际应将视频文件保存到磁盘并记录路径 task_info[status] TaskStatus.SUCCESS task_info[result] {video_url: video_url} task_info[finished_at] time.strftime(%Y-%m-%d %H:%M:%S) logger.info(f任务 {task_id} 处理成功。) else: raise Exception(fWebUI调用失败状态码: {response.status_code}, 响应: {response.text}) except Exception as e: logger.error(f处理任务 {task_id} 时出错: {e}) task_info[status] TaskStatus.FAILED task_info[error] str(e) task_info[finished_at] time.strftime(%Y-%m-%d %H:%M:%S) def get_task_result(self, task_id: str) - Optional[TaskResultResponse]: 获取任务结果 task_info self.tasks.get(task_id) if not task_info: return None return TaskResultResponse( task_idtask_id, statustask_info[status], message任务已完成 if task_info[status] in [TaskStatus.SUCCESS, TaskStatus.FAILED] else 任务处理中, video_urltask_info.get(result, {}).get(video_url) if task_info[status] TaskStatus.SUCCESS else None, error_detailtask_info.get(error), created_attask_info[created_at], finished_attask_info[finished_at] ) async def stop(self): 停止任务处理器 self._stop_event.set() # 全局任务管理器实例 task_manager TaskManager()重要提示上面代码中_execute_single_task方法里调用WebUI的部分是关键也是最需要你根据Wan2.1-UMT5 WebUI的实际接口进行调整的地方。你可能需要查阅Wan2.1-UMT5的文档看是否提供了直接的API。打开浏览器开发者工具F12在WebUI界面上操作一次生成观察它向服务器发送了什么样的网络请求然后模仿这个请求。2.3 构建FastAPI主应用与核心接口现在让我们创建主应用文件main.py把刚才设计的接口实现出来。from fastapi import FastAPI, HTTPException, Depends, status from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager import asyncio from models import VideoGenerateRequest, TaskResponse, TaskResultResponse, TaskStatus from task_manager import task_manager import time # 应用生命周期管理 asynccontextmanager async def lifespan(app: FastAPI): # 启动时 print(API服务启动...) yield # 关闭时 print(API服务关闭清理任务队列...) await task_manager.stop() await asyncio.sleep(1) # 等待任务处理器结束 app FastAPI( titleWan2.1-UMT5 视频生成API服务, description提供异步视频生成能力基于Wan2.1-UMT5模型。, version1.0.0, lifespanlifespan ) # 添加CORS中间件允许前端应用调用 app.add_middleware( CORSMiddleware, allow_origins[*], # 生产环境应指定具体域名 allow_credentialsTrue, allow_methods[*], allow_headers[*], ) app.post(/api/v1/generate, response_modelTaskResponse, status_codestatus.HTTP_202_ACCEPTED, # 202表示已接受处理 summary提交视频生成任务, tags[视频生成]) async def submit_generation_task(request: VideoGenerateRequest): 提交一个视频生成任务。 由于视频生成需要时间此接口会立即返回一个任务ID (task_id)。 你需要使用这个task_id去查询任务状态和获取结果。 try: task_id await task_manager.submit_task(request) # 简单估算等待时间例如队列中任务数 * 平均处理时间 queue_size task_manager.task_queue.qsize() estimated_wait queue_size * 30 # 假设每个任务平均30秒 return TaskResponse( task_idtask_id, statusTaskStatus.PENDING, message视频生成任务已加入处理队列。, estimated_wait_timeestimated_wait if estimated_wait 0 else None ) except Exception as e: raise HTTPException(status_code500, detailf提交任务失败: {str(e)}) app.get(/api/v1/result/{task_id}, response_modelTaskResultResponse, summary查询任务结果, tags[任务查询]) async def get_task_result(task_id: str): 根据任务ID查询视频生成任务的状态和结果。 - **task_id**: 提交任务时返回的唯一任务标识符。 result task_manager.get_task_result(task_id) if not result: raise HTTPException(status_code404, detail任务ID不存在) return result if __name__ __main__: import uvicorn uvicorn.run(app, host0.0.0.0, port8000)现在你可以运行python main.py启动你的API服务了默认在8000端口。用工具如curl或Postman测试一下提交任务curl -X POST http://localhost:8000/api/v1/generate \ -H Content-Type: application/json \ -d { prompt: 星空下的宁静湖泊有流星划过, duration: 10 }你会得到一个包含task_id的响应。查询结果curl http://localhost:8000/api/v1/result/刚才得到的task_id根据任务状态你会看到pending,processing,success或failed的信息。核心功能已经跑通了但一个真正能对外提供的服务还需要安全和控制措施。3. 加固服务认证、限流与文件服务任何人都能随意调用你的API生成视频可能会产生不必要的成本和安全风险。我们来加上两道“锁”。3.1 API密钥认证我们采用常见的Bearer TokenJWT方式。创建一个auth.py文件。from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from fastapi import HTTPException, status, Depends from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials import secrets # 用于演示生产环境应从安全的环境变量或配置中心读取 SECRET_KEY secrets.token_urlsafe(32) # 生成一个随机的密钥 ALGORITHM HS256 ACCESS_TOKEN_EXPIRE_MINUTES 60 * 24 * 7 # 令牌有效期例如7天 # 模拟一个用户/API密钥数据库。生产环境应使用真实数据库。 fake_users_db { client_app_01: { api_key: sk_test_123456789abcdef, # 模拟的API Key hashed_key: None, # 这里我们直接对比明文生产环境应哈希存储 is_active: True, } } pwd_context CryptContext(schemes[bcrypt], deprecatedauto) security HTTPBearer() def verify_api_key(api_key: str) - bool: 验证API Key是否有效 for user, info in fake_users_db.items(): if info[is_active] and info[api_key] api_key: return True return False def create_access_token(data: dict, expires_delta: Optional[timedelta] None): 创建JWT访问令牌 to_encode data.copy() if expires_delta: expire datetime.utcnow() expires_delta else: expire datetime.utcnow() timedelta(minutes15) to_encode.update({exp: expire}) encoded_jwt jwt.encode(to_encode, SECRET_KEY, algorithmALGORITHM) return encoded_jwt async def get_current_user(credentials: HTTPAuthorizationCredentials Depends(security)): 依赖项验证请求中的Bearer Token credentials_exception HTTPException( status_codestatus.HTTP_401_UNAUTHORIZED, detail无效的认证凭证, headers{WWW-Authenticate: Bearer}, ) token credentials.credentials try: payload jwt.decode(token, SECRET_KEY, algorithms[ALGORITHM]) api_key: str payload.get(sub) if api_key is None: raise credentials_exception except JWTError: raise credentials_exception if not verify_api_key(api_key): raise credentials_exception return api_key # 一个简单的登录端点用于交换API Key为JWT Token可选 app.post(/api/v1/auth/token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm Depends()): # 这里form_data.username可以当作api_key传入 api_key form_data.username if not verify_api_key(api_key): raise HTTPException( status_codestatus.HTTP_401_UNAUTHORIZED, detail无效的API Key, ) access_token_expires timedelta(minutesACCESS_TOKEN_EXPIRE_MINUTES) access_token create_access_token( data{sub: api_key}, expires_deltaaccess_token_expires ) return {access_token: access_token, token_type: bearer}然后修改main.py中的接口添加依赖项from auth import get_current_user app.post(/api/v1/generate, ...) async def submit_generation_task( request: VideoGenerateRequest, current_user: str Depends(get_current_user) # 添加认证依赖 ): # 现在只有携带有效Token的请求才能调用 # 你可以用current_user记录是谁调用的 print(f用户 {current_user} 提交了任务) # ... 其余代码不变 ...3.2 接口访问限流防止同一个API密钥在短时间内疯狂调用拖垮服务。我们使用slowapi和redis可选来实现。这里先用内存限流演示。在main.py中新增from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded # 初始化限流器 limiter Limiter(key_funcget_remote_address) # 根据IP限流也可用lambda: current_user根据用户限流 app.state.limiter limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) # 然后将限流装饰器加到需要限流的接口上 app.post(/api/v1/generate) limiter.limit(5/minute) # 限制每分钟最多5次调用 async def submit_generation_task(request: VideoGenerateRequest, current_user: str Depends(get_current_user)): # ... 函数体不变 ...3.3 提供生成的视频文件任务成功后我们返回了一个video_url。你需要实现这个文件服务端点让用户能真正下载到视频。在main.py中添加from fastapi.responses import FileResponse import os # 假设视频文件保存在这个目录 VIDEO_STORAGE_PATH ./generated_videos app.get(/api/v1/videos/{filename}) async def get_video_file(filename: str): 提供生成的视频文件下载 file_path os.path.join(VIDEO_STORAGE_PATH, filename) if not os.path.exists(file_path): raise HTTPException(status_code404, detail视频文件未找到) return FileResponse(file_path, media_typevideo/mp4, filenamefilename)同时记得在task_manager.py的_execute_single_task方法中成功生成视频后将文件保存到VIDEO_STORAGE_PATH目录下并以task_id或其他唯一名称命名。4. 生成清晰的API文档与总结FastAPI的一个巨大优势就是能自动生成交互式API文档。你启动服务后访问以下两个地址就能看到http://localhost:8000/docsSwagger UI提供的交互式文档可以在这里直接尝试调用接口。http://localhost:8000/redocReDoc提供的另一种风格的文档更简洁美观。为了让文档更专业我们还可以导出OpenAPI规范文件用于导入到其他API管理平台。在你的项目根目录创建一个generate_openapi.py脚本import json from main import app # 生成OpenAPI规范 openapi_schema app.openapi() # 保存为JSON文件 with open(openapi.json, w) as f: json.dump(openapi_schema, f, indent2) print(OpenAPI规范已保存到 openapi.json)运行这个脚本你就会得到一个标准的openapi.json文件它可以被Postman、Apifox等工具直接导入。整套流程走下来我们从一个只能手动操作的WebUI构建出了一个具备生产级雏形的API服务。它具备了异步任务处理、认证鉴权、访问控制等关键特性。实际部署时你还需要考虑更多比如使用Nginx做反向代理和负载均衡、用Redis作为任务队列和限流存储、添加更详细的日志监控、以及制定清晰的API使用计费策略等。开发过程中最关键的步骤是厘清你的API服务如何与底层的Wan2.1-UMT5 WebUI进行通信这需要你仔细研究WebUI的交互方式。一旦这个桥梁打通剩下的就是标准的服务化工程实践了。希望这篇教程能为你打开一扇门让你强大的模型能力可以更灵活、更广泛地服务于各种应用场景。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。