rabbitMQ消息问题与解决

news2025/5/10 10:57:58

rabbitMQ 消息顺序性、消息幂等性、消息不丢失、最终一致性、补偿机制、消息队列设计

1.消息顺序性

  • 溯源
    消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常。
  • 举例
      比如通过mysql binlog进行两个数据库的数据同步,由于对数据库的数据操作是具有顺序性的,如果操作顺序搞反,就会造成不可估量的错误。
      比如数据库对一条数据依次进行了 插入->更新->删除操作,这个顺序必须是这样,如果在同步过程中,消息的顺序变成了删除->插入->更新,那么原本应该被删除的数据,就没有被删除,造成数据的不一致问题。

RabbitMQ的消息顺序问题,需要分三个环节看待,发送消息的顺序、队列中消息的顺序、消费消息的顺序

1.1 发送消息的顺序

消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓。
如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。

1.2 队列中消息的顺序

RabbitMQ中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由Rabbitmq保证,通常也不需要开发关心。

提示:不同队列中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站

1.3 消费消息的顺序

多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的。

虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。

例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致.

生产者 通过 channel 把消息 通过 exchange 路由到对一个的quque 的 过程中,MQ 本身保证消息的有序性,quque 是有序的,在业务上只要保证生产者发送到mq上的消息是有序的,那么MQ ,quque就能保证生产者发送到消息的有序性;但是生产者保证了消息的有序性并不能保证消费者消费到的消息就是有序的.

这主要体现在以下两点

  • 一个quque 上有多个consumer,由于每个消费者处理消息的快慢不一样,因此并不能保证每个consumer都顺序消费消息.
  • 一个quque上只有一个consumer,但是这个consumer 是多线程异步处理,因此并不能保证这个consumer消费消息的处理是顺序处理;

1.出现顺序错乱的场景

错乱场景一

一个queue,有多个consumer去消费,这样就会造成顺序的错误,consumer从MQ里面读取数据是有序的,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。

在这里插入图片描述

错乱场景二

一个queue对应一个consumer,但是consumer里面进行了多线程消费,这样也会造成消息消费顺序错误。

在这里插入图片描述

2.保证消息的消费顺序

解决方案一

解决消费顺序的问题,通常就是一个队列只有一个消费者

拆分成多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;这样也会造成吞吐量下降,可以在消费者内部采用多线程的方式取消费。保证一个quque 只有一个consumer 这样便保证了消费者消费MQ 消息的有序性;这样就可以一个个消息按顺序处理,缺点就是并发能力下降了,无法并发消费消息,这是个取舍问题。
在这里插入图片描述

提示:如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度,例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。

解决方案二

解决一个quque一个consumer异步处理的顺序问题

一个queue对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理。

在这里插入图片描述

2.消息幂等性

一条数据(消息)重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。

幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。从业务上来说重复调用多次产生的业务结果与调用一次产生的业务结果相同;

为了避免相同消息的重复处理,必须要采取一定的措施。RabbitMQ服务端是没有这种控制的(同一批的消息有个递增的DeliveryTag),它不知道你是不是就要把一条消息发送两次,只能在消费端控制。

2.1 消息幂等的场景

业务场景1

消费者重复消息

假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。

业务场景2

消费者,内部处理消息的时候的重复

如果消费者每一次接收生产者的消息都成功了,只是在响应或者调用API的时候出了问题,会不会出现消息的重复处理?例如:存款100元,ATM重发了5次,核心系统一共处理了6次,余额增加了600元。

业务场景3

生产者、消费者手动确认消息。

1.生产者在消息发送消息后。再收到mq的确认后,还未更改数据发送状态结果挂掉了,导致消息的重复发送;
2.消费者,在消费消息后,还未给mq发送ack确认标志,消费者挂掉了,导致mq会将消息重复推送给消费者;

2.2 产生消费重复的原因

如何避免消息的重复消费?消息出现重复可能会有两个原因:

1.生产者问题

