RabbitMQ
【黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战】
文章目录
- RabbitMQ
- 第一天 基础
- 4 RabbitMQ 的工作模式
- 4.4 Topic 通配符模式
- 4.4.1 模式说明
- 4.4.2 代码编写
- 4.4.3 小结
 
- 4.5 工作模式总结
 
 
 
第一天 基础
4 RabbitMQ 的工作模式
4.4 Topic 通配符模式
4.4.1 模式说明
看看文档


Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型中的 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc
 或者 item.insert,item.* 只能匹配 item.insert

没毛病,很清晰
4.4.2 代码编写
来个小需求:
之前我们已经可以“定向” 实现将error 级别的信息存入数据库了,

现在我希望 将订单相关的日志【不管什么级别】 都走这个 方式,存入数据库。
【生产者】
package com.dingjiaxiong.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * ClassName: Producer_Topics
 * date: 2022/11/16 10:58
 * 发送消息
 *
 * @author DingJiaxiong
 */
public class Producer_Topics {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("xxxxxxxxxxxxxxxxxx"); // 服务器IP【默认本机 localhost】
        factory.setPort(5672); //端口【默认也是 5672】
        factory.setVirtualHost("/ding"); //虚拟机【 默认是 /】
        factory.setUsername("dingjiaxiong"); // 用户名【默认 guest】
        factory.setPassword("12345"); //密码【默认 guest】
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建交换机
        /**
         * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
         * 参数解释:
         *      1. exchange:交换机名称
         *      2. type:交换机类型
         *          【枚举】
         *           DIRECT("direct"):定向
         *           FANOUT("fanout"):扇形【广播】【发送消息到每一个与之绑定队列】
         *           TOPIC("topic"):通配符的方式
         *           HEADERS("headers"):参数匹配
         *      3. durable:是否持久化
         *      4. autoDelete:自动删除
         *      5. internal:内部使用【一般false】
         *      6. arguments:参数
         * */
        String exchangeName = "test_topic";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
        //6. 创建队列【两个】
        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";
        channel.queueDeclare(queue1Name, true, false, false, null);
        channel.queueDeclare(queue2Name, true, false, false, null);
        //7. 绑定队列和交换机
        /**
         * queueBind(String queue, String exchange, String routingKey)
         * 参数解释:
         *      1. queue:参数
         *      2. exchange:交换机名称
         *      3. routingKey:路由key【绑定规则】[交换机 类型若为fanout, 默认为空]
         * */
        //routing key 系统的名称.日志的级别
        // 需求:所有error 级别的日志存入数据库,所有order 系统的日志存入数据库
        channel.queueBind(queue1Name, exchangeName, "#.error");
        channel.queueBind(queue1Name, exchangeName, "order.*");
        channel.queueBind(queue2Name, exchangeName, "*.*");
        //8. 发送消息
        String body = "这是一条日志信息...【order 系统的】";
        channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
        //9. 释放资源
        channel.close();
        connection.close();
    }
}
OK,直接运行

OK, 查看管控台

交换机创建成功,查看绑定关系

OK,没问题
查看队列

可以看到 两个 队列中都收到了 消息
【消费者】
package com.dingjiaxiong.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * ClassName: Consumer_Topic1
 * date: 2022/11/16 12:37
 *
 * @author DingJiaxiong
 */
public class Consumer_Topic1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("xxxxxxxxxxxxxxxxx"); // 服务器IP【默认本机 localhost】
        factory.setPort(5672); //端口【默认也是 5672】
        factory.setVirtualHost("/ding"); //虚拟机【 默认是 /】
        factory.setUsername("dingjiaxiong"); // 用户名【默认 guest】
        factory.setPassword("12345"); //密码【默认 guest】
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";
        // 接收消息【消费消息】
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 参数解释:
         *      1. queue:队列名称
         *      2. autoAck:是否自动确认
         *      3. callback:回调对象
         * */
        Consumer consumer = new DefaultConsumer(channel){
            //回调方法,当收到消息后,会自动执行这个 方法
            /*
            * 1. consumerTag:消息的标识
            * 2. envelope:获取一些 信息【交换机、路由key...】
            * 3. properties: 配置信息
            * 4. body: 数据
            * */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag:" + consumerTag);
//                System.out.println("Exchange:" + envelope.getExchange());
//                System.out.println("RoutingKey:" + envelope.getRoutingKey());
//                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
                System.out.println("将日志 存入数据库...");
            }
        };
        channel.basicConsume(queue1Name,true,consumer);
        // 不要关闭资源,让它一直监听
    }
}
OK,消费者 2

OK
直接运行

OK,没毛病,order.info 既打印到 了控制台,也存入了 数据库
现在来条goods 日志

直接发送

再来个goods.error

没毛病。【这就是 Topic 工作模式】
4.4.3 小结
Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。
4.5 工作模式总结
- 简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。
 
- 工作队列模式 Work Queue
 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。

- 发布订阅模式 Publish/subscribe
 需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。

- 路由模式 Routing
 需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

- 通配符模式 Topic
 需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。







![BUUCTF Misc [ACTF新生赛2020]NTFS数据流  john-in-the-middle  [ACTF新生赛2020]swp  喵喵喵](https://img-blog.csdnimg.cn/32c992ed9863441a9ec57fbfef35d1f0.png)












