场景题-3

news2025/6/8 1:16:06

如何实现一个消息队列

拆解分析主流的几种消息队列

1、基本架构

生产者Producer、消费者Consumer、Broker:生产者发送消息,消费者接受消息,Broker是服务端,处理消息的存储、备份、删除和消费关系的维护。

主题和分区:主题(topic)消息分类的标识,分区是主题的物理分割,有助于提高消息队列的吞吐量。

1.1 kafka:生产者将消息发布到kafka集群(broker)的一个或多个主题(topic),每个topic包含一个或多个分区(partition),消费者从kafka集群中的一个或多个主题消费消息,并将消费的偏移量(offset:分区中每条消息的位置信息,是一个单调递增且不变的值)提交回kafka以保证消息的顺序性和一致性。

kafka集群中,每个分区可以有多个副本,这些副本中包含一个Leader和多个Follower,只有Leader可以处理生产者和消费者的请求,Follower用于数据备份和容错,当Leader发生故障时,Follower提升为Leader。另外还有一个Zookeeper作为注册中心,协调服务,维护集群的状态和元数据信息。

1.2 RocketMQ:除了生产者Producer、消费者Consumer、Broker集群外,有NameServer(名称服务),负责维护Broker的元数据信息,Producerhr和Comsumer启动时需要连接到NameServer获取Broker的地址信息。每个Topic中可以有多个Queue(消息队列),Producer将消息发送到指定的Queue,Consumer从指定的Queue中l拉取消息。

1.3 RabbitMQ:生产者将消息发布到RabbitMQ的交换器(Exchange),交换器将消息路由到和它绑定(Binding)的队列(Queue),消费者从队列中获取消息。RabbitMQ的Broker就是一个个VHost(可以理解为操作系统的命名空间,里面对各资源进行隔离分组),每个VHost拥有自己的交换器、队列、绑定和权限设置,相互独立。

2、基本功能

2.1 消息存储:一般采用内存或者磁盘,内存读写快但可能丢数据;磁盘可以持久化消息但是读写速度相对慢一些。

2.2 消息传递协议:使用成熟的RPC框架(Dubbo或者Thrift)实现生产者和消费者与Broker之间的通信。

2.3 消息持久化和确认机制:一般做法是将消息存储在磁盘中,并且在消费者确认消费完成后再删除消息。

2。4 消息的分发方式:点对点或广播,点对对是每个消费者只会接收自己订阅的消息,广播是每个消费者都会接收到所有消息。

2.5 消息的传递方式:轮询、长连接、长轮询。一般都是支持推拉结合,或者基于拉实现推。

推消息就是消费者和中间件之间建立TCP长连接或者注册一个回调,当服务端数据发生变化,立即通过这个长连接或者回调将数据推送给消费者。这样的话好处就是能保证消息的实时性,但是一旦生产消息过快消息就会堆积在消费者端。

拉消息就是消费者轮询检查数据是否有变化,有变化的话就把数据拉过来。好处是消费者可以控制消息的数量和速度,缺陷就是消费者需要不断轮询,消息中间件也会因此有一定的压力。

另外有些生产环境下,不同环境的通信可能是单向的,此时就只能消费者采取拉的方式,因为长连接是双向通信。

实际使用时,很多中间件是结合使用长连接和轮询,又称长轮询,就是消费者向消息中间件发送一个长轮询请求,消息中间件如果有消息就直接返回,如果没有消息不会立即断开,等待一段时间,在超时时间到达之前有新小心就返回,否则就断开连接等待下一次长轮询。比如Kafka和RocketMQ。

3、消息的可靠性

其实主要就是保证消息不丢失,一般做法就是主从复制、集群模式或者分布式架构。

Kafka如何保证消息不丢失

发送端:发送消息时建议使用producer.send(msg,callback)方法。

Producer设置中acks=-1,表示Leader会等待消息被成功写进所有的ISR副本才认为producer请求成功。retries设置大于0;

