RabbitMQ 消息可靠保障
- 消息的可靠性保证
- 生产端到交换机和队列的可靠性保障
- 解决思路A-确认机制
- 解决思路B-备份交换机
 
- MQ 服务器宕机导致消息丢失
- 消费端消息的可靠性保障
 
- 消费端限流
 
消息的可靠性保证
实际项目中 MQ 的流程一般是:生产端把消息路由到交换机,然后由交换机把消息发送到队列,接着就是消费端拿到消息进行消费,这三个过程都有可能造成消息的不稳定,导致不可靠
 
生产端到交换机和队列的可靠性保障
解决思路A-确认机制
在生产端进行确认,具体操作中会分别针对交换机和队列来确认,如果没有成功发送的队列服务器上,那就可以尝试重新发送
首先需要增加下列配置
spring.rabbitmq.host=192.168.133.128
spring.rabbitmq.port=5672
spring.rabbitmq.password=admin
spring.rabbitmq.username=admin
spring.rabbitmq.virtual-host=/
spring.rabbitmq.listener.type=simple
spring.rabbitmq.publisher-confirm-type=CORRELATED #交换机的确认
spring.rabbitmq.publisher-returns=true #队列的确认
这里 publisher-confirm-type 有三种模式可选:
- none:关闭 confirm 机制
- simple:同步阻塞等待MQ的回执消息
- CORRELATED:MQ异步回调方式返回回执消息
接下来增加一个Rabbit 配置
@Configuration
@Slf4j
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // 消息发送到交换机成功或失败时调用此方法
        log.info("confirm函数打印correlationData:" + correlationData);
        log.info("confirm函数打印ack:" + ack);
        log.info("confirm函数打印cause:" + cause);
    }
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        // 发送队列失败时才调用此方法
        log.info("消息主体:" + new String(returnedMessage.getMessage().getBody()));
        log.info("应答码:" + returnedMessage.getReplyCode());
        log.info("描述:" + returnedMessage.getReplyText());
        log.info("交换机:" + returnedMessage.getExchange());
        log.info("路由key:" + returnedMessage.getRoutingKey());
    }
}

 为了方便测试,定义一个交换机和队列吧并声明它们的绑定关系
