RabbitMQ 幂等性与消息可靠性保障

news2025/5/10 22:25:16

一、引言

RabbitMQ 是一个广泛应用于软件开发、数据传输、微服务等领域的高效、可靠的开源消息队列系统1。在分布式系统中,保证消息的可靠传递和幂等性是至关重要的,它能够确保系统在各种复杂情况下的稳定性和数据的准确性。

二、消息可靠性保障

(一)生产者端

  • 发送方确认机制(Publisher Confirm)
    • 原理:生产者发送消息后,MQ 会返回一个确认消息给生产者,告知消息是否被成功接收。通过在配置文件中开启publisher - confirm - type来启用该功能,一般设置为correlated,表示成功发布消息到交换机后会触发回调方法。生产者可以为每个消息设置一个唯一的CorrelationData作为消息的标识符,在回调方法中根据这个标识符来确定消息的发送结果。
    • 示例配置:在 Spring Boot 项目的application.yml文件中配置如下:
spring:
  rabbitmq:
    addresses: 127.0.0.1
    host: 5672
    username: guest
    password: guest
    virtual - host: /
    # 开启消息确认
    publisher - confirm - type: correlated
  • 事务机制
    • 原理:生产者将发送消息的操作放在一个事务中,如果消息发送过程出现异常,可以回滚事务,确保消息不会丢失或出现不一致的情况。然而,事务机制会阻塞生产者线程,严重影响性能,在生产环境中一般不建议使用4。
    • 示例代码:在使用 RabbitMQ 的 Java 客户端时,可以通过以下方式开启事务:
channel.txSelect();
try {
    // 发送消息的代码
    channel.basicPublish(exchange, routingKey, body);
    channel.txCommit();
} catch (Exception e) {
    channel.txRollback();
}
  • 失败重试机制2
    • 自带重试机制:如果发送方一开始就连不上 MQ,Spring Boot 中利用 Spring 的retry机制来实现重试。可以在配置文件中配置相关参数,如重试起始间隔时间、最大重试次数、最大重试间隔时间和间隔时间乘数等。
    • 示例配置
spring:
  rabbitmq:
    template:
      retry:
        initial - interval: 1000ms
        max - attempts: 10
        max - interval: 10000ms
        multiplier: 2
  • 业务重试:针对消息没有到达交换机的情况,在消息发送失败回调中进行处理。首先创建一张表记录发送到中间件的消息,包括消息的状态、第一次重试时间和重试次数等字段。在消息发送时往表中插入记录,在确认回调方法中根据消息的msgId更新消息状态。另外开启定时任务,定时检查状态为发送中且超过重试时间的消息,根据重试次数决定是否重新发送消息。

(二)MQ 中间件端

  • 消息持久化
    • 队列持久化:创建队列时将其设置为持久化,这样可以保证 Broker 在重启后队列的元数据不会丢失。在定义队列时,将durable参数设置为true即可实现队列持久化。
    • 消息持久化:将消息的deliveryMode设置为2,可以将消息持久化到磁盘。这样只有消息成功持久化到磁盘之后,Broker 才会发送通知给生产者进行确认。
  • 交换机持久化:创建交换机时设置为持久化,保证交换机在 Broker 重启后不会丢失。在定义交换机时,将durable参数设置为true
  • 镜像队列:为了防止 MQ 服务器宕机或磁盘损坏导致消息丢失,可以引入镜像队列。镜像队列会将消息复制到多个节点上,即使某个节点出现故障,其他节点仍然可以提供服务,从而提高系统的可靠性。

