一,RabbitMQ的工作模式
RabbitMQ 的工作模式是指 RabbitMQ 中不同的消息传递方式,包括简单模式、工作队列模式、发布订阅模式、路由模式和主题模式 。这些工作模式适用于不同的应用场景。详细的文档可参照RabbitMQ的官网:RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ

依赖信息
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.itheima</groupId>
    <artifactId>rabbitmq-consumer</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>1,简单模式
Simple 简单模式:一个生产者、一个队列、一个消费者,这种交换机是不参与的

其中P是生产者,C是消费者,红色的为消息队列即 MQ,后面几种模式一样。
生产者端代码:
public class Producer_HelloWorld {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("123.207.72.43");//ip 默认值是localhost
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");//用户名 默认是guest
        factory.setPassword("123");
        //3.创建连接
        Connection connection = factory.newConnection();
        //4.创建 channel
        Channel channel = connection.createChannel();
        //5.创建队列 Queue
        //如果没有一个名字叫hello_world的队列就会自动创建一个
        /**
         *  queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         *  queue:队列名称
         *  durable:是否持久化,当mq重启之后还在
         *  exclusive:是否独占,只能有一个消费者监听这个队列或者当connection关闭时,是否删除队列
         *  autoDelete:是否自动删除,当没有consumer时,自动删除掉
         *  arguments:参数
         */
        channel.queueDeclare("hello_world",true,false,false,null);
        String body = "hello rabbitmq";
        //6.发送消息
        /**
         * basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
         * exchange:交换机名称,简单模式下交换机会使用默认的“”
         * routingKey:路由名称,默认交换机使用队列名称
         * props:配置信息
         * body:发送消息数据
         */
        channel.basicPublish("","hello_world",null,body.getBytes());
        //7.释放资源
        channel.close();
        connection.close();
    }
}消费者端代码:
public class Consumer_HelloWorld {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("123.207.72.43");//ip 默认值是localhost
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");//用户名 默认是guest
        factory.setPassword("123");
        //3.创建连接
        Connection connection = factory.newConnection();
        //4.创建 channel
        Channel channel = connection.createChannel();
        //5.创建队列 Queue
        //如果没有一个名字叫hello_world的队列就会自动创建一个
        channel.queueDeclare("hello",true,false,false,null);
        //6.接收消息
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue:队列名称
         * autoAck:是否自动确认
         * callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * 回调方法,当收到消息后会执行这个方法
             * consumerTag:标识
             * envelope:获取一些信息,交换机,路由key...
             * properties:配置信息
             * body:数据
             * @param consumerTag
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag: " +  consumerTag);
                System.out.println("Exchange: " + envelope.getExchange());
                System.out.println("RoutingKey: " + envelope.getRoutingKey());
                System.out.println("properties: " + properties);
                System.out.println("body: " + new String(body));
            }
        };
        channel.basicConsume("hello_world",true,consumer);
        //消费者不需要关闭资源,需要一直监听
    }
}
2,工作/队列模式
Queue 队列模式:生产者将消息发布到一个队列中,消费者从队列中获取消息
 
 
生产者端代码:
public class Producer_WorkQueues {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("123.207.72.43");//ip 默认值是localhost
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");//用户名 默认是guest
        factory.setPassword("123");
        //3.创建连接
        Connection connection = factory.newConnection();
        //4.创建 channel
        Channel channel = connection.createChannel();
        //5.创建队列 Queue
        //如果没有一个名字叫hello_world的队列就会自动创建一个
        channel.queueDeclare("work_queues",true,false,false,null);
        for (int i = 0; i < 10; i++) {
            String body = i + "hello rabbitmq";
            //6.发送消息
            channel.basicPublish("","work_queues",null,body.getBytes());
        }
        //7.释放资源
        channel.close();
        connection.close();
    }
}消费者端代码:
//消费者1
public class Consumer_WorkQueues1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("123.207.72.43");//ip 默认值是localhost
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");//用户名 默认是guest
        factory.setPassword("123");
        //3.创建连接
        Connection connection = factory.newConnection();
        //4.创建 channel
        Channel channel = connection.createChannel();
        //5.创建队列 Queue
        //如果没有一个名字叫hello_world的队列就会自动创建一个
        channel.queueDeclare("work_queues",true,false,false,null);
        //6.接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * 回调方法,当收到消息后会执行这个方法
             * @param consumerTag
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag: " +  consumerTag);
                System.out.println("Exchange: " + envelope.getExchange());
                System.out.println("RoutingKey: " + envelope.getRoutingKey());
                System.out.println("properties: " + properties);
                System.out.println("body: " + new String(body));
            }
        };
        channel.basicConsume("work_queues",true,consumer);
    }
}
//消费者2
public class Consumer_WorkQueues2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("123.207.72.43");//ip 默认值是localhost
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");//用户名 默认是guest
        factory.setPassword("123");
        //3.创建连接
        Connection connection = factory.newConnection();
        //4.创建 channel
        Channel channel = connection.createChannel();
        //5.创建队列 Queue
        //如果没有一个名字叫hello_world的队列就会自动创建一个
        channel.queueDeclare("work_queues",true,false,false,null);
        //6.接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * 回调方法,当收到消息后会执行这个方法
             * @param consumerTag
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag: " +  consumerTag);
                System.out.println("Exchange: " + envelope.getExchange());
                System.out.println("RoutingKey: " + envelope.getRoutingKey());
                System.out.println("properties: " + properties);
                System.out.println("body: " + new String(body));
            }
        };
        channel.basicConsume("work_queues",true,consumer);
    }
}3,直连模式
Direct 直连模式:生产者直接将消息发送到队列中,消费者从队列中获取消息

