【RabbitMQ】 RabbitMQ高级特性(二)

news2025/5/20 8:11:16

文章目录

  • 一、重试机制
    • 1.1、重试配置
    • 1.2、配置交换机&队列
    • 1.3、发送消息
    • 1.4、消费消息
    • 1.5、运行程序
    • 1.6、 手动确认
  • 二、TTL
    • 2.1、设置消息的TTL
    • 2.2、设置队列的TTL
    • 2.3、两者区别
  • 三 、死信队列
    • 6.1 死信的概念
    • 3.2 代码示例
      • 3.2.1、声明队列和交换机
      • 3.2.2、正常队列绑定死信交换机
      • 3.2.3 制造死信产生的条件
      • 3.2.4、发送消息
      • 3.2.5、测试死信
    • 3.3、常见面试题
  • 四、延迟队列
    • 4.1、概念
    • 4.2、应用场景
    • 4.3、TTL+死信队列实现
    • 4.4、常见面试题
  • 五、事务
    • 5.1、配置事务管理器
    • 5.2、声明队列
    • 5.3、生产者
  • 结语


本文延续上文RabbitMQ高级特性(一)为大家继续讲解RabbitMQ其他高级特性

在这里插入图片描述

一、重试机制

在消息传递过程中, 可能会遇到各种问题, 如网络故障, 服务不可用, 资源不足等, 这些问题可能导致消息处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送. 但如果是程序逻辑引起的错误, 那么多次重试也是没有⽤的, 可以设置重试次数

1.1、重试配置

spring:
 rabbitmq:
	 addresses: amqp://study:study@110.41.51.65:15673/jiaohuan
	 listener:
		 simple:
			 acknowledge-mode: auto #消息接收确认
			 retry:
				 enabled: true # 开启消费者失败重试
				 initial-interval: 5000ms # 初始失败等待时⻓为5秒
				 max-attempts: 5 # 最⼤重试次数(包括⾃⾝消费的⼀次)

1.2、配置交换机&队列

   //重试机制
    public static final String RETRY_QUEUE = "retry_queue";
    public static final String RETRY_EXCHANGE_NAME = "retry_exchange";

    //重试机制 发布订阅模式
	//1. 交换机
    @Bean("retryExchange")
    public Exchange retryExchange() {
        return ExchangeBuilder.fanoutExchange(Constant.RETRY_EXCHANGE_NAME).durable(true).build();
    }

    //2. 队列
    @Bean("retryQueue")
    public Queue retryQueue() {
        return QueueBuilder.durable(Constant.RETRY_QUEUE).build();
    }

    //3. 队列和交换机绑定 Binding
    @Bean("retryBinding")
    public Binding retryBinding(@Qualifier("retryExchange") FanoutExchange
                                        exchange, @Qualifier("retryQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange);
    }

1.3、发送消息

@RequestMapping("/retry")
public String retry(){
 	rabbitTemplate.convertAndSend(Constant.RETRY_EXCHANGE_NAME, "", "retry test...");
 	return "发送成功!"; 
 }

1.4、消费消息

@Component
public class RetryQueueListener {
    //指定监听队列的名称
    @RabbitListener(queues = Constant.RETRY_QUEUE)
    public void ListenerQueue(Message message) throws Exception {
        System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),
                message.getMessageProperties().getDeliveryTag());
        //模拟处理失败
        int num = 3 / 0;
        System.out.println("处理完成");
    }
}

1.5、运行程序

我们可以观察到结果

接收到消息: retry test..., deliveryTag: 1
接收到消息: retry test..., deliveryTag: 1
接收到消息: retry test..., deliveryTag: 1
接收到消息: retry test..., deliveryTag: 1
接收到消息: retry test..., deliveryTag: 1
o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'consumer ack test...' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ack_exchange, receivedRoutingKey=ack, deliveryTag=1, consumerTag=amq.ctag-vYckQBt9_0-5v2oG9oBnFw, consumerQueue=ack_queue])
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: 
Listener method 'public void com.jiaohuan.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'threw exception

如果对异常进行捕获, 那么就不会进行重试 代码修改如下:

System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), 
message.getMessageProperties().getDeliveryTag());
//模拟处理失败
try {
	 int num = 3/0;
	 System.out.println("处理完成");
}catch (Exception e){
	 System.out.println("处理失败");
}

