RabbitMQ的其中工作模式介绍以及Java的实现

news2025/5/26 5:41:11

文章目录

  • 前文
  • 一、模式介绍
    • 1. 简单模式
    • 2. 工作队列模式
    • 3. 广播模式
    • 4. 路由模式
    • 5. 通配符模式
    • 6. RPC模式
    • 7. 发布确认模式
  • 二、代码实现
    • 1、简单模式
    • 2、工作队列模式
      • 生产者
      • 消费者
        • 消费者 1
        • 消费者 2
    • 3、广播模式 (Fanout Mode)
      • 生产者
      • 消费者
    • 4、路由模式 (Direct Mode)
      • 生产者
      • 消费者
    • 5、通配符模式
    • 6、RPC模式 (Remote Procedure Call Mode)
      • 服务器 (Server)
      • 客户端 (Client)
    • 7、发布确认模式 (Publisher Confirms)
      • 1. 单独确认 (Publishing Messages Individually)
      • 2. 批量确认 (Publishing Messages in Batches)
      • 3. 异步确认
      • 对比总结

前文

为了更好的理解RabbitMQ中的工作模式,最好先了解RabbitMQ的几种常见交换机的类型

  1. Fanout(扇出交换机)
    它会忽视路由键,把消息发送给所有绑定了该交换机的所有队列

  2. Direct(直接交换机)
    根据生产者发送消息时设置的routingKey和交换机与不同队列绑定的bindingKey进行匹配,如果匹配把消息发送给对应的队列

  3. Topic(通配符交换机)
    可以认为是Direct的升级版。Direct中bindingKey必须是一个常量字符串,在Topic中bindingKey可以是一个通配符,类似于正则表达式。只要routingKey符合bindingKey的字符串模式,那么就可以把消息发送给指定队列

RabbitMQ中用 .来分割每一个单词。*表示匹配一个任意单词,可以是单个字母。#表示可以匹配0个或者多个单词,比*宽松。
例如,# 可以匹配 a、a.b、a.b.c 等,而 . 只能匹配正好两个单词的路由键(如 a.b)

  1. Header
    这种交换机不依赖于routingKey和bindingKey。它会根据消息中的headers属性进行匹配。但是由于其性能低下,因此很少用。

此外,代码实现部分博客使用的RabbitMQ自带的依赖包。Spring也支持RabbitMQ。
两者在RabbitMQ官网都有说明。


一、模式介绍

1. 简单模式

七个模式中最简单的模式,特点是一个生产者p、一个消费者c,消息只能被消费一次。适用于消息只能被单个消费者消费的场景。
在这里插入图片描述

2. 工作队列模式

概述: ⼀个⽣产者P,多个消费者C1,C2. 在多个消息的情况下, Work Queue 会将消息分派给不同的消费者, 每个消费者都会接收到不同的消息

特点: 消息不会重复, 分配给不同的消费者

适⽤场景: 集群环境中做异步处理。例如12306候补成功的短信服务,其中每个短信服务功能是一样的,消息给到那个消费者都可以,类似于集群:在这里插入图片描述

在这里插入图片描述

3. 广播模式

概述: 图中x是exchange,exchange会根据消息中的routingkey与Q1、Q2绑定的bindkey进行匹配,如果匹配成功,把消息转发给指定的队列。
特点: 一个生产者发送给exchange的消息,会被exchange复制多分,分别发送给绑定了这个exchange的queue。每个消费者获得的消息都是一样的
应用场景: 比如1001就老喜欢这种东西了,想给自己的客户推销广告,用广播模式,就可以把消息发送给所有的用户。
在这里插入图片描述

4. 路由模式

概述: 这个模式相当于是广播模式的一个约束,它会根据消息中的routingKey和与其他队列绑定的bindingKey进行匹配,如果匹配才会把消息发送给指定队列。

💡routingKey 和bindgKey必须完全一直才能匹配成功

在这里插入图片描述

5. 通配符模式

概述: 相当于路由模式的升级版,只要消息中的routingKey与指定队列的通配符匹配进行发送消息。
在这里插入图片描述

根据上图示例:

  1. ff.a.j与*.a.*匹配,该消息就会发送到Q1
  2. 消息:c.jojo.hyy 与c.#匹配,该消息就会发送到Q2

