【RabbitMQ学习日记】—— 发布确认与交换机

news2025/5/16 1:11:13

一、发布确认

1.1 发布确认的原理

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ackmultiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

1.2 发布确认的策略

🌔 1、如何开启发布确认?

  • 发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法
  • 因为消息被成功处理后确认消息是发给我们生产者的,所以对于这个方法的调用是在我们生产者的代码中使用的

🌔 2、接下来介绍我们的三种发布确认策略:

  • 记录三种发布方式的运行时间,方便后面我们比较他们的效率

1️⃣ 单个确认发布

  • 这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,**waitForConfirmsOrDie(long)**这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

  • 这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

  • 可以了理解为一手交钱一手交货,当未得到发出消息的确认,那么就不会再向其发送消息

    //同步确认
        public static void publishMessageIndividually() throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //开启发布确认
            channel.confirmSelect();
            //队列的声明
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, true, false, false, null);
    
            //获取当前系统时间
            long begin = System.currentTimeMillis();
            //循环发送1k条数据
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String s = i + "";
                channel.basicPublish("", queueName, null, s.getBytes("UTF-8"));
                boolean flag = channel.waitForConfirms();
                if(flag){
                    System.out.println("消息发送成功!");
                }
    
            }
            //获取结束时间
            long end = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "个单独确认消息,耗时 " + (end - begin) +" ms");
        }
    

2️⃣ 批量确认发布

  • 先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:

    • 当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。
    • 当然这种方案仍然是同步的,也一样阻塞消息的发布
    //批量确认
        public static void publishMessageBatch() throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //开启发布确认
            channel.confirmSelect();
            //队列的声明
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, true, false, false, null);
    
            //获取当前系统时间
            long begin = System.currentTimeMillis();
    
            //批量确认数据的大小
            int batchSize = 100;
    
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));
                //每发送100条确认一次
                if(i % batchSize == 0){
                    channel.waitForConfirms();
                }
            }
            //获取结束时间
            long end = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "个批量确认消息,耗时 " + (end - begin) +" ms");
        }
    

3️⃣ 异步确认发布

  • 异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,

  • 下面就让我们来详细讲解异步确认是怎么实现的
    在这里插入图片描述

    //异步确认
        public static void publishMessageAsync() throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //创建一个保存消息的集合
            ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
            //开启发布确认
            channel.confirmSelect();
            //队列的声明
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, true, false, false, null);
    
            //获取当前系统时间
            long begin = System.currentTimeMillis();
    
            //实现那两个接口,ackCallback消息确认成功回调函数,noAckCallback 消息确认失败回调函数
            ConfirmCallback ackCallback = (deliveryTag, multiple) ->{
                //消息确认成功就将其移除集合 [注意是否为批量处理]
                if(multiple){
                    //获取小于当前序号的确认消息集合
                    ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
                    //清除该部分确认消息
                    confirmed.clear();
                }else{
                    outstandingConfirms.remove(deliveryTag);
                }
                System.out.println("确认的消息: " + deliveryTag);
            };
            ConfirmCallback noAckCallback = (deliverTag, multiple) ->{
                System.out.println("发布的消息"+outstandingConfirms.get(deliverTag)+"未被确认消息的标记"+deliverTag);
            };
            //设置监听器,第一个参数监听哪些消息成功了,第二个参数监听哪些消息失败了
            channel.addConfirmListener(ackCallback, noAckCallback);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "消息:" + i;
                channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));
    
                //将消息都放到集合中
                outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
            }
    
            //获取结束时间
            long end = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "个异步确认消息,耗时 " + (end - begin) +" ms");
    
        }
    

🌔 3、如何处理异步未确认消息?

  • 最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递