比如在开启了Confirm模式但未收到确认,生产者重新发送消息;或者生产者在消息发送消息后,在收到mq的确认后,还未更改数据发送状态结果挂掉了,导致消息的重复发送。

2.消费者问题

由于消费者未发送ACK或者其他原因,消息重复投递

2.3 消息幂等的解决方案

对于重复发送的消息,可以对每一条消息生成一个唯一的业务ID,通过日志或者消息落库来做重复控制。或者利用redis、mysql等中间工具的特性解决幂等性问题

  • 1.保证生产者者发送到mq的消息幂等性
  • 2.保证消费者消费mq消息的幂等性

常用解决方案:

  • 1.比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
  • 2.比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
  • 3.比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
  • 4.比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据;

3.消息不丢失

在这里插入图片描述
消息丢失原因场景以及解决方案

3.1 生产者发送消息至MQ的数据丢失

生产者发送消息,自动ack,或者发生发生网络异常导致,未做重发处理,导致推送mq的消息丢失;

解决方法: 在生产者端开启comfirm 确认模式,你每次写的消息都会分配一个唯一的 id,如果发生异常的情况下,做好消息的重发机制.

3.2 MQ挂掉导致消息丢失

MQ收到消息,暂存内存中,还没消费,自己挂掉,数据会都丢失;
如exchange、quque 未设置消息的持久化,在消费者消息未消费或者未确认的情况下导致消息丢失

解决方式:MQ设置为持久化。将内存数据持久化到磁盘中

3.3 消费者自动确认ack下的消息丢失

消费者刚拿到消息,先给mq 发送 ack确认标志 ,再处理业务,解过还未处理业务挂掉了或发生异常

解决方式: 用 RabbitMQ 提供的 ack 机制

4.最终一致性

消息的最终一致性需要结合消息的生产者、消费者的、mq的消息不丢失一块考虑。

5.消息积压处理

消息不积压需要总体上保持消费者的消息消费速率rate 大于生产者的生产速率rate,这样在设计上不会出现消息积压;
消息积压处理需要考虑消息的幂等性,保证消息不被重复消费

5.1 消息堆积的判定

  • 在消费者未消费到消息、或者消费收到的消息延迟比较大的情况下需要消息是否积压;
    可以通过mq的管理界面查看消费者的消息消费情况。

5.2 消息积压的原因场景

1.消息队列的延时以及消息过期失效导致消息队列满了.
2.消息太多导致消息队列磁盘集群满了.
3.积压时间太长了,导致比如RabbitMQ设置了过期时间后,消息就没了。

1.快速处理大量数据积压设计方案

1.存在问题

几千万条数据在MQ里,积压了七八个小时。这个时候就是恢复consumer的问题。让它恢复消费速度,然后傻傻地等待几个小时消费完毕。这个肯定不能再面试的时候说。1个消费者1秒时1000条,1秒3个消费者是3000条。1分钟是18万条。1个小时是1000多万条。如果积压了上万条数据,即使消费者恢复了,也大概需要1个多小时才能恢复过来。

在这里插入图片描述
原来3个消费者1个小时。现在30个消费者,需要10分钟搞定。

2.解决方案

临时扩容:相当于将queue资源和consume资源扩大10倍,以10倍的速度来消费数据

具体操作步骤和思路如下:
1.先修改consumer的问题,确保其恢复消费速度。然后先将现有consumer都停掉。
2.临时创建新的consumer进行快速消费:

  • 2.1:先新建1个topic订阅模型,根据RoutingKey(分割成原来的10倍key),临时建立好原来10倍或者20倍的Queue。
  • 2.2:然后写一个临时consumer程序,部署上去,去消费积压的数据。
  • 2.3:消费之后,不做耗时的处理。直接均匀轮训写入临时建立好的10倍数量的Queue。
  • 2.4:接着征用10倍的机器来部署consume。每一批consumer消费1个临时的queue。

3.等快速消费完积压数据之后,恢复原来的部署架构,重新用原先的consumer来消费消息。

2.过期失效了怎么办?