Broker端:设置unclean.leader.election.enable = false,表示是否可以把非ISR集合中的副本选举为Leader副本,如果一个Broker落后原先的Leader太多,那么一旦它成为新的Leader则必然会丢失消息,所以这个参数设置为false。设置 min.insync.replicas > 1,控制的是消息至少要被写入到多少个副本才算是“已提交”。另外推荐设置成 replication.factor = min.insync.replicas + 1。

消费端:enable.auto.commit=false,采用手动提交位移的方式。

如上操作之后其实我们还是没法保证消息100%不丢失,首先生产者发送消息后如果kafka挂了,消息还没写进日志(同步到磁盘),那么消息会丢失。后续重试时如果生产者也挂了,那就没人知道这条消息失败了,也就没有重试了。其次,Kafka虽然引入了副本的机制,但是如果发生同步延迟,还没同步主副本就都挂了,那么消息也可能就丢失了。

RocketMQ如何保证消息不丢失

发送端:

同步发送消息的话将保存机制改成同步刷盘,因为Broker默认是先将消息保存在内存中,内存存储成功就返回结果给生产者,然后通过异步刷盘将消息存储到磁盘上,这时候如果机器挂了那么消息就可能丢失。

flushDiskType = SYNC_FLUSH

异步发送消息的话就需要生产者重写SendCallback的onSuccess和onException方法,用于Broker回调,方法中实现消息的确认和重发。

除此之外,RocketMQ集群部署通常采用的一主多从,并且采用主从同步方式做数据复制。Master在将数据同步到Slave节点后,再返回给生产者确认结果。

brokerRole = SYNC_MASTER

消费端:在业务逻辑的最后加上 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS。

RabbitMQ如何保证消息不丢失

首先,在消息从生产者到交换机Exchange和Exchange到Queue的过程中,为了保证消息发送成功,有两种方案:

一种是confirm机制,一种是事务机制。

confirm机制即注册回调来监听,开启Publisher Confirm(确保消息被Exchange成功接收和处理)和Publisher Return(处理消息在无法路由到队列时的异常)。

事务机制主要了解三个方法就行:

toSelect:将当前channel设置成transaction模式。

toCommit:提交事务。

toRollback:回滚事务。

其次,RabbitMQ接收到消息之后,消息也是先暂存内存,所以为了避免消息丢失,需要考虑的就是一个可靠持久化机制。

队列和交换机的持久化:设置durable参数为true来创建持久化交换机、持久化队列以及持久化绑定关系。

持久化消息:设置消息的deliveryMode为2来创建持久化消息,RabbitMQ才会将消息写入磁盘。

消费端同样的有相应的确认机制,消费者处理消息成功之后可以向MQ发送ack回执,MQ收到ack后才会删除消息。处理消息有异常则返回nack回执,MQ收到后可以重发消息,如果一直收不到返回则也会重试。

消息丢失解决方案

kafka、RocketMQ和RabbitMQ单靠自己其实都无法100%保证消息不丢失,针对消息可能的丢失我们可以引入一些其他机制,比如分布式事务、本地消息表等。

分布式事务:

就是保证数据的一致性(所有参与者在一次写操作过程中要么都成功要么都失败),可分强一致性和最终一致性。

强一致性引入一个协调者,方案包括基于XA的二阶段(2PC)及三阶段提交(3PC)。2PC可以理解为第一阶段是协调者先询问参与者是可以发起事务提交操作,若参与者可以执行事务提交,那么就是进行事务操作,只是执行完没有还没commit或者rollback,如果参与者成功执行事务操作就返回YES,没成功就返回NO。第二阶段就是协调者接收到所有参与者的YES的反馈后,就给参与者发送commit请求,如果有反馈为NO,就发送rollback请求。然后参与者将ACK结果返回给协调者。

2PC最关键的一个问题就是在第二阶段,如果参与者和协调者都挂了,那么就可能出现数据不一致的问题。因此引入3PC(CanCommit,PreCommit,doCommit),就是将2PC的第一阶段中的事务操作也分离出来。3PC的问题就是如果由于网络原因,参与者在等待超时后就会commit,这样可能就与其他接收到abort命令执行回滚的参与者不一致了。

最终一致性:方案是基于可靠消息的最终一致性(本地消息表、事务消息)、最大努力通知以及TCC。