🌔 4、比较以上三种发布确认策略的速度

  • 单独发布消息
    • 同步等待确认,简单,但吞吐量非常有限。
  • 批量发布消息
    • 批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题。
  • 异步处理:
    • 最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些
	public class ConfirmMessage {
	    //准备发送1k条信息
	    public static final int MESSAGE_COUNT = 1000;
	    public static void main(String[] args) throws Exception{
	        publishMessageIndividually();//发送1000个单独确认消息,耗时 75658 ms
	        publishMessageBatch();//发送1000个批量确认消息,耗时 1560 ms
	        publishMessageAsync();//发送1000个异步确认消息,耗时 248 ms
	    }

二、交换机

  • 在上一节中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消费者(工作进程)。
  • 在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者
  • 为了说明这种模式,我们将构建一个简单的日志系统
    • 它将由两个程序组成: 第一个程序将发出日志消息,第二个程序是消费者
    • 其中我们会启动两个消费者,其中一个消费者接收到消息后把日志存储在磁盘,另外一个消费者接收到消息后把消息打印在屏幕上,事实上第一个程序发出的日志消息将广播给所有消费者者

2.1 Exchange

🌔 1、什么是交换机(Exchange)?

  • RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产
    者甚至都不知道这些消息传递传递到了哪些队列中

在这里插入图片描述

  • 相反,生产者只能将消息发送到交换机(exchange)
  • 交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列
  • 交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们,这就的由交换机的类型来决定

🌔 2、交换机有哪些类型呢?

  • 总共有四种类型:直接(direct)、扇出(fanout)、主题(topic)、标题(headers)

  • 当然也支持自定义类型:alternate-exchange,这部分的内容会在备份交换机涉及

  • 对于之前的案例我们都没有提及到交换机,我们一直使用的是默认的交换机

    • 例如channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    • 第一个参数是交换机的名称,空字符串表示默认或无名称交换机
    • 如果它存在的RoutingKey的话,消息能路由发送到队列中其实是由 **routingKey(bindingkey)**绑定 key 指定的 【队列映射的路由key】

2.2 临时队列

  • 每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,

    • 为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了
    • 其次一旦我们断开了消费者的连接,队列将被自动删除
  • 通过 String queueName = channel.queueDeclare().getQueue(); 指令可以获得临时队列

  • 在 Web 控制台的效果是这样的
    在这里插入图片描述

2.3 绑定

  • 什么是 bingding 呢?
    • binding 其实是 exchangequeue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系
      在这里插入图片描述

2.4 Fanout

在这里插入图片描述

  • Fanout 这种类型非常简单,它是将接收到的所有消息广播到它知道的所有队列中,我们称之为 发布订阅模式
  • 在我们 RabbitMQ 系统中有一些默认的 exchange
    在这里插入图片描述
  • 接下来我们通过案例来演示:【发布订阅模式的 RoutingKey 为空串】
    在这里插入图片描述
    1️⃣ 生产者
package com.atguigu.rabbitmq.five;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

/**
 * @author Bonbons
 * @version 1.0
 */
public class EmitLog {
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println("发送消息: " + message);
        }
    }
}

2️⃣ 消费者1

package com.atguigu.rabbitmq.five;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * @author Bonbons
 * @version 1.0
 * 消息接收,演示Fanout
 */
public class ReceiveLog01 {
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //创建交换机 [交换机名、交换机类型]
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //创建一个临时队列,名字是随机的,当消费者断开连接就会自动删除
        String queueName = channel.queueDeclare().getQueue();
        //将交换机和队列绑定起来 [队列、交换机、routingKey(可以为空串)]
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println("等待接收消息,把接收到的消息打印到屏幕上......");
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("ReceiveLog01控制台打印接收到的消息: " + new String(message.getBody(), "UTF-8"));
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag ->{});
    }
}

3️⃣ 消费者2

package com.atguigu.rabbitmq.five;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * @author Bonbons
 * @version 1.0
 * 消息接收,演示Fanout
 */