重新运行程序, 结果如下:

接收到消息: consumer ack test..., deliveryTag: 1
处理失败

1.6、 手动确认

改为手动确认

  @RabbitListener(queues = Constant.RETRY_QUEUE)
    public void ListenerQueue(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),
                    message.getMessageProperties().getDeliveryTag());
            //模拟处理失败
            int num = 3 / 0;
            System.out.println("处理完成");
            //3. ⼿动签收
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            //4. 异常了就拒绝签收
            Thread.sleep(1000);
            //第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false, 则直接丢弃
            channel.basicNack(deliveryTag, true, true);
        }
    }

运⾏结果:

接收到消息: retry test..., deliveryTag: 1
接收到消息: retry test..., deliveryTag: 2
接收到消息: retry test..., deliveryTag: 3
接收到消息: retry test..., deliveryTag: 4
接收到消息: retry test..., deliveryTag: 5
接收到消息: retry test..., deliveryTag: 6
接收到消息: retry test..., deliveryTag: 7
接收到消息: retry test..., deliveryTag: 8
接收到消息: retry test..., deliveryTag: 9
接收到消息: retry test..., deliveryTag: 10
接收到消息: retry test..., deliveryTag: 11

可以看到, 手动确认模式时, 重试次数的限制不会像在自动确认模式下那样直接生效, 因为是否重试以及何时重试更多地取决于应⽤程序的逻辑和消费者的实现. ⾃动确认模式下, RabbitMQ 会在消息被投递给消费者后自动确认消息. 如果消费者处理消息时抛出异 常, RabbitMQ 根据配置的重试参数自动将消息重新⼊队, 从而实现重试. 重试次数和重试间隔等参数可以直接在RabbitMQ的配置中设定,并且RabbitMQ会负责执行这些重试策略.
⼿动确认模式下, 消费者需要显式地对消息进行确认. 如果消费者在处理消息时遇到异常, 可以选择不确认消息使消息可以重新⼊队. 重试的控制权在于应用程序本身, 而不是RabbitMQ的内部机制. 应用程序 可以通过自己的逻辑和利用RabbitMQ的⾼级特性来实现有效的重试策略。

在这里插入图片描述

使⽤重试机制时需要注意:
1 . ⾃动确认模式下: 程序逻辑异常, 多次重试还是失败, 消息就会被自动确认, 那么消息就丢失 了
2 . ⼿动确认模式下: 程序逻辑异常, 多次重试消息依然处理失败, 无法被确认, 就⼀直是unacked的状态, 导致消息积压

二、TTL

TTL(Time to Live, 过期时间), 即过期时间. RabbitMQ可以对消息和队列设置TTL.当消息到达存活时间之后, 还没有被消费, 就会被⾃动清除。

咱们在⽹上购物, 经常会遇到⼀个场景, 当下单超过24⼩时还未付款, 订单会被⾃动取消 还有类似的, 申请退款之后, 超过7天未被处理, 则⾃动退款
在这里插入图片描述

2.1、设置消息的TTL

目前有两种方法可以设置消息的TTL.
⼀是设置队列的TTL, 队列中所有消息都有相同的过期时间. ⼆是对消息本身进行单独设置, 每条消息的TTL可以不同. 如果两种方法⼀起使用, 则消息的TTL以两者之间较小的那个数值为准. 先看针对每条消息设置TTL。针对每条消息设置TTL的方法是在发送消息的方法中加入expiration的属性参数,单位为毫秒.

配置交换机&队列:


    //TTL
    public static final String TTL_QUEUE = "ttl_queue";
    public static final String TTL_EXCHANGE_NAME = "ttl_exchange";

    //ttl
	//1. 交换机
    @Bean("ttlExchange")
    public Exchange ttlExchange() {
        return ExchangeBuilder.fanoutExchange(Constant.TTL_EXCHANGE_NAME).durable(true).build();
    }

    //2. 队列
    @Bean("ttlQueue")
    public Queue ttlQueue() {
        return QueueBuilder.durable(Constant.TTL_QUEUE).build();
    }

    //3. 队列和交换机绑定 Binding
    @Bean("ttlBinding")
    public Binding ttlBinding(@Qualifier("ttlExchange") FanoutExchange exchange,
                              @Qualifier("ttlQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange);
    }

发送消息:

    @RequestMapping("/ttl")
    public String ttl() {
        String ttlTime = "10000";//10s
        rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl 
                test...", messagePostProcessor -> {
        messagePostProcessor.getMessageProperties().setExpiration(ttlTime);
        return messagePostProcessor;
    });
        return"发送成功!";
    }

