消息队列实战:RabbitMQ与ZeroMQ
消息队列实战RabbitMQ与ZeroMQ 概述消息队列是分布式系统中重要的通信组件用于解耦服务、异步处理和流量削峰。本文将介绍两种流行的消息队列技术RabbitMQ和ZeroMQ。 RabbitMQ实战RabbitMQ基础知识RabbitMQ是一个开源的消息代理软件实现了高级消息队列协议AMQP。它的特点包括内存与磁盘控制内存到上限之后会将数据写入磁盘中支持多种消息模式发布/订阅、路由、主题等可靠性高支持消息确认、持久化等机制安装部署Docker方式安装# 获取rabbitmq官方镜像dockerpull rabbitmq:3.9.17-management# 运行容器dockerrun-d--hostnamemy-rabbit--namerabbitmq-3.9.17\--expose15671--expose4369\-p15672:15672\--expose15692--expose25672--expose15691--expose5671\-p5672:5672\--restartalways\-eRABBITMQ_DEFAULT_USERadmin\-eRABBITMQ_DEFAULT_PASSadmin\rabbitmq:3.9.17-management访问Web管理界面http://192.168.4.11:15672账号admin/adminC客户端代码示例消费者代码// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.// SPDX-License-Identifier: mit#includestdint.h#includestdio.h#includestdlib.h#includestring.h#includerabbitmq-c/amqp.h#includerabbitmq-c/tcp_socket.h#includeassert.h#includeutils.h#defineSUMMARY_EVERY_US1000000staticvoidrun(amqp_connection_state_tconn){uint64_tstart_timenow_microseconds();intreceived0;intprevious_received0;uint64_tprevious_report_timestart_time;uint64_tnext_summary_timestart_timeSUMMARY_EVERY_US;amqp_frame_tframe;uint64_tnow;for(;;){amqp_rpc_reply_tret;amqp_envelope_tenvelope;nownow_microseconds();if(nownext_summary_time){intcountOverIntervalreceived-previous_received;doubleintervalRatecountOverInterval/((now-previous_report_time)/1000000.0);printf(%d ms: Received %d - %d since last report (%d Hz)\n,(int)(now-start_time)/1000,received,countOverInterval,(int)intervalRate);previous_receivedreceived;previous_report_timenow;next_summary_timeSUMMARY_EVERY_US;}amqp_maybe_release_buffers(conn);retamqp_consume_message(conn,envelope,NULL,0);if(AMQP_RESPONSE_NORMAL!ret.reply_type){if(AMQP_RESPONSE_LIBRARY_EXCEPTIONret.reply_typeAMQP_STATUS_UNEXPECTED_STATEret.library_error){if(AMQP_STATUS_OK!amqp_simple_wait_frame(conn,frame)){return;}if(AMQP_FRAME_METHODframe.frame_type){switch(frame.payload.method.id){caseAMQP_BASIC_ACK_METHOD:/* 发布者确认消息 */break;caseAMQP_BASIC_RETURN_METHOD:/* 消息无法路由时的返回处理 */{amqp_message_tmessage;retamqp_read_message(conn,frame.channel,message,0);if(AMQP_RESPONSE_NORMAL!ret.reply_type){return;}amqp_destroy_message(message);}break;caseAMQP_CHANNEL_CLOSE_METHOD:/* 通道异常关闭 */return;caseAMQP_CONNECTION_CLOSE_METHOD:/* 连接异常关闭 */return;default:fprintf(stderr,An unexpected method was received %u\n,frame.payload.method.id);return;}}}}else{printf(Consumer get envelope.message.body.len%d\n,envelope.message.body.len);printf(message: %s\n,envelope.message.body.bytes);amqp_destroy_envelope(envelope);}received;}}intmain(intargc,charconst*const*argv){charconst*hostname;intport,status;charconst*exchange;charconst*bindingkey;amqp_socket_t*socketNULL;amqp_connection_state_tconn;amqp_bytes_tqueuename;if(argc3){fprintf(stderr,Usage: amqp_consumer host port\n);return1;}hostnameargv[1];portatoi(argv[2]);exchangeargv[3];bindingkeyargv[4];queuename.bytes(void*)argv[4];queuename.lenstrlen(argv[4]);printf(queuename %s queuename.len%d\n,(char*)queuename.bytes,queuename.len);connamqp_new_connection();socketamqp_tcp_socket_new(conn);if(!socket){die(creating TCP socket);}statusamqp_socket_open(socket,hostname,port);if(status){die(opening TCP socket);}die_on_amqp_error(amqp_login(conn,/,0,131072,0,AMQP_SASL_METHOD_PLAIN,admin,admin),Logging in);amqp_channel_open(conn,1);die_on_amqp_error(amqp_get_rpc_reply(conn),Opening channel);{amqp_queue_declare_ok_t*ramqp_queue_declare(conn,1,queuename,0,0,0,0,amqp_empty_table);die_on_amqp_error(amqp_get_rpc_reply(conn),Declaring queue);printf(queue name %s\n,queuename.bytes);if(queuename.bytesNULL){fprintf(stderr,Out of memory while copying queue name);return1;}}amqp_queue_bind(conn,1,queuename,amqp_cstring_bytes(exchange),amqp_cstring_bytes(bindingkey),amqp_empty_table);die_on_amqp_error(amqp_get_rpc_reply(conn),Binding queue);printf(queuename %s queuename.len%d\n,(char*)queuename.bytes,queuename.len);amqp_basic_consume(conn,1,queuename,amqp_empty_bytes,0,1,0,amqp_empty_table);die_on_amqp_error(amqp_get_rpc_reply(conn),Consuming);run(conn);amqp_bytes_free(queuename);die_on_amqp_error(amqp_channel_close(conn,1,AMQP_REPLY_SUCCESS),Closing channel);die_on_amqp_error(amqp_connection_close(conn,AMQP_REPLY_SUCCESS),Closing connection);die_on_error(amqp_destroy_connection(conn),Ending connection);return0;}运行示例# 生产者./amqp_producer192.168.4.11567299999999999# 消费者./amqp_consumer192.168.4.115672amq.topicjpg_queue使用要点生产者发送消息并不用指定queue名其根据Exchange类型、Exchange名、Binding key将消息发送到queueExchange类型在建立的时候已经确定Exchange名和Binding key在发送消息的时候作为参数传过去消费者在消费的时候需要指定Exchange名、Binding key和queue名在Fanout类型的Exchange中Binding key不会起作用Exchange会在消息发送到与其绑定的各个队列auto delete的队列必须是有过消费者并且所有消费者都解除订阅了才会auto deleteExchange也是这样⚡ ZeroMQ实战简介ZeroMQ也称ZMQ、0MQ是一个高性能的异步消息库旨在用于可扩展的分布式或并发应用程序。它提供了一个消息队列但与面向消息的中间件不同ZeroMQ可以在无需专用消息代理的情况下运行。典型使用场景当作普通TCP使用比自己写socket要简单、可靠使用经验1. ZeroMQ Socket不支持多线程ZeroMQ的socket不支持多线程不能将zeromq的socket传到多个线程中然后在多个线程中使用这个socket发送或者接受数据。问题案例解码服务有这样一个需求同一个算法的多个解码任务需要将数据发送到同一个消费者。最开始的想法是创建一个pusher socket再将这个socket传到多个任务中然后利用这个socket进行发送数据但通过实验发现每次只有一个任务能够正常工作其他任务都被阻塞。解决方案创建一个线程专门用来发送数据多个抽帧解码任务将数据放入到一个全局的队列中发送线程从这个队列中取数据进行发送这样就做到了一个zeromq socket只被一个线程使用。也可以考虑全部使用zeromq来实现灵活的架构。由此可以看出zeromq可以设计出非常灵活的架构2. PUSH/PULL模式阻塞问题push/pull模式当没有pull的时候push会被send函数阻塞。3. PUB/SUB vs PUSH/PULL模式区别模式消息分发方式PUB/SUB同一个数据会被传送给多个sub一对多PUSH/PULL一个数据只会给一个pull即一个数据只会被消费一次负载均衡 RabbitMQ vs ZeroMQ对比特性RabbitMQZeroMQ架构需要Broker消息代理无需Broker点对点通信可靠性高支持消息确认和持久化较低依赖应用层实现性能较高极高功能丰富的消息模式和管理功能轻量级模式相对简单部署需要部署和运维RabbitMQ服务无需额外部署适用场景企业级应用需要消息可靠性和管理功能高性能场景分布式系统内部通信 选择建议选择RabbitMQ的场景需要消息持久化和可靠性保证需要复杂的消息路由需要消息管理界面企业级应用对消息可靠性要求高选择ZeroMQ的场景追求极致性能分布式系统内部通信不需要消息代理的轻量级场景可以接受应用层实现可靠性机制 参考资料RabbitMQRabbitMQ基础知识介绍内存与磁盘控制消息自动删除参考ZeroMQ入门介绍各种模式介绍、性能对比发布/订阅模式代码参考官方文档GitHub 总结RabbitMQ和ZeroMQ各有优势RabbitMQ适合需要高可靠性和丰富功能的企业级场景ZeroMQ适合追求高性能的轻量级场景。在实际应用中需要根据业务需求、性能要求和团队能力来选择合适的技术。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2568365.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!