中间件-RocketMQ

news2025/5/9 12:54:36

RocketMQ

  • 基本架构
  • 消息模型
  • 消费者消费消息模式
  • 顺序消息机制
  • 延迟消息
  • 批量消息
  • 事务消息
  • 消息重试
  • 最佳实践

基本架构

在这里插入图片描述

nameServer: 维护broker列表信息,客户端连接时只需要连接nameServer。可配置成集群。
broker:broker分为master和slave,master负责消息的发送和消费,slave是master的备份。master-slaver的集群方式,master挂掉时候slave不能主动转换为master提供服务(5.X版本后可以通过配置实现mater挂掉后slave转为master提供服务)。
leader-follower的集群方式,即高可用集群,各个broker是对等的,通过选举产生leader(在dashboart中显示为master),如果leader挂掉,在剩下的follower(显示为slave)中选举再产生新的leader。注意,只有超过半数的几点存活,才能选举出leader。

消息模型

在这里插入图片描述
⽣产者和消费者都可以指定⼀个Topic发送消息或者拉取消息。⽽Topic是⼀个逻辑概念。
Topic中的消息会分布在后⾯多个MessageQueue当中。这些MessageQueue会分布到⼀个或者多个broker中。

消费者消费消息模式

广播模式:所有关注topic的消费者都收到消息。广播模式下消息队列的消费位点由客户端自己维护,消费失败服务端不会重发。
集群模式:同一个消费者组只有一个成员收到消息。集群模式下消费点位由服务端维护,消费者组的所有成员共用一个位点,消费失败服务端会重发。

顺序消息机制

  1. ⽣产者只有将⼀批有顺序要求的消息,放到同⼀个MesasgeQueue上,通过MessageQueue的FIFO特性保证这⼀批消息的顺序。如果不指定MessageSelector对象,
    那么⽣产者会采⽤轮询的⽅式将多条消息依次发送到不同的MessageQueue上。
  2. 消费者需要实现MessageListenerOrderly接⼝,实际上在服务端,处理MessageListenerOrderly时,会给⼀个MessageQueue加锁,拿到MessageQueue上所有的消息,然后再去读取下⼀个MessageQueue的消息。
  3. 消费消息失败时,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代。因为消费者端只进⾏有限次数的重试。如果⼀条消息处理失败,RocketMQ会将后续消息阻塞住,让消费者进⾏重试。但是,如果消费者⼀直处理失败,超过最⼤重试次数,那么RocketMQ就会跳过这⼀条消息,处理后⾯的消息,这会造成消息乱序。

延迟消息

  1. 定固定的延迟级别:对于指定固定延迟级别的延迟消息,RocketMQ的实现⽅式是预设⼀个系统Topic,名字叫做SCHEDULE_TOPIC_XXXXX。在这个Topic下,预设了18个MessageQueue。这⾥每个对列就对应了⼀种延迟级别。然后每次扫描这18个队列⾥的消息,进⾏延迟操作就可以了。
  2. 指定消息发送时间:RocketMQ是通过时间轮算法实现。

批量消息

⽣产者要发送的消息⽐较多时,可以将多条消息合并成⼀个批量消息,⼀次性发送出去。这样可以减少⽹络IO,提升消息发送的吞吐量。同⼀批消息的Topic必须相同,另外,不⽀持延迟消息。还有批量消息的⼤⼩不要超过1M,如果太⼤就需要⾃⾏分割。

事务消息

在这里插入图片描述

  1. ⽣产者将消息发送⾄ApacheRocketMQ服务端。
  2. ApacheRocketMQ服务端将消息持久化成功之后,向⽣产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
  3. ⽣产者开始执⾏本地事务逻辑。
  4. ⽣产者根据本地事务执⾏结果向服务端提交⼆次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:⼆次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。⼆次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断⽹或者是⽣产者应⽤重启的特殊情况下,若服务端未收到发送者提交的⼆次确认结果,或服务端收到的⼆次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息⽣产者即⽣产者集群中任⼀⽣产者实例发起消息回查。
  6. ⽣产者收到消息回查后,需要检查对应消息的本地事务执⾏的最终结果。
  7. ⽣产者根据检查到的本地事务的最终状态再次提交⼆次确认,服务端仍按照步骤4对半事务消息进⾏处理。

消息重试

