zmq源码分析之IO线程绑定时机
文章目录核心流程详细代码分析1. Socket 创建入口2. IO 线程选择3. IO 线程选择逻辑4. Session 创建与绑定5. 连接建立时的 IO 线程绑定6. Session 与 IO 线程关联完整绑定流程技术要点1. IO 线程选择策略2. 绑定机制3. 线程安全总结核心流程用户创建 socket 到绑定 IO 线程的完整流程用户调用zmq_socket(ctx, type)内部创建ctx-create_socket(type)IO 线程选择choose_io_thread(affinity)Session 创建与 IO 线程关联绑定完成socket 成功绑定到 IO 线程详细代码分析1. Socket 创建入口用户代码void*socketzmq_socket(ctx,ZMQ_REP);内部实现void*zmq_socket(void*ctx_,inttype_){zmq::ctx_t*ctxstatic_castzmq::ctx_t*(ctx_);zmq::socket_base_t*sctx-create_socket(type_);// ...returns;}2. IO 线程选择ctx_t::create_socketzmq::socket_base_t*zmq::ctx_t::create_socket(inttype_){// ...if(unlikely(_starting)){if(!start())returnNULL;}// ...// 选择 IO 线程io_thread_t*io_threadchoose_io_thread(0);if(!io_thread){errnoEMTHREAD;returnNULL;}// ...socket_base_t*ssocket_base_t::create(type_,this,slot,sid);// ...}3. IO 线程选择逻辑ctx_t::choose_io_threadzmq::io_thread_t*zmq::ctx_t::choose_io_thread(uint64_taffinity_){// 遍历 IO 线程选择负载最小的intmin_load-1;io_thread_t*selected_threadNULL;for(io_threads_t::size_type i0;i!_io_threads.size();i){if(!affinity_||(affinity_(1ULLi))){intload_io_threads[i]-get_load();if(min_load-1||loadmin_load){min_loadload;selected_thread_io_threads[i];}}}returnselected_thread;}4. Session 创建与绑定socket_base_t::createzmq::socket_base_t*zmq::socket_base_t::create(inttype_,ctx_t*parent_,uint32_ttid_,intsid_){// 创建具体类型的 socketsocket_base_t*sNULL;switch(type_){caseZMQ_REP:snew(std::nothrow)rep_t(parent_,tid_,sid_);break;// ... 其他类型}// ...returns;}5. 连接建立时的 IO 线程绑定当 socket 绑定或连接时intzmq::socket_base_t::bind(constchar*endpoint_uri_){// ...if(protocolprotocol_name::tcp){// 选择 IO 线程io_thread_t*io_threadchoose_io_thread(options.affinity);if(!io_thread){errnoEMTHREAD;return-1;}// 创建 sessionsession_base_t*sessionsession_base_t::create(io_thread,true,this,options,paddr);// ...}// ...}6. Session 与 IO 线程关联session_base_t 构造zmq::session_base_t::session_base_t(zmq::io_thread_t*io_thread_,boolactive_,zmq::socket_base_t*socket_,constoptions_toptions_,address_t*addr_):own_t(io_thread_,options_),io_object_t(io_thread_),_active(active_),_pipe(NULL),_zap_pipe(NULL),_incomplete_in(false),_pending(false),_engine(NULL),_socket(socket_),_io_thread(io_thread_),_has_linger_timer(false),_addr(addr_){// session 现在与 IO 线程关联}完整绑定流程┌─────────────────────────────────────────────────────────────────────┐ │ Socket 绑定 IO 线程流程 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ 1. 用户创建 socket │ │ └─ zmq_socket(ctx, type) │ │ └─ ctx-create_socket(type) │ │ └─ choose_io_thread(affinity) │ │ └─ 选择负载最小的 IO 线程 │ │ └─ socket_base_t::create() │ │ └─ 创建具体类型的 socket │ │ │ │ 2. Socket 绑定或连接 │ │ └─ zmq_bind() / zmq_connect() │ │ └─ socket_base_t::bind() / connect() │ │ └─ choose_io_thread(options.affinity) │ │ └─ session_base_t::create() │ │ └─ 与 IO 线程关联 │ │ └─ attach_pipe() │ │ │ │ 3. 引擎创建与注册 │ │ └─ 创建 stream_engine │ │ └─ engine-plug(io_thread, session) │ │ └─ io_object_t::plug(io_thread) │ │ └─ add_fd(_s) 注册到 poller │ │ │ └─────────────────────────────────────────────────────────────────────┘技术要点1. IO 线程选择策略负载均衡选择负载最小的 IO 线程亲和性支持通过ZMQ_AFFINITY指定 IO 线程默认策略轮询或基于负载2. 绑定机制Session 作为桥梁连接 socket 和 IO 线程Engine 注册网络引擎注册到 IO 线程的 pollerPipe 通信socket 和 session 通过 pipe 通信3. 线程安全命令传递通过 mailbox 传递命令无锁设计使用 ypipe 进行线程间通信事件驱动IO 线程通过事件循环处理总结Socket 绑定到 IO 线程的过程创建阶段选择 IO 线程并创建 socket连接阶段创建 session 并与 IO 线程关联运行阶段引擎注册到 IO 线程的 poller这种设计使得 ZeroMQ 能够高效处理IO 线程专门处理网络 I/O灵活扩展通过增加 IO 线程数提高性能负载均衡自动分配任务到负载较轻的线程这是 ZeroMQ 高性能网络通信的重要基础
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2554504.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!