🧑💻作者名称:DaenCode
🎤作者简介:CSDN实力新星,后端开发两年经验,曾担任甲方技术代表,业余独自创办智源恩创网络科技工作室。会点点Java相关技术栈、帆软报表、低代码平台快速开发。技术尚浅,闭关学习中······
😎人生感悟:尝尽人生百味,方知世间冷暖。
📖所属专栏:图解RabbitMQ

专栏推荐
- 专门为Redis入门打造的专栏,包含Redis基础知识、基础命令、五大数据类型实战场景、key删除策略、内存淘汰机制、持久化机制、哨兵模式、主从复制、分布式锁等等内容。
链接>>>>>>>>>《Redis从头学》 - SpringBoot实战相关专栏,包含SpringBoot过滤器、拦截器、AOP实现日志、整合Freemaker、整合Redis等等实战相关内容,多篇文章登入全站热榜、领域热榜、被技术社区收录。
链接>>>>>>《SpringBoot实战》 
文章目录
- 专栏推荐
 - 🌟前言
 - 🌟连接工具类
 - 🌟简单工作模型
 - 介绍
 - 代码实现
 
- 🌟工作队列模型
 - 介绍
 - 代码实现
 
- 🌟发布订阅模型
 - 介绍
 - 代码实现
 
- 🌟路由模型
 - 介绍
 - 代码实现
 
- 🌟主题模型
 - 介绍
 - 代码实现
 
- 🌟总结
 - 🌟写在最后
 
参考网站:https://www.rabbitmq.com/getstarted.html
🌟前言
在上一节学习了RabbitMQ中交换机的相关基础知识,本文来学习一下RabbitMQ中的五种队列模型的,对其有一个基本的认识。
🌟连接工具类
public class MQConnectionUtil {
    public static Connection createConnection() throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.124.23");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        return factory.newConnection();
    }
}
 
🌟简单工作模型
介绍
模型图:
 
 流程:
- 生产者发送消息到队列。
 - 如果队列存在则直接存入消息;若不存在,先进行队列的创建。
 - 消费者监听队列。
 - 处理完消息,通过ACK机制确认消息已经消费。
 
特点:
- 只有一个消费者,并且其中没有交换机参与。
 
