基于SpringBoot解决RabbitMQ消息丢失问题

news2025/6/8 21:09:44

基于SpringBoot解决RabbitMQ消息丢失问题

  • 一、RabbitMQ解决消息丢失问题
  • 二、方案实践
    • 1、在生产者服务相关配置
    • 2、在消费者服务相关配置
  • 三、测试验证
    • 1、依次启动RabbitMQ、producer(建议先清空队列里面旧的测试消息再启动consumer)和consumer
    • 2、在producer中调用接口,发送消息
    • 3、观察控制台打印日志
  • 四、项目结构及源码
    • 1、项目结构
    • 2、源码下载

一、RabbitMQ解决消息丢失问题

RabbitMQ通过以下机制来保证消息的可靠性,从而解决消息丢失问题:

(1)消息持久化:RabbitMQ支持将消息持久化到磁盘,即使RabbitMQ服务器宕机或重启,消息也不会丢失。在发布消息时,可以设置消息的持久化标志,这样消息就会被写入磁盘中,而不是仅仅保存在内存中。在自定义MQ的配置中,设置消息队列和交换机,默认为true代表持久化。

(2)消息确认机制:RabbitMQ提供了消息确认机制,即生产者在发送消息后,可以等待RabbitMQ服务器返回确认信息,以确保消息已经被正确地接收和处理。如果RabbitMQ服务器没有返回确认信息,生产者可以选择重新发送消息或者采取其他的补救措施。在自定义MQ的配置中,配置RabbitTemplate,设置setConfirmCallback和setReturnCallback进行确认。

(3)事务机制:RabbitMQ还支持事务机制,即生产者可以将多个操作封装在一个事务中,只有当所有的操作都成功完成后,才提交事务。如果某个操作失败,整个事务会被回滚,从而保证消息的完整性和一致性。

(4)消息重试机制:如果消息在传输过程中出现异常,RabbitMQ会自动进行消息重试,直到消息被正确地处理为止。可以通过设置重试次数和重试时间间隔来控制消息重试的行为。可以在application.yaml配置文件中设置

  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    #1、确保消息从发送端到服务端投递可靠(分为以下两个步骤)
    #1.1、确认消息已发送到交换机(Exchange) 可以把publisher-confirms: true 替换为  publisher-confirm-type: correlate
    publisher-confirm-type: correlated
    #1.2、确认消息从交换机中到队列中
    publisher-returns: true

综上所述,RabbitMQ通过持久化、确认、事务和重试等机制来保证消息的可靠性,从而解决消息丢失的问题。下面按照上述4个机制进行代码实践。

二、方案实践

1、在生产者服务相关配置

1.1 通过自定义RabbitConfig设置消息投递失败的策略为返回到客户端,处理返回的消息(请注意!如果你使用了延迟队列插件,那么一定会调用该callback方法,因为数据并没有提交上去)。

@Slf4j
@Configuration
public class RabbitConfig {

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);

        //设置消息投递失败的策略,有两种策略:自动删除或返回到客户端。
        //我们既然要做可靠性,当然是设置为返回到客户端(true是返回客户端,false是自动删除)
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    log.info("ConfirmCallback 关联数据:{},投递成功,确认情况:{}", correlationData, ack);
                } else {
                    log.info("ConfirmCallback 关联数据:{},投递失败,确认情况:{},原因:{}", correlationData, ack, cause);
                }
            }
        });

        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                // 请注意!如果你使用了延迟队列插件,那么一定会调用该callback方法,因为数据并没有提交上去,
                // 而是提交在交换器中,过期时间到了才提交上去,并非是bug!你可以用if进行判断交换机名称来捕捉该报错
                /*if (exchange.equals("你声明的延迟队列的交换机")) {
                    return;
                }*/
                log.info("ReturnsCallback 消息被退回:{},回应码:{},回应信息:{},交换机:{},路由键:{}"
                        , returnedMessage.getMessage(), returnedMessage.getReplyCode()
                        , returnedMessage.getReplyText(), returnedMessage.getExchange()
                        , returnedMessage.getRoutingKey());
            }
        });

        return rabbitTemplate;
    }
}

