Springboot整合RabbitMq,详细步骤
- 1 添加springboot-starter依赖
- 2 添加连接配置
- 3 在启动类上添加开启注解`@EnableRabbit`
- 4 创建RabbitMq的配置类,用于创建交换机,队列,绑定关系等基础信息。
- 5 生产者推送消息
- 6 消费者接收消息
- 7 生产者的消息回调机制
- 8 消费者的确认机制
 
消息队列(Message Queue)是一种应用间的通信方式。顾名思义,将消息放到队列中,排队发出。消息发布者只管把消息发布到MQ中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
而且消息队列一般有完整的接收确认,发布消息回调等一系列机制,可以确保接收方一定能接受。
用到的场景如:异步处理,应用解耦,流量削锋和消息通讯等。
以下先详细介绍下springboot项目怎么使用RabbitMq
1 添加springboot-starter依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2 添加连接配置
以下几项是最基础的配置,其他配置下面用到时额外添加
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest #默认用户名和密码
    password: guest
    virtual-host: /  # 虚拟主机
3 在启动类上添加开启注解@EnableRabbit
 
4 创建RabbitMq的配置类,用于创建交换机,队列,绑定关系等基础信息。
可以直接在java代码中通过注入实体类的方式创建交换机及队列等设备。但此方式添加的’设备‘是懒加载的形式,只要当使用到识别到监听注解或调用发送消息的方法时,才会真在rabbitmq中创建。
可以定位到amqp依赖的源码看到在程序启动的时候并不创建连接,只有在添加了监听注解启动程序或要发送消息时,才会走创建连接的方法。
配置类的示例代码如下:
@Configuration
public class RabbitConfig {
    /**
     * 队列
     */
    @Bean
    Queue createDirectQueue(){
        /**
         * durable:是否持久化,默认是false。true为持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在;false为暂存队列:当前连接有效。
         * exclusive:默认也是false。true是只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable。
         * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
         * 一般设置一下队列的持久化就好,其余两个就是默认false
         */
        //两种创建方式
        //QueueBuilder.durable("queue.test1").build();
        return new Queue("queue.test1",true,false,false);
    }
    /**
     *  交换机
     */
    @Bean
    DirectExchange createDirectExchange(){
        /**
         * durable、autoDelete参数性质和上面队列的一致
         */
        return new DirectExchange("direct.test1",true,false);
    }
    /**
     * 将队列和交换机绑定, 并设置用于匹配键
     */
    @Bean
    Binding binding(){
        return BindingBuilder.bind(createDirectQueue()).to(createDirectExchange()).with("testRoute");
    }
}
以上是以直连交换机为例,创建其他交换机写法一样,具体对应哪个实体类可以在Exchange接口 —>AbstractExchange实现类下看到。

可以通过客户端看到队列、交换机、路由关系已经创建成功

 
 
5 生产者推送消息
@Autowired
RabbitTemplate rabbitTemplate;
@PostMapping("/sendMessage")
public AjaxResult sendMessage(@RequestBody Map params) {
    String id = UUID.randomUUID().toString();
    String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    params.put("messageId",id);
    params.put("createTime",createTime);
    /**
     * 发给交换机,在发给路由绑定的队列
     */
    rabbitTemplate.convertAndSend("direct.test1","testRoute",params);
    return AjaxResult.success("成功");
}
可以看到,rabbitmq成功接收到消息。

 
6 消费者接收消息
@Component
@RabbitListener(queues = "queue.test1")
public class Receiver {
    @RabbitHandler
    public void process(Map message){
        System.out.printf("消费者接收到消息:" + message.toString());
    }
}
可以看到消息成功被消费,监听处理方法也成功被执行

 如果多个监听器监听同一个队列,是轮询的方式进行消费,不会出现重复消费的情况;如果多个队列同时以相同的路由绑定同一个交换机,消息会以复制的形式发送至每个队列。
7 生产者的消息回调机制
在实际运用中,作为消息的生产者,很多时候我们需要确认消息是否成功发送到了mq中。同时我们还需要知道,假如消息出现异常时的异常情况。为了满足这个业务场景,我们就需要配置消息回调。
-  增加配置项 spring: rabbitmq: publisher-confirm-type: correlated #消息发送成功交互 publisher-returns: true可能之前老的版本是 publisher-confirm:true,但现在写的话会发现变红了,说明过时了。因为在springboot的自动配置依赖里该配置级别已经为error了

