RabbitMQ详解

news2025/7/9 0:33:27

RabbitMQ详解

文章目录

  • RabbitMQ详解
    • 一、AMQP 和 JMS
      • 1.1 JMS
      • 1.2 AMQP
      • 1.3 两者对比
    • 二、RabbitMQ消息模型
      • 2.1 基本消息模型
      • 2.2 工作消息模型
      • 2.3 订阅模型
        • 2.3.1 Fanout 订阅模型
        • 2.3.2 Direct 订阅模型
        • 2.3.3 Topic 订阅模型
    • 三、消息持久化
      • 3.1 交换机持久化
      • 3.2 队列持久化
      • 3.3 签收机制
        • 3.3.1 生产者签收机制
          • 事务机制
          • Confirm 机制
        • 3.3.2 消费者签收机制
          • 自动签收
          • 手动签收

一、AMQP 和 JMS

1.1 JMS

JMS 是由 Sun 公司早期提出的消息标准,是一个 Java 平台中关于面向消息中间件的 API,旨在为 Java 应用提供统一的消息操作。

它已经成为 Java EE(Java Enterprise Edition)的一部分。其担任的角色类似于 JDBC,开发人员只需要根据相应的接口就可以和实现了JMS的服务进行通信。

JMS包含的角色如下:

RolesDescription
Provider(提供者)实现了 JMS 接口的消息服务器
Client(客户端)生产或消费消息的应用
Producer(生产者)消息生产者
Consumer(消费者)消息消费者
Message(消息)生产者和消费者传递的数据内容
Queue(队列)生产者存在待消费的消息的地方
Topic(主题)发布订阅模式下使用的 topic 模式

JMS 提供了两种消息模型,分别是 P2P(点对点)和 P/S(发布订阅)模型。

P2P
在这里插入图片描述

  • 使用队列作为消息通信载体
  • 每个队列可以有多个消费者,但每条消息只有一个消费者,即消息一旦被消费就被移出队列

P/S

  • 使用主题作为消息通信载体
  • 发布者发布一条消息,消息会通过主题传递给所有的订阅者,类似于广播。
    在这里插入图片描述

1.2 AMQP

AMQP(Advanced message queuing protocol)是一种协议,更准确的说是一种 binary wire-level protocol(链接协议)。这是其和 JMS 的本质差别,AMQP 不从 API 层进行限定,而是直接定义网络交换的数据格式。

其协议使得实现了 AMQP 的 Provider 天然性就是跨平台的。意味着我们可以使用 Java 的 AMQP Provider的同时使用一个 Python 的 Producer。可以将其与 Http 进行类比,不关心实现的语言,只要按照约定的数据格式发送报文,不同语言的 Client 和 Server 可以无障碍连接。

AMQP 在消息的生产者以及传递信息的队列之间引入了一种间接的机制:Exchange,实现了生产者和队列的解耦。生产者将信息发布到一个 Exchange 上,Exchange 会绑定一个或多个队列上,它负责将信息路由到队列上。

AMQP 包含的角色如下:

RolesDescription
Publisher(生产者)消息发送端
Consumer(消费者)消息接收端
Connection(连接)一个 TCP 长连接,生产者和消费者先与 RabbitMQ 建立连接
Channel(信道)在 Connection 的基础上建立的虚拟连接,解决多线程问题
Broker(中间件)实现 AMQP 实体服务,如 RabbitMQ
Virtual Host(虚拟主机)不同用户访问同一个 Broker 时,可以创建自己的 Virtual Host
Exchange(交换机)根据不同的分发规则将消息分发到不同的 Queue
Queue(队列)用来存放消息的队列
Binding(绑定)Exchange 和 Queue之间的关联,取两者的多对多关系

AMQP 四种不同的 Exchange

Exchange概念
Direct Exchange消息的 routing key 与 Binding 的 routing key 直接匹配时,消息到此队列
Topic Exchange消息的 routing key 与 Binding 的 routing key 符合通配符匹配时,消息到此队列
Headers Exchange消息的 headers 与 Binding 的 headers 符合匹配时,消息到此队列
Fanout Exchange消息会到所有队列上,无论消息的 routing key 和 headers 是什么

在这里插入图片描述

1.3 两者对比

