【RabbitMQ 实战】09 客户端连接集群生产和消费消息

news2025/7/10 11:47:45

一、部署一个三节点集群

下面的链接是最快最简单的一种集群部署方法
3分钟部署一个RabbitMQ集群
上的的例子中,没有映射端口,所以没法从宿主机外部连接容器,下面的yml文件中,暴露了端口。
每个容器应用都映射了宿主机的端口,分别是5602,5612,5622
docker compse文件如下

version: '3'

services:
  stats:
    image: bitnami/rabbitmq
    environment:
      - RABBITMQ_NODE_TYPE=stats
      - RABBITMQ_NODE_NAME=rabbit@stats
      - RABBITMQ_ERL_COOKIE=s3cr3tc00ki3
    ports:
      - '15672:15672'
      - '5602:5672'
    volumes:
      - 'rabbitmqstats_data:/bitnami/rabbitmq/mnesia'
  queue-disc1:
    image: bitnami/rabbitmq
    environment:
      - RABBITMQ_NODE_TYPE=queue-disc
      - RABBITMQ_NODE_NAME=rabbit@queue-disc1
      - RABBITMQ_CLUSTER_NODE_NAME=rabbit@stats
      - RABBITMQ_ERL_COOKIE=s3cr3tc00ki3
    ports:
      - '5612:5672'
    volumes:
      - 'rabbitmqdisc1_data:/bitnami/rabbitmq/mnesia'
  queue-ram1:
    image: bitnami/rabbitmq
    environment:
      - RABBITMQ_NODE_TYPE=queue-ram
      - RABBITMQ_NODE_NAME=rabbit@queue-ram1
      - RABBITMQ_CLUSTER_NODE_NAME=rabbit@stats
      - RABBITMQ_ERL_COOKIE=s3cr3tc00ki3
    ports:
      - '5622:5672'
    volumes:
      - 'rabbitmqram1_data:/bitnami/rabbitmq/mnesia'

volumes:
  rabbitmqstats_data:
    driver: local
  rabbitmqdisc1_data:
    driver: local
  rabbitmqram1_data:
    driver: local

通过docker-compose up命令,就可以启动三个集群的容器了

[root@localhost mycompose]# docker-compose up

二、配置文件

原来的单节点只配置host和port,现在集群节点,就要配置addresses了,如下所示:

server:
  port: 8080
spring:
  application:
    name: rabbitmq-demo
  #配置rabbitMq 服务器
  rabbitmq:
#单节点直接可以写host和port
#    host: 192.168.56.201
#    port: 5672
    #集群连接写ip和端口
    addresses: 192.168.56.202:5602,192.168.56.202:5612,192.168.56.202:5622
    username: user
    password: bitnami
    #虚拟host
    virtual-host: virtual01
    template:
      mandatory: true #当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息
    publisher-confirm-type: correlated #生产者回调确认机制,由回调来确定消息是否发布成功
    publisher-returns: true #是否开启生产者returns
    listener:
      simple:
        acknowledge-mode: manual #手动回复方式,一般建议手动回复,即需要我们自己调用对应的ACK方法
        prefetch: 10 #每个消费者可拉取的,还未ack的消息数量
        concurrency: 3 #消费端(每个Listener)的最小线程数
        max-concurrency: 10 #消费端(每个Listener)的最大线程数

三、代码

生产者

和单节点的发送和消费代码一致,没有变化