public class RabbitMQConfig {
    //定义一个交换机已及 routingkey,用来测试消息的可靠传递测试
    public static final String NORMAL_EXCHANGE = "normal.demo.exchange";
    public static final String NORMAL_ROUTING_KEY = "normal.demo.routingkey";
    public static final String NORMAL_QUEUE = "normal.demo.queue";
    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange(NORMAL_EXCHANGE);
    }
    @Bean
    public Queue normalQueue() {
        return new Queue(NORMAL_QUEUE);
    }
    @Bean
    public Binding normalBinding(@Qualifier("normalQueue") Queue queue,
                                 @Qualifier("normalExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING_KEY);
    }
}
在 controller 层测试
@RestController
@RequestMapping("/rabbit")
public class RabbitController {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/test")
    public String test(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY, "Message Test Confirm~~");
        return "success.";
    }
}
消息成功路由到交换机,然后交换机发送到队列,正常情况下是下面的输出
2024-08-17T17:08:35.105+08:00  INFO 2484 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.133.128:5672]
2024-08-17T17:08:35.149+08:00  INFO 2484 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#59f45950:0/SimpleConnection@3542faa [delegate=amqp://admin@192.168.133.128:5672/, localPort=63927]
2024-08-17T17:08:35.204+08:00  INFO 2484 --- [nectionFactory2] com.example.demo.config.RabbitConfig     : confirm函数打印correlationData:null
2024-08-17T17:08:35.204+08:00  INFO 2484 --- [nectionFactory2] com.example.demo.config.RabbitConfig     : confirm函数打印ack:true
2024-08-17T17:08:35.205+08:00  INFO 2484 --- [nectionFactory2] com.example.demo.config.RabbitConfig     : confirm函数打印cause:null
现在改一下,模拟路由交换机失败的场景,注意没有 RabbitMQConfig.NORMAL_EXCHANGE+"~",这个交换机
rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE+"~", RabbitMQConfig.NORMAL_ROUTING_KEY, "Message Test Confirm~~");
重新测试,此时输出如下,错误提示的还是很明显的
2024-08-17T17:11:39.981+08:00  INFO 15864 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.133.128:5672]
2024-08-17T17:11:40.037+08:00  INFO 15864 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#59f45950:0/SimpleConnection@568384f7 [delegate=amqp://admin@192.168.133.128:5672/, localPort=64266]
2024-08-17T17:11:40.074+08:00 ERROR 15864 --- [68.133.128:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'normal.demo.exchange~' in vhost '/', class-id=60, method-id=40)
2024-08-17T17:11:40.076+08:00  INFO 15864 --- [nectionFactory3] com.example.demo.config.RabbitConfig     : confirm函数打印correlationData:null
2024-08-17T17:11:40.076+08:00  INFO 15864 --- [nectionFactory3] com.example.demo.config.RabbitConfig     : confirm函数打印ack:false
2024-08-17T17:11:40.077+08:00  INFO 15864 --- [nectionFactory3] com.example.demo.config.RabbitConfig     : confirm函数打印cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'normal.demo.exchange~' in vhost '/', class-id=60, method-id=40)
现在再模拟交换机发送消息到队列失败的场景,改一下 routingKey
rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY+"~", "Message Test Confirm~~");
重新测试,此时输出如下,returnedMessage 方法执行了
2024-08-17T17:13:49.835+08:00  INFO 12892 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.133.128:5672]
2024-08-17T17:13:49.889+08:00  INFO 12892 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#59f45950:0/SimpleConnection@572e41be [delegate=amqp://admin@192.168.133.128:5672/, localPort=64519]
2024-08-17T17:13:49.933+08:00  INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig     : 消息主体:Message Test Confirm~~
2024-08-17T17:13:49.934+08:00  INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig     : 应答码:312
2024-08-17T17:13:49.934+08:00  INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig     : 描述:NO_ROUTE
2024-08-17T17:13:49.934+08:00  INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig     : 交换机:normal.demo.exchange
2024-08-17T17:13:49.934+08:00  INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig     : 路由key:normal.demo.routingkey~
2024-08-17T17:13:49.935+08:00  INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig     : confirm函数打印correlationData:null
2024-08-17T17:13:49.936+08:00  INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig     : confirm函数打印ack:true
2024-08-17T17:13:49.936+08:00  INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig     : confirm函数打印cause:null
顺便了解一下spring.rabbitmq.template.mandatory 配置
 当使用 RabbitMQ 作为消息队列时,spring.rabbitmq.template.mandatory 配置项用于控制 MQ 消息模板(RabbitTemplate)的发送行为,特别是在消息路由时找不到队列(queue)的情况下的处理方式。
 2、spring.rabbitmq.template.mandatory属性可能会返回三种值null、false、true,
 3、spring.rabbitmq.template.mandatory 结果为 false 时会忽略掉spring.rabbitmq.publisher-returns 属性的值,也就是 returnedMessage 方法不会执行;结果为 true,returnedMessage 可以得到执行
 4、spring.rabbitmq.template.mandatory 结果为null(即不配置)时结果由spring.rabbitmq.publisher-returns 确定
 
解决思路B-备份交换机
为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机(实际项目中很少用,一般用思路A)
在上面代码的基础上改一下,RabbitMQConfig 就单纯的声明队列和交换机
public class RabbitMQConfig {
	//定义一个正常交换机以及 routingkey,用来测试消息的可靠传递测试
	public static final String NORMAL_EXCHANGE = "normal.demo.exchange";
	public static final String NORMAL_ROUTING_KEY = "normal.demo.routingkey";
	public static final String NORMAL_QUEUE = "normal.demo.queue";
	
