zmq源码分析之io_thread_t
文章目录概述继承关系核心成员构造函数启动与停止启动停止事件处理读事件处理核心其他事件理论上不会被调用停止处理架构图事件循环流程与其他组件的关系线程创建流程关键设计点命令处理类型性能特点总结概述io_thread_t是 ZeroMQ 中的I/O线程负责处理网络事件和命令传递。它是 ZeroMQ 多线程架构的核心组件运行独立的事件循环来监听和处理各种文件描述符上的I/O事件。io_thread_t通过poller_t间接的持有线程同时io_thread_t自身也持有了一个邮箱用于接收外部命令邮箱本身也是个文件描述符io_thread_t自身的io事件也交给了poller_t管理。继承关系classio_thread_tZMQ_FINAL:publicobject_t,publici_poll_eventsobject_t提供命令处理能力和对象标识i_poll_events提供I/O事件回调接口核心成员private:// I/O线程通过这个邮箱访问传入的命令mailbox_t _mailbox;// 与邮箱文件描述符关联的句柄poller_t::handle_t _mailbox_handle;// I/O多路复用使用poller对象实现poller_t*_poller;构造函数zmq::io_thread_t::io_thread_t(ctx_t*ctx_,uint32_ttid_):object_t(ctx_,tid_),_mailbox_handle(static_castpoller_t::handle_t(NULL)){// 创建poller对象_pollernew(std::nothrow)poller_t(*ctx_);alloc_assert(_poller);// 将邮箱的文件描述符注册到pollerif(_mailbox.get_fd()!retired_fd){_mailbox_handle_poller-add_fd(_mailbox.get_fd(),this);_poller-set_pollin(_mailbox_handle);}}关键操作创建平台特定的pollerepoll/kqueue/poll/select将邮箱的文件描述符注册到poller设置监听读事件POLLIN启动与停止启动voidzmq::io_thread_t::start(){charname[16];snprintf(name,sizeof(name),IO/%u,get_tid()-zmq::ctx_t::reaper_tid-1);// 启动底层的I/O线程_poller-start(name);}停止voidzmq::io_thread_t::stop(){send_stop();}事件处理读事件处理核心voidzmq::io_thread_t::in_event(){// 从邮箱接收命令并处理command_t cmd;intrc_mailbox.recv(cmd,0);while(rc0||errnoEINTR){if(rc0)cmd.destination-process_command(cmd);rc_mailbox.recv(cmd,0);}errno_assert(rc!0errnoEAGAIN);}工作流程从邮箱接收命令循环处理所有可用命令直到邮箱为空返回 EAGAIN其他事件理论上不会被调用voidzmq::io_thread_t::out_event(){// 永远不会在这里轮询POLLOUTzmq_assert(false);}voidzmq::io_thread_t::timer_event(int){// 这里没有定时器永远不会被调用zmq_assert(false);}停止处理voidzmq::io_thread_t::process_stop(){zmq_assert(_mailbox_handle);_poller-rm_fd(_mailbox_handle);_poller-stop();}架构图┌─────────────────────────────────────────────────────────────────────┐ │ io_thread_t │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ object_t (基类) │ │ │ │ - ctx_t * 上下文指针 │ │ │ │ - uint32_t tid 线程ID │ │ │ │ - process_command() 命令处理分发 │ │ │ └─────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ i_poll_events (接口) │ │ │ │ in_event() 文件描述符可读回调 │ │ │ │ out_event() 文件描述符可写回调 │ │ │ │ timer_event() 定时器到期回调 │ │ │ └─────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ │ │ mailbox_t │ │ poller_t │ │mailbox_handle│ │ │ │ │ │ (命令队列) │ │ (事件循环) │ │ (fd句柄) │ │ │ │ │ └──────┬──────┘ └──────┬──────┘ └─────────────┘ │ │ │ │ │ │ │ │ │ └─────────┼──────────────────┼───────────────────────────────┘ │ │ │ │ │ │ ▼ ▼ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ 物理线程 (Poller 事件循环) │ │ │ │ ┌───────────────────────────────────────────────────────┐ │ │ │ │ │ poll/epoll/kqueue/select │ │ │ │ │ │ │ │ │ │ │ │ │ └── in_event() ── 处理命令 ── process_command() │ │ │ │ │ └───────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────┘事件循环流程┌─────────────────────────────────────────────────────────────────────┐ │ IO 线程事件循环 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ ┌───────────────┐ │ │ │ Poller 等待 │ │ │ │ 事件 │ │ │ └───────┬───────┘ │ │ │ │ │ │ 邮箱fd可读 │ │ ▼ │ │ ┌───────────────┐ │ │ │ 调用in_event() │ │ │ └───────┬───────┘ │ │ │ │ │ ▼ │ │ ┌───────────────┐ ┌───────────────┐ │ │ │ mailbox.recv()│────│ 有命令 │──── 处理命令 │ │ └───────┬───────┘ └───────────────┘ │ │ │ │ │ │ EAGAIN (无命令) │ │ ▼ │ │ ┌───────────────┐ │ │ │ 返回继续等待 │ │ │ └───────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────┘与其他组件的关系┌─────────────────────────────────────────────────────────────────────┐ │ ctx_t (上下文) │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ _io_threads: IO线程池 │ │ │ │ ┌────────┐ ┌────────┐ ┌────────┐ │ │ │ │ │IO[0] │ │IO[1] │ │IO[2] │ ... │ │ │ │ └───────┘ └───────┘ └───────┘ │ │ │ │ │ │ │ │ │ │ └───────┼─────────┼─────────┼─────────────────────────────────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ Slot 数组 │ │ │ │ [term] [reaper] [IO[0]] [IO[1]] [IO[2]] [socket1] ... │ │ │ └─────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────────┐ │ Session/Engine │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ 网络连接 (TCP/UDP/IPC/PGM) │ │ │ │ │ │ │ │ │ │ 注册到 IO 线程的 Poller │ │ │ │ ▼ │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ 当网络数据可读/可写时Poller 回调 io_thread_t │ │ │ │ │ │ → in_event() / out_event() │ │ │ │ │ │ → 处理网络数据 │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────────┐ │ Socket (应用线程) │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ 用户 API 调用 (send/recv/connect/bind) │ │ │ │ │ │ │ │ │ │ 发送命令到 IO 线程的 mailbox │ │ │ │ ▼ │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ mailbox.send(command) │ │ │ │ │ │ → _signaler.send() (如果需要) │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────┘线程创建流程上下文创建时创建 IO 线程数组_io_threads为每个 IO 线程分配 slotIO 线程构造创建 mailbox创建 poller注册 mailbox fd 到 pollerIO 线程启动调用start()启动 pollerPoller 创建物理线程线程运行事件循环关键设计点设计点说明独立线程每个 IO 线程运行在独立物理线程上事件驱动基于 poll/epoll/kqueue/select命令处理通过 mailbox 接收命令多路复用一个 poller 监听多个 fd负载均衡ctx 选择负载最小的 IO 线程命令处理类型IO 线程可以处理多种命令stop- 停止 IO 线程plug- 注册到 IO 线程bind- 绑定 pipe 到 socketactivate_read/write- pipe 流量控制hiccup- pipe 中断处理等等…性能特点高效事件处理基于内核级 I/O 多路复用低延迟事件驱动无轮询开销可扩展支持多个 IO 线程线程安全每个 IO 线程独立运行总结io_thread_t是 ZeroMQ 多线程架构的核心组件独立运行每个 IO 线程运行在独立物理线程上事件驱动通过 poller 监听文件描述符命令通道通过 mailbox 接收来自其他线程的命令网络处理负责处理所有网络 I/O 事件负载均衡ctx 选择最适合的 IO 线程处理连接这是 ZeroMQ 能够高效处理高并发连接的关键机制
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2535007.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!