基于本地消息表实现分布式事务,这个方案主要思路其实就是将分布式事务拆为本地事务和消息事务。参与者A发送消息前先创建一个本地消息,在参与者A的DB中写入本地业务数据和本地消息数据,两者在一个事务中,这样业务成功则本地消息也一定写入了。然后参与者A基于本地消息调用MQ发送远程消息,参与者B接收后做业务处理且成功之后再联动修改本地消息的状态。这个流程中如果参与者A消息发送MQ失败,那么就可以通过定时任务扫描本地消息数据,对未成功的消息进行重新投递。如果是MQ发送消息失败,那么MQ的重试机制也就派上用场了。如果是最终修改本地消息状态失败,那么起码现在分布式系统中的业务数据是一致了,只是本地消息的状态不对,这种情况可以借助定时任务重新投消息,下游幂等消费再重新更改消息状态,或者本系统通过定时任务主动去查询下游系统的状态,如果已经成功则直接修改消息状态。

基于事务消息实现分布式事务,参考RocketMQ的事务消息实现,参与者A先向RocketMQ Broker发送一条half消息(半消息),半消息存储在Broker的事务消息日志中,半消息发送成功后参与者A执行本地事务,如果A执行本地事务成功则通知RocketMQ Broker提交事务消息,消息状态从prepared改为committed,消费者可以接收消息。如果本地事务失败则A通知RocketMQ Broker回滚事务消息,消息从事务日志删除。这个过程中如果RocketMQ Broker没有接收到A执行本地事务的结果那么就会进行回查,A自查后返回自查结果,如果在规定时间没有结果那么消息就变味unknow状态,此时A如果有了结果还可以向MQ发送commit或者rollback,但是如果一直没有结果,过期时间一到MQ就自动回滚事务消息,将其从事务消息日志中删除。

TCC就是Try-Confirm-Cancel,将分布式事务分解为若干小事务,每个事务都有Try、Confirm和Cancel三个操作。try阶段参与者执行本地事务,并对全局事务预留资源,返回执行标识。所有参与者都返回成功则协调者通知所有参与者提交事务,即confirm阶段,参与者在本地提交事务,并释放全局事务资源。如果任一参与者try阶段返回失败则协调者通知所有参与者回滚。这里面就会有两个问题:空回滚和事务悬挂。空回滚就是try没成功也要执行回滚,注意处理逻辑。事务悬挂就是由于网络原因可能某个节点的try还没收到,而其他节点触发了cancel,然后这个节点先收到cancel进行了空回滚之后又收到了try并执行了,那么这个节点的try占用的资源就没法释放。解决方案就是引入一张分布式事务记录表,每个参与者都可以在本地事务的执行过程中同时记录一次分布式事务的操作记录。

除上述方案还有分布式事务的组件,如Seata。Seata包含三部分:Transaction Coordinator(TC),Transaction Manager™,Resource Mabager(RM),TC维护整个事务的全局状态,负责通知RM进行提交或回滚;TM可视为微服务中的聚合服务,开启一个全局事务或者提交或回滚一个全局事务;RM可对应微服务架构中的某个微服务,对应一个事务分支,负责执行事务分支的操作。TM接收到用户请求后调TC开启全局事务并从TC获得一个XID;TM通过RPC/Restful调用各RM并把XID传递过去;各RM接收到XID,在TC注册事务分支;TM根据所有调用全部完成后的状态确定是Commit还是Rollbask,将结果通知TC。TC协调各RM进行Commit或者Rollbask。

4、消息的高性能

性能这块可以参考kafka的设计,引入一些批量操作、顺序写入和零拷贝之类的技术。

消息发送

批量发送、异步发送、消息压缩、并行发送(数据分布在不同的分区,生产者并行发送消息)。

消息存储

1、零拷贝:一次IO流程可以简单概括有磁盘数据copy到内核缓冲区(页缓存),内核态中的数据copy到用户态中,用户态数据copy到内核态中(socket缓存),内核态缓冲区数据copy到网卡中。零拷贝就是通过各种技术来减少数据copy的次数或者说减少CPU参与数据拷贝的次数。

