实战指南:用Python ESL(greenswitch库)监听FreeSWITCH事件并自动录音
实战指南用Python ESLgreenswitch库监听FreeSWITCH事件并自动录音在通信系统开发中FreeSWITCH作为强大的开源软交换平台其Event Socket接口为开发者提供了深度集成的可能。本文将聚焦如何利用Python生态中的greenswitch库构建高效的事件监听服务实现通话接通自动录音功能。1. 环境准备与库对比1.1 核心组件选择传统python-ESL与greenswitch库的对比特性python-ESLgreenswitch协议支持Inbound/OutboundInbound/Outbound底层实现C扩展Pure Python gevent并发模型同步异步协程维护状态更新缓慢活跃维护生产环境验证广泛使用每日数百通话验证推荐使用greenswitch的三大理由gevent协程天然适合高并发场景完全兼容Python3生态简洁的API设计降低学习成本安装只需执行pip install greenswitch loguru1.2 FreeSWITCH配置要点确保conf/autoload_configs/event_socket.conf.xml包含param namelisten-ip value127.0.0.1/ param namelisten-port value8021/ param namepassword valueClueCon/2. 事件监听核心架构2.1 连接初始化建立稳定连接需处理的三重保障import gevent from greenswitch import InboundESL from loguru import logger fs InboundESL(host127.0.0.1, port8021, passwordClueCon) def connect_with_retry(max_retries5): for i in range(max_retries): try: fs.connect() return True except Exception as e: logger.error(fConnection attempt {i1} failed: {str(e)}) gevent.sleep(2**i) # 指数退避 raise ConnectionError(Max retries exceeded)2.2 事件处理机制典型的事件处理流程包含三个关键步骤事件过滤通过正则匹配关键事件上下文提取从事件头获取通话唯一标识异步执行非阻塞式处理耗时操作def event_handler(event): # 关键事件识别 if event.headers.get(Event-Name) CHANNEL_ANSWER: uuid event.headers.get(Unique-ID) record_path f/var/spool/freeswitch/recordings/{uuid}.wav # 异步启动录音 gevent.spawn(start_recording, uuid, record_path) def start_recording(uuid, path): try: cmd fuuid_record {uuid} start {path} fs.send(fbgapi {cmd}) logger.success(fRecording started for {uuid}) except Exception as e: logger.error(fRecording failed: {str(e)})3. 生产级代码实现3.1 完整服务框架class RecordingService: def __init__(self): self.fs None self.running False def start(self): self.running True self._connect() # 注册全局事件处理器 fs.register_handle(*, self._event_dispatcher) fs.send(event plain ALL) # 主循环保持连接 while self.running: gevent.sleep(1) def _connect(self): self.fs InboundESL(host127.0.0.1, port8021, passwordClueCon) connect_with_retry() def _event_dispatcher(self, event): try: event_name event.headers.get(Event-Name) { CHANNEL_ANSWER: self._on_answer, CHANNEL_HANGUP: self._on_hangup }.get(event_name, lambda _: None)(event) except Exception as e: logger.exception(Event processing error) def _on_answer(self, event): uuid event.headers[Unique-ID] path self._generate_path(uuid) # 防止重复录音 if not self._is_recording(uuid): self._start_recording(uuid, path) def _on_hangup(self, event): uuid event.headers[Unique-ID] if self._is_recording(uuid): self._stop_recording(uuid) def _generate_path(self, uuid): return f/records/{datetime.now().strftime(%Y%m%d)}/{uuid}.wav3.2 异常处理策略设计健壮的错误恢复机制def safe_send(command, timeout5): try: with gevent.Timeout(timeout): return fs.send(command) except gevent.Timeout: logger.warning(fCommand timeout: {command}) raise except ConnectionError: logger.critical(Connection lost, reconnecting...) connect_with_retry() raise4. 高级功能扩展4.1 录音管理增强实现录音文件自动转储和元数据记录def _start_recording(self, uuid, path): os.makedirs(os.path.dirname(path), exist_okTrue) # 启动录音并记录元数据 safe_send(fuuid_record {uuid} start {path}) self._save_metadata(uuid, path) # 监听DTMF实现控制功能 safe_send(fuuid_setvar {uuid} record_dtmf_actions 1) def _save_metadata(self, uuid, path): metadata { start_time: datetime.now().isoformat(), caller_id: self._get_caller_id(uuid), file_path: path } with open(f{path}.meta, w) as f: json.dump(metadata, f)4.2 性能优化技巧提升处理效率的三种方法事件过滤精确订阅必要事件fs.send(event plain CHANNEL_ANSWER,CHANNEL_HANGUP)批量处理合并短时间内的连续事件from collections import defaultdict from gevent.queue import Queue event_queue Queue() processing_buffer defaultdict(list) def _event_dispatcher(event): event_queue.put(event) gevent.spawn def batch_processor(): while True: gevent.sleep(0.1) # 100ms批处理窗口 while not event_queue.empty(): event event_queue.get() processing_buffer[event.headers[Event-Name]].append(event) for event_type, events in processing_buffer.items(): self._handle_batch_events(event_type, events) processing_buffer.clear()连接池管理多个ESL连接from gevent.pool import Pool class ESLConnectionPool: def __init__(self, size5): self.pool Pool(size) self.connections [self._create_connection() for _ in range(size)] def _create_connection(self): fs InboundESL(host127.0.0.1, port8021, passwordClueCon) fs.connect() return fs def get_connection(self): return self.pool.apply(lambda: random.choice(self.connections))5. 部署与监控方案5.1 系统服务化使用systemd管理服务# /etc/systemd/system/freeswitch_recorder.service [Unit] DescriptionFreeSWITCH Recording Service Afternetwork.target [Service] Userfreeswitch WorkingDirectory/opt/freeswitch_recorder ExecStart/usr/bin/python3 /opt/freeswitch_recorder/main.py Restartalways RestartSec30 [Install] WantedBymulti-user.target5.2 监控指标采集集成Prometheus监控from prometheus_client import start_http_server, Counter, Gauge RECORDING_STARTED Counter(recording_started_total, Total recordings started) RECORDING_FAILED Counter(recording_failed_total, Total recording failures) ACTIVE_RECORDINGS Gauge(active_recordings, Currently active recordings) class InstrumentedRecorder(RecordingService): def _on_answer(self, event): try: super()._on_answer(event) RECORDING_STARTED.inc() ACTIVE_RECORDINGS.inc() except Exception: RECORDING_FAILED.inc() raise def _on_hangup(self, event): super()._on_hangup(event) ACTIVE_RECORDINGS.dec() # 启动指标服务器 start_http_server(8000)在实际部署中发现通过gevent的协程模型单个进程即可稳定处理200并发通话事件。关键是要确保事件处理函数中不包含阻塞IO操作所有耗时任务都应通过gevent.spawn异步执行。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2629242.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!