6. RPC模式

概述: RCP模式下 没有Producter和Consumer的概念,取而代之的是Client和Server的结构。Client发送消息给Server并且希望Server能发送一个期望的响应给Client,可以使用RPC模式.

特点: Client发送消息会设定两个字段relyTo、correlationId。replyTo用于指定Server使用哪一个回调队列(图中使用的Reply)发送响应给到Client。Client会等待回调队列发送reply给到自己,根据correlationId确保是Cilen需要的响应。
在这里插入图片描述


7. 发布确认模式

概述: 发布确认机制是RabbitMQ用于保证消息可靠性的其中一个方式。

  1. producter把对应的channel设置成confirm模式(通过channel.confirmSelect方法实现),并且设定一个消息唯一ID,把消息与唯一ID关联起来
  2. exchange接收到消息后会发送一个ACK响应给到producter(响应中含有唯一ID),表明消息已经送达。
    这种方法可以尽可能的避免在消息发送过程中的丢失问题。

二、代码实现

代码实现,主要有两种,一个是RabbitMQ官方提供的依赖包,另一个是Spring官方AMQP对RabbitMQ的封装实现,两者都会演示

RabbitMQ中央仓库
找到合适的版本导入即可,本博客使用的5.20.0版本。

1、简单模式

生产者:

public class Producer {
    private static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);//RabbitMQ的IP地址
        factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,默认是5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个
        factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号
        factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码
        Connection connection = factory.newConnection();
        return connection;
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、建立连接
        Connection connection = createConnection();
        //2、开启信道
        Channel channel = connection.createChannel();

        //3、声明队列
        channel.queueDeclare(Constants.SIMPLE_QUEUE, true, false, false, null);

        //4、发布消息
        channel.basicPublish("", Constants.SIMPLE_QUEUE, null, "呵呵".getBytes());
        System.out.println("执行了发布");
        //5、关闭连接
        channel.close();
        connection.close();

    }
}

💡

  1. Channel、Connection的相关包都来自于com.rabbitmq.client不要导错了
  2. 步骤四中参数 “” 的意思是使用默认交换机(Direct类型),bindingKey就是已经绑定的队列名字。
    消费者:
public class Consumer {

    private static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);//RabbitMQ的IP地址
        factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个
        factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号
        factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码
        Connection connection = factory.newConnection();
        return connection;
    }

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        //1、建立连接
        Connection connection = createConnection();

        //2、开启信道
        Channel channel = connection.createChannel();

        //3、声明队列
        channel.queueDeclare(Constants.SIMPLE_QUEUE, true, false, false, null);

        //5、定义consumer逻辑
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //根据实际场景消费消息
                System.out.println("消息已经被消费,获取的消息内容:" + new String(body));
            }
        };

        //6、消费内容
        channel.basicConsume(Constants.SIMPLE_QUEUE, true, consumer);

        //7、关闭连接
        channel.close();
        connection.close();
    }
}

💡
consumerTag: 标识不同消费者的唯一标签
envelope: 描述了消息传递的细节,如该消息是由那个交换机发送的,消息指定的routingKey是什么,消息的唯一标识deliveryTag。
properties: 用于设定RabbitMQ的高级属性
body: 消息的本体,以二进制方式存储

2、工作队列模式

工作队列模式(Work Queue Mode)是一种任务分发的模式,允许多个消费者从同一个队列中获取消息并处理,从而实现任务的负载均衡。消息会被轮询(Round-Robin)分发到不同的消费者,适合处理耗时任务的场景。

生产者

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Product {
    private static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST); // RabbitMQ的IP地址
        factory.setPort(Constants.PORT); // RabbitMQ的服务端口号,默认是5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机
        factory.setUsername(Constants.USER_NAME); // RabbitMQ登录账号
        factory.setPassword(Constants.PASSWORD); // RabbitMQ登录密码
        return factory.newConnection();
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、建立连接
        Connection connection = createConnection();

        // 2、开启信道
        Channel channel = connection.createChannel();

        // 3、声明队列
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);

        // 4、发送20条消息
        for (int i = 0; i < 20; i++) {
            channel.basicPublish("", Constants.WORK_QUEUE, null, ("工作队列的消息" + i).getBytes());
        }

        // 5、关闭连接
        channel.close();
        connection.close();
    }
}

