FastAPI in Practice: Reverse-Engineering the Jimeng AI Text-to-Video API in 5 Minutes (Complete Code Included)
Building the complete architecture of an enterprise-grade text-to-video API gateway with FastAPI.

While helping several content-creation teams upgrade their technical stacks recently, I noticed a common pain point: many excellent AI video-generation tools either offer no public API or charge prohibitively for access. Jimeng AI (即梦AI) is a typical example. The web experience is solid, but the platform provides no official API channel, which frustrates developers who want to integrate it into their own workflows.

The solution I settled on in real projects is a middle-layer API gateway built with FastAPI that "wraps" the web client's functionality as a standard RESTful interface. This is more than simple request forwarding: it involves identity simulation, asynchronous task management, and file-storage integration. In this post I walk through the implementation in detail. Whether you are building an internal tool for your team or a customer-facing SaaS, you should find reusable ideas here.

## 1. Reverse Engineering: Understanding Modern Web Authentication

To emulate the web client, you first need to understand how modern web applications authenticate requests. Years ago a simple session cookie might have been enough; today most platforms use multiple layers of defense.

### 1.1 Cookies and the Signing Mechanism

Platforms like Jimeng AI carry several verification fields in the request headers. The two most important are `cookie` and `sign`, and they are not independent: the signature is derived in part from the cookie.

```python
# Typical request-header configuration (values captured from the browser)
import time

headers = {
    "accept": "application/json, text/plain, */*",
    "accept-language": "zh-CN,zh;q=0.9",
    "app-sdk-version": "48.0.0",
    "appid": "513695",                          # fixed application identifier
    "appvr": "5.8.0",
    "content-type": "application/json",
    "cookie": "fpk1=xxxxxx; sessionid=yyyyyy",  # the key identity credential
    "device-time": str(int(time.time())),
    "lan": "zh-Hans",
    "loc": "cn",
    "origin": "https://jimeng.jianying.com",
    "pf": "7",
    "priority": "u=1, i",
    "referer": "https://jimeng.jianying.com/ai-tool/video/generate",
    "sec-ch-ua": '"Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
    "sec-fetch-dest": "empty",
    "sec-fetch-mode": "cors",
    "sec-fetch-site": "same-origin",
    "sign": "<encrypted signature string>",     # dynamically generated request signature
    "sign-ver": "1",
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
}
```

A few points deserve attention here.

**Cookie lifetime.** Most platforms' cookies expire, anywhere from a few hours to a few days, so a production deployment needs a cookie-refresh mechanism. I once saw cookies fail abruptly and traced it to the platform's risk controls: a burst of consecutive requests triggers a temporary ban.

**How the sign is generated.** This field is computed client-side from the request parameters, a timestamp, the cookie, and other inputs. You do not have to fully reproduce the algorithm, capturing the value from real traffic is enough, but you should understand its purpose: preventing request tampering and replay attacks.

### 1.2 Acquiring and Storing Credentials Safely

Copying the cookie from the browser's developer tools is the most direct approach, but production setups want something more automated. Here is a trick I have actually used:

```python
import browser_cookie3   # pip install browser-cookie3
import json
from datetime import datetime

def extract_cookies_from_browser():
    """Extract cookies for the target domain from a local Chrome profile."""
    try:
        cj = browser_cookie3.chrome(domain_name="jimeng.jianying.com")
        cookies_dict = {}
        for cookie in cj:
            if "jimeng.jianying.com" in cookie.domain:
                cookies_dict[cookie.name] = cookie.value
        # Format as the string expected in the request header
        cookie_str = "; ".join([f"{k}={v}" for k, v in cookies_dict.items()])
        return cookie_str
    except Exception as e:
        print(f"Failed to extract cookies: {e}")
        return None

def save_credentials_to_config(cookie_str, sign_str):
    """Persist credentials securely to a config file or database."""
    config = {
        "video_api": {
            "cookie": cookie_str,
            "sign": sign_str,
            "last_updated": datetime.now().isoformat(),
        }
    }
    # Encrypt before writing to disk (encrypt_data is your own helper)
    encrypted_config = encrypt_data(json.dumps(config))
    with open("credentials.enc", "wb") as f:
        f.write(encrypted_config)
```

Note that automated cookie extraction requires the user's authorization, and the method differs across browsers and operating systems. In production, the safer route is a configuration UI where users paste credentials manually.