观看结果:

发送消息后, 可以看到, Ready消息为1:
在这里插入图片描述
10秒钟之后, 刷新页面, 发现消息已被删除:
在这里插入图片描述
如果不设置TTL,则表⽰此消息不会过期;如果将TTL设置为0,则表示除非此时可以直接将消息投递到 消费者,否则该消息会被立即丢弃

2.2、设置队列的TTL

设置队列TTL的方法是在创建队列时, 加⼊ x-message-ttl 参数实现的, 单位是毫秒。

配置队列和绑定关系:

	public static final String TTL_QUEUE2 = "ttl_queue2";

    //设置ttl
    @Bean("ttlQueue2")
    public Queue ttlQueue2() {
        //设置20秒过期
        return QueueBuilder.durable(Constant.TTL_QUEUE2).ttl(20 * 1000).build();
    }

    //3. 队列和交换机绑定 Binding
    @Bean("ttlBinding2")
    public Binding ttlBinding2(@Qualifier("ttlExchange") FanoutExchange exchange,
                               @Qualifier("ttlQueue2") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange);
    }

设置过期时间, 也可以采⽤以下方式:

 @Bean("ttlQueue2")
    public Queue ttlQueue2() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", 20000);//20秒过期
        return QueueBuilder.durable(Constant.TTL_QUEUE2).withArguments(arguments).build();
    }

发送消息:

  @RequestMapping("/ttl")
    public String ttl() {
        // String ttlTime = "30000";//10s
        // //发送带ttl的消息
        // rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl 
        test...", messagePostProcessor -> {
        // messagePostProcessor.getMessageProperties().setExpiration(ttlTime);
        // return messagePostProcessor;
        //});
        //发送不带ttl的消息
        rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl 
                test...");
        return "发送成功!";
    }

运行程序:
运行之后发现,新增了⼀个队列, 队列Features有⼀个TTL标识:
在这里插入图片描述
调用接口, 发送消息:

发送消息后, 可以看到, Ready消息为1:
在这里插入图片描述

采⽤发布订阅模式, 所有与该交换机绑定的队列(ttl_queue和ttl_queue2)都会收到消息

20秒钟之后, 刷新页面, 发现消息已被删除
在这里插入图片描述
由于ttl_queue队列, 未设置过期时间, 所以ttl_queue的消息未删除。

2.3、两者区别

设置队列TTL属性的方法, ⼀旦消息过期, 就会从队列中删除 设置消息TTL的方法, 即使消息过期, 也不会马上从队列中删除, 而是在即将投递到消费者之前进行判定的.
为什么这两种方法处理的方式不⼀样?
因为设置队列过期时间, 队列中已过期的消息肯定在队列头部, RabbitMQ只要定期从队头开始扫描是否 有过期的消息即可. ⽽设置消息TTL的方式, 每条消息的过期时间不同, 如果要删除所有过期消息需要扫描整个队列, 所以不 如等到此消息即将被消费时再判定是否过期, 如果过期再进⾏删除即可.

三 、死信队列

6.1 死信的概念

死信(dead message) 简单理解就是因为种种原因, ⽆法被消费的信息, 就是死信. 有死信, ⾃然就有死信队列. 当消息在⼀个队列中变成死信之后,它能被重新被发送到另⼀个交换器 中,这个交换器就是DLX( Dead Letter Exchange ), 绑定DLX的队列, 就称为死信队列(Dead Letter Queue,简称DLQ)
在这里插入图片描述
消息变成死信⼀般是由于以下几种情况:

  1. 消息被拒绝( Basic.Reject/Basic.Nack ),并且设置 requeue 参数为 false.
  2. 消息过期.
  3. 队列达到最大长度

3.2 代码示例

3.2.1、声明队列和交换机

包含两部分:
• 声明正常的队列和正常的交换机
• 声明死信队列和死信交换机

死信交换机和死信队列和普通的交换机, 队列没有区别

    //死信队列
public static final String DLX_EXCHANGE_NAME = "dlx_exchange";
public static final String DLX_QUEUE = "dlx_queue";
public static final String NORMAL_EXCHANGE_NAME = "normal_exchange";
public static final String NORMAL_QUEUE = "normal_queue";
    
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitmq.Constant;

