GLM-ASR-Nano-2512实战教程:Python SDK封装与异步批量任务队列集成
GLM-ASR-Nano-2512实战教程Python SDK封装与异步批量任务队列集成1. 引言如果你正在寻找一个既强大又高效的语音识别工具GLM-ASR-Nano-2512绝对值得你花时间了解。这个拥有15亿参数的开源模型在多个测试中表现超越了知名的Whisper V3而且模型体积控制得相当不错。但你可能已经发现直接使用它的Web界面或基础API在处理大量音频文件时会有些力不从心。手动一个个上传文件、等待结果、再下载这个过程既耗时又容易出错。特别是当你需要处理成百上千个音频文件时这种手动操作方式几乎不可行。这正是我们今天要解决的问题。我将带你一步步构建一个完整的解决方案一个封装了GLM-ASR-Nano-2512的Python SDK再加上一个智能的异步批量任务队列系统。有了这个工具你可以轻松地批量处理音频文件系统会自动管理任务、处理错误、保存结果而你只需要关注最终的文字输出。2. 环境准备与快速部署在开始编码之前我们需要先把GLM-ASR-Nano-2512服务跑起来。别担心这个过程比你想的要简单。2.1 基础环境检查首先确认你的机器满足基本要求操作系统Linux或Windows建议Ubuntu 22.04内存至少16GB存储空间至少10GB可用GPU有NVIDIA显卡更好RTX 4090/3090最佳没有也能用CPU运行如果你有NVIDIA显卡还需要检查CUDA驱动nvidia-smi这个命令会显示你的GPU信息和CUDA版本。GLM-ASR-Nano-2512需要CUDA 12.4或更高版本。2.2 一键部署服务最简单的方法是使用Docker它能帮你处理好所有依赖关系。如果你还没有安装Docker可以先到Docker官网下载安装。准备好一个Dockerfile文件内容如下FROM nvidia/cuda:12.4.0-runtime-ubuntu22.04 # 安装必要的软件 RUN apt-get update apt-get install -y python3 python3-pip git-lfs RUN pip3 install torch torchaudio transformers gradio # 设置工作目录并复制文件 WORKDIR /app COPY . /app # 初始化Git LFS并拉取模型 RUN git lfs install git lfs pull # 暴露服务端口 EXPOSE 7860 # 启动服务 CMD [python3, app.py]然后创建docker-compose.yml文件让部署更简单version: 3.8 services: glm-asr: build: . container_name: glm-asr-service ports: - 7860:7860 volumes: - ./audio_data:/app/audio_data - ./results:/app/results environment: - CUDA_VISIBLE_DEVICES0 deploy: resources: reservations: devices: - driver: nvidia count: 1 capabilities: [gpu] restart: unless-stopped现在只需要一个命令就能启动服务docker-compose up -d等待几分钟服务就启动完成了。你可以在浏览器中打开http://localhost:7860看到GLM-ASR-Nano-2512的Web界面。2.3 验证服务状态服务启动后我们可以用简单的Python代码测试一下import requests def test_service(): 测试ASR服务是否正常运行 try: response requests.get(http://localhost:7860/) if response.status_code 200: print(✅ ASR服务运行正常) return True else: print(f❌ 服务返回异常状态码: {response.status_code}) return False except Exception as e: print(f❌ 连接服务失败: {e}) return False # 运行测试 if __name__ __main__: test_service()如果看到✅ ASR服务运行正常的输出说明一切准备就绪我们可以开始构建SDK了。3. Python SDK封装设计现在服务已经跑起来了我们来创建一个好用的Python SDK。好的SDK应该像使用本地函数一样简单隐藏掉所有复杂的网络请求细节。3.1 基础客户端类设计我们先从最基础的客户端开始它负责与ASR服务通信import requests import json import time from typing import Optional, Dict, Any from pathlib import Path import logging class GLMASRClient: GLM-ASR-Nano-2512基础客户端 def __init__(self, base_url: str http://localhost:7860, timeout: int 300): 初始化ASR客户端 Args: base_url: ASR服务地址 timeout: 请求超时时间秒 self.base_url base_url.rstrip(/) self.timeout timeout self.api_url f{self.base_url}/gradio_api/ self.logger logging.getLogger(__name__) def transcribe_file(self, audio_path: str, language: str auto, task: str transcribe) - Dict[str, Any]: 转录单个音频文件 Args: audio_path: 音频文件路径 language: 语言代码如zh、en或auto自动检测 task: 任务类型transcribe或translate Returns: 包含转录结果的字典 try: # 检查文件是否存在 if not Path(audio_path).exists(): raise FileNotFoundError(f音频文件不存在: {audio_path}) # 准备请求数据 files {file: open(audio_path, rb)} data { language: language, task: task } # 发送请求 self.logger.info(f开始转录文件: {audio_path}) start_time time.time() response requests.post( f{self.api_url}predict, filesfiles, datadata, timeoutself.timeout ) # 关闭文件 files[file].close() # 检查响应 if response.status_code 200: result response.json() elapsed time.time() - start_time self.logger.info(f转录完成: {audio_path}, 耗时: {elapsed:.2f}秒) return { success: True, file_path: audio_path, text: result.get(data, [])[0], language: result.get(data, [, ])[1], processing_time: elapsed } else: self.logger.error(f转录失败: {response.status_code}, {response.text}) return { success: False, file_path: audio_path, error: fHTTP {response.status_code}: {response.text} } except Exception as e: self.logger.error(f转录过程出错: {audio_path}, 错误: {e}) return { success: False, file_path: audio_path, error: str(e) } def transcribe_text(self, text: str) - Dict[str, Any]: 测试用直接转录文本实际调用语音识别 这个方法主要用于测试服务连通性 Args: text: 测试文本 Returns: 测试结果 # 注意实际ASR服务需要音频文件 # 这里只是演示API调用格式 data { data: [text, auto, transcribe] } try: response requests.post( f{self.api_url}predict, jsondata, timeoutself.timeout ) if response.status_code 200: return { success: True, response: response.json() } else: return { success: False, error: fHTTP {response.status_code} } except Exception as e: return { success: False, error: str(e) } def get_service_info(self) - Dict[str, Any]: 获取服务信息 try: response requests.get(f{self.base_url}/, timeout10) return { status: running if response.status_code 200 else stopped, url: self.base_url } except: return {status: unreachable, url: self.base_url}这个基础客户端已经能处理单个文件的转录了。但你会发现它有几个明显的不足没有错误重试机制不支持批量处理没有进度跟踪结果保存不方便别急我们一步步来完善它。3.2 增强型客户端添加重试和进度跟踪让我们创建一个更强大的客户端解决上述问题import asyncio import aiohttp from tenacity import retry, stop_after_attempt, wait_exponential from tqdm import tqdm import pandas as pd class EnhancedGLMASRClient(GLMASRClient): 增强型ASR客户端支持重试和批量处理 def __init__(self, base_url: str http://localhost:7860, max_retries: int 3, batch_size: int 5): super().__init__(base_url) self.max_retries max_retries self.batch_size batch_size retry(stopstop_after_attempt(3), waitwait_exponential(multiplier1, min4, max10)) def transcribe_with_retry(self, audio_path: str, **kwargs) - Dict[str, Any]: 带重试机制的转录方法 Args: audio_path: 音频文件路径 **kwargs: 其他参数传递给transcribe_file Returns: 转录结果 return self.transcribe_file(audio_path, **kwargs) def transcribe_batch(self, audio_files: list, output_format: str csv, output_file: str None) - pd.DataFrame: 批量转录多个音频文件 Args: audio_files: 音频文件路径列表 output_format: 输出格式csv或excel output_file: 输出文件路径可选 Returns: 包含所有结果的DataFrame results [] # 使用进度条 with tqdm(totallen(audio_files), desc转录进度) as pbar: for i in range(0, len(audio_files), self.batch_size): batch audio_files[i:i self.batch_size] # 处理当前批次 for audio_file in batch: try: result self.transcribe_with_retry(audio_file) results.append(result) except Exception as e: self.logger.error(f处理文件失败: {audio_file}, 错误: {e}) results.append({ success: False, file_path: audio_file, error: str(e), text: , language: , processing_time: 0 }) finally: pbar.update(1) # 批次间短暂暂停避免服务器压力过大 time.sleep(0.5) # 转换为DataFrame df pd.DataFrame(results) # 保存结果 if output_file: if output_format csv: df.to_csv(output_file, indexFalse, encodingutf-8-sig) self.logger.info(f结果已保存到: {output_file}) elif output_format excel: df.to_excel(output_file, indexFalse) self.logger.info(f结果已保存到: {output_file}) return df async def transcribe_async(self, audio_path: str, session: aiohttp.ClientSession, **kwargs) - Dict[str, Any]: 异步转录单个文件 Args: audio_path: 音频文件路径 session: aiohttp会话 **kwargs: 其他参数 Returns: 转录结果 try: # 异步读取文件 with open(audio_path, rb) as f: file_data f.read() # 准备表单数据 data aiohttp.FormData() data.add_field(file, file_data, filenamePath(audio_path).name, content_typeaudio/wav) data.add_field(language, kwargs.get(language, auto)) data.add_field(task, kwargs.get(task, transcribe)) # 异步请求 async with session.post( f{self.api_url}predict, datadata, timeoutaiohttp.ClientTimeout(totalself.timeout) ) as response: if response.status 200: result await response.json() return { success: True, file_path: audio_path, text: result.get(data, [])[0], language: result.get(data, [, ])[1] } else: return { success: False, file_path: audio_path, error: fHTTP {response.status} } except Exception as e: return { success: False, file_path: audio_path, error: str(e) }现在我们的客户端已经强大多了支持批量处理、错误重试还有进度条显示。但处理大量文件时我们还需要更强大的工具。4. 异步批量任务队列系统当你需要处理成千上万个音频文件时简单的批量处理就不够用了。我们需要一个真正的任务队列系统能够管理任务状态、处理失败重试、控制并发数还能随时查看进度。4.1 任务队列核心设计让我们先设计任务队列的基本结构from queue import Queue from threading import Thread, Lock import sqlite3 from datetime import datetime import json class ASRTaskQueue: ASR任务队列管理系统 def __init__(self, db_path: str asr_tasks.db, max_workers: int 3): 初始化任务队列 Args: db_path: SQLite数据库路径 max_workers: 最大工作线程数 self.db_path db_path self.max_workers max_workers self.task_queue Queue() self.workers [] self.running False self.lock Lock() # 初始化数据库 self._init_database() # 初始化客户端 self.client EnhancedGLMASRClient() def _init_database(self): 初始化任务数据库 conn sqlite3.connect(self.db_path) cursor conn.cursor() # 创建任务表 cursor.execute( CREATE TABLE IF NOT EXISTS tasks ( id INTEGER PRIMARY KEY AUTOINCREMENT, file_path TEXT NOT NULL, status TEXT DEFAULT pending, language TEXT DEFAULT auto, task_type TEXT DEFAULT transcribe, priority INTEGER DEFAULT 0, retry_count INTEGER DEFAULT 0, max_retries INTEGER DEFAULT 3, result_text TEXT, result_language TEXT, processing_time REAL, error_message TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, started_at TIMESTAMP, completed_at TIMESTAMP ) ) # 创建索引 cursor.execute(CREATE INDEX IF NOT EXISTS idx_status ON tasks(status)) cursor.execute(CREATE INDEX IF NOT EXISTS idx_priority ON tasks(priority DESC)) conn.commit() conn.close() def add_task(self, file_path: str, language: str auto, task_type: str transcribe, priority: int 0): 添加任务到队列 Args: file_path: 音频文件路径 language: 语言代码 task_type: 任务类型 priority: 优先级数字越大优先级越高 conn sqlite3.connect(self.db_path) cursor conn.cursor() cursor.execute( INSERT INTO tasks (file_path, language, task_type, priority, status) VALUES (?, ?, ?, ?, pending) , (file_path, language, task_type, priority)) task_id cursor.lastrowid conn.commit() conn.close() # 添加到内存队列 self.task_queue.put({ id: task_id, file_path: file_path, language: language, task_type: task_type }) return task_id def add_batch_tasks(self, file_paths: list, **kwargs): 批量添加任务 Args: file_paths: 文件路径列表 **kwargs: 其他参数传递给add_task Returns: 任务ID列表 task_ids [] for file_path in file_paths: task_id self.add_task(file_path, **kwargs) task_ids.append(task_id) return task_ids def _worker(self, worker_id: int): 工作线程函数 while self.running: try: # 从队列获取任务 task self.task_queue.get(timeout1) # 更新任务状态为处理中 self._update_task_status(task[id], processing) # 执行转录任务 result self.client.transcribe_file( task[file_path], languagetask[language], tasktask[task_type] ) # 更新任务结果 if result[success]: self._update_task_result( task[id], completed, result_textresult[text], result_languageresult[language], processing_timeresult.get(processing_time, 0) ) else: # 检查是否需要重试 retry_count self._get_retry_count(task[id]) max_retries self._get_max_retries(task[id]) if retry_count max_retries: # 重新加入队列重试 self._update_task_status(task[id], pending, retry_countretry_count 1) self.task_queue.put(task) else: # 标记为失败 self._update_task_result( task[id], failed, error_messageresult[error] ) # 标记任务完成 self.task_queue.task_done() except Exception as e: self.client.logger.error(f工作线程 {worker_id} 出错: {e}) continue def _update_task_status(self, task_id: int, status: str, **kwargs): 更新任务状态 conn sqlite3.connect(self.db_path) cursor conn.cursor() if status processing: cursor.execute( UPDATE tasks SET status ?, started_at CURRENT_TIMESTAMP, retry_count ? WHERE id ? , (status, kwargs.get(retry_count, 0), task_id)) else: cursor.execute( UPDATE tasks SET status ? WHERE id ? , (status, task_id)) conn.commit() conn.close() def _update_task_result(self, task_id: int, status: str, **kwargs): 更新任务结果 conn sqlite3.connect(self.db_path) cursor conn.cursor() cursor.execute( UPDATE tasks SET status ?, result_text ?, result_language ?, processing_time ?, error_message ?, completed_at CURRENT_TIMESTAMP WHERE id ? , (status, kwargs.get(result_text), kwargs.get(result_language), kwargs.get(processing_time), kwargs.get(error_message), task_id)) conn.commit() conn.close() def start(self): 启动任务队列 if self.running: return self.running True # 启动工作线程 for i in range(self.max_workers): worker Thread(targetself._worker, args(i,), daemonTrue) worker.start() self.workers.append(worker) self.client.logger.info(f任务队列已启动工作线程数: {self.max_workers}) def stop(self): 停止任务队列 self.running False # 等待所有工作线程结束 for worker in self.workers: worker.join(timeout5) self.workers.clear() self.client.logger.info(任务队列已停止) def get_stats(self) - Dict[str, Any]: 获取队列统计信息 conn sqlite3.connect(self.db_path) cursor conn.cursor() # 统计各状态任务数量 cursor.execute( SELECT status, COUNT(*) as count FROM tasks GROUP BY status ) status_counts {row[0]: row[1] for row in cursor.fetchall()} # 统计平均处理时间 cursor.execute( SELECT AVG(processing_time) FROM tasks WHERE status completed AND processing_time 0 ) avg_time cursor.fetchone()[0] or 0 conn.close() return { total_tasks: sum(status_counts.values()), status_counts: status_counts, queue_size: self.task_queue.qsize(), active_workers: len([w for w in self.workers if w.is_alive()]), avg_processing_time: round(avg_time, 2) }这个任务队列系统已经相当完善了但它还缺少一个重要的功能Web界面。让我们为它添加一个简单的监控界面。4.2 Web监控界面使用Flask创建一个简单的监控界面from flask import Flask, render_template, jsonify import threading class ASRQueueMonitor: ASR任务队列监控界面 def __init__(self, task_queue: ASRTaskQueue, host: str 0.0.0.0, port: int 5000): self.task_queue task_queue self.host host self.port port self.app Flask(__name__) # 设置路由 self._setup_routes() def _setup_routes(self): 设置Flask路由 self.app.route(/) def index(): 监控主页 stats self.task_queue.get_stats() # 获取最近的任务 conn sqlite3.connect(self.task_queue.db_path) cursor conn.cursor() cursor.execute( SELECT id, file_path, status, created_at, processing_time, error_message FROM tasks ORDER BY created_at DESC LIMIT 50 ) recent_tasks [] for row in cursor.fetchall(): recent_tasks.append({ id: row[0], file_path: row[1], status: row[2], created_at: row[3], processing_time: row[4], error_message: row[5] }) conn.close() return render_template(monitor.html, statsstats, tasksrecent_tasks) self.app.route(/api/stats) def get_stats(): 获取统计信息API return jsonify(self.task_queue.get_stats()) self.app.route(/api/tasks) def get_tasks(): 获取任务列表API status request.args.get(status, all) limit int(request.args.get(limit, 100)) conn sqlite3.connect(self.task_queue.db_path) cursor conn.cursor() if status all: cursor.execute( SELECT id, file_path, status, created_at, processing_time, error_message FROM tasks ORDER BY created_at DESC LIMIT ? , (limit,)) else: cursor.execute( SELECT id, file_path, status, created_at, processing_time, error_message FROM tasks WHERE status ? ORDER BY created_at DESC LIMIT ? , (status, limit)) tasks [] for row in cursor.fetchall(): tasks.append({ id: row[0], file_path: row[1], status: row[2], created_at: row[3], processing_time: row[4], error_message: row[5] }) conn.close() return jsonify(tasks) self.app.route(/api/tasks/int:task_id) def get_task(task_id): 获取单个任务详情 conn sqlite3.connect(self.task_queue.db_path) cursor conn.cursor() cursor.execute( SELECT * FROM tasks WHERE id ? , (task_id,)) row cursor.fetchone() conn.close() if row: columns [description[0] for description in cursor.description] task dict(zip(columns, row)) return jsonify(task) else: return jsonify({error: Task not found}), 404 def start(self): 启动监控服务 thread threading.Thread( targetlambda: self.app.run( hostself.host, portself.port, debugFalse, use_reloaderFalse ), daemonTrue ) thread.start() print(f监控界面已启动: http://{self.host}:{self.port})现在我们需要创建一个简单的HTML模板。在templates文件夹中创建monitor.html!DOCTYPE html html head titleASR任务队列监控/title style body { font-family: Arial, sans-serif; margin: 20px; } .stats { background: #f5f5f5; padding: 20px; border-radius: 5px; margin-bottom: 20px; } .stat-item { display: inline-block; margin-right: 30px; } .stat-value { font-size: 24px; font-weight: bold; color: #007bff; } .task-table { width: 100%; border-collapse: collapse; } .task-table th, .task-table td { border: 1px solid #ddd; padding: 8px; text-align: left; } .task-table th { background-color: #f2f2f2; } .status-pending { color: #ffc107; } .status-processing { color: #17a2b8; } .status-completed { color: #28a745; } .status-failed { color: #dc3545; } /style /head body h1GLM-ASR-Nano-2512 任务队列监控/h1 div classstats h2系统统计/h2 div classstat-item div总任务数/div div classstat-value{{ stats.total_tasks }}/div /div div classstat-item div队列大小/div div classstat-value{{ stats.queue_size }}/div /div div classstat-item div工作线程/div div classstat-value{{ stats.active_workers }}/div /div div classstat-item div平均处理时间/div div classstat-value{{ stats.avg_processing_time }}s/div /div /div h2任务状态分布/h2 div {% for status, count in stats.status_counts.items() %} div classstat-item div{{ status }}/div div classstat-value{{ count }}/div /div {% endfor %} /div h2最近任务/h2 table classtask-table thead tr thID/th th文件路径/th th状态/th th创建时间/th th处理时间/th th错误信息/th /tr /thead tbody {% for task in tasks %} tr td{{ task.id }}/td td{{ task.file_path }}/td td classstatus-{{ task.status }}{{ task.status }}/td td{{ task.created_at }}/td td{{ task.processing_time or - }}/td td{{ task.error_message or - }}/td /tr {% endfor %} /tbody /table script // 自动刷新页面 setTimeout(function() { location.reload(); }, 5000); // 每5秒刷新一次 /script /body /html5. 完整使用示例现在我们已经有了完整的SDK和任务队列系统让我们看看如何在实际项目中使用它。5.1 基础使用示例首先一个简单的使用示例# 示例1基础使用 from glm_asr_sdk import EnhancedGLMASRClient def basic_usage(): 基础使用示例 # 创建客户端 client EnhancedGLMASRClient(base_urlhttp://localhost:7860) # 检查服务状态 info client.get_service_info() print(f服务状态: {info[status]}) # 转录单个文件 result client.transcribe_file(audio/sample.wav, languagezh) if result[success]: print(f转录成功: {result[text]}) print(f检测语言: {result[language]}) print(f处理时间: {result[processing_time]:.2f}秒) else: print(f转录失败: {result[error]}) # 批量处理文件 audio_files [ audio/meeting1.wav, audio/meeting2.wav, audio/interview.mp3 ] results_df client.transcribe_batch( audio_files, output_formatcsv, output_filetranscription_results.csv ) print(f批量处理完成共处理 {len(results_df)} 个文件) print(f成功: {len(results_df[results_df[success] True])}) print(f失败: {len(results_df[results_df[success] False])}) if __name__ __main__: basic_usage()5.2 高级任务队列示例对于大规模处理使用任务队列# 示例2高级任务队列使用 import os from pathlib import Path def advanced_queue_usage(): 高级任务队列使用示例 # 创建任务队列 queue ASRTaskQueue( db_pathasr_tasks.db, max_workers4 # 4个并发工作线程 ) # 启动监控界面 monitor ASRQueueMonitor(queue, host0.0.0.0, port5000) monitor.start() # 启动任务队列 queue.start() try: # 扫描音频文件夹添加所有音频文件 audio_folder audio_data audio_extensions {.wav, .mp3, .flac, .ogg, .m4a} audio_files [] for root, dirs, files in os.walk(audio_folder): for file in files: if Path(file).suffix.lower() in audio_extensions: audio_files.append(os.path.join(root, file)) print(f找到 {len(audio_files)} 个音频文件) # 批量添加任务 task_ids queue.add_batch_tasks( audio_files, languageauto, # 自动检测语言 task_typetranscribe, priority1 ) print(f已添加 {len(task_ids)} 个任务到队列) # 等待所有任务完成 while True: stats queue.get_stats() pending stats[status_counts].get(pending, 0) processing stats[status_counts].get(processing, 0) completed stats[status_counts].get(completed, 0) total stats[total_tasks] print(f\r进度: {completed}/{total} 完成 | f等待: {pending} | 处理中: {processing}, end) if pending 0 and processing 0: print(\n所有任务已完成!) break time.sleep(5) # 每5秒检查一次进度 # 导出结果 conn sqlite3.connect(queue.db_path) df pd.read_sql_query( SELECT file_path, result_text, result_language, processing_time, error_message FROM tasks WHERE status completed ORDER BY completed_at , conn) conn.close() # 保存到Excel output_file ftranscription_results_{datetime.now().strftime(%Y%m%d_%H%M%S)}.xlsx df.to_excel(output_file, indexFalse) print(f结果已保存到: {output_file}) # 生成统计报告 generate_report(df, output_file.replace(.xlsx, _report.txt)) except KeyboardInterrupt: print(\n用户中断正在停止...) finally: queue.stop() def generate_report(df, report_file): 生成处理报告 total_files len(df) success_files len(df[df[result_text].notna()]) avg_time df[processing_time].mean() # 语言分布 language_dist df[result_language].value_counts() with open(report_file, w, encodingutf-8) as f: f.write( ASR处理报告 \n\n) f.write(f处理时间: {datetime.now().strftime(%Y-%m-%d %H:%M:%S)}\n) f.write(f总文件数: {total_files}\n) f.write(f成功文件数: {success_files}\n) f.write(f成功率: {success_files/total_files*100:.1f}%\n) f.write(f平均处理时间: {avg_time:.2f}秒\n\n) f.write(语言分布:\n) for lang, count in language_dist.items(): f.write(f {lang}: {count} 个文件 ({count/total_files*100:.1f}%)\n) # 处理时间分布 f.write(\n处理时间分布:\n) time_stats df[processing_time].describe() f.write(f 最短: {time_stats[min]:.2f}秒\n) f.write(f 最长: {time_stats[max]:.2f}秒\n) f.write(f 平均: {time_stats[mean]:.2f}秒\n) f.write(f 中位数: {time_stats[50%]:.2f}秒\n) print(f报告已生成: {report_file}) if __name__ __main__: advanced_queue_usage()5.3 实际应用场景这个系统可以应用在很多实际场景中场景1会议录音批量转录def transcribe_meetings(meeting_folder): 批量转录会议录音 queue ASRTaskQueue(max_workers2) # 会议录音通常较长减少并发数 queue.start() # 按日期组织会议录音 for date_folder in Path(meeting_folder).iterdir(): if date_folder.is_dir(): audio_files list(date_folder.glob(*.mp3)) list(date_folder.glob(*.wav)) # 添加任务设置较高的优先级 for audio_file in audio_files: queue.add_task( str(audio_file), languagezh, # 明确指定中文 priority5 # 高优先级 ) print(会议录音转录任务已提交)场景2客服录音实时处理def process_customer_service_calls(): 处理客服录音 import watchdog.observers import watchdog.events class NewAudioHandler(watchdog.events.FileSystemEventHandler): def __init__(self, queue): self.queue queue def on_created(self, event): if event.src_path.endswith((.wav, .mp3)): print(f检测到新录音: {event.src_path}) # 添加到任务队列实时处理 self.queue.add_task( event.src_path, languageauto, priority10 # 最高优先级实时处理 ) # 创建队列 queue ASRTaskQueue(max_workers3) queue.start() # 监控录音文件夹 audio_folder /path/to/customer_service/recordings event_handler NewAudioHandler(queue) observer watchdog.observers.Observer() observer.schedule(event_handler, audio_folder, recursiveFalse) observer.start() print(f开始监控文件夹: {audio_folder}) try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() queue.stop() observer.join()6. 总结通过本文的教程我们完成了一个完整的GLM-ASR-Nano-2512语音识别解决方案。让我们回顾一下关键收获6.1 核心成果我们构建的系统包含三个主要部分基础SDK客户端提供了简单易用的API让调用语音识别服务像调用本地函数一样方便增强型客户端增加了错误重试、批量处理、进度跟踪等实用功能完整的任务队列系统支持大规模音频文件处理具备任务管理、状态跟踪、失败重试等企业级功能6.2 实际价值这个解决方案能为你带来几个实实在在的好处效率大幅提升从手动一个个处理文件变成自动化批量处理效率提升不是一点半点。特别是处理成百上千个文件时节省的时间非常可观。可靠性增强内置的错误重试机制和任务状态跟踪确保即使个别文件处理失败也不会影响整体进度系统会自动重试或记录错误。易于监控Web监控界面让你随时了解处理进度、成功失败情况、系统负载等信息管理起来非常方便。灵活扩展系统设计考虑了扩展性你可以根据需要调整并发数、优先级策略、存储方式等。6.3 使用建议在实际使用中我有几个建议硬件配置如果处理大量音频建议使用GPU加速。GLM-ASR-Nano-2512在RTX 4090上处理速度比CPU快5-10倍。并发控制根据你的硬件配置调整max_workers参数。一般规则是GPU强大就多开几个CPU处理就少开几个。文件组织建议按日期或项目组织音频文件这样处理结果也容易管理。定期备份任务队列的数据库记得定期备份特别是处理重要数据时。6.4 下一步探索如果你对这个系统感兴趣还可以考虑以下几个扩展方向分布式处理当单机处理能力不足时可以考虑将任务队列扩展到多台机器。结果后处理在转录结果基础上添加自动标点、分段、说话人分离等功能。实时流处理除了处理文件还可以扩展支持实时音频流识别。集成其他服务将转录结果自动推送到其他系统比如内容管理系统、数据分析平台等。这个系统现在已经可以直接用在你的项目中。无论是处理会议录音、客服对话还是其他语音转文字的需求它都能帮你节省大量时间。最重要的是所有代码都是开源的你可以根据实际需求自由修改和扩展。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2415031.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!