## 2. Core Architecture of the FastAPI Gateway

With basic credentials in hand, the next step is designing a robust API gateway, one that not only forwards requests but also handles retries, rate limiting, monitoring, and other enterprise concerns.

### 2.1 Project Structure Best Practices

After several project iterations, I have settled on a layout that works well:

```
jimeng_video_gateway/
├── app/
│   ├── __init__.py
│   ├── main.py                 # FastAPI application entry point
│   ├── api/
│   │   ├── __init__.py
│   │   ├── endpoints.py        # route definitions
│   │   └── dependencies.py     # dependency injection
│   ├── core/
│   │   ├── config.py           # configuration management
│   │   ├── security.py         # security helpers
│   │   └── exceptions.py       # custom exceptions
│   ├── services/
│   │   ├── video_service.py    # video-generation service
│   │   ├── storage_service.py  # storage service
│   │   └── auth_service.py     # authentication service
│   └── models/
│       ├── schemas.py          # Pydantic models
│       └── responses.py        # response models
├── tests/
│   ├── test_api.py
│   └── test_services.py
├── requirements.txt
├── config.ini
└── .env.example
```

The benefit of this structure is a clean separation of responsibilities, which makes team collaboration and maintenance easier. In particular, business logic lives in the `services` layer while API routes only receive requests and return responses, in keeping with the single-responsibility principle.

### 2.2 The Art of Configuration Management

Configuration management is often overlooked, but it matters. I recommend layered configuration:

```python
# app/core/config.py
from pydantic_settings import BaseSettings
from typing import Optional
import os

class Settings(BaseSettings):
    # API settings
    api_title: str = "Text-to-Video API Gateway"
    api_version: str = "1.0.0"
    api_prefix: str = "/api/v1"

    # Server settings
    host: str = "0.0.0.0"
    port: int = 8088
    reload: bool = os.getenv("ENV", "development") == "development"

    # Jimeng API settings
    jimeng_base_url: str = "https://jimeng.jianying.com/mweb/v1"
    jimeng_appid: str = "513695"

    # Storage settings
    storage_type: str = "cos"   # cos, s3, local
    cos_region: Optional[str] = None
    cos_secret_id: Optional[str] = None
    cos_secret_key: Optional[str] = None
    cos_bucket: Optional[str] = None

    # Security settings
    api_keys: list[str] = []
    rate_limit_per_minute: int = 10

    # Task settings
    max_polling_attempts: int = 30
    polling_interval: int = 2

    class Config:
        env_file = ".env"
        case_sensitive = False

settings = Settings()
```

Pydantic Settings gives you type safety plus automatic loading from environment variables. For secrets such as API keys, always use environment variables; never hard-code them.

### 2.3 A Robust Error-Handling Mechanism

A core value of an API gateway is unified error handling. FastAPI's exception machinery is powerful, but it needs deliberate design:

```python
# app/core/exceptions.py
from fastapi import HTTPException, status

class VideoGenerationException(HTTPException):
    """Errors raised during video generation."""
    def __init__(self, detail: str,
                 status_code: int = status.HTTP_500_INTERNAL_SERVER_ERROR):
        super().__init__(status_code=status_code, detail=detail)

class AuthenticationException(HTTPException):
    """Authentication failures."""
    def __init__(self, detail: str = "Authentication failed"):
        super().__init__(status_code=status.HTTP_401_UNAUTHORIZED, detail=detail)

class RateLimitException(HTTPException):
    """Rate-limit violations."""
    def __init__(self, detail: str = "Too many requests, please retry later"):
        super().__init__(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=detail,
            headers={"Retry-After": "60"},
        )
```

```python
# Global exception handlers in app/main.py
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from app.core.config import settings
from app.core.exceptions import VideoGenerationException, AuthenticationException

app = FastAPI(title=settings.api_title, version=settings.api_version)

@app.exception_handler(VideoGenerationException)
async def video_generation_exception_handler(request: Request,
                                              exc: VideoGenerationException):
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": "VIDEO_GENERATION_ERROR",
            "message": exc.detail,
            "request_id": getattr(request.state, "request_id", None),
        },
    )

@app.exception_handler(AuthenticationException)
async def authentication_exception_handler(request: Request,
                                            exc: AuthenticationException):
    return JSONResponse(
        status_code=exc.status_code,
        content={"error": "AUTHENTICATION_FAILED", "message": exc.detail},
    )
```