实现方式有mmap、sendfile、dma、directI/O等/

2、磁盘顺序写入

3、页缓存

4、系数索引:kafka存储消息是通过分段的日志文件,每个分段有自己的索引文件。

5、分区和副本:kafka采用分区和副本的机制,可以将数据分散到多个节点上进行处理。具体可以了解下ISR机制,即同步副本。kafka中每个主题可以有多个副本,ISR是与主副本保持同步的副本集合。当消息写入Kafka的分区时,首先会写入Leader,然后Leader将消息复制给ISR中的所有副本,只有当ISR中的所有副本都成功接收到并确认了消息之后,主副本才会认为消息已成功提交。

消息消费

消费者群组、并行消费、批量拉取

5、扩展功能

顺序消息
kafka顺序消息

kafka的一个topic下有多个partition,当生产者向某个partiton发送消息时,消息被追加到该partiton的日志文件中,并且分配一个唯一的offset,文件读写是有序的。当消费者从该分区消费消息时,会从该分区最早的offset开始读取消息。所以同一个partiton下的消息是有序的。

所以想要实现消息顺序消费,那么一个topic下只创建一个partition,或者消息被发送到同一个partiton。

要想实现消息发送到同一个partition,可以了解下DefaultPartitoner这个类,实现方式有三种:

一是在key为null的话直接指定partiton。

二是指定key,这样同样的ykey经过hash之后还是会指向同一个partiton编号。

三是自己写一个分区器类,实现Partitoner接口,重写partition方法,在生产者的配置中指定使用自己写的分区器类。

RocketMQ的顺序消息

RocketMQ是基于队列的顺序消费,同一个队列的消息可以做到有序。

生产者需要同步发送消息,并且在send方法中传入一个MessageQueueSelector,这个MessageQueueSelector中需要实现一个select方法,用来定义要把消息发送到哪个MessageQueue。

消息有序进入同一个队列之后,要保证顺序消费,需要加三把锁,先锁定Broker上的MessageQueue,确保消息只会投递给一个消费者,对本地的MessageQueue加锁,确保只有一个线程能处理这个消息队列,对存储消息的ProcessQueue加锁,确保在重平衡的过程中不会出现重复消费。值得注意的是,多次加锁虽然能做到顺序消费,但这无疑会降低系统的吞吐量,可能会导致消息阻塞。

延迟消息
RabbitMQ延迟消息

死信队列:给消息设置一个TTL,到期后消息进入死信队列,监听死信队列消费消息。存在问题是可能造成对头阻塞。因为RabbitMQ只定期扫描队头消息是否过期,如果队头消息没过期,队列中的消息即使过期了也不会进入死信队列,一直被阻塞。另外这个方案实现也比较麻烦。

RabbitMQ插件: 对版本有要求,3.6.12版本开始支持的。基于rebbitmq_delayded_message_exchange插件,消息不是在队列中,而是一个基于Erlang开发的Mnesia数据库中,通过一个定时器去查询需要被投递的消息,投递到x-delayed-message交换机中。这个插件支持的最大延迟时间有限。

RocketMQ延迟消息

基于Timer定时器:先将消息存处在内存上,叨叨指定时间后再写入磁盘。

基于时间轮(5.0版本):将消息按照过期时间放置在不同的槽位,到达过期时间就将该槽位的所有消息投递给消费者。

事务消息

参考上文解决消息丢失中的RocketMQ的事务消息。

重复消费
kafka如何防止重复消费

1、kafka中消费者必须至少加入一个消费者组,同组消费者共享消费者的负载,因此只要有一个消费者在消费某条消息,其他消费者就不会接收这个消息。

2、手动提交位移控制+处理结果去重。

3、客户端做幂等控制:一锁二判三更新之类。

4、Kafka的Exactly-Once消费语义(生产者开启幂等+事务 或者 消费者端精确控制)

RabbitMQ如何防止重复消费

根据发送消息时设置的唯一标识在消费者端做幂等控制。

消息堆积

一般是因为客户端本地消费过程中消费时间过长或者消费并发小。