DirectRabbitConfig直连交换机配置

@Slf4j
@Configuration
public class DirectRabbitConfig {
    private static final String QUEUE = "TestDirectQueue";
    private static final String EXCHANGE = "TestDirectExchange";
    private static final String ROUTING_KEY = "TestDirectRouting";

    /**
     * 创建一个名为TestDirectQueue的队列
     *
     * @return
     */
    @Bean
    public Queue testDirectQueue() {
        // durable:是否持久化,默认为true,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,有消息者订阅本队列,然后所有消费者都解除订阅此队列,会自动删除。
        // arguments:队列携带的参数,比如设置队列的死信队列,消息的过期时间等等。
        return new Queue(QUEUE);
    }

    /**
     * 创建一个名为TestDirectExchange的Direct类型的交换机
     *
     * @return
     */
    @Bean
    public DirectExchange testDirectExchange() {
        // durable:是否持久化,true,持久化交换机。
        // autoDelete:是否自动删除,交换机先有队列或者其他交换机绑定的时候,然后当该交换机没有队列或其他交换机绑定的时候,会自动删除。
        // arguments:交换机设置的参数,比如设置交换机的备用交换机(Alternate Exchange),当消息不能被路由到该交换机绑定的队列上时,会自动路由到备用交换机
        return new DirectExchange(EXCHANGE);
    }

    /**
     * 绑定交换机和队列
     *
     * @return
     */
    @Bean
    public Binding bindingDirect() {
        //bind队列to交换机中with路由key(routing key)
        return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with(ROUTING_KEY);
    }
}

2、在消费者服务相关配置

2.1 配置DirectConsumer

/**
 * 直连交换机消息
 *
 * @author
 * @DATE 2025/6/2
 **/
@RabbitListener(queues = "TestDirectQueue")
@Component
@Slf4j
public class DirectConsumer {

   @RabbitListener(queues = "TestDirectQueue")
@Component
@Slf4j
public class DirectConsumer {

    /*@RabbitHandler
    public void process(Object data, Channel channel, Message message) throws IOException {
        log.info("消费者接受到的消息是:{},消息体为:{}", data, message);
        //由于配置设置了手动应答,所以这里要进行一个手动应答。注意:如果设置了自动应答,这里又进行手动应答,会出现double ack,那么程序会报错。
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }*/

    @RabbitHandler
    public void process(Object message, Channel channel,@Headers Map<String,Object> map) {
        System.out.println(message);
        //这里只是模拟业务,其实还有很多可能,比如验证用户的银行卡号已经过期等等,都可以发出nack
        if (map.get("error")!= null){
            System.out.println("错误的消息");
            try {
                channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true);      //否认消息
                return;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            System.out.println("业务在这里执行!");
            channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);            //确认消息
        } catch (IOException e) {
            //实际业务场景,可能需要将上面的channel.basicNack,放到异常里面进行重试!
            e.printStackTrace();

        }
    }

}

三、测试验证

1、依次启动RabbitMQ、producer(建议先清空队列里面旧的测试消息再启动consumer)和consumer

在这里插入图片描述
首先调用接口,生成交换机和队列,否则,在启动consumer时候会报找不到交换机和队列错误。
然后在RabbitMQ界面手动清空队列消息,防止干扰本次实验。
在这里插入图片描述

2、在producer中调用接口,发送消息

在这里插入图片描述

3、观察控制台打印日志

生产者控制台
在这里插入图片描述
消费者控制台
在这里插入图片描述
可以看到由于接口中第1,4,5条消息会正常发送,所以在consumer已经进行了正常消费,并且针对第5条进行了业务重试。
由于第2,3条分别由于交换机错误或者队列错误,导致消息发送失败。

四、项目结构及源码

1、项目结构

在这里插入图片描述

2、源码下载

RabbitMQ,欢迎Star

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2404569.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

