消息中间件RabbitMQ04:路由模式+死信队列的应用实践模板
一、Demo场景基于 RabbitMQ 实现带死信队列的消息收发场景生产者发送消息至 Direct 交换机普通消费者处理消息50% 失败率失败 / 超时消息自动进入死信队列由死信消费者兜底处理避免消息丢失或阻塞。二、代码思路封装连接管理器统一管理 MQ 连接生产者声明交换机并发送持久化 JSON 消息消费者分普通 / 死信类型普通队列绑定死信参数消费失败则转发至死信队列处理。三、代码效果生产者循环发送 5 条测试消息普通消费者随机处理成败失败 / 超时消息进入死信队列死信消费者接收并移除死信消息完成消息全链路兜底处理。四、代码using ConsoleApp1.Model; using DocumentFormat.OpenXml.Wordprocessing; using Newtonsoft.Json; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections; using System.Collections.Generic; using System.Linq.Expressions; using System.Reflection; using System.Text; using System.Threading.Tasks; namespace CloneDemo { // 客户端代码 class Program { /// summary /// MQ管理者配置url、账密 /// /summary public static class RabbitMQConnectionManager { private static readonly LazyTaskIConnection _connection new LazyTaskIConnection(() { var factory new ConnectionFactory() { HostName localhost, UserName consumer,//账号 Password consumer,//密码 AutomaticRecoveryEnabled true, // 自动重连 NetworkRecoveryInterval TimeSpan.FromSeconds(5) }; return factory.CreateConnectionAsync(); }); public static TaskIConnection Connection _connection.Value; public static async TaskIChannel CreateChannelAsync() { var connection await Connection; return await connection.CreateChannelAsync(); } } /// summary /// MQ生产者配置交换机、路由键 /// /summary public class RabbitMQProducerService { private readonly string _exchangeName mydirect; private readonly string _routingKey myroot; /// summary /// 构造函数 /// /summary /// param nameexchangeName/param /// param nameroutingKey/param public RabbitMQProducerService(string exchangeName, string routingKey) { _exchangeName exchangeName; _routingKey routingKey; } /// summary /// 发送消息 /// /summary public async Taskbool SendMessageT(T input) { // 01 建立队列连接 var channel await RabbitMQConnectionManager.CreateChannelAsync(); try { // 02 声明交换机Direct类型适用于路由器模式 await channel.ExchangeDeclareAsync(exchange: _exchangeName, durable: true, type: ExchangeType.Direct); // 03 发送内容到交换机 var message JsonConvert.SerializeObject(input); var body Encoding.UTF8.GetBytes(message); // 4.将消息路由给消费者 await channel.BasicPublishAsync( exchange: _exchangeName, // 交换机名称 routingKey: _routingKey, // 路由键 mandatory: false, // 非强制路由无匹配队列则丢弃 basicProperties: new BasicProperties { DeliveryMode DeliveryModes.Persistent, // 消息持久化 ContentType application/json // 标识JSON格式 }, // 消息属性 body: body.AsMemory()); Console.WriteLine($生产者发送了: {message}); return true; } catch (Exception ex) { Console.WriteLine($生产者异常{ex.ToString()}); return false; } finally { if (channel ! null channel.IsOpen) { await channel.CloseAsync(); await channel.DisposeAsync(); } } } } /// summary /// MQ消费者配置交换机同生产者一样、路由键同生产者一样、队列名称自由命名、处理委托 /// /summary public class RabbitMQConsumerServiceT { private readonly string _exchangeName mydirect; private readonly string _routingKey myroot; private readonly string _qName myqname; /// summary /// 业务处理委托入参为消费的消息返回true表示处理成功会Ack移除消息false表示处理失败 /// /summary private readonly FuncT, bool _processMethod null; /// summary /// 是否为死信队列消费者默认为false业务消费者 /// /summary private readonly bool _isDLX false; /// summary /// 构造函数 /// /summary /// param nameexchangeName交换机名称同生产者一样/param /// param nameroutingKey路由键名称同生产者一样/param /// param nameqName队列名称自由命名/param /// param nameprocessMethod处理的方法/param /// param nameisDLX当前是否是死信队列的消费者默认false死信队列是各组件后面加_dlx后缀/param public RabbitMQConsumerService(string exchangeName, string routingKey, string qName, FuncT, bool processMethod, bool isDLX false) { _isDLX isDLX; string suffix isDLX ? _dlx : string.Empty;//后缀 _exchangeName exchangeName suffix; _routingKey routingKey suffix; _qName qName suffix; _processMethod processMethod; } /// summary /// 开启消费 /// /summary public async Task StartConsuming() { // 01 建立队列连接 var channel await RabbitMQConnectionManager.CreateChannelAsync(); try { // 02 死信队列的声明 Dictionarystring, object queueArgs new Dictionarystring, object(); if (!_isDLX)//如果不是死信队列消费者 { queueArgs await DeclearDLXAsync(channel); } // 03 声明交换机Direct类型适用于路由模式必须与生产者相同 await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Direct, durable: true); // 04 声明一个独立的队列用于接收消息 await channel.QueueDeclareAsync(_qName, true, false, false, queueArgs); // 05 绑定队列到交换机 await channel.QueueBindAsync(queue: _qName, exchange: _exchangeName, routingKey: _routingKey); // 06 消费的逻辑方法 var consumer new AsyncEventingBasicConsumer(channel); consumer.ReceivedAsync async (model, ea) { try { // 获取入参 var body ea.Body.ToArray(); var message Encoding.UTF8.GetString(body); // 委托处理 var success _processMethod(JsonConvert.DeserializeObjectT(message)); // 根据结果进行是否确认 if (success) { await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);//成功了则移除消息 } else { await channel.BasicNackAsync(deliveryTag: ea.DeliveryTag, multiple: false, requeue: false);//普通消费者→消息移入死信队列死信消费者→消息直接移除无后续转移 } } catch (Exception ex) { Console.WriteLine($消费者出错{ex.ToString()}); await channel.BasicNackAsync(deliveryTag: ea.DeliveryTag, multiple: false, requeue: false); } }; // 07 启动消费 await channel.BasicConsumeAsync(queue: _qName, autoAck: false,//禁止使用消息自动确认 consumer: consumer); Console.WriteLine(点击退出消费者); Console.ReadLine(); } catch (Exception ex) { Console.WriteLine($消费者出错{ex.ToString()}); } } /// summary /// 死信队列声明 /// /summary private async TaskDictionarystring, object DeclearDLXAsync(IChannel channel) { string dlxExchangeName ${_exchangeName}_dlx; // 死信交换机业务交换机_dlx string dlxRoutingKey ${_routingKey}_dlx; // 死信路由键业务路由键_dlx string dlxQueueName ${_qName}_dlx; // 死信队列业务队列_dlx await channel.ExchangeDeclareAsync(exchange: dlxExchangeName, type: ExchangeType.Direct, durable: true); await channel.QueueDeclareAsync(dlxQueueName, true, false, false); await channel.QueueBindAsync(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: dlxRoutingKey); var queueArgs new Dictionarystring, object { [x-dead-letter-exchange] dlxExchangeName, // 死信转发到的交换机 [x-dead-letter-routing-key] dlxRoutingKey, // 死信使用的路由键 [x-message-ttl] 60000 // 业务队列消息超时时间60秒未被消费的消息自动移入死信队列 }; return queueArgs; } } /// summary /// 主函数注意要先部署消费者绑定好队列再部署生产者才能正确路由到指定消费者 /// /summary static async Task Main(string[] args) { // 以下1、2、3请在不同的exe进程里面启动 // 1. 生产者发送测试消息 for (int i 0; i 5; i) { var producer new RabbitMQProducerService(mydirect, myroot); await producer.SendMessage(new { Id i, Content 测试死信消息 }); } // 2. 启动普通消费者模拟处理失败消息进死信 var normalConsumer new RabbitMQConsumerServicedynamic( exchangeName: mydirect, routingKey: myroot, qName: myqname, processMethod: msg { var issucess new Random().Next(0, 100) 50;// 50%概率处理成功 Console.WriteLine($普通消费者处理消息处理的结果是{issucess}); return issucess; // 返回false消息进入死信队列 }, isDLX: false); await normalConsumer.StartConsuming(); // 3. 启动死信消费者处理死信消息 var dlxConsumer new RabbitMQConsumerServicedynamic( exchangeName: mydirect, routingKey: myroot, qName: myqname, processMethod: msg { Console.WriteLine($死信消费者处理消息{msg} ); return true; // 【重要】为避免阻塞队列死信队列强烈建议返回true即无论处理是否成功都将从死信队列中移除 }, isDLX: true); await dlxConsumer.StartConsuming(); } } }
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2425670.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!