/**
 * 死信队列相关配置
 */
@Configuration
public class DLXConfig {
    //死信交换机
    @Bean("dlxExchange")
    public Exchange dlxExchange() {
        return
                ExchangeBuilder.topicExchange(Constant.DLX_EXCHANGE_NAME).durable(true).build();
    }

    //2. 死信队列
    @Bean("dlxQueue")
    public Queue dlxQueue() {
        return QueueBuilder.durable(Constant.DLX_QUEUE).build();
    }

    //3. 死信队列和交换机绑定 Binding
    @Bean("dlxBinding")
    public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange,
                              @Qualifier("dlxQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();
    }

    //正常交换机
    @Bean("normalExchange")
    public Exchange normalExchange() {
        return ExchangeBuilder.topicExchange(Constant.NORMAL_EXCHANGE_NAME).durable(true).build();
    }

    //正常队列
    @Bean("normalQueue")
    public Queue normalQueue() {
        return QueueBuilder.durable(Constant.NORMAL_QUEUE).build();
    }

    //正常队列和交换机绑定 Binding
    @Bean("normalBinding")
    public Binding normalBinding(@Qualifier("normalExchange") Exchange
                                         exchange, @Qualifier("normalQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();
    }
}

3.2.2、正常队列绑定死信交换机

当这个队列中存在死信时, RabbitMQ会自动的把这个消息重新发布到设置的DLX上, 进而被路由到另一个队列, 即死信队列.可以监听这个死信队列中的消息以进⾏相应的处理

  @Bean("normalQueue")
    public Queue normalQueue() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME);//绑定死 信队列
        arguments.put("x-dead-letter-routing-key", "dlx");//设置发送给死信队列的
        RoutingKey
        return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();
    }

3.2.3 制造死信产生的条件

  @Bean("normalQueue")
    public Queue normalQueue() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME);//绑定死 信队列
        arguments.put("x-dead-letter-routing-key", "dlx");//设置发送给死信队列的
        RoutingKey
        //制造死信产⽣的条件
        arguments.put("x-message-ttl", 10000);//10秒过期
        arguments.put("x-max-length", 10);//队列⻓度
        return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();
    }

3.2.4、发送消息

@RequestMapping("/dlx")
 public void dlx() {
 //测试过期时间, 当时间达到TTL, 消息⾃动进⼊到死信队列
 rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
	 //测试队列⻓度
	// for (int i = 0; i < 20; i++) {
	//  	rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
	// }
	 //测试消息拒收
	// rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
 }

3.2.5、测试死信

程序启动之后, 观察队列:
在这里插入图片描述

队列Features说明:
D: durable的缩写, 设置持久化
TTL: Time to Live, 队列设置了TTL
Lim: 队列设置了长度(x-max-length)
DLX: 队列设置了死信交换机(x-dead-letter-exchange)
DLK: 队列设置了死信RoutingKey(x-dead-letter-routing-key)

  1. 测试过期时间, 到达过期时间之后, 进⼊死信队列

发送之后:
在这里插入图片描述
10秒后, 消息进入到死信队列:
在这里插入图片描述

生产者首先发送⼀条消息,然后经过交换器(normal_exchange)顺利地存储到队列(normal_queue)中. 由于队列normal_queue设置了过期时间为10s, 在这10s内没有消费者消费这条消息, 那么判定这条消息过期. 由于设置了DLX, 过期之时, 消息会被丢给交换器(dlx_exchange)中, 这时根据RoutingKey匹配, 找到匹配的队列(dlx_queue), 最后消息被存储在queue.dlx这个死信队列中.

  1. 测试达到队列长度, 消息进入死信队列
    队列⻓度设置为10, 我们发送20条数据, 会有10条数据直接进⼊到死信队列 发送前, 死信队列只有⼀条数据
    在这里插入图片描述
    发送20条消息:
//测试队列⻓度
for (int i = 0; i < 20; i++) {
	 rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
}

运⾏后, 可以看到死信队列变成了11条
在这里插入图片描述

过期之后, 正常队列的10条也会进入到死信队列

在这里插入图片描述

3.3、常见面试题