说明:

  • 队列声明:使用 channel.queueDeclare 声明一个持久化的队列(durable=true),确保队列在 RabbitMQ 重启后依然存在。
  • 消息发送:通过 basicPublish 方法向默认交换机("")发送消息,路由键为队列名称(Constants.WORK_QUEUE)。
  • 消息内容:循环发送 20 条消息,每条消息为 "工作队列的消息" + i
  • 连接关闭:发送完成后关闭信道和连接。

💡 注意

  • 参数 "" 表示使用默认交换机(Direct 类型),路由键直接绑定到队列名称。

消费者

消费者从工作队列中获取消息并处理。以下是两个消费者的实现,分别命名为 Consumer1Consumer2,它们共享同一队列的消息,每个消费者拿到不同的消息。

消费者 1
public class Consumer1 {
    private static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST); // RabbitMQ的IP地址
        factory.setPort(Constants.PORT); // RabbitMQ的服务端口号,默认是5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机
        factory.setUsername(Constants.USER_NAME); // RabbitMQ登录账号
        factory.setPassword(Constants.PASSWORD); // RabbitMQ登录密码
        return factory.newConnection();
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、建立连接
        Connection connection = createConnection();

        // 2、开启信道
        Channel channel = connection.createChannel();

        // 3、声明队列
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);

        // 4、定义消费逻辑
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("第一个消费者消费消息:" + new String(body));
            }
        };

        // 5、消费
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);

        // 6、保持连接(注释掉关闭连接的代码)
        // channel.close();
        // connection.close();
    }
}
消费者 2

public class Consumer2 {
    private static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST); // RabbitMQ的IP地址
        factory.setPort(Constants.PORT); // RabbitMQ的服务端口号,默认是5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机
        factory.setUsername(Constants.USER_NAME); // RabbitMQ登录账号
        factory.setPassword(Constants.PASSWORD); // RabbitMQ登录密码
        return factory.newConnection();
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、建立连接
        Connection connection = createConnection();

        // 2、开启信道
        Channel channel = connection.createChannel();

        // 3、声明队列
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);

        // 4、定义消费逻辑
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("第二个消费者消费消息:" + new String(body));
            }
        };

        // 5、消费
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);

        // 6、保持连接(注释掉关闭连接的代码)
        // channel.close();
        // connection.close();
    }
}

说明:

  • 队列声明:与生产者一致,消费者也声明相同的队列,确保队列存在。
  • 消费逻辑:通过继承 DefaultConsumer 并重写 handleDelivery 方法,定义消息处理逻辑。Consumer1Consumer2 分别打印接收到的消息,标识为“第一个消费者”和“第二个消费者”。
  • 消息消费:使用 channel.basicConsume 订阅队列,autoAck=true 表示自动确认消息(消费者接收消息后自动通知 RabbitMQ,把消息从队列中删除)。

💡 注意

  • consumerTag:标识消费者的唯一标签,用于区分不同的消费者。
  • envelope:包含消息的元数据,如路由键、交换机和 deliveryTag(消息的唯一标识)。
  • properties:消息的附加属性,可用于高级配置。
  • body:消息的实际内容,以字节数组形式存储。

3、广播模式 (Fanout Mode)

广播模式通过 Fanout 交换机将消息分发到所有绑定的队列,忽略路由键,适合发布/订阅场景。以下基于提供的代码续写。

生产者

关键点:

  • 声明 Fanout 交换机 (exchangeDeclare)。
  • 声明并绑定多个队列到交换机 (queueDeclare, queueBind)。
  • 发布消息到交换机,路由键为空 (basicPublish)。

消费者

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
        //4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);

    }
}

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);
        //4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);

    }
}

关键点:

  • 声明队列并监听消息 (queueDeclare, basicConsume)。
  • 每个消费者独立消费绑定队列的消息。
  • 自动确认消息 (autoAck=true)。

💡 注意:

  • Fanout 模式下,路由键被忽略,消息广播到所有绑定队列。
  • 确保交换机和队列正确绑定,避免消息丢失。