生产者端代码:
public class Producer_PubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("123.207.72.43");//ip 默认值是localhost
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");//用户名 默认是guest
        factory.setPassword("123");
        //3.创建连接
        Connection connection = factory.newConnection();
        //4.创建 channel
        Channel channel = connection.createChannel();
        //5.创建交换机
        /**
         * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
         * exchange:交换机名称
         * type:交换机类型 是一个枚举类型
         * durable:是否持久化
         * autoDelete:是否自动删除
         * internal:内部使用,一般设为false
         * arguments:参数,设为null
         */
        String exchangeName = "test_fanout";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
        //6.创建队列
        String queueName1 = "queue1";
        String queueName2 = "queue2";
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        //7.绑定队列和交换机
        /**
         * queueBind(String queue, String exchange, String routingKey)
         * queue:队列名称
         * exchange:交换机名称
         * routingKey:路由键,绑定规则
         *              如果交换机类型为FANOUT类型,则routingKey设为“” 说明交换机会绑定每一个queue
         */
        channel.queueBind(queueName1,exchangeName,"");
        channel.queueBind(queueName2,exchangeName,"");
        //8.发送消息
        String body = "日志信息:数据库被删除";
        channel.basicPublish(exchangeName,"",null,body.getBytes());
        //9.释放资源
        channel.close();
        connection.close();
    }
}消费者端代码:
//消费者1
public class Consumer_PubSub1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("123.207.72.43");//ip 默认值是localhost
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");//用户名 默认是guest
        factory.setPassword("123");
        //3.创建连接
        Connection connection = factory.newConnection();
        //4.创建 channel
        Channel channel = connection.createChannel();
        String queueName1 = "queue1";
        String queueName2 = "queue2";
        //6.接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * 回调方法,当收到消息后会执行这个方法
             * @param consumerTag
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag: " +  consumerTag);
//                System.out.println("Exchange: " + envelope.getExchange());
//                System.out.println("RoutingKey: " + envelope.getRoutingKey());
//                System.out.println("properties: " + properties);
                System.out.println("body: " + new String(body));
                System.out.println("将日志信息打印到控制台..");
            }
        };
        channel.basicConsume(queueName1,true,consumer);
    }
}
//消费者2
public class Consumer_PubSub2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("123.207.72.43");//ip 默认值是localhost
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");//用户名 默认是guest
        factory.setPassword("123");
        //3.创建连接
        Connection connection = factory.newConnection();
        //4.创建 channel
        Channel channel = connection.createChannel();
        String queueName1 = "queue1";
        String queueName2 = "queue2";
        //6.接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * 回调方法,当收到消息后会执行这个方法
             * @param consumerTag
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag: " +  consumerTag);
//                System.out.println("Exchange: " + envelope.getExchange());
//                System.out.println("RoutingKey: " + envelope.getRoutingKey());
//                System.out.println("properties: " + properties);
                System.out.println("body: " + new String(body));
                System.out.println("将日志信息保存数据库..");
            }
        };
        channel.basicConsume(queueName2,true,consumer);
    }
}4,路由模式
Routing 路由模式:生产者将消息发布到一个交换器上,交换器根据规则将消息路由到目标队列中