死信队列作为RabbitMQ的高级特性,也是面试的一大重点。

  1. 死信队列的概念 死信(Dead Letter)是消息队列中的⼀种特殊消息, 它指的是那些无法被正常消费或处理的消息. 在消息队列系统中, 如RabbitMQ, 死信队列用于存储这些死信消息
  2. 死信的来源
    1)消息过期: 消息在队列中存活的时间超过了设定的TTL
    2)消息被拒绝: 消费者在处理消息时, 可能因为消息内容错误, 处理逻辑异常等原因拒绝处理该消息. 如果拒绝时指定不重新入队(requeue=false), 消息也会成为死信.
    3)队列满了: 当队列达到最大长度, 无法再容纳新的消息时, 新来的消息会被处理为死信.
  3. 死信队列的应用场景 对于RabbitMQ来说, 死信队列是⼀个非常有用的特性. 它可以处理异常情况下,消息不能够被消费者正 确消费而被置⼊死信队列中的情况, 应用程序可以通过消费这个死信队列中的内容来分析当时所遇到的 异常情况, 进而可以改善和优化系统. 比如: 用户支付订单之后, 支付系统会给订单系统返回当前订单的⽀付状态。为了保证支付信息不丢失, 需要使用到死信队列机制. 当消息消费异常时, 将消息投入到死信队列中, 由订单系统的其他消费者来监听这个队列, 并对数据进行处理(比如发送工单等,进行人工确认).
    场景的应用场景还有:
    • 消息重试:将死信消息重新发送到原队列或另⼀个队列进行重试处理.
    • 消息丢弃:直接丢弃这些无法处理的消息,以避免它们占⽤系统资源.
    • ⽇志收集:将死信消息作为日志收集起来,用于后续分析和问题定位.

四、延迟队列

4.1、概念

延迟队列(Delayed Queue),即消息被发送以后, 并不想让消费者⽴刻拿到消息, 而是等待特定时间后, 消费者才能拿到这个消息进行消费.

4.2、应用场景

延迟队列的使用场景有很多, 比如:

  1. 智能家居: 用户希望通过手机远程遥控家⾥的智能设备在指定的时间进行⼯作. 这时候就可以将用户指令发送到延迟队列, 当指令设定的时间到了再将指令推送到智能设备.
  2. ⽇常管理: 预定会议后,需要在会议开始前十五分钟提醒参会⼈参加会议
  3. ⽤⼾注册成功后, 7天后发送短信, 提高用户活跃度等

RabbitMQ本身没有直接支持延迟队列的的功能, 但是可以通过前面所介绍的TTL+死信队列的⽅式组合模拟出延迟队列的功能. 假设⼀个应用中需要将每条消息都设置为10秒的延迟, 生产者通过 normal_exchange 这个交换器将 发送的消息存储在 normal_queue 这个队列中. 消费者订阅的并非是 normal_queue 这个队列, 而是 dlx_queue 这个队列. 当消息从normal_queue 这个队列中过期之后被存入 dlx_queue 这个 队列中,消费者就恰巧消费到了延迟10秒的这条消息.

所以死信队列展⽰的也是延迟队列的使用.

4.3、TTL+死信队列实现

代码实现:
先看TTL+死信队列实现延迟队列
继续沿用死信队列的代码即可
声明队列

  //正常队列
    @Bean("normalQueue")
    public Queue normalQueue() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME);//绑定死 信队列
        arguments.put("x-dead-letter-routing-key", "dlx");//设置发送给死信队列的
        RoutingKey
        return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();
    }

⽣产者:
发送两条消息, ⼀条消息10s后过期, 第二条20s后过期

    @RequestMapping("/delay")
    public String delay() {
        //发送带ttl的消息
        rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal",
                "ttl test 10s..." + new Date(), messagePostProcessor -> {
                    messagePostProcessor.getMessageProperties().setExpiration("10000");
					//10s过期
                    return messagePostProcessor;
                });
        rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal",
                "ttl test 20s..." + new Date(), messagePostProcessor -> {
                    messagePostProcessor.getMessageProperties().setExpiration("20000");
					//20s过期
                    return messagePostProcessor;
                });
        return "发送成功!";
    }

消费者:

//指定监听队列的名称
@RabbitListener(queues = Constant.DLX_QUEUE)
public void ListenerDLXQueue(Message message, Channel channel) throws Exception {
 	long deliveryTag = message.getMessageProperties().getDeliveryTag();
	 System.out.printf("%tc 死信队列接收到消息: %s, deliveryTag: %d%n", newDate(),newString(message.getBody(),"UTF-8"), 
	message.getMessageProperties().getDeliveryTag());
}

