基于SpringBoot+Redis实现RabbitMQ幂等性设计,解决MQ重复消费问题

news2025/6/4 11:27:11

解决MQ重复消费问题


一、实现方案
本方案参考 「RabbitMQ消息可靠性深度解析|从零构建高可靠消息系统的实战指南」,向开源致敬!
1、业务层幂等处理:
每个消息携带一个全局唯一ID,在业务处理过程中,首先检查这个ID是否已经被处理过。例如,将已处理消息的ID记录到数据库的“已处理消息表”中,下次收到同样ID的消息时直接返回成功而不进行实际操作。
对于更新型操作,可以使用乐观锁或分布式锁来保证同一事务多次执行结果相同,例如通过版本号(version)控制更新操作,只有当版本号未变时才执行更新。
对于创建型操作,确保即使多次调用也不会生成多个资源,例如通过查询是否存在相同的唯一键来决定是否创建新的资源。
2、确认模式选择:
使用acknowledgement模式,消费者接收到消息后必须发送确认给RabbitMQ,只有收到确认后RabbitMQ才会从队列中移除消息,否则会在连接恢复后重新投递。
设置publisher confirms,生产者可以得到消息发布的确认,确保消息确实到达了MQ服务器并持久化存储。
3、死信队列与重试策略:
配置死信交换机和死信队列,对于那些重复投递依然无法正确处理的消息,可以转移到死信队列,并设置相应的重试策略及最大重试次数,超过限制则记录日志、报警或手动介入处理。
4、幂等服务设计:
设计能够应对重复调用的服务接口,这些接口内部应该包含足够的逻辑判断以识别重复请求并作出正确的响应。
5、事务与补偿机制:
对于涉及多个系统的分布式事务场景,可以考虑采用TCC(Try-Confirm-Cancel)模式或其他分布式事务解决方案,使得整个流程具有幂等性。
总结来说,在RabbitMQ中实现幂等性主要依赖于业务逻辑层面的改造和优化,同时配合RabbitMQ自身的消息确认机制来确保消息不会因为异常情况而重复处理。
二、代码实战
1、通过雪花算法生成分布式唯一ID

/**
 * SnowflakeIdWorker雪花算法
 *
 * @author huahua
 * @DATE 2025/5/25
 **/
@Component
public class SnowflakeIdWorker {
    // 起始的时间戳 (2010-01-01)
    private final long twepoch = 1288834974657L;

    // 机器标识位数
    private final long workerIdBits = 5L;
    private final long datacenterIdBits = 5L;

    // 序列号位数
    private final long sequenceBits = 12L;