过期失效就是TTL。如果消息在Queue中积压超过一定的时间就会被RabbitMQ给清理掉。这个数据就没了。这就不是数据积压MQ中了,而是大量的数据会直接搞丢。

1.在这种情况下,增加consume消费积压就不起作用了。此时,只能将丢失的那批数据,写个临时的程序,一点一点查出来,然后再灌入MQ中,把白天丢失的数据补回来。
2.或者将过去的消息放入死信对列(同样也会出现大量数据问题)

6.消息队列设计

如何让你来设计消息队列中间件,如何设计?

主要考察两块。

① 有没有对某个消息队列做过较为深入的原理的了解。或者从整体把握一个mq的架构原理。
② 设计能力,给你一个常见的系统,就是消息队列系统,看能够从全局把握一下整体架构设计的关键点。

比如说,这个消息队列,我们从以下几个方面来了解下:

1、首先MQ得支持可伸缩性吧。就是需要的时候增加吞吐量和容量?

2、其次,需要考虑一下MQ的数据是不是要持久化到磁盘

3、再次,考虑一下MQ的可用性。

4、最后,考虑一下能不能支持数据零丢失

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2372247.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java SE(10)——抽象类接口

1.抽象类 1.1 概念 在之前讲Java SE(6)——类和对象(一)的时候说过,所有的对象都可以通过类来抽象。但是反过来,并不是说所有的类都是用来抽象一个具体的对象。如果一个类本身没有足够的信息来描述一个具体的对象,而…

学习threejs,使用Physijs物理引擎

👨‍⚕️ 主页: gis分享者 👨‍⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍⚕️ 收录于专栏:threejs gis工程师 文章目录 一、🍀前言1.1 ☘️Physijs 物理引擎1.1.1 ☘️…

allure生成测试报告(搭配Pytest、allure-pytest)

文章目录 前言allure简介allure安装软件下载安装配置环境变量安装成功验证 allure运行流程allure装饰器函数基本说明装饰器函数使用allure.attach 命令行运行利用allure-pytest生成中间结果json 查看测试报告总览页面每个tab页的说明类别页面测试套图表页面时间刻度功能页面包 …

龙虎榜——20250509

上证指数今天缩量,整体跌多涨少,走势处于日线短期的高位~ 深证指数今天缩量小级别震荡,大盘股表现更好~ 2025年5月9日龙虎榜行业方向分析 一、核心行业方向 军工航天 • 代表个股:航天南湖、天箭科技、襄阳轴承。 • 驱动逻辑…

操作系统的初步了解

目录 引言:什么是操作系统? 一、设计操作系统的目的 二、操作系统是做什么的: 操作系统主要有四大核心任务: 1. 管理硬件 2. 运行软件 3. 存储数据 4. 提供用户界面 如何理解操作系统的管理呢? 1. 什么是操作…

软件工程之软件项目管理深度解析

前文基础: 1.软件工程学概述:软件工程学概述-CSDN博客 2.软件过程深度解析:软件过程深度解析-CSDN博客 3.软件工程之需求分析涉及的图与工具:软件工程之需求分析涉及的图与工具-CSDN博客 4.软件工程之形式化说明技术深度解…

基于Boost库、Jsoncpp、cppjieba、cpp-httplib等构建Boost搜索引擎

⭐️个人主页:小羊 ⭐️所属专栏:项目 很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~ 目录 项目背景技术栈和项目环境正排索引和倒排索引数据去标签与清洗下载数据源去标签 建立索引构建正排索引构建倒排索引 建立搜索引擎h…

Qt 通过控件按钮实现hello world + 命名规范(7)

文章目录 使用编辑框来完成 hello world通过编辑图形化界面方式通过纯代码方式 通过按钮的方式来创建 hello world通过编辑图形化界面方式通过纯代码方式 总结Qt Creator中的快捷键如何使用文档命名规范 简介:这篇文章着重点并不在于创建hello world程序&#xff0c…

关于大数据的基础知识(二)——国内大数据产业链分布结构