public class ReceiveLog02 {
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //创建交换机 [交换机名、交换机类型]
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //创建一个临时队列,名字是随机的,当消费者断开连接就会自动删除
        String queueName = channel.queueDeclare().getQueue();
        //将交换机和队列绑定起来 [队列、交换机、routingKey(可以为空串)]
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println("等待接收消息,把接收到的消息打印到屏幕上......");
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("ReceiveLog02控制台打印接收到的消息: " + new String(message.getBody(), "UTF-8"));
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag ->{});
    }
}
  • 预期效果:采用发布订阅的方式,我们生产者发出的消息会被所有的消费者接收 【针对于此案例】

请添加图片描述

请添加图片描述
请添加图片描述

2.5 Direct exchange

🌔 1、回顾

  • 在上一节中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。在本节我们将向其中添加一些特别的功能-比方说我们只让某个消费者订阅发布的部分消息。例如我们只把严重错误消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

  • 我们再次来回顾一下什么是 bindings,绑定是交换机和队列之间的桥梁关系。也可以这么理解:队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示也可称该参数为 binding key,创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");绑定之后的意义由其交换类型决定

🌔 2、什么是 Direct Exchange?

  • 上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志消息避免浪费磁盘空间。

  • Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey 队列中去。

  • 和扇出类型交换机的区别在于:指定了我们的路由 key交换机会根据消息的路由key 将消息发送给与交换机绑定的且routingKey与这个key相同的队列

    • 用更直白的话说:fanout类型发给所有人,direct类型发给对应的人
      在这里插入图片描述
  • 在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct

    • 队列 Q1 绑定键为 orange
    • 队列 Q2 绑定键有两个:一个绑定键为 black
    • 另一个绑定键为 green
  • 在这种绑定情况下,生产者发布消息到 exchange

    • 绑定键为 orange 的消息会被发布到队列Q1
    • 绑定键为 blackgreen 和的消息会被发布到队列 Q2
    • 其他消息类型的消息将被丢弃 【因为在这个案例中没有对应的消费者】

🌔 3、再介绍一下什么是多重绑定?

  • 当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同
  • 在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了
    在这里插入图片描述

🌔 4、接下来通过案例来演示 直接类型交换机(也称为路由模式)的使用

在这里插入图片描述
在这里插入图片描述
1️⃣ 生产者

package com.atguigu.rabbitmq.six;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

/**
 * @author Bonbons
 * @version 1.0
 */
public class DirectLogs {
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        Scanner scanner = new Scanner(System.in);

        while(scanner.hasNext()){
            String message = scanner.next();
            String routingKey = scanner.next();
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));

        }
    }
}

2️⃣ 消费者1

package com.atguigu.rabbitmq.six;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * @author Bonbons
 * @version 1.0
 */
public class ReceiveLogsDirect01 {
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        channel.queueDeclare("console", false, false, false, null);
        channel.queueBind("console", EXCHANGE_NAME, "info");
        channel.queueBind("console", EXCHANGE_NAME, "warning");//多重绑定

        //接收消息回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("ReceiveLogsDirect01控制台接收到的消息: " + new String(message.getBody(), "UTF-8"));
        };

        channel.basicConsume("console", deliverCallback, consumerTag -> {});
    }
}

3️⃣ 消费者2

package com.atguigu.rabbitmq.six;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * @author Bonbons
 * @version 1.0
 */
public class ReceiveLogsDirect02 {
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        channel.queueDeclare("disk", false, false, false, null);
        channel.queueBind("disk", EXCHANGE_NAME, "error");
        channel.queueBind("disk", EXCHANGE_NAME, "info");

        //接收消息回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("ReceiveLogsDirect02控制台接收到的消息: " + new String(message.getBody(), "UTF-8"));
        };

        channel.basicConsume("disk", deliverCallback, consumerTag -> {});
    }
}
  • 我采用的测试方式给这俩消费者提供了一个相同的路由key
    • 这样就满足他俩有相同key、还有不同的key,测试效果更佳明显 【最初设计他俩没有相同的 routingKey
  • 其实这里案例最初是指定的 routingKey,我为了可以多次测试,就变成从键盘输入了