(三)消费者端

  • 消费者确认机制(Consumer Acknowledgement)
    • 原理:当消费者处理消息结束后,需要向 RabbitMQ 发送一个回执,以告知消息的处理状态。ACK 表示消费者成功处理了消息,RabbitMQ 会从队列中删除该消息;NACK 表示消息处理失败,RabbitMQ 需要再次投递该消息;REJECT 表示消息处理失败并且被拒绝,RabbitMQ 会从队列中删除该消息,但一般很少使用 REJECT,通常只在消息格式存在问题时使用。
    • 确认模式:RabbitMQ 支持三种不同的确认模式,通过acknowledge - mode属性进行配置。manual模式下,消费者接收到消息后需要手动发送确认给发送者;auto模式下,Spring AMQP 利用 AOP 对消息处理逻辑做环绕增强,业务正常执行时自动返回 ACK,出现异常时根据异常类型返回 NACK 或 REJECT;none模式下,消费者接收到消息后不需要发送任何确认给发送者,这种模式无法保证消息的可靠性,一般不使用。
  • 失败重试机制
    • 本地重试:Spring 框架提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限制地将消息重新入队到 MQ 队列。可以通过配置相关参数来控制重试的次数、间隔时间等。在达到最大重试次数后,Spring AMQP 会抛出AmqpRejectAndDontRequeueException异常,并将消息从队列中删除。
    • 重试策略自定义:Spring AMQP 允许开发人员自定义重试次数耗尽后的消息处理策略,通过实现MessageRecovery接口来定义不同的策略,如RejectAndDontRequeueRecoverer(重试次数耗尽后直接拒绝消息并丢弃)、ImmediateRequeueMessageRecoverer(重试次数耗尽后返回 NACK 给生产者使消息重新入队)、RepublishMessageRecoverer(重试次数耗尽后将失败消息投递到指定的交换机和队列中)。

三、幂等性保障

  • 通过唯一标识符保证操作的幂等性
    • 原理:为每个操作生成唯一的标识符(如 ID),并在系统中跟踪这些标识符以检测重复操作。当接收到具有已知标识符的操作时,可以跳过重复的操作。Spring AMQP 的MessageConverter自带了MessageID的功能,只要开启这个功能,就可以为每个消息生成唯一的 ID,也可以在业务中基于 ID 判断是否是重复消息。
    • 示例代码
