RabbitMQ通讯方式
RabbitMQ提供了很多中通讯方式,依然可以去官方查看:https://rabbitmq.com/getstarted.html
| 七种通讯方式 | 
|---|
|  | 
1 RabbitMQ提供的通讯方式
- Hello World!:为了入门操作!
- Work queues:一个队列被多个消费者消费
- Publish/Subscribe:手动创建Exchange(FANOUT)
- Routing:手动创建Exchange(DIRECT)
- Topics:手动创建Exchange(TOPIC)
- RPC:RPC方式
- Publisher Confirms:保证消息可靠性
2 构建Connection工具类
-  导入依赖:amqp-client,junit <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies>
-  构建工具类: package com.llp.rabbitmq.java_api.util; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitMQConnectionUtil { public static final String RABBITMQ_HOST = "192.168.109.102"; public static final int RABBITMQ_PORT = 5672; public static final String RABBITMQ_USERNAME = "guest"; public static final String RABBITMQ_PASSWORD = "guest"; public static final String RABBITMQ_VIRTUAL_HOST = "/"; /** * 构建RabbitMQ的连接对象 * @return */ public static Connection getConnection() throws Exception { //1. 创建Connection工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置RabbitMQ的连接信息 factory.setHost(RABBITMQ_HOST); factory.setPort(RABBITMQ_PORT); factory.setUsername(RABBITMQ_USERNAME); factory.setPassword(RABBITMQ_PASSWORD); factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST); //3. 返回连接对象 Connection connection = factory.newConnection(); return connection; } }
3 Hello World-简单模式
| 通讯方式 | 
|---|
|  | 
ps: rabbitmq入门操作,这里使用的是默认的交换机在程序中指定为 空字符串,不需要指定路由,生产者和消费者直接通过队列名去匹配,并且消费者和生产者是一一对应的
生产者:
package com.llp.rabbitmq.java_api.helloworld;
import com.llp.rabbitmq.java_api.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
public class Publisher {
    public static final String QUEUE_NAME = "hello";
    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2. 构建Channel
        Channel channel = connection.createChannel();
        /*
         如果队列不存在,则会创建
         Rabbitmq不允许创建两个相同的队列名称,否则会报错。
         @params1: queue 队列的名称
         @params2: durable 队列是否持久化
         @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
         @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
         @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
         * */
        //3. 构建队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //4. 发布消息
        String message = "Hello World!";
        /*
         @params1: 交换机exchange
         @params2: 队列名称/routing
         @params3: 属性配置
         @params4: 发送消息的内容
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送成功!");
    }
}
消费者:
package com.llp.rabbitmq.java_api.helloworld;
import com.llp.rabbitmq.java_api.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
public class Consumer {
    @Test
    public void consume() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2. 构建Channel
        Channel channel = connection.createChannel();
        //3. 构建队列
        channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);
        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取到消息:" + new String(body,"UTF-8"));
            }
        };
        //amq.ctag-8MwD17o5WKA699YqWQ01kg
        String result = channel.basicConsume(Publisher.QUEUE_NAME, true, callback);
        System.out.println(result);
        System.out.println("开始监听队列");
        System.in.read();
    }
}
4 Work Queues-工作模式
| WorkQueues需要学习的内容 | 
|---|
|  | 
-  生产者:生产者和Hello World的形式是一样的,都是将消息推送到默认交换机。 
-  消费者:让消费者关闭自动ack,并且设置消息的流控,最终实现消费者可以尽可能去多消费消息 
生产者:模拟发送十条消息
package com.llp.rabbitmq.java_api.workqueues;
import com.llp.rabbitmq.java_api.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
public class Publisher {
    public static final String QUEUE_NAME = "hello";
    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2. 构建Channel
        Channel channel = connection.createChannel();
        //3. 构建队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //4. 发布消息
        for (int i = 0; i < 10; i++) {
            String message = "Hello World!"+i;
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        }
        System.out.println("消息发送成功!");
    }
}
消费者:模拟两个消费者,去消费生产者推送的10条消息,rabbitmq默认是轮询的机制去消费消息,当消费者之前的消费能力差异较大时,会非常影响消费者端的吞吐量,为了提升消费能力可以在消费者端手动ack并设置eg: channel.basicAck(envelope.getDeliveryTag(),false);,消息流控(每个消费者每次从队列取出多少条消息进行消费)eg: channel.basicQos(3);
package com.llp.rabbitmq.java_api.workqueues;
import com.llp.rabbitmq.java_api.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
public class Consumer {
    @Test
    public void consume1() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2. 构建Channel
        Channel channel = connection.createChannel();
        //3. 构建队列
        channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);
        //3.5 设置消息的流控
        channel.basicQos(3);
        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1号-获取到消息:" + new String(body,"UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(Publisher.QUEUE_NAME,false,callback);
        System.out.println("开始监听队列");
        System.in.read();
    }
    @Test
    public void consume2() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2. 构建Channel
        Channel channel = connection.createChannel();
        //3. 构建队列
        channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);
        channel.basicQos(3);
        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者2号-获取到消息:" + new String(body,"UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(Publisher.QUEUE_NAME,false,callback);
        System.out.println("开始监听队列");
        System.in.read();
    }
}

5 Publish/Subscribe-发布订阅模式
| 自定义一个交换机 | 
|---|
|  | 
生产者:自行构建Exchange并绑定指定队列(FANOUT类型)
FANOUT发布订阅模式,这种模式通过交换机和队列名称去匹配生产者和消费者,与路由无关因此路由这里是可以随意填写的,建议写空字符串
package com.llp.rabbitmq.java_api.pubsub;
import com.llp.rabbitmq.java_api.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
public class Publisher {
    public static final String EXCHANGE_NAME = "pubsub";
    public static final String QUEUE_NAME1 = "pubsub-one";
    public static final String QUEUE_NAME2 = "pubsub-two";
    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2. 构建Channel
        Channel channel = connection.createChannel();
        //3. 构建交换机 param1: 交换机名称 param2: 指定交换机类型
        //FANOUT发布订阅模式,这种模式通过交换机和队列名称去匹配生产者和消费者,与路由无关因此路由这里是可以随意填写的,建议空字符串
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        //4. 构建队列
        channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
        //5. 绑定交换机和队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"1asdasd22");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"122asdasda1");
        //6. 发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"45jk6h645jk",null,"publish/subscribe!".getBytes());
        System.out.println("消息成功发送!");
    }
}
消费者:前面生产者声明了两个队列pubsub-one和pubsub-two,消费者端模拟两个消费者进行消费
package com.llp.rabbitmq.java_api.pubsub;
import com.llp.rabbitmq.java_api.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
public class Consumer {
    @Test
    public void consume1() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2. 构建Channel
        Channel channel = connection.createChannel();
        //3. 构建队列
        channel.queueDeclare(Publisher.QUEUE_NAME1,false,false,false,null);
        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取到消息:" + new String(body,"UTF-8"));
            }
        };
        channel.basicConsume(Publisher.QUEUE_NAME1, true, callback);
        System.out.println("开始监听队列");
        System.in.read();
    }
    @Test
    public void consume2() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2. 构建Channel
        Channel channel = connection.createChannel();
        //3. 构建队列
        channel.queueDeclare(Publisher.QUEUE_NAME2,false,false,false,null);
        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取到消息:" + new String(body,"UTF-8"));
            }
        };
        channel.basicConsume(Publisher.QUEUE_NAME2, true, callback);
        System.out.println("开始监听队列");
        System.in.read();
    }
}

6 Routing-路由模式
| DIRECT类型Exchange | 
|---|
|  | 
路由模式,生产者在绑定Exchange和Queue时,需要指定好routingKey,同时在发送消息时,也指定routingKey,只有routingKey一致时,才会把指定的消息路由到指定的Queue;
生产者:
package com.llp.rabbitmq.java_api.routing;
import com.llp.rabbitmq.java_api.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
public class Publisher {
    public static final String EXCHANGE_NAME = "routing";
    public static final String QUEUE_NAME1 = "routing-one";
    public static final String QUEUE_NAME2 = "routing-two";
    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2. 构建Channel
        Channel channel = connection.createChannel();
        //3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //4. 构建队列
        channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
        //5. 绑定交换机和队列
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"ORANGE");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"BLACK");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"GREEN");
        //6. 发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"ORANGE",null,"大橙子!".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"BLACK",null,"黑布林大狸子".getBytes());
        //WHITE和channel绑定的路由均不匹配,因此消息不会被消费
        channel.basicPublish(EXCHANGE_NAME,"WHITE",null,"小白兔!".getBytes());
        System.out.println("消息成功发送!");
    }
}
消费者:
package com.llp.rabbitmq.java_api.routing;
import com.llp.rabbitmq.java_api.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
public class Consumer {
    @Test
    public void consume1() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2. 构建Channel
        Channel channel = connection.createChannel();
        //3. 构建队列
        channel.queueDeclare(Publisher.QUEUE_NAME1,false,false,false,null);
        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1号-获取到消息:" + new String(body,"UTF-8"));
            }
        };
        channel.basicConsume(Publisher.QUEUE_NAME1,true,callback);
        System.out.println("开始监听队列");
        System.in.read();
    }
    @Test
    public void consume2() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2. 构建Channel
        Channel channel = connection.createChannel();
        //3. 构建队列
        channel.queueDeclare(Publisher.QUEUE_NAME2,false,false,false,null);
        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者2号-获取到消息:" + new String(body,"UTF-8"));
            }
        };
        channel.basicConsume(Publisher.QUEUE_NAME2,true,callback);
        System.out.println("开始监听队列");
        System.in.read();
    }
}

7 Topic-主题模式
| Topic模式 | 
|---|
|  | 
direct 模式会造成路由 RoutingKey 太多, 而实际开发中往往是按照某个规则来进行路由匹配的, RabbitMQ 提供了 Topic 模式/主题模式来适应这种需求.
 Topic 模式是 direct 模式上的一种扩展/叠加, 扩展/叠加了模糊路由 RoutingKey 的模式, 可以理解为是模糊的路由匹配模式
- *(星号):可以(只能)匹配一个单词
- #(井号):可以匹配多个单词(或者零个)
生产者:TOPIC类型可以编写带有特殊意义的routingKey的绑定方式
package com.mashibing.topics;
import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
/**
 * @author zjw
 * @description
 * @date 2022/1/25 20:28
 */
public class Publisher {
    public static final String EXCHANGE_NAME = "topic";
    public static final String QUEUE_NAME1 = "topic-one";
    public static final String QUEUE_NAME2 = "topic-two";
    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2. 构建Channel
        Channel channel = connection.createChannel();
        //3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //4. 构建队列
        channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
        //5. 绑定交换机和队列,
        // TOPIC类型的交换机在和队列绑定时,需要以aaa.bbb.ccc..方式编写routingkey
    	// 其中有两个特殊字符:*(相当于占位符)可以(只能)匹配一个单词,#(相当通配符)可以匹配多个单词(或者零个)
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.orange.*");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"lazy.#");
        //6. 发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"big.orange.rabbit",null,"大橙兔子!".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"small.white.rabbit",null,"小白兔".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"lazy.dog.dog.dog.dog.dog.dog",null,"懒狗狗狗狗狗狗".getBytes());
        System.out.println("消息成功发送!");
    }
}

8 RPC(了解)
因为两个服务在交互时,可以尽量做到Client和Server的解耦,通过RabbitMQ进行解耦操作
需要让Client发送消息时,携带两个属性:
- replyTo告知Server将相应信息放到哪个队列
- correlationId告知Server发送相应消息时,需要携带位置标示来告知Client响应的信息
| RPC方式 | 
|---|
|  | 
客户端:
package com.llp.rabbitmq.java_api.rpc;
import com.llp.rabbitmq.java_api.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.util.UUID;
public class Publisher {
    public static final String QUEUE_PUBLISHER = "rpc_publisher";
    public static final String QUEUE_CONSUMER = "rpc_consumer";
    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2. 构建Channel
        Channel channel = connection.createChannel();
        //3. 构建队列
        channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
        channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);
        //4. 发布消息
        String message = "Hello RPC!";
        String uuid = UUID.randomUUID().toString();
        AMQP.BasicProperties props = new AMQP.BasicProperties()
                .builder()
                .replyTo(QUEUE_CONSUMER)
                .correlationId(uuid)
                .build();
        channel.basicPublish("",QUEUE_PUBLISHER,props,message.getBytes());
        channel.basicConsume(QUEUE_CONSUMER,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String id = properties.getCorrelationId();
                if(id != null && id.equalsIgnoreCase(uuid)){
                    System.out.println("接收到服务端的响应:" + new String(body,"UTF-8"));
                }
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
        System.out.println("消息发送成功!");
        System.in.read();
    }
}
服务端:
package com.llp.rabbitmq.java_api.rpc;
import com.llp.rabbitmq.java_api.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
public class Consumer {
    public static final String QUEUE_PUBLISHER = "rpc_publisher";
    public static final String QUEUE_CONSUMER = "rpc_consumer";
    @Test
    public void consume() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2. 构建Channel
        Channel channel = connection.createChannel();
        //3. 构建队列
        channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
        channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);
        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取到消息:" + new String(body,"UTF-8"));
                String resp = "消费者端获取到了client发出的请求,这里是消费端响应的内容(应用于生产者推送消息,消费者返回响应给生产者的场景)";
                String respQueueName = properties.getReplyTo();
                String uuid = properties.getCorrelationId();
                AMQP.BasicProperties props = new AMQP.BasicProperties()
                        .builder()
                        .correlationId(uuid)
                        .build();
                channel.basicPublish("",respQueueName,props,resp.getBytes());
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(QUEUE_PUBLISHER,false,callback);
        System.out.println("开始监听队列");
        System.in.read();
    }
}





