4、路由模式 (Direct Mode)

路由模式通过 Direct 交换机根据路由键精确分发消息到匹配的队列,适合需要条件路由的场景。

生产者

public class Producer {

    private static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);//RabbitMQ的IP地址
        factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个
        factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号
        factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码
        Connection connection = factory.newConnection();
        return connection;
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、建立连接
        Connection connection = createConnection();

        //2、开启信道
        Channel channel=connection.createChannel();

        //3、声明交换机
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT);

        //4、声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);

        //5、交换机绑定队列q1 q2
        channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"q1");
        channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"q2");

        //6、生产者发送消息
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"q1",null,("q1需要接收到这个消息").getBytes());
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"q2",null,("q2需要接收到这个消息").getBytes());

        //7、关闭资源
        channel.close();
        connection.close();

    }
}

关键点:

  • 声明 Direct 交换机 (exchangeDeclare)。
  • 声明队列并绑定到交换机,指定路由键 (queueBind)。
  • 发布消息时指定路由键 (basicPublish)。

消费者

public class Consumer1 {
    private static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);//RabbitMQ的IP地址
        factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个
        factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号
        factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码
        Connection connection = factory.newConnection();
        return connection;
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、建立连接
        Connection connection = createConnection();

        //2、开启信道
        Channel channel=connection.createChannel();

        //3、声明交换机
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);

        //4、声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);

        //5、定义消费逻辑
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+new String(body));
            }
        };

        //5、消费
        channel.basicConsume(Constants.DIRECT_QUEUE1,true,consumer);

        //6、关闭连接
        channel.close();
        connection.close();
    }
}

public class Consumer2 {
    private static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);//RabbitMQ的IP地址
        factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个
        factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号
        factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码
        Connection connection = factory.newConnection();
        return connection;
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、建立连接
        Connection connection = createConnection();

        //2、开启信道
        Channel channel=connection.createChannel();

        //3、声明交换机
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);

        //4、声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);

        //5、定义消费逻辑
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+new String(body));
            }
        };

        //5、消费
        channel.basicConsume(Constants.DIRECT_QUEUE2,true,consumer);

        //6、关闭连接
        channel.close();
        connection.close();
    }
}

关键点:

  • 声明队列并监听消息 (queueDeclare, basicConsume)。
  • 根据队列绑定的路由键接收对应消息。
  • 自动确认消息 (autoAck=true)。

💡 注意:

  • 路由键必须精确匹配,消息才会分发到对应队列。
  • 队列可以绑定多个路由键,增加灵活性。
  • 未绑定路由键的队列不会收到消息。

5、通配符模式

通配符模式和路由模式实现的不同点就是交换机使用TOPIC类型,交换机和队列绑定使用通配符,其他代码几乎一致,这里就不演示了。

6、RPC模式 (Remote Procedure Call Mode)

RPC模式通过RabbitMQ实现客户端与服务器的双向通信,客户端发送请求到服务器并等待响应,适合需要同步响应的场景。

服务器 (Server)

public class Server {

    private static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);//RabbitMQ的IP地址
        factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个
        factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号
        factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码
        Connection connection = factory.newConnection();
        return connection;
    }

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        //1、建立连接
        Connection connection = createConnection();

        //2、开启信道
        Channel channel = connection.createChannel();

        //3、声明队列并且设定对多处理消息数
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        channel.basicQos(1);

        //5、定义consumer逻辑
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //根据实际场景消费消息
                System.out.println("客服端发送了这个消息:" + new String(body));

                //获取消息中的correID将其发送会客户端
                AMQP.BasicProperties proper = new AMQP.BasicProperties()
                        .builder()
                        .correlationId(properties.getCorrelationId())
                        .build();

                //给客户端发送响应,指定使用replayTo
                channel.basicPublish("", properties.getReplyTo(), proper, ("收到来自客户端的请求").getBytes());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //6、消费内容
        channel.basicConsume(Constants.RPC_REQUEST_QUEUE, true, consumer);

    }
}

关键点:

  • 声明请求和响应队列 (queueDeclare)。
  • 设置消息处理限制 (basicQos),确保按序处理。
  • 消费请求队列消息,发送响应到客户端指定的 replyTo 队列。
  • 使用 correlationId 关联请求和响应。