RocketMQ的消费者端,如果处理消息失败了,Broker是会将消息重新进⾏投送的。⽽在重试时,RocketMQ实际上会为每个消费者组创建⼀个对应的重试队列。重试的消息会进⼊⼀个“%RETRY%”+ConsumeGroup的队列中。
RocketMQ默认允许每条消息最多重试16次,每次重试的间隔时间如下:
在这里插入图片描述
如果消息重试16次后仍然失败,消息将不再投递,转为进⼊死信队列。重试次数可以通过consumer.setMaxReconsumeTimes(20);将重试次数设定为20次。当定制的重试次数超过16次后,消息的重试时间间隔均为2⼩时。
如果消息超过最⼤重试次数,RocketMQ会将消息发送到死信队列。⼀个死信队列对应⼀个消费组。死信队列的默认权限为2(禁读)。如果需要处理死信队列的消息,需要把权限修改为6(可读可写后)消费该Topic的消息进行处理。队列中超过有效期(默认3天)的消息会被删除,不管有没有消费。

最佳实践

  1. ⼀个应⽤尽可能⽤⼀个Topic,⽽消息⼦类型则可以⽤tags来标识。tags过滤消息的性能很高,相当于索引。
  2. 消费端幂等控制:RocketMQ的每条消息都有⼀个唯⼀的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以⽤这个MessageId来作为判断幂等的关键依据。但是,这个MessageId是⽆法保证全局唯⼀的,也会有冲突的情况。所以在⼀些对幂等性要求严格的场景,最好是使⽤业务上唯⼀的⼀个标识⽐较靠谱。例如订单ID。⽽这个业务标识可以使⽤Message的Key来进⾏传递。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2371476.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python就业方向有哪些?

Python 作为一门通用、易学且功能强大的编程语言,在多个领域都有广泛的应用,因此就业方向也非常多样化。以下是 Python 主要的就业方向及相关技能要求。 1. Web 开发 岗位:Python Web 开发工程师、后端工程师、全栈工程师技术栈&#xff1a…

iptables 访问控制列表使用记录

iptables 是linux操作系统上自带的防火墙程序,功能强大,能够依据策略过滤掉一些恶意访问流量,本次记录一下iptables的常见使用方法,未尽之处,欢迎补充。 一、iptables 下载 我这里使用的是华为openEuler 22.03版本&am…

16. Qt系统相关:事件、定时器

1. Qt事件 1.1 简介 事件是应用程序内部或者外部产生的事情或者动作的统称。在Qt中使用一个对象来表示一个事件。所有的Qt事件均继承于抽象类QEvent。事件是由系统或者Qt平台本身在不同的时刻发出的。当用户按下鼠标、敲下键盘,或者是窗口需要重新绘制的时候&#…

云平台搭建

物联网云平台的基本概述 基本概念 随着物联网技术的快速发展,越来越多的设备需要接入网络以实现智能化功能,物联网平台应运而生。 物联网云平台(IoT Cloud Platform)是物联网生态系统中的核心组件,它通过提供一系列…

数学实验(Matlab语言环境和线性代数实验)

一、Matlab语言环境和线性代数实验 1.Matlab语言环境 Matlab简介 Matlab:Matrix Laboratry 矩阵实验室 Matlab 提供了强大的科学计算、灵活的程序设计流程、高质量的图形可视化与界面设计等功能,被广泛应用于科学计算、控制系统、信息处理等领域的分…

Elasticsearch 中的索引模板:如何使用可组合模板

作者:来自 Elastic Kofi Bartlett 探索可组合模板以及如何创建它们。 更多阅读: Elasticsearch:可组合的 Index templates - 7.8 版本之后 想获得 Elastic 认证吗?查看下一期 Elasticsearch Engineer 培训的时间! El…

【LeetCode 42】接雨水(单调栈、DP、双指针)

题面: 思路: 能接雨水的点,必然是比两边都低(小)的点。有两种思路,一种是直接计算每个点的最大贡献(也就是每个点在纵向上最多能接多少水),另一种就是计算每个点在横向上…

【JS逆向基础】前端基础-HTML与CSS

1,flask框架 以下是一个使用flask框架写成的serve程序 # noinspection PyUnresolvedReferences #Flash框架的基本内容from flask import Flask app Flask(__name__)app.route(/index) def index():return "hello index"app.route(/login) def login():re…

手机网页提示ip被拉黑名单什么意思?怎么办