    // 工作机器ID最大值
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
    // 数据中心ID最大值
    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);

    // 每一部分向左的偏移量
    private final long workerIdShift = sequenceBits;
    private final long datacenterIdShift = sequenceBits + workerIdBits;
    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;

    // 时间戳边界值
    private long lastTimestamp = -1L;

    // 工作节点ID(0~31)
    @Value("${snowConfig.workerId}")
    private long workerId;

    // 数据中心ID(0~31)
    @Value("${snowConfig.datacenterId}")
    private long datacenterId;

    // 每个节点每毫秒内的序列号
    private AtomicLong sequence = new AtomicLong(0L);

    /**
     * 通过专属工作节点ID和数据中心ID构建专属的雪花算法工具类
     */
    public SnowflakeIdWorker() {
        if (this.workerId > maxWorkerId || this.workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (this.datacenterId > maxDatacenterId || this.datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
        }
    }

    /**
     * 分布式唯一ID生成
     * @return
     */
    public synchronized long nextId() {
        long timestamp = timeGen();

        // 如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(
                String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }

        // 如果是同一时间生成的,则进行序列号的自增
        if (lastTimestamp == timestamp) {
            sequence.incrementAndGet();
            // 判断是否溢出
            if (sequence.get() > (-1L ^ (-1L << sequenceBits))) {
                // 阻塞到下一个时间戳
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            // 时间戳改变,重置序列号
            sequence.set(0L);
        }

        // 上次生成ID的时间截
        lastTimestamp = timestamp;

        // 移位并通过或运算拼到一起组成64位的ID
        return ((timestamp - twepoch) << timestampLeftShift) |
            (datacenterId << datacenterIdShift) |
            (workerId << workerIdShift) | sequence.get();
    }

    /**
     * 从给定的最后时间戳中获取下一个时间戳
     *
     * @param lastTimestamp 最后时间戳
     * @return 下一个时间戳
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    /**
     * 生成当前时间的毫秒数。
     *
     * @return 当前时间的毫秒数。
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }
}

2、通过枚举类,设计Message消费状态

/**
 * Message消费状态
 *
 * @author huahua
 * @DATE 2025/5/30
 **/
public enum RabbitStatusEnum {
    CONSUME(0, "待消费"),
    BEGIN(1, "开始消费"),
    SUCCESS(2, "成功"),
    FAIL(3, "失败");

    private Integer code;

    private String message;

    RabbitStatusEnum(Integer code, String message) {
        this.code = code;
        this.message = message;
    }

    public int getCode() {
        return code;
    }

    public void setCode(Integer code) {
        this.code = code;
    }

    public String getMessage() {
        return message;
    }

    /**
     * 获取需要执行的状态集合
     *
     * @return
     */
    public static List<Integer> getNeedExecuteList() {
        return Arrays.asList(CONSUME.getCode(), FAIL.getCode());
    }

    /**
     * 获取不需要执行的状态集合
     *
     * @return
     */
    public static List<Integer> getCompletionExecuteList() {
        return Arrays.asList(CONSUME.getCode(), FAIL.getCode());
    }
}

3、自定义RedisKey

/**
 * 自定义RedisKey
 *
 * @author
 * @DATE 2025/5/30
 **/
public enum RedisKeyEnum {
    MQ_STATUS("mq:messageId");
    private String key;

    RedisKeyEnum(String key) {
        this.key = key;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }
}

4、MQConfig
将自定义"direct.exchange"和"direct.queue1"通过"direct.key"进行绑定

@Configuration
public class MQConfig {
    / direct消息 start ///
    /**
     * 声明direct交换机
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("direct.exchange");
    }

    /**
     * 声明direct队列:direct.queue1
     */
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }

    /**
     * 将direct.queue1队列绑定到交换机上
     */
    @Bean
    public Binding bindingDirectQueue1(Queue directQueue1, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue1).to(directExchange).with("direct.key");
    }

    @Bean
    public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
        return new Jackson2JsonMessageConverter(objectMapper);
    }
}

5、生产者RabbitMQProducer
生产者发送消息时,生成专属分布式唯一业务ID,通过Redis记录消息的消费状态。

/**
 * 生产者
 *
 * @author
 * @DATE 2025/5/31
 **/
@RestController
@Slf4j
public class RabbitMQProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Resource
    private SnowflakeIdWorker snowflakeIdWorker;

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    /**
     * 批量发送消息
     *
     * @param message
     */
    @RequestMapping("/sendQueueBatch")
    public void sendQueueBatch(String message) {
        //请求头设置消息id(messageId)
        Map<String, Object> map = new HashMap<>();
        map.put("message", message);
        for (int i = 0; i < 3; i++) {
            long id = snowflakeIdWorker.nextId();
            map.put("id", id);
            JSONObject entries = JSONUtil.parseObj(map);
            redisTemplate.opsForValue().set(RedisKeyEnum.MQ_STATUS.getKey() + id, RabbitStatusEnum.CONSUME.getCode());
            rabbitTemplate.convertAndSend("direct.exchange", "direct.key", entries);
        }
        log.info("3个消息都发送成功");
    }
}

6、消费者RabbitMqConsumer
在@RabbitListener注解中设置了ackMode=“MANUAL”,这意味着消息确认将由开发者手动完成。当接收到消息时,可以通过获取的Channel对象调用basicAck()、basicNack()或basicReject()方法来进行消息确认或者拒绝操作。

消息开始消费时,记录开始消费的状态
消息成功完成后,记录成功消费的状态

这里是为了避免在消息开始消费后,RabbitMq宕机了,此时MQ并不知道这个消息最终有没有消费完成,因此重启MQ之后,MQ会重新消费这条消息。

