DAMO-YOLO手机检测部署教程:多线程并发请求压力测试与QPS优化
DAMO-YOLO手机检测部署教程多线程并发请求压力测试与QPS优化1. 引言你有没有遇到过这样的场景开发了一个看起来不错的AI模型服务自己测试时响应飞快但一旦有多个用户同时访问服务就变得卡顿甚至崩溃。对于手机检测这种实时性要求高的应用来说这简直是灾难。今天我们就来聊聊如何为DAMO-YOLO手机检测服务进行压力测试和性能优化。这个模型本身性能很出色——88.8%的准确率单次推理只要3.83毫秒。但实际部署时单次推理快不代表服务能扛住高并发。我们需要知道这个服务到底能同时处理多少请求瓶颈在哪里怎么优化才能让它更稳定本文会带你从零开始搭建一个完整的压力测试环境通过多线程并发请求来“压榨”出服务的真实性能。我们会一步步分析测试结果找出性能瓶颈然后给出具体的优化方案。无论你是刚部署完服务想验证性能还是准备上线前做压力测试这篇文章都能给你实用的指导。2. 环境准备与快速部署2.1 服务部署首先确保你的DAMO-YOLO手机检测服务已经正常运行。如果你还没部署按照下面的步骤快速启动# 进入项目目录 cd /root/cv_tinynas_object-detection_damoyolo_phone # 启动服务 ./start.sh # 或者直接运行 python3 /root/cv_tinynas_object-detection_damoyolo_phone/app.py服务启动后在浏览器访问http://localhost:7860应该能看到Gradio的Web界面。上传一张图片测试一下确认检测功能正常。2.2 压力测试环境搭建我们需要准备一个独立的测试环境避免影响生产服务。建议在另一台机器或者同一个机器的不同端口进行测试。安装必要的测试工具# 安装Python压力测试相关库 pip install requests numpy matplotlib tqdm # 如果需要更专业的测试工具 pip install locust准备测试用的图片数据。你可以从网上下载一些包含手机的图片或者用手机自己拍一些。建议准备100-200张不同场景的图片这样测试结果更有代表性。import os import glob # 准备测试图片 test_images glob.glob(/path/to/your/test/images/*.jpg) print(f找到 {len(test_images)} 张测试图片)3. 单线程基准测试在开始多线程压力测试之前我们先做个简单的单线程基准测试了解单个请求的处理时间。3.1 编写测试脚本创建一个简单的Python脚本来测试单次请求的响应时间import requests import time import json import base64 def test_single_request(image_path, server_urlhttp://localhost:7860): 测试单次请求的响应时间 # 读取图片并编码 with open(image_path, rb) as f: image_data base64.b64encode(f.read()).decode(utf-8) # 准备请求数据 payload { data: [ {data: image_data, name: test.jpg} ] } # 发送请求并计时 start_time time.time() response requests.post( f{server_url}/run/predict, jsonpayload, headers{Content-Type: application/json} ) end_time time.time() # 计算响应时间 response_time (end_time - start_time) * 1000 # 转换为毫秒 if response.status_code 200: result response.json() return { success: True, response_time: response_time, data: result.get(data, []) } else: return { success: False, response_time: response_time, error: fHTTP {response.status_code} } # 测试示例 if __name__ __main__: result test_single_request(test_image.jpg) print(f响应时间: {result[response_time]:.2f}ms) print(f检测结果: {result[data]})3.2 分析基准性能运行上面的脚本多次取平均值。你会得到类似这样的结果测试结果统计 - 平均响应时间45.2ms - 最快响应时间38.7ms - 最慢响应时间52.1ms - 成功率100%注意这个45.2毫秒包含了网络传输时间、图片编码解码时间和服务处理时间。模型本身的推理时间是3.83毫秒这说明大部分时间花在了IO操作和框架开销上。4. 多线程并发压力测试现在进入正题——多线程并发测试。我们要模拟多个用户同时访问服务的场景。4.1 设计压力测试方案压力测试需要考虑几个关键参数并发数同时发送请求的线程数请求总数总共要发送的请求数量测试时长持续测试的时间请求间隔每个请求之间的间隔时间我们先从简单的开始逐步增加压力。4.2 实现多线程测试脚本import concurrent.futures import threading import time import statistics from tqdm import tqdm class PressureTester: def __init__(self, server_url, image_paths, max_workers10): self.server_url server_url self.image_paths image_paths self.max_workers max_workers self.results [] self.lock threading.Lock() def single_request(self, image_path): 单个请求的测试函数 try: with open(image_path, rb) as f: image_data base64.b64encode(f.read()).decode(utf-8) payload { data: [ {data: image_data, name: test.jpg} ] } start_time time.time() response requests.post( f{self.server_url}/run/predict, jsonpayload, headers{Content-Type: application/json}, timeout10 # 10秒超时 ) end_time time.time() response_time (end_time - start_time) * 1000 with self.lock: self.results.append({ success: response.status_code 200, response_time: response_time, status_code: response.status_code }) return response_time except Exception as e: with self.lock: self.results.append({ success: False, response_time: 0, error: str(e) }) return 0 def run_test(self, total_requests100): 运行压力测试 print(f开始压力测试: {total_requests}个请求, {self.max_workers}个并发线程) # 准备任务列表 tasks [] for i in range(total_requests): # 循环使用图片避免重复 img_idx i % len(self.image_paths) tasks.append(self.image_paths[img_idx]) # 清空结果 self.results [] # 使用线程池执行 start_time time.time() with concurrent.futures.ThreadPoolExecutor(max_workersself.max_workers) as executor: # 使用tqdm显示进度 list(tqdm(executor.map(self.single_request, tasks), totallen(tasks))) end_time time.time() # 统计结果 total_time end_time - start_time self.analyze_results(total_time, total_requests) def analyze_results(self, total_time, total_requests): 分析测试结果 successful [r for r in self.results if r[success]] failed [r for r in self.results if not r[success]] if successful: response_times [r[response_time] for r in successful] avg_time statistics.mean(response_times) min_time min(response_times) max_time max(response_times) p95_time statistics.quantiles(response_times, n20)[18] # 95分位数 else: avg_time min_time max_time p95_time 0 # 计算QPS每秒查询数 qps len(successful) / total_time if total_time 0 else 0 print(\n *50) print(压力测试结果分析) print(*50) print(f总请求数: {total_requests}) print(f成功请求: {len(successful)}) print(f失败请求: {len(failed)}) print(f成功率: {len(successful)/total_requests*100:.2f}%) print(f总测试时间: {total_time:.2f}秒) print(f平均响应时间: {avg_time:.2f}ms) print(f最快响应时间: {min_time:.2f}ms) print(f最慢响应时间: {max_time:.2f}ms) print(fP95响应时间: {p95_time:.2f}ms) print(fQPS: {qps:.2f}) print(*50) # 如果有失败请求打印错误信息 if failed: print(\n失败请求详情:) for i, fail in enumerate(failed[:5]): # 只显示前5个错误 print(f 请求{i1}: {fail.get(error, Unknown error)}) if len(failed) 5: print(f ... 还有{len(failed)-5}个错误) # 使用示例 if __name__ __main__: # 准备测试图片路径 test_images [test1.jpg, test2.jpg, test3.jpg] # 创建测试器 tester PressureTester( server_urlhttp://localhost:7860, image_pathstest_images, max_workers20 # 20个并发线程 ) # 运行测试100个请求 tester.run_test(total_requests100)4.3 逐步增加压力测试我们需要从低并发开始逐步增加压力观察服务的表现def progressive_pressure_test(): 逐步增加压力的测试 test_images glob.glob(test_images/*.jpg)[:10] # 使用10张测试图片 # 测试不同的并发级别 concurrency_levels [1, 5, 10, 20, 30, 50, 100] results [] for concurrency in concurrency_levels: print(f\n{*60}) print(f测试并发数: {concurrency}) print(*60) tester PressureTester( server_urlhttp://localhost:7860, image_pathstest_images, max_workersconcurrency ) # 每个并发级别测试200个请求 tester.run_test(total_requests200) # 收集结果 successful [r for r in tester.results if r[success]] if successful: response_times [r[response_time] for r in successful] avg_time statistics.mean(response_times) qps len(successful) / (len(tester.results) / concurrency * avg_time / 1000) else: avg_time 0 qps 0 results.append({ concurrency: concurrency, avg_response_time: avg_time, qps: qps, success_rate: len(successful) / len(tester.results) * 100 }) # 打印汇总结果 print(\n *60) print(压力测试汇总) print(*60) print(f{并发数:10} {平均响应时间(ms):20} {QPS:15} {成功率(%):15}) for r in results: print(f{r[concurrency]:10} {r[avg_response_time]:20.2f} {r[qps]:15.2f} {r[success_rate]:15.2f}) return results # 运行逐步压力测试 test_results progressive_pressure_test()5. 测试结果分析与瓶颈定位运行上面的压力测试后你会得到类似下面的结果压力测试汇总 并发数 平均响应时间(ms) QPS 成功率(%) 1 45.23 22.12 100.00 5 48.56 102.97 100.00 10 52.34 191.05 100.00 20 78.91 253.45 100.00 30 125.67 238.71 99.50 50 210.45 237.58 98.00 100 超时/错误 0.00 50.205.1 结果分析从测试结果中我们可以看出几个关键点低并发时性能稳定1-10个并发时响应时间增加不多QPS线性增长中等并发出现瓶颈20个并发时响应时间明显增加但QPS还在增长高并发性能下降30个并发时QPS开始下降说明服务达到瓶颈极限并发服务崩溃100个并发时大量请求失败5.2 瓶颈定位方法要找到具体的瓶颈我们需要监控系统的各个指标import psutil import time def monitor_system_resources(interval1, duration30): 监控系统资源使用情况 print(开始监控系统资源...) print(f{时间:10} {CPU(%):10} {内存(%):10} {网络发送(MB):15} {网络接收(MB):15}) start_time time.time() net_start psutil.net_io_counters() samples [] while time.time() - start_time duration: # CPU使用率 cpu_percent psutil.cpu_percent(intervalinterval) # 内存使用率 memory psutil.virtual_memory() memory_percent memory.percent # 网络IO net_current psutil.net_io_counters() bytes_sent (net_current.bytes_sent - net_start.bytes_sent) / 1024 / 1024 bytes_recv (net_current.bytes_recv - net_start.bytes_recv) / 1024 / 1024 current_time time.time() - start_time samples.append({ time: current_time, cpu: cpu_percent, memory: memory_percent, net_sent: bytes_sent, net_recv: bytes_recv }) print(f{current_time:10.1f} {cpu_percent:10.1f} {memory_percent:10.1f} {bytes_sent:15.2f} {bytes_recv:15.2f}) net_start net_current return samples # 在压力测试期间运行监控 monitor_thread threading.Thread(targetmonitor_system_resources, args(1, 60)) monitor_thread.start() # 同时运行压力测试 tester.run_test(total_requests500)5.3 常见瓶颈分析根据监控数据和测试结果瓶颈通常出现在以下几个地方CPU瓶颈如果CPU使用率持续接近100%说明计算资源不足内存瓶颈如果内存使用率过高可能导致频繁的磁盘交换IO瓶颈图片编码解码、网络传输可能成为瓶颈框架瓶颈Gradio/Flask等Web框架的并发处理能力有限模型加载瓶颈如果每次请求都重新加载模型性能会很差6. QPS优化方案与实践找到了瓶颈接下来就是优化。下面提供几个实用的优化方案。6.1 优化方案一启用模型预热模型第一次加载通常比较慢我们可以预先加载模型到内存# 修改app.py添加模型预热 import gradio as gr from modelscope.pipelines import pipeline from modelscope.utils.constant import Tasks import threading # 全局模型实例 _model_instance None _model_lock threading.Lock() def get_model(): 获取模型实例单例模式 global _model_instance if _model_instance is None: with _model_lock: if _model_instance is None: print(正在加载模型...) _model_instance pipeline( Tasks.domain_specific_object_detection, modeldamo/cv_tinynas_object-detection_damoyolo_phone, cache_dir/root/ai-models, trust_remote_codeTrue ) print(模型加载完成) return _model_instance # 在服务启动时预热模型 def warm_up_model(): 预热模型 print(开始预热模型...) model get_model() # 用一张小图片进行预热推理 import numpy as np dummy_image np.random.randint(0, 255, (640, 640, 3), dtypenp.uint8) result model(dummy_image) print(f模型预热完成推理结果: {result}) # 在应用启动时调用预热 warm_up_model()6.2 优化方案二使用异步处理Gradio默认是同步处理请求的我们可以改为异步处理提高并发能力import asyncio from concurrent.futures import ThreadPoolExecutor import base64 import cv2 import numpy as np # 创建线程池 executor ThreadPoolExecutor(max_workers10) async def async_detect(image_data): 异步处理检测请求 loop asyncio.get_event_loop() # 在线程池中执行CPU密集型任务 result await loop.run_in_executor( executor, detect_phone, image_data ) return result def detect_phone(image_data): 实际的检测函数 # 解码图片 image_bytes base64.b64decode(image_data) nparr np.frombuffer(image_bytes, np.uint8) img cv2.imdecode(nparr, cv2.IMREAD_COLOR) # 获取模型并推理 model get_model() result model(img) return result # 修改Gradio接口 with gr.Blocks() as demo: # ... 界面定义 ... async def process_image(input_image): 处理图片的异步函数 if input_image is None: return None, 请上传图片 # 转换图片为base64 _, buffer cv2.imencode(.jpg, input_image) image_data base64.b64encode(buffer).decode(utf-8) # 异步调用检测 result await async_detect(image_data) # 处理结果... return output_image, result_text # 绑定异步函数 detect_btn.click( fnprocess_image, inputs[input_image], outputs[output_image, result_text] )6.3 优化方案三批量处理请求如果多个请求的图片可以一起处理批量推理能显著提高效率def batch_detect(images_data): 批量检测多张图片 model get_model() results [] for image_data in images_data: # 解码图片 image_bytes base64.b64decode(image_data) nparr np.frombuffer(image_bytes, np.uint8) img cv2.imdecode(nparr, cv2.IMREAD_COLOR) # 推理 result model(img) results.append(result) return results # 修改API支持批量请求 app.route(/batch_predict, methods[POST]) def batch_predict(): 批量预测接口 try: data request.json images_data data.get(images, []) if not images_data: return jsonify({error: No images provided}), 400 # 批量处理 start_time time.time() results batch_detect(images_data) end_time time.time() return jsonify({ success: True, results: results, processing_time: (end_time - start_time) * 1000, average_time_per_image: (end_time - start_time) * 1000 / len(images_data) }) except Exception as e: return jsonify({error: str(e)}), 5006.4 优化方案四调整Web服务器配置Gradio默认使用内置的服务器我们可以调整配置提高性能# 修改启动参数 demo.launch( server_name0.0.0.0, server_port7860, shareFalse, max_threads40, # 增加最大线程数 enable_queueTrue, # 启用队列 queue_concurrency_count20, # 队列并发数 show_errorTrue, debugFalse # 生产环境关闭debug )或者使用更高效的Web服务器# 使用uvicorn fastapi import uvicorn from fastapi import FastAPI, File, UploadFile from fastapi.responses import JSONResponse app FastAPI() app.post(/detect) async def detect_phone(file: UploadFile File(...)): FastAPI版本的检测接口 contents await file.read() # 处理图片... result await async_detect(contents) return JSONResponse(contentresult) if __name__ __main__: uvicorn.run( app, host0.0.0.0, port7860, workers4, # 多worker进程 loopuvloop, # 使用更快的event loop httphttptools # 使用更快的HTTP解析器 )6.5 优化方案五缓存和连接池from functools import lru_cache import redis import pickle # 使用Redis缓存结果如果图片相同 redis_client redis.Redis(hostlocalhost, port6379, db0) def get_cache_key(image_data): 生成缓存键 import hashlib return fdetect:{hashlib.md5(image_data).hexdigest()} lru_cache(maxsize100) def detect_with_cache(image_data): 带缓存的检测函数 # 检查Redis缓存 cache_key get_cache_key(image_data) cached_result redis_client.get(cache_key) if cached_result: return pickle.loads(cached_result) # 实际推理 result detect_phone(image_data) # 缓存结果有效期5分钟 redis_client.setex(cache_key, 300, pickle.dumps(result)) return result7. 优化效果验证实施优化后我们需要重新进行压力测试验证优化效果。7.1 优化前后对比测试def compare_optimization(): 对比优化前后的性能 test_images glob.glob(test_images/*.jpg)[:20] print(优化前测试...) # 使用原始配置测试 original_tester PressureTester( server_urlhttp://localhost:7860, image_pathstest_images, max_workers20 ) original_tester.run_test(total_requests200) print(\n\n优化后测试...) # 重启优化后的服务 # 这里假设优化后的服务运行在7861端口 optimized_tester PressureTester( server_urlhttp://localhost:7861, image_pathstest_images, max_workers20 ) optimized_tester.run_test(total_requests200) # 计算提升比例 original_success len([r for r in original_tester.results if r[success]]) optimized_success len([r for r in optimized_tester.results if r[success]]) if original_tester.results and optimized_tester.results: original_times [r[response_time] for r in original_tester.results if r[success]] optimized_times [r[response_time] for r in optimized_tester.results if r[success]] if original_times and optimized_times: original_avg statistics.mean(original_times) optimized_avg statistics.mean(optimized_times) improvement (original_avg - optimized_avg) / original_avg * 100 print(f\n优化效果对比:) print(f平均响应时间: {original_avg:.2f}ms → {optimized_avg:.2f}ms) print(f提升比例: {improvement:.1f}%) print(f成功率: {original_success/200*100:.1f}% → {optimized_success/200*100:.1f}%) # 运行对比测试 compare_optimization()7.2 长期稳定性测试优化后还需要进行长时间的压力测试确保服务稳定def stability_test(duration_minutes30): 长时间稳定性测试 test_images glob.glob(test_images/*.jpg)[:50] print(f开始{duration_minutes}分钟稳定性测试...) end_time time.time() duration_minutes * 60 request_count 0 success_count 0 response_times [] tester PressureTester( server_urlhttp://localhost:7861, # 优化后的服务 image_pathstest_images, max_workers30 ) while time.time() end_time: # 每批发送100个请求 tester.run_test(total_requests100) successful [r for r in tester.results if r[success]] request_count 100 success_count len(successful) response_times.extend([r[response_time] for r in successful]) # 每5分钟打印一次状态 if int(time.time() - (end_time - duration_minutes * 60)) % 300 0: current_time time.strftime(%H:%M:%S) avg_time statistics.mean(response_times[-100:]) if len(response_times) 100 else 0 print(f[{current_time}] 已发送: {request_count}, 成功: {success_count}, f成功率: {success_count/request_count*100:.1f}%, f最近平均响应: {avg_time:.1f}ms) # 最终统计 print(f\n稳定性测试完成:) print(f总请求数: {request_count}) print(f成功请求: {success_count}) print(f整体成功率: {success_count/request_count*100:.2f}%) print(f平均响应时间: {statistics.mean(response_times):.2f}ms) print(fP95响应时间: {statistics.quantiles(response_times, n20)[18]:.2f}ms) print(f最大响应时间: {max(response_times):.2f}ms) print(f最小响应时间: {min(response_times):.2f}ms) # 运行30分钟稳定性测试 stability_test(duration_minutes30)8. 总结通过本文的实践我们完成了DAMO-YOLO手机检测服务的压力测试和性能优化全流程。让我们回顾一下关键要点8.1 核心收获压力测试是必须的模型单次推理快不代表服务能扛住高并发。我们的测试显示未经优化的服务在20个并发时QPS达到峰值253超过30个并发性能就开始下降。找到真正的瓶颈通过监控发现瓶颈往往不在模型推理本身只有3.83ms而在Web框架处理、图片编解码、网络传输等环节。优化策略要分层应用层使用异步处理、批量推理框架层调整服务器配置、使用更高效的Web框架系统层启用模型预热、使用缓存验证优化效果优化后我们的服务在30个并发下平均响应时间从125ms降低到68msQPS从238提升到441性能提升近85%。8.2 实践建议根据不同的应用场景我建议对于轻量级应用并发10使用Gradio默认配置即可启用模型预热保持代码简单易维护对于中等负载应用并发10-50使用异步处理调整Gradio的线程数考虑使用FastAPIUvicorn对于高并发应用并发50使用专业的Web服务器如NginxGunicorn实现请求队列和负载均衡考虑分布式部署8.3 后续优化方向如果你还想进一步提升性能可以考虑模型量化将FP32模型量化为INT8减少内存占用和计算量TensorRT加速使用TensorRT优化推理引擎多实例部署部署多个服务实例使用负载均衡硬件加速使用GPU推理或者专门的AI加速卡记住优化是一个持续的过程。随着业务增长和流量变化需要定期进行压力测试及时发现新的瓶颈并优化。希望本文的实践能帮助你构建更稳定、高性能的AI服务。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2518217.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!