‌当您使用手机浏览网页时,突然看到“您的IP地址已被列入黑名单”的提示,是否感到困惑和不安?这种情况在现代网络生活中并不罕见,但确实会给用户带来诸多不便。本文将详细解释IP被拉黑的含义、常见原因,并提供一系列实…

CCF编程能力等级认证 一级 第一次课

介绍 CCF 编程能力等级认证(GESP)为青少年计算机和编程学习者提供学业能力验证的规则和平台,由中国计算机学会发起并主办。 每年考试分四次,时间是每年的3月、6月、9月、12月,以当年每期公布的时间为准。 GESP适用年…

SpringBoot 讯飞星火AI WebFlux流式接口返回 异步返回 对接AI大模型 人工智能接口返回

介绍 用于构建基于 WebFlux 的响应式 Web 应用程序。集成了 Spring WebFlux 模块,支持响应式编程模型,构建非阻塞、异步的 Web 应用。WebFlux 使用了非阻塞的异步模型,能够更好地处理高并发请求。适合需要实时数据推送的应用场景。 WebClie…

Python爬虫中time.sleep()与动态加载的配合使用

一、动态加载网页的挑战 动态加载网页是指网页的内容并非一次性加载完成,而是通过JavaScript等技术在用户交互或页面加载过程中逐步加载。这种设计虽然提升了用户体验,但对于爬虫来说,却增加了抓取的难度。传统的爬虫方法,如简单…

AtCoder Beginner Contest 404 A-E 题解

还是ABC好打~比ARC好打多了&#xff08; 题解部分 A - Not Found 给定你一个长度最大25的字符串&#xff0c;任意输出一个未出现过的小写字母 签到题&#xff0c;map或者数组下标查询一下就好 #include<bits/stdc.h>using namespace std;#define int long long #def…

【mysql】常用命令

一 系统mysql用户密码查询 1、在工程目录如/usr/local/httpd/下的*.php中查找类似有db.inf的文件 以php为例。 2、在代码文件中确认有数据库连接的的功能实现 例如&#xff1a; $dbconf parse_ini_file(/usr/local/httpd/conf/db.inf); $link mysql_connect($dbconf[d…

macOS Arduino IDE离线安装ESP8266支持包

其实吧&#xff0c;本来用platformio也是可以的&#xff0c;不过有时候用Arduino IDE可能更快一些&#xff0c;因为以前一直是Arduino.app和Arduino IDE.app共存了一段时间&#xff0c;后来下决心删掉Arduino.app并升级到最新的Arduino IDE.app。删除了旧的支持板级支持包之后就…

网络靶场基础知识

一、网络靶场的核心概念 网络靶场&#xff08;Cyber Range&#xff09;是一种基于虚拟化和仿真技术的网络安全训练与测试平台&#xff0c;通过模拟真实网络环境和业务场景&#xff0c;为攻防演练、漏洞验证、安全测试和人才培养提供安全可控的实验空间。其核心目标是通过“虚实…

Python项目源码57:数据格式转换工具1.0(csv+json+excel+sqlite3)

1.智能路径处理&#xff1a;自动识别并修正文件扩展名&#xff0c;根据转换类型自动建议目标路径&#xff0c;实时路径格式验证&#xff0c;自动补全缺失的文件扩展名。 2.增强型预览功能&#xff1a;使用pandastable库实现表格预览&#xff0c;第三方模块自己安装一下&#x…

雷赛伺服电机

ACM0经济 编码器17位&#xff1a; ACM1基本 编码器23位磁编&#xff0c; ACM2通用 编码器24位光电&#xff0c; 插头定义&#xff1a;

【deepseek教学应用】001:deepseek如何撰写教案并自动实现word排版

本文讲述利用deepseek如何撰写教案并自动实现word高效完美排版。 文章目录 一、访问deepseek官网二、输入教案关键词三、格式转换四、word进一步排版 一、访问deepseek官网 官网&#xff1a;https://www.deepseek.com/ 进入主页后&#xff0c;点击【开始对话】&#xff0c;如…

CH32V208GBU6沁恒绑定配对获取静态地址

从事嵌入式单片机的工作算是符合我个人兴趣爱好的,当面对一个新的芯片我即想把芯片尽快搞懂完成项目赚钱,也想着能够把自己遇到的坑和注意事项记录下来,即方便自己后面查阅也可以分享给大家,这是一种冲动,但是这个或许并不是原厂希望的,尽管这样有可能会牺牲一些时间也有哪天原…