请添加图片描述
请添加图片描述
请添加图片描述

2.6 Topics

🌔 1、之前的交换机存在一定的局限性:

  • 在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是使用了 direct 交换机,从而有能实现有选择性地接收日志
  • 尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有info.baseinfo.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了,这个时候就只能使用 topic 类型
  • 因为直接交换机不够灵活,才引出的这种交换机,我的理解为就是通过表达式去匹配一个或多个队列 【我们也称之为主题模式】

🌔 2、对于主题交换机的要求?

  • 发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。
  • 这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的
  • 当然这个单词列表最多不能超过 255 个字节
  • 在这个规则列表中,其中有两个替换符:
    • *(星号)可以代替一个单词
    • #(井号)可以替代零个或多个单词

🌔 3、Topic 匹配案例:

  • 下图绑定关系如下:
    • Q1–>绑定的是: 中间带 orange 带 3 个单词的字符串(*.orange.*)
    • Q2–>绑定的是:
      • 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
      • 第一个单词是 lazy 的多个单词(lazy.#)

在这里插入图片描述

  • 对于主题交换机也有一些特殊情况:
    • 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout
    • 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct

🌔 4、接下来通过案例演示如何使用 Topic 交换机?

在这里插入图片描述

  • 下面是我们要测试的数据:
quick.orange.rabbit 被队列 Q1Q2 接收到
lazy.orange.elephant 被队列 Q1Q2 接收到
quick.orange.fox 被队列 Q1 接收到
lazy.brown.fox 被队列 Q2 接收到
lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit 是四个单词但匹配 Q2

1️⃣ 生产者

package com.atguigu.rabbitmq.seven;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Bonbons
 * @version 1.0
 * 生产者
 */
public class EmitLogTopic {
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //交换机的声明我们在消费者里面定义了,执行一次就行,所以此处就不写了
        Map<String, String> bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
        bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
        bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
        bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
        bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
        bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
        bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
        bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");

        for(Map.Entry<String, String> entry : bindingKeyMap.entrySet()){
            String routingKey = entry.getKey();
            String message = entry.getValue();
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println("生产者发出消息: " + message);
        }
    }
}

2️⃣ 消费者1

package com.atguigu.rabbitmq.seven;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

/**
 * @author Bonbons
 * @version 1.0
 * 消费者C1,声明主题交换机与相关队列
 */
public class ReceiveLogsTopic01 {
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        channel.queueDeclare("Q1", false, false, false, null);
        channel.queueBind("Q1", EXCHANGE_NAME, "*.orange.*");
        System.out.println("等待接收消息......");
        channel.basicConsume("Q1", (consumersTag, message) ->{
            System.out.println("Q1控制台打印接收到的信息: " + new String(message.getBody(), "UTF-8")+ " 绑定键: " + message.getEnvelope().getRoutingKey());
        },consumersTag -> {});

    }
}

3️⃣ 消费者2

package com.atguigu.rabbitmq.seven;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

/**
 * @author Bonbons
 * @version 1.0
 * 消费者C1,声明主题交换机与相关队列
 */
public class ReceiveLogsTopic02 {
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        channel.queueDeclare("Q2", false, false, false, null);
        channel.queueBind("Q2", EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind("Q2", EXCHANGE_NAME, "lazy.#");
        System.out.println("等待接收消息......");
        channel.basicConsume("Q2", (consumersTag, message) ->{
            System.out.println("Q2控制台打印接收到的信息: " + new String(message.getBody(), "UTF-8")+ " 绑定键: " + message.getEnvelope().getRoutingKey());
            },consumersTag -> {});

    }
}
  • 测试结果:

请添加图片描述

请添加图片描述
请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/414664.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

小白快速学习Markdown

这里写自定义目录标题欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注…

做好Python工程师,首先你需要做好的几件事

