RabbitMQ中的Publish-Subscribe模式

news2025/7/13 9:25:33

在现代分布式系统中,消息队列(Message Queue)是实现异步通信和解耦系统的关键组件。RabbitMQ 是一个功能强大且广泛使用的开源消息代理,支持多种消息传递模式。其中,Publish/Subscribe(发布/订阅)模式是一种常见且重要的模式,它允许消息发布者将消息广播给多个订阅者。

本文将深入探讨 RabbitMQ 中的 Publish/Subscribe 模式,包括其工作原理、实现方式、适用场景以及最佳实践。


1. Publish/Subscribe 模式简介

1.1 什么是 Publish/Subscribe 模式?

Publish/Subscribe(发布/订阅)模式是一种消息传递模式,它将消息的发送者(发布者)和接收者(订阅者)解耦。发布者将消息发布到一个交换机(Exchange),而订阅者通过绑定到交换机的**队列(Queue)**来接收消息。

与点对点模式(如工作队列)不同,Publish/Subscribe 模式允许多个订阅者接收相同的消息,从而实现消息的广播。

在这里插入图片描述

1.2 核心概念

在 RabbitMQ 中,Publish/Subscribe 模式依赖以下核心组件:

  • 发布者(Publisher):发送消息的客户端。
  • 交换机(Exchange):接收发布者发送的消息,并根据规则将消息路由到队列。
  • 队列(Queue):存储消息的缓冲区。
  • 订阅者(Subscriber):从队列中消费消息的客户端。
  • 绑定(Binding):定义交换机和队列之间的关系。

2. Publish/Subscribe 模式的工作原理

2.1 交换机的作用

在 RabbitMQ 中,消息不会直接发送到队列,而是发送到交换机。交换机根据绑定规则将消息路由到相应的队列。

RabbitMQ 提供了多种类型的交换机,其中最常用的是:

  • Fanout 交换机:将消息广播到所有绑定到它的队列,忽略路由键(Routing Key)。
  • Direct 交换机:根据消息的路由键将消息路由到匹配的队列。
  • Topic 交换机:支持更复杂的路由规则,允许使用通配符匹配路由键。
  • Headers 交换机:根据消息的头部属性进行路由。

在 Publish/Subscribe 模式中,通常使用 Fanout 交换机,因为它能够将消息广播到所有绑定的队列。

2.2 消息的广播过程

  1. 发布者将消息发送到交换机。
  2. 交换机接收到消息后,将消息广播到所有绑定的队列。
  3. 订阅者从队列中消费消息。

3. Java 实现 Publish/Subscribe 模式

以下是使用 Java 和 RabbitMQ Java Client 实现 Publish/Subscribe 模式的完整示例。

3.1 添加依赖

在 Maven 项目中,添加 RabbitMQ Java Client 依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

3.2 创建发布者(Publisher)

发布者负责将消息发送到交换机。以下是发布者的代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Publisher {
    private static final String EXCHANGE_NAME = "publisher_subscriber";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.138");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明一个 Fanout 交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            // 发布消息
            String message = "Hello, Subscribers!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

3.3 创建订阅者(Subscriber)

订阅者负责从队列中消费消息。以下是订阅者的代码:

import com.rabbitmq.client.*;

import java.nio.charset.StandardCharsets;

public class Subscriber {
    private static final String EXCHANGE_NAME = "publisher_subscriber";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.138");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明一个 Fanout 交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 创建一个临时队列,并绑定到交换机
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 定义消息处理函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };

        // 开始消费消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

3.4 运行示例

  • 启动多个订阅者,在不同的终端窗口中运行多个订阅者实例

    启动多个订阅者后,能在RabbitMQ终端页面,能看到多个临时的队列,但交换机只有一个publisher_subscriber

在这里插入图片描述

在这里插入图片描述

  • 启动发布者,在另一个终端窗口中运行发布者
3.4.1 观察输出

所有订阅者都会收到发布者发送的消息。例如:

发布者输出:

 [x] Sent 'Hello, Subscribers!'

订阅者输出:

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello, Subscribers!'

在这里插入图片描述


4. 代码解析

4.1 发布者代码解析

  • 连接工厂ConnectionFactory 用于创建到 RabbitMQ 服务器的连接。
  • 交换机声明channel.exchangeDeclare(EXCHANGE_NAME, "fanout") 声明一个 Fanout 交换机。
  • 消息发布channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8)) 将消息发送到交换机。

4.2 订阅者代码解析

  • 临时队列channel.queueDeclare().getQueue() 创建一个非持久化的、独占的临时队列。
  • 队列绑定channel.queueBind(queueName, EXCHANGE_NAME, "") 将队列绑定到交换机。
  • 消息处理DeliverCallback 定义了如何处理接收到的消息。
  • 消费消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }) 开始消费消息。

5. Publish/Subscribe 模式的适用场景

5.1 日志记录

