RabbitMQ 延时消息实现

news2025/7/9 14:18:16

1. 实现方式

1. 设置队列过期时间:延迟队列消息过期 + 死信队列,所有消息过期时间一致
2. 设置消息的过期时间:此种方式下有缺陷,MQ只会判断队列第一条消息是否过期,会导致消息的阻塞
   需要额外安装 `rabbitmq_delayed_message_exchange` 插件才能解决此问题
  • 导入Spring 集成RabbitMQ MAEVN
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

2. 设置队列过期时间:延迟队列消息过期 + 死信队列

推送消息至延迟队列 -> 消息过期自动推送到死信队列 -> 消费死信队列

2.1. MQ配置信息

2.1.1. 自定义队列配置

…/bootstrap.yml

# rabbitmq自定义配置
rabbitmq:
  ttlExchange: medical_dev_ttl_topic_change
  ttlKey: dev_ttl
  ttlQueue: medical.dev.ttl.topic.queue
  delayExpireTime: 600
  ttlQueueSize: 10000
  deadExchange: medical_dev_dead_topic_change
  deadKey: dev_dead
  deadQueue: medical.dev.dead.topic.queue
2.1.2. 读取自定义MQ配置信息
/**
 * amqp配置文件
 */
@Data
@Component
@ConfigurationProperties(prefix = "rabbitmq")
public class MyConfigProperties {

    /**
     * 延迟队列
     */
    public String ttlExchange;
    public String ttlKey;
    public String ttlQueue;
    private Integer delayExpireTime;
    public Integer ttlQueueSize;

    /**
     * 死信队列
     */
    public String deadExchange;
    public String deadKey;
    public String deadQueue;

}

2.2. 配置文件自动生成队列

2.2.1. 延迟队列
import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.util.HashMap;

/**
 * 延迟队列配置文件
 * 
 * @author mingAn.xie
 */
@Configuration
public class RabbitMQConfigTTL {

    @Resource
    MyConfigProperties myConfigProperties;

    // 1: 声明交换机
    @Bean
    public TopicExchange ttlTopicExchange(){
        return new TopicExchange(myConfigProperties.getTtlExchange());
    }

    // 2: 声明队列
    // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    @Bean
    public Queue ttlTopicduanxinQueue(){
        HashMap<String, Object> args = new HashMap<>();
        // 给队列设置消息过期时间:毫秒值
        args.put("x-message-ttl", mqConfigProperties.getDelayExpireTime() * 1000);
        // 设置队列最大长度
        args.put("x-max-length", myConfigProperties.getTtlQueueSize());
        // 设置死信队列交换机名称
        // 当消息在一个队列中变成死信后,它能就发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列被称之为死信队列
        // 编程死信队列的原因:消息被拒绝,消息过期,队列达到最大长度
        args.put("x-dead-letter-exchange", myConfigProperties.getDeadExchange());
        // 设置死信队列路由key
        args.put("x-dead-letter-routing-key", myConfigProperties.getDeadKey());
        return new Queue(myConfigProperties.getTtlQueue(), true, false, false, args);
    }

    // 3: 绑定对用关系
    @Bean
    public Binding ttlTopicsmsBinding(){
        return BindingBuilder.bind(ttlTopicduanxinQueue()).to(ttlTopicExchange()).with(myConfigProperties.getTtlKey());
    }

}
2.2.2. 死信队列

import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
 * 死信队列配置文件
 * 
 * @author mingAn.xie
 */
@Configuration
public class RabbitMQConfigDead {

    @Resource
    MyConfigProperties myConfigProperties;

    // 1: 声明交换机
    @Bean
    public TopicExchange deadTopicExchange(){
        return new TopicExchange(myConfigProperties.getDeadExchange());
    }

    // 2: 声明队列
    // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    @Bean
    public Queue deadTopicduanxinQueue(){
        return new Queue(myConfigProperties.getDeadQueue(), true);
    }

    // 3: 绑定对用关系
    @Bean
    public Binding deadTopicsmsBinding(){
        return BindingBuilder.bind(deadTopicduanxinQueue()).to(deadTopicExchange()).with(myConfigProperties.getDeadKey());
    }

}

2.3. 生产者推送消息

import com.awsa.site.mq.MyConfigProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * RabbitMQ生产者推送消息类
 * 
 * @author xiemingan
 */
@Component
@Slf4j
public class RabbitmqProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Resource
    private MyConfigProperties myConfigProperties;

    /**
     * @param pushMessage 推送消息体
     */
    public void pushTtlMessage(String pushMessage) {
		// 推送消息至交换机,并指定路由key
        rabbitTemplate.convertAndSend(myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage);
        log.info("MQ消息推送队列, exchange: {}, key: {}, message: {}", myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage);
    }

}

