消息队列5-RabbitMQ的高级特性和MQ的应用问题与解决方案-事务、消息分发的应用、幂等性保证、顺序性保证、消息积压的解决
文章目录一. 事务1. 模版开启事务功能2. 配置事务管理器3. 声明队列4. 生产者5. 运行图二. 消息分发1. 限流(1) yml配置文件(2) 声明队列与交换机及绑定关系(3) 生产者(4) 消费者(5) 运行图2. 负载均衡(1) yml配置(2) 消费者代码(3) 运行图三. MQ的幂等性保证1. MQ中存在的问题2. 解决方案四. 顺序性保证方案1. 出现顺序性问题的场景2. 解决方案(1) 单队列单消费者(2) 分区消费(3) 消息(消费者)确认机制(4) 业务逻辑管理五. 消息积压1. 产生原因2. 解决方案一. 事务事务指将一系列操作打包为一块执行, 具有原子性, 要不全部成功, 要不全部失败, 且在执行过程中不会被其他操作插入, 而在AMQP的协议中实现了事务机制, 因此RabbitMQ也支持事务下面使用Spring-AMQP来使用事务功能, 交换机是默认交换机1. 模版开启事务功能// 事务Bean(transactionRabbitTemplate)publicRabbitTemplatetransactionRabbitTemplate(ConnectionFactoryconnectionFactory){RabbitTemplaterabbitTemplatenewRabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);// 开启事务功能returnrabbitTemplate;}2. 配置事务管理器// 创建事务管理器, 必须和上面开启事务功能配合使用BeanpublicRabbitTransactionManagerrabbitTransactionManager(ConnectionFactoryconnectionFactory){returnnewRabbitTransactionManager(connectionFactory);}3. 声明队列这里使用默认交换机, 不用声明Bean(transQueue)publicQueuetransQueue(){returnQueueBuilder.durable(Constants.TRANS_QUEUE).build();}4. 生产者这里我们在生产者发送两条消息之间, 代码逻辑产生异常Transactional// 事务管理的注解RequestMapping(/trans)publicStringtrans(){System.out.println(trans test...);transactionRabbitTemplate.convertAndSend(,Constants.TRANS_QUEUE,trans 1 ...);intnum1/0;transactionRabbitTemplate.convertAndSend(,Constants.TRANS_QUEUE,trans 2 ...);return发送成功;}5. 运行图正常情况下, 消息1是可以发送成功的, 但在这里可以看到队列中一条消息也没有, 说明事务进行了回滚二. 消息分发RabbitMQ处于工作模式时, 如果有多个消费者, 每个消息只会发送给其中一个订阅者, 消息不会被重复消费, 默认情况下, RabbitMQ以轮询方式发送消息, 不考虑每个消费者的处理能力, 这在一些请求量暴增特定场景下, 会导致消费者处理不过来, 消息越积越多, 即消息积压为解决上面的情况, 可以使用channel.basicQos(int prefetchCount)方法 — 在Spring-AMQP中采用的是配置yml文件, 来限制一个消费者上最大的未确认消息数, 从而控制流量, 防止宕机下面介绍消息分发的应用场景1. 限流如在一些特定时间的秒杀场景, 流量会剧增, 如果不对这些流量加以限制的话, 会直接导致订单系统压力过大宕机下面介绍在Spring-AMQP中如何使用限流(1) yml配置文件在使用限流前, 必须开启消费者对于消息的手动确认模式spring:application:name:Spring-extension-demo rabbitmq:host:localhost port:5672username:admin password:admin virtual-host:extension listener:simple:acknowledge-mode:manual # 消息确认机制 手动确认 prefetch:5(2) 声明队列与交换机及绑定关系packagecom.ran.extension.config;importcom.ran.extension.constant.Constants;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/** * Created with IntelliJ IDEA. * Description: * User: ran * Date: 2026-04-02 * Time: 20:26 */ConfigurationpublicclassQosConfig{Bean(QosExchange)publicExchangeQosExchange(){returnExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();}Bean(QosQueue)publicQueueQosQueue(){returnQueueBuilder.durable(Constants.QOS_QUEUE).build();}Bean(QosBinding)publicBindingQosBinding(Qualifier(QosQueue)Queuequeue,Qualifier(QosExchange)Exchangeexchange){returnBindingBuilder.bind(queue).to(exchange).with(qos).noargs();// noargs()方法表示,交换机没有其他参数}}(3) 生产者这里发送20条消息RequestMapping(/qos)publicStringqos(){for(inti0;i20;i){rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE,qos,qos...i);}return发送成功;}(4) 消费者这里我们消费之后, 不进行确认, 这样的话消费者就不会继续从队列获取消息, 因此队列中还剩15条待发消息, 有5条未确认消息packagecom.ran.extension.listener;importcom.rabbitmq.client.Channel;importcom.ran.extension.constant.Constants;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.io.IOException;importjava.util.Date;/** * Created with IntelliJ IDEA. * Description: * User: ran * Date: 2026-03-31 * Time: 18:24 */ComponentpublicclassQosListeners{RabbitListener(queuesConstants.QOS_QUEUE)publicvoidhandlerQos(Messagemessage,Channelchannel)throwsIOException{longdeliveryTagmessage.getMessageProperties().getDeliveryTag();try{System.out.println(队列[Constants.QOS_QUEUE]接收到消息: newString(message.getBody()), deliveryTag: deliveryTag);}catch(Exceptione){channel.basicNack(deliveryTag,false,true);}}}(5) 运行图队列中还剩15条待发消息, 有5条未确认消息, 共20条2. 负载均衡多个消费者中, 可能有处理能力差的, 也有处理能力强的, 负载均衡就是强者多处理一些, 弱者少处理一点, 根据每个消费者的具体情况来分发流量(1) yml配置对于负载均衡来说, prefetch一般配置为1, 消费者确认一条消息后, 再从队列获取一条消息, 能者多劳spring:application:name:Spring-extension-demo rabbitmq:host:localhost port:5672username:admin password:admin virtual-host:extension listener:simple:acknowledge-mode:manual # 消息确认机制 手动确认 prefetch:1# 负载均衡一般配置为1(2) 消费者代码消费者1处理完一条消息后睡眠200ms, 消费者2睡眠400msComponentpublicclassQosListeners{RabbitListener(queuesConstants.QOS_QUEUE)publicvoidhandlerQos1(Messagemessage,Channelchannel)throwsIOException{longdeliveryTagmessage.getMessageProperties().getDeliveryTag();try{System.out.println(消费者1接收到消息: newString(message.getBody()), deliveryTag: deliveryTag);Thread.sleep(200);channel.basicAck(deliveryTag,false);}catch(Exceptione){channel.basicNack(deliveryTag,false,true);}}RabbitListener(queuesConstants.QOS_QUEUE)publicvoidhandlerQos2(Messagemessage,Channelchannel)throwsIOException{longdeliveryTagmessage.getMessageProperties().getDeliveryTag();try{System.out.println(消费者2接收到消息: newString(message.getBody()), deliveryTag: deliveryTag);Thread.sleep(400);channel.basicAck(deliveryTag,false);}catch(Exceptione){channel.basicNack(deliveryTag,false,true);}}}(3) 运行图三. MQ的幂等性保证幂等性是从数学中引入的概念, 指的是重复调用同一函数, 传入相同的参数, 最终结果是不变的, 例如取绝对值abs(x)在程序中, 指的是同一个系统多次执行同一指令, 不论执行多少次, 最终对系统造成的影响是一致的, 例如数据库中的select操作, 多次查询数据库, 都得到的是同一查询结果, 接下我们介绍的幂等性是所有MQ都要面临的问题, 不只是RabbitMQ1. MQ中存在的问题在上面的订单-MQ-支付组成的系统中, 已知在金融服务, 消息丢失是不可容忍的, 因此为保证消息可靠性, 发布方确认机制, 消费确认机制等等会全部开启, 下面就是可能会出现重复订单的情况情况一: 订单系统(生产者)发送订单消息, 可能会因为网络波动原因, 导致消息已经到达了MQ, 却没有收到MQ的ACK, 这时订单系统会再次发送同一份订单消息, 这就会导致支付系统会收到重复的订单情况二: MQ在向支付系统发送订单消息后, 因为网络波动, 可能会导致支付系统处理完订单后, MQ却没有接收到ACK, 这时MQ会再次重复发送这个订单消息2. 解决方案上面的情况, 如果对用户进行了两次扣款, 就是一个巨大的漏洞, 因此必须要保证MQ消息的幂等性, 即对于重复订单, 只需要支付一次即可最常用的解决方式为:1. 生产者(订单系统)给每条消息添加一个唯一ID, 必须确保是唯一的2. 消费者(支付系统)在收到消息后, 先判断ID是否已经被消费过, 如果被消费过, 直接丢弃即可3. 如果没有被消费过, 开始进行业务处理, 处理成功后, 将ID用数据库(MySQL)或者缓存(Redis)保存起来, 方便为重复消息做判断例如用Redis的 SETNX 命令来存储ID, 返回1代表保存成功, 返回0说明被消费过, 直接丢弃四. 顺序性保证方案在有些业务场景中, 对于消息的顺序有着严格的要求, 例如订单-MQ-支付系统1. 出现顺序性问题的场景因为多个生产者同时发送消息到MQ上, 是无法确定顺序性的, 这里就默认是一个生产者的前提1. 一个队列对应多个消费者, 消息可能会被多个消费者并行处理, 无法保证顺序性2. 网络原因导致ACK确认丢失, 消息重新入队后, 顺序性发生问题3. 在复杂路由中, 一系列消息由于RoutingKey的原因, 路由到了不同队列, 这就无法保证顺序性2. 解决方案顺序性保证又分为局部和全局的, 局部指的是单个队列间的消息顺序, 全局指的是多个队列或者多个消费者之间的消息顺序, 这里需要注意, 确保顺序性是多个方案相互配合使用来保证的, 单一方案并不能保证顺序性(1) 单队列单消费者在单个队列对应单个消费者中, 消息满足FIFO(先进先出)的特性, 天然满足顺序性(2) 分区消费一个队列只对应一个消费者, 吞吐量确实太低, 需要高性能的场景时, 进行分区消费未进行分区时, 拿订单系统-MQ-支付系统来举例, MQ中可能同一时间有多条消息等待消费, 分别有订单的创建, 支付成功, 发货等等, 在下面图片中, 如果消费者1先处理了订单1的支付成功, 消费者2因为网络原因后处理的订单1创建, 就会导致消费者1去数据库中查询不到订单信息, 从而丢弃消息我们可以使用分区消费的方式, 把MQ中的消息分割成多个分区, 每个订单对应一个分区, 每个分区对应一个队列(RabbitMQ通过使用一致性哈希器的方式来路由到每个队列上), 每个队列由一个消费者处理, 每个队列内的消息又天然满足顺序性(3) 消息(消费者)确认机制通过手动确认方式, 消费者每处理成功一条消息, 再继续进行下一条消息的处理拉取(4) 业务逻辑管理通过为消息嵌入序列号, 然后在消费端进行排序, 最后再按顺序处理消息五. 消息积压当消息队列中, 消息的生成速度大于消费者的处理速度时, 就会导致消息再队列中不断积压, 成为消息积压1. 产生原因1. 生产端: 在一些高流量的特定场景, 生产者的发送消息的速率极高, 超过了消费者处理能力2. 消费端: 生产者发送消息速率正常, 但是消费者的处理速度跟不上生成速度, 也会导致积压, 那么消费者为什么会跟不上速度呢? 下面是原因① 业务逻辑较复杂, 耗时长② 屎山代码累计, 性能较低③ 系统资源的限制, 消费端的硬件配置较低④ 消费端发生异常时, 处理不当, 例如不断频繁重试发送同一消息, 导致消息积压3. 网络问题, 消费者无法及时接收消息4. MQ服务器硬件配置较低2. 解决方案1. 限制生产者(不常用)① 通过流量控制逻辑, 根据消费者的处理能力动态发送消息② 这是过期时间, 到达过期时间丢入私信队列, 以此来减少主队列的压力2. 提升消费者① 增加消费者实例② 优化业务代码, 使用多线程③ 设置prefetchCount, 一个消费者达到数量时, 即使转发到其他队列④ 消费端代码逻辑异常时, 采用合适重试机制, 不再频繁重试, 也可以存到死信队列
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2490161.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!