@Slf4j
@RestController
@RequestMapping("/rabbit")
public class RabbitSendController implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    private static final String EXCHANGE_NAME = "my_exchange";
    private static final String ROUTING_KEY = "my_routing";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 正常发送并被broker接收
     * @return
     */
    @RequestMapping("send")
    public String send() {
        for (int i = 0; i < 10; i++) {
            OrderInfo orderInfo = new OrderInfo();
            orderInfo.setAddress("成都市高新区");
            orderInfo.setOrderId(String.valueOf(i));
            orderInfo.setProductName("华为P60:" + i);

            //设置回调关联的一个id
            String messageId = UUID.randomUUID().toString();
            log.info("开始发送消息,当前消息关联id为:{}", messageId);
            CorrelationData correlationData = new CorrelationData(messageId);

            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8))
                    .andProperties(messageProperties).build();
            //设置ack回调
            rabbitTemplate.setConfirmCallback(this);
            //退回消息的回调
            rabbitTemplate.setReturnCallback(this);
            rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, correlationData);
        }
        return "ok";
    }

    /**
     * 设置一个非法的路由键,模拟消息被broker退回的情况,前提是
     * spring.rabbitmq.template.mandatory=true 当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息
     * <p>
     * spring.rabbitmq.publisher-returns=true 生产者回调确认机制,由回调来确定消息是否发布成功
     *
     * @return
     */
    @RequestMapping("send-return")
    public String sendAndReturn() {
        OrderInfo orderInfo = new OrderInfo();
        orderInfo.setAddress("成都市高新区");
        orderInfo.setOrderId("111");
        orderInfo.setProductName("小米13");

        //设置回调关联的一个id
        String messageId = UUID.randomUUID().toString();
        log.info("开始发送消息,当前消息关联id为:{}", messageId);
        CorrelationData correlationData = new CorrelationData(messageId);

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8))
                .andProperties(messageProperties).build();
        //设置ack回调
        rabbitTemplate.setConfirmCallback(this);
        //退回消息的回调
        rabbitTemplate.setReturnCallback(this);
        //下面这个RoutingKey是没有绑定的,所以发不出去
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "error.routing", message, correlationData);
        return "ok";
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (correlationData == null) {
            return;
        }
        String messageId = correlationData.getId();
        if (ack) {
            log.info("【confirm回调方法】,消息发布成功,messageId={}", messageId);
        } else {
            log.info("【confirm回调方法】,消息发布失败,messageId={}", messageId);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("【returnedMessage回调方法】,消息被退回,message={},replyCode:{},replyText:{},exchange:{},routingKey:{}",
                new String(message.getBody()), replyCode, replyText, exchange, routingKey);

    }
}

消费者

@Slf4j
@Component
public class RabbitOrderConsumer {
    private static final String EXCHANGE_NAME = "my_exchange";
    private static final String QUEUE_NAME = "my_queue";
    private static final String ROUTING_KEY = "my_routing";

    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),
            exchange = @Exchange(value = EXCHANGE_NAME, type = "topic", durable = "true"), key = ROUTING_KEY)})
    public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        //上面这个tag是这么写的么,为什么每次传过来都是1?导致channel被重新创建
        log.info("接收到消息:{},deliveryTag:{}", new String(message.getBody(), StandardCharsets.UTF_8), tag);
        channel.basicAck(tag, false);
    }
}

访问地址:http://localhost:8080/rabbit/send,然后就可以发送消息了,输出日志如下:

开始发送消息,当前消息关联id为:18049efe-a624-4288-a8f0-9c28fd776773
开始发送消息,当前消息关联id为:83d93f90-62f4-41cf-af02-03d496812561
开始发送消息,当前消息关联id为:f83257b2-95b6-408e-a5b9-74d0ec9f30b0
开始发送消息,当前消息关联id为:16a7e471-23ba-408b-9095-6add9ad1e270
开始发送消息,当前消息关联id为:152b0fb0-3a22-452d-93fe-662252c2fd8c
开始发送消息,当前消息关联id为:ade4f703-6075-485f-8e34-ec9b95bf59de
开始发送消息,当前消息关联id为:e4511f82-476a-4f4c-b704-4399baadeaf4
接收到消息:{"orderId":"1","productName":"华为P60:1","address":"成都市高新区"},deliveryTag:1
接收到消息:{"orderId":"0","productName":"华为P60:0","address":"成都市高新区"},deliveryTag:1
开始发送消息,当前消息关联id为:d8cd2dd6-bb9e-4d46-bc42-0d96df70748f
开始发送消息,当前消息关联id为:76950a93-5887-43c1-adef-edc1e29e2fab
开始发送消息,当前消息关联id为:f08a7a68-60da-4c5d-b1b8-c9e4d9453969
【confirm回调方法】,消息发布成功,messageId=18049efe-a624-4288-a8f0-9c28fd776773
【confirm回调方法】,消息发布成功,messageId=83d93f90-62f4-41cf-af02-03d496812561
接收到消息:{"orderId":"3","productName":"华为P60:3","address":"成都市高新区"},deliveryTag:2
接收到消息:{"orderId":"2","productName":"华为P60:2","address":"成都市高新区"},deliveryTag:1
接收到消息:{"orderId":"6","productName":"华为P60:6","address":"成都市高新区"},deliveryTag:3
接收到消息:{"orderId":"5","productName":"华为P60:5","address":"成都市高新区"},deliveryTag:2
接收到消息:{"orderId":"9","productName":"华为P60:9","address":"成都市高新区"},deliveryTag:4
接收到消息:{"orderId":"4","productName":"华为P60:4","address":"成都市高新区"},deliveryTag:2
接收到消息:{"orderId":"7","productName":"华为P60:7","address":"成都市高新区"},deliveryTag:3
接收到消息:{"orderId":"8","productName":"华为P60:8","address":"成都市高新区"},deliveryTag:3
【confirm回调方法】,消息发布成功,messageId=f83257b2-95b6-408e-a5b9-74d0ec9f30b0
【confirm回调方法】,消息发布成功,messageId=16a7e471-23ba-408b-9095-6add9ad1e270
【confirm回调方法】,消息发布成功,messageId=152b0fb0-3a22-452d-93fe-662252c2fd8c
【confirm回调方法】,消息发布成功,messageId=ade4f703-6075-485f-8e34-ec9b95bf59de
【confirm回调方法】,消息发布成功,messageId=e4511f82-476a-4f4c-b704-4399baadeaf4
【confirm回调方法】,消息发布成功,messageId=d8cd2dd6-bb9e-4d46-bc42-0d96df70748f
【confirm回调方法】,消息发布成功,messageId=76950a93-5887-43c1-adef-edc1e29e2fab
【confirm回调方法】,消息发布成功,messageId=f08a7a68-60da-4c5d-b1b8-c9e4d9453969

