消息中间件特性

news2025/7/18 23:31:55

a5a48e1d9edc408c90932c4ed235f400.jpg一:消息队列的主要作用是什么?

 

1.消息队列的特性:

 

业务无关,一个具有普适性质的消息队列组件不需要考虑上层的业务模型,只做好消息的分发就可以了,上层业务的不同模块反而需要依赖消息队列所定义的规范进行通信。FIFO,先投递先到达的保证是一个消息队列和一个buffer的本质区别。容灾,对于普适的消息队列组件来说,节点的动态增删和消息的持久化,都是支持其容灾能力的重要基本特性。性能,这个不必多说了,消息队列的吞吐量上去了,整个系统的内部通信效率也会有提高。

 

2.为什么需要消息队列:

 

当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异。“ 消息 ”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息被发送到队列中,“ 消息队列 ”是在消息的传输过程中保存消息的容器 。

 

使用场景:

 

业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力。就可以把短信发送申请丢到消息队列,直接返回用户成功,短信发送模块再可以慢慢去消息队列中取消息进行处理。

调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送。

任务处理类的系统,先把用户发起的任务请求接收过来存到消息队列中,然后后端开启多个应用程序从队列中取任务进行处理。

3.使用消息队列有什么好处(消息队列作用):

 

异步:假设你有一个系统调用链路,是系统A调用系统B,一般耗时20ms;系统B调用系统C,一般耗时200ms;系统C调用系统D,一般耗时2s。用户一个请求过来巨慢无比,因为走完一个链路,需要耗费:2220ms,如果业务流程支持异步化的话,是不是就可以考虑把系统C对系统D的调用抽离出去做成异步化的,不要放在链路中同步依次调用。这样,实现思路就是系统A -> 系统B -> 系统C,直接就耗费220ms后直接成功了。然后系统C就是发送个消息到MQ中间件里,由系统D消费到消息之后慢慢的异步来执行这个耗时2s的业务处理。通过这种方式直接将核心链路的执行性能提升了10倍。

解耦:假设你有个系统A,这个系统A会产出一个核心数据,现在下游有系统B和系统C需要这个数据。那简单,系统A就是直接调用系统B和系统C的接口发送数据给他们就好了。但是现在要是来了系统D、系统E、系统F、系统G,等等,十来个其他系统慢慢的都需要这份核心数据呢?那么负责系统A的就要被烦死了,然后如果要是某个下游系统突然宕机了呢?系统A的调用代码里是不是会抛异常?那系统A的同学会收到报警说异常了,结果他还要去care是下游哪个系统宕机了。所以在实际的系统架构设计中,如果全部采取这种系统耦合的方式,在某些场景下绝对是不合适的,系统耦合度太严重。并且互相耦合起来并不是核心链路的调用,而是一些非核心的场景(比如上述的数据消费)导致了系统耦合,这样会严重的影响上下游系统的开发和维护效率。因此在上述系统架构中,就可以采用MQ中间件来实现系统解耦,系统A就把自己的一份核心数据发到MQ里,下游哪个系统感兴趣自己去消费即可,不需要了就取消数据的消费。

消峰:假设有一个系统,平时正常的时候每秒可能就几百个请求,正常处理都是OK的,在高峰期一下子来了每秒钟几千请求,弹指一挥间出现了流量高峰,此时我们就可以用MQ中间件来进行流量削峰。所有机器前面部署一层MQ,平时每秒几百请求大家都可以轻松接收消息。一旦到了瞬时高峰期,一下涌入每秒几千的请求,就可以积压在MQ里面,然后那一台机器慢慢的处理和消费。等高峰期过了,再消费一段时间,MQ里积压的数据就消费完毕了。这个就是很典型的一个MQ的用法,用有限的机器资源承载高并发请求,如果业务场景允许异步削峰,高峰期积压一些请求在MQ里,然后高峰期过了,后台系统在一定时间内消费完毕不再积压的话,那就很适合用这种技术方案。

4.消息队缺点:

 

系统可用性降低:系统引入的外部依赖越多,越容易挂掉,本来就是A系统调用BCD三个系统的接口就好了,人ABCD四个系统好好的,偏加个MQ进来,万一MQ挂了,整套系统崩溃了。

系统复杂性提高:硬生生加个MQ进来,怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?

一致性问题:A系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是BCD三个系统那里,BD两个系统写库成功了,结果C系统写库失败了,这数据就不一致了。

 

二:如何保证消息队列的高可用?

1.RabbitMQ高可用

 

单机模式:Demo级别的,生产环境不会使用

 