如上解决方案有:

增加消费者数量,提升消费速度(引入线程池,本地存储消息即返回成功后续慢慢消费等)、清理过期消息(评估过期消息和一些一直无法成功的消息是否可清理),调整一些关于参数比如队列数、消息拉取间隔时间等(具体根据MQ类型修改调试)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2403564.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

中医的十问歌和脉象分类

中医核心理论框架如下 诊断技术如下 本文主要介绍问诊和切诊。 十问歌的“十”是虚指,实际包含12个核心问题,脉象28种中常见仅10余种,重点解释脉诊的物理本质(血流动力学触觉感知) 以下是中医十问歌的完整内容及脉…

构建 MCP 服务器:第 4 部分 — 创建工具

这是我们构建 MCP 服务器的四部分教程的最后一部分。在第一部分中,我们使用基本资源创建了第一个 MCP 服务器。第二部分添加了资源模板并改进了代码组织。在第三部分中,我们添加了提示符并进一步完善了服务器结构。现在,我们将通过添加工具来…

如何以 9 种方式将照片从手机传输到笔记本电脑

使用 USB 电缆可以将照片从智能手机复制到计算机。但是,如果没有 USB 数据线,如何将照片从手机无线传输到笔记本电脑呢?为了解决这个问题,我们搜索并测试了不同的应用程序,然后总结了本指南中分享的 9 个有效选项。您可…

生成JavaDoc文档

生成 JavaDoc 文档 1、快速生成 文档 注解 2、常见的文档注解 3、脚本生成 doc 文档 4、IDEA工具栏生成 doc 文档 第一章 快速入门 第01节 使用插件 在插件工具当中,找到插件 javaDoc 使用方式,在代码区域,直接点击右键。选择 第02节 常用注…

Web后端基础(Maven基础)

https://blog.csdn.net/q20202828/article/details/148459525?spm1001.2014.3001.5501 这是我总结了一下aliyun私服maven依赖配置Maven 3.9.1下载安装的操作 Maven的作用 统一项目结构 Maven 还提供了标准、统一的项目结构 。 1). 未使用Maven 由于java的开发工具呢&#x…

set map数据结构

#include <set> #include <iostream> using namespace std;int main() {// 设置控制台输出编码为UTF-8system("chcp 65001");set<int> s1; // 创建一个整数集合// 插入元素s1.insert(5);s1.insert(3);s1.insert(7);s1.insert(1);s1.insert(9);//默…

面试题小结(真实面试)

面试题 1.call与apply的区别2.vue3的响应式原理3.js的垃圾回收机制4.说说原型链5.什么是防抖和节流6.说一下作用域链7.在一个页面加载数据时&#xff08;还没加载完成&#xff09;&#xff0c;切换到另一个页面&#xff0c;怎么暂停之前页面的数据加载。 浏览器自动中止机制 这…

计算机网络领域所有CCF-A/B/C类期刊汇总!

本期小编统计了【计算机网络】领域CCF推荐所有期刊的最新影响因子&#xff0c;分区、年发文量以及投稿经验&#xff0c;供大家参考&#xff01; CCF-A类 1 IEEE Journal on Selected Areas in Communications 【影响因子】13.8 【期刊分区】JCR1区&#xff0c;中科院1区TOP …

有意向往gis开发靠,如何规划学习?

听说GIS开发工资不错、还不像互联网那么卷&#xff1f;心动了&#xff1f;但一看那些“WebGL”、“空间分析”、“OGC规范”的词儿就头大&#xff1f;别急&#xff01; 今天咱就聊聊零基础/转行选手&#xff0c;咋规划学习GIS开发这条路。不整高大上&#xff0c;就讲实在的&am…

五、查询处理和查询优化

五、查询处理和查询优化 主要内容 查询概述查询处理过程关系操作的基本实现算法查询优化技术代数优化基于存取路径的优化基于代价估算的优化 1. 查询概述 查询是数据库管理系统中使用最频繁、最基本的操作&#xff0c;对系统性能有很大影响。 对于同一个SQL查询&#xff0c…

缓解骨质疏松 —— 补钙和补维 D