因此我们只运行执行“待消费”和“消费失败”状态的消息。
如果在执行消费的过程中,出错了(抛出Exception),则记录消费失败的状态,MQ会再次尝试去进行消费。我们可以设置最多重试次数,以及两次重试消费的间隔时间。

/**
 * RabbitMqConsumer消费者
 *
 * @author huahua
 * @DATE 2025/5/25
 **/
@Slf4j
@Service
public class RabbitMqConsumer implements ChannelAwareMessageListener {
    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    /**
     * 记录消费次数
     */
    private int n = 0;

    @Override
    @RabbitListener(queues = "direct.queue1", ackMode = "MANUAL")
    public void onMessage(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        if(StringUtils.isEmpty( msg)){
            System.out.println("消息为空:");
            return;
        }
        JSONObject entries = JSONUtil.parseObj(new String(message.getBody()));
        Integer status = (Integer) redisTemplate.opsForValue().get(RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"));
        try {
            //只有待消费和消费失败的能进行消费
            if (RabbitStatusEnum.getNeedExecuteList().contains(status)) {
                //记录开始消费
                redisTemplate.opsForValue().set(RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"), RabbitStatusEnum.BEGIN.getCode());
                // 处理消息逻辑
                processMessage(entries);
                System.out.println("执行成功了:" + entries.get("id"));
                //记录消费成功
                redisTemplate.opsForValue().set(RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"), RabbitStatusEnum.SUCCESS.getCode());
                // 成功处理后手动确认消息
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                channel.basicAck(deliveryTag, false);
            }
        } catch (Exception e) {
            // 处理失败,可以选择重新入队列(取决于业务需求)
            if (shouldRequeueOnFailure()) {
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                channel.basicNack(deliveryTag, false, true);
                System.out.println("执行失败了:" + entries.get("id"));
                //记录消费失败
                redisTemplate.opsForValue().set(RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"), RabbitStatusEnum.FAIL.getCode());
            } else {
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                channel.basicReject(deliveryTag, false);
            }
        }
    }

    /**
     * 根据业务需求决定是否重新入队列
     * @return boolean
     */
    private boolean shouldRequeueOnFailure() {
        return true;
    }

    /**
     * 消费逻辑
     *
     * @param entries
     * @throws Exception
     */
    private void processMessage(JSONObject entries) throws Exception {
        n++;
        //模拟MQ消费时长
        Thread.sleep(8000);
        //消费
        System.out.println("Processing id: " + RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"));
        System.out.println("Processing message: " + entries.get("message"));
        System.out.println("第" + n + "次消费");
    }
}

三、测试验证
1、本地启动Redis和RabbitmMQ
在这里插入图片描述
在这里插入图片描述
2、启动生产者工程ProviderApplication,利用postman调用接口生产3个消息
在这里插入图片描述
在这里插入图片描述
观察RabbitmMQ消息状态
在这里插入图片描述
3、启动消费者,在消费完第2个消息后,手动关闭RabbitmMQ服务,模拟宕机/网络波动。之后再手动重启RabbitmMQ服务,查看之前未完成消费的消息是否能重新执行成功。
在这里插入图片描述
在这里插入图片描述
可以看到消费者服务,消费完第2个消息后,由于RabbitMQ宕机,本地服务报错,无法消费第3个消息。而且界面上显示还有1个消息等待被消费。
4、再次手动重启RabbitMQ服务,观察第3个消息消费情况
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

可以看到,手动重启完RabbitMQ服务后,第3个消息被正确消费完成而且Redis服务中生成了3个唯一消息id。
通过上述方案,基本解决了服务宕机或网络波动导致的重复消费问题,
三、项目结构及源码
项目结构及源码如下,欢迎Star!
在这里插入图片描述
源码下载,欢迎Star!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2396278.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用lighttpd和开发板进行交互

文章目录 &#x1f9e0; 一、Lighttpd 与开发板的交互原理1. 什么是 Lighttpd&#xff1f;2. 与开发板交互的方式&#xff1f; &#x1f9fe; 二、lighttpd.conf 配置文件讲解⚠️ 注意事项&#xff1a; &#x1f4c1; 三、目录结构说明&#x1f4a1; 四、使用 C 编写 CGI 脚本…

DRF的使用

1. DRF概述 DRF即django rest framework&#xff0c;是一个基于Django的Web API框架&#xff0c;专门用于构建RESTful API接口。DRF的核心特点包括&#xff1a; 序列化&#xff1a;通过序列化工具&#xff0c;DRF能够轻松地将Django模型转换为JSON格式&#xff0c;也可以将JS…

2024年09月 C/C++(四级)真题解析#中国电子学会#全国青少年软件编程等级考试

C/C++编程(1~8级)全部真题・点这里 第1题:有几个PAT 字符串 APPAPT 中包含了两个单词 PAT,其中第一个 PAT 是第 2 位,第 4 位(A),第 6 位(T);第二个 PAT 是第 3 位,第 4 位(A),第 6 位(T)。 现给定字符串,问一共可以形成多少个 PAT? 时间限制:1000 内存限制:26214…

免费且好用的PDF水印添加工具

软件介绍 琥珀扫描.zip下载链接&#xff1a;https://pan.quark.cn/s/3a8f432b29aa 今天要给大家推荐一款超实用的PDF添加水印工具&#xff0c;它能够满足用户给PDF文件添加水印的需求&#xff0c;而且完全免费。 这款PDF添加水印的软件有着简洁的界面&#xff0c;操作简便&a…

mqtt协议连接阿里云平台

首先现在的阿里云物联网平台已经不在新购了&#xff0c;如下图所示&#xff1a; 解决办法&#xff1a;在咸鱼上租用一个账号&#xff0c;先用起来。 搭建阿里云平台&#xff0c;参考博客&#xff1a; &#xff08;一&#xff09;MQTT连接阿里云物联网平台&#xff08;小白向&…

一文详谈Linux中的时间管理和定时器编程

&#xff08;目录&#xff09; 先说一些在计算机中需要用到时间的地方&#xff1a;系统日志log、OS调度(时间片、定时器)等等~~ 时间的计量 计时的方式发展&#xff1a;日晷、沙漏 -> 机械钟 -> 石英振荡器、晶振 -> 铯原子钟 -> 氢原子钟 计算机中的计时方式&…

性能优化 - 理论篇:常见指标及切入点

文章目录 引言一、 Java 性能优化的核心思路二、为什么要度量&#xff1f;三、常用性能衡量指标详解3.1 吞吐量与响应速度3.2 响应时间的具体度量&#xff1a;平均响应时间与百分位数3.3 并发量3.4 秒开率&#xff08;页面秒开&#xff09;3.5 正确性&#xff08;功能可用性&am…

【论文阅读 | PR 2024 |ICAFusion:迭代交叉注意力引导的多光谱目标检测特征融合】

论文阅读 | PR 2024 |ICAFusion&#xff1a;迭代交叉注意力引导的多光谱目标检测特征融合 1.摘要&&引言2.方法2.1 架构2.2 双模态特征融合&#xff08;DMFF&#xff09;2.2.1 跨模态特征增强&#xff08;CFE&#xff09;2.2.2 空间特征压缩&#xff08;SFS&#xff09;…

华为OD机试真题——模拟消息队列(2025A卷:100分)Java/python/JavaScript/C++/C语言/GO六种最佳实现

2025 A卷 100分 题型 本文涵盖详细的问题分析、解题思路、代码实现、代码详解、测试用例以及综合分析; 并提供Java、python、JavaScript、C++、C语言、GO六种语言的最佳实现方式! 2025华为OD真题目录+全流程解析/备考攻略/经验分享 华为OD机试真题《模拟消息队列》: 目录 题…

MySql(十三)

目录 mysql外键约束 准备工作 创建表 插入数据 创建表时添加外键 1..格式 2..创建表student表时&#xff0c;为其添加外键 3.插入数据测试 正常数据 异常数据 3.使用alter添加外键 删除外键 添加外键 4.Mysql外键不生效的原因 修改引擎 phpystudy的mysql位置 mysql外键约束 注&…

iOS —— UI 初探

简介 第一次新建时&#xff0c;你可能会好奇。为什么有这么多文件&#xff0c;他们都有什么用&#xff1f; App 启动与生命周期管理相关 文件名 类型 作用 main.m m 程序入口&#xff0c;main() 函数定义在这里 AppDelegate.h/.m h/m App 启动/进入后台/退出等全局事…

day23-计算机网络-1

1. 网络简介 1.1. 网络介质 网线&#xff1a;cat5,cat5e 六类网线&#xff0c;七类网线&#xff0c;芭蕾网线光纤&#xff1a;wifi&#xff1a;无线路由器&#xff0c;ap5G 1.2. 常见网线类型 1.2.1. 双绞线&#xff08;Twisted Pair Cable&#xff09;【最常用】 按性能主…

C语言基础(09)【数组的概念 与一维数组】

数组 数组的概念 什么是数组 数组是相同类型、有序数据的集合。 数组的特征 数组中的数据称之为数组的元素(数组中的每一个匿名变量空间&#xff0c;是同构的)数组中的元素存放在内存空间建立。 衍生概念&#xff1a;下标&#xff08;索引&#xff09; 下标或者索引代表…

【JavaScript】Ajax 侠客行:axios 轻功穿梭服务器间

一、AJAX 概念和 axios 使用讲解 什么是 AJAX ? 使用浏览器的 XMLHttpRequest 对象与服务器通信 浏览器网页中&#xff0c;使用 AJAX技术&#xff08;XHR对象&#xff09;发起获取省份列表数据的请求&#xff0c;服务器代码响应准备好的省份列表数据给前端&#xff0c;前端…

Django数据库连接报错 django.db.utils.NotSupportedError: MySQL 8 or later is required

可尝试换django版本 pip install django3.2.13 另外mysql下载地址 https://dev.mysql.com/downloads/installer/ 安装可以参考&#xff1a; https://blog.csdn.net/HHHQHHHQ/article/details/148125549 重点&#xff1a;用户变量添加 C:\Program Files\MySQL\MySQL Server …

2025年- H57-Lc165--994.腐烂的橘子(图论,广搜)--Java版

1.题目描述 2.思路 3.代码实现 import java.util.LinkedList; import java.util.Queue;public class H994 {public int orangesRotting(int[][] grid) {//1.获取行数int rowsgrid.length;int colsgrid[0].length;//2.创建队列用于bfsQueue<int[]> quenew LinkedList<…

(9)-Fiddler抓包-Fiddler如何设置捕获Https会话

1.简介 由于近几年来各大网站越来越注重安全性都改成了https协议&#xff0c;不像前十几年前直接是http协议直接裸奔在互联网。接着讲解如何抓取https协议会话。 2.什么是HTTPS&#xff1f; HTTPS就是加过密的HTTP。使用HTTPS后&#xff0c;浏览器客户端和Web服务器传输的数…

Vue-Router 基础使用

Vue Router 是 Vue 官方的客户端路由解决方案。 客户端路由的作用是在单页应用 SPA 中将浏览器的 URL 和用户看到的内容绑定起来。当用户在应用中浏览不同页面时&#xff0c;URL 会随之更新&#xff0c;但页面不需要从服务器重新加载。 Vue Router 基于 Vue 的组件系统构建&a…

【案例分享】蓝牙红外线影音遥控键盘:瑞昱RTL8752CJF

蓝牙红外线影音遥控键盘 Remotec的无线控制键盘采用瑞昱蓝牙RTL8752CJF解决方案&#xff0c;透过蓝牙5.0与手机配对后&#xff0c;连线至 Remotec 红外 code server 取得对应影音视觉设备的红外 code后&#xff0c;即可控制多达2个以上的影音视觉设备&#xff0c;像是智能电视…

网络协议的原理及应用层

网络协议 网络协议目的为了减少通信成本&#xff0c;所有的网络问题都是传输距离变长的问题。 协议的概念&#xff1a;用计算机语言来发出不同的信号&#xff0c;信号代表不同的含义&#xff0c;这就是通信双方的共识&#xff0c;便就是协议。 协议分层&#xff08;语言层和…