This style of error handling has several benefits: (1) clients can branch on the `error` field; (2) the `request_id` makes requests traceable; (3) HTTP status codes are separated from error messages, which is more RESTful.

## 3. The Full Video-Generation Service

Video generation is the core business logic, and the design must account for asynchrony, retries, and timeouts.

### 3.1 Managing Asynchronous Task State

Jimeng AI generates videos asynchronously: you submit a task, then poll its status. The key design points:

```python
# app/services/video_service.py
import asyncio
import json
import random
import uuid
from datetime import datetime
from typing import Optional, Dict, Any

import aiohttp

from app.core.config import settings
from app.core.exceptions import VideoGenerationException

class VideoGenerationService:
    def __init__(self):
        self.session: Optional[aiohttp.ClientSession] = None
        self.active_tasks: Dict[str, Dict[str, Any]] = {}

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def generate_video(
        self,
        prompt: str,
        aspect_ratio: str = "16:9",
        duration_ms: int = 5000,
        fps: int = 24,
        user_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Core video-generation flow: submit, poll, record."""
        task_id = str(uuid.uuid4())   # our own unique task ID
        submit_id = str(uuid.uuid4())

        payload = {
            "submit_id": submit_id,
            "task_extra": json.dumps({
                "promptSource": "custom",
                "originSubmitId": submit_id,
                "isDefaultSeed": 1,
                "originTemplateId": "",
                "imageNameMapping": {},
                "isUseAiGenPrompt": False,
                "batchNumber": 1,
            }),
            "http_common_info": {"aid": settings.jimeng_appid},
            "input": {
                "video_aspect_ratio": aspect_ratio,
                "seed": self._generate_seed(),   # random seed
                "video_gen_inputs": [{
                    "prompt": prompt,
                    "fps": fps,
                    "duration_ms": duration_ms,
                    "video_mode": 2,
                    "template_id": "",
                }],
                "priority": 0,
                "model_req_key": "dreamina_ic_generate_video_model_vgfm_lite",
            },
            "mode": "workbench",
            "history_option": {},
            "commerce_info": {
                "resource_id": "generate_video",
                "resource_id_type": "str",
                "resource_sub_type": "aigc",
                "benefit_type": "basic_video_operation_vgfm_lite",
            },
            "client_trace_data": {},
        }

        try:
            # Submit the generation request
            async with self.session.post(
                f"{settings.jimeng_base_url}/generate_video",
                headers=self._get_headers(),
                json=payload,
            ) as response:
                if response.status != 200:
                    raise VideoGenerationException(
                        f"Video generation request failed: {response.status}")
                data = await response.json()

            if not data.get("data", {}).get("aigc_data", {}).get("task", {}):
                raise VideoGenerationException("Unexpected API response format")

            jimeng_task_id = data["data"]["aigc_data"]["task"]["task_id"]

            # Poll until the video is ready
            video_url = await self._poll_video_status(jimeng_task_id)

            # Record task info
            self.active_tasks[task_id] = {
                "jimeng_task_id": jimeng_task_id,
                "status": "completed",
                "video_url": video_url,
                "created_at": datetime.now(),
                "user_id": user_id,
                "prompt": prompt,
            }
            return {"task_id": task_id, "video_url": video_url, "status": "completed"}
        except Exception as e:
            raise VideoGenerationException(f"Video generation failed: {e}")

    async def _poll_video_status(self, task_id: str) -> str:
        """Poll the generation status until completion or timeout."""
        polling_url = f"{settings.jimeng_base_url}/mget_generate_task"

        for attempt in range(settings.max_polling_attempts):
            await asyncio.sleep(settings.polling_interval)
            try:
                async with self.session.post(
                    polling_url,
                    headers=self._get_headers(),
                    json={"task_id_list": [task_id]},
                ) as response:
                    if response.status != 200:
                        continue
                    data = await response.json()
                    task_data = data.get("data", {}).get("task_map", {}).get(task_id, {})
                    if task_data.get("status") == 50:   # 50 means generation completed
                        item_list = task_data.get("item_list", [])
                        if item_list and "video" in item_list[0]:
                            video_url = (
                                item_list[0]["video"]
                                .get("transcoded_video", {})
                                .get("origin", {})
                                .get("video_url")
                            )
                            if video_url:
                                return video_url
            except Exception:
                continue
        raise VideoGenerationException("Video generation timed out")

    def _get_headers(self) -> Dict[str, str]:
        """Build request headers; credentials should come from secure storage."""
        # Simplified here; a real deployment needs the full header set
        return {
            "cookie": settings.jimeng_cookie,
            "sign": settings.jimeng_sign,
            "content-type": "application/json",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        }

    def _generate_seed(self) -> int:
        """Generate a random seed."""
        return random.randint(1000000000, 9999999999)
```

