RabbitMQ【直连、主题、扇形交换机实战】

news2025/7/18 21:17:19

目录

1. 直连交换机(Direct实战)

     provider生产者(publisher)

     consumer消费者

2. 主题交换机(Topic实战)

      provider生产者(publisher)

      consumer消费者

3. 扇形交换机(Fanout实战)

      provider生产者(publisher)

      consumer消费者


前言

   想学习RabbitMQ基础的请阅读下边博文链接

   RabbitMQ【基本使用】_JoneClassMate的博客-CSDN博客 


1.  直连交换机(Direct实战)

      provider生产者(publisher)

  • DirectConfig

package com.jmh.provider.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 蒋明辉
 * @data 2022/11/25 19:02
 */
@Configuration
@SuppressWarnings("all")
public class DirectConfig {

    /**
     * 创建队列
     */
    @Bean
    public Queue directQueueA(){
        return new Queue("directQueueA",true);

    }
    @Bean
    public Queue directQueueB(){
        return new Queue("directQueueB",true);

    }
    @Bean
    public Queue directQueueC(){
        return new Queue("directQueueC",true);

    }

    /**
     * 创建交换机
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("directExchange");
    }

    /**
     * 设置队列和交换机的绑定
     */
    @Bean
    public Binding bindingA(){
        return BindingBuilder.bind(directQueueA()).to(directExchange()).with("AA");
    }
    @Bean
    public Binding bindingB(){
        return BindingBuilder.bind(directQueueB()).to(directExchange()).with("BB");
    }
    @Bean
    public Binding bindingC(){
        return BindingBuilder.bind(directQueueC()).to(directExchange()).with("CC");
    }


}
  •  controller
package com.jmh.provider.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 蒋明辉
 * @data 2022/11/25 19:08
 */
@RestController
@SuppressWarnings("all")
public class ProviderController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 直连交换机
     * @param key
     * @return
     */
    @RequestMapping("/directSend")
    public String directSend(String key){
        rabbitTemplate.convertAndSend("directExchange",key,"Hello World");
        return "yes";
    }



}

     consumer消费者

  • DirectReceiverA

package com.jmh.consumer.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author 蒋明辉
 * @data 2022/11/25 19:12
 */
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "directQueueA")
@Slf4j
public class DirectReceiverA {

    @RabbitHandler
    public void info(String msg){
        log.info("A接收到了"+msg);
    }

}
  •  DirectReceiverB
package com.jmh.consumer.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author 蒋明辉
 * @data 2022/11/25 19:12
 */
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "directQueueB")
@Slf4j
public class DirectReceiverB {

    @RabbitHandler
    public void info(String msg){
        log.info("B接收到了"+msg);
    }
}
  •  DirectReceiverC
package com.jmh.consumer.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author 蒋明辉
 * @data 2022/11/25 19:12
 */
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "directQueueC")
@Slf4j
public class DirectReceiverC {

    @RabbitHandler
    public void info(String msg){
        log.info("C接收到了"+msg);
    }
}

2. 主题交换机(Topic实战)

      provider生产者(publisher)

  • TopicConfig

package com.jmh.provider.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 蒋明辉
 * @data 2022/11/25 19:02
 */
@Configuration
@SuppressWarnings("all")
public class TopicConfig {

    private static final String KEY_A="*.a.*";
    private static final String KEY_B="*.*.a";
    private static final String KEY_C="a.#";

    /**
     * 创建队列
     */
    @Bean
    public Queue topicQueueA(){
        return new Queue("topicQueueA",true);

    }
    @Bean
    public Queue topicQueueB(){
        return new Queue("topicQueueB",true);

    }
    @Bean
    public Queue topicQueueC(){
        return new Queue("topicQueueC",true);

    }

    /**
     * 创建交换机
     */
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }

    /**
     * 设置队列和交换机的绑定
     */
    @Bean
    public Binding topicBindingA(){
        return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(KEY_A);
    }
    @Bean
    public Binding topicBindingB(){
        return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(KEY_B);
    }
    @Bean
    public Binding topicBindingC(){
        return BindingBuilder.bind(topicQueueC()).to(topicExchange()).with(KEY_C);
    }


}
  •  controller
package com.jmh.provider.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 蒋明辉
 * @data 2022/11/25 19:08
 */
@RestController
@SuppressWarnings("all")
public class ProviderController {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    /**
     * 主题交换机
     * @param key
     * @return
     */
    @RequestMapping("/topicSend")
    public String topicSend(String key){
        rabbitTemplate.convertAndSend("topicExchange",key,"Hello World");
        return "yes";
    }



}

      consumer消费者

  • TopicReceiverA

package com.jmh.consumer.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author 蒋明辉
 * @data 2022/11/25 19:12
 */
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "topicQueueA")
@Slf4j
public class TopicReceiverA {

    @RabbitHandler
    public void info(String msg){
        log.info("A接收到了"+msg);
    }

}
  •  TopicReceiverB