运行程序:
通过控制台观察死信队列消费情况:

死信队列接收到消息: ttl test 10s…Wed May 22 11:58:50 CST , deliveryTag: 1
死信队列接收到消息: ttl test 20s…Wed May 22 11:58:50 CST , deliveryTag: 2

可以看到, 两条消息按照过期时间依次进入了死信队列. 延迟队列, 就是希望等待特定的时间之后, 消费者才能拿到这个消息. TTL刚好可以让消息延迟⼀段时间 成为死信, 成为死信的消息会被投递到死信队列⾥, 这样消费者⼀直消费死信队列里的消息就可以了.

存在问题

接下来把⽣产消息的顺序修改⼀下 先发送20s过期数据, 再发送10s过期数据

 @RequestMapping("/delay")
    public String delay() {
        //发送带ttl的消息
        rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal",
                "ttl test 20s..." + new Date(), messagePostProcessor -> {
                    messagePostProcessor.getMessageProperties().setExpiration("20000");
					//20s过期
                    return messagePostProcessor;
                });
        rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal",
                "ttl test 10s..." + new Date(), messagePostProcessor -> {
                    messagePostProcessor.getMessageProperties().setExpiration("10000");
					//10s过期
                    return messagePostProcessor;
                });
        return "发送成功!";
   }

这时会发现: 10s过期的消息,,也是在20s后才进入到死信队列.
消息过期之后, 不⼀定会被马上丢弃. 因为RabbitMQ只会检查队首消息是否过期, 如果过期则丢到死信队列. 此时就会造成⼀个问题, 如果第⼀个消息的延时时间很长, 第二个消息的延时时间很短, 那第二个 消息并不会优先得到执行.
所以在考虑使用TTL+死信队列实现延迟任务队列的时候, 需要确认业务上每个任务的延迟时间是⼀致 的, 如果遇到不同的任务类型需要不同的延迟的话, 需要为每⼀种不同延迟时间的消息建⽴单独的消息队列。

另外注意,同样可以使用插件使得消息按照延迟时间到达消费者

4.4、常见面试题

延迟队列作为RabbitMQ的高级特性,也是面试的一大重点. 介绍下RabbitMQ的延迟队列。延迟队列是⼀个特殊的队列, 消息发送之后, 并不立即给消费者, 而是等待特定的时间, 才发送给消费者. 延迟队列的应用场景有很多, 比如:

  1. 订单在十分钟内未支付自动取消
  2. 用户注册成功后, 3天后发调查问卷
  3. 用户发起退款, 24小时后商家未处理, 则默认同意, 自动退款

  4. 但RabbitMQ本身并没直接实现延迟队列, 通常有两种方法:
    1 . TTL+死信队列组合的方式
    2 . 使用官方提供的延迟插件实现延迟功能

⼆者对比:

  1. 基于死信实现的延迟队列
    a. 优点: 1) 灵活不需要额外的插件支持
    b. 缺点: 1) 存在消息顺序问题 2) 需要额外的逻辑来处理死信队列的消息, 增加了系统的复杂性
  2. 基于插件实现的延迟队列
    a. 优点: 1) 通过插件可以直接创建延迟队列, 简化延迟消息的实现. 2) 避免了DLX的时序问题
    b. 缺点: 1) 需要依赖特定的插件, 有运维工作 2) 只适用特定版本

五、事务

RabbitMQ是基于AMQP协议实现的, 该协议实现了事务机制, 因此RabbitMQ也支持事务机制. SpringAMQP也提供了对事务相关的操作. RabbitMQ事务允许开发者确保消息的发送和接收是原子性的, 要么全部成功, 要么全部失败.

5.1、配置事务管理器

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TransactionConfig {
    @Bean
    public RabbitTransactionManager
    transactionManager(CachingConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory
                                                 connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }
}

5.2、声明队列

@Bean("transQueue")
 public Queue transQueue() {
	 return QueueBuilder.durable("trans_queue").build();
 }

