# FastAPI Chunked Upload Storage: A Complete Guide to Object Storage Integration
Project: FastAPI framework, high performance, easy to learn, fast to code, ready for production — https://gitcode.com/GitHub_Trending/fa/fastapi

Implementing large-file uploads in a FastAPI application hinges on two techniques: chunked uploads and object-storage integration. This article walks through building efficient chunked uploads in FastAPI and integrating them with mainstream object-storage services such as AWS S3 and Alibaba Cloud OSS.

## Why chunked uploads?

With a traditional upload, a large file must be loaded into memory in one piece, which leads to out-of-memory errors and failed uploads. Chunked uploading solves this by splitting a large file into many small pieces and uploading them to the server one at a time.

Core advantages of chunked uploads:

- ✅ Supports very large files (GB scale)
- ✅ Resumable uploads
- ✅ Lower memory usage
- ✅ Faster transfers via parallel uploads
- ✅ Better error recovery

## FastAPI file upload basics

FastAPI has solid built-in file-upload support; the tutorial in docs/en/docs/tutorial/request-files.md covers it in detail. A basic upload endpoint looks like this:

```python
from fastapi import FastAPI, File, UploadFile

app = FastAPI()

@app.post("/upload/")
async def upload_file(file: UploadFile = File(...)):
    contents = await file.read()
    return {"filename": file.filename}
```

This approach is not ideal for large files, because `await file.read()` reads the entire file into memory.

## Implementing chunked uploads

### Option 1: streaming reads with UploadFile

FastAPI's `UploadFile` object supports streaming reads, which is the foundation of chunked uploading:

```python
@app.post("/upload-chunked/")
async def upload_chunked(file: UploadFile = File(...)):
    chunk_size = 1024 * 1024  # 1 MB per chunk
    total_size = 0
    while True:
        chunk = await file.read(chunk_size)
        if not chunk:
            break
        # process each chunk here
        total_size += len(chunk)
    return {"filename": file.filename, "size": total_size}
```

### Option 2: combining with StreamingResponse

When data needs to be processed while it is still being uploaded, use `StreamingResponse`. Streaming data handling is described in detail in docs/en/docs/advanced/stream-data.md.

## Object storage integration in practice

### AWS S3

Integrating FastAPI chunked uploads with AWS S3 gives you effectively unlimited file storage:

```python
import boto3
from fastapi import FastAPI, File, UploadFile
from botocore.exceptions import ClientError

s3_client = boto3.client("s3")
app = FastAPI()

@app.post("/upload-to-s3/")
async def upload_to_s3(file: UploadFile = File(...)):
    chunk_size = 5 * 1024 * 1024  # 5 MB chunks (S3's multipart minimum part size)

    # Initiate the multipart upload
    response = s3_client.create_multipart_upload(
        Bucket="your-bucket",
        Key=file.filename,
    )
    upload_id = response["UploadId"]

    parts = []
    part_number = 1
    while True:
        chunk = await file.read(chunk_size)
        if not chunk:
            break
        # Upload one part
        part_response = s3_client.upload_part(
            Bucket="your-bucket",
            Key=file.filename,
            PartNumber=part_number,
            UploadId=upload_id,
            Body=chunk,
        )
        parts.append({
            "ETag": part_response["ETag"],
            "PartNumber": part_number,
        })
        part_number += 1

    # Complete the multipart upload
    s3_client.complete_multipart_upload(
        Bucket="your-bucket",
        Key=file.filename,
        UploadId=upload_id,
        MultipartUpload={"Parts": parts},
    )
    return {"message": "File uploaded successfully", "filename": file.filename}
```

### Alibaba Cloud OSS

Alibaba Cloud OSS supports a similar multipart-upload API:

```python
import oss2
from fastapi import FastAPI, File, UploadFile

auth = oss2.Auth("your-access-key-id", "your-access-key-secret")
bucket = oss2.Bucket(auth, "your-endpoint", "your-bucket-name")
app = FastAPI()

@app.post("/upload-to-oss/")
async def upload_to_oss(file: UploadFile = File(...)):
    # Initiate the multipart upload
    key = file.filename
    upload_id = bucket.init_multipart_upload(key).upload_id

    parts = []
    part_number = 1
    chunk_size = 5 * 1024 * 1024  # 5 MB
    while True:
        chunk = await file.read(chunk_size)
        if not chunk:
            break
        # Upload one part
        result = bucket.upload_part(key, upload_id, part_number, chunk)
        parts.append(oss2.models.PartInfo(part_number, result.etag))
        part_number += 1

    # Complete the multipart upload
    bucket.complete_multipart_upload(key, upload_id, parts)
    return {"message": "OSS upload successful", "filename": file.filename}
```

## Resumable uploads

Resumability is the most important feature of chunked uploading. The key steps:

```python
from fastapi import FastAPI, File, UploadFile, HTTPException
import hashlib
import os

app = FastAPI()
UPLOAD_DIR = "uploads"

@app.post("/resumable-upload/")
async def resumable_upload(
    file: UploadFile = File(...),
    chunk_number: int = 0,
    total_chunks: int = 1,
    file_hash: str = "",
):
    # Ensure the upload directory exists
    os.makedirs(UPLOAD_DIR, exist_ok=True)

    # Derive a unique identifier for the file
    if not file_hash:
        file_hash = hashlib.md5(file.filename.encode()).hexdigest()
    temp_file = os.path.join(UPLOAD_DIR, f"{file_hash}.part")

    # Read this chunk and append it to the partial file
    chunk_data = await file.read()
    with open(temp_file, "ab") as f:
        f.write(chunk_data)

    # Check whether all chunks have arrived
    if chunk_number == total_chunks - 1:
        final_path = os.path.join(UPLOAD_DIR, file.filename)
        os.rename(temp_file, final_path)
        return {"status": "complete", "filename": file.filename}
    return {"status": "chunk_uploaded", "chunk": chunk_number}
```

## Chunked upload on the frontend

The frontend can drive chunked uploads with JavaScript:

```javascript
async function uploadFile(file) {
    const chunkSize = 5 * 1024 * 1024; // 5 MB
    const totalChunks = Math.ceil(file.size / chunkSize);
    const fileHash = await calculateFileHash(file);

    for (let chunkNumber = 0; chunkNumber < totalChunks; chunkNumber++) {
        const start = chunkNumber * chunkSize;
        const end = Math.min(start + chunkSize, file.size);
        const chunk = file.slice(start, end);

        const formData = new FormData();
        formData.append("file", chunk, file.name);
        formData.append("chunk_number", chunkNumber);
        formData.append("total_chunks", totalChunks);
        formData.append("file_hash", fileHash);

        const response = await fetch("/upload-chunked/", {
            method: "POST",
            body: formData,
        });
        if (!response.ok) {
            throw new Error("Upload failed");
        }
    }
}
```

## Performance optimization tips

### 1. Parallel uploads

Uploading several chunks concurrently can significantly improve throughput:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

async def upload_chunk_parallel(chunk_data, chunk_info):
    # upload_to_storage is your blocking storage call (e.g. upload_part)
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(
        executor, upload_to_storage, chunk_data, chunk_info
    )
```

### 2. Memory optimization

Use a generator to avoid loading a large file all at once:

```python
async def read_file_in_chunks(file_path, chunk_size):
    with open(file_path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk
```

### 3. Progress tracking

Report upload progress to the user in real time:

```python
from fastapi import WebSocket

@app.websocket("/upload-progress/{upload_id}")
async def websocket_endpoint(websocket: WebSocket, upload_id: str):
    await websocket.accept()
    while True:
        # Receive chunk data
        chunk_data = await websocket.receive_bytes()
        # Process the chunk
        # ...
        # Send progress back to the client
        progress = calculate_progress()
        await websocket.send_json({
            "progress": progress,
            "status": "uploading",
        })
```

## Security considerations

### 1. File type validation

```python
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".pdf", ".doc", ".docx"}

def validate_file_extension(filename: str):
    ext = os.path.splitext(filename)[1].lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(400, "Unsupported file type")
```
### 2. File size limits

```python
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse

MAX_FILE_SIZE = 10 * 1024 * 1024 * 1024  # 10 GB

@app.post("/upload/")
async def upload_large_file(file: UploadFile = File(...)):
    # Determine the file size by seeking to the end
    file.file.seek(0, 2)
    file_size = file.file.tell()
    file.file.seek(0)  # reset the pointer

    if file_size > MAX_FILE_SIZE:
        return JSONResponse(
            status_code=400,
            content={"error": "File too large"},
        )
```

## Monitoring and logging

FastAPI exposes its logger in fastapi/logger.py; you can add a dedicated logger for the upload feature:

```python
import logging

upload_logger = logging.getLogger("fastapi.upload")

@app.post("/upload/")
async def upload_with_logging(file: UploadFile = File(...)):
    upload_logger.info(f"Starting upload: {file.filename}")
    try:
        # upload logic here
        upload_logger.info(f"Upload succeeded: {file.filename}")
        return {"status": "success"}
    except Exception as e:
        upload_logger.error(f"Upload failed: {e}")
        raise HTTPException(500, "Upload failed")
```

## Best-practices summary

- **Chunk size**: pick a chunk size suited to network conditions and storage-service limits, typically 5–10 MB
- **Error handling**: implement robust error handling and a retry mechanism
- **Progress feedback**: give users real-time upload progress
- **Security validation**: validate file type, size, and content
- **Storage strategy**: choose a storage service that matches your business needs
- **Monitoring and alerting**: monitor uploads and alert on anomalies

## Further reading

- FastAPI's official file-upload documentation
- The streaming-data guide
- FastAPI's dependency-injection system
- Middleware configuration

With the chunked-upload and object-storage integration patterns shown above, you can build FastAPI applications that handle large file uploads with ease, whether for video processing, big-data analysis, or cloud storage services. Remember that chunked uploading is about user experience as much as technical implementation: careful design and tuning keep your application smooth and reliable when handling large files.

Creation notice: parts of this article were produced with AI assistance (AIGC) and are provided for reference only.
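The best-practices list above calls for a retry mechanism around each chunk upload, but the article does not show one. Here is a minimal sketch (not from the original article) of retrying a single chunk with exponential backoff and jitter; `upload_fn` is a hypothetical stand-in for whichever blocking storage call you actually use, such as `upload_part`:

```python
import random
import time

def upload_with_retry(upload_fn, chunk, max_retries=3, base_delay=0.05):
    """Call upload_fn(chunk), retrying with exponential backoff on failure."""
    for attempt in range(max_retries + 1):
        try:
            return upload_fn(chunk)
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: propagate the error to the caller
            # backoff: base_delay, 2x, 4x, ... plus a little random jitter
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.01))

# Demo: a fake upload call that fails twice before succeeding
attempts = {"count": 0}

def flaky_upload(chunk):
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient network error")
    return {"etag": f"etag-{len(chunk)}"}

result = upload_with_retry(flaky_upload, b"x" * 1024)
print(result, attempts["count"])
```

In production you would likely retry only transient errors (timeouts, 5xx responses) rather than every exception, and tune `max_retries` and `base_delay` to your network and storage service.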