免费插件集-illustrator插件-Ai插件-随机填色

文章目录 1.介绍2.安装3.通过窗口>扩展>知了插件4.功能解释5.总结 1.介绍 本文介绍一款免费插件&#xff0c;加强illustrator使用人员工作效率&#xff0c;实现路径随机填色。首先从下载网址下载这款插件https://download.csdn.net/download/m0_67316550/87890501&#…

Web设计之登录网页源码分享,PHP数据库连接,可一键运行!

HTML 页面结构&#xff08;index.html&#xff09; 1. 流星雨动态背景 2. 主体界面&#xff08;包含登录和注册表单&#xff09; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport&qu…

Cursor + Claude 4:微信小程序流量主变现开发实战案例

前言 随着微信小程序生态的日益成熟&#xff0c;越来越多的开发者开始关注如何通过小程序实现流量变现。本文将详细介绍如何使用Cursor编辑器结合Claude 4 AI助手&#xff0c;快速开发一个具备流量主变现功能的微信小程序&#xff0c;并分享实际的开发经验和变现策略。 项目…

Redis Key过期策略

概述 Redis的Key过期策略是其内存管理系统的核心组成部分&#xff0c;主要包括「被动过期」、「主动过期」和「内存淘汰」三个机制。其中「内存淘汰」相关内容已经在上一篇「Redis内存淘汰策略」中进行了详细的讲解&#xff0c;有信兴趣的同学可以在回顾上一篇文章。本文将着重…

【C/C++】实现固定地址函数调用

在 C 里&#xff0c;函数地址在程序运行期间通常是固定的&#xff0c;不过在动态链接库&#xff08;DLL&#xff09;或者共享库&#xff08;SO&#xff09;中&#xff0c;函数地址可能会因为地址空间布局随机化&#xff08;ASLR&#xff09;而改变。所以我们想要通过地址直接调…

多模态大语言模型arxiv论文略读(109)

Math-PUMA: Progressive Upward Multimodal Alignment to Enhance Mathematical Reasoning ➡️ 论文标题&#xff1a;Math-PUMA: Progressive Upward Multimodal Alignment to Enhance Mathematical Reasoning ➡️ 论文作者&#xff1a;Wenwen Zhuang, Xin Huang, Xiantao Z…

性能优化笔记

性能优化转载 https://www.cnblogs.com/tengzijian/p/17858112.html 性能优化的一般策略及方法 简言之&#xff0c;非必要&#xff0c;不优化。先保证良好的设计&#xff0c;编写易于理解和修改的整洁代码。如果现有的代码很糟糕&#xff0c;先清理重构&#xff0c;然后再考…

Scrapy爬虫教程(新手)

1. Scrapy的核心组成 引擎&#xff08;engine&#xff09;&#xff1a;scrapy的核心&#xff0c;所有模块的衔接&#xff0c;数据流程梳理。 调度器&#xff08;scheduler&#xff09;&#xff1a;本质可以看成一个集合和队列&#xff0c;里面存放着一堆即将要发送的请求&#…

在Windows下编译出llama_cpp_python的DLL后,在虚拟环境中使用方法

定位编译生成的文件 在VS2022编译完成后&#xff0c;在构建目录&#xff08;如build/Release或build/Debug&#xff09;中寻找以下关键文件&#xff1a; ggml.dll、ggml_base.dll、ggml_cpu.dll、ggml_cuda.dll、llama.dll&#xff08;核心动态链接库&#xff09; llama_cp…

关于datetime获取时间的问题

import datetime print(datetime.now())如果用上述代码&#xff0c;会报错&#xff1a; 以下才是正确代码&#xff1a; from datetime import datetime print(datetime.now()) 结果&#xff1a; 如果想格式化时间&#xff0c;使用代码&#xff1a; from datetime import da…

Unity版本使用情况统计(更新至2025年5月)