2.4. 消费者处理消息

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * @author mingAn.xie
 */
@Log4j2
@Component
public class RabbitmqConsumer {

    /**
     * 消费死信队列
     * @param message 消息体
     */
    @RabbitListener(queues = "${rabbitmq.deadQueue}")
    public void pushMessages(Message message) {

        String body = new String(message.getBody()).trim();
        if (StringUtils.isEmpty(body)){
            return;
        }
        log.info("MQ消息消费, RabbitmqConsumer.pushMessages() : {}", body);
    }

}

3. 设置消息的过期时间

设置交换机类型为 x-delayed-type,推送消息至交换机,直连队列消费

3.1. 安装插件 rabbitmq_delayed_message_exchange

前言:这里默认使用环境为 Liunx 系统 Docker 安装 RabbitMQ

具体可以参考这篇文章:Docker 安装 RabbitMQ 挂载配置文件

安装插件版本需要与RabbitMQ版本一致,否则可能会导致安装失败,可先进入RabbitMQ容器中查看其他插件版本

插件各版本地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

  • 这里以最新版本 v3.13.0 举例
# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez

# 将插件复制进容器中: rabbitmq_xxxxxx
docker cp rabbitmq_delayed_message_exchange-3.13.0.ez rabbitmq_xxxxxx:/plugins

# 进入容器: rabbitmq_xxxxxx
docker exec -it rabbitmq_xxxxxx bash
cd plugins

# 查询插件列表, 此处可看到插件的版本
rabbitmq-plugins list

# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 交换机类型中出现 x-delayed-type 表示安装成功

3.2. MQ配置信息

3.2.1. 自定义队列配置

…/bootstrap.yml

#mq队列自定义配置
rabbitmq:
  saveTaskTtlExchange: ey240001_pro_save_task_ttl_topic_exchange
  saveTaskTtlKey: ey240001_pro_save_task_ttl
  saveTaskTtlQueue: ey240001.pro.save.task.ttl.topic.queue
  saveTaskTtlQueueSize: 10000
3.2.2. 读取自定义MQ配置信息
/**
 * amqp配置文件
 *
 * @author mingAn.xie
 */
@Data
@Component
@ConfigurationProperties(prefix = "rabbitmq")
public class MyConfigProperties {

    /**
     * 任务待办生成延时队列
     */
    public String saveTaskTtlExchange;
    public String saveTaskTtlKey;
    public String saveTaskTtlQueue;
    public Integer saveTaskTtlQueueSize;

}

3.3. 配置文件生成 x-delayed-type 交换机

import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * x-delayed-type 交换机延迟队列配置
 * 
 * @author mingAn.xie
 */
@Configuration
public class RabbitMQConfigSaveTaskTtl {

    @Resource
    MyConfigProperties myConfigProperties;

    // 1: 声明交换机
    @Bean
    public CustomExchange saveTaskTopicExchange() {
        Map<String, Object> args = new HashMap<>();
        // 设置延迟队列插件类型:按过期时间消费
        args.put("x-delayed-type", "direct");
        // 参数:name 交换机名称,type 交换机类型,durable 是否持久化,autoDelete 是否自动删除,arguments 参数
        return new CustomExchange(myConfigProperties.getSaveTaskTtlExchange(), "x-delayed-message", true, false, args);
    }

    // 2: 声明队列
    // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    @Bean
    public Queue saveTaskTopicduanxinQueue() {
        return new Queue(myConfigProperties.getSaveTaskTtlQueue(), true, false, false);
    }

    // 3: 绑定对用关系
    @Bean
    public Binding saveTaskTopicsmsBinding() {
        return BindingBuilder.bind(saveTaskTopicduanxinQueue()).to(saveTaskTopicExchange()).with(myConfigProperties.getSaveTaskTtlKey()).noargs();
    }

}

3.4. 生产者推送消息

import com.awsa.site.mq.MyConfigProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 生产者推送消息类
 * 
 * @author xiemingan
 */