生产者端代码:
public class Producer_Routing {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("123.207.72.43");//ip 默认值是localhost
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");//用户名 默认是guest
        factory.setPassword("123");
        //3.创建连接
        Connection connection = factory.newConnection();
        //4.创建 channel
        Channel channel = connection.createChannel();
        //5.创建交换机
        /**
         * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
         * exchange:交换机名称
         * type:交换机类型 是一个枚举类型
         * durable:是否持久化
         * autoDelete:是否自动删除
         * internal:内部使用,一般设为false
         * arguments:参数,设为null
         */
        String exchangeName = "test_direct";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
        //6.创建队列
        String queueName1 = "queue1_direct";
        String queueName2 = "queue2_direct";
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        //7.绑定队列和交换机
        /**
         * queueBind(String queue, String exchange, String routingKey)
         * queue:队列名称
         * exchange:交换机名称
         * routingKey:路由键,绑定规则
         *              如果交换机类型为FANOUT类型,则routingKey设为“” 说明交换机会绑定每一个queue
         */
        //队列1的绑定  error
        channel.queueBind(queueName1,exchangeName,"error");
        //队列2的绑定  error info
        channel.queueBind(queueName2,exchangeName,"info");
        channel.queueBind(queueName2,exchangeName,"error");
        //8.发送消息
        String body = "日志信息:数据库被删除";
        channel.basicPublish(exchangeName,"error",null,body.getBytes());
        //9.释放资源
        channel.close();
        connection.close();
    }
}5,通配符模式
Topic 通配符模式:生产者将消息发布到一个主题上,消费者订阅该主题并获取消息

