springboot 针对rabbitmq多vhost情况处理

news2025/6/9 6:43:44

在这里插入图片描述

目录

  • 说明
  • 代码
    • 自定义rabbitmq的pom信息:
    • 重写one的连接工厂
    • 重写two的连接工厂:
    • 创建队列、交换机并绑定:
    • 消费者消费消息
    • 生产者发送消息

说明

需求场景:
项目中在已接入rabbitmq一个vhost的基础上,需要再引入多个vhost进行消息处理,spring原来的支持以及满足不了,所以自己要重写。

代码

自定义rabbitmq的pom信息:

spring:
  rabbitmq:
    one:
      host: xxxx
      port: 5672
      virtual-host: one
      username: xxxxx
      password: xxxx
      queue: xxx.xxx.xxx
      topic:
        exchange:
          name: xxx_xxx_xxx
    two:
      host: xxxx
      port: 5672
      virtual-host: two
      username: xxxx
      password: xxxxx
      queue: xxx.xxx.xxx
      topic:
        exchange:
          name: xxx_xxx_xxx

重写one的连接工厂

在连接多个 MQ 的情况下,需要在某个连接加上 @Primary 注解,表示主连接,默认使用这个连接,如果不加,服务会起不来

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Slf4j
@Configuration
/**
 * 重写连接工厂
 */
public class OneMQConfig {

    @Value("${spring.rabbitmq.one.host}")
    private String host;

    @Value("${spring.rabbitmq.one.port}")
    private int port;

    @Value("${spring.rabbitmq.one.username}")
    private String username;

    @Value("${spring.rabbitmq.one.password}")
    private String password;

    @Value("${spring.rabbitmq.one.virtual-host}")
    private String virtualHost;

    /**
     * 定义与one的连接工厂
     */
    @Bean(name = "oneConnectionFactory")
    @Primary
    public ConnectionFactory oneConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    @Bean(name = "oneRabbitTemplate")
    @Primary
    public RabbitTemplate oneRabbitTemplate(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate oneRabbitTemplate = new RabbitTemplate(connectionFactory);
        oneRabbitTemplate.setMandatory(true);
        oneRabbitTemplate.setConnectionFactory(connectionFactory);
        oneRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 确认消息送到交换机(Exchange)回调
             * @param correlationData
             * @param ack
             * @param cause
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("确认消息送到交换机(Exchange)结果:");
                log.info("相关数据:{}", correlationData);
                boolean ret = false;
                if (ack) {
                    log.info("消息发送到交换机成功, 消息 = {}", correlationData.getId());
                    //下面可自定义业务逻辑处理,如入库保存信息等

                } else {
                    log.error("消息发送到交换机失败! 消息: {}}; 错误原因:cause: {}", correlationData.getId(), cause);
                    //下面可自定义业务逻辑处理,如入库保存信息等

                }
            }
        });

        oneRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

            /**
             * 只要消息没有投递给指定的队列 就触发这个失败回调
             * @param message  投递失败的消息详细信息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本内容
             * @param exchange 当时这个消息发给那个交换机
             * @param routingKey 当时这个消息用那个路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //获取消息id
                String messageId = message.getMessageProperties().getMessageId();
                // 内容
                String result = null;
                try {
                    result = new String(message.getBody(), "UTF-8");
                } catch (Exception e) {
                    log.error("消息发送失败{}", e);
                }
                log.error("消息发送失败, 消息ID = {}; 消息内容 = {}", messageId, result);
                //下面可自定义业务逻辑处理,如入库保存信息等
            }
        });
        return oneRabbitTemplate;
    }

    @Bean(name = "oneFactory")
    @Primary
    public SimpleRabbitListenerContainerFactory oneFactory(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory,
                                                           SimpleRabbitListenerContainerFactoryConfigurer configurer) {

        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean(name = "oneRabbitAdmin")
    @Primary
    public RabbitAdmin oneRabbitAdmin(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

重写two的连接工厂:


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class TwoMQConfig {

    @Value("${spring.rabbitmq.two.host}")
    private String host;

    @Value("${spring.rabbitmq.two.port}")
    private int port;

    @Value("${spring.rabbitmq.two.username}")
    private String username;

    @Value("${spring.rabbitmq.two.password}")
    private String password;

    @Value("${spring.rabbitmq.two.virtual-host}")
    private String virtualHost;

    /**
     * 定义与two的连接工厂
     */
    @Bean(name = "twoConnectionFactory")
    public ConnectionFactory twoConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    @Bean(name = "twoRabbitTemplate")
    public RabbitTemplate twoRabbitTemplate(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate twoRabbitTemplate = new RabbitTemplate(connectionFactory);
        twoRabbitTemplate.setMandatory(true);
        twoRabbitTemplate.setConnectionFactory(connectionFactory);
        twoRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 确认消息送到交换机(Exchange)回调
             * @param correlationData
             * @param ack
             * @param cause
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("确认消息送到交换机(Exchange)结果:");
                log.info("相关数据:{}", correlationData);
                boolean ret = false;
                if (ack) {
                    log.info("消息发送到交换机成功, 消息 = {}", correlationData.getId());
                    //下面可自定义业务逻辑处理,如入库保存信息等

                } else {
                    log.error("消息发送到交换机失败! 消息: {}}; 错误原因:cause: {}", correlationData.getId(), cause);
                    //下面可自定义业务逻辑处理,如入库保存信息等

                }
            }
        });

        twoRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

            /**
             * 只要消息没有投递给指定的队列 就触发这个失败回调
             * @param message  投递失败的消息详细信息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本内容
             * @param exchange 当时这个消息发给那个交换机
             * @param routingKey 当时这个消息用那个路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //获取消息id
                String messageId = message.getMessageProperties().getMessageId();
                // 内容
                String result = null;
                try {
                    result = new String(message.getBody(), "UTF-8");
                } catch (Exception e) {
                    log.error("消息发送失败{}", e);
                }
                log.error("消息发送失败, 消息ID = {}; 消息内容 = {}", messageId, result);
                //下面可自定义业务逻辑处理,如入库保存信息等
            }
        });
        return twoRabbitTemplate;
    }

    @Bean(name = "twoFactory")
    public SimpleRabbitListenerContainerFactory twoFactory(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory,
                                                           SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean(name = "twoRabbitAdmin")
    public RabbitAdmin twoRabbitAdmin(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

创建队列、交换机并绑定:


import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * 创建队列、交换机并绑定
 */
