(九)RabbitMQ交换机(Exchange)

news2025/8/3 23:47:56

交换机Exchange

  • 1、交换机
    • 1.1. Exchanges 概念
    • 1.2. Exchanges 的类型
    • 1.3. 无名exchange(默认交换机)
  • 2、临时队列
  • 3、绑定(bindings)
  • 4、Fanout(发布/订阅)
  • 5、Direct exchange、
  • 6、Topics
    • 在这里插入图片描述

1、交换机

1.1. Exchanges 概念

RabbitMQ 消息传递模型的核心思想:生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。

1.2. Exchanges 的类型

  • 直接(direct),
  • 主题(topic)
  • 标题(headers)
  • 扇出(fanout)

1.3. 无名exchange(默认交换机)

也就是推送消息时,交换机的参数是 “ ”,这里用的就是默认的交换机

channel.basicPublish("", queueName, null, msg.getBytes());

2、临时队列

也就是队列名称随机产生,一旦我们断开了消费者的连接,队列将被自动删除,也没有持久化

//创建随机队列
String queueName = channel.queueDeclare().getQueue();

3、绑定(bindings)

概念 :binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和哪个队列进行了绑定关系。

在这里插入图片描述

4、Fanout(发布/订阅)

将接收到的所有消息广播到它绑定的所有队列中,也就是关于routingKey他不管。系统中油默认 exchange 类型

在这里插入图片描述

不管是生产者还是消费者声明的交换机,只要谁先启动就只有一个

生产者

package com.feng.fanoutExchange;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/24 20:26
 * @Version 1.0
 * @Description fanout模式生产者
 */
public class EmitLog {
    //交换机名称
    public static final String EXCHANGE_NAME = "logs";`在这里插入代码片`

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String msg = scanner.next();
            //设置消息持久化
            channel.basicPublish(EXCHANGE_NAME,"", null,msg.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+msg);
        }

    }
}

消费者1

package com.feng.fanoutExchange;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/24 20:08
 * @Version 1.0
 * @Description 消息消费者1
 */
public class ReceiveLogs01 {
    //交换机名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机,因为是fanout所以不用声明交换机也行
        //channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //声明一个临时队列
        /**
         * 1、队列名是随机取得
         * 2、连接断开之后队列就删除了
         */
        String queueName = channel.queueDeclare().getQueue();//返回队列名
        //交换机绑定队列,这里因为是 Fanout模式,所以路由key其实并不重要,可以随便取
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        channel.basicConsume(queueName, true, (String consumerTag, Delivery message) -> {
            System.out.println("01接收消息是:" + new String(message.getBody(), "UTF-8"));
        }, consumerTag -> {
        });
    }
}

消费者2

package com.feng.fanoutExchange;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/24 20:08
 * @Version 1.0
 * @Description 消息消费者1
 */
public class ReceiveLogs02 {
    //交换机名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机,因为是fanout所以不用声明交换机也行
        //channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //声明一个临时队列
        /**
         * 1、队列名是随机取得
         * 2、连接断开之后队列就删除了
         */
        String queueName = channel.queueDeclare().getQueue();//返回队列名
        //交换机绑定队列,这里因为是 Fanout模式,所以路由key其实并不重要,可以随便取
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        channel.basicConsume(queueName, true, (String consumerTag, Delivery message) -> {
            System.out.println("02接收消息是:" + new String(message.getBody(), "UTF-8"));
        }, consumerTag -> {
        });
    }
}

5、Direct exchange、

队列只对它绑定的交换机的消息感兴趣。也就是由routingKey决定交换机发送到哪个队列

代码实现

示意图
在这里插入图片描述
console队列

package com.feng.directExchange;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/24 20:45
 * @Version 1.0
 * @Description direct 模式交换机消费者1
 */
