【RocketMQ】底层架构核心流程
1、基本概念Producer生产者负责“发送消息”的应用。Consumer消费者负责“消费消息”的应用。Broker真正存储消息、处理请求的服务器进程。Producer 和 Consumer 最终都是通过网络直接跟 Broker 打交道RPC。NameServer只负责注册和路由发现。保存“哪个 Topic 有哪些队列这些队列在哪些 Broker 上”这种元数据。Producer/Consumer 启动时会去 NameServer 拉这个路由信息。Topic逻辑概念可以理解成“一个业务场景的消息分类”。对开发者来说发消息要指定 Topic订阅也要指定 Topic。QueueMessage Queue物理上的拆分单元一个 Topic 会被拆成多个 Queue。用途提高吞吐不同 Queue 可以在多台 Broker 上、被多个 Consumer 实例并行消费。做负载均衡同一个消费组里的实例按 Queue 分摊消息。Message一条真正的数据包含 topic、body、tags、keys 等。在磁盘上最终会被顺序写入到 Broker 的 CommitLog。2、使用方式生产者代码ResourceprivateMQProducermqProducer;OverridepublicvoidsendDelayMsg(ImMsgBodyimMsgBody){StringjsonJSON.toJSONString(imMsgBody);MessagemessagenewMessage();message.setBody(json.getBytes());message.setTopic(ImCoreServerProviderTopicNames.QIYU_LIVE_IM_ACK_MSG_TOPIC);//等级1 - 1s等级2 - 5smessage.setDelayTimeLevel(2);try{SendResultsendResultmqProducer.send(message);LOGGER.info([MsgAckCheckServiceImpl] msg is {},sendResult is {},json,sendResult);}catch(Exceptione){LOGGER.error([MsgAckCheckServiceImpl] error is ,e);}}消费者代码OverridepublicvoidafterPropertiesSet()throwsException{DefaultMQPushConsumermqPushConsumernewDefaultMQPushConsumer();mqPushConsumer.setVipChannelEnabled(false);//设置我们的namesrv地址mqPushConsumer.setNamesrvAddr(rocketMQConsumerProperties.getNameSrv());//声明消费组mqPushConsumer.setConsumerGroup(rocketMQConsumerProperties.getGroupName()_ImAckConsumer.class.getSimpleName());//每次只拉取一条消息mqPushConsumer.setConsumeMessageBatchMaxSize(1);mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);mqPushConsumer.subscribe(ImCoreServerProviderTopicNames.QIYU_LIVE_IM_ACK_MSG_TOPIC,);mqPushConsumer.setMessageListener((MessageListenerConcurrently)(msgs,context)-{StringjsonnewString(msgs.get(0).getBody());ImMsgBodyimMsgBodyJSON.parseObject(json,ImMsgBody.class);intretryTimesmsgAckCheckService.getMsgAckTimes(imMsgBody.getMsgId(),imMsgBody.getUserId(),imMsgBody.getAppId());LOGGER.info(retryTimes is {},msgId is {},retryTimes,imMsgBody.getMsgId());if(retryTimes0){returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}//只支持一次重发if(retryTimes2){msgAckCheckService.recordMsgAck(imMsgBody,retryTimes1);msgAckCheckService.sendDelayMsg(imMsgBody);routerHandlerService.sendMsgToClient(imMsgBody);}else{msgAckCheckService.doMsgAck(imMsgBody);}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;});mqPushConsumer.start();LOGGER.info(mq消费者启动成功,namesrv is {},rocketMQConsumerProperties.getNameSrv());}3、NS/B/P/C 的协作流程NameServer注册中心 路由服务NameServer先启动Broker 启动后主动去连 NameServer把自己的信息“注册”上去Broker 会定时向所有 NameS****erver发心跳/注册请求请求体里带上自己的地址IP:PortBrokerName、ClusterName本机 Topic 配置Topic 名、读/写队列数等Producer / Consumer 启动时从 NameServer 拉取路由信息某个 Topic 对应哪些 Broker、每个 Broker 上有几个 Queue 是属于这个topic的。总结就是该 Topic 的 Queue 列表。例如[Queue0BrokerA, Queue1BrokerA, Queue2BrokerB, Queue3BrokerB]Producer 发消息的大致步骤1启动时设置 NameServer 地址。定时从 NameServer 拉 Topic 路由。拿到 Topic 的完整 Queue 列表2发送时根据路由信息为当前 Topic 选择一个目标 Broker Queue可以轮询、hash 等策略。选哪个 Queue 的规则在 Producer 里实现常见两种不指定 key、不要求顺序在列表里轮询Round-Robin选一个例如第一次发用 Queue0第二次用 Queue1……这样自然就负载均衡到多个 Broker、多个 Queue。指定了 key、要顺序消息对key 做 hash再对队列个数取模得到固定下标总是选同一个 Queue。这样同一 key 的消息总进同一个 Queue消费端单线程消费该 Queue 就能保序。通过 Netty 向这个 Broker 发一条 RPC 请求SEND_MESSAGE。Broker 在本地写磁盘CommitLog写成功后返回 SendResult。3如果发送超时/异常RocketMQ Client 内部有重试机制看配置可以自动换 Queue 或换 Broker 再发。Consumer 收消息的大致步骤1启动时设置 NameServer 地址。设置消费组consumerGroup。设置要订阅的 Topic。从 NameServer 获取路由信息知道哪些 Queue 属于这个 Topic。拿到 Topic 的完整 Queue 列表2负载均衡同一消费组里的多个 Consumer 实例会由客户端做“Queue 分配”根据当前 Group 内在线实例数和 Topic 的 Queue 数把 Queue 分给各个实例每个实例只消费分到的那部分。一个 Queue 只被组内一个实例消费。实例数 你部署并启动了多少个跑这段 Consumer 代码的进程3拉取消息RocketMQ 的“Push”模式底层其实也是定时 PullConsumer 主动从 Broker 的某个 Queue 拉一批消息。拉到后回调你的 MessageListener。4确认 / 重试如果你的 Listener 返回 CONSUME_SUCCESS客户端会更新消费位点提交给 Broker下次从新的 offset 继续拉。如果返回 RECONSUME_LATER 或抛异常Broker 会认为你消费失败稍后再把这条消息投递给你或同组其他实例次数超过阈值就进死信队列。4、RocketMQ 的存储设计CommitLog ConsumeQueue IndexFileCommitLogCommitLog是真正存消息的地方所有 Topic 所有 Queue 的消息都混在一起顺序追加写入这样可以保证磁盘顺序写吞吐量很高。每个文件默认 1GB写满就建新文件。ConsumeQueueConsumeQueue是 CommitLog 的索引每个 Topic 的每个 Queue 有自己的 ConsumeQueue 文件。每条记录固定 20 字节存的是 [commitLogOffset(8), msgSize(4), tagsHashCode(8)]。Consumer 消费时先读 ConsumeQueue 拿到 偏移量和大小再去 CommitLog 读完整消息。整体设计思路是写走 CommitLog 保证顺序写高吞吐读走 ConsumeQueue 保证快速定位底层版的写-读流程Producer 发消息写Producer.send(message) ↓ Broker 收到 ↓ CommitLog.putMessage() - 加锁或用 CAS - 找到当前 CommitLog 文件的末尾 - 把消息序列化成二进制追加写入 - 返回 physicalOffset这条消息在 CommitLog 里的位置 ↓ 返回 SendResult 给 Producer ↓ 异步ReputMessageService 扫描 CommitLog 新消息 - 写对应的 ConsumeQueuetopic queueId - 写 IndexFile如果消息有 keysConsumer 消费读Consumer.pull(topic, queueId, offset:100, MaxMsgNums:10//最多拉多少条) ↓ Broker 收到 ↓ 读 ConsumeQueue 文件 - 定位到 offset 对应的 20 字节记录 - 取出 [commitLogOffset, msgSize, tagsHashCode] - 可选如果有 tag 过滤先用 tagsHashCode 判断是否匹配 ↓ 读 CommitLog 文件 - 定位到 commitLogOffset 位置 - 读 msgSize 字节 - 反序列化成 Message 对象 ↓ 返回给 Consumer ↓ Consumer 处理 - Consumer 的 Listener 处理完这批消息返回 CONSUME_SUCCESS - 本地 offset 更新为 100 10 110 - 定时5s向 Broker 提交updateConsumerOffset(group, topic, queueId, 110)下次从110开始拉 - Broker 更新内存 默认每5s持久化到 consumerOffset.json consumer第一次启动 向 Broker 查这个 Group 在 qiyu_live_im_ack_msg_topic 的 Queue 0 的 offset Broker 返回没有记录 根据 CONSUME_FROM_FIRST_OFFSET 配置从 offset0 开始消费 consumer重启后 向 Broker 查 offset拿到上次提交的值比如 100 从 offset100 继续消费不会重复消费 0~99Consumer 消费 - push模式消费者启动 ↓ 连接NameServer获取路由信息 ↓ 向Broker注册消费者 ↓ 启动各种服务线程 ↓ ┌─────────────────────────────────────┐ │ 负载均衡服务 │ │ (每20秒执行一次) │ └─────────────────────────────────────┘ ↓ 计算队列分配 (比如分配到Queue0, Queue1, Queue2) ↓ 为每个队列创建初始PullRequest ┌─────────────────────────────────────┐ │ PullRequest(Queue0, offset100) │ │ PullRequest(Queue1, offset200) │ │ PullRequest(Queue2, offset300) │ └─────────────────────────────────────┘ ↓ 将PullRequest放入拉取请求队列 ↓ ┌─────────────────────────────────────┐ │ 拉取线程循环 │ │ (24小时不停工作) │ └─────────────────────────────────────┘ ↓ 从拉取请求队列取出一个PullRequest ↓ 检查流控条件 ↓ ┌─────────────────┐ ┌─────────────────┐ │ 流控检查通过 │ │ 流控检查失败 │ └─────────────────┘ └─────────────────┘ ↓ ↓ 向Broker发送拉取请求 延迟50ms后重新放入队列 ↓ ↓ ┌─────────────────────────────────────┐ │ Broker端处理异步 │ └─────────────────────────────────────┘ ↓ 立即检查是否有消息 ↓ ┌─────────────────┐ ┌─────────────────┐ │ 有消息 │ │ 没有消息 │ └─────────────────┘ └─────────────────┘ ↓ ↓ 立即返回消息列表 挂起请求长轮询等待 ↓ ↓ ┌─────────────────┐ │ 等待30秒内... │ │ - 有新消息到达 │ │ - 或者超时 │ └─────────────────┘ ↓ 返回结果(消息或空) ↓ ┌─────────────────────────────────────┐ │ 客户端收到响应 │ └─────────────────────────────────────┘ ↓ 解析拉取结果 ↓ ┌─────────────────┐ ┌─────────────────┐ │ 拉取到消息 │ │ 没拉取到消息 │ └─────────────────┘ └─────────────────┘ ↓ 消息存储到对应队列的本地缓存 (ProcessQueue) ↓ 按批次大小拆分消息 创建新PullRequest ↓ ↓ 创建消费任务 放入拉取请求队列 ↓ ↓ 提交给消费线程池 继续下次拉取 ↓ 立即创建新PullRequest ↓ 继续处理其他请求拉取线程循环继续…… ↓ ┌─────────────────────────────────────┐ │ 消费线程池处理异步 │ └─────────────────────────────────────┘ ↓ 消费线程获取任务 ↓ 执行用户消费逻辑 ↓ ┌─────────────────┐ ┌─────────────────┐ │ 消费成功 │ │ 消费失败 │ └─────────────────┘ └─────────────────┘ ↓ ↓ 发送ACK给Broker 发送NACK给Broker ↓ ↓ 更新消费进度 消息进入重试队列 详细说明各阶段 1、拉取请求循环 拉取线程的工作循环 while (消费者运行中) { 1. 从拉取请求队列取出PullRequest 2. 检查流控条件本地缓存是否过多 3. 向Broker发送拉取请求 4. 处理Broker响应[msg100,msg101,msg102], nextOffset103这个nextBeginOffset是基于拉取位点pullOffset计算的 5. 创建新的PullRequest继续循环 } 值得注意的是拉取和broker处理是异步当broker返回结果时才会触发回调 回调中将消息提交给消费线程池然后立即创建新PullRequest放入队列 拉取线程不会等待消费线程池处理完成。 broker其实就是一个服务端然后里面有相应的拉取线程池专门处理拉取请求。 broker定时将消费位点刷盘。 拉取请求中的位点是拉取位点。拉取位点每次在拉取线程处理broker响应时更新然后传入新的拉取请求。 重启时从broker获取消费位点作为拉取的起始位点。 补充 Rebalance分配队列 ↓ 创建PullRequest(mqQueue0, offset100) ↓ 放入pullRequestQueue ↓ PullMessageService取出PullRequest ↓ 向Broker发送拉取请求(Queue0, offset100) ↓ Broker返回: [msg100, msg101, msg102], nextBeginOffset103 ↓ 更新PullRequest.nextOffset 103 ↓ 消息放入ProcessQueue本地缓存 ↓ 提交消费任务 ↓ PullRequest放回pullRequestQueue复用同一个对象 ↓ PullMessageService再次取出PullRequest ↓ 向Broker发送拉取请求(Queue0, offset103) ↓ 循环继续... 2、 流控检查详细规则 检查以下条件 - 本地缓存消息数 1000条 - 本地缓存消息大小 100MB - 消费进度落后 2000条 任一条件满足 → 延迟拉取 所有条件不满足 → 立即拉取 流控检查的目的 如果拉取速度 消费速度本地缓存会越来越多 最终导致内存溢出 流控机制防止这种情况发生 3、Broker端长轮询机制 Broker收到拉取请求后 1. 立即检查队列是否有新消息 2. 有消息 → 立即返回 3. 没消息 → 将请求挂起30秒 4. 30秒内有新消息 → 立即唤醒并返回 5. 30秒超时 → 返回空结果 6. 消息处理和任务创建 4、收到消息后的处理 ConsumeMessageService负责从ProcessQueue取消息创建消费任务提交给线程池 1. 根据consumeMessageBatchMaxSize拆分消息 2. 每个批次默认是1条创建一个ConsumeRequest任务 3. 记录消费开始时间戳 4. 提交任务到消费线程池 5. 更新消息状态为消费中 5、超时检测机制 1. 有一个定时清理线程每20秒执行一次 2. 遍历ProcessQueue中所有消费中状态的消息 3. 计算当前时间 - 消费开始时间 4. 如果超过15分钟标记为超时消息 6、超时处理 1. 超时消息从ProcessQueue中移除 2. 构造一个重试消息 3. 设置延迟级别第1次重试 延迟级别1 4. 发送给Broker注意这时还不是发到重试队列类似于生产者发送普通消息给broker 5. broker判断这是一个有延迟属性的消息于是存储到 SCHEDULE_TOPIC_XXXX 的对应延迟级别队列 6. 定时任务扫描延迟队列中的消息检查投递时间是否到达 7. 如果到达读取消息内容恢复真实的Topic%RETRY%ConsumerGroup和QueueId重新写入CommitLog更新%RETRY%ConsumerGroup的ConsumeQueue索引 8. 此时消费者就能消费到重试队列的消息了 假设场景 - 消费者组order-group3个实例 - 业务TopicOrderTopic8个队列 - 重试Topic%RETRY%order-group4个队列 分配结果 消费者实例1 - OrderTopic: Queue0, Queue1, Queue2 - %RETRY%order-group: Queue0, Queue1 消费者实例2 - OrderTopic: Queue3, Queue4, Queue5 - %RETRY%order-group: Queue2 消费者实例3 - OrderTopic: Queue6, Queue7 - %RETRY%order-group: Queue3 重试消息默认是随机放到重试Topic下的任意一个队列 7、本地缓存 本地缓存ProcessQueue的核心价值 消息状态管理跟踪每条消息的完整生命周期 可靠性保证消费失败可重试宕机可恢复 流控保护防止内存溢出保护系统稳定 进度管理准确计算和报告消费进度 并发协调协调拉取线程和消费线程的工作 8、消费过程 public class ConsumeMessageConcurrentlyService { public void processConsumeResult( ConsumeConcurrentlyStatus status, ConsumeRequest consumeRequest) { // 不管返回SUCCESS还是RECONSUME_LATER // 都标记为已处理完成 for (MessageExt msg : consumeRequest.getMsgs()) { long offset msg.getQueueOffset(); processQueue.updateStatus(offset, MessageStatus.CONSUMED); } // 根据返回状态决定后续处理 if (status ConsumeConcurrentlyStatus.CONSUME_SUCCESS) { // 更新消费位点 updateOffset(consumeRequest);// 先更新内存每5秒持久化到Broker } else { // 发送到重试队列 sendMessageBack(consumeRequest.getMsgs()); } } } 本地队列的状态 public class ProcessQueue { // 消息状态枚举 enum MessageStatus { WAITING, // 等待消费 CONSUMING, // 正在消费 CONSUMED, // 已消费完成 TIMEOUT // 消费超时 } // 消息状态映射表 private final MapLong, MessageStatus msgStatusTable new ConcurrentHashMap(); }┌─────────────────────────────────────────────────────┐ │ 完整的削峰机制 │ ├─────────────────────────────────────────────────────┤ │ 生产者高峰流量 │ │ ↓ │ │ Broker队列 (持久化存储容量大) │ │ ↓ │ │ 拉取控制 (按消费能力拉取不是按生产速度) │ │ ↓ │ │ 本地缓存 (内存缓存有流控保护) │ │ ↓ │ │ 线程池队列 (任务缓存有界队列) │ │ ↓ │ │ 消费线程池 (按实际处理能力消费) │ │ ↓ │ │ 业务处理 (最终的处理速度) │ └─────────────────────────────────────────────────────┘ 值得注意的是RocketMQ的本地缓存设计是每个队列拥有独立的本地缓存。Push和Pull底层机制相同区别在于自动化程度。核心概念pullBatchSize拉取批次大小作用每次从Broker拉取多少条消息默认值32条配置方式consumer.setPullBatchSize(64)影响网络传输效率consumeMessageBatchMaxSize消费批次大小作用每个消费任务处理多少条消息默认值1条配置方式consumer.setConsumeMessageBatchMaxSize(10)影响消费线程的任务粒度5、rocketmq延迟消息/重试消息实现原理RocketMQ的延迟消息实现采用了临时存储 定时扫描 重新投递的机制。核心思想是将延迟消息先暂存到特殊的Topic中通过定时任务扫描到期消息然后重新投递到原始Topic。当Broker收到延迟消息后会进行Topic替换将原始Topic替换为一个特殊Topic所有延迟级别共享同一个特殊Topic。根据延迟级别确定QueueId比如延迟级别3对应Queue2然后将原始的Topic和QueueId保存在消息属性中用于后续恢复。消息仍然写入CommitLog基于特殊Topic和queue建立ConsumeQueue索引。Broker为每个延迟级别启动独立的定时任务延迟消息的定时任务是直接运行在Broker服务上的延迟消息的定时任务实际上是一个特殊的Consumer它自己管理消费进度自己更新进度自己持久化进度定时任务扫描对应Queue的ConsumeQueue文件。维护消费进度记录扫描到ConsumeQueue的哪个位置。根据ConsumeQueue中的消息物理偏移量和消息大小读取CommitLog完整消息内容定时任务读取到消息后进行时间计算判断当前时间是否大于消息预期投递时间如果消息到期了则执行重新投递。重新投递从消息属性中恢复原始的Topic和QueueId将消息重新投递到原始Topic的指定Queue中基于原始Topic和queue建立ConsumeQueue索引。Consumer就可以正常消费到这条延迟消息了。6、rocketmq怎么保证消息的可靠性的消息在传输过程中可能在三个阶段丢失①生产者阶段消息发送到Broker失败②Broker存储阶段Broker接收到消息但存储失败③消费者阶段消费者接收到消息但处理失败因此RocketMQ主要通过在这三个阶段提供可靠性保障机制来防止消息丢失。在生产者端通过同步发送和自动重试保证可靠性Producer使用同步发送时会阻塞等待Broker返回SendResult响应。如果在超时时间内没收到SendResult响应或者收到了SendResult但状态不是成功则会自动重试重试时会选择不同的Broker避免单点故障。在Broker端通过存储和复制机制保证可靠性刷盘策略选择同步刷盘确保消息持久化到磁盘后才返回成功主从复制选择同步复制消息同步到从节点后才返回成功。同步刷盘加同步复制是最高可靠性配置虽然性能会有所下降但可靠性最高。在消费者端通过确认和重试机制保证可靠性Consumer处理完消息后返回消费状态成功时Consumer会向Broker发送ACK确认失败时不发送ACK。Broker没收到Consumer的ACK确认时会重新投递消息。超过重试次数的消息进入死信队列防止消息彻底丢失。生产者重试失败直接抛异常给业务代码需要业务层处理通过这三个阶段的保障机制RocketMQ可以做到消息的高可靠性传输。在实际应用中需要根据业务对可靠性和性能的要求选择合适的配置策略。除了前面提到的基础可靠性机制RocketMQ还提供了事务消息来解决分布式事务场景下的可靠性问题。:::tips同步刷盘/异步刷盘不管什么刷盘broker收到消息后都会发送sendresult给生产者。异步刷盘broker收到消息后就直接返回sendresult同步发送/异步发送同步发送 业务线程阻塞 IO线程工作业务线程发送请求 → 阻塞等待 → 被唤醒 → 返回结果重试IO线程发送消息 → 接收响应 → 唤醒业务线程异步发送 业务线程不阻塞 IO线程工作业务线程发送请求 → 立即返回 → 继续干活IO线程发送消息 → 接收响应 → 执行回调重试1、为什么说同步发送比异步发送可靠因为异步发送的失败处理是在回调中业务流程已经走完难以保证业务一致性。适合对一致性要求不高、追求性能的场景2、同步发送rocketmq已经自动重试过了的两个结果情况A收到sendresultSEND_OK发送成功FLUSH_DISK_TIMEOUT刷盘超时FLUSH_SLAVE_TIMEOUT同步复制超时SLAVE_NOT_AVAILABLE从节点不可用情况B抛出异常网络或系统问题网络超时异常网络延迟、Broker处理慢、Broker宕机网络连接异常Broker宕机、网络断开、防火墙Broker返回错误Topic不存在、权限不足、Broker繁忙7、如何解决重复消费问题重复消费产生的原因①网络异常场景比如消费者的业务处理逻辑执行时间过长监听器方法一直没有返回consume_success。消费者有一个定时清理线程每20秒检查一次本地队列中正在消费的消息。如果发现某条消息的消费时间超过15分钟就会将其标记为超时从本地队列移除并构造一个带有延迟属性的重试消息发送到Broker的重试队列。Broker收到后将消息先存储到内部的延迟队列中然后broker的定时任务会将消息从延迟队列转移到真正的重试队列这时消费者就能拉取到这条重试消息进行消费。所以会出现这种情况消息被重新投递但原来的消费任务其实已经消费完或者还在消费过程中最终导致同一条消息被处理两次。或者生产者因为网络问题没有收到sendResult重新发送也会造成重复投递。②消费者重启场景消费者处理消息过程中消费位点还没来得及提交消费者就宕机重启重启后从上次提交的没有更新的旧的offset开始消费从而出现重复消费问题。解决重复消费通常有三种思路1、发消息的时候给消息分配一个唯一id消费者消费的时候把消息id用setnx存到redis中如果保存成功则说明是第一次消费失败了说明是重复消费。给key设置过期时间避免一直占用内存。2、通过业务消息的唯一id比如订单id、支付流水号消费的时候先查后插需要保证原子性或者直接利用唯一索引去避免重复消息。3、通过业务逻辑保持幂等性。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2422277.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!