普通集群模式:

 

 

 

 

 

 

 

同时部署多台RabbitMQ服务器,当生产者将消息发送到RabbitMQ的集群中时 ,消息会存在元数据(类似于消息的描述信息)+消息数据,收到消息的MQ会将消息的元数据信息同步到其他的节点上,当消费者从任意一台服务器上获取消息时,如果当前服务器存在该消息的数据信息就获取成功,否则就会根据元数据信息从其他节点上获取消息数据,这样做并没有保证MQ的高可用,因为存在消息数据的服务器挂掉,消息一样不存在,这样做只能保证MQ的吞吐量比较大。

 

采用镜像集群模式:

 

 

 

 

 

 

 

你创建的queue,无论元数据还是queue里的消息都会存在于多个实例上,然后每次你写消息到queue的时候,都会自动把消息到多个实例的queue里进行消息同步。 这样的话,好处在于,你任何一个机器宕机了,别的机器都可以用。坏处在于

 

性能开销大,消息同步所有机器,导致网络带宽压力和消耗很重!

没有扩展性,如果某个queue负载很重,你加机器,新增的机器也包含了这个queue的所有数据,并没有办法线性扩展你的queue  

开启镜像集群模式:rabbitmq有很好的管理控制台,在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候可以要求数据同步到所有节点的,也可以要求就同步到指定数量的节点,然后你再次创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。

 

2.kafka的高可用

 

 

 

 

 

 

 

kafka一个最基本的架构认识:多个broker组成,每个broker是一个节点;你创建一个topic,这个topic可以划分为多个partition,每个partition可以存在于不同的broker上,每个partition就放一部分数据。

 

这就是天然的分布式消息队列,就是说一个topic的数据,是分散放在多个机器上的,每个机器就放一部分数据。kafka 0.8以前,是没有HA机制的,就是任何一个broker宕机了,那个broker上的partition就废了,没法写也没法读,没有什么高可用性可言。kafka 0.8以后,提供了HA机制,就是replica副本机制。每个partition的数据都会同步到其他机器上,形成自己的多个replica副本。然后所有replica会选举一个leader出来,那么生产和消费都跟这个leader打交道,然后其他replica就是follower。写的时候,leader会负责把数据同步到所有follower上去,读的时候就直接读leader上数据即可。只能读写leader?很简单,要是你可以随意读写每个follower,那么就要care数据一致性的问题,系统复杂度太高,很容易出问题。kafka会均匀的将一个partition的所有replica分布在不同的机器上,这样才可以提高容错性。

 

写数据的时候,生产者就写leader,然后leader将数据落地写本地磁盘,接着其他follower自己主动从leader来pull数据。一旦所有follower同步好数据了,就会发送ack给leader,leader收到所有follower的ack之后,就会返回写成功的消息给生产者。(当然,这只是其中一种模式,还可以适当调整这个行为)

 

消费的时候,只会从leader去读,但是只有一个消息已经被所有follower都同步成功返回ack的时候,这个消息才会被消费者读到。

 

三:高并发情况下消息队列满了如何防止消息丢失?

其实这个防止消息丢失,每种MQ都要从三个角度来分析:生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据,以RabbitMQ为例:

 

1.生产者丢数据:

 

从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。transaction机制就是说,发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。然而缺点就是吞吐量下降了。因此,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

 

2.消息队列丢数据

 

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步:

 

将queue的持久化标识durable设置为true,则代表是一个持久的队列

发送消息的时候将deliveryMode=2

这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据

 

3.消费者丢数据

 

消费者丢数据一般是因为采用了自动确认消息模式。这种模式下,消费者会自动确认收到信息。这时rahbitMQ会立即将消息删除,这种情况下如果消费者出现异常而没能处理该消息,就会丢失该消息。至于解决方案,采用手动确认消息即可。

 

四:消费者消费消息,如何保证MQ幂等性(消息不被重复消费)?

1.幂等性:就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品使用约支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额返发现多扣钱了,流水记录也变成了两条

 

2.MQ消息发送

 

 

 

 

 

 

 

发送端MQ-client(消息生产者:Producer)将消息发送给MQ-server;

MQ-server将消息落地(持久化存到数据库中);

MQ-server回ACK给MQ-client(Producer);

MQ-server将消息发送给消息接受端MQ-client(消息消费者:Customer);

MQ-client(Customer)消费接受到消息后发送ACK给MQ-server;

MQ-server将落地消息删除

3.消息重复发送原因

 