JMSAMQP
定义Java API协议
跨语言
跨平台
传输类型P2P(端到端)和P/S(发布订阅)direct exchange
fanout exchange
topic change
headers exchange
消息类型StreamMessage(Java原始值的数据流)
MapMessage(一套名称-值对)
TextMessage(一个字符串对象)
ObjectMessage(一个序列化的 Java对象)
BytesMessage(一个字节的数据流)
Message (只有消息头和属性)
byte[](二进制)
实现ActiveMQRabbitMQ

二、RabbitMQ消息模型

2.1 基本消息模型

在这里插入图片描述
生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。

队列只能存储在队列中,它只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。多个生产者可以发消息到同一个队列,多个消费者可以从一个队列接收数据。

消息一旦被消费者接收,队列中的消息就会被删除。如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是 RabbitMQ 无从得知,这样消息就丢失了!因此,RabbitMQ 有一个ACK 机制。当消费者获取消息后,会向 RabbitMQ 发送回执 ACK,告知消息已经被接收。不过这种回执ACK 分两种情况:

自动 ACK:消息一旦被接收,消费者自动发送 ACK

手动 ACK:消息接收后,不会发送 ACK,需要手动调用

注意:

虽然图中没有出现 Exchange(交换机),但是本质上信息还是通过交换机传递的。

在不声明交换机的情况下,RabbitMQ 使用的是默认的交换机

2.2 工作消息模型

在这里插入图片描述

工作消息模型主要思想就是避免执行资源密集型任务时,必须等待它执行完成。

一个生产发送消息到队列,允许有多个消费者接收消息,但是一条消息只会被一个消费者获取。

如果多个消费者处理消息的时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置 prefetchCount 来限制 Queue 每次发送给每个消费者的消息数,比如我们设置 prefetchCount=1,则 Queue 每次给每个消费者发送一条消息;消费者处理完这条消息后 Queue 会再给该消费者发送一条消息。

2.3 订阅模型

2.3.1 Fanout 订阅模型

Fanout 又称广播。

在广播模式下:

  • 消费者监听自己的队列
  • 每个队列都有绑定的交换机
  • 生产者只声明交换机,发送的消息也只发送到交换机
  • 交换机把消息发送给绑定到该交换机下的所有队列
  • 消费者绑定队列和交换机时不再定义路由,只要和监听队列就会收到消息
    在这里插入图片描述

2.3.2 Direct 订阅模型

相较于广播模型,Direct 模型添加一个功能:只能订阅一部分消息。

例如在某些场景下,多个消费者需要分别监听生产者发布的部分消息,这时就需要 Direct 类型的交换机。

在该模型下,队形与交换机的绑定必须要指定一个 routing key(路由),同时生产者在向交换机发送消息时也必须指定消息的 routing key。

交换机收到消息之后会获取消息的路由,并在与自己绑定的队列中寻找绑定路由与消息的路由完全一致的队列,将消息传递给该队列。
在这里插入图片描述

2.3.3 Topic 订阅模型

相较于 Direct 模型,Topic 模型添加一个功能:更细致化的路由匹配规则。

路由一般是由一个或多个单词组成,单词之间以 “.” 分割。例如 good.insert、good.insert.success 等。

通配符规则:

‘#’:匹配一个或多个单词

‘*’:只能匹配一个单词

在这里插入图片描述

三、消息持久化

消息者的 ACK 机制可以防止消费者丢失消息。但是如果消息还没有被消息时 MQ 服务宕机了怎么办呢?

RabbitMQ 支持消息持久化,而将消息持仓化就意味着交换机和队列都要持久化。

3.1 交换机持久化

/**
 * 声明交换机
 *
 * @param exchange   交换机名
 * @param type       交换机类型 DIRECT、FANOUT、TOPIC、HEADERS
 * @param durable    是否持久化
 * @param autoDelete 自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除
 * @param internal   是否被 RabbitMQ 内部使用
 * @param arguments  自定义参数
 * @return
 */
DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments);

参数解析:boolean internal 是否被 RabbitMQ 内部使用

Internal 的意思是内部的意思,在交换机这里设置为“Yes”之后,表示当前 Exchange 是 RabbitMQ 内部使用,用户所创建的 Queue 不会消费该类型交换机下的消息,既然是为了 RabbitMQ 系统所用,作为用户,我们就没有必要创建该类型的 Exchange,当然默认也是选择 No。

如果对底层 MQ 进行封装的时候可以使用。

参数解析:Map<String, Object> arguments 自定义参数