5.3、生产者

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RequestMapping("/trans")
@RestController
public class TransactionProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Transactional
    @RequestMapping("/send")
    public String send() {
        rabbitTemplate.convertAndSend("", "trans_queue", "trans test 1...");
        int a = 5 / 0;
        rabbitTemplate.convertAndSend("", "trans_queue", "trans test 2...");
        return "发送成功";
    }
}

通过测试发现:

  1. 不加 @Transactional , 会发现消息1发送成功
  2. 添加 @Transactional , 消息1和消息2全部发送失败

结语

本篇文章主要介绍了RAbbitMQ中的部分高级特性,主要从重试机制、有效时间TTL、死信队列、延迟队列和事务几方面展开。以上就是本文全部内容,感谢各位能够看到最后,如有问题,欢迎各位大佬在评论区指正,希望大家可以有所收获!创作不易,希望大家多多支持!

最后,大家再见!祝好!我们下期见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2379034.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

电子电路:什么是电流离散性特征?

关于电荷的量子化,即电荷的最小单位是电子的电荷量e。在宏观电路中,由于电子数量极大,电流看起来是连续的。但在微观层面,比如纳米器件或单电子晶体管中,单个电子的移动就会引起可观测的离散电流。 还要提到散粒噪声,这是电流离散性的表现之一。当电流非常小时,例如在二…

深入理解位图(Bit - set):概念、实现与应用

目录 引言 一、位图概念 &#xff08;一&#xff09;基本原理 &#xff08;二&#xff09;适用场景 二、位图的实现&#xff08;C 代码示例&#xff09; 三、位图应用 1. 快速查找某个数据是否在一个集合中 2. 排序 去重 3. 求两个集合的交集、并集等 4. 操作系…

猫番阅读APP:丰富资源,优质体验,满足你的阅读需求

猫番阅读APP是一款专为书籍爱好者设计的移动阅读应用&#xff0c;致力于提供丰富的阅读体验和多样化的书籍资源。它不仅涵盖了小说、非虚构、杂志等多个领域的电子书&#xff0c;还提供了个性化推荐、书架管理、离线下载等功能&#xff0c;满足不同读者的阅读需求。无论是通勤路…

MetaMask安装及使用-使用水龙头获取测试币的坑?

常见的异常有&#xff1a; 1.unable to request drip, please try again later. 2.You must hold at least 1 LINK on Ethereum Mainnet to request native tokens. 3.The address provided does not have sufficient historical activity or balance on the Ethereum Mainne…

AI:OpenAI论坛分享—《AI重塑未来:技术、经济与战略》

AI&#xff1a;OpenAI论坛分享—《AI重塑未来&#xff1a;技术、经济与战略》 导读&#xff1a;2025年4月24日&#xff0c;OpenAI论坛全面探讨了 AI 的发展趋势、技术范式、地缘政治影响以及对经济和社会的广泛影响。强调了 AI 的通用性、可扩展性和高级推理能力&#xff0c;以…

Linux配置vimplus

配置vimplus CentOS的配置方案很简单&#xff0c;但是Ubuntu的解决方案网上也很多但是有效的很少&#xff0c;尤其是22和24的解决方案&#xff0c;在此我整理了一下我遇到的问题解决方法 CentOS7 一键配置VimForCPP 基本上不会有什么特别难解决的报错 sudo yum install vims…

服务端HttpServletRequest、HttpServletResponse、HttpSession

一、概述 在JavaWeb 开发中&#xff0c;获取客户端传递的参数至关重要。http请求是客户端向服务端发起数据传输协议&#xff0c;主要包含包含请求行、请求头、空行和请求体四个部分&#xff0c;在这四部分中分别携带客户端传递到服务端的数据。常见的http请求方式有get、post、…

实验九视图索引

设计性实验 1. 创建视图V_A包括学号&#xff0c;姓名&#xff0c;性别&#xff0c;课程号&#xff0c;课程名、成绩&#xff1b; 一个语句把学号103 课程号3-105 的姓名改为陆君茹1&#xff0c;性别为女 &#xff0c;然后查看学生表的信息变化&#xff0c;再把上述数据改为原…

git 本地提交后修改注释

dos命令行进入目录&#xff0c;idea可以点击Terminal 进入命令行 git commit --amend -m "修改内容"

面向具身智能的视觉-语言-动作模型(VLA)综述