public class ReceiveLogsDirect01 {
    //交换机名称
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机,因为是fanout所以不用声明交换机也行
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //声明一个队列
        channel.queueDeclare("console",false,false,false,null);
        //交换机绑定队列,这里因为是 direct,所以路由key得绑定
        channel.queueBind("console", EXCHANGE_NAME, "info");
        channel.queueBind("console", EXCHANGE_NAME, "warning");
        //接收消息
        channel.basicConsume("console", true, (String consumerTag, Delivery message) -> {
            System.out.println("console队列接收消息是:" + new String(message.getBody(), "UTF-8"));
        }, consumerTag -> {
        });
    }
}

disk队列

package com.feng.directExchange;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/24 20:45
 * @Version 1.0
 * @Description direct 模式交换机消费者2
 */
public class ReceiveLogsDirect02 {
    //交换机名称
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机,因为是fanout所以不用声明交换机也行
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //声明一个队列
        channel.queueDeclare("disk",false,false,false,null);
        //交换机绑定队列,这里因为是 direct,所以路由key得绑定
        channel.queueBind("disk", EXCHANGE_NAME, "error");
        //接收消息
        channel.basicConsume("disk", true, (String consumerTag, Delivery message) -> {
            System.out.println("disk队列接收消息是:" + new String(message.getBody(), "UTF-8"));
        }, consumerTag -> {
        });
    }
}

发送方

package com.feng.directExchange;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/24 20:58
 * @Version 1.0
 * @Description   direct 模式交换机生产者
 */
public class DirectLogs {
    //交换机名称
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT );

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String msg = scanner.next();
            //这里想给谁发,就指定对应得RountingKey就行
            channel.basicPublish(EXCHANGE_NAME,"error", null,msg.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+msg);
        }

    }
}

6、Topics

topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。比如说:“stock.usd.nyse”, “nyse.vmw”,有两个替换符,
*(星号) 可以代替一个单词
#(井号) 可以替代零个或多个单词

缺点:就是速度慢,需要匹配
在这里插入图片描述

特殊情况

  • 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
  • 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

代码实现

绑定情况

在这里插入图片描述

消费者1

package com.feng.topicExchange;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/25 14:03
 * @Version 1.0
 * @Description Topic交换机消费者01
 */
public class ReceiveLogsTopic01 {

    //交换机名称
    private static final String EXCHANGE_NAME = "topic_logs";
    //队列名
    private static final String QUEUE_NAME = "Q1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");
        //消费消息
        channel.basicConsume(QUEUE_NAME,true,(String consumerTag, Delivery message) -> {
            System.out.println(QUEUE_NAME+"接收消息是:" + new String(message.getBody(), "UTF-8"));
            System.out.println("绑定的路由是:"+message.getEnvelope().getRoutingKey());
        }, consumerTag -> {});
    }
}

消费者2

package com.feng.topicExchange;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/25 14:03
 * @Version 1.0
 * @Description Topic交换机消费者02
 */
public class ReceiveLogsTopic02 {

    //交换机名称
    private static final String EXCHANGE_NAME = "topic_logs";
    //队列名
    private static final String QUEUE_NAME = "Q2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");
        //消费消息
        channel.basicConsume(QUEUE_NAME,true,(String consumerTag, Delivery message) -> {
            System.out.println(QUEUE_NAME+"接收消息是:" + new String(message.getBody(), "UTF-8"));
            System.out.println("绑定的路由是:"+message.getEnvelope().getRoutingKey());
        }, consumerTag -> {
        });
    }
}

生产者

package com.feng.fanoutExchange;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/24 20:26
 * @Version 1.0
 * @Description fanout模式生产者
 */
public class EmitLog {
    //交换机名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String msg = scanner.next();
            //设置消息持久化
            channel.basicPublish(EXCHANGE_NAME,"", null,msg.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+msg);
        }

    }
}

曾出现的错误 ShutdownSignalException

在这里插入图片描述

由于自己之前将队列绑定交换机写错了,导致队列没有绑定交换机,所以报错

在这里插入图片描述

  • exchangeBind:这个是用于交换机的绑定,可以交换机绑队列,或者交换机绑交换机
  • queueBind:这个是队列的绑定,用于绑定交换机

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/34712.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