key 如下,value可以自己定义:

  • x-message-ttl 发送到队列的消息在丢弃之前可以存活多长时间(毫秒)。

  • x-expires 队列在被自动删除(毫秒)之前可以使用多长时间。

  • x-max-length 队列在开始从头部删除之前可以包含多少就绪消息。

  • x-max-length-bytes 队列在开始从头部删除之前可以包含的就绪消息的总体大小。

  • x-dead-letter-exchange 设置队列溢出行为。这决定了在达到队列的最大长度时消息会发生什么。有效值为drop-head或reject-publish。交换的可选名称,如果消息被拒绝或过期,将重新发布这些名称。

  • x-dead-letter-routing-key 可选的替换路由密钥,用于在消息以字母为单位时使用。如果未设置,将使用消息的原始路由密钥。

  • x-max-priority 队列支持的最大优先级数;如果未设置,队列将不支持消息优先级。

  • x-queue-mode 将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少内存使用;如果未设置,队列将保留内存缓存以尽快传递消息。

  • x-queue-master-locator 将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则。

3.2 队列持久化

/**
 * 声明队列
 *
 * @param queue      队列名
 * @param durable    是否持久化
 * @param exclusive  排他队列
 * @param autoDelete 自动删除
 * @param arguments  自定义参数
 * @return
 */
DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);

参数解析:boolean exclusive 排他队列

如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。

  • 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创 建的排他队列的。
  • “首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的 排他队列的,这个与普通队列不同。
  • 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动 删除的。

这种队列适用于只限于一个客户端发送读取消息的应用场景。

参数解析:Map<String, Object> arguments 自定义参数

  • x-max-length:消息条数限制,该参数是非负整数值。先进先出原则,超过10条后面的消息会顶替前面的消息。
  • x-max-length-bytes:消息容量限制,该参数是非负整数值。该参数和 x-max-length 目的一样限制队列的容量,但是这个是靠队列大小(bytes)来达到限制。
  • x-message-ttl:消息存活时间,该参数是非负整数值。创建 Queue 时设置该参数可指定消息在该 Queue 中待多久,可根据 x-dead-letter-routing-key 和 x-dead-letter-exchange 生成可延迟的死信队列。
  • x-max-priority:消息优先级,创建 Queue 时声明优先级队列 。该参数应该是一个整数,表示队列应该支持的最大优先级。建议使用1到10之间。目前使用更多的优先级将消耗更多的资源(Erlang进程)。
  • x-expires:存活时间,该 Queue 会在 x-expires 到期后直接消息,哪怕里面有未消费的消息。
  • x-dead-letter-exchange 和 x-dead-letter-routing-key:在 x-message-ttl 时间到期后把消息放到x-dead-letter-routing-key 和 x-dead-letter-exchange 指定的队列中达到延迟队列的目的。

3.3 签收机制

3.3.1 生产者签收机制

RabbitMQ中对生产者提供了两种签收机制:事务机制和 Confirm 机制。

事务机制

RabbitMQ 中与事务机制有关的方法有三个:txSelect(),txCommit() 以及 txRollback()。

  • txSelect 用于将当前 channel 设置成 transaction 模式
  • txCommit 用于提交事务
  • txRollback 用于回滚事

在通过 txSelect 开启事务之后,我们便可以发布消息给 broker 代理服务器了,如果
txCommit 提交成功了,则消息一定到达了 broker 了,如果在 txCommit 执行之前 broker
异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过 txRollback 回滚事务了。

代码如下:

try {
    // 开启事务模式
    channel.txSelect();
    // 发送消息
    channel.basicPublish();
    // 模拟异常
    int num = 1/0;
    // 事务提交
    channel.txCommit();
} catch (Exception e) {
    // 事务回滚
    channel.txRollback();
    e.printStackTrack();
}
Confirm 机制

通过将 Channel 设置成 Confirm 模式来实现。需要注意,事务方式和 Confirm 两种模式不能共存。

生产者将信道设置成 Confirm 模式,一旦信道进入 Confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了。

如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出。

broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

在编程中我们可以选择下面的几种编程方式:

  • 普通 confirm 模式:每发送一条消息后,调用 waitForConfirms() 方法,等待服务器端 confirm。实际上是一种串行 confirm 了。
  • 批量 confirm 模式:每发送一批消息后,调用 waitForConfirms() 方法,等待服务器端confirm。一旦出现 confirm 返回 false 或者超时的情况时,客户端需要将这一批次的消息全部重发。
  • 异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后 Client 端会回调这个方法。