	//定义一个备份交换机以及 队列,用来测试消息的可靠传递测试
	public static final String NORMAL_EXCHANGE_BACKUP = "normal.demo.exchange.backup";
	public static final String NORMAL_QUEUE_BACKUP = "normal.demo.queue.backup";
}
定义一个消费者,监听队列,实际项目中都会有的,有如下注意点:
- 备份交换机必须定义成 FANOUT 类型
- 声明正常交换机的时候指定一个备份交换机,通过 alternate-exchange参数,代码有体现
@Slf4j
@Component
public class RabbitConsumer {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE),
            exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE, type = ExchangeTypes.DIRECT, arguments = {@Argument(name = "alternate-exchange", value = RabbitMQConfig.NORMAL_EXCHANGE_BACKUP)}),
            key = RabbitMQConfig.NORMAL_ROUTING_KEY))
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("正常队列{}收到消息:{}",  RabbitMQConfig.NORMAL_QUEUE, msg);
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_BACKUP),
            exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_BACKUP, type = ExchangeTypes.FANOUT)))
    public void receiveB(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("备份队列{}收到消息:{}",  RabbitMQConfig.NORMAL_QUEUE_BACKUP, msg);
    }
}
注意:@RabbitListener 必须先创建队列,不然报错, 有三个 属性 queues()、bindings()、queuesToDeclare(),它们之间是互斥的。设定了queues(),就不能再设定 bindings() 和 queuesToDeclare()了,具体用法可以看这篇文章
接下来模拟无法到达队列(如果无法到达交换机直接报错,没有后面什么事了…),测试能不能正常进入备份交换机
rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY+"~", "Message Test Confirm~~");
运行结果如下:
2024-08-17T18:25:32.253+08:00  INFO 22476 --- [nectionFactory2] com.example.demo.config.RabbitConfig     : confirm函数打印correlationData:null
2024-08-17T18:25:32.254+08:00  INFO 22476 --- [nectionFactory2] com.example.demo.config.RabbitConfig     : confirm函数打印ack:true
2024-08-17T18:25:32.254+08:00  INFO 22476 --- [nectionFactory2] com.example.demo.config.RabbitConfig     : confirm函数打印cause:null
2024-08-17T18:25:32.257+08:00  INFO 22476 --- [ntContainer#0-1] c.example.demo.rabbitmq.RabbitConsumer   : 备份队列normal.demo.queue.backup收到消息:Message Test Confirm~~
MQ 服务器宕机导致消息丢失
这一点官方早就考虑到了,现在无论是交换机还是队列,消息都是默认持久化到硬盘上,哪怕服务器重启也不会导致消息丢失
消费端消息的可靠性保障
配置文件增加 spring.rabbitmq.listener.simple.acknowledge-mode=manual 配置,也就是下面这样
spring.rabbitmq.host=192.168.133.128
spring.rabbitmq.port=5672
spring.rabbitmq.password=admin
spring.rabbitmq.username=admin
spring.rabbitmq.virtual-host=/
spring.rabbitmq.listener.type=simple
spring.rabbitmq.publisher-confirm-type=CORRELATED
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.listener.simple.acknowledge-mode=manual #把消息确认模式改成手动确认
因为 RabbitMQ 客户端默认是 自动返回 ACK 确认的,也就是不管是处理成功还是失败,默认都按成功来处理,这样就不太好,所以这个配置要改成手动确认
需要提前知道个东西,啥是 deliveryTag?
 
 队列的定义,这里把备份交换机队列都删了哈
public class RabbitMQConfig {
	//定义一个正常交换机以及 routingkey,用来测试消息的可靠传递测试
	public static final String NORMAL_EXCHANGE = "normal.demo.exchange";
	public static final String NORMAL_ROUTING_KEY = "normal.demo.routingkey";
	public static final String NORMAL_QUEUE = "normal.demo.queue";
}
消费者逻辑如下
@Slf4j
@Component
public class RabbitConsumer {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE),
            exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE, type = ExchangeTypes.DIRECT, arguments = {@Argument(name = "alternate-exchange", value = RabbitMQConfig.NORMAL_EXCHANGE_BACKUP)}),
            key = RabbitMQConfig.NORMAL_ROUTING_KEY))
    public void receiveA(Message message, Channel channel) throws IOException {
        // 消息内容
        String msg = new String(message.getBody());
        // 消息的 deliveryTag
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 操作成功,返回 ack 信息
            int i = 1/0; // 模拟消息处理异常
            channel.basicAck(deliveryTag, false);
            log.info("正常队列{}收到消息:{}",  RabbitMQConfig.NORMAL_QUEUE, msg);
        } catch (Exception ex) {
            // 获取当前消息是否是重复投递的
            // true-说明消息已经重复投递过一次了;false-说明当前消息是第一次投递
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            // 操作失败,返回 Nack 信息
            if (redelivered) {
                // requeue 参数:控制消息是否重新放入队列 true-重放队列,broker会重新投递消息; false-不重放,broker会丢弃消息
                channel.basicNack(deliveryTag, false, false); // basicNack(long deliveryTag, boolean multiple, boolean requeue)
            } else {
                channel.basicNack(deliveryTag, false, true);
            }
            log.info("正常队列{}收到消息,,redelivered: {},但是处理异常:{}",  RabbitMQConfig.NORMAL_QUEUE, redelivered, msg);
        }
        // basicReject 和 basicNack 区别-> 唯一的区别就是 basicNack 有批量操作控制,就是 multiple 参数
        // channel.basicReject(deliveryTag, false); //basicReject(long deliveryTag, boolean requeue)
    }
}
2024-08-17T19:48:38.131+08:00  INFO 18192 --- [nectionFactory2] com.example.demo.config.RabbitConfig     : confirm函数打印correlationData:null
2024-08-17T19:48:38.132+08:00  INFO 18192 --- [nectionFactory2] com.example.demo.config.RabbitConfig     : confirm函数打印ack:true
2024-08-17T19:48:38.132+08:00  INFO 18192 --- [nectionFactory2] com.example.demo.config.RabbitConfig     : confirm函数打印cause:null
2024-08-17T19:48:55.205+08:00  INFO 18192 --- [ntContainer#0-1] c.example.demo.rabbitmq.RabbitConsumer   : 正常队列normal.demo.queue收到消息,,redelivered: false,但是处理异常:Message Test Confirm~~
2024-08-17T19:49:12.007+08:00  INFO 18192 --- [ntContainer#0-1] c.example.demo.rabbitmq.RabbitConsumer   : 正常队列normal.demo.queue收到消息,,redelivered: true,但是处理异常:Message Test Confirm~~
具体效果可以本地 debug 下哈,消费完并且都 ack 后这里都是0了
 
消费端限流
我们都知道 MQ 有削峰填谷的效果,假设有下面的场景,消息队列里面有10000条消息,但是消费端的并发能力只有 1000,为了避免一次性把这1万条消息全部取出,导致消费端压力太大,我们可以做个设置,每次最多取1000条消息,对消费端也是一种保护。
从操作层面来说也比较简单,配置spring.rabbitmq.listener.simple.prefetch 属性即可
 
 做个小实验:
 当不设置 prefetch 时,生产者一次性向交换机中投递100条消息,消费者确认消息的方式设置为手动确认,在确认消息之前线程休眠 5 秒(因为 rabbitmq 管理控制台数据更新是 5 秒更新一次,这里设置为 5 秒比较方便观察),可以观察到队列是一次性将100条数据全部发送给了消费者,然后消费者再进行处理:
生产者一次发送10条消息到交换机:
 @GetMapping("/test")
 public String test(){
     for (int i = 0; i < 100; i++) {
         rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY, "Message Test Confirm"+i);
     }
     return "success.";
 }
消费者在确认之前休眠5秒:
@RabbitListener(bindings = @QueueBinding(
  value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_BACKUP),
  exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_BACKUP, type = ExchangeTypes.FANOUT)))
public void receiveB(Message message, Channel channel) throws IOException {
    String msg = new String(message.getBody());
    log.info("备份队列{}收到消息:{}",  RabbitMQConfig.NORMAL_QUEUE_BACKUP, msg);
}

 现在增加spring.rabbitmq.listener.simple.prefetch=2配置,就是一次从队列中取两条数据,再观察结果:
 会发现,消费端不再是一次性全部把消息取出,而是每次只取 2 个,这就是 prefetch 配置作用
 





![[Meachines] [Medium] solidstate Apache JAMES RCE+POP3邮件泄露+定时任务权限提升](https://img-blog.csdnimg.cn/img_convert/7d4354cb1bcf768634351e34e9d8bcdb.jpeg)