在分布式系统中,日志记录是一个常见的需求。使用 Publish/Subscribe 模式,可以将日志消息广播给多个日志处理器,分别将日志写入文件、数据库或发送到监控系统。

5.2 实时通知

在社交网络或即时通讯应用中,可以使用 Publish/Subscribe 模式向多个用户发送实时通知。例如,当用户发布新动态时,通知所有关注者。

5.3 分布式缓存更新

在分布式缓存系统中,当缓存数据更新时,可以使用 Publish/Subscribe 模式通知所有缓存节点同步更新。

5.4 事件驱动架构

在事件驱动架构中,Publish/Subscribe 模式用于实现事件的广播。例如,当用户注册成功时,发布一个事件,通知多个服务(如邮件服务、积分服务)执行相应的操作。


6. 最佳实践

6.1 使用持久化

为了确保消息不会丢失,建议将交换机和队列设置为持久化。例如:

channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
channel.queueDeclare("my_queue", true, false, false, null);

6.2 处理消息确认

在生产环境中,建议启用消息确认机制,确保消息被成功消费。例如:

channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });

6.3 避免消息积压

在高并发场景下,可能会出现消息积压的情况。可以通过设置队列的最大长度或使用**死信队列(DLX)**来处理积压的消息。

6.4 监控和报警

使用 RabbitMQ 的管理界面或监控工具(如 Prometheus + Grafana)监控消息队列的状态,并设置报警规则,及时发现和解决问题。


7. 总结

Publish/Subscribe 模式是 RabbitMQ 中一种强大且灵活的消息传递模式,适用于需要将消息广播给多个订阅者的场景。通过使用 Fanout 交换机,可以轻松实现消息的广播,同时结合持久化、消息确认和监控机制,可以构建高可靠性的分布式系统。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2260351.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

专业140+总分410+浙江大学842信号系统与数字电路考研经验浙大电子信息与通信工程,真题,大纲,参考书。

考研落幕&#xff0c;本人本中游211&#xff0c;如愿以偿考入浙江大学&#xff0c;专业课842信号系统与数字电路140&#xff0c;总分410&#xff0c;和考前多次模考预期差距不大&#xff08;建议大家平时做好定期模考测试&#xff0c;直接从实战分数中&#xff0c;找到复习的脉…

Unity类银河战士恶魔城学习总结(P178 Archer s arrow 弓箭手的箭)

【Unity教程】从0编程制作类银河恶魔城游戏_哔哩哔哩_bilibili 教程源地址&#xff1a;https://www.udemy.com/course/2d-rpg-alexdev/ 本章节制作了一个弓箭手的箭 Arrow_Controller.cs 1.OnTriggerEnter2D方法 功能&#xff1a;检测箭矢与其他对象的碰撞。逻辑&#xff1…

后端接受前端传递数组进行批量删除

问题描述&#xff1a;当我们需要做批量删除功能的时候&#xff0c;我们循环单次删除的接口也能进行批量删除&#xff0c;但要删除100条数据就要调用100次接口&#xff0c;或者执行100次sql&#xff0c;这样系统开销是比较大的&#xff0c;那么我们直接采用接收的数组格式数据sq…

ByteCTF2024

wp参考&#xff1a; 2024 ByteCTF wp 2024 ByteCTF WP- Nepnep ByteCTF 2024 writeup by Arr3stY0u 五冠王&#xff01;ByteCTF 2024 初赛WriteUp By W&M ByteCTF 2024 By W&M - W&M Team ByteCTF Re WP - 吾爱破解 - 52pojie.cn 2024 ByteCTF - BediveRe_R…

Envoy 服务发现原理大揭秘与核心要点概述

1 Envoy动态配置介绍 动态资源&#xff0c;是指由envoy通过xDS协议发现所需要的各项配置的机制&#xff0c;相关的配置信息保存 于称之为管理服务器&#xff08;Management Server &#xff09;的主机上&#xff0c;经由xDS API向外暴露&#xff1b;下面是一个 纯动态资源的基…

转盘抽奖功能(附加代码)

写在开头 上期代码主要实现PC端电商网站商品放大效果&#xff0c;本期就来实现积分随机抽奖效果&#xff0c;开发久了很多功能都是通过框架组件库来完成&#xff0c;但是如果组件满足不了开发需求&#xff0c;还需要开发人员手动封装组件&#xff0c;专门出这样一期文章&#x…

【CSS in Depth 2 精译_075】12.2 Web 字体简介 + 12.3 谷歌字体的用法

当前内容所在位置&#xff08;可进入专栏查看其他译好的章节内容&#xff09; 第四部分 视觉增强技术 ✔️【第 12 章 CSS 排版与间距】 ✔️ 12.1 间距设置 12.1.1 使用 em 还是 px12.1.2 对行高的深入思考12.1.3 行内元素的间距设置 12.2 Web 字体 ✔️12.3 谷歌字体 ✔️12.…

ARM嵌入式学习--第七天(GPT)