做好Python工程师&#xff0c;需要做好的几件事&#xff0c;我想分享给大家。首先千万不要做事周折。在你提问之前&#xff0c;先好好想一想&#xff0c;这个问题自己能不能解决。如果能解决&#xff0c;尽量自己解决&#xff1b;如果解决不了&#xff0c;那就要把你的问题描述…

大宗商品进口管理软件可以帮助企业解决哪些?

什么是大宗商品贸易&#xff1f;简单讲就是大宗商品在国际上自由流通。以贸易的形式&#xff0c;把商品从价格低的地方拉到价格高的地方出售。大宗商品是指可进入流通领域&#xff0c;可在工农业领域生产与消费使用的大批量买卖的。主要包括的类别有&#xff1a;能源商品、基础…

网络编程答疑融合连环tcp/nio/bio/redis/redisson/lettuce/netty/dubbo

如果有不对的地方, 欢迎在评论区指正: bio 1.1 请求-响应模型. 对于接收方, serverSocket.accept() 为每个请求(连接)安排一个线程 1.2浪费(阻塞占比大): socket.getInputStream().read()调用是阻塞的, 实际情况对于常见的web应用, 大家都是长连接, 同一时刻, 阻塞在此在线程会…

蓝牙技术|苹果获空间音频新专利,AirPods可动态调整声学输出

美国商标和专利局&#xff08;USPTO&#xff09;公示的清单显示&#xff0c;苹果在近日获得了一项名为“测定虚拟聆听环境”的新专利。据悉&#xff0c;该技术可以改善用户的聆听体验&#xff0c;增强空间音频的沉浸感&#xff0c;未来有望应用在AirPods上。 这项专利技术可以…

第二章 Linux目录结构

第二章 Linux目录结构linux的文件系统是采用级层式的树状目录结构&#xff0c;在此结构中的最上层是根目录“/”&#xff0c;然后在此目录下再创建其他的 目录。 2)深刻理解 linux 树状文件目录是非常重要的。3)记住一句经典的话:在Linux世界里&#xff0c;一切皆文件(!!)4)示意…

4.12--计算机网络之TCP篇之TCP 协议的缺陷+如何基于 UDP 协议实现可靠传输?--(复习+大总结)---沉下心来(加油呀)

TCP 协议四个方面的缺陷&#xff1a; 1.升级 TCP 的工作很困难&#xff1b; TCP 协议是在内核中实现的&#xff0c;应用程序只能使用不能修改&#xff0c;如果要想升级 TCP 协议&#xff0c;那么只能升级内核。 而升级内核这个工作是很麻烦的事情 2.TCP 建立连接的延迟&#x…

Linux -- 进程间通信

文章目录1. vscode软件下载和使用1.1 下载1.1.1 解决下载慢问题1.1.2 推荐下载链接1.2 vscode是什么1.3 Windows本地vscode使用1.4 远程连接linux1.5 推荐插件2. 进程间通信目的3. 为什么需要通信4. 匿名管道4.1 原理4.2 代码案例4.3 玩一玩(进程池)4.3.1 模型4.3.2 代码5. 命名…

STM32+W5500实现以太网通信

STM32系列32位微控制器基于Arm Cortex-M处理器&#xff0c;旨在为MCU用户提供新的开发自由度。它包括一系列产品&#xff0c;集高性能、实时功能、数字信号处理、低功耗/低电压操作、连接性等特性于一身&#xff0c;同时还保持了集成度高和易于开发的特点。本例采用STM32作为MC…

【开懂C++】命名空间 函数重载 缺省参数

目录一.命名空间二.缺省参数三.函数重载一.命名空间 在编写C语言代码时&#xff0c;当工程较大时&#xff0c;很容易产生变量命名冲突的情况——一般有两种冲突的情况 1.变量名与库中的函数名、关键字冲突。2.工程模块化搭建时不同文件的命名冲突。 而C为了优化这一缺陷&#…

安装Ubuntu系统后的实用工具配置指南

