一、概念
MQ(消息队列):是指在消息传送过程中保存消息的容器,用于分布式系统之间的通信
 生产者:是发送消息的用户应用程序。
 队列:是存储消息的缓冲区。
 消费者:是接收消息的用户应用程序。
1、优劣势
优势
● 应用解耦:使用MQ使得应用间耦合度降低,提高系统容错性和可维护性
 ● 异步提速:系统将消息发给MQ之后,就可返回用户信息,后续操作异步执行
 ● 消峰填谷:并发量高时,将消息全部放到MQ中,限制消费的速度为固定并发量,这样就消掉了高峰期的并发量,这就是消峰;但是因为消息积压,在高峰期过后的一段时间内,消费速度也仍旧保持固定并发量,直到消费完积压的消息,这就是填谷
劣势
● 系统可用性降低:一旦MQ宕机
 ● 系统复杂度提高:如何保证消息没有重复被消费,消息丢失了怎么办,如何保证消息的顺序等等
 ● 一致性问题:A系统同时给B、C、D发送消息,BC成功,D失败,如何保证数据一致性
2、工作模式
-  
简单工作模式:一个生产者对应一个消费者

 -  
工作队列模式(Work Queues):一个生产者对应多个消费者,多个消费者之间属于竞争关系,当任务比较重时,可以提高处理速度

 -  
订阅模式(Publish/Subscribe):一个生产者对应多个消费者,多个消费者之间不是竞争关系,在这种模式中引入交换机的概念,交换机类型为fanout
交换机:接收生产者的消息,并将消息推送给队列;交换机必须知道要如何处理他接收到的消息,类型如下:
○ direct:定向,将消息交给符合指定routing key的队列
○ topic:通配符,将消息交给符合制定routing pattern的队列
○ headers:参数匹配
○ fanout:广播,将收到的所有消息广播到它知道的所有队列。发送消息时不需要指定routing key

 -  
路由模式(Routing):需要设计交换机类型为 direct,交换机和队列进行绑定,并指定通配符方式的routing key,当发送消息到交换机时,交换机会根据routing key将消息发送给队列
● 队列与交换机的绑定不再是随意绑定,而是指定要routing key;
● 发送方发送消息时,需要指定routing key;
● 只有队列的routing key与消息的routing key一致,才能接收到消息

 -  
通配符模式(Topics):需要设计交换机类型为Topics,交换机和队列进行绑定,并指定通配符方式的routing key,当发送消息到交换机时,交换机会根据routing key将消息发送给队列
● *(星号)只能代替一个词。
● # (hash) 可以替代零个或多个单词。

 
二、SpringBoot 整合RabbitMQ
1、引入依赖
implementation 'org.springframework.boot:spring-boot-starter-amqp'
 
2、配置文件
spring:
  rabbitmq:
    host: 192.168.252.206
    port: 5672
    username: admin
    password: admin
 
3、配置类
@Configuration
public class RabbitConfig {
    public static String EXCHANGE_NAME = "test_exchange";
    public static String QUEUE_NAME = "test_queue";
    /**
     * 1、交换机
     *
     * @return
     */
    @Bean(name = "testExchange")
    public Exchange exchanger() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }
    /**
     * 2、队列
     *
     * @return
     */
    @Bean(name = "testQueue")
    public Queue queue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }
    /**
     * 3、绑定交换机和队列
     *
     * @param exchange
     * @param queue
     * @return
     */
    @Bean
    public Binding binding(@Qualifier(value = "testExchange") Exchange exchange, @Qualifier(value = "testQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();
    }
}
 
4、生产者发送消息
@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test() {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
    }
}
 
5、消费者接收消息
@Component
public class RabbitMqListener {
    
    @RabbitListener(queues = "test_queue")
    public void testListener(Message message) {
        System.out.println(message);
    }
}
 
结果如下:
(Body:'hello world!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test_exchange, receivedRoutingKey=test.kk, deliveryTag=1, consumerTag=amq.ctag-nYz1fLxW0ezTLEcO3W1rVw, consumerQueue=test_queue])
 
三、特性
1、消息的可靠投递
RabbitMQ为我们提供了两种控制消息可靠性的模式
confirm 确认模式
1、开启确认模式
 spring:
 rabbitmq:
 publisher-confirm-type: correlated
- NONE 禁用发布确认模式,是默认值
 - CORRELATED 发布消息成功到交换器后会触发回调方法
 - SIMPLE 经测试有两种效果: 
  
- 其一效果和 CORRELATED 值一样会触发回调方法,
 - 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;
 
 
2、设置ConfirmCallback
@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test() {
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("消息接收成功!");
            } else {
                System.out.println("消息接收失败!" + cause);
            }
        });
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
    }
}
 
return 退回模式
当消息发送给Exchange时,Exchange路由到Queue失败,才会执行RetureCallback
 1、开启回退模式
spring:
  rabbitmq:
  	publisher-returns: true
 
2、设置RetureCallback
 3、设置Exchange消息处理模式
- 如果消息没有路由到Queue,则丢弃消息
 - 如果消息没有路由到Queue,将消息返回给发送方
 
@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test() {
        // 设置交换机失败处理模式,true:返回消息给发送方;默认为false,即丢弃消息
        rabbitTemplate.setMandatory(true);
        // 设置RetureCallback
        rabbitTemplate.setReturnsCallback((returned) -> {
            System.out.println("return 执行了");
            System.out.println(returned);
        });
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test1.kk", "hello world!");
    }
}
 
2、Consumer ack
1、设置模式
spring:
  rabbitmq:
  	listener:
      direct:
        acknowledge-mode: manual
 
- none:自动确认
 - manual:手动确认
 