生产者端代码:
public class Producer_Topics {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("123.207.72.43");//ip 默认值是localhost
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");//用户名 默认是guest
        factory.setPassword("123");
        //3.创建连接
        Connection connection = factory.newConnection();
        //4.创建 channel
        Channel channel = connection.createChannel();
        //5.创建交换机
        /**
         * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
         * exchange:交换机名称
         * type:交换机类型 是一个枚举类型
         * durable:是否持久化
         * autoDelete:是否自动删除
         * internal:内部使用,一般设为false
         * arguments:参数,设为null
         */
        String exchangeName = "test_topic";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
        //6.创建队列
        String queueName1 = "queue1_topic";
        String queueName2 = "queue2_topic";
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        //7.绑定队列和交换机
        //需求:所有error级别的日志打印控制台和存入数据库,所有order系统的日志存入数据库
        /**
         * queueBind(String queue, String exchange, String routingKey)
         * queue:队列名称
         * exchange:交换机名称
         * routingKey:路由键,绑定规则
         *              如果交换机类型为FANOUT类型,则routingKey设为“” 说明交换机会绑定每一个queue
         */
        //队列1的绑定
        channel.queueBind(queueName1,exchangeName,"#.error");
        channel.queueBind(queueName1,exchangeName,"order.*");
        channel.queueBind(queueName2,exchangeName,"*.*");
        //8.发送消息
        String body = "日志信息:数据库被删除";
        channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
        //9.释放资源
        channel.close();
        connection.close();
    }
}消费者端代码:
//消费者1
public class Consumer_Topic1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("123.207.72.43");//ip 默认值是localhost
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");//用户名 默认是guest
        factory.setPassword("123");
        //3.创建连接
        Connection connection = factory.newConnection();
        //4.创建 channel
        Channel channel = connection.createChannel();
        String queueName1 = "queue1_topic";
        String queueName2 = "queue2_topic";
        //6.接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * 回调方法,当收到消息后会执行这个方法
             * @param consumerTag
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag: " +  consumerTag);
//                System.out.println("Exchange: " + envelope.getExchange());
//                System.out.println("RoutingKey: " + envelope.getRoutingKey());
//                System.out.println("properties: " + properties);
                System.out.println("body: " + new String(body));
                System.out.println("将日志信息打印到控制台..");
            }
        };
        channel.basicConsume(queueName1,true,consumer);
    }
}
//消费者2
public class Consumer_Topic2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("123.207.72.43");//ip 默认值是localhost
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");//用户名 默认是guest
        factory.setPassword("123");
        //3.创建连接
        Connection connection = factory.newConnection();
        //4.创建 channel
        Channel channel = connection.createChannel();
        String queueName1 = "queue1_topic";
        String queueName2 = "queue2_topic";
        //6.接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * 回调方法,当收到消息后会执行这个方法
             * @param consumerTag
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag: " +  consumerTag);
//                System.out.println("Exchange: " + envelope.getExchange());
//                System.out.println("RoutingKey: " + envelope.getRoutingKey());
//                System.out.println("properties: " + properties);
                System.out.println("body: " + new String(body));
                System.out.println("将日志信息存入数据库..");
            }
        };
        channel.basicConsume(queueName2,true,consumer);
    }
}二,SpringBoot 整合 RabbitMQ
生产者端整合
步骤:
- 创建生产者工程 ;
- 添加依赖(也可以在创建 Spring Boot的时候添加依赖);
- 添加配置信息;
- 编写代码发送消息。
配置rabbitMQ的基本信息:
#配置rabbitmq的基本信息 为了创建连接工厂
spring:
  rabbitmq:
    host: 123.207.72.43
    port: 5672
    username: admin
    password: 123
    virtual-host: /配置信息(创建一个RabbitMQConfig类):
/**
 * 该类用来创建交换机和队列的 同时将交换机和队列进行绑定
 */
@Configuration
public class RabbitMQConfig {
    //交换机名称
    public static final String EXCHANGE_NAME = "topic_exchange";
    //队列名称
    public static final String QUEUE_NAME = "topic_queue";
    //创建交换机
    @Bean("bootExchange")
    public Exchange bootExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }
    //创建队列
    @Bean("bootQueue")
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }
    //将交换机和队列进行绑定
    /**
     * 1.知道哪个队列
     * 2.知道哪个交换机
     * 3.routing key
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding bindExchangeQueue(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }
}编写测试代码发送消息:
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Test
    public void send() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","hello rabbitmq");
        System.out.println("消息发送成功!");
    }
}消费者端整合
步骤:
- 创建生产者工程;
- 添加依赖(也可以在创建 Spring Boot的时候添加依赖);
- 配置整合;
- 编写消息监听器。
配置rabbitMQ的基本信息:
#配置rabbitmq的基本信息 为了创建连接工厂
spring:
  rabbitmq:
    host: 123.207.72.43
    port: 5672
    username: admin
    password: 123
    virtual-host: /编写消息监听器:
@Component
public class RabbitMQListener {
    @RabbitListener(queues = "topic_queue")
    //这里的message对象就是接收到的消息
    public void listenQueue(Message message) {
        System.out.println("message: " + message);
    }
}


















