RabbitMQ核心机制——延迟队列

news2025/6/1 22:51:20

一、 什么是延迟队列?

    消息发送之后,不想让消费者马上收到消息,而是等待特定时间后消费者才能拿到这条消息进行消费。


二、 如何实现延迟队列

    RabbitMQ并没有直接支持延迟队列这一功能,如果需要实现延迟队列,有两种方法可以实现:

1> TTL + 死信队列:给普通队列或消息设置TTL,但没有消费者监听普通队列,消息过期后通过死信交换机路由到死信队列,死信队列的消费者获取消息,就达到了延迟的效果,如下图:

2> 插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases 通过这个链接下载好插件,安装即可

下载好插件之后(注意插件的版本要与RabbitMQ版本对应),通过下列命令安装插件:

#进入下列目录,这是附加目录,如果没有就自己创建一个
cd /usr/lib/rabbitmq/plugins

#查看插件列表
rabbitmq-plugins list

#启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

#重启服务
service rabbitmq-server restart

如果在管理界面的交换机——>新建交换机看到下图这个交换机,就代表安装好了:

准备工作完成,接下来看如何通过这两种方式实现延迟队列。


三、 基于 TTL + 死信队列 实现

 准备工作:

(1)声明队列、交换机、及绑定关系

    @Bean("normalQueue")
    public Queue normalQueue() {
        return QueueBuilder.durable(Constants.NORMAL_QUEUE).ttl(10000).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();
    }

    @Bean("normalExchange")
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();
    }

    @Bean("normalBinding")
    public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("normal");
    }

    @Bean("dlQueue")
    public Queue dlQueue() {
        return QueueBuilder.durable(Constants.DL_QUEUE).build();
    }

    @Bean("dlExchange")
    public DirectExchange dlExchange() {
        return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();
    }

    @Bean("dlBinding")
    public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("dl");
    }

(2)生产者代码

    @RequestMapping("/delay")
    public String delay(){
        rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","delay test...");
        System.out.printf("%tc 消息发送成功 \n",new Date());
        return "消息发送成功";
    }

(3)消费者代码

@Component
public class DelayListener {
    @RabbitListener(queues = Constants.DL_QUEUE)
    public void messageHandler(Message message) throws UnsupportedEncodingException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.printf("[delay.queue] %tc 接收到消息: %s, deliveryTag: %d \n",new Date(),new String(message.getBody(),"UTF-8"),deliveryTag);

        //业务处理
    }
}

 

3.1 设置队列TTL + 死信队列

上面的代码就是设置 队列的TTL + 死信队列,这里直接测试:

结果预测:由于上面给队列设置的TTL为10s,因此发送消息10s后消息就因该被消费

可以看到,确实达到了延迟效果,消息发送后10消费者才接收到消息


3.2 设置消息TTL + 死性队列(不推荐)

     前面学习死信队列时,我们知道,如果队列前面的消息比后面的消息过期时间长,那么后面的消息必须等待前面的消息被判定为过期才能继续判定后面的消息是否过期,如果使用 设置消息的TTL + 死信队列 来实现延迟队列是否会出现问题?不妨一试

一、修改normal队列声明,修改生产者代码

    @Bean("normalQueue")
    public Queue normalQueue() {
        return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();
    }
    @RequestMapping("/delay")
    public String delay(){
        MessagePostProcessor messagePostProcessor1 = message -> {
            message.getMessageProperties().setExpiration("10000");
            return message;
        };
        MessagePostProcessor messagePostProcessor2 = message -> {
            message.getMessageProperties().setExpiration("30000");
            return message;
        };
        rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","delay test 30s...",messagePostProcessor2);
        rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","delay test 10s...",messagePostProcessor1);
        System.out.printf("%tc 消息发送成功 \n",new Date());
        return "消息发送成功";
    }

二、运行程序,测试

预期结果:10s后收到第一条消息,再过20s手收到另一条消息

可以看到,两条消息都在30s后才被消费者接收,显然不符合期望

     可以看到,通过设置 消息的TTL + 死信队列 来实现延迟效果是可能会出现问题的,在实际应用中,推荐使用 队列TTL + 死信队列 或 插件 来实现延迟队列,而不是 消息TTL + 死信队列 来实现。


四、 通过插件实现

    通过插件实现延迟队列非常简单,只需要在声明交换机时通过delayed方法指定这是一个延迟交换机即可。