上述代码仓库:https://gitee.com/syk1234/mqdmo

四、后台管理

登录管理后台页面:http://192.168.56.202:15672/
在这里插入图片描述

共有三个节点,两个磁盘节点,一个内存节点。如果你还不清楚什么是磁盘节点,什么是内存节点,可以参考【RabbitMQ 实战】08 集群原理剖析

查看连接情况,发现是连接的是节点rabbit@stats节点在这里插入图片描述
查看队列的情况,队列是在rabbit@stats节点上
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1065432.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

jupyter notebook代码自动换行,超过一行长度自动换行,不用左右滑动

效果如下: 步骤 1.打开cmd&#xff0c;输入jupyter --config-dir找到jupyter notebook的位置 2.打开jupyter所在位置&#xff0c;进入nbconfig文件夹 3.打开notebook.json 4.输入以下代码 "MarkdownCell": {"cm_config": {"lineWrapping": t…

寻找下个图文爆款“潜力股”!图文返现热潮涌动,看看他们怎么做?

近半年&#xff0c;抖音电商挂车图文日均发布次数增长5倍&#xff0c;日均GMV增长10倍以上……凭借着低成本、高转化、快变现等特点&#xff0c;图文带货已成为抖音电商带货体裁的“新热门”&#xff0c;且流量规模及成交效率仍在持续增长。 为鼓励电商作者把握机遇、积极创作…

3D目标检测实战 | KITTI数据集可视化详解(附Python实现)

目录 1 概述2 点云可视化2.1 原始点云2.2 3D检测框点云2.3 点云鸟瞰视图BEV 3 图像可视化3.1 原始图像3.2 2D检测框图像3.3 3D检测框图像3.4 点云-图像对齐 1 概述 KITTI数据集是一个广泛被用于研究和开发自动驾驶和计算机视觉算法的公开数据集&#xff0c;其数据格式详解请参…

[HNCTF 2022 WEEK2]easy_unser - 反序列化+wakeup绕过+目录绕过

题目代码&#xff1a; <?php include f14g.php;error_reporting(0);highlight_file(__FILE__);class body{private $want,$todonothing "i cant get you want,But you can tell me before I wake up and change my mind";public function __construct($want){…

一文讲清楚网络安全是什么?网络安全工程师需要学什么?就业前景如何?

前言 什么是网络安全&#xff1f; 网络安全是指网络系统的硬件、软件及其系统中的数 据受到保护&#xff0c;不因偶然的或者恶意的原因而遭受到破坏、 更改、泄露&#xff0c;系统连续可靠正常地运行&#xff0c;网络服务不中断。 网络安全是指通过各种技术、措施和行为来保…

解决Ubuntu系统字体太小的问题

使用一个工具即可 sudo apt install gnome-tweaks安装成功之后&#xff1a; 在终端输入下面的命令 gnome-tweaks 这里可以将缩放比例调大&#xff0c;整个字体就放大了&#xff01;

MySQL数据库入门到精通——进阶篇(2)

黑马程序员 MySQL数据库入门到精通——进阶篇&#xff08;2&#xff09; 1. SQL优化1.1 插入数据1.2 主键优化1.3 order by优化1.4 group by优化1.5 limit优化1.6 count优化1.7 update优化 2. 视图2.1 视图-介绍及基本语法2.2 视图-检查选项(cascaded)2.3 视图-检查选项(local)…

MATLAB学习

前言 MATLAB是“MATrix LABoratory”的缩写&#xff0c;它是由美国Mathworks公司于1984年推出的一种科学计算软件。 语言及其特点 1.功能强大 (1)运算功能强大。MATLAB是以复数矩阵为基本编程单元的程序设计语言其强大的运算功能使其成为世界顶尖的数学应用软件之一。 (2)功能…