3.3.2 消费者签收机制

对于消费者的消息签收有有自动和手动两种:自动签收和手动签收。

自动签收

不会管消息的处理即其它问题。类似于签收快递,自动签收则是将快递放入快递柜,至于后续,不管你是否接收,以及快递是否有问题(程序执行不通过),都不会有所反应。

手动签收

是由程序确认是否签收。即自己本人签收快递,即细致检查(程序处理)后,如果没有问题,则会确定签收,若有问题,可以拒签。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/9321.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

玩推特营销必知的基础常识上篇

现如今&#xff0c;Twitter已从一个让人们分享其想法的地方变成了一个强大的营销&#xff0c;该可让品牌实时与受众交流。每月活跃用户超过3.21亿&#xff0c;很容易看出为什么公司在所有这些时间之后仍继续使用Twitter。但是&#xff0c;仅仅偶尔发出有关趋势的话题&#xff0…

S标签肽,H2N-KETAAAKFERQHMDS-OH

S Tag Peptide 是一种合成的多肽&#xff0c;由15个氨基酸残基构成。S Tag Peptide is a 15 amino acid peptide derived from RNase A. 编号: 188345中文名称: 多肽标签S-tag 、S标签肽英文名: S-tag peptide单字母: H2N-KETAAAKFERQHMDS-OH三字母: H2N-Lys-Glu-Thr-Ala-Ala-A…

mysql 从入门到放弃— 数据库设计

之前我们已经了解了 mysql 的基本增删改查 mysql 从入门到放弃——基本约束以及语法 现在我们系统的进行一遍数据库的设计&#xff0c;建议收藏 ~~ 直接进入主题 来个例子&#xff1a;下面我们将围绕这个例子来进行数据库的设计 我们就来简单的模拟 大学教务处的选课 系统 …

vue-生成二维码【生成、点击输入框内叉号移除生成的二维码、输入框聚焦】

博主介绍 &#x1f4e2;点击下列内容可跳转对应的界面&#xff0c;查看更多精彩内容&#xff01; &#x1f34e;主页&#xff1a;水香木鱼 &#x1f34d;专栏&#xff1a;后台管理系统 文章目录 简介&#xff1a;这是一篇有关【vue-生成二维码【生成、点击输入框内叉号移除生成…

Cortex-M架构MCU位带操作最详细解析(主要以STM32为例,包括判断哪些MCU可用)

Cortex-M架构MCU位带操作最详细解析&#xff08;主要以STM32为例&#xff0c;包括判断哪些MCU可用&#xff09; 代码实践部分直接跳转&#xff1a; https://blog.csdn.net/weixin_53403301/article/details/125543844 一、位带操作理论及实践 位带操作的概念其实30年前就有了…

FlutterWeb性能优化探索与实践

美团外卖商家端基于 FlutterWeb 的技术探索已久,目前在多个业务中落地了App、PC、H5的多端复用,有效提升了产研的整体效率。在这过程中,性能问题是我们面临的最大挑战,本文结合实际业务场景进行思考,介绍美团外卖商家端在 FlutterWeb 性能优化上所进行的探索和实践,希望对…

【原生Ajax】全面了解xhr的概念与使用。

✍️ 作者简介: 前端新手学习中。 &#x1f482; 作者主页: 作者主页查看更多前端教学 &#x1f393; 专栏分享&#xff1a;css重难点教学 Node.js教学 从头开始学习 ajax学习 文章目录XHR的基本使用  什么是XHR  使用xhr发起GET请求  了解xhr对象的readyState属性  使…

婴儿肠绞痛怎么办?

婴儿肠绞痛的原因婴儿绞痛是婴儿生活中最难解释的现象之一。没有人知道为什么有些婴儿比其他婴儿绞痛更严重&#xff0c;但有很多关于绞痛的理论。其中一个原因可能是一些婴儿的消化系统更不成熟或更敏感。婴儿消化道中用于分解食物的消化酶或消化液很少&#xff0c;尤其是淀粉…

图像锐化一:几个常见的滤波核

文章目录1.滤波核2.代码3. 效果分析示例1.示例2.图像锐化和图像平滑相对应&#xff0c;前者用于增强细节表现&#xff0c;后者一般用于降噪在图像锐化时&#xff0c;往往会 1. 放大 噪声&#xff0c;2. 引入aritfact, 3. 振铃效应 等负面效果 因此需要分析相关锐化方法的效果和…

计算机里一半的部件是什么