1. 修改软件源 Ubuntu 默认的软件源是境外的&#xff0c;速度上会有些问题&#xff0c;我们可以在Software & Updates(软件和更新)中选择国内的镜像。 一般我们选择清华源或者阿里云源。 2. 安装chorme浏览器 在ubuntu下我比较习惯用火狐浏览器和谷歌浏览器。 谷歌浏览…

vue 自定义指令directive的使用场景

1. 一个指令定义对象可以提供如下几个钩子函数(均为可选) bind:只调用一次&#xff0c;指令第一次绑定到元素时调用。在这里可以进行一次性的初始化设置。inserted:被绑定元素插入父节点时调用(仅保证父节点存在&#xff0c;但不一定已被插入文档中)。update:只要当前元素不被…

Leetcode.1971 寻找图中是否存在路径

题目链接 Leetcode.1971 寻找图中是否存在路径 easy 题目描述 有一个具有 n 个顶点的 双向 图&#xff0c;其中每个顶点标记从 0 到 n - 1&#xff08;包含 0 和 n - 1&#xff09;。图中的边用一个二维整数数组 edges 表示&#xff0c;其中 edges[i] [ui, vi]表示顶点 ui和顶…

关于maxwell

这里写目录标题什么是Maxwell如何使用MaxwellMaxwell是一个mysql二进制binlog日志分析工具&#xff0c;Java语言编写&#xff0c;功能十分强大&#xff0c;可以将日志转换成json并发送到kafka&#xff0c;redis&#xff0c;rabbitmq等中间组件&#xff0c;因为最近在理解怎样在…

QtSqlite加密--QtCipherSqlitePlugin的使用

文章目录QtSqlite加密第一步&#xff1a;环境准备第二步&#xff1a;连接数据库第三步&#xff1a;数据库操作第四步&#xff1a;使用新的可视化工具查看数据库数据QtSqlite加密 上次说了QxOrm的数据库连接、映射和基础的增删改查&#xff0c;但是我们在使用数据库的时候并不希…

期刊论文图片代码复现【由图片还原代码】(OriginMatlab)

&#x1f468;‍&#x1f393;个人主页&#xff1a;研学社的博客&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5;&#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密…

【数据结构】图解八大排序(上)

文章目录一、排序简介二、直接插入排序三、希尔排序四、直接选择排序五、堆排序六、冒泡排序七、冒泡排序与直接插入排序效率对比一、排序简介 生活中&#xff0c;我们经常能看到排序的应用。例如&#xff0c;我们在网购商品的时候&#xff0c;经常按销量从高到低排序。 那么这…

Linux服务器怎么分区

Linux服务器怎么分区 我是艾西&#xff0c;linux系统除了从业某个行业经常要用到的程序员比较熟悉&#xff0c;对于小白或只会用Windows系统的小伙伴还是会比较难上手的。今天艾西简单的跟大家聊聊linux系统怎么分区&#xff0c;让身为小白的你也能一眼看懂直接上手操作感受程序…

【数据结构】用Java实现七大排序算法

目录 &#x1f337;1. 排序的概念及引用 1.1 排序的概念 1.2 衡量指标 1.2 十个排序算法 1.3 十个排序性能对比 &#x1f337;2. 冒泡排序 2.1 算法描述 2.2 动图 ⭐️代码优化 &#x1f337;3. 选择排序 3.1 算法描述 3.2 动图 3.3 代码 &#x1f337;4. 插入排序 4.1 算法描述…

(大数据开发随笔9)Hadoop 3.3.x分布式环境部署——全分布式模式

索引完全分布式模式守护进程布局集群搭建准备总纲配置文件格式化集群启动集群集群控制命令集群启停进程查看启动日志查看集群常见问题案例演示&#xff1a;WordCount完全分布式模式 分布式文件系统中&#xff0c;HDFS相关的守护进程也分布在不同的机器上&#xff0c;如&#x…