This implementation reflects several key design decisions:

- **Async HTTP client**: `aiohttp` suits high-concurrency scenarios better than `requests`.
- **Task-state management**: an `active_tasks` dict tracks all tasks.
- **Thorough error handling**: every point that can fail raises an appropriate exception.
- **Configurable polling**: retry count and interval are driven by configuration.

### 3.2 A Storage Abstraction Layer

Generated videos need to be stored, and different environments may want different backends, so I designed a storage abstraction layer:

```python
# app/services/storage_service.py
import os
import uuid
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional

import aiohttp
import boto3
from qcloud_cos import CosConfig, CosS3Client

class StorageProvider(ABC):
    """Abstract base class for storage providers."""

    @abstractmethod
    async def upload(self, file_content, filename: str) -> str:
        """Upload a file and return its public URL."""

    @abstractmethod
    async def delete(self, filename: str) -> bool:
        """Delete a file."""

class COSStorageProvider(StorageProvider):
    """Tencent Cloud COS storage."""

    def __init__(self, region: str, secret_id: str, secret_key: str, bucket: str):
        config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key)
        self.client = CosS3Client(config)
        self.bucket = bucket
        self.region = region

    async def upload(self, file_content, filename: str) -> str:
        # Note: qcloud_cos is synchronous; in production, run it in a thread pool.
        # put_object accepts bytes directly (upload_file would need a local path).
        response = self.client.put_object(
            Bucket=self.bucket,
            Body=file_content,
            Key=filename,
        )
        if response.get("ETag"):
            return f"https://{self.bucket}.cos.{self.region}.myqcloud.com/{filename}"
        raise Exception("COS upload failed")

    async def delete(self, filename: str) -> bool:
        self.client.delete_object(Bucket=self.bucket, Key=filename)
        return True

class S3StorageProvider(StorageProvider):
    """AWS S3 storage."""

    def __init__(self, bucket: str, region: str, access_key: str, secret_key: str):
        self.s3_client = boto3.client(
            "s3",
            region_name=region,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )
        self.bucket = bucket
        self.region = region

    async def upload(self, file_content, filename: str) -> str:
        self.s3_client.upload_fileobj(file_content, self.bucket, filename)
        return f"https://{self.bucket}.s3.{self.region}.amazonaws.com/{filename}"

    async def delete(self, filename: str) -> bool:
        self.s3_client.delete_object(Bucket=self.bucket, Key=filename)
        return True

class LocalStorageProvider(StorageProvider):
    """Local filesystem storage, handy for development and testing."""

    def __init__(self, base_path: str):
        self.base_path = base_path
        os.makedirs(base_path, exist_ok=True)

    async def upload(self, file_content, filename: str) -> str:
        file_path = os.path.join(self.base_path, filename)
        with open(file_path, "wb") as f:
            if hasattr(file_content, "read"):
                f.write(file_content.read())
            else:
                f.write(file_content)
        # Locally we return a path; production would serve this via static files
        return f"/storage/{filename}"

    async def delete(self, filename: str) -> bool:
        file_path = os.path.join(self.base_path, filename)
        if os.path.exists(file_path):
            os.remove(file_path)
            return True
        return False

class StorageService:
    """Chooses a storage provider based on configuration."""

    def __init__(self, provider_type: str, **kwargs):
        self.provider_type = provider_type
        self.kwargs = kwargs
        self._provider: Optional[StorageProvider] = None

    @property
    def provider(self) -> StorageProvider:
        if self._provider is None:
            if self.provider_type == "cos":
                self._provider = COSStorageProvider(**self.kwargs)
            elif self.provider_type == "s3":
                self._provider = S3StorageProvider(**self.kwargs)
            elif self.provider_type == "local":
                self._provider = LocalStorageProvider(**self.kwargs)
            else:
                raise ValueError(f"Unsupported storage type: {self.provider_type}")
        return self._provider

    async def upload_video(self, video_url: str, filename: Optional[str] = None) -> str:
        """Download a video from a URL and upload it to storage."""
        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
            random_str = str(uuid.uuid4())[:8]
            filename = f"video_{timestamp}_{random_str}.mp4"

        # Download the video
        async with aiohttp.ClientSession() as session:
            async with session.get(video_url) as response:
                if response.status != 200:
                    raise Exception(f"Failed to download video: {response.status}")
                content = await response.read()

        # Upload to the configured backend
        return await self.provider.upload(content, filename)
```