代码实现
生产者:
public class Send {
    private final static String QUEUE_NAME="hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        try (   //JDK7语法 或自动关闭 connnection和channel
                //创建连接
                Connection connection=MQConnectionUtil.createConnection();
                //创建信道
                Channel channel = connection.createChannel()) {
            /**
             * 队列名称
             * 持久化配置:mq重启后还在
             * 是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占
             * 自动删除: 当没有消费者的时候,自动删除掉,一般是false
             * 其他参数
             *
             * 队列不存在则会自动创建,如果存在则不会覆盖,所以此时的时候需要注意属性
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            /**
             * 参数说明:
             * 交换机名称:不写则是默认的交换机,那路由健需要和队列名称一样才可以被路由,
             * 路由健名称
             * 配置信息
             * 发送的消息数据:字节数组
             */
            //发布消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
 
消费者:
public class Recv {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        //消费者一般不增加自动关闭
        Connection connection=MQConnectionUtil.createConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //回调方法,下面两种都行
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1
                System.out.println("consumerTag消息标识="+consumerTag);
                //可以获取交换机,路由健等
                System.out.println("envelope元数据="+envelope);
                System.out.println("properties配置信息="+properties);
                System.out.println("body="+new String(body,"utf-8"));
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consumer);
//        DeliverCallback deliverCallback = (consumerTag,delivery) -> {
//            String message = new String(delivery.getBody(), "UTF-8");
//            System.out.println(" [x] Received '" + message + "'");
//        };
        //自动确认消息
//        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}
 
🌟工作队列模型
介绍
模型图:
 
 特点:
- 生产者将消息发送到队列,并由多个消费者进行消费。
 
两种消费策略:
 1 . 轮训策略:将消息平均分配给多个消费者进行消费,不考虑消费者的处理能力;采用自动ACK消息机制。
 2. 公平策略:消费者每次只能处理一个消息。一定时间内,能力强者消费的多,否则少;采用手动ACK消息机制。
代码实现
轮询策略:
//生产者
public class Send {
    private final static String QUEUE_NAME="work_rr";
    public static void main(String[] args) throws IOException, TimeoutException {
        try (   //JDK7语法 或自动关闭 connnection和channel
                //创建连接
               Connection connection= MQConnectionUtil.createConnection();
                //创建信道
                Channel channel = connection.createChannel()) {
            /**
             * 队列名称
             * 持久化配置:mq重启后还在
             * 是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占
             * 自动删除: 当没有消费者的时候,自动删除掉,一般是false
             * 其他参数
             *
             * 队列不存在则会自动创建,如果存在则不会覆盖,所以此时的时候需要注意属性
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 10; i++) {
                String message = "Hello World!";
                /**
                 * 参数说明:
                 * 交换机名称:不写则是默认的交换机,那路由健需要和队列名称一样才可以被路由,
                 * 路由健名称
                 * 配置信息
                 * 发送的消息数据:字节数组
                 */
                //发布消息
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
}
//消费者
public class Recv {
    private final static String QUEUE_NAME = "work_rr";
    public static void main(String[] argv) throws Exception {
        //消费者一般不增加自动关闭
        Connection connection= MQConnectionUtil.createConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //回调方法,下面两种都行
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                // consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1
                System.out.println("consumerTag消息标识="+consumerTag);
                //可以获取交换机,路由健等
                System.out.println("envelope元数据="+envelope);
                System.out.println("properties配置信息="+properties);
                System.out.println("body="+new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME,false,consumer);
//        DeliverCallback deliverCallback = (consumerTag,delivery) -> {
//            String message = new String(delivery.getBody(), "UTF-8");
//            System.out.println(" [x] Received '" + message + "'");
//        };
        //自动确认消息
//        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}
//消费者2
public class Recv2 {
    private final static String QUEUE_NAME = "work_rr";
    public static void main(String[] argv) throws Exception {
        Connection connection= MQConnectionUtil.createConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //回调方法,下面两种都行
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1
                System.out.println("consumerTag消息标识="+consumerTag);
                //可以获取交换机,路由健等
                System.out.println("envelope元数据="+envelope);
                System.out.println("properties配置信息="+properties);
                System.out.println("body="+new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME,false,consumer);
//        DeliverCallback deliverCallback = (consumerTag,delivery) -> {
//            String message = new String(delivery.getBody(), "UTF-8");
//            System.out.println(" [x] Received '" + message + "'");
//        };
        //自动确认消息
//        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}
 
公平策略
 通过channel.basicQos(1);确保每个消费者每次只能处理一个未确认的消息。
public class Send {
    private final static String QUEUE_NAME="work_fair";
    public static void main(String[] args) throws IOException, TimeoutException {
        try (   //JDK7语法 或自动关闭 connnection和channel
                //创建连接
                Connection connection = MQConnectionUtil.createConnection();
                //创建信道
                Channel channel = connection.createChannel()) {
            /**
             * 队列名称
             * 持久化配置:mq重启后还在
             * 是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占
             * 自动删除: 当没有消费者的时候,自动删除掉,一般是false
             * 其他参数
             *
             * 队列不存在则会自动创建,如果存在则不会覆盖,所以此时的时候需要注意属性
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 10; i++) {
                String message = "Hello World!";
                /**
                 * 参数说明:
                 * 交换机名称:不写则是默认的交换机,那路由健需要和队列名称一样才可以被路由,
                 * 路由健名称
                 * 配置信息
                 * 发送的消息数据:字节数组
                 */
                //发布消息
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
}
//消费者1
public class Recv {
    private final static String QUEUE_NAME = "work_fair";
    public static void main(String[] argv) throws Exception {
        Connection connection= MQConnectionUtil.createConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        channel.basicQos(1);
        //回调方法,下面两种都行
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                // consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1
                System.out.println("consumerTag消息标识="+consumerTag);
                //可以获取交换机,路由健等
                System.out.println("envelope元数据="+envelope);
                System.out.println("properties配置信息="+properties);
                System.out.println("body="+new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME,false,consumer);
//        DeliverCallback deliverCallback = (consumerTag,delivery) -> {
//            String message = new String(delivery.getBody(), "UTF-8");
//            System.out.println(" [x] Received '" + message + "'");
//        };
        //自动确认消息
//        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}
//消费者2
public class Recv2 {
    private final static String QUEUE_NAME = "work_fair";
    public static void main(String[] argv) throws Exception {
        Connection connection=MQConnectionUtil.createConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        channel.basicQos(1);
        //回调方法,下面两种都行
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1
                System.out.println("consumerTag消息标识="+consumerTag);
                //可以获取交换机,路由健等
                System.out.println("envelope元数据="+envelope);
                System.out.println("properties配置信息="+properties);
                System.out.println("body="+new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME,false,consumer);
//        DeliverCallback deliverCallback = (consumerTag,delivery) -> {
//            String message = new String(delivery.getBody(), "UTF-8");
//            System.out.println(" [x] Received '" + message + "'");
//        };
        //自动确认消息
//        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}
 
🌟发布订阅模型
介绍
模型图:
 
特点:
- 一条消息可以被多个消费者同时接收。
 - 采用扇形(Fanout)交换机。
 - 无需路由Key
 - 类似于公众号的订阅。
 
代码实现
生产者:
public class Send {
    private final static String EXCHANGE_NAME="exchange_fanout";
    public static void main(String[] args) throws IOException, TimeoutException {
        try (   //JDK7语法 或自动关闭 connnection和channel
                //创建连接
                Connection connection = MQConnectionUtil.createConnection();
                //创建信道
                Channel channel = connection.createChannel()) {
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
                String message="daencode rabbitmq pub";
                channel.basicPublish(EXCHANGE_NAME," ",null,message.getBytes(StandardCharsets.UTF_8));
                System.out.println("广播消息已经发送!!!!");
        }
    }
}
 
消费者:两个消费者都是一样的代码,都需要绑定相同的扇形交换机。
public class Recv {
    private final static String EXCHANGE_NAME="exchange_fanout";
    public static void main(String[] argv) throws Exception {
        Connection connection = MQConnectionUtil.createConnection();
        Channel channel = connection.createChannel();
        //绑定交换机,fanout扇形,即广播类型
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
        //获取队列(排它队列)
        String queueName = channel.queueDeclare().getQueue();
        //绑定队列和交换机,fanout交换机不用指定routingkey
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        //自动确认消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}
 
🌟路由模型
介绍
模型图
 
 特点:
- 交换机类型采用直连交换机,特定的路由key由特定的消费者进行消费。
 - 交换机根据特定的路由key与队列进行绑定。
 
代码实现
以记录不同日志级别为例,不同的消费者进行不同日志级别的记录。
生产者:
public class Send {
    private final static String EXCHANGE_NAME="exchange_direct";
    public static void main(String[] args) throws IOException, TimeoutException {
        try (   //JDK7语法 或自动关闭 connnection和channel
                //创建连接
                Connection connection = MQConnectionUtil.createConnection();
                //创建信道
                Channel channel = connection.createChannel()) {
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
                String debugLog="[debug]daencode rabbitmq direct";
                String errorLog="[error]出现error错误";
                channel.basicPublish(EXCHANGE_NAME,"errorRoutingKey",null,errorLog.getBytes(StandardCharsets.UTF_8));
                channel.basicPublish(EXCHANGE_NAME,"debugRoutingKey",null,debugLog.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息已经发送!!!!");
        }
    }
}
 
消费者1:只记录ERROR级别日志。
public class Recv1 {
    private final static String EXCHANGE_NAME="exchange_direct";
    public static void main(String[] argv) throws Exception {
        Connection connection = MQConnectionUtil.createConnection();
        Channel channel = connection.createChannel();
        //绑定交换机,fanout扇形,即广播类型
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
        //获取队列(排它队列)
        String queueName = channel.queueDeclare().getQueue();
        //绑定队列和交换机,fanout交换机不用指定routingkey
        channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        //自动确认消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}
 
消费者2:只记录Debug级别日志。
public class Recv2 {
    private final static String EXCHANGE_NAME="exchange_direct";
    public static void main(String[] argv) throws Exception {
        Connection connection = MQConnectionUtil.createConnection();
        Channel channel = connection.createChannel();
        //绑定交换机,fanout扇形,即广播类型
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
        //获取队列(排它队列)
        String queueName = channel.queueDeclare().getQueue();
        //绑定队列和交换机,fanout交换机不用指定routingkey
        channel.queueBind(queueName,EXCHANGE_NAME,"debugRoutingKey");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        //自动确认消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}
 
🌟主题模型
介绍
模型图
 
 特点:
- 交换机类型采用主题交换机。
 - 路由key根据通配符规则,限定消息消费规则。*匹配一个词,#匹配一个或者多个词。
 - 交换机通过通配符路由KEY将消息绑定到不同的队列,以此实现不同的消费者进行消息消费。
 - 同时满足路由模型和发布订阅模型。
 
代码实现
生产者:生产者通过路由KEY向交换机发送消息。
public class Send {
    private final static String EXCHANGE_NAME="exchange_topic";
    public static void main(String[] args) throws IOException, TimeoutException {
        try (   //JDK7语法 或自动关闭 connnection和channel
                //创建连接
                Connection connection= MQConnectionUtil.createConnection();
                //创建信道
                Channel channel = connection.createChannel()) {
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
                String debugLog="[debug]daencode rabbitmq direct";
                String errorLog="[error]出现error错误";
                channel.basicPublish(EXCHANGE_NAME,"log.error",null,errorLog.getBytes(StandardCharsets.UTF_8));
                channel.basicPublish(EXCHANGE_NAME,"log.debug",null,debugLog.getBytes(StandardCharsets.UTF_8));
            System.out.println("广播消息已经发送!!!!");
        }
    }
}
 
消费者:
public class Recv1 {
    private final static String EXCHANGE_NAME="exchange_topic";
    public static void main(String[] argv) throws Exception {
        Connection connection= MQConnectionUtil.createConnection();
        Channel channel = connection.createChannel();
        //绑定交换机,fanout扇形,即广播类型
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
        //获取队列(排它队列)
        String queueName = channel.queueDeclare().getQueue();
        //绑定队列和交换机,fanout交换机不用指定routingkey
        channel.queueBind(queueName,EXCHANGE_NAME,"*.debug");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        //自动确认消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}
 
消费者2:
public class Recv2 {
    private final static String EXCHANGE_NAME="exchange_topic";
    public static void main(String[] argv) throws Exception {
        Connection connection= MQConnectionUtil.createConnection();
        Channel channel = connection.createChannel();
        //绑定交换机,fanout扇形,即广播类型
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
        //获取队列(排它队列)
        String queueName = channel.queueDeclare().getQueue();
        //绑定队列和交换机,fanout交换机不用指定routingkey
        channel.queueBind(queueName,EXCHANGE_NAME,"*.error");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        //自动确认消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}
 
🌟总结
| 模型 | 是否交换机参与 | 交换机类型 | 需要路由键 | 描述 | 
|---|---|---|---|---|
| 简单模型 | 否 | 无 | 否 | 消息直接发送到队列,最基本的消息传递模型。 | 
| 工作模型 | 否 | 无 | 否 | 多个消费者共同处理一个队列中的消息。 | 
| 发布订阅模型 | 是 | fanout | 否 | 将消息广播给所有绑定到交换机的队列,多个消费者同时订阅。 | 
| 路由模型 | 是 | direct | 是 | 根据消息的路由键将消息发送到与之匹配的队列。 | 
| 主题模型 | 是 | topic | 是 | 使用通配符进行灵活的路由,根据主题和通配符规则进行匹配。 | 
🌟写在最后
有关于图解RabbitMQ五种队列模型介绍及代码实现到此就结束了。感谢大家的阅读,希望大家在评论区对此部分内容散发讨论,便于学到更多的知识。












![学习 [Spring MVC] 的JSR 303和拦截器,提高开发效率](https://img-blog.csdnimg.cn/88b554277b194ed0ad2ec987a29e8ac5.gif)