解码器 在下图中&#xff0c;“a”和“b”是来自左侧的输入。它们都连接到NOT门。NOT门产生与其输入相反的输入。页面上有四条垂直线&#xff0c;分别来自“a”和“b”以及“a”与“b”的反数&#xff0c;因此&#xff0c;对于每个“a”和“b”&#xff0c;页面上都有两条导线&…

Jira—使用 JMX 接口进行实时监控

使用 JMX 接口进行实时监控 什么是 JMX? JMX(Java Management Extensions) 是一种用于监视和管理 Java 应用程序的技术。JMX 使用称为 MBean(托管 Bean)的对象来公开应用程序中的数据和资源。 对于 Jira Server 或 Jira Data Center 的大型实例,启用 JMX 可以让您更轻松…

知识蒸馏算法汇总

知识蒸馏有两大类&#xff1a;一类是logits蒸馏&#xff0c;另一类是特征蒸馏。logits蒸馏指的是在softmax时使用较高的温度系数&#xff0c;提升负标签的信息&#xff0c;然后使用Student和Teacher在高温softmax下logits的KL散度作为loss。中间特征蒸馏就是强迫Student去学习T…

文件上传漏洞实验-通过截取http请求绕过前端javascript验证进行文件上传

1、什么是文件上传漏洞 文件上传漏洞是指网络攻击者上传了一个可执行的文件到服务器并执行。这里上传的文件可以是木马&#xff0c;病毒&#xff0c;恶意脚本或者WebShell等。这种攻击方式是最为直接和有效的&#xff0c;部分文件上传漏洞的利用技术门槛非常的低&#xff0c;对…

如何使用AI图片清晰度增强器软件增强和锐化图片、提高照片清晰度并去除噪点

通过使用深度学习AI算法对照片进行批量锐化、去噪和去模糊处理&#xff0c;该程序可以应用再大部分照片和图片&#xff0c;包括徽标、卡通和动漫 可能很多朋友都会遇到需要批量增强和锐化照片的情况&#xff1a;例如&#xff0c;如果拍摄过程中曝光不足、夜晚噪点多或者画面模…

基于蚁群算法的TPS问题求解策略研究(Matlab代码实现)

&#x1f352;&#x1f352;&#x1f352;欢迎关注&#x1f308;&#x1f308;&#x1f308; &#x1f4dd;个人主页&#xff1a;我爱Matlab &#x1f44d;点赞➕评论➕收藏 养成习惯&#xff08;一键三连&#xff09;&#x1f33b;&#x1f33b;&#x1f33b; &#x1f34c;希…

Java面试干货:关于数组查找的几个常用实现算法

查找算法在我们的面试和开发中&#xff0c;是很常见的一种算法&#xff0c;今天我就给大家介绍几个常用的查找算法。 一. 线性查找 1.概念 线性查找也叫顺序查找&#xff0c;这是最基本的一种查找方法。该算法是从给定的值中进行搜索&#xff0c;从一端开始逐一检查每个元素…

华为堆叠技术讲解

目录 为什么出现堆叠 什么是堆叠 堆叠的特征 堆叠的优缺点 华为堆叠技术 框式交换机堆叠技术CSS CSS堆叠涉及的相关基础概念 主交换机选举过程 堆叠系统主备倒换 CSS两种堆叠口 CSS堆叠方式 CSS以太网链路聚合 本地优先转发 CSS双主检测 CSS版本升级 CSS堆叠实…

java和vue车辆管理系统车管所系统

简介 车辆管理系统车管所系统&#xff0c;管理员添加车主信息&#xff0c;车主提交自己的车辆信息&#xff0c;管理员审核车辆&#xff0c;对车辆行进年检&#xff0c;统计&#xff0c;记录车辆违规信息。车主可以查看自己的车辆信息、投诉、查看自己的违规记录等。 演示视频…

【附源码】计算机毕业设计JAVA客户台账管理

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; Springboot mybatis Maven Vue 等等组成&#xff0c;B/…

2021亚太杯C题全网最全解题思路+塞罕坝林场数据数据分享

全网绝对能获奖的免费思路&#xff01;&#xff01;&#xff01; 文章目录1.写在前面&#xff0c;需要塞罕坝林场数据的这里链接获取&#xff1a;2.C题全网最全解题思路1.写在前面&#xff0c;需要塞罕坝林场数据的这里链接获取&#xff1a; https://download.csdn.net/downlo…