【学习笔记】RabbitMQ04:延迟队列的原理以及实现代码

news2025/7/7 4:48:23

参考资料

  • RabbitMQ官方网站
  • RabbitMQ官方文档
  • 噼咔噼咔-动力节点教程

文章目录

    • 七、延迟队列
      • 7.1 什么是延迟队列
      • 7.2 延迟队列的解决方案
        • 7.2.1 定时任务
        • 7.2.2 **被动取消**
        • 7.2.3 JDK的延迟队列
        • 7.2.3 采用消息中间件(rabbitMQ
          • 7.2.3.1 适用专门优化后的死信队列实现延迟队列
          • 7.2.3.2 :star:实例代码
          • 7.2.3.2 测试结果
        • 7.2.4 使用rabbitmq_delayed_message_exchange插件.
          • 7.2.4.1 插件下载
          • 7.2.4.2 :star:如何在docker环境下安装插件
          • 7.2.4.3 :star: 代码示例:如何使用该插件
          • 7.2.4.4 测试结果
      • 7.3 问题:多个消息的延迟时间不同该如何解决?
        • 7.3.1 解决方案一:用延迟队列区分
        • 7.3.2 使用延迟队列插件rabbitmq_delayed_message_exchange

七、延迟队列

7.1 什么是延迟队列

正常的MQ应用场景中,我们希望消息可以快速稳定的传递。但是有一些场景中,希望在指定的延迟后再消费信息,比如订单支付场景(订单15部分内未支付则关闭订单)。

这类实现延迟任务的场景,就可以采用延迟队列来实现。

以下介绍一下其他的一些方法。

7.2 延迟队列的解决方案

7.2.1 定时任务

每隔n秒扫描一次数据库,查询数据库装为过期的订单进行处理。

实现方式

spring schedule、quartz、xxljob等

优点

简单,容易实现;

缺点

  1. 存在延迟(受定时器延迟时间限制
  2. 性能较差,每次扫描数据库,如果订单量交大,会给数据库造成较大压力。
7.2.2 被动取消

当用户主动查询订单时,判断订单是否超时,超时则取消

  • 优点:服务器压力小
  • 缺点:如果用户长时间不查询,则会造成统计异常;而且用户打开订单页面会变慢,严重的话会影响用户体验
7.2.3 JDK的延迟队列

DelayedQueue:无界阻塞队列,该队列只有在延迟期满后,才能从中获取元素。

优点

实现简单,任务的延迟低。

缺点

  • 服务器重启宕机,数据会丢失
  • 只适用于单机版
  • 订单量大时,可能会造成内存不足:OOM
7.2.3 采用消息中间件(rabbitMQ

RabbitMQ 本身不支持延迟队列,可以使用 TTL 结合 DLX 的方式来实现消息的延迟投递(前面提到的死信队列)。.

image-20231017141210411

把 DLX 跟某个队列绑定,到了指定时间,消息过期后,就会从 DLX 路由到这个队列,消费者可以从DLX的队列中取走消息。

7.2.3.1 适用专门优化后的死信队列实现延迟队列

在上面的mq方案中,存在两个不同的交换机,我们可以利用直连交换机的特性,将交换机优化成一个交换机,同时通过不同的routingKey指定普通队列和死信队列。

image-20231017141445269

思路解释

  1. 生产者发送消息到交换机X,并指定ttl的key
  2. 消息被交换机传递到ttl队列中(指定了消息过期时间的队列
  3. 同时,ttl队列还指定的死信交换机DLX为自身的交换机X,但是指定的routingKey为死信队列的key
  4. 这样,当消息在ttl队列中到期后,这条消息就会被传递到死信队列中,提供给消费者
7.2.3.2 ⭐️实例代码

为了便于测试,将发送和接收写在同一个服务中

配置信息

@Configuration
public class DelayExchangeConfig {
    public static String exchangeName = "order.ttl.exchange";
    public static String orderQ = "order.ttl.queue";
    public static String dlxQ = "order.dlx.queue";

    @Bean
    public DirectExchange delayedExchange(){
        return ExchangeBuilder.directExchange(exchangeName).build();
    }

    @Bean
    public Queue orderQueue(){
        // 指定该队列的过期时间和死信队列
        Map<String , Object> properties = new HashMap<>();
        properties.put("x-message-ttl" , 15000);
        properties.put("x-dead-letter-exchange" , exchangeName);
        properties.put("x-dead-letter-routing-key" , "dead-letter");
        return QueueBuilder.durable(orderQ).withArguments(properties).build();
    }

    @Bean
    public Queue dlxQueue(){
        return QueueBuilder.durable(dlxQ).build();
    }

    @Bean
    public Binding dlxBinding1(){
        return BindingBuilder.bind(dlxQueue()).to(this.delayedExchange()).with("dead-letter");
    }

    @Bean
    public Binding ttlBinding1(){
        return BindingBuilder.bind(dlxQueue()).to(this.delayedExchange()).with("order");
    }

}

测试代码

@RestController
@RequestMapping("/delay")
@Slf4j
public class DelayedController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/{msg}")
    public void sentErrorMsg(@PathVariable("msg") String msg) {
        log.info("(延迟队列)准备发送的信息:{} , 路由键 :{}", msg, "order");
        // 发送到普通的延时列表中
        rabbitTemplate.convertAndSend(exchangeName, "order", msg.getBytes(StandardCharsets.UTF_8));
        log.info("(延迟队列)成功发送!发送时间{}" , LocalDateTimeUtil.now());
    }

    @RabbitListener(queues = "order.dlx.queue")
    public void receiveDelayedMsg(Message message){
        log.info("(延迟队列)接受到的消息是:{}" , new String(message.getBody()));
    }
}
7.2.3.2 测试结果

配置正确

image-20231017144033384

控制台打印正确:15秒后接收到的了之前发送的信息

image-20231017144116843


7.2.4 使用rabbitmq_delayed_message_exchange插件.
7.2.4.1 插件下载

插件下载地址

  • https://www.rabbitmq.com/community-plugins.html
  • https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
    • 根据自己的rabbit版本,我这里用的是3.9
7.2.4.2 ⭐️如何在docker环境下安装插件

参考文章:https://juejin.cn/post/7138717546894589966

  1. 将下载到的文件,移动到容器内

    docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
    

image-20231017153230781

  1. 进入容器bash指令,并启动插件

    docker exec -it rabbitmq bash
    
    root@rabbit:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
    # 使用下面的指令查看插件列表
    rabbitmq-plugins list
    

image-20231017153257970

进入控制台新建交换机,能查看到新的交换机类型

image-20231017154024943

7.2.4.3 ⭐️ 代码示例:如何使用该插件

官方说明文档:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange#usage

image-20231017153803323

理解原理:delay exchange在接受到消息后,会先存在内部数据库中,检查x-delay延迟时间(头部

image-20231017154940504

代码使用思路

  1. 要创建自定义的交换机类型,要使用CustomExchange()来创建。几个参数的解释如下:

    • name:rabbit中交换机的名称
    • type:交换机类型 (x-delayed-message)
    • durable:是否持久
    • autoDelete:是否自动删除
    • arguments:参数信息
  2. arguments:参数信息从官方文档中获取

    // ... elided code ...
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-delayed-type", "direct");
    channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
    // ... more code ...
    
  3. 交换机创建好后,只需要创建一条队列即可,并进行绑定

  4. 注意:消息发送需要在头部存放信息headers.put("x-delay", 延迟时间)。不需要使用自带的expiration来控制延迟时间了

配置类

@Configuration
public class DelayPluginConfig {
    public static String exchangeName = "delay-x-plugin.x";
    public static String key = "demo";
    @Bean
    public CustomExchange customExchange(){
        // 参考官方文档,创建插件提供的自定义交换机
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        // public CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)
        return new CustomExchange(exchangeName, "x-delayed-message" , true , false , args);
    }

    @Bean
    public Queue delayDemoQueue(){
        return QueueBuilder.durable("delay-x-plugin.queue.demo").build();
    }

    @Bean
    public Binding delayPluginBinding(){
        return BindingBuilder
                .bind(delayDemoQueue())
                .to(customExchange())
                .with(key)
                .noargs();
    }
}

生产者

@RestController
@RequestMapping("/delay/plugin")
@Slf4j
public class DelayedPluginController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/{delay}/{msg}")
    public void sentErrorMsg(@PathVariable("msg") String msg, @PathVariable("delay") Long delay) {
        log.info("(延迟插件队列)准备发送的信息:{} ,延迟时间:{} 路由键 :{}", msg, delay , "demo");
        // 在头部设置过期时间
        MessageProperties properties = new MessageProperties();
        properties.setHeader("x-delay", delay);
        Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();
        // 发送信息
        rabbitTemplate.convertAndSend(exchangeName, "demo", message);
        log.info("(延迟插件队列)成功发送!发送时间:{}", new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date()));
    }

    @RabbitListener(queues = "delay-x-plugin.queue.demo")
    public void receiveDelayedMsg(Message message) {
        log.info("(延迟插件队列)接受到的消息是:{}", new String(message.getBody()));
    }
}

7.2.4.4 测试结果

生成交换机和队列

image-20231017160126659image-20231017160147125

访问路径/delay/plugin/25000/一条25秒过期的信息:查看日志打印:成功

image-20231017160422203

7.3 问题:多个消息的延迟时间不同该如何解决?

由于队列先进先出的特性,如果不同消息的延迟时间不同,一旦出现后进的消息延迟时间小于先进的队列,那么消息过期的时间就会出错。

7.3.1 解决方案一:用延迟队列区分

要解决这个问题,就需要将队列的延迟时间统一,将不同的延迟的消息发送到对应延迟的队列中。

保证队列的延迟时间和消息的延迟时间是一样的即可。

如下

image-20231017144817671

7.3.2 使用延迟队列插件rabbitmq_delayed_message_exchange

由于该插件的原理并不是单纯的队列实现,而是使用rabbit内部数据库时间,所以可以很好的解决问题。

可以进行一个简单测试验证:

  • 先发送一条25秒过期的信息,再发送3条5秒过期的信息

  • 查看结果:正常消费,解决问题

    image-20231017160917110

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1103601.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

防火墙规则顺序解决方案

防火墙是保护网络免受攻击的第一道防线&#xff0c;防火墙对互联网和公司IT网络之间的流量拥有绝对控制权&#xff0c;防火墙规则的配置处理调节流量的关键任务。 这些规则会仔细检查传入和传出流量&#xff0c;并根据规则中提到的条件允许或阻止它&#xff0c;防火墙规则越严…

leetCode 5. 最长回文子串 动态规划 + 优化空间 / 中心扩展法 + 双指针

5. 最长回文子串 - 力扣&#xff08;LeetC5. 最长回文子串 - 力扣&#xff08;LeetCode&#xff09;5. 最长回文子串 - 力扣&#xff08;LeetC 给你一个字符串 s&#xff0c;找到 s 中最长的回文子串。如果字符串的反序与原始字符串相同&#xff0c;则该字符串称为回文字符串。…

制造企业如何做好MES管理系统需求分析

随着制造业的不断发展&#xff0c;制造企业对于生产过程的管理需求日益增长。为了提高生产效率和质量&#xff0c;越来越多的制造企业开始关注MES生产管理系统的需求分析。本文将从以下几个方面探讨制造企业如何做好MES管理系统需求分析。 一、明确需求 在进行MES管理系统需求…

利用PHP快速抓取音频数据的方法与技巧

目录 使用cURL库抓取音频数据 优点 潜在问题及解决方案 使用file_get_contents函数抓取音频数据 优点 潜在问题及解决方案 总结 随着互联网的发展&#xff0c;音频内容在网络上的应用越来越广泛&#xff0c;如音乐播放、语音通信等。有时&#xff0c;我们需要从特定的音…

电脑缺失dll文件有什么办法快速解决,dll文件是什么

玩游戏时经常会出现dll文件缺失&#xff0c;那么dll文件是什么&#xff1f;都有哪些办法可以解决dll文件缺失&#xff1f;今天就带大家了解dll文件以及解决dll文件缺失的办法&#xff0c;看完这篇文章相信你会有很大收获&#xff0c;接下来往下看。 一.Dll文件 Dll文件是VC运…

互联网Java工程师面试题·Java 总结篇·第二弹

目录 12、用最有效率的方法计算 2 乘以 8&#xff1f; 13、数组有没有 length()方法&#xff1f;String 有没有 length()方法&#xff1f; 14、在 Java 中&#xff0c;如何跳出当前的多重嵌套循环&#xff1f; 15、构造器&#xff08;constructor&#xff09;是否可被重写&…

汽车安全的未来:毫米波雷达在碰撞避免系统中的角色

随着科技的飞速发展&#xff0c;汽车安全系统变得愈加智能化&#xff0c;而毫米波雷达技术正是这一领域的亮点之一。本文将深入探讨毫米波雷达在汽车碰撞避免系统中的关键角色&#xff0c;以及其对未来汽车安全的影响。 随着城市交通的拥堵和驾驶环境的变化&#xff0c;汽车安全…

腾讯云入选挑战者象限,2023 Gartner容器管理魔力象限发布

10月17日&#xff0c;记者获悉&#xff0c;腾讯云入围在Gartner刚刚发布的2023《容器管理魔力象限》报告&#xff08;Magic Quadrant™ for Container Management&#xff09;中&#xff0c;并位列挑战者象限&#xff0c;执行力维度排名国内第二。 Gartner的魔力象限报告是业界…

Spring: 通过注解获取Bean对象

目录 一, 属性注入 属性注入的优点: 属性注入的缺点 二. Setter注入 Setter注入的优点: Setter注入的缺点: 三, 构造方法注入 (主流方式) 构造方法注入的优点 构造方法注入的缺点 四, Autowired与Resource区别 获取Bean对象也叫对象注入(对象装配), 指把对象取出来放…

阿里云starrocks监控告发至钉钉群

背景&#xff1a;新入职一家公司&#xff0c;现场没有对sr的进行监控&#xff0c;根据开发的需求编写了一个python脚本。 脚本逻辑&#xff1a;抓取sr的be/fe/routine load状态信息&#xff0c;判读是否触发告警&#xff0c;若满足告警条件&#xff0c;则发送告警信息到钉钉群…

C# GFPGAN 图像(人脸面部)修复

效果 项目 代码 using Microsoft.ML.OnnxRuntime; using Microsoft.ML.OnnxRuntime.Tensors; using OpenCvSharp; using System; using System.Collections.Generic; using System.Drawing; using System.Drawing.Imaging; using System.Windows.Forms;namespace 图像修复 {pu…

小程序首页搭建

小程序首页搭建 1. Flex布局是什么&#xff1f;2. 容器的属性2.1 flex-direction属性2.2 flex-wrap属性2.3 flex-flow属性2.4 justify-content属性2.5 align-items属性2.6 align-content属性 二.首页布局搭建二.1moke模拟数据实现轮播图4.信息搭建 Flex弹性布局 1. Flex布局是…

iperf3交叉编译

简介 iperf3是一个用于执行网络吞吐量测量的命令行工具。它支持时序、缓冲区、协议&#xff08;TCP&#xff0c;UDP&#xff0c;SCTP与IPv4和IPv6&#xff09;有关的各种参数。对于每次测试&#xff0c;它都会详细的带宽报告&#xff0c;延迟抖动和数据包丢失。 如果是ubuntu系…

[ConvNet]卷积神经网络概念解析

在初步接触了深度学习以后&#xff0c;我们把目光投向对于一些图像的识别。 其实在d2l这本书中&#xff0c;我们接触过用深度神经网络去识别一个图像&#xff0c;并且对其进行一个分类操作&#xff0c;核心原理是将图像展开成一维tensor&#xff0c;然后作为特征进行检测。 其…

JAVA基础(JAVA SE)学习笔记(二)变量与运算符

前言 1. 学习视频&#xff1a; 尚硅谷Java零基础全套视频教程(宋红康2023版&#xff0c;java入门自学必备)_哔哩哔哩_bilibili 2023最新Java学习路线 - 哔哩哔哩 正文 第一阶段&#xff1a;Java基本语法 1. Java 语言概述 JAVA基础&#xff08;JAVA SE&#xff09;学习…

源码解析flink文件连接源TextInputFormat

背景&#xff1a; kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性 TextInputFormat源码解析 首先flink会把输入的文件进行切分&#xff0c;分成多个数据块的形式&#xff0c;每个数据源算子任务会被分配以读取…

【面试经典150 | 区间】插入区间

文章目录 Tag题目解读题目来源解题思路方法一&#xff1a;合并区间方法二&#xff1a;模拟 其他语言python3 写在最后 Tag 【模拟】【数组】 题目解读 给定一个含有多个无重叠区间的数组&#xff0c;并且数组已经按照区间开始值升序排序。在列表中插入一个新的区间&#xff0…

unity动画_UI动画案例 c#

首先我们打开一个项目 在这个初始界面我们需要做一些准备工作 创建基础通用包 在场景上创建一个Cube 选中Cube 在Window下点击Animation拖拽至运行窗口 点击创建 保存后 这个操作是给Cube添加了一个组件 对Cube_添加一个Position动画 设置几个帧位置的坐标(x,y,z)值 点击运行测…

PHP 如何查看php函数源码

一、在git找到php对应的版本 找到对应的分支版本可以下载也可以在线直接查看 通过这个地址 https://github.com/php/php-src 二、下面已shuffle函数举例&#xff0c;版本为7.4 找到对应的版本进入 点击ext&#xff0c;这个文件夹里面是存放函数的目录 在文件夹里搜不到stu…

Linux使用rpm包安装mysql5.7

以前安装过mysql 前言&#xff1a;检查以前是否装有mysql rpm -qa|grep -i mysql安装了会显示&#xff1a;   bt-mysql57-5.7.31-1.el7.x86_64 停止mysql服务和删除之前安装的mysql rpm -e bt-mysql57-5.7.31-1.el7.x86_64查找并删除mysql相关目录 find / -name mysql/va…