客户端 (Client)

public class Client {
    private static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);//RabbitMQ的IP地址
        factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个
        factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号
        factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码
        Connection connection = factory.newConnection();
        return connection;
    }

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1、建立连接
        Connection connection = createConnection();
        //2、开启信道
        Channel channel = connection.createChannel();

        //3、声明队列
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
        //4、生成唯一ID用于区分当前消息
        String corrID= UUID.randomUUID().toString();

        //5、配置请求相关属性
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(corrID)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();

        //4、发布消息
        channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, "呵呵".getBytes());

        System.out.println("执行了发布");

        //5、等待响应
        final BlockingQueue<String> bq= new LinkedBlockingQueue<>(1);

        DefaultConsumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String response=new String(body);
                    if(properties.getCorrelationId().equals(corrID)){
                        System.out.println("接收到回调消息:"+response);
                        bq.offer(response);
                    }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
        bq.take();
    }
}

关键点:

  • 声明请求和响应队列 (queueDeclare)。
  • 生成唯一 correlationId 标识请求。
  • 发送请求到 RPC_REQUEST_QUEUE,指定 replyTo 为响应队列。
  • 监听 RPC_RESPONSE_QUEUE,验证 correlationId 匹配后处理响应。

7、发布确认模式 (Publisher Confirms)

发布确认模式确保生产者发送的消息被RabbitMQ正确接收,提供可靠性保证。确认方式有以下三种:

1. 单独确认 (Publishing Messages Individually)

  private static void individually() throws Exception {
        try(Connection connection = createConnection()) {
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3. 声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);
            //4. 发送消息, 并等待确认
            long start = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());
                //等待确认
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);
        }
    }

关键点:

  • 启用确认模式 (confirmSelect)。
  • 每发送一条消息,同步等待确认 (waitForConfirmsOrDie)。
  • 适合小规模消息发送,因为性能较低。

2. 批量确认 (Publishing Messages in Batches)

 private static void publishingMessagesInBatches() throws Exception{
        try(Connection connection = createConnection()) {
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3. 声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);
            //4. 发送消息, 并进行确认
            long start = System.currentTimeMillis();
            int batchSize = 100;
            int outstandingMessageCount = 0;
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());
                outstandingMessageCount++;
                if (outstandingMessageCount==batchSize){
                    channel.waitForConfirmsOrDie(5000);
                    outstandingMessageCount = 0;
                }
            }
            if (outstandingMessageCount>0){
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);

        }
    }

关键点:

  • 启用确认模式 (confirmSelect)。
  • 每发送一批消息 (如100条),同步等待确认 (waitForConfirmsOrDie)。
  • 平衡了性能与可靠性。
  • 但在一些消息容易遗失的场景,我们不清楚具体是那个消息出现问题,需要批量重发消息,性能可能不增返降。

3. 异步确认

    private static void asynchronously() throws Exception{
        try (Connection connection = createConnection()){
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3. 声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);
            //4. 监听confirm
            //集合中存储的是未确认的消息ID
            long start = System.currentTimeMillis();
            SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());

            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    if (multiple){
                        confirmSeqNo.headSet(deliveryTag+1).clear();
                    }else {
                        confirmSeqNo.remove(deliveryTag);
                    }
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    if (multiple){
                        confirmSeqNo.headSet(deliveryTag+1).clear();
                    }else {
                        confirmSeqNo.remove(deliveryTag);
                    }
                    //业务需要根据实际场景进行处理, 比如重发, 此处代码省略
                }
            });
            //5. 发送消息
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                long seqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());
                confirmSeqNo.add(seqNo);
            }
            while (!confirmSeqNo.isEmpty()){
                Thread.sleep(10);
            }
            long end = System.currentTimeMillis();
            System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);
        }
    }

关键点:

  • 启用确认模式 (confirmSelect)。
  • 使用 channel.ConfirmListener() 开启监听,异步处理确认 (handleAck, handleNack)。
  • 通过 SortedSet 跟踪未确认消息。
  • 最高吞吐量,适合大规模消息发送。

对比总结