The advantage of this design is that the storage backend can be swapped with a configuration change, with no edits to business code. In real projects I also added multipart upload, resumable transfers, and CDN integration.

## 4. API Endpoint Design and Security

Finally, the API exposed to the outside world. Here the balance between security and usability matters most.

### 4.1 The Full Route Design

```python
# app/api/endpoints.py
import time
from typing import List

from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from app.core.config import settings
from app.core.exceptions import AuthenticationException, RateLimitException
from app.services.video_service import VideoGenerationService
from app.services.storage_service import StorageService
from app.models.schemas import (
    VideoGenerationRequest, VideoGenerationResponse, TaskStatus,
)

router = APIRouter(prefix=settings.api_prefix)
security = HTTPBearer()

class RateLimiter:
    """Simple in-memory rate limiter; use Redis in production."""

    def __init__(self):
        self.requests = {}

    def check_limit(self, api_key: str) -> bool:
        current_time = time.time()
        if api_key not in self.requests:
            self.requests[api_key] = []
        # Drop records older than one minute
        self.requests[api_key] = [
            t for t in self.requests[api_key] if current_time - t < 60
        ]
        if len(self.requests[api_key]) >= settings.rate_limit_per_minute:
            return False
        self.requests[api_key].append(current_time)
        return True

rate_limiter = RateLimiter()

def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the bearer token against the configured API keys."""
    token = credentials.credentials
    if token not in settings.api_keys:
        raise AuthenticationException("Invalid API key")
    return token

@router.post("/videos/generate", response_model=VideoGenerationResponse)
async def generate_video(
    request: VideoGenerationRequest,
    background_tasks: BackgroundTasks,
    api_key: str = Depends(verify_api_key),
):
    """
    Generate a video.

    - **prompt**: text prompt describing the video
    - **aspect_ratio**: aspect ratio, default 16:9
    - **duration_ms**: duration in milliseconds, default 5000
    - **fps**: frame rate, default 24
    """
    # Rate limiting
    if not rate_limiter.check_limit(api_key):
        raise RateLimitException()

    # Parameter validation
    if len(request.prompt.strip()) < 5:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Prompt too short; provide a description of at least 5 characters",
        )
    if request.duration_ms < 1000 or request.duration_ms > 10000:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Duration must be between 1000 and 10000 milliseconds",
        )
    valid_aspect_ratios = ["16:9", "9:16", "1:1", "4:3", "3:4"]
    if request.aspect_ratio not in valid_aspect_ratios:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Unsupported aspect ratio; use one of: {', '.join(valid_aspect_ratios)}",
        )

    try:
        # Run the generation service
        async with VideoGenerationService() as video_service:
            result = await video_service.generate_video(
                prompt=request.prompt,
                aspect_ratio=request.aspect_ratio,
                duration_ms=request.duration_ms,
                fps=request.fps,
                user_id=api_key,   # use the API key as the user identifier
            )

        # Optionally mirror the video to cloud storage in the background
        if settings.storage_type != "none":
            storage_service = StorageService(
                provider_type=settings.storage_type,
                region=settings.cos_region,
                secret_id=settings.cos_secret_id,
                secret_key=settings.cos_secret_key,
                bucket=settings.cos_bucket,
            )
            background_tasks.add_task(
                storage_service.upload_video,
                video_url=result["video_url"],
                filename=f"{result['task_id']}.mp4",
            )

        return VideoGenerationResponse(
            task_id=result["task_id"],
            status=result["status"],
            video_url=result["video_url"],
            message="Video generation task submitted",
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Video generation failed: {e}",
        )

@router.get("/videos/{task_id}", response_model=TaskStatus)
async def get_video_status(task_id: str, api_key: str = Depends(verify_api_key)):
    """Query the status of a generation task."""
    # A real project would look the task up in a database or cache;
    # this is a simplified placeholder.
    return TaskStatus(
        task_id=task_id,
        status="completed",   # should reflect the real status
        progress=100,
        estimated_time_remaining=0,
    )

@router.get("/videos", response_model=List[VideoGenerationResponse])
async def list_videos(
    limit: int = 10,
    offset: int = 0,
    api_key: str = Depends(verify_api_key),
):
    """List the caller's video-generation history (query a database in practice)."""
    return []
```