package com.jmh.consumer.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author 蒋明辉
 * @data 2022/11/25 19:12
 */
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "topicQueueB")
@Slf4j
public class TopicReceiverB {

    @RabbitHandler
    public void info(String msg){
        log.info("B接收到了"+msg);
    }

}
  •  TopicReceiverC
package com.jmh.consumer.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author 蒋明辉
 * @data 2022/11/25 19:12
 */
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "topicQueueC")
@Slf4j
public class TopicReceiverC {

    @RabbitHandler
    public void info(String msg){
        log.info("C接收到了"+msg);
    }

}

3. 扇形交换机(Fanout实战)

      provider生产者(publisher)

  • FanoutConfig

package com.jmh.provider.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 蒋明辉
 * @data 2022/11/25 19:02
 */
@Configuration
@SuppressWarnings("all")
public class FanoutConfig {

    /**
     * 创建队列
     */
    @Bean
    public Queue fanoutQueueA(){
        return new Queue("fanoutQueueA",true);

    }
    @Bean
    public Queue fanoutQueueB(){
        return new Queue("fanoutQueueB",true);

    }
    @Bean
    public Queue fanoutQueueC(){
        return new Queue("fanoutQueueC",true);

    }

    /**
     * 创建交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

    /**
     * 设置队列和交换机的绑定
     */
    @Bean
    public Binding fanoutBindingA(){
        return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutBindingB(){
        return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutBindingC(){
        return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());
    }


}
  •  controller
package com.jmh.provider.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 蒋明辉
 * @data 2022/11/25 19:08
 */
@RestController
@SuppressWarnings("all")
public class ProviderController {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    /**
     * 扇形交换机
     * @param key
     * @return
     */
    @RequestMapping("/fanoutSend")
    public String fanoutSend(){
        rabbitTemplate.convertAndSend("fanoutExchange",null,"Hello World");
        return "yes";
    }


}

      consumer消费者

  • FanoutConfig

package com.jmh.provider.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 蒋明辉
 * @data 2022/11/25 19:02
 */
@Configuration
@SuppressWarnings("all")
public class FanoutConfig {

    /**
     * 创建队列
     */
    @Bean
    public Queue fanoutQueueA(){
        return new Queue("fanoutQueueA",true);

    }
    @Bean
    public Queue fanoutQueueB(){
        return new Queue("fanoutQueueB",true);

    }
    @Bean
    public Queue fanoutQueueC(){
        return new Queue("fanoutQueueC",true);

    }