策略优点缺点适用场景
单独确认简单,高可靠性延迟高,吞吐量低小规模、可靠性优先
批量确认平衡性能与可靠性仍需同步等待,部分延迟中等规模、可靠性与性能兼顾
异步确认高吞吐量,低延迟实现复杂,需处理失败重发大规模、高性能需求

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2385870.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue2项目搭建

作者碎碎念&#xff1a;开历史倒车了&#xff0c;没想到不兼容&#xff0c;只能从vue3->vue2了。 1 vue3和vue2 这部分参考了官网的《vue3迁移指南》&#xff1a;Vue 3 的支持库进行了重大更新。以下是新的默认建议的摘要: 新版本的 Router, Devtools & test utils 来…

Spring AI 源码解析:Tool Calling链路调用流程及示例

Tool工具允许模型与一组API或工具进行交互&#xff0c;增强模型功能&#xff0c;主要用于&#xff1a; 信息检索&#xff1a;从外部数据源检索信息&#xff0c;如数据库、Web服务、文件系统或Web搜索引擎等 采取行动&#xff1a;可用于在软件系统中执行特定操作&#xff0c;如…

2025年- H48-Lc156 --236. 二叉树的最近公共祖先(递归、深搜)--Java版

1.题目描述 递归终止条件&#xff1a; 如果当前节点 root 为 null&#xff0c;表示到达了叶子节点的空子树&#xff1b; 如果当前节点是 p 或 q&#xff0c;就返回它&#xff08;因为从这里可以回溯寻找公共祖先&#xff09;。 2.思路 &#xff08;1&#xff09; 如果当前节…

Hertz+Kitex快速上手开发

本篇文章以用户注册接口为例&#xff0c;快速上手HertzKitex 以用户注册接口来演示hertz结合kitex实现网关微服务架构的最简易版本 项目结构 api- gateway&#xff1a;网关实现&#xff0c;这里采用hertz框架 idl&#xff1a;接口定义用来生成kitex代码 kitex_gen&#xff…

机器学习课程设计报告 —— 基于二分类的岩石与金属识别模型

机器学习课程设计报告 题 目&#xff1a; 基于二分类的岩石与金属识别模型 专 业&#xff1a; 机器人工程 学生姓名&#xff1a; XXX 指导教师&#xff1a; XXX 完成日期&#xff1a…

分词算法BPE详解和CLIP的应用

一、TL&#xff1b;DR BPE通过替换相邻最频繁的字符和持续迭代来实现压缩CLIP对text进行标准化和预分词后&#xff0c;对每一个单词进行BPE编码和查表&#xff0c;完成token_id的转换 二、BPE算法 2.1 核心思想和原理 paper&#xff1a;Neural Machine Translation of Rare…

STM32F103_Bootloader程序开发02 - Bootloader程序架构与STM32F103ZET6的Flash内存规划

导言 在工业设备和机器人项目中&#xff0c;固件远程升级能力已成为提升设备维护性与生命周期的关键手段。本文将围绕STM32平台&#xff0c;系统性介绍一个简洁、可靠的Bootloader程序设计思路。 我们将Bootloader核心流程划分为五大功能模块&#xff1a; 启动入口与升级模式判…

通过Auto平台与VScode搭建远程开发环境(以Stable Diffusion Web UI为例)

文章目录 Stable Diffusion Web UI一、&#x1f3af;主要功能概述二、&#x1f9e0;支持的主要模型体系三、&#x1f4e6;安装方式简述✅ 一、前提准备✅ 二、安装步骤混乱版本&#xff08;仅用于记录测试过程&#xff09;第一步&#xff1a;克隆仓库&#xff08;使用清华大学镜…

Unity 打包程序全屏置顶无边框

该模块功能: 1. 打包无边框 2. 置顶 3. 不允许切屏 4.多显示器状态下,程序只在主显示上运行 5.全屏 Unity 打包设置: 如果更改打包设置,最好将Version版本增加一下,否则可能不会覆盖前配置文件 代码: 挂在场景中即可 using UnityEngine; using System; // 确保这行存…

GAMES104 Piccolo引擎搭建配置