为了保证消息必达,MQ使用了消息超时、重传、确认机制。使得消息可能被重复发送,如上图中,由于网络不可达原因:3和5中断,可能导致消息重发。消息生产者a收不到MQ-server的ACK,重复向MQ-server发送消息。MQ-server收不到消息消费者c的ACK,重复向消息消费者c发消息。

 

4.MQ内部如何做到幂等性的

 

对于每条消息,MQ内部生成一个全局唯一、与业务无关的消息ID:inner-msg-id。当MQ-server接收到消息时,先根据inner-msg-id判断消息是否重复发送,再决定是否将消息落地到DB中。这样,有了这个inner-msg-id作为去重的依据就能保证一条消息只能一次落地到DB。

 

5.消息消费者应当如何做到幂等性

 

对于非幂等性业务且要求实现幂等性业务:生成一个唯一ID标记每一条消息,将消息处理成功和去重日志通过事物的形式写入去重表。

对于非幂等性业务可不实现幂等性的业务:权衡去重所花的代价决定是否需要实现幂等性,如:购物会员卡成功,向用户发送通知短信,发送一次或者多次影响不大。不做幂等性可以省掉写去重日志的操作。

6.结合业务思考

 

业务表添加约束条件 如果你的数据库将来都不会分库分表,那么可以在业务表字段加上唯一约束条件(UNIQUE),这样相同的数据就不会保存为多份。

将处理后的消息写数据库(主键唯一),你先根据主键查一下,如果这数据已经存在,就不再消费。

使用 redis 如果你的系统是分布式的,又做了分库分表,那么可以使用 redis 来做记录,把消息 id 存在 redis 里,下次再有重复消息 id 在消费的时候,如果发现 redis 里面有了就不能进行消费。

基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1426749.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

亚信安全助力宁夏首个人工智能数据中心建成 铺设绿色算力安全底座

近日,由宁夏西云算力科技有限公司倾力打造,亚信安全科技股份有限公司(股票代码:688225)全力支撑,总投资达数十亿元人民币的宁夏智算中心项目,其一期工程——宁夏首个采用全自然风冷技术的30KW机…

软考高项十大管理49个过程记忆口诀

一、十大管理口诀 口诀:范进整狗子,成人风采干 内容:范围管理、进度管理、整合管理、沟通管理、质量管理、成本管理、资源管理、风险管理、采购管理、干系人管理 二、49个过程口诀 1、整合管理 口诀:按章程计划指导知识、监控…

Java打印图形 九九乘法表