2、设置监听器
- 如果在消费端没有出现异常,就调用basicAck()方法签收消息
 - 如果在消费端出现异常,就调用basicNack()方法拒绝消息,让mq重新发送
 
@Component
public class AckListener implements ChannelAwareMessageListener {
    @RabbitListener(queues = "test_queue")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("处理业务逻辑");
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            channel.basicNack(deliveryTag, true, true);
        }
    }
}
 
3、消费端限流
1、这是Consumer ack的模式为手动确认
spring:
  rabbitmq:
  	listener:
      direct:
        acknowledge-mode: manual
		prefetch: 2
 
- none:自动确认
 - manual:手动确认
 - prefetch:表示消费端每次从mq中拉取多少条消息,直到手动确认消费完,才会拉取下一条消息
 
2、设置监听器
@Component
public class AckListener implements ChannelAwareMessageListener {
    @RabbitListener(queues = "test_queue")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(message.getBody().toString());
            System.out.println("处理业务逻辑");
            //channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            channel.basicNack(deliveryTag, true, true);
        }
    }
}
 
4、生产者
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test() {
        for (int i = 0; i < 3; i++) {
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
        }
    }
}
 
4、TTL
TTL全程 time to live,也就是存活时间/过期时间;当消息到达存活时间后,还没有被消费,会被清除;RabbitMQ可以对消息设置存活时间也可以对队列设置存活时间
 对队列统一设置:是对 x-message-ttl 参数设置
 对消息单独设置:是对 expiration 参数设置
 如果两者都设置了,以时间短的为准
设置队列的存活时间
1、配置队列,将ttl设置为10秒
@Configuration
public class RabbitConfig {
    public static String EXCHANGE_NAME = "test_exchange";
    public static String QUEUE_NAME = "test_queue";
    /**
     * 1、交换机
     *
     * @return
     */
    @Bean(name = "testExchange")
    public Exchange exchanger() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }
    /**
     * 2、队列
     *
     * @return
     */
    @Bean(name = "testQueue")
    public Queue queue() {
        return QueueBuilder.durable(QUEUE_NAME).ttl(10000).build();
    }
    /**
     * 3、绑定交换机和队列
     *
     * @param exchange
     * @param queue
     * @return
     */
    @Bean
    public Binding binding(@Qualifier(value = "testExchange") Exchange exchange, @Qualifier(value = "testQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();
    }
}
 
2、发送mq
@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test() {
        for (int i = 0; i < 3; i++) {
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
        }
    }
}
 
3、去RabbitMQ界面查看,会发现该队列的ready在10秒之后会置0
 
 
设置消息的存活时间
1、生产者发送MQ:只需在发送消息时加上messagePostProcessor即可
@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test() {
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("10000");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!",messagePostProcessor);
    }
}
 
5、死信队列
消息成为死信的条件:
- 消息队列长度达到限制
 - 消费者拒收消息:basicNack/basicReject,并且不把消息放回原目标队列:requeue=false
 - 原队列存在消息过期设置,消息达到过期时间未被消费
 
队列绑定死信交换机
- x-dead-letter-exchange
 - x-dead-letter-routing-key
 
1、配置
spring:
  rabbitmq:
    host: 192.168.252.206
    port: 5672
    username: admin
    password: admin
 
2、配置类
@Configuration
public class RabbitConfig {
    public static String EXCHANGE_NAME = "test_exchange";
    public static String QUEUE_NAME = "test_queue";
    public static String DEAD_EXCHANGE_NAME = "dead_test_exchange";
    public static String DEAD_QUEUE_NAME = "dead_test_queue";
    /**
     * 1、死信交换机
     *
     * @return
     */
    @Bean(name = "deadTestExchange")
    public Exchange deadExchanger() {
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE_NAME).build();
    }
    /**
     * 2、死信队列
     *
     * @return
     */
    @Bean(name = "deadTestQueue")
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
    }
    /**
     * 3、绑定死信交换机和死信队列
     *
     * @param exchange
     * @param queue
     * @return
     */
    @Bean
    public Binding bindingDead(@Qualifier(value = "deadTestExchange") Exchange exchange,
                               @Qualifier(value = "deadTestQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("test.kk.#").noargs();
    }
    /**
     * 1、交换机
     *
     * @return
     */
    @Bean(name = "testExchange")
    public Exchange exchanger() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }
    /**
     * 2、队列
     *
     * @return
     */
    @Bean(name = "testQueue")
    public Queue queue() {
        return QueueBuilder.durable(QUEUE_NAME)
                .ttl(10000)
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("test.kk")
                .maxLength(3)
                .build();
    }
    /**
     * 3、绑定交换机和队列
     *
     * @param exchange
     * @param queue
     * @return
     */
    @Bean
    public Binding binding(@Qualifier(value = "testExchange") Exchange exchange, @Qualifier(value = "testQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();
    }
}
 
3、测试类
- 超时情况
 
@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test() {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
    }
}
 
- 超出长度
 
@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test() {
        for (int i = 0; i < 5; i++) {
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
        }
    }
}
 
- 消费端拒收
 
生产端
@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test() {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
    }
}
 
消费端
@Component
public class DeadListener implements ChannelAwareMessageListener {
    @RabbitListener(queues = "test_queue")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(new String(message.getBody()));
            System.out.println("处理业务逻辑");
            int i = 3 / 0;
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            channel.basicNack(deliveryTag, true, false);
        }
    }
}
 
6、延迟队列
延迟队列:消息进入消费端之后,不会立马被消费,会在指定时间达到后,才会消费。RabbitMQ通过TTL和死信队列实现延迟队列
- 只需设置队列或者消息过期时间,当消息过期后即可进入死信队列
 - 消费端监听队列要监听死信队列
 



