操作系统&#xff1a;windows11 家庭版 inter 17 12 th 显卡&#xff1a;amd 运行内存&#xff1a;>12 1、如何构建&#xff1f; 在github下载&#xff1a;网址如下 https://github.com/BoomingTech/Piccolo 下载后安装 git、vs2022 Git Visual Studio 2022 IDE - …

用service 和 SCAN实现sqlplus/jdbc连接Oracle 11g RAC时负载均衡

说明 11.2推出的SCAN &#xff0c;简化了客户端连接&#xff08;当增加或者减少RAC实例时&#xff0c;不需要修改客户端配置&#xff0c;并且scan listener有各个实例的负载情况&#xff0c;可以实现连接时负载均衡。 不过客户端需要使用专门建立的service,而不能用RAC数据库…

防火墙流量管理

带宽管理介绍 针对企业用户流量&#xff0c;防火墙提供了带宽管理功能&#xff0c;基于出/入接口、源/目的安全区域、源/目的地址、时间段、报文DSCP优先级等信息&#xff0c;对通过自身的流量进行管理和控制。 带宽管理提供带宽限制、带宽保证和连接数限制功能&#xff0c;可…

Linux系统移植①:uboot概念

Linux系统移植①&#xff1a;uboot概念 uboot概念 1、uboot是一个比较复杂的裸机程序。 2、uboot就是一个bootloader,作用就是用原于启动Linux或其他系统。uboot最主要的工作就是初始化DDR。因为Linux是运行再DDR里面的。一般Linux镜像zImage&#xff08;uImage&#xff09;设…

DAY 35

import torch import torch.nn as nn import torch.optim as optim from sklearn.datasets import load_iris from sklearn.model_selection import train_test_split from sklearn.preprocessing import MinMaxScaler import time import matplotlib.pyplot as plt# 设置GPU设…

AWS EC2实例安全远程访问最佳实践

EC2 远程连接方案对比 远程访问 Amazon EC2 实例主要有以下四种方式&#xff1a; Secure Shell (SSH) 远程访问AWS Systems Manager 会话管理器适用于 Linux 实例的 EC2 Serial ConsoleAmazon EC2 Instance Connect SSH 远程访问 SSH&#xff08;Secure Shell&#xff09;广…

【强化学习】#7 基于表格型方法的规划和学习

主要参考学习资料&#xff1a;《强化学习&#xff08;第2版&#xff09;》[加]Richard S.Suttion [美]Andrew G.Barto 著 文章源文件&#xff1a;https://github.com/INKEM/Knowledge_Base 本章更是厘清概念厘到头秃&#xff0c;如有表达不恰当之处还请多多指教—— 概述 环境…

EasyRTC嵌入式音视频通信SDK一对一音视频通信,打造远程办公/医疗/教育等场景解决方案

一、方案概述​ 数字技术发展促使在线教育、远程医疗等行业对一对一实时音视频通信需求激增。传统方式存在低延迟、高画质及多场景适配不足等问题&#xff0c;而EasyRTC凭借音视频处理、高效信令交互与智能网络适配技术&#xff0c;打造稳定低延迟通信&#xff0c;满足基础通信…

网络安全-等级保护(等保) 3-2-1 GB/T 28449-2019 第6章 方案编制活动

################################################################################ GB/T 28449-2019《信息安全技术 网络安全等级保护测评过程指南》是规定了等级测评过程&#xff0c;是纵向的流程&#xff0c;包括&#xff1a;四个基本测评活动:测评准备活动、方案编制活…

【免费使用】剪Y专业版 8.1/CapCut 视频编辑处理,素材和滤镜

—————【下 载 地 址】——————— 【​本章下载一】&#xff1a;https://pan.xunlei.com/s/VOQxk38EUe3_8Et86ZCH84JsA1?pwdkp7h# 【​本章下载二】&#xff1a;https://pan.quark.cn/s/388008091ab4 【​本章下载三】&#xff1a;https://drive.uc.cn/s/d5ae5c725637…

实现rpc通信机制(待定)

一、概述 &#xff08;1&#xff09;rpc&#xff08;remote procedure call, 远程接口调用&#xff09;,就像在本地调用函数一样&#xff0c;是应用组成服务内部分布式的基础功能。应用场景是在内网中的计算&#xff0c;比如&#xff1a;(a) 为上传的一张图片加水印、&#xf…