骨质老化/疏松原理&#xff08;机制&#xff09;骨密度下降与骨小梁结构退化局部受压导致的微损伤或压力集中 诊断要点治疗策略吃什么食物能补钙呢&#xff1f;钙片吃什么食物能补维生素 D 呢&#xff1f; 骨质老化/疏松 骨质老化&#xff08;常指骨密度下降或骨质疏松&#x…

《PMBOK® 指南》第八版草案重大变革:6 大原则重构项目管理体系

项目管理领域的权威指南迎来关键升级&#xff01;PMI 最新发布的《PMBOK 指南》第八版草案引发行业广泛关注&#xff0c;此次修订首次将项目管理原则浓缩为 6 大黄金法则&#xff0c;重构 7 大绩效域&#xff0c;并首度公开过程组与绩效域的映射关系。本文将全面解析新版核心变…

Ctrl+R 运行xxx.exe,发现有如下问题.

CtrlR 运行xxx.exe,发现有如下问题. (1)找不到Qt5Core.all,Qt5Cored.dll,Qt5Gui.dll,Qt5Guid.dll,Qt5Widgets.all,Qt5Widgetsd.dll? (2)之后找不到libwinpthread-1.dll 从这个目录拷贝相应的库到运行xx.exe目录下 方法二:将库路径添加到系统PATH环境变量里: 在Path中添加路…

极智项目 | 基于PyQT+Whisper实现的语音识别软件设计

这是一个基于OpenAI的Whisper模型的语音识别应用程序&#xff0c;使用PyQt5构建了简洁直观的用户界面。该应用支持多语言识别&#xff0c;特别优化了中文识别体验。 项目下载&#xff1a;链接 功能特点 简洁现代的深色主题界面支持多语言识别&#xff08;中文、英语、日语等…

vue+cesium示例:地形开挖(附源码下载)

基于cesium和vue绘制多边形实现地形开挖效果&#xff0c;适合学习Cesium与前端框架结合开发3D可视化项目。 demo源码运行环境以及配置 运行环境&#xff1a;依赖Node安装环境&#xff0c;demo本地Node版本:推荐v18。 运行工具&#xff1a;vscode或者其他工具。 配置方式&#x…

升级:用vue canvas画一个能源监测设备和设备的关系监测图!

用vue canvas画一个能源电表和设备的监测图-CSDN博客 上一篇文章&#xff0c;我是用后端的数据来画出监测图。这次我觉的&#xff0c;用前端来控制数据&#xff0c;更爽。 本期实现功能&#xff1a; 1&#xff0c;得到监测设备和设备的数据&#xff0c;然后进行存库 2&…

深入理解 transforms.Normalize():PyTorch 图像预处理中的关键一步

深入理解 transforms.Normalize()&#xff1a;PyTorch 图像预处理中的关键一步 在使用 PyTorch 进行图像分类、目标检测等深度学习任务时&#xff0c;我们常常会在数据预处理部分看到如下代码&#xff1a; python复制编辑transform transforms.Compose([transforms.ToTensor…

爆炸仿真的学习日志

今天学习了一下【Workbench LS-DYNA中炸药在空气中爆炸的案例-哔哩哔哩】 https://b23.tv/kmXlN29 一开始 如果你的 ANSYS Workbench 工具箱&#xff08;Toolbox&#xff09;里 只有 SPEOS&#xff0c;即使尝试了 右键刷新、重置视图、显示全部 等方法仍然没有其他分析系统&a…

[华为eNSP] OSPF综合实验

目录 配置流程 画出拓扑图、标注重要接口IP 配置客户端IP 配置服务端IP 配置服务器服务 配置路由器基本信息&#xff1a;名称和接口IP 配置路由器ospf协议 测试结果 通过配置OSPF路由协议&#xff0c;实现跨多路由器的网络互通&#xff0c;并验证终端设备的访问能力。 …

完美搭建appium自动化环境

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 桌面版appium提供可视化操作appium主要功能的使用方式&#xff0c;对于初学者非常适用。 如何在windows平台安装appium桌面版呢&#xff0c;大体分两个步骤&…