自动化测试 selenium+Junit 总结知识

文章目录 Selenium 自动化测试什么是selenium&#xff1f;selenium的原理是什么&#xff1f;Selenium 自动化测试的流程是什么&#xff1f;Selnium还有一些其他的操作 Selenium 如何定位动态元素&#xff1f; Junit 测试框架注解断言执行顺序测试套件参数化单参数多参数动态参数…

ELFK(filebeat)部署

部署环境 主机名ip地址主要软件系统node1192.168.154.70ElasticSearh、KibanaCentos7.5node2192.168.154.60ElasticSearhCentos7.5Apache192.168.154.50Logstash、ApacheCentos7.5Filebeat192.168.154.40FilebeatCentos7.5 Node1节点上安装Filebeat #上传软件包 filebeat-6.2…

jmeter,性能测试,Locust

一。性能测试的概念 1.性能&#xff1a;就是软件质量属性中的 “ 效率 ” 特性 2.效率特性&#xff1a; 时间特性&#xff1a;指系统处理用户请求的响应时间 资源特性&#xff1a;指系统在运行过程中&#xff0c;系统资源的消耗情况 CPU 内存 磁盘IO&#xff08;磁盘的写…

【Spark学习笔记】- 4运行架构核心组件核心概念

目录标题 1 运行架构2 核心组件2.1 Driver2. 2 Executor2. 3 Master & Worker2. 4 ApplicationMaster 3 核心概念3.1 Executor 与 Core3. 2 并行度(Parallelism)3. 3 有向无环图(DAG) 4 提交流程4.1 Yarn Client 模式4. 2 Yarn Cluster 模式 5 分布式计算模拟5.1 Driver5.2…

利达卓越团队:打造投资界“天花板”,用行动创造财富

在这个充满竞争与挑战的新时代下,众多投资者纷纷开始寻找高收益项目。尽管拥有前卫的理想抱负和专业的投资经验,但抵不过一路上的“畔脚石”,大部分投资者会因为项目不稳定和风控经验不足等问题提前退场,未能在投资市场上获取红利。为了能够给这些投资者创造盈利机会,郑建祥、林…

Qt扫盲-QListView理论总结

QListView理论总结 一、概述二、提高性能三、使用注意 一、概述 QListView显示存储在model 中的item&#xff0c;要么是一个简单的非层次列表&#xff0c;要么是一个图标集合。这个类用于提供列表和图标视图&#xff0c;之前像这些 列表 和 图标视图 之前是由QListBox和 QIcon…

android 12 U盘 /mnt/media_rw 下读取文件异常 没有权限

现象 storage下可读取&#xff0c;但/mnt/media_rw不可读取 /mnt/media_rw/A009-1B4F/rk3568_s-ota-20230704.zip (Permission Denied&#xff09; 解决方法 把/mnt/media_rw/ 替换为 /storage

基于知识蒸馏的两阶段去雨去雪去雾模型学习记录(三)之知识测试阶段与评估模块

去雨去雾去雪算法分为两个阶段&#xff0c;分别是知识收集阶段与知识测试阶段&#xff0c;前面我们已经学习了知识收集阶段&#xff0c;了解到知识阶段的特征迁移模块&#xff08;CKT)与软损失&#xff08;SCRLoss&#xff09;,那么在知识收集阶段的主要重点便是HCRLoss(硬损失…

JavaScript中的模块化编程,包括CommonJS和ES6模块的区别。

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 模块化编程概述⭐ CommonJS 模块⭐ ES6 模块⭐ 区别⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、…

苹果恢复微信聊天记录的3个实用方法!

愁死我了&#xff01;朋友们&#xff01;把手机借给了亲戚家的小孩玩&#xff0c;拿回手机后发现很重要的聊天记录丢失了&#xff0c;怎么办呀&#xff0c;有什么方法能够恢复回来吗&#xff1f; 微信是架起我们与家人、朋友、同事之间沟通的桥梁&#xff0c;无论是工作还是生活…

【软考】8.1 程序语言基本概念-成分-函数

《程序设计语言的基本概念》 汇编&#xff1a;将汇编语言翻译成目标程序执行编译&#xff1a;生成独立的可执行文件&#xff08;逻辑上与源程序等价的目标程序&#xff09;&#xff1b;直接运行&#xff1b;运行时无法控制源程序&#xff1b;效率高解释&#xff1a;不生成可执行…

mmap底层驱动实现(remap_pfn_range函数)

mmap底层驱动实现 myfb.c&#xff08;申请了128K空间&#xff09; #include <linux/init.h> #include <linux/tty.h> #include <linux/device.h> #include <linux/export.h> #include <linux/types.h> #include <linux/module.h> #inclu…