### 4.2 Request and Response Models

Pydantic models keep inputs and outputs type-safe and generate the API documentation automatically:

```python
# app/models/schemas.py
from datetime import datetime
from enum import Enum
from typing import Optional

from pydantic import BaseModel, Field, validator

class VideoAspectRatio(str, Enum):
    SIXTEEN_NINE = "16:9"
    NINE_SIXTEEN = "9:16"
    ONE_ONE = "1:1"
    FOUR_THREE = "4:3"
    THREE_FOUR = "3:4"

class VideoGenerationRequest(BaseModel):
    """Request model for video generation."""
    prompt: str = Field(
        ..., min_length=5, max_length=1000,
        description="Text prompt, at least 5 characters",
    )
    aspect_ratio: VideoAspectRatio = Field(
        VideoAspectRatio.SIXTEEN_NINE, description="Video aspect ratio",
    )
    duration_ms: int = Field(
        5000, ge=1000, le=10000,
        description="Duration in milliseconds, range 1000-10000",
    )
    fps: int = Field(24, ge=1, le=60, description="Frame rate, range 1-60")

    @validator("prompt")
    def validate_prompt(cls, v):
        # Filter sensitive words (load the list from config in practice)
        sensitive_words = ["violence", "porn", "politics"]
        for word in sensitive_words:
            if word in v:
                raise ValueError(f"Prompt contains sensitive content: {word}")
        return v.strip()

class VideoGenerationResponse(BaseModel):
    """Response model for video generation."""
    task_id: str = Field(..., description="Task ID")
    status: str = Field(..., description="Task status")
    video_url: Optional[str] = Field(None, description="Video URL, returned when complete")
    message: str = Field(..., description="Status message")
    created_at: datetime = Field(default_factory=datetime.now)
    estimated_completion_time: Optional[datetime] = Field(
        None, description="Estimated completion time")

class TaskStatus(BaseModel):
    """Task-status query response."""
    task_id: str
    status: str   # pending, processing, completed, failed
    progress: int = Field(0, ge=0, le=100, description="Progress percentage")
    estimated_time_remaining: Optional[int] = Field(None, description="Seconds remaining")
    error_message: Optional[str] = Field(None, description="Error message")
```

### 4.3 Deployment and Monitoring

A few points matter when you deploy this for real.

**Dockerized deployment:**

```dockerfile
# Dockerfile
FROM python:3.10-slim

WORKDIR /app

# System dependencies
RUN apt-get update && apt-get install -y gcc \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Run as a non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8088"]
```

**Monitoring and logging:**

```python
# Additions to app/main.py
import logging
import uuid
from datetime import datetime

from fastapi import Request
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.FileHandler("app.log"), logging.StreamHandler()],
)

# OpenTelemetry tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Request-ID middleware
@app.middleware("http")
async def add_request_id(request: Request, call_next):
    request_id = str(uuid.uuid4())
    request.state.request_id = request_id
    response = await call_next(request)
    response.headers["X-Request-ID"] = request_id
    return response

# Health-check endpoint
@router.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "version": settings.api_version,
    }
```

This setup has been running stably on several content-creation platforms I maintain, handling upwards of a thousand video-generation requests per day. The biggest takeaway is the importance of abstraction layers: encapsulating the third-party API's details in the service layer and exposing a unified interface means that even if Jimeng AI changes its endpoints, only one place needs to change. We also hit cookie-expiry problems in production and later added an automatic refresh mechanism and a pool of backup accounts. For enterprise applications, consider putting video-generation tasks behind a message queue: it avoids HTTP request timeouts and better supports batch generation and priority scheduling.
This article was contributed by an internet user and reflects the author's views only, not those of this site. If reposting, please credit the source: http://www.coloradmin.cn/o/2408457.html