
📝个人主页:五敷有你
🔥系列专栏:MQ
⛺️稳中求进,晒太阳

消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。
2.数据持久化
为了提高性能,默认情况下,MQ的数据都是再内存存储的临时数据,重启后就会消失,为了保证数据的可靠性,必须配置数据持久化,包括:
-  交换机持久化 
-  队列持久化 
-  消息持久化 
我们以控制台界面为例来说明。
2.1.交换机持久化
在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:

设置为Durable就是持久化模式,Transient就是临时模式。
2.2.队列持久化
在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:

除了持久化以外,你可以看到队列还有很多其它参数,有一些我们会在后期学习。
2.3.消息持久化
在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个properties:

说明:
在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。
不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。
代码层次实现:
 @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "msg.queue",durable ="true"),
                exchange = @Exchange(name = "msg.topic",type = ExchangeTypes.TOPIC,durable = "true"),
                key = "msg"
        ))
        public void listenMsg(String jsonStr){
            log.info("收到消息{}", jsonStr);
            Map<String,Object> map = JSONUtil.toBean(jsonStr, Map.class);
            JSONObject object=new JSONObject(map);
            String actionName =object.getString(Action.ACTION);
            Action action = getAction(actionName);
            action.doMessage(getWebSocketManager(),object);
        }3.消费者的可靠性
当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:
-  消息投递的过程中出现了网络故障 
-  消费者接收到消息后突然宕机 
-  消费者接收到消息后,因处理不当导致异常 
RabbitMQ如何得知消费者的处理状态呢?
3.1消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
-  ack:成功处理消息,RabbitMQ从队列中删除该消息 
-  nack:消息处理失败,RabbitMQ需要再次投递消息 
-  reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息 
        一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.
由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:
-  none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
测试下面代码验证: 在none情况下,异常产生后,消息队列中的消息被删除了
  @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "msg.queue",durable ="true"),
                exchange = @Exchange(name = "msg.topic",type = ExchangeTypes.TOPIC,durable = "true"),
                key = "msg"
        ))
        public void listenMsg(String jsonStr){
            log.info("收到消息{}", jsonStr);
            throw new RuntimeException("异常");
}

-  manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
-  auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:-  如果是业务异常,会自动返回 nack;
-  如果是消息处理或校验异常,自动返回 reject;
 
-  
测试下面代码验证: 在auto情况下,异常产生后,消息一直在被重复投递,


在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):放行以后,由于抛出的是业务异常,所以Spring返回ack,最终消息恢复至Ready状态,并且没有被RabbitMQ删除: 这个一直Unack的状态。当我们把配置改为auto时,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。
如果是消息转换异常,spring会返回reject
刚刚发现,当消费者出现异常后,消息会不断的requeue(重入队)到队列,再重新发送给消费者,如果消费者执行依然出错,消息会再次投递到队列,直到处理成功为止。
极端情况下,消费之一直无法执行成功,那么消息requeue就会无限循环,导致mq的处理消息飙升,带来不必要的压力。

3.2.失败重试机制
为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false重启consumer服务,重复之前的测试。可以发现:
-  消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次 
-  本地重试3次以后,抛出了 AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject
试了三次,失败就停下来了。

结论:
-  开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试 
-  重试达到最大次数后,Spring会返回reject,消息会被丢弃 
3.2失败处理策略
本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:
-  RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
-  ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
-  RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
1)在consumer服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}2)定义一个RepublishMessageRecoverer,关联队列和交换机
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}package com.aqiuo.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
    @Bean
    public TopicExchange errorMessageExchange(){
            return new TopicExchange("error.topic",true,false);
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, TopicExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
    }
}

![[渗透利器]某大佬公开自用红队渗透工具](https://img-blog.csdnimg.cn/img_convert/14ef6e451886efc67e2988a0e73c7834.png)

![[ 项目 ] tcmalloc简化版—高并发内存池](https://img-blog.csdnimg.cn/direct/0ad5d1a5f91f44b2b227ddef3abd3f95.png)