具身智能被广泛认为是通用人工智能&#xff08;AGI&#xff09;的关键要素&#xff0c;因为它涉及控制具身智能体在物理世界中执行任务。在大语言模型和视觉语言模型成功的基础上&#xff0c;一种新的多模态模型——视觉语言动作模型&#xff08;VLA&#xff09;已经出现&#…

计算机发展的历程

计算机系统的概述 一, 计算机系统的定义 计算机系统的概念 计算机系统 硬件 软件 硬件的概念 计算机的实体, 如主机, 外设等 计算机系统的物理基础 决定了计算机系统的天花板瓶颈 软件的概念 由具有各类特殊功能的程序组成 决定了把硬件的性能发挥到什么程度 软件的分类…

深度学习驱动下的目标检测技术:原理、算法与应用创新(三)

五、基于深度学习的目标检测代码实现 5.1 开发环境搭建 开发基于深度学习的目标检测项目&#xff0c;首先需要搭建合适的开发环境&#xff0c;确保所需的工具和库能够正常运行。以下将详细介绍 Python、PyTorch 等关键开发工具和库的安装与配置过程。 Python 是一种广泛应用于…

jenkins流水线常规配置教程!

Jenkins流水线是在工作中实现CI/CD常用的工具。以下是一些我在工作和学习中总结出来常用的一些流水线配置&#xff1a;变量需要加双引号括起来 "${main}" 一 引用无账号的凭据 使用变量方式引用&#xff0c;这种方式只适合只由密码&#xff0c;没有用户名的凭证。例…

基于OpenCV的SIFT特征和FLANN匹配器的指纹认证

文章目录 引言一、概述二、代码解析1. 图像显示函数2. 核心认证函数2.1 创建SIFT特征提取器2.2 检测关键点和计算描述符&#xff08;源图像&#xff09;2.3 检测关键点和计算描述符&#xff08;模板图像&#xff09;2.4 创建FLANN匹配器2.5 使用K近邻匹配 3. 匹配点筛选4. 认证…

leetcode:58. 最后一个单词的长度(python3解法)

难度&#xff1a;简单 给你一个字符串 s&#xff0c;由若干单词组成&#xff0c;单词前后用一些空格字符隔开。返回字符串中 最后一个 单词的长度。 单词 是指仅由字母组成、不包含任何空格字符的最大子字符串。 示例 1&#xff1a; 输入&#xff1a;s "Hello World"…

虹科应用 | 探索PCAN卡与医疗机器人的革命性结合

随着医疗技术的不断进步&#xff0c;医疗机器人在提高手术精度、减少感染风险以及提升患者护理质量方面发挥着越来越重要的作用。医疗机器人的精确操作依赖于稳定且高效的数据通信系统&#xff0c;虹科提供的PCAN四通道mini PCIe转CAN FD卡&#xff0c;正是为了满足这一需求而设…

entity线段材质设置

在cesium中,我们可以改变其entity线段材质,这里以直线为例. 首先我们先创建一条直线 const redLine viewer.entities.add({polyline: {positions: Cesium.Cartesian3.fromDegreesArray([-75,35,-125,35,]),width: 5,material:material, 保存后可看到在地图上创建了一条线段…

[STM32] 5-1 时钟树(上)

文章目录 前言5-1 时钟树&#xff08;上&#xff09;时钟树的基本介绍时钟树的基本结构大树和小树频率运算简介计数器和分频STM32内部结构树的结构于关键节点SYSCLK(System Clock) 系统时钟 72M maxHCLK(AHB Clock) AHB时钟 36M maxPLCK(APB1 Clock) APB1时钟 36M maxPLCK2(APB…

【Linux网络与网络编程】12.NAT技术内网穿透代理服务

1. NAT技术 之前我们说到过 IPv4 协议中IP 地址数量不充足的问题可以使用 NAT 技术来解决。还提到过本地主机向公网中的一个服务器发起了一个网络请求&#xff0c;服务器是怎么将应答返回到该本地主机呢&#xff1f;&#xff08;如何进行内网转发&#xff1f;&#xff09; 这就…

从辅助到协作:GitHub Copilot的进化之路

如果说现代程序员的标配工具除了VS Code、Stack Overflow之外&#xff0c;还有谁能入选&#xff0c;那一定是GitHub Copilot。从2021年首次亮相&#xff0c;到如今深度集成进开发者日常流程&#xff0c;这个“AI编程助手”已经不只是写几行自动补全代码的小帮手了&#xff0c;而…