学生HTML个人网页作业作品 基于HTML+CSS+JavaScript明星个人主页(15页)

🎉精彩专栏推荐👇🏻👇🏻👇🏻 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 💂 作者主页: 【主页——🚀获取更多优质源码】 🎓 web前端期末大作业…

0105 蓝桥杯真题08

/* * 中国古代文献中,曾记载过“大衍数列”, 主要用于解释中国传统文化中的太极衍生原理。 * 它的前几项是:0、2、4、8、12、18、24、32、40、50 ... * 其规律是:对偶数项,是序号平方再除2,奇数项,是序号…

【RT-Thread Studio更新】英飞凌 PSOC62-IFX-PROTO-KIT 开发环境搭建指南

本文将介绍在 RT-Thread Studio 上如何基于 PSOC62-IFX-PROTO-KIT 开发板搭建开发环境进行开发、烧录、调试功能。开发环境搭建步骤1、PSOC62-IFX-PROTO-KIT 开发板资源包安装打开Studio,点击工具栏上的SDK管理器在Board_Support_Packages 找到 Infineon 下的 PSOC6…

TOWER 成就徽章 NFT 系列介绍——TOWER 生态系统的第一个灵魂通证(SBT)

2022 年 7 月,团队推出了成就徽章 NFT 系列,记录每个成员在 TOWER 生态系统中的努力。这是第一个不可转让的灵魂 NFT 系列(SBT),代表了每个玩家的独特身份。 关于灵魂通证(SBT) 以太坊联合创始人…

力扣(LeetCode)809. 情感丰富的文字(C++)

模拟 分析单词可扩张条件 : 对于某个字母&#xff0c;设目标字母长度 c1c1c1 &#xff0c;待扩张字母长度 c2c2c2 当 c1<c2c1<c2c1<c2&#xff0c;目标字母比待扩张字母少&#xff0c;false 当 c1≥c2c1\ge c2c1≥c2&#xff0c;目标字母比待扩张字母多或者相等&…

大数据开发——Hive实战案例

文章目录1. 创建表结构1.1 视频表结构1.2 用户表结构2. 准备工作2.1 创建临时表2.2 创建最终使用表2.3 对创建表进行解读3. 业务分析1. 创建表结构 1.1 视频表结构 1.2 用户表结构 2. 准备工作 2.1 创建临时表 由于使用的是orc方式进行存储&#xff0c;所以我们需要建立一个…

OpenFlow协议原理及基本配置-网络测试仪实操

一、OpenFlow协议原理 1.OpenFlow技术背景 ●转发和控制分离是SDN网络的本质特点之一。在SDN网络架构中&#xff0c;控制平面与转发平面分离&#xff0c;网络的管理和状态在逻辑上集中到一起&#xff0c;底层的网络基础从应用中独立出来&#xff0c;由此&#xff0c;网络获得…

不知道照片上怎么文字翻译成英文?来看看这篇文章

不知道你们在遇到看不懂的英文图片时&#xff0c;是不是和以前的我一样&#xff0c;一个一个的把图片内容输到翻译软件里&#xff0c;然后再进行翻译&#xff0c;其实这种办法不仅费时还费力&#xff0c;而且一旦遇到其它的外文就彻底没辙了&#xff0c;那怎么办呢&#xff1f;…

[附源码]java毕业设计音乐交流平台

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

我做数画ai绘画教程日赚过千,良心分享给想兼职赚钱的人

ai绘画能实现日赚过千&#xff0c;你信吗&#xff1f; 现在什么是风口&#xff1f;当然是ai绘画。而AI绘画里&#xff0c;什么最受欢迎&#xff0c;不用说&#xff0c;自然是二次元。 然后&#xff0c;很多人一拥而上&#xff0c;都去拍自己的二次元照片&#xff0c;然后在各…

10000m3d城镇生活污水处理工艺设计

目 录 1前 言 1 1.1 设计任务 2 1.2 设计目的 2 1.3 设计要求 2 1.4设计的数据以及资料 2 1.5 处理程度的计算 3 2 总体设计 5 2.1工艺比较的选择 5 2.2设计流量 8 2.2.1 设计规模 8 2.2.2 设计最大流量 8 2.3 格栅的设计计算 8 2.3.1格栅的作用及种类 8 2.3.2格栅的设计原则 8…

Linux系统编程(一)——环境搭建

准备写系统的总结Linux系统的一些知识以及Linux系统编程。这一篇先讲Linux搭建常用的开发环境。 目录 0x01 Linux开发环境搭建 一、远程链接操作 0x02 GCC 一、安装 二、了解GCC 0x03 静态库的制作及使用 一、库的介绍 二、静态库 0x04 动态库的制作和使用 一、配置…

Python遥感开发之批量掩膜和裁剪

Python遥感开发之批量掩膜和裁剪1.使用arcpy进行批量掩膜1.1 批量掩膜代码1.2 单个掩膜代码2.使用GDAL进行批量掩膜3.使用rasterio进行批量裁剪前言&#xff1a;主要介绍了使用arcpy、gdal、rasterio对遥感影像进行批量掩膜和裁剪。 1.使用arcpy进行批量掩膜 注意&#xff1a;…

Spring Boot——日志文件

文章目录1.日志的作用2.日志的用法3.自定义日志打印日志格式的说明4.日志级别5.在配置文件中设置日志级别5.1设置全局的日志级别和局部文件夹的日志级别6. 日志持久化7. 更简单的日志输出-lombok1.日志的作用 日志的作用&#xff1a;用来排除和定位问题 通过日志还可以具有以…

Greenplum数据库故障排查及修复

场景一&#xff1a;gp服务正常&#xff0c;存在部分segment实例丢失 1、异常现象 主节点切换gpadmin用户输入gpstate查看状态 如果红色框内有指向左边的箭头则说明存在部分segment实例丢失。 2、排查思路 首先查看主节点日志&#xff0c;重点关注发生segment丢失那段时间的…

【ML特征工程】第 3 章 :文本数据:扁平化、过滤和分块

&#x1f50e;大家好&#xff0c;我是Sonhhxg_柒&#xff0c;希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流&#x1f50e; &#x1f4dd;个人主页&#xff0d;Sonhhxg_柒的博客_CSDN博客 &#x1f4c3; &#x1f381;欢迎各位→点赞…

关于SD-WAN的十问十答(最强攻略戳这里)

1. WAN和SD-WAN之间的区别&#xff1f; 从底层来看&#xff0c;相较基于硬件物理设施的WAN&#xff0c;SD-WAN是一种覆盖现有网络的软件技术&#xff0c;是部署在物理基础设施下层的流量管理网络。 和常规WAN相比&#xff0c;SD-WAN具有虚拟WAN体系结构和软件驱动技术&#xff…

[附源码]java毕业设计圆梦山区贫困学生助学系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

EasyRecovery适用于Windows和Mac的专业硬盘恢复软件

电脑中的数据文件对很多的小伙伴来说都是非常重要的&#xff0c;在下载安装新的软件设备时都需要非常谨慎&#xff0c;一旦碰到一些问题就可能会导致文件丢失&#xff0c;想要恢复这些文件并不是很容易&#xff0c;需要使用专业的数据恢复工具才可以对其进行恢复&#xff0c;那…

考研数据结构填空题整合_做题版

考研数据结构填空题整合 目录考研数据结构填空题整合一、ZYL组ZYL组一ZYL组二ZYL组三ZYL组四ZYL组五ZYL组六ZYL组七ZYL组八二、TJP组TJP组一TJP组二TJP组三三、LZH组LZH 组一LZH 组二LZH 组三LZH 组四LZH 组五LZH 组六LZH 组七四、LB组LB组一LB组二LB组三LB组四LB组五LB组六LB组…