    /**
     * 创建交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

    /**
     * 设置队列和交换机的绑定
     */
    @Bean
    public Binding fanoutBindingA(){
        return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutBindingB(){
        return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutBindingC(){
        return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());
    }


}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/36076.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

3.71 OrCAD新建原理图时,每一个类目的含义是什么?OrCAD软件怎么显示元器件的封装名称?

笔者电子信息专业硕士毕业,获得过多次电子设计大赛、大学生智能车、数学建模国奖,现就职于南京某半导体芯片公司,从事硬件研发,电路设计研究。对于学电子的小伙伴,深知入门的不易,特开次博客交流分享经验&a…

第四章:Java琐事

乐观锁和悲观锁悲观锁乐观锁八种案例演示synchronized到底锁的是什么字节码角度分析 Synchronizedsynchronized 同步代码块synchronized 同步方法为什么任意一个对象都可以是锁?公平锁和非公平锁为什么会有公平锁/非公平锁的设计?为什么默认是非公平&…

计算机毕业设计jspKTV点歌系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 JSP KTV点歌系统 是一套完善的web设计系统,对理解JSP java编程开发语言有帮助,系统具有完整的源代码和数据库,系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5开 发,数据库为Mysql,使用jav…

Transformer Fusion for Indoor RGB-D Semantic Segmentation非官方自己实现的代码

声明:文章没有官方的代码,这里自己浅显的分析一下。 首先看一下encoder,就是swin transformer,假设RGB的维度为(1,3,480,480),Depth维度为(1,1,480,480)。维度分别变为**(1,64,120,120)—>(1,64,120,120)—>(1,128,60,60)—…

关于conda、虚拟环境、镜像通道、pip、pycharm解释器配置的一些总结

目录conda与虚拟环境相关命令查看当前存在哪些虚拟环境创建虚拟环境克隆虚拟环境删除指定虚拟环境删除指定虚拟环境中某个包设置国内镜像添加Anaconda的TUNA镜像添加USTC仓库镜像设置搜索时显示通道地址Conda 附加库查看channels恢复默认镜像(即删除手动配置的全部镜…

TCP/IP五层协议栈(1)

1.应用层协议 应用层协议相对来说比较简单,因为其他层的协议属于硬件上的.相对程序员来说已经固定了.不需要自己设计和实现了. 设计应用层协议有两个要点 要约定好传输的数据还要约定好数据的格式 1.1.协议模板 虽说可以自己设计,不过当前已经有很多模板被大佬设计好了.这…

51单片机语音进店迎宾器统计进店人数可定制播报铃声(可选PCB)

实践制作DIY- GC0115-语音进店迎宾器统计进店人数 一、功能说明: 基于51单片机设计-语音进店迎宾器统计进店人数 功能介绍: 硬件组成:STC89C52(AT89C51/52)单片机串口语音播报模块2个红外对射传感器一个按键&#x…

C++不知算法系列之高精度数值处理算法

1. 前言 什么是高精度数值处理算法? 高精度数值指因受限于计算机硬件的制约,超过计算机所能存储范围的数值。既然不能存储,更谈不上运算。 对此类数值的加、减、乘、除运算需要提供针对性的算法方能获取到结果。此类算法的设计思路因有别于…

VMware环境配置

文章目录一、环境配置1、修改主机名,然后切换到root用户2、确认主机网关a.确认windows主机网关b.确认虚拟机主机网关3、修改网络配置4、设置DNS域名解析的配置文件resolv.conf。5、修改hosts文件6、重启网络服务7、验证网络服务a.虚拟机ping百度b.主机ping虚拟机二、…

vscode使用restClient实现各种http请求

vscode使用restClient实现各种http请求 一,安装插件 首先,我们要在vscode的扩展中,搜索rest Client,然后安装它,这里我已经安装过了。 安装后,我们就可以使用rest client插件进行http各种操作了。 二&a…

MySQL---权限控制和用户、角色管理详解

目录 一、MySQL用户登录 二、用户管理 三、权限控制 四、角色管理 一、MySQL用户登录 一般在本机上我们的登录命令: mysql -u root -p密码这里介绍命令的作用: -u 指定用户名 -h 指定主机地址(默认为localhost) -p 指定用…

CHRONY - 时钟同步

一、同步公网时间源 安装chrony: yum install chrony -y 查看chrony的重要配置文件:rpm -ql chrony 修改chrony配置文件: vim /etc/chrony.conf 查看修改了的配置文件 egrep -v "^#|^$" /etc/chrony.conf

Baklib知识库|为什么知识共享工具对减少内部知识缺口至关重要

你的企业是否存在知识缺口? 知识缺口——没有对关键知识进行研究和记录,以有效地传播信息,并教育企业内外的用户——可能是寻求生产率最大化并最终实现利润增长的公司的一个关键缺陷。知识(或数据、关键信息等)是你的…

你一定要知道的四个程序员接外包的网站,悄悄把技术变现!

说起程序员接外包的网站,你在网上一搜发现数不胜数,但真正有用的却很少。然后你想快速的找到几个靠谱的网站,去看了看接外包的攻略,你会发现排雷的又数不胜数。一时间你还真不知道要选哪一个。 接下来就为大家推荐几个我认为比较…

165 pbi-utils 使用文档

165 pbi-utils 使用文档 一、背景 先来说一下为什么会有 pbi-utils 这个小工具吧。在我日常做演示的示例文件的时候,每次都要重新搞一次 Power BI Desktop,就想能不能像 PPT 模板那样,搞一个模板,然后更专心的去专注内容本身呢&…

网络规划.1.扩展.IP地址规划

第一章 IPv4 Internet中有数百万台以上的主机和路由器,IP 地址可以确切地标识它们。- 一台主机至少拥有一-个IP地址。任何两台主机的IP地址不能相同,但是允许一台主机拥有多个IP地址。如果一台计算机虽然也连入Internet, 使用Internet 的某些功能&#…

Android 开发中原始音频的录播和和自定义音频控制条的讲解及实战(超详细 附源码)

需要源码请点赞关注收藏后评论区留下QQ~~~ 一、原始音频的录播 语音通话功能要求实时传输,如果使用MediaRecorder与MediaPlayer组合,那么只能整句话都录完并编码好了才能传给对方去播放,这个时效性太差。 此时用到音频录制器AudioRecord与音…

[附源码]Python计算机毕业设计二手书交易系统

项目运行 环境配置: Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术: django python Vue 等等组成,B/S模式 pychram管理等等。 环境需要 1.运行环境:最好是python3.7.7,…

企业如何提供安全方面的投资回报率?

为什么增加在恢复方面的投资可以提高投资回报率? 所有企业都会认可安全的重要性,但在安全上的投入却经常令人迷惑。 一方面,由于安全威胁在不断变化,所以,安全建设维护需要长期持续大量投入。另一方面,长期大量投入后…

第6章 集成第3方依赖注入中间件“Autofac”

“Blog.Core-master”示例程序中接口及其具体实现类的注入操作,是通过第3方依赖注入中间件“Autofac”来以反射方式把Service.dll 程序集中所有接口及其具体实现类的实例依赖注入内置容器中。.Net(Core).x框架是中的内置依赖注入容器是不支持程序集注入的。 从最佳实…