UWA发布&#xff5c;本期UWA发布的内容是Unity版本使用统计&#xff08;第十六期&#xff09;&#xff0c;统计周期为2024年11月至2025年5月&#xff0c;数据来源于UWA网站&#xff08;www.uwa4d.com&#xff09;性能诊断提测的项目。希望给Unity开发者提供相关的行业趋势作为参…

GPUCUDA 发展编年史:从 3D 渲染到 AI 大模型时代(上)

目录 文章目录 目录1960s~1999&#xff1a;GPU 的诞生&#xff1a;光栅化&#xff08;Rasterization&#xff09;3D 渲染算法的硬件化实现之路 学术界算法研究历程工业界产品研发历程光栅化技术原理光栅化技术的软件实现&#xff1a;OpenGL 3D 渲染管线设计 1. 顶点处理&…

人机融合智能 | 可穿戴计算设备的多模态交互

可穿戴计算设备可以对人体以及周围环境进行连续感知和计算,为用户提供随时随地的智能交互服务。本章主要介绍人机智能交互领域中可穿戴计算设备的多模态交互,阐述以人为中心的智能穿戴交互设计目标和原则,为可穿戴技术和智能穿戴交互技术的设计提供指导,进而简述支持智能穿戴交…

Impromptu VLA:用于驾驶视觉-语言-动作模型的开放权重和开放数据

25年5月来自清华和博世的论文“Impromptu VLA: Open Weights and Open Data for Driving Vision-Language-Action Models”。 用于自动驾驶的“视觉-语言-动作” (VLA) 模型前景光明&#xff0c;但在非结构化极端场景下却表现不佳&#xff0c;这主要是由于缺乏有针对性的基准测…

AI智能体,为美业后端供应链注入“智慧因子”(4/6)

摘要&#xff1a;本文深入剖析美业后端供应链现状&#xff0c;其产品具有多样性、更新换代快等特点&#xff0c;原料供应和生产环节也面临诸多挑战。AI 智能体的登场为美业后端供应链带来变革&#xff0c;包括精准需求预测、智能化库存管理、优化生产计划排程、升级供应商管理等…

跨平台资源下载工具:res-downloader 的使用体验

一款基于 Go Wails 的跨平台资源下载工具&#xff0c;简洁易用&#xff0c;支持多种资源嗅探与下载。res-downloader 一款开源免费的下载软件(开源无毒、放心使用)&#xff01;支持Win10、Win11、Mac系统.支持视频、音频、图片、m3u8等网络资源下载.支持视频号、小程序、抖音、…

数据湖是什么?数据湖和数据仓库的区别是什么?

目录 一、数据湖是什么 &#xff08;一&#xff09;数据湖的定义 &#xff08;二&#xff09;数据湖的特点 二、数据仓库是什么 &#xff08;一&#xff09;数据仓库的定义 &#xff08;二&#xff09;数据仓库的特点 三、数据湖和数据仓库的区别 &#xff08;一&#…

【深度学习新浪潮】如何入门三维重建?

入门三维重建算法技术需要结合数学基础、计算机视觉理论、编程实践和项目经验,以下是系统的学习路径和建议: 一、基础知识储备 1. 数学基础 线性代数:矩阵运算、向量空间、特征分解(用于相机矩阵、变换矩阵推导)。几何基础:三维几何(点、线、面的表示)、射影几何(单…

Codeforces Round 1025 (Div. 2) B. Slice to Survive

Codeforces Round 1025 (Div. 2) B. Slice to Survive 题目 Duelists Mouf and Fouad enter the arena, which is an n m n \times m nm grid! Fouad’s monster starts at cell ( a , b ) (a, b) (a,b), where rows are numbered 1 1 1 to n n n and columns 1 1 1 t…

ubuntu中使用docker

上一篇我已经下载了一个ubuntu:20.04的镜像&#xff1b; 1. 查看所有镜像 sudo docker images 2. 基于本地存在的ubuntu:20.04镜像创建一个容器&#xff0c;容器的名为cppubuntu-1。创建的时候就会启动容器。 sudo docker run -itd --name cppubuntu-1 ubuntu:20.04 结果出…