@Component
@Slf4j
public class RabbitmqProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private MyConfigProperties myConfigProperties;

    /**
     * @param pushMessage 推送消息体
     * @param ttlTime     延时时间(毫秒值)
     */
    public void pushTtlMessage(String pushMessage, long ttlTime) {
        ttlTime = ttlTime <= 0 ? 1000 : ttlTime;
        // 3.1.推送MQ延迟消息队列
        long finalTtlTime = ttlTime;
        MessagePostProcessor messagePostProcessor = message -> {
            // 设置延迟时间
            message.getMessageProperties().setDelay((int) finalTtlTime);
            return message;
        };
        rabbitTemplate.convertAndSend(myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, messagePostProcessor);
        log.info("MQ消息推送队列, exchange: {}, key: {}, message: {}, ttlTime: {}", myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, ttlTime);
    }

}

3.5. 消费者处理消息

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * @author mingAn.xie
 */
@Log4j2
@Component
public class RabbitmqConsumer {

    /**
     * 消费延时消息
     * @param message 消息体
     */
    @RabbitListener(queues = "${rabbitmq.saveTaskTtlQueue}")
    public void pushMessages(Message message) {

        String body = new String(message.getBody()).trim();
        if (StringUtils.isEmpty(body)) {
            return;
        }
        log.info("MQ延迟消息消费, RabbitmqConsumer.pushMessages() : {}", body);

    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1555311.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

327京东一面

1.项目相关 2.手撕SQL 两道 3.JMeter性能测试 首先&#xff0c;进行基准测试&#xff1a; 单用户测试&#xff08;单用户循环多次得到的数据&#xff09;&#xff1b;为多用户并发执行提供参考 其次&#xff0c;进行负载测试&#xff1a; 通过逐步增加系统负载&#xff0…

2024年水电站大坝安全监测工作提升要点

根据《水电站大坝运行安全监督管理规定》&#xff08;国家发改委令第23号&#xff09;和《水电站大坝运行安全信息报送办法》&#xff08;国能安全〔2016〕261号&#xff09;的相关规定、要求&#xff0c;电力企业应当在汛期向我中心报送每日大坝汛情。近期&#xff0c;全国各地…

Qt+OpenGL入门教程(三)——绘制三角形

通过前两篇文章的学习&#xff0c;我想大家应该有了基本的理解&#xff0c;我们接下来实操一下。 创建Qt OpenGL窗口 QOpenGLWidget QGLWidget是传统QtOpenGL模块的一部分&#xff0c;与其他QGL类一样&#xff0c;应该在新的应用程序中避免使用。相反&#xff0c;从Qt5.4开始…

蓝桥杯23年第十四届省赛真题-三国游戏|贪心,sort函数排序

题目链接&#xff1a; 1.三国游戏 - 蓝桥云课 (lanqiao.cn) 蓝桥杯2023年第十四届省赛真题-三国游戏 - C语言网 (dotcpp.com) 虽然这道题不难&#xff0c;很容易想到&#xff0c;但是这个视频的思路理得很清楚&#xff1a; [蓝桥杯]真题讲解&#xff1a;三国游戏&#xff0…

备考ICA----Istio实验14---出向流量管控Egress Gateways实验

备考ICA----Istio实验14—出向流量管控Egress Gateways实验 1. 发布测试用 pod kubectl apply -f istio/samples/sleep/sleep.yaml kubectl get pods -l appsleep2. ServiceEntry 创建一个ServiceEntry允许流量访问edition.cnn.com egressgw/edition-ServiceEntry.yaml api…

前端bugs

问题&#xff1a; Failed to load plugin typescript-eslint declared in package.json eslint-config-react-app#overrides[0]: Cannot find module eslint/package.json 解决&#xff1a; google了一晚上还得是chatgpt管用 运行以下命令【同时还要注意项目本身使用的Node版…

亚信安全联合人保财险推出数字安全保障险方案,双重保障企业数字化转型

数字化发展&#xff0c;新兴技术的应用与落地带来网络攻击的进一步演进升级&#xff0c;同时全球产业链供应链融合协同的不断加深&#xff0c;更让网络威胁的影响范围与危害程度不断加剧。 企业单纯依靠自身安全能力建设&#xff0c;能否跟上网络威胁的进化速度&#xff1f;能否…

Day55:WEB攻防-XSS跨站CSP策略HttpOnly属性Filter过滤器标签闭合事件触发

目录 XSS跨站-安全防御-CSP XSS跨站-安全防御-HttpOnly XSS跨站-安全防御-XSSFilter(过滤器的意思) 1、无任何过滤 2、实体化 输入框没有 3、全部实体化 利用标签事件 单引号闭合 4、全部实体化 利用标签事件 双引号闭合 5、事件关键字过滤 利用其他标签调用 双引号闭合…

物联网监控可视化是什么?部署物联网监控可视化大屏有什么作用?

随着物联网技术的深入应用&#xff0c;物联网监控可视化成为了企业数字化转型的关键环节。物联网监控可视化大屏作为物联网监控平台的重要组成部分&#xff0c;能够实时展示物联网设备的运行状态和数据&#xff0c;为企业管理决策和运维监控提供了有力的支持。今天&#xff0c;…

【OpenBayes 官方教程】在模型训练中使用子域名访问服务

本教程主要为大家讲解 OpenBayes 上如何在模型训练中使用子域名访问服务&#xff0c;新朋友点击下方链接注册后&#xff0c;即可获得 4 小时 RTX 4090 5 小时 CPU 的免费使用时长哦&#xff01; 注册链接 https://openbayes.com/console/signup?ryuudi_nBBThttps://openbay…

matlab及其在数字信号处理中的应用001:软件下载及安装

目录 一&#xff0c;matlab的概述 matlab是什么 matlab适用于的问题 matlab的易扩展性 二&#xff0c;matlab的安装 1&#xff0c;解压所有压缩文件 2&#xff0c;解压镜像压缩文件 3&#xff0c;运行setup.exe 4&#xff0c;开始安装 5&#xff0c;不要运行软件…

慧天【HTWATER】:水文水动力模型的革命性工具,城市内涝的精准解决方案

城市内涝水文水动力模型介绍 在城市排水防涝规划过程中&#xff0c;水文水动力耦合模型已经成为一种不可或缺的分析工具。在模型建立、城市内涝风险评估、排水系统性能诊断以及海绵城市规划等方面&#xff0c;内涝耦合模型提供了相应的模拟及分析工具&#xff1a; 1.1丰富的数…

Windows系统下ESP-IDF环境的搭建

第一步&#xff1a;clone项目&#xff0c;建议是下到"Desktop\esp-idf"的路径 下载地址&#xff1a;https://github.com/espressif/esp-idf 第二步&#xff1a;在VSCode下载ESP-IDF插件 第三步&#xff1a;在Setup页面选择第一个选项EXPRESS 参考文档&#xff1a;h…

visual studio报:引发的异常:“System.DllNotFoundException”(位于 ConsoleCAN1.exe 中)

最近在重构CAN通信的代码&#xff0c;把论文中的java转为C#实现&#xff0c;由于某种原因&#xff0c;java不能复现&#xff0c;所以转为c#。 然而c#的重构过程遇到许多问题&#xff0c;因为两种语言的编程方式、线程等等实现上有所差异。 其中一个错误&#xff1a; 引发的异…

java数据结构与算法刷题-----LeetCode34. 在排序数组中查找元素的第一个和最后一个位置

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 文章目录 二分查找 二分查找 解题思路&#xff1a;时间复杂度O( l o g 2 …

LeetCode.2908. 元素和最小的山形三元组 I

题目 2908. 元素和最小的山形三元组 I 分析 首先&#xff0c;看到这道题&#xff0c;第一反应就是暴力方法&#xff0c;三层for循环&#xff0c;枚举每一种情况&#xff0c;代码如下 class Solution {public int minimumSum(int[] nums) {int min Integer.MAX_VALUE;for(i…

mongodb sharding分片模式的集群数据库,日志治理缺失导致写入数据库报错MongoWriteConcernException的问题总结(上)

一、背景 常见的mongodb集群模式有以下三种&#xff1a; 主从复制&#xff08;Master-Slave&#xff09;模式副本集&#xff08;Replica Set&#xff09;模式分片&#xff08;Sharding&#xff09;模式 公司测试环境搭建的集群采用分片模式&#xff0c;有同事反馈说&#xf…

Linux内核之debugfs_create_dir与debugfs_create_file实例与调用栈流程(三十二)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

【C语言】linux内核tcp_push函数

一、讲解 这个 tcp_push 函数是在Linux内核的TCP网络栈实现中&#xff0c;用于推动TCP缓冲区中待发送数据包的传输。这段代码需要在具备操作系统和网络编程知识背景下来解释。下面我将分步骤用中文逐一讲解这个函数的作用&#xff1a; 1. struct tcp_sock *tp tcp_sk(sk);&am…

阿里云魔搭发起“ModelScope-Sora开源计划”,将为中国类Sora模型开发提供一站式工具链

在2024年3月23日的全球开发者先锋大会上&#xff0c;阿里云的魔搭社区宣布了一个新计划&#xff1a;“ModelScope-Sora开源计划”。这个计划旨在通过开源方式&#xff0c;帮助中国在Sora模型类型上做出更多创新。这个计划提供了一整套工具&#xff0c;包括处理数据的工具、多模…