目录 双重循环九九乘法表打印长方形打印平行四边形打印三角形打印菱形打印空心菱形 三重循坏百钱买百鸡 双重循环 九九乘法表 在Java中,你可以使用嵌套的for循环来打印九九乘法表。以下是一个简单的示例: public class Main {public static void main…

wordpress找不回密码怎么办?4种方法设置新密码

有些WordPress站长太久不登录后台了,所以就忘记了管理员登录密码,这种情况我们应该怎么找回密码呢?或者设置一个新密码呢?下面boke112百科就跟大家分享4种方法设置WordPress新密码。 方法一、登录页面的“忘记密码?”…

React + react-device-detect 实现设备特定的渲染

当构建响应式网页应用时,了解用户正在使用的设备类型(如手机、平板或桌面)可以帮助我们提供更优化的用户体验。本文将介绍如何在 React 项目中使用 react-device-detect 库来检测设备类型,并根据不同的设备显示不同的组件或样式。…

用Python处理TDC激光测距数据并绘制为图片

用Python处理TDC激光测距数据并绘制为图片 说明一、定义全局变量变二、主函数入口三、处理原始文件数据四、将数据叠加统计生成图片五、额外的辅助函数六、将数据进行各种形式统计叠加七、原始数据形式八、 测试结果 说明 1. 主要是将TDC激光测距数据进行统计叠加并绘制为图片…

网络原理-TCP/IP(3) - 三次握手超详解析

TCP协议 连接管理 TCP的连接是虚拟的,抽象的,目的是让通信双方保存对方信息.在正常情况下,TCP要经过三次握手建立连接,四次挥手断开连接. 之前我们在网络编程中的 socket new Socket(ip, port); 这个操作就是建立连接.而这个操作知识调用了socket的api,真正建立的过程,是在…

FW如何区别 PAW3212DB-TJDT 和 PAW3220DB-TJDL/TJDR/TJDS

PAW3212DB-TJDT 和 PAW3220DB-TJDL/TJDR/TJDS 的引脚功能定义是一样的,只是封装有一点不一样。PAW3212DB-TJDT是圆形火山口,配的是圆孔透镜,PAW3220DB-TJDL/TJDR/TJDS是方形火山口,配的是方孔透镜。 PAW3212DB-TJDT 和 PAW3220DB-…

杂题——试题-算法训练-P0602

分析: 把要重排序的数字转成数组对数组进行排序,从小到大排序数组转成字符串,字符串转成数字,得到最小数再把最小数的字符串反转,得到最大数注意: 在java语言中,如果使用Arrays.toString(digits…

C++之平衡二叉搜索树查找

个人主页:[PingdiGuo_guo] 收录专栏:[C干货专栏] 大家好,我是PingdiGuo,今天我们来学习平衡二叉搜索树查找。 目录 1.什么是二叉树 2.什么是二叉搜索树 3.什么是平衡二叉搜索树查找 4.如何使用平衡二叉搜索树查找 5.平衡二叉…

C#,阿格里数(Ugly Number)的多种算法与源代码

1 丑数,阿格里数 阿格里数,即丑数(Ugly Number)、逊数(Humble Number)。 一般而言:把只包含质因子2,3和5的数称作丑数(Ugly Number)。例如6、8都是丑数&…

基于PSO-BP神经网络的风电功率MATLAB预测程序

微❤关注“电气仔推送”获得资料(专享优惠) 参考文献 基于风电场运行特性的风电功率预测及应用分析——倪巡天 资源简介 由于自然风具有一定的随机性、不确定性与波动性,这将会使风电场的功率预测受到一定程度的影响,它们之间…

FPGA高端项目:Xilinx Zynq7020系列FPGA 多路视频缩放拼接 工程解决方案 提供4套工程源码+技术支持

目录 1、前言版本更新说明给读者的一封信FPGA就业高端项目培训计划免责声明 2、相关方案推荐我这里已有的FPGA图像缩放方案我已有的FPGA视频拼接叠加融合方案本方案的Xilinx Kintex7系列FPGA上的ov5640版本本方案的Xilinx Kintex7系列FPGA上的HDMI版本本方案的Xilinx Artix7系列…

探索设计模式的魅力:从单一继承到组合模式-软件设计的演变与未来

设计模式专栏:http://t.csdnimg.cn/nolNS 在面对层次结构和树状数据结构的软件设计任务时,我们如何优雅地处理单个对象与组合对象的一致性问题?组合模式(Composite Pattern)为此提供了一种简洁高效的解决方案。通过本…

C++类与对象:默认成员函数

文章目录 1.类的6个默认成员函数2.构造函数3.析构函数4. 拷贝构造函数5.赋值运算符和运算符重载6.日期类实现7.const成员8.重载流插入<< &#xff0c;流提取>>1.流插入2.流提取 9.取地址及const取地址操作符重载 1.类的6个默认成员函数 空类:也就是什么成员都没有的…

Spring Boot集成Redisson详细介绍

Redisson是一个用于Java的分布式和高可用的Java对象的框架&#xff0c;它基于Redis实现。在Spring Boot应用程序中集成Redisson可以帮助我们更轻松地实现分布式锁、分布式对象、分布式集合等功能。本文将介绍如何在Spring Boot项目中集成Redisson&#xff0c;并展示一些基本用法…

vs正则搜索 int main() 排除 // int main()

1 ctrl f 2 选择正则 3 表达式 ^\s*int\smain\(\) ^ 表示匹配行开始\s* 匹配0个或多个空格int 匹配int关键字\s 匹配一个或多个空格main\( 匹配main函数和左括号

ChatGPT真有很多人在用吗?——回答一位知友的问题

先上结论 是的。数据不会撒谎&#xff0c;用户拿脚投票&#xff0c;ChatGPT发布仅五天内就达到了100万用户&#xff0c;是有史以来增长最快的消费者应用程序。2023年全球前50款AI工具就收获了240亿次访问&#xff0c;其中ChatGPT收获了146亿次访问。 一些想法和思考 我的一些…

9.defer语句调用顺序

目录 概述实践defer结果defer和return执行顺序 结束 概述 defer 类似 java 中的异常处理中的 finally &#xff0c;在 Go 中 defer 是一种压栈出栈操作。 实践 defer package mainimport "fmt"func demo1() {fmt.Println("demo1") }func demo2() {fmt.Pr…

MySql调优(三)Query SQL优化(2)profiler诊断工具

Mysql中自带性能分析工具Profile。注意&#xff1a;profile仅对当前会话有效 一、操作步骤 1、打开 profile set profiling1; 2、执行sql语句 3、分析sql语句执行时间 show profiles 其他参数&#xff1a; ALL&#xff1a;显示所有的开销信息。 BLOCK IO&#xff1a;显示块…