成长路上不孤单😊😊😊😊😊😊 【14后😊///计算机爱好者😊///持续分享所学😊///如有需要欢迎收藏转发///😊】 今日分享关于大数据的基础知识(二&a…

Winform(11.案例讲解1)

今天写两个案例,用于更好的理解控件的使用 在写之前先写一个类 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace _1.案例讲解 { internal class Student { public string …

电机密集型工厂环境下的无线通信技术选型与优化策略

点击下面图片带您领略全新的嵌入式学习路线 🔥爆款热榜 88万阅读 1.6万收藏 在电机、变频器、电焊机等强电磁干扰源遍布的工业环境中,无线通信系统的可靠性面临严峻挑战。本文从抗干扰能力、传输稳定性、实时性需求三大核心维度出发,结合工…

一文了解氨基酸的分类、代谢和应用

氨基酸(Amino acids)是在分子中含有氨基和羧基的一类化合物。氨基酸是生命的基石,人类所有的疾病与健康状况都与氨基酸有直接或间接的关系。氨基酸失衡可引起肝硬化、神经系统感染性疾病、糖尿病、免疫性疾病、心血管疾病、肾病、肿瘤等各类疾…

系统学习算法:动态规划(斐波那契+路径问题)

题目一: 思路: 作为动态规划的第一道题,这个题很有代表性且很简单,适合入门 先理解题意,很简单,就是斐波那契数列的加强版,从前两个数变为前三个数 算法原理: 这五步可以说是所有…

JAVA房屋租售管理系统房屋出租出售平台房屋销售房屋租赁房屋交易信息管理源码

一、源码描述 这是一套房屋租售管理源码,基于SpringBootVue框架,后端采用JAVA开发,源码功能完善,涵盖了房屋租赁、房屋销售、房屋交易等业务。 二、源码截图

掌握Multi-Agent实践(三):ReAct Agent集成Bing和Google搜索功能,采用推理与执行交替策略,增强处理复杂任务能力

一个普遍的现象是,大模型通常会根据给定的提示直接生成回复。对于一些简单的任务,大模型或许能够较好地应对。然而,当我们面对更加复杂的任务时,往往希望大模型能够表现得更加“智能”,具备适应多样场景和解决复杂问题的能力。为此,AgentScope 提供了内置的 ReAct 智能体…

Bearer Token的神秘面纱:深入解析HTTP认证头的设计哲学

为何有些Token会带Bearer? 在接口测试与开发中,我们经常会遇到这样的请求头: Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9... 这个神秘的"Bearer"前缀从何而来?为何不直接使用Authorization: Token..…

【国产化】在银河麒麟ARM环境下离线安装docker

1、前言 采用离线安装的方式。 关于离线安装的方式官网有介绍,但是说的很简单,网址:Binaries | Docker Docs 官网介绍的有几种主流linux系统的安装方式,但是没有kylin的,所以在此记录一下。 在安装过程中也遇到了些…

java volatile关键字

volatile 是 Java 中用于保证多线程环境下变量可见性和禁止指令重排序的关键字。 普通变量不加volatile修饰有可见性问题,即有线程修改该变量值,其他线程无法立即感知该变量值修改了。代码: private static int intVal 0; // 普通变量未加 …

Vibe Coding: 优点与缺点

如果你最近在开发圈子里,你很可能听说过这个新趋势"vibe coding"(氛围编程)。 我只能说我对此感受复杂。以下是原因。 优势 在构建新项目时,靠着氛围编程达到成功感觉很自由!但对于遗留代码来说情况就不同了,尽管也不是不可能。 实时反馈和快速迭代 Cursor(…

技术分享 | 如何在2k0300(LoongArch架构)处理器上跑通qt开发流程

近期迅为售后团队反馈,许多用户咨询:2K0300处理器采用了LA264处理器核,若要在该处理器上运行Qt程序,由于架构发生了变化,其使用方法是否仍与ARM平台保持一致? 单纯回答‘一致’或‘不一致’缺乏说服力&…