一、声明队列、交换机及绑定关系

    @Bean("delayQueue")
    public Queue delayQueue() {
        return QueueBuilder.durable(Constants.DELAY_QUEUE).build();
    }

    @Bean("delayExchange")
    public DirectExchange delayExchange() {
        return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();//通过delayed方法声明这是一个延迟交换机
    }

    @Bean("delayBinding")
    public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with("delay");
    }


二、消费者代码

@Component
public class DelayListener {
    @RabbitListener(queues = Constants.DELAY_QUEUE)
    public void messageHandler(Message message) throws UnsupportedEncodingException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.printf("[delay.queue] %tc 接收到消息: %s, deliveryTag: %d \n",new Date(),new String(message.getBody(),"UTF-8"),deliveryTag);

        //业务处理
    }
}

三、生产者代码

    @RequestMapping("/delay")
    public String delay(){
        MessagePostProcessor messagePostProcessor1 = message -> {
            message.getMessageProperties().setDelayLong(10000l);
            return message;
        };
        MessagePostProcessor messagePostProcessor2 = message -> {
            message.getMessageProperties().setDelayLong(30000l);
            return message;
        };
        rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","delay test 30s...",messagePostProcessor2);
        rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","delay test 10s...",messagePostProcessor1);
        System.out.printf("%tc 消息发送成功 \n",new Date());
        return "消息发送成功";
    }

四、运行程序,测试

预期结果,10s后收到第一条消息,再过20s收到第二条消息

符合预期结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2386832.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

华为云Flexus+DeepSeek征文|Flexus云服务器Dify-LLM资源部署极致体验Agent

前引:重磅来袭!本次以DeepSeek-V3/R1商用大模型和Dify-LLM应用平台一键部署为核心,专为新手打造“开箱即用”的AI开发体验。无论你是想快速搭建企业级AI应用,还是探索大模型落地的无限可能,只需跟随小编实现三步走&…

【Elasticsearch入门到落地】13、DSL查询详解:分类、语法与实战场景

接上篇《12、索引库删除判断以及文档增删改查》 上一篇我们讲解了如何判断索引库是否存在并删除它,以及如何对索引库中的文档进行增删改查操作。本篇我们进入ElasticSearch的DSL语法的详解。 Elasticsearch(ES)作为强大的分布式搜索引擎&…

[欠拟合过拟合]机器学习-part10

7.欠拟合过拟合 7.1欠拟合 欠拟合是指模型在训练数据上表现不佳,同时在新的未见过的数据上也表现不佳。这通常发生在模型过于简单,或者是训练的次数不够,无法捕捉数据中的复杂模式时。欠拟合模型的表现特征如下: 训练误差较高。 …

【windwos】文本编辑器Notepad++ 替代品Notepad--

一、N和N--对比介绍 曾经备受推崇的「Notepad」曾是Windows上的经典代码编辑器。然而,作者的一些政治言论已经让它被广大中国用户抛弃。 一个名为「Notepad--」的新编辑器,也是开源免费,功能和实用性也在尽可能接近。与此同时,「N…

Linux基本指令篇 —— clear指令

clear 是 Linux 和 Unix 系统中用于清空终端屏幕的常用命令。它的作用是移除当前终端窗口中的所有可见内容,提供一个干净的界面,类似于“刷新”终端。以下是关于 clear 的详细解析: 目录 1. 基本用法 2. 实现原理 3. 常见场景 场景 1&…

阿里云DDoS防护:万一被“黑”了,如何更换IP地址?