-  目前回调包含发送成功回调 ConfirmCallback和失败回调ReturnsCallback。一些老版本的可能有ReturnCallback。下面先自定义两个回调的回调方法ConfirmCallback的回调 /** * 消息发送成功回调 */ public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback { /** * 消息成功到达exchange,ack=true * @param correlationData * @param ack * @param s */ @Override public void confirm(CorrelationData correlationData, boolean ack, String s) { System.out.println("相关数据:" + correlationData); System.out.println("确认状态:" + ack); System.out.println("造成原因:" + s); } }ReturnsCallback的回调 /** * 发生异常时的消息返回提醒 */ public class RabbitReturnsCallback implements RabbitTemplate.ReturnsCallback { @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println("失败回调:" + returnedMessage); } }将自定义回调配置到模板中 在Rabbit配置类中添加 RabbitTemplate并配置两个回调@Configuration public class RabbitConfig { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack()); rabbitTemplate.setReturnsCallback(new RabbitReturnsCallback()); return rabbitTemplate; } }那以上两种回调函数什么时候回执行呢? - 消息发送到exchange,且传播到队列,则只有ConfirmCallback回调,ack=true
- 消息发送不到exchange,则只有ConfirmCallback回调,ack=false
- 消息发送到exchange,没传播到队列(或找不到路由),则ConfirmCallback回调,ack=true、ReturnsCallback回调
 
由此可见ConfirmCallback回调是exchange的一种反馈,是发生在生产者和交换机之间的,无论能不能发到都会回调。消息发送出去如果收到交换机的确认反馈则回调为成功,如果没有收到确认反馈,则回调为失败。
ReturnsCallback回调是队列的一种反馈,是发生在交换机和队列之间的。只有消息先到达交换机,且发送到队列失败才会执行此回调。
下面是对以上三种情况的测试
-  消息完全成功发送到队列 模拟:交换机和路由都存在 @PostMapping("/sendMessage") public AjaxResult sendMessage(@RequestBody Map params) { String id = UUID.randomUUID().toString(); String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); params.put("messageId",id); params.put("createTime",createTime); //direct.test1和testRoute都存在 rabbitTemplate.convertAndSend("direct.test1","testRoute",params); return AjaxResult.success("成功"); }消费者监听且 ConfirmCallback回调为true
  
-  消息没有发送到exchange 模拟:交换机不存在 @PostMapping("/sendMessageFailByNoExchange") public AjaxResult sendMessageFailByNoExchange(@RequestBody Map params) { String id = UUID.randomUUID().toString(); String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); params.put("messageId",id); params.put("createTime",createTime); //该交换机不存在 rabbitTemplate.convertAndSend("direct.exchange不存在","testRoute",params); return AjaxResult.success("成功"); }ConfirmCallback回调为false
  
-  消息发送到exchange,但没发送到队列 模拟:该交换机存在但该路由不存在 @PostMapping("/sendMessageFailByNoRoute") public AjaxResult sendMessageFailByNoRoute(@RequestBody Map params) { String id = UUID.randomUUID().toString(); String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); params.put("messageId",id); params.put("createTime",createTime); //交换机存在但该路由不存在 rabbitTemplate.convertAndSend("direct.test1","failRoute",params); return AjaxResult.success("成功"); }ConfirmCallback回调为true,ReturnsCallback失败回调执行
  
可以通过两个回调确定哪些消息没有成功发送到队列,记录下来再次发送,保证消息不丢失。
8 消费者的确认机制
消费者和生产者不同,消费者本身就是凭自己喜好,符合条件才会消费。
所有消费者的确认机制有三种模式:
-  自动确认 是默认的消息确认模式,即mq成功将消息发出,消费者成功接收到,就反馈确认。不管消费者是不是已经成功处理。 所以如果处理逻辑抛出异常,就相当于丢失了消息。 一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。 
-  手动确认 
 后面还会详细介绍两者确认机制的用法及代码实现










![[HDLBIts] Exams/m2014 q4j](https://img-blog.csdnimg.cn/img_convert/e20a6409553190b8edbbbf004dfea770.png)




![基层社会治理平台建设方案[113页PPT]](https://img-blog.csdnimg.cn/img_convert/bfe7ec192189cf439b643cd9b1edc344.jpeg)