@Bean
public MessageConverter messageConverter() {
    // 定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 配置自动创建消息ID,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}
  • 通过业务判断保证操作的幂等性
    • 基于数据库唯一约束:在数据库表中为相关业务字段设置唯一约束,例如在订单表中,以订单编号作为唯一键。当消费者接收到消息并处理业务时,将相关数据插入或更新到数据库中,如果出现唯一约束冲突,则说明该消息是重复的,直接返回成功即可,避免重复执行相同的业务逻辑。
    • 利用 Redis 缓存:在消费者消费消息之前,先将消息的 ID 放到 Redis 中。可以使用SETNX命令(SET if Not eXists)来设置键值对,如果键已经存在,说明之前有人消费过该消息。可以根据键对应的值来判断消息的处理状态,如0表示正在处理或处理失败,1表示处理成功。当消息成功消费之后,将 ID 对应的值设置为1。为了防止出现死锁等情况,可以给键设置一个生存时间。

四、总结

通过在生产者端、MQ 中间件端和消费者端采取一系列的措施,RabbitMQ 可以有效地保证消息的可靠性和幂等性。在实际应用中,需要根据具体的业务场景和需求,合理地配置和使用这些功能,以确保系统的稳定性和数据的一致性。同时,还需要注意一些性能方面的问题,例如事务机制对性能的影响、重试机制可能导致的资源消耗等,在保证系统可靠性的前提下,尽可能地提高系统的性能和效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2372665.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

修复笔记:获取 torch._dynamo 的详细日志信息

一、问题描述 在运行项目时,遇到与 torch._dynamo 相关的报错,并且希望获取更详细的日志信息以便于进一步诊断问题。 二、相关环境变量设置 通过设置环境变量,可以获得更详细的日志信息: set TORCH_LOGSdynamo set TORCHDYNAM…

Windows平台下的Qt发布版程序打包成exe可执行文件(带图标)|Qt|C++

首先先找一个可执行文件的图标 可以去阿里的矢量图库里找 iconfont-阿里巴巴矢量图标库 找到想要的图标下载下来 此时的图标是png格式的,我们要转到icon格式的文件 要使用到一个工具Drop Icons_2.1.1.rar - 蓝奏云 生成icon文件后把icon文件放到你项目的根目录下…

CSS--图片链接垂直居中展示的方法

原文网址&#xff1a;CSS--图片链接垂直居中展示的方法-CSDN博客 简介 本文介绍CSS图片链接垂直居中展示的方法。 图片链接 问题复现 源码 <html xml:lang"cn" lang"cn"><head><meta http-equiv"Content-Type" content&quo…

TRAE 配置blender MCP AI自动3D建模

BlenderMCP - Blender模型上下文协议集成 BlenderMCP通过模型上下文协议(MCP)将Blender连接到Claude AI&#xff0c;允许Claude直接与Blender交互并控制Blender。这种集成实现了即时辅助的3D建模、场景创建和操纵。 1.第一步下载 MCP插件(addon.py):Blender插件&#xff0c;在…

VUE2课程计划表练习

主要练习数据变量对象 以下是修正后的完整代码&#xff1a; //javascript export default {data() {return {list: [{ id: 1, subject: Vue.js 前端实战开发, content: 学习指令&#xff0c;例如 v-if、v-for、v-model 等, place: 自习室, status: false }// 可以在这里添加更…

2025年软件工程与数据挖掘国际会议(SEDM 2025)

2025 International Conference on Software Engineering and Data Mining 一、大会信息 会议简称&#xff1a;SEDM 2025 大会地点&#xff1a;中国太原 收录检索&#xff1a;提交Ei Compendex,CPCI,CNKI,Google Scholar等 二、会议简介 2025年软件开发与数据挖掘国际会议于…

.NET高频技术点(持续更新中)

1. .NET 框架概述 .NET 框架的发展历程.NET Core 与 .NET Framework 的区别.NET 5 及后续版本的统一平台 2. C# 语言特性 异步编程&#xff08;async/await&#xff09;LINQ&#xff08;Language Integrated Query&#xff09;泛型与集合委托与事件属性与索引器 3. ASP.NET…

pandas中的数据聚合函数:`pivot_table` 和 `groupby`有啥不同?

pivot_table 和 groupby 是 pandas 中两种常用的数据聚合方法&#xff0c;它们都能实现数据分组和汇总&#xff0c;但在使用方式和输出结构上有显著区别。 0. 基本介绍 groupby分组聚合 groupby 是 Pandas 库中的一个功能强大的方法&#xff0c;用于根据一个或多个列对数据进…

对golang中CSP的理解

概念&#xff1a; CSP模型&#xff0c;即通信顺序进程模型&#xff0c;是由英国计算机科学家C.A.R. Hoare于1978年提出的。该模型强调进程之间通过通道&#xff08;channel&#xff09;进行通信&#xff0c;并通过消息传递来协调并发执行的进程。CSP模型的核心思想是“不要通过…

【LunarVim】CMake LSP配置

在 LunarVim 中为 CMakeLists.txt 文件启用代码提示&#xff08;如补全和语义高亮&#xff09;&#xff0c;需要安装支持 CMake 的 LSP&#xff08;语言服务器&#xff09;和适当的插件。以下是完整配置指南&#xff1a; 1、配置流程 1.1 安装cmake-language-server 通过 Ma…

Mkdocs页面如何嵌入PDF

嵌入PDF 嵌入PDF代码 &#xff0c;注意PDF的相对地址 <iframe src"../个人简历.pdf (相对地址)" width"100%" height"800px" style"border: 1px solid #ccc; overflow: auto;"></iframe>我的完整代码&#xff1a; <d…

融合静态图与动态智能:重构下一代智能系统架构

引言&#xff1a;智能系统的分裂 当前的大模型系统架构正处于两个极端之间&#xff1a; 动态智能体系统&#xff1a;依赖语言模型动态决策、自由组合任务&#xff0c;智能灵活但稳定性差&#xff1b; 静态流程图系统&#xff1a;具备强工程能力&#xff0c;可控可靠&#xf…

WORD压缩两个免费方法

日常办公和学习中&#xff0c;Word文档常常因为包含大量图片、图表或复杂格式而导致文件体积过大&#xff0c;带来诸多不便&#xff0c;比如 邮件发送受限&#xff1a;许多邮箱附件限制在10-25MB&#xff0c;大文件无法直接发送 存储空间占用&#xff1a;大量文档占用硬盘或云…

skywalking服务安装与启动

skywalking服务安装并启动 1、介绍2、下载apache-skywalking-apm3、解压缩文件4、创建数据库及用户5、修改配置文件6、下载 MySQL JDBC 驱动7、启动 OAP Serve,需要jkd11,需指定jkd版本,可以修改文件oapService.sh8、启动 Web UI,需要jkd11,需指定jkd版本,可以修改文件oapServi…

Qt 中信号与槽(signal-slot)机制支持 多种连接方式(ConnectionType)

Qt 中信号与槽&#xff08;signal-slot&#xff09;机制支持 多种连接方式&#xff08;ConnectionType&#xff09; Qt 中信号与槽&#xff08;signal-slot&#xff09;机制支持 多种连接方式&#xff08;ConnectionType&#xff09;&#xff0c;用于控制信号发出后如何调用槽…

Midjourney-V7:支持参考图片头像或背景生成新保真图

Midjourney-V7重磅升级Omni Reference&#xff1a;全能图像参考神器&#xff01;再也不用担心生成图片货不对版了&#xff01; 就在上周&#xff0c;Midjourney发版它最新的V7版本&#xff1a;Omini Reference&#xff0c;提供了全方位图像参考功能&#xff0c;它可以参考你提…

耀圣-气动带刮刀硬密封法兰球阀:攻克颗粒高粘度介质的自清洁 “利器”

气动带刮刀硬密封法兰球阀&#xff1a;攻克颗粒高粘度介质的自清洁 “利器” 在化工、矿业、食品加工等行业中&#xff0c;带颗粒高粘度介质、料浆及高腐蚀性介质的输送与控制一直是行业难题。普通阀门极易因介质附着、颗粒堆积导致卡阻失效&#xff0c;密封面磨损加剧&#x…

Google云计算原理和应用之分布式锁服务Chubby

Chubby是Google设计的提供粗粒度锁服务的一个文件系统,它基于松耦合分布式系统,解决了分布的一致性问题。通过使用Chubby的锁服务,用户可以确保数据操作过程中的一致性。不过值得注意的是,这种锁只是一种建议性的锁(Advisory Lock)而不是强制性的锁,这种选择系统具有更大…

SM2Utils NoSuchMethodError: org.bouncycastle.math.ec.ECFieldElement$Fp.<init

1&#xff0c;报错图示 2&#xff0c;报错原因&#xff1a; NoSuchMethodError 表示运行时找不到某个方法&#xff0c;通常是编译时依赖的库版本与运行时使用的库版本不一致。 错误中的 ECFieldElement$Fp. 构造函数参数为 (BigInteger, BigInteger)&#xff0c;说明代码期望使…

《100天精通Python——基础篇 2025 第16天:异常处理与调试机制详解》

目录 一、认识异常1.1 为什么要使用异常处理机制?1.2 语法错误1.3 异常错误1.4 如何解读错误信息 二、异常处理2.1 异常的捕获2.2 Python内置异常2.3 捕获多个异常2.4 raise语句与as子句2.5 使用traceback查看异常2.6 try…except…else语句2.7 try…except…finally语句--捕获…