GPT -介绍 GPT有一个32位向上计数器&#xff0c;定时计数器值可以使用外部引脚上的事件捕获到寄存器中&#xff0c;捕获触发器可以被编程为上升沿和下降沿。GPT还可以在输出比较引脚上生成事件&#xff0c;并在定时器达到编程值时产生中断。GPT有一个12位预分频器&#xff0c;…

搭建Tomcat(一)---SocketServerSocket

目录 引入1 引入2--socket 流程 Socket&#xff08;应用程序之间的通讯保障&#xff09; 网卡(计算机之间的通讯保障) 端口 端口号 实例 client端 解析 server端 解析 相关方法 问题1&#xff1a;ServerSocket和Socket有什么关系&#xff1f; ServerSocket Soc…

SpringBoot快速使用

一些名词的碎碎念: 1> 俩种网络应用设计模式 C/S 客户端/服务器 B/S 浏览器/服务器 俩者对比: 2> 集群和分布式的概念 集群: 分布式: 例子: 一个公司有一个人身兼多职 集群: 招聘N个和上面这个人一样身兼多职 分布式: 招聘N个人,分担上面这个人的工作,进行工作的拆分. 工…

【含开题报告+文档+PPT+源码】基于SpringBoot的开放实验管理平台设计与实现

开题报告 设计开放实验管理平台的目的在于促进科学研究与教学的融合。传统实验室常常局限于特定地点和时间&#xff0c;而开放平台可以为学生、教师和研究人员提供一个便捷的交流与共享环境。通过在线平台&#xff0c;他们可以分享实验资源、交流经验&#xff0c;从而促进科学…

分布式 漏桶算法 总结

前言 相关系列 《分布式 & 目录》《分布式 & 漏桶算法 & 总结》《分布式 & 漏桶算法 & 问题》 概述 简介 LBA Leaky Bucket Algorithm 漏桶算法是一种流行于网络通信领域的流量控制/频率限制算法。漏桶算法的核心原理是通过一个概念上的“漏桶”来…

linux glances vs top

一、安装 apt-get install glances glances top显示效果&#xff1a;

CTF知识集-PHP特性

title: CTF知识集-PHP特性 写在开头可能会用到的提示 call_user_func 调用的函数可以不区分大小写preg_match过滤存在长度溢出&#xff0c;长度超过100w检测失效。str_repeat(‘show’,250000); 生成100w个字符preg_match是无法处理数组的&#xff0c;例如:preg_match( n u m…

Hadoop运行Mapreduce问题集锦——Ubuntu虚拟机配置

一、端口访问问题 问题描述 运行任务前一直重连。具体来说&#xff0c;错误发生在尝试从czs-virtual-machine虚拟机的127.0.1.1地址连接到同一台机器的8032端口时&#xff0c;连接被拒绝。 如下&#xff1a; 2024-11-17 23:05:45,800 INFO retry.RetryInvocationHandler: java…

【经验分享】搭建本地训练环境知识点及方法

最近忙于备考没关注&#xff0c;有次点进某小黄鱼发现首页出现了我的笔记还被人收费了 虽然我也卖了一些资源&#xff0c;但我以交流、交换为主&#xff0c;笔记都是免费给别人看的 由于当时刚刚接触写的并不成熟&#xff0c;为了避免更多人花没必要的钱&#xff0c;所以决定公…

流程引擎Activiti性能优化方案

流程引擎Activiti性能优化方案 基于关系型数据库层面优化 MySQL建表语句优化 Activiti在MySQL中创建默认字符集为utf8&#xff08;即utf8mb3&#xff09;格式&#xff0c;本文将默认字符集设置为utf8mb4&#xff0c;排序规则为utf8mb4_general_ci&#xff0c;并修改变量等类…

Unix 传奇 | 谁写了 Linux | Unix birthmark

注&#xff1a;本文为 “左耳听风”陈皓的 unix 相关文章合辑。 皓侠已走远&#xff0c;文章有点“年头”&#xff0c;但值得一阅。 文中部分超链已沉寂。 Unix 传奇 (上篇) 2010 年 04 月 09 日 陈皓 了解过去&#xff0c;我们才能知其然&#xff0c;更知所以然。总结过去…

TimerPickerDialog组件的用法

文章目录 1 概念介绍2 使用方法3 示例代码我们在上一章回中介绍了Snackbar Widget相关的内容,本章回中将介绍TimePickerDialog Widget.闲话休提,让我们一起Talk Flutter吧。 1 概念介绍 我们在这里说的TimePickerDialog是一种弹出窗口,只不过窗口的内容固定显示为时间,它主…

大模型系列4--开源大模型本地部署到微调(WIP)

背景 一直想真正了解大模型对硬件资源的需求&#xff0c;于是准备详细看一篇视频&#xff0c;将核心要点总结记录下。本文内容参考视频&#xff1a;保姆级教程&#xff1a;6小时掌握开源大模型本地部署到微调&#xff0c;感谢up主 训练成本 训练 > 微调 > 推理训练GPT…