@Configuration
public class QueueConfig {
    @Resource(name = "oneRabbitAdmin")
    private RabbitAdmin oneRabbitAdmin;

    @Resource(name = "twoRabbitAdmin")
    private RabbitAdmin twoRabbitAdmin;

    @Value("${spring.rabbitmq.one.queue}")
    private String oneOutQueue;

    @Value("${spring.rabbitmq.one.queue}")
    private String oneRoutingKey;

    @Value("${spring.rabbitmq.two.queue}")
    private String twoOutQueue;

    @Value("${spring.rabbitmq.two.queue}")
    private String twoRoutingKey;

    @Value("${spring.rabbitmq.one.topic.exchange.name}")
    private String oneTopicExchange;

    @Value("${spring.rabbitmq.two.topic.exchange.name}")
    private String twoTopicExchange;

    @PostConstruct
    public void oneRabbitInit() {
        //声明交换机
        oneRabbitAdmin.declareExchange(new TopicExchange(oneTopicExchange, true, false));
        //声明队列
        oneRabbitAdmin.declareQueue(new Queue(oneOutQueue, true, false, false));
        //绑定队列及交换机
        oneRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(oneOutQueue, true, false, false))
                .to(new TopicExchange(oneTopicExchange, true, false))
                .with(oneRoutingKey + ".#"));
    }

    @PostConstruct
    public void twoRabbitInit() {
        //声明交换机
        twoRabbitAdmin.declareExchange(new TopicExchange(twoTopicExchange, true, false));
        //声明队列
        twoRabbitAdmin.declareQueue(new Queue(twoOutQueue, true));
        //绑定队列及交换机
        twoRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(twoOutQueue, true, false, false))
                .to(new TopicExchange(twoTopicExchange, true, false))
                .with(twoRoutingKey));
    }
}

消费者消费消息

不同vhost换containerFactory里面的工厂和queue

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.ziyun.iot.event.model.mysql.event.entity.UserEvent;
import com.ziyun.iot.event.model.mysql.event.mapper.EventMapper;
import com.ziyun.iot.event.model.mysql.event.mapper.UserEventMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;


@Slf4j
@Service
public class AddUserConsumer {

    @RabbitListener(queues = "${spring.rabbitmq.two.queue}", containerFactory = "twoFactory")
    public void addUserConsumer(Message message, Channel channel) {
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("收到rabbitmq消息: {}", body);
        try {
            //业务逻辑

                    //手动确认消息已经被消费
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                    log.info("新增用户:{} 消费消息成功:{}。", ssoIds, message.toString());
                } else {
                    log.info("新增用户:{} 消费消息失败:{}。", ssoIds, message.toString());
                }
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

}

生产者发送消息

不同vhost换交换机和队列

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Slf4j
@Service
public class SendMessage {