阿里云DDoS防护:万一被“黑”了,如何更换IP地址“绝地反击”? 各位站长、运维老铁、业务负责人们,大家好!在如今这个网络世界,最让人提心吊胆的,莫过于遭遇**DDoS攻击(分布式拒绝服…

【小白量化智能体】应用2:编写通达信绘图指标及生成Python绘图程序

【小白量化智能体】应用2:编写通达信绘图指标及生成Python绘图程序 【小白量化智能体】是指能够自主或半自主地通过与环境的交互来实现目标或任务的计算实体。智能体技术是一个百科全书,又融合了人工智能、计算机科学、心理学和经济学等多个领域的知识&a…

C++23 std::start_lifetime_as:用于隐式生存期类型的显式生存期管理函数 (P2590R2)

文章目录 一、C23简介二、std::start_lifetime_as 基本概念函数原型模板参数参数返回值注意事项示例代码 三、std::start_lifetime_as 的作用1. 避免复杂的拷贝操作2. 保持对象表示不变3. 简化代码逻辑 四、std::start_lifetime_as 的使用场景1. 内存池管理2. 类型双关&#xf…

Innodb底层原理与Mysql日志机制深入刨析

MySQL的内部组件结构 大体来说,MySQL 可以分为 Server 层和存储引擎层两部分。 Server层 主要包括连接器、查询缓存、分析器、优化器、执行器等,涵盖 MySQL 的大多数核心服务功能,以及所有的内置函数(如日期、时间、数学和加密函数等),所有跨存储引擎的功能都在这一层实…

JMeter-SSE响应数据自动化

结构图 背景: 需要写一个JMeter脚本来进行自动化测试,主要是通过接口调用一些东西,同时要对响应的数据进行处理,包括不限于错误信息的输出。 1.SSE(摘录) SSE(Server-Sent Events)是一种基于HTTP协议、允许…

泛型(1)

1.泛型的理解和好处 使用传统方法的问题分析 (1)不能对加入到集合ArrayList中的数据类型进行约束 (2)遍历的时候,需要进行类型装换,如果集合中的数量较大,对效率有影响. 使用泛型的好处 (1)使用泛型添加 (检查元素的类型,提高了安全性.) (2)减少了类型转换的次数,提高效率…

esp8266 点灯科技远程控制继电器

手机端安装点灯科技app 打开 Arduino IDE 编辑&#xff1a; #define BLINKER_WIFI #include <Blinker.h> char auth[] "点灯科技 key"; char ssid[] "wifi ID"; char pswd[] "WiFi key"; // 新建组件对象 BlinkerButton Button1(&q…

MMA: Multi-Modal Adapter for Vision-Language Models论文解读

abstract 预训练视觉语言模型&#xff08;VLMs&#xff09;已成为各种下游任务中迁移学习的优秀基础模型。然而&#xff0c;针对少样本泛化任务对VLMs进行微调时&#xff0c;面临着“判别性—泛化性”困境&#xff0c;即需要保留通用知识&#xff0c;同时对任务特定知识进行微…

使用 Cannonballs 进行实用导体粗糙度建模

在 GB/s 制度下&#xff0c;导体损耗的精确建模是高速串行链路设计成功的前提。未能对粗糙度效果进行建模可能会毁了您的一天。例如&#xff0c;图 1 显示了与测量数据相比&#xff0c;无粗糙度的 40 英寸印刷电路板 &#xff08;PCB&#xff09; 走线的模拟总损耗。总损耗是电…

Spring Boot 注解 @ConditionalOnMissingBean是什么

一句话总结&#xff1a; ConditionalOnMissingBean 是 Spring Boot 提供的一个 条件注解&#xff08;Conditional Annotation&#xff09;&#xff0c;意思是&#xff1a; 只有当 Spring 容器中 不存在 某个 Bean 时&#xff0c;当前的 Bean 或配置才会被加载。 这是一种典型的…

(先发再改)测试流程标准文档

Revision Record 修订记录 序号 修改日期 修改章节 修改描述 拟制 审批 修订版本 1 20250520 初稿 v1.0 目录 1. 文档概述... 7 1.1 文档目的... 7 1.1.1 标准化质量保障流程... 7 1.1.2.…

亚马逊SP-API开发实战:商品数据获取与操作

一、API接入准备 开发者注册&#xff1a; 登录亚马逊开发者中心申请SP-API权限 完成MWS迁移&#xff08;如适用&#xff09; 认证配置&#xff1a; # OAuth2.0认证示例 import requests auth_url "https://api.amazon.com/auth/o2/token" params { "…

行为型:策略模式

目录 1、核心思想 2、实现方式 2.1 模式结构 2.2 实现案例 3、优缺点分析 4、适用场景 5、优化技巧 1、核心思想 目的&#xff1a;将算法&#xff08;行为&#xff09;抽象出来作为一系列策略类&#xff0c;使他们可以相互替换&#xff0c;使系统拥有“可插拔”扩展的能…

知识宇宙-学习篇:开源项目 README 文档该如何写?

名人说&#xff1a;博观而约取&#xff0c;厚积而薄发。——苏轼《稼说送张琥》 创作者&#xff1a;Code_流苏(CSDN)&#xff08;一个喜欢古诗词和编程的Coder&#x1f60a;&#xff09; 目录 一、README 文档的重要性1. 项目的第一印象2. 搜索引擎优化的重要载体 二、现代 RE…

YOLOv12增加map75指标

YOLOv12源码&#xff1a;https://github.com/sunsmarterjie/yolov12 第一步&#xff1a;更改Val.py文件 地址&#xff1a;该文件在yolov12-main\ultralytics\models\yolo\detect下 首先定位到def get_desc(self):这个函数上 代码修正如下&#xff1a; def get_desc(self):&q…