    @Resource(name = "oneRabbitTemplate")
    private RabbitTemplate oneRabbitTemplate;

    @Resource(name = "twoRabbitTemplate")
    private RabbitTemplate twoRabbitTemplate;

    @Value("${spring.rabbitmq.one.topic.exchange.name}")
    private String oneTopicExchange;

    @Value("${spring.rabbitmq.two.topic.exchange.name}")
    private String twoTopicExchange;

    @Value("${spring.rabbitmq.one.queue}")
    private String oneRoutingKey;

    @Value("${spring.rabbitmq.two.queue}")
    private String twoRoutingKey;

    public void sendToOneMessage(String msg) {
        oneRabbitTemplate.convertAndSend(oneTopicExchange, oneRoutingKey, msg);
    }

    public void sendToTwoMessage(String msg) {
        twoRabbitTemplate.convertAndSend(twoTopicExchange, twoRoutingKey, msg);
    }
}

就先说到这 \color{#008B8B}{ 就先说到这} 就先说到这
在下 A p o l l o \color{#008B8B}{在下Apollo} 在下Apollo
一个爱分享 J a v a 、生活的小人物, \color{#008B8B}{一个爱分享Java、生活的小人物,} 一个爱分享Java、生活的小人物,
咱们来日方长,有缘江湖再见,告辞! \color{#008B8B}{咱们来日方长,有缘江湖再见,告辞!} 咱们来日方长,有缘江湖再见,告辞!

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/334367.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

线性杂双功能peg化试剂——HS-PEG-COOH,Thiol-PEG-Acid

英文名称:HS-PEG-COOH,Thiol-PEG-Acid 中文名称:巯基-聚乙二醇-羧基 HS-PEG-COOH是一种含有硫醇和羧酸的线性杂双功能聚乙二醇化试剂。它是一种有用的带有PEG间隔基的交联或生物结合试剂。巯基或SH、巯基或巯基选择性地与马来酰亚胺、OPSS、…

基于“python+”潮汐、风驱动循环、风暴潮等海洋水动力模拟实践技术

ADCIRC是新一代海洋水动力计算模型,它采用了非结构三角形网格广义波动连续方程的设计,在提高计算精确度的同时还减小了计算时间。被广泛应用于:模拟潮汐和风驱动的循环、预测风暴潮和洪水和近岸海上作业等。除此之外,ADCIRC也是我…

MySQL 存储引擎

MySQL内部展示图 分为三个层次 客户端 mysqlserver 存储引擎 我认为与之前先的UBD三层相类似 UI端传数据 B端逻辑处理 数据库端进行执行方面进行类似 存储引擎的分类 mySQL 5.5 版本采用InnoDB 为默认引擎(生成.frm和.ibd文件) 给大家介绍其他存储引…

I/O多路复用

基础概念 Socket 套接字。百科:对网络中不同主机上的应用进程之间进行双向通信的端点的抽象。 例子1:客户端将数据通过网线发送到服务端,客户端发送数据需要一个出口,服务端接收数据需要一个入口,这两个“口子”就是…

【面试题】2023 Vue面试题 高频考题

大厂面试题分享 面试题库 后端面试题库 (面试必备) 推荐:★★★★★ 地址:

【微服务】Elasticsearch数据聚合自动补全数据同步(四)

🚗Es学习第四站~ 🚩Es学习起始站:【微服务】Elasticsearch概述&环境搭建(一) 🚩本文已收录至专栏:微服务探索之旅 👍希望您能有所收获 在第二站的学习中,我们已经导入了大量数据到es中&…

IBM AIX 升级Openssh 实现篇(编译安装)

升级成功佐证 !!!本文所有内容仅作参考,请在测试环境中具体测试完毕后才能应用于生产环境!!! [1]备份和恢复方案 开启telnet 服务,防止ssh 掉线后无法重连维护。在修复漏洞后关闭telnet。 备份该服务相关的所有文件,以便恢复。 root@TEST:/etc# vi inetd.conf #ftp…

原型图设计软件哪个好用?6款好用软件推荐

原型图软件列表 1、墨刀-极简超快的移动应用原型工具 2、ProcessOn-在线作图工具,你不用装 Visio 了 3、摩客-简洁高效的原型图设计工具 4、xiaopiu-国内优雅高效的在线 APP 原型工具 5、Axure-老牌原型工具,8.0 开始对响应式设计做了更好的支持&…

ONES 支持多项信创适配,打造自主可控的国产化平台

近日,ONES 顺利通过麒麟软件 NeoCertify、华为鲲鹏技术、达梦数据库的兼容性测试认证,至此,ONES 已完成国产操作系统、国产 CPU、国产数据库的多维度适配,成为目前唯一支持信创的研发管理平台,这标志着 ONES 在自主可控…

聚类分析--基本原理、方法(Kmeans,层次聚类)

文章目录聚类分析的定义基本原理商业应用场景聚类分析步骤聚类分析方法层次分析法/系统聚类法(小样本)提问:如何选择合适的分类结果K-means疑问:聚类分析的定义 聚类分析就是将研究对象根据一些特征指标,把比较相似的…

ubuntu qt程序无法输入中文 QLineEdit输入框无法切换输入法

目录一、问题描述二、解决思路三、步骤描述一、问题描述 测试软件在运行时无法通过键盘快捷键切换中文输入法,主要原因为qt应用程序没成功加载到输入法插件。 本文的以测试程序demo为例,进行过程展示,demo名字为“test-chinese-lineedit”。…

stream流处理初识

stream流处理初识 java8中的集合支持stream方法, 它会返回一个流(java.util.stream.Stream)IDEA集成的工具查看流式链过程: 流的操作 : 流的概念: java8中的集合支持stream方法,它会返回一个流(java.util.stream.Stream) 元素序列: 就像集…

【数据结构】优先级队列----堆

优先级队列----堆优先级队列堆堆的创建堆的插入:堆的删除:PriorityQueue的特性PriorityQueue的构造与方法优先级队列 优先级队列: 不同于先进先出的普通队列,在一些情况下,优先级高的元素要先出队列。而这种队列需要提…

开启Openharmony 开发之旅

之前因为太懒,所以很少写博客。最近做了一年的鸿蒙开发。想记录下,故开始写点东西,作为学习和开发笔记吧!先分享几个开源鸿蒙的学习网站。 1.开源鸿蒙官网 OpenAtom OpenHarmonyhttps://docs.openharmony.cn/pages/v3.1/zh-cn/a…

《MySQL系列-InnoDB引擎19》文件-日志文件-二进制日志

日志文件 日志文件记录了影响MySQL数据库的各种类型活动。MySQL数据库中常见的日志文件有: 错误日志(error log)二进制日志(bilog)慢查询日志(slow query log)查询日志(log) 这些日志文件可以帮助DBA对MySQL数据库的运行状态进行诊断,从而更好的进行数…

INTx中断机制源码分析

INTx中断机制源码分析 文章目录INTx中断机制源码分析参考资料:一、 配置空间二、 扫描设备时分配中断号三、 使用INTx中断四、 PCIe中断树五、 PCIe INTx中断映射过程5.1 PCIe控制器支持的中断5.2 PCIe控制器注册中断5.3 PCIe设备中断号的分配5.3.1 IRQ domain5.3.2…

Java源码程序设计-房屋出租管理系统设计与实现

摘 要系统设计系统实现开发环境摘 要 随着我国市场经济的快速发展和人们生活水平的不断提高,简单的房屋出租服务已经不能满足人们的需求。如何利用先进的管理手段,提高房屋出租的管理水平,是当今社会所面临的一个重要课题。 本文采用结构化…

Win10系统电脑开机后总是蓝屏无法使用怎么办?

Win10系统电脑开机后总是蓝屏无法使用怎么办?电脑开机的时候出现了蓝屏问题,这个情况是我们的电脑系统不兼容导致的。遇到这个问题一般是需要去进行系统的重装来解决,安装一个更兼容的系统就可以解决问题了。一起来看看详细的解决方法分享吧。…

前端学习第八站——CSS定位和装饰

目录 一、定位 1.1 网页常见布局方式 1.2 定位的常见应用场景 2.1 定位初体验 2.2 使用定位 3.1 静态定位 4.1 相对定位 5.1 绝对定位 6.1 子绝父相 7. 固定定位 8.1 定位的层级关系 8.2 更改定位元素的层级 9.总结 二、装饰 1.1 了解基线 1.2 文字对齐问…

安卓小游戏:小板弹球

安卓小游戏:小板弹球 前言 这个是通过自定义View实现小游戏的第三篇,是小时候玩的那种五块钱的游戏机上的,和俄罗斯方块很像,小时候觉得很有意思,就模仿了一下。 需求 这里的逻辑就是板能把球弹起来,球…