分布式高级篇3 —— RabbitMQ

news2025/5/31 20:33:36

  • 一、RabbitMQ
    • 1、RabbitMQ 介绍
    • 2、RabbitMQ 的相关概念
    • 3、安装 RabbitMQ
    • 4、交换机类型
      • (1)direct - 直连交换机
      • (2)fanout - 扇出交换机
      • (3)topic - 主题交换机
    • 5、RabbitMQ 管理界面
    • 6、SpringBoot 整合RabbitMQ
    • 7、AmqpAdmin 的使用
    • 8、使用 RabbitTemplate 发送消息
    • 9、使用 @RabbitListener&@RabbitHandler 接收消息
    • 10、发布确认
      • (1)生产端确认
      • (2)消费端确认
    • 11、延时队列
      • 实战
    • 12、消息丢失、积压、重复等方案
      • 1、消息丢失
      • 2、消息重复
      • 3、消息积压

一、RabbitMQ

视频来源: 【Java项目《谷粒商城》Java架构师 | 微服务 | 大型电商项目】
本笔记对应视频集数: P248 ~ P260

1、RabbitMQ 介绍

RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包

裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是

一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,

存储和转发消息数据。

RabbitMQ 的三大特点

流量消峰:

该场景一般在秒杀或者团购活动中使用广泛。

使用 MQ 后,用户的大量请求不在直接访问数据库,而是将大量请求积压在 MQ 消息队列中,数据库从 MQ 中拉取能处理的请求,避免了数据库因为大量请求出现崩溃、宕机等情况

在这里插入图片描述

应用解耦:

传统做法订单系统直接调用其他接口,如果有一个接口出现问题,整个订单系统无法正常运转的。

使用 MQ 后,将 MQ 作为中间件与其他接口相连,即使有一个接口出现问题,其他还是正常运转的。

image-20230131110145608

异步处理:

场景说明:用户注册后,需要发送注册邮件和注册短信,传统的做法:1、串行方式 2、并行方式 3、MQ 消息队列

1、一套流程全部完成后,返回客户端

image-20230131110207897

2、发送邮件的同时发送短信,节省了一定的时间

image-20230131110218038

3、使用 MQ

image-20230131110232682

2、RabbitMQ 的相关概念

image-20230131111138505

Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成, 这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可 能需要持久性存储)等。

Producer/publisher :消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。

Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

Broker:消息队列服务进程,此进程包括两个部分:ExchangeQueue

Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,根据message中的routing-key决定转发到哪个 Queue 中

Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。

Binding: 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交 换器理解成一个由绑定构成的路由表。 Exchange 和Queue的绑定可以是多对多的关系。

Connection: 网络连接,比如一个TCP连接。

Channel: 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道 发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都 是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

-----发送消息-----

1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange根据routing-key将消息转发到指定的Queue(队列)

----接收消息-----

1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。

3、安装 RabbitMQ

1、拉取镜像并运行实例

docker run -d --name rabbitmq --restart=always -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

4369, 25672 (Erlang发现&集群端口)

5672, 5671 (AMQP端口)

15672 (web管理后台端口)

61613, 61614 (STOMP协议端口)

1883, 8883 (MQTT协议端口)

4、交换机类型

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、 fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接 看另外三种类型

(1)direct - 直连交换机

直连交换机通过指定的 routing-key 连接指定的队列,是一种完全匹配的方式。

image-20230131123907129

(2)fanout - 扇出交换机

fanout 交换机会将消息转发到所有与它绑定的队列上,无论是否指定了routing-key。是一种 广播的模式,

并且 fanout 交换机时散发消息最快的,因为无需判断 routing-key

image-20230131124218722

(3)topic - 主题交换机

topic 交换器通过模式匹配分配消息的 路由键属性,将路由键和某个模式进行 匹配,此时队列需要绑定到一个模式上。 它将路由键和绑定键的字符串切分成单 词,这些单词之间用点隔开。

它同样也 会识别两个通配符:符号“#”和符号 “*”。

# 匹配0个或多个单词, * 匹配一 个单词。

image-20230131124358574

5、RabbitMQ 管理界面

IP:15762

进入到管理界面

默认用户: guest

默认密码:guest

image-20230131121149234

1、创建交换机

image-20230131121534060

2、创建队列

image-20230131122010350

3、交换机绑定队列

image-20230131122127912

image-20230131122231931

如果使用 Topic 交换机,可以在绑定队列时,指明routing-key 使用通配符的方式:

image-20230131123443103

4、发送消息

image-20230131122451330

查看队列接受的消息

image-20230131122630272

image-20230131122740495

6、SpringBoot 整合RabbitMQ

1、引入依赖

RabbitAutoConfiguration 生效

引入了 CachingConnectionFactory、RabbitTemplate、AmqpAdmin、RabbitMessagingTemplate

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2、RabbitMQ 的所有配置都在 RabbitProperties 。 进行配置

spring:
    rabbitmq:
        host: 192.168.56.111
        port: 5672
        virtual-host: /

3、开启RabbitMQ

@EnableRabbit

7、AmqpAdmin 的使用

使用Java代码创建 Exchange、Queueu、Binding:

  • AmqpAdmin
@SpringBootTest
@Slf4j
@RunWith(SpringRunner.class)
public class GulimallOrderApplicationTest {

    @Autowired
    private AmqpAdmin amqpAdmin;

    /*
    * 创建一个交换机
    *   public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
    *   name 交换机名字
    *   durable 是否持久化
    *   autoDelete 是否自动删除
    *   arguments其他的一些参数
    * */
    @Test
    public void createExchange() {
        Exchange exchange = new DirectExchange("hello-java-exchange",true,false);
        // 声明一个交换机
        amqpAdmin.declareExchange(exchange);
        log.info("交换机创建成功:{}","hello-java-exchange");
    }

    /*
    * 创建一个队列
    *   public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
    *   name:队列名
    *   durable:是否持久化
    *   exclusive:是否是排他的
    *   autoDelete:是否自动删除
    *   arguments:其他的一些参数
    * */
    @Test
    public void createQueue() {
        Queue queue = new Queue("hello-java-queue",true,false,false);
        amqpAdmin.declareQueue(queue);
        log.info("队列创建成功:{}","hello-java-queue");

    }

    /*
    * 创建绑定关系
    *   	public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
    *       Map<String, Object> arguments)
    *
    *       destination:绑定目标,绑定的队列名
    *       destinationType:绑定类型,QUEEN or EXCHANGE
    *       exchange: 绑定的交换机名
    *       routingKey:路由键
    *       arguments 其他参数
    * */
    @Test
    public void createBinding() {
        Binding binding = new Binding(
                "hello-java-queue",
                Binding.DestinationType.QUEUE,
                "hello-java-exchange",
                "hello.java",
                null);
        amqpAdmin.declareBinding(binding);
        log.info("绑定成功{},{}","hello-java-exchange","hello-java-queue");
    }
}

8、使用 RabbitTemplate 发送消息

    /*
    * 发送消息
    * */
    @Test
    public void senMessage() {
        String msg = "hello,world";
        // public void convertAndSend(String exchange, String routingKey, final Object object)
        // 交换机名称、路由键、消息
        rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",msg);
        log.info("发送消息成功:{}",msg);
    }

发送实体类:

    /*
    * 发送消息
    * */
    @Test
    public void senMessage() {
        OrderEntity orderEntity = new OrderEntity();
        orderEntity.setId(2L);
        orderEntity.setCreateTime(new Date());

        // String msg = "hello,world";
        // public void convertAndSend(String exchange, String routingKey, final Object object)
        // 交换机名称、路由键、消息
        rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity);
        log.info("发送消息成功:{}",orderEntity);
    }

在发送实体类时,默认是使用 jdk 的序列化机制,并且要求实体类实现 Serializable 接口。

image-20230131171006876

可以自定义消息转换器,使用不同的序列化方式,在 RabbitTemplate 中默认使用的是 SimpleMessageConverter 消息转换器。

image-20230131171305563

在 SimpleMessageConverter 中可以看见,如果是 string类型的消息,直接转化为 byte 流发送

如果实现了 Serializable 接口,就按照 jdk 的方式序列化

image-20230131171348684

可自定义的消息转换器,我们使用 Jackson2JsonMessageConverter

image-20230131171517553

创建配置类,自定义消息转换器:

@Configuration
public class MyRabbitConfig {

    /*
    * 自定义消息转换器
    * */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

重新发送消息:已经序列化为Json

image-20230131171858940

9、使用 @RabbitListener&@RabbitHandler 接收消息

使用 @RabbitListener 接受消息:


    /*
    * RabbitMQ 接收消息
    *   1、使用 @RabbitListener 接收消息,必须使用 @EnableRabbit 开启接受消息
    *       queueu 是一个 String[], 可指定接受多个队列的消息
    *   参数可接收的类型:
    *       rg.springframework.amqp.core.Message;: 封装的内容比较全,消息头+消息体
    *       T : 可接收发送消息的类型
    *       Channel channel : 信道信息
    *   (1) 在多服务下,一条消息只能有一个客户端接收
    *   (2) 处理完一条消息后,才能接收下一条消息
    *
    *   2、使用 @RabbitHandler +  @RabbitListener 接受不同类型的消息
    *   @RabbitHandler:标注在方法上
    *   @RabbitListener: 标注在类、方法上
    * */
    @RabbitListener(queues = {"hello-java-queue"})
    public void receiveOrderEntityMessage(Message message, OrderEntity entity, Channel channel) {

        byte[] body = message.getBody();
        MessageProperties header = message.getMessageProperties();
        // System.out.println("接收到的消息: " + message);
        System.out.println("接收到的消息体:" + entity);
    }

使用 @RabbitHandler + @RabbitListener 接收不同类型的消息:

@RabbitHandler 标注在方法上

@RabbitListener 标注在方法、类上

如果我们发送消息的类型不是一种类型,单独使用 @RabbitListener 还需要获取 body 的数据然后判断类型,非常麻烦,这时就可以组合使用 @RabbitHandler + @RabbitListener 接收不同的消息:

例子

消息发送者:

@RestController
@Slf4j
public class RabbitController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendMQ/{num}")
    public String sendMQ(@PathVariable("num") Integer num) {
        for (Integer i = 0; i < num; i++) {
            if (i % 2 == 0) {
                OrderEntity orderEntity = new OrderEntity();

                orderEntity.setId(i.longValue());
                orderEntity.setCreateTime(new Date());

                // String msg = "hello,world";
                // public void convertAndSend(String exchange, String routingKey, final Object object)
                // 交换机名称、路由键、消息
                rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity);
                log.info("发送消息成功:{} 第", + i + "条orderEntity消息..");
            }else {
                OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
                orderReturnApplyEntity.setId(i.longValue());
                orderReturnApplyEntity.setCreateTime(new Date());
                rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderReturnApplyEntity);
                log.info("发送消息成功:{} 第", + i + "条orderReturnApplyEntity消息..");
            }
        }

        return  "sendMQ OK";
    }
}

消息接收者:

@RabbitListener 标注在类上,指明接受哪个队列的消息,使用 @RabbitHandler 标注在不同的方法上,一个方法接收一种类型的数据

@Service("orderItemService")
@RabbitListener(queues = {"hello-java-queue"})
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {

    /*
    * RabbitMQ 接收消息
    *   1、使用 @RabbitListener 接收消息,必须使用 @EnableRabbit 开启接受消息
    *       queueu 是一个 String[], 可指定接受多个队列的消息
    *   参数可接收的类型:
    *       rg.springframework.amqp.core.Message;: 封装的内容比较全,消息头+消息体
    *       T : 可接收发送消息的类型
    *       Channel channel : 信道信息
    *   (1) 在多服务下,一条消息只能有一个客户端接收
    *   (2) 处理完一条消息后,才能接收下一条消息
    *
    *   2、使用 @RabbitHandler +  @RabbitListener 接受不同类型的消息
    *   @RabbitHandler:标注在方法上
    *   @RabbitListener: 标注在类、方法上
    * */
    // @RabbitListener(queues = {"hello-java-queue"})
    @RabbitHandler
    public void receiveOrderEntityMessage(Message message, OrderEntity entity, Channel channel) {

        byte[] body = message.getBody();
        MessageProperties header = message.getMessageProperties();
        // System.out.println("接收到的消息: " + message);
        System.out.println("接收到的消息体:" + entity);
    }

    @RabbitHandler
    public void receiveOrderReturnApplyEntityMessage(Message message, OrderReturnApplyEntity entity,Channel channel) {
        // System.out.println("接收到的消息: " + message);
        System.out.println("接收到的消息体:" + entity);
    }
}

结果:

image-20230131181606902

10、发布确认

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。因此保证 RabbitMQ 消息的可靠投递,需要采取一些措施。

可以使用事务消息,但是性能下降250倍,为此引入确认 机制

image-20230131183805211

根据RabbitMQ消息的投递流程,可将确认机制分为俩部分:

第一部分:消息生产者的确认回调

  • publisher:confirmCallback 确认模式
  • publisher:returnCallback 未投递到 queue 退回模式

第二部分:消息消费者的确认

  • consumer:ack机制

image-20230131183840456

(1)生产端确认

消息生产者:

@RestController
@Slf4j
public class RabbitController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendMQ/{num}")
    public String sendMQ(@PathVariable("num") Integer num) {
        for (Integer i = 0; i < num; i++) {
            if (i % 2 == 0) {
                OrderEntity orderEntity = new OrderEntity();

                orderEntity.setId(i.longValue());
                orderEntity.setCreateTime(new Date());
                /*
                * 	public void convertAndSend(String exchange, String routingKey, final Object object,@Nullable CorrelationData correlationData)
                *   exchange: 交换机名称
                *   routingKey 路由键
                *   object 消息体
                *   CorrelationData  消息 id
                * */
                rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity,new CorrelationData(UUID.randomUUID().toString()));
                // log.info("发送消息成功:{}",i);
            }else {
                OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
                orderReturnApplyEntity.setId(i.longValue());
                orderReturnApplyEntity.setCreateTime(new Date());
                
                rabbitTemplate.convertAndSend("hello-java-exchange","hello.java11",orderReturnApplyEntity,new CorrelationData(UUID.randomUUID().toString()));
                // log.info("发送消息成功:{}",i);

            }
        }

        return  "sendMQ OK";
    }
}

配置 ConfirmCallback、ReturnCallback 回调

@Configuration
public class MyRabbitConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /*
     * 自定义消息转换器
     * */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /*
     * 设置发布确认机制
     *   1、ConfirmCallback,只要生产者发送消息就会执行此回调。
     *      spring.rabbitmq.publisher-confirms=true
     *   2、ReturnCallback 只有交换机将消息转发到Queue失败时,才会调用此回调
     *      # 开启发送端确认机制。 Exchange --> Queue
     *      spring.rabbitmq.publisher-returns=true
     *      # 只要消息成功发送到Queue,就优先异步调用 ReturnCallback
     *      spring.rabbitmq.template.mandatory=true
     * */
    @PostConstruct // MyRabbitConfig初始化之后执行
    public void InitRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * @description
             * @date 2023/1/31 18:55
             * @param correlationData 保存消息的id以及相关信息,可在发送消息时指定 new CorrelationData()
             * @param ack 消息是否发送成功。true:Broke接收到消息, false:Broker没有接收到消息
             * @param cause 消息发送失败的原因
             * @return void
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("Broker接收消息成功, correlationData: " + correlationData + " ack:" + ack + " cause:" + cause);
                } else {
                    System.out.println("Broker接收消息失败, correlationData: " + correlationData + " ack:" + ack + " cause:" + cause);
                }
            }
        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * @description
             * @date 2023/1/31 22:25
             * @param message 投递失败的消息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本
             * @param exchange  投递失败的交换机
             * @param routingKey    投递失败消息的 routing-key
             * @return void
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("message: " + message + " replyCode: " + replyCode + " replyText: " + replyText + " exchange: " + exchange + " routingKey: " + routingKey);
            }
        });
    }

生产端确认的配置:

# 开启发送端确认机制。 生产者 --> Broker
spring.rabbitmq.publisher-confirms=true
# 开启发送端确认机制。 Exchange --> Queue
spring.rabbitmq.publisher-returns=true
# 只要消息成功发送到Queue,就优先异步调用 ReturnCallback
spring.rabbitmq.template.mandatory=true

(2)消费端确认

消费端消费一个消息默认是自动确认的,当消费者启动时,队列中数据会全部转发给消费者处理,并自动进行消息确认,在队列中删除消息。但是当消费者处理完一条消息后,突然宕机,就会造成其他消息的丢失。

因此在消费者接收消息时应该使用手动确认模式,只要消息没有手动进行 Ack,消息就一直是 unChecked,即使宕机也不会丢失,会重新进入到 Ready 状态。

开启手动确认

# 设置手动Ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual

消费端手动确认方法: channel.basicAck()

消费端手动拒绝方法: channel.basicReject()/ channel.basicNack()

    /*
    * RabbitMQ 接收消息
    *   1、使用 @RabbitListener 接收消息,必须使用 @EnableRabbit 开启接受消息
    *       queueu 是一个 String[], 可指定接受多个队列的消息
    *   参数可接收的类型:
    *       rg.springframework.amqp.core.Message;: 封装的内容比较全,消息头+消息体
    *       T : 可接收发送消息的类型
    *       Channel channel : 信道信息
    *       (1) 在多服务下,一条消息只能有一个客户端接收
    *       (2) 处理完一条消息后,才能接收下一条消息
    *
    *   2、使用 @RabbitHandler +  @RabbitListener 接受不同类型的消息
    *        @RabbitHandler:标注在方法上
    *        @RabbitListener: 标注在类、方法上
    *      
    * */
    // @RabbitListener(queues = {"hello-java-queue"})
    @RabbitHandler
    public void receiveOrderEntityMessage(Message message, OrderEntity entity, Channel channel) {

        // byte[] body = message.getBody();
        // MessageProperties header = message.getMessageProperties();
        // System.out.println("接收到的消息: " + message);
        // System.out.println("接收到的消息体:" + entity);

        /*
        *  消息确认
        *       void basicAck(long deliveryTag, boolean multiple) throws IOException;
        *           deliveryTag: 消息标签,channel内顺序自增
        *           multiple 是否批量确认
        *   拒绝消息
        *       void basicNack(long deliveryTag, boolean multiple, boolean requeue)
        *            multiple 是否批量拒绝消息
        *            requeue 拒绝的消息是否重新入队。如果重新入队还重新发送给消费者
        *       void basicReject(long deliveryTag, boolean requeue) throws IOException;
        *           与 basicNack 区别就是没有批量拒绝消息
        *
        * */
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            if (deliveryTag % 2 == 0) {
                // 手动确认消息
                channel.basicAck(deliveryTag,false);
                System.out.println("签收了货物..." + deliveryTag);
            }else {
                // 拒绝消息
                channel.basicNack(deliveryTag,false,true);
                // channel.basicReject();
                System.out.println("没有签收货物..." + deliveryTag);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

11、延时队列

延时队列使用场景

订单超过指定时间未支付,解锁库存。

image-20230204160807623

一些概念说明

消息TTL【Time To Live】:消息存活时间。

RabbitMQ 可以为队列消息 分别设置 TTL。 对队列设置 TTL,就是对队列中的所有消息设置 TTL

死信【Dead Letter】 : 顾名思义就是死掉的消息,没有消费者接收的消息。

死信的来源

  • 消息超过指定 TTL 没有被消费者接收
  • 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
  • 消息被拒绝(basic.reject 或 basic.nack)并且 不允许重新放到队列中【requeue=false】

死信交换机【Dead Letter Exchange】 : 死信交换机和普通交换机的创建一样,唯一的区别就是死信交换机专门处理 死信 , 与死信交换机绑定的队列中如果有死信,就会被转发到死信交换机中做下一步处理。

因此,我们可以使用 TTL + 死信交换机就可以实现一个延迟队列。

延迟队列的实现方式一:为队列设置 TTL

生产者发送的消息到达设置TTL的队列后,如果在这个指定时间内没有消费者接收,那么该消息就变为 死信,同时转发给死信交换机,由死信交换机转发到特定的队列中再次进行消费。

image-20230204163230766

延迟队列实现方式二:为消息设置TTL

为生产者发送的每一条消息都设置TTL,不推荐这种方法

因为 RabbitMQ 使用的惰性机制对消息进行检查,如果第一条消息的 TTL = 5min,第二条消息的 TTL = 30s。

第三条消息的 TTL = 1s ,RabbitMQ检查第一条消息一看 5 分钟过期,就会5分钟后来检查,那么第二、第三条消息都会在 5min 后 转发给死信交换机。

image-20230204163509409

实战

模拟 订单超时关闭的场景

由生产者P 向 order-event- exchange 交换机发送订单消息,路由键为 order.create.order。

交换机与俩个队列绑定:

第一个:order.delay.queue 为延迟队列,通过order.create.order 路由键绑定,设置三个参数,死信交换机、死信路由键、TTL

第二个:order.release.order.queue 普通队列,通过 order.release.order 路由键绑定。

在页面下单之后,随之生产者会向交换机发送一条订单创建消息,路由键为 order.create.order , 交换机会将此消息发送到 延迟队列,等到达指定的 TTL 之后,说明订单超时未支付,将消息转发到绑定的 死信交换机 中,交换机在通过 order.release.order queue 队列转发给消费者 C

image-20230204170303854

消费者:创建订单完成后,向RabbitMQ 发送消息

@RestController
@Slf4j
public class RabbitController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 模拟生成订单,向MQ发送消息
    @RequestMapping("/sendMQ/createOrder")
    public String createOrder() {
        // 创建订单
        OrderEntity orderEntity = new OrderEntity();
        orderEntity.setOrderSn(UUID.randomUUID().toString());

        // 向MQ发送消息,监听订单是否支付成功
        // String exchange, String routingKey, Object message,CorrelationData correlationData
        // 交换机、消息的路由键,发送的消息,消息的唯一标识
        rabbitTemplate.convertAndSend("order-event- exchange","order.create.order",orderEntity);

        return "Order created !!";
    }
    }

创建交换机、队列,绑定关系,消费者:

@Configuration
public class MyMQConfig {

    // 消费者
    @RabbitListener(queues = "order.release.order.queue")
    public void consumer(OrderEntity order, Message message, Channel channel) throws IOException {
        System.out.println("订单超时未支付,即将关闭订单: " + order.getOrderSn());
        // 手动确认
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    }

    /*
    * 使用 @Bean 的方式创建 Exchange、Queue、Binding...服务启动会自动向RabbitMQ创建。
    * 前提是RabbitMQ中没有这些  Exchange、Queue、Binding... 如果存在,即使配置不一样也不会重新创建。
    * */


    // 延迟队列
    @Bean
    public Queue orderDelayQueue() {
        // String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        HashMap<String, Object> arguments = new HashMap<>();
        // 设置与队列相连的死信交换机
        arguments.put("x-dead-letter-exchange","order-event-exchange");
        // 转发死信的 路由键
        arguments.put("x-dead-letter-routing-key","order.release.order");
        // 设置队列的 TTL。超过30s就表示未支付订单,准备关闭
        arguments.put("x-message-ttl",3000);

        return new Queue("order.delay.queue",true,false,false,arguments);
    }

    // 普通队列
    @Bean
    public Queue orderReleaseOrderQueue() {
        return new Queue("order.release.order.queue",true,false,false,null);
    }

    // 交换机
    @Bean
    public TopicExchange orderEventExchange() {
        //String name, boolean durable, boolean autoDelete)
        return new TopicExchange("order-event-exchange",true,false);
    }

    // 设置绑定关系: order-event- exchange ——》order.delay.queue
    @Bean
    public Binding orderCreateOrder() {
        //String destination, DestinationType destinationType, String exchange, String routingKey,Map<String, Object> arguments
        // 绑定目的地-绑定的队列,绑定类型【交换机 OR 队列】,交换机,路由键,其他参数信息
        return new Binding(
                "order.delay.queue",
                Binding.DestinationType.QUEUE,
                "order-event- exchange",
                "order.create.order",
                null);
    }

    // 设置绑定关系: order-event- exchange ——》order.release.order.queue
    @Bean
    public Binding orderReleaseOrder() {
        return new Binding(
                "order.release.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.order",
                null);
    }
}

启动服务,已成功创建出交换机、队列

image-20230204185117203

image-20230204185130939

创建四条订单,已成功监听到…

image-20230204185837554

12、消息丢失、积压、重复等方案

1、消息丢失

消息发送出去,由于网络问题没有抵达服务器

  • 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机 制,可记录到数据库,采用定期扫描重发的方式
  • 做好日志记录,每个消息状态是否都被服务器收到都应该记录
  • 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进 行重发

消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚 未持久化完成,宕机。

  • publisher也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。

自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机

  • 一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck并重 新入队

总体来说:

1、一定要在 publisher【回调机制】、consumer【手动确认】 俩端做消息确认机制

2、将消息的状态信息保存到数据库中,比如可以创建如下这张表

CREATE TABLE `mq_message`(
`message_id` CHAR(32) NOT NULL,
 `content` TEXT,
`to_exchane` VARCHAR(255) DEFAULT NULL,
 `routing_key` VARCHAR(255) DEFAULT NULL,
`class_type` VARCHAR(255) DEFAULT NULL,
`message_status` INT(1) DEFAULT '0' COMMENT '0-新建1-己发送2-错误抵达3-己抵达',
`create_time` DATETIME DEFAULT NULL,
`update_time` DATETIME DEFAULT NULL,
PRIMARY KEY (`message_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4

2、消息重复

(1)消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息 重新由unack变为ready,并发送给其他消费者

(2)消息消费失败,由于重试机制,自动又将消息发送出

(3)成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送

解决方案

消费者的业务消费接口应该设计为幂等性的。比如扣库存有 工作单的状态标志

使用防重表(redis/mysql),发送消息每一个都有业务的唯 一标识,处理过就不用处理

rabbitMQ的每一个消息都有redelivered字段,可以获取是否 是被重新投递过来的,而不是第一次投递过来的

3、消息积压

(1)消费者宕机积压

(2)消费者消费能力不足积压

(3)发送者发送流量太大

解决方案

  • 上线更多的消费者,进行正常消费
  • 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理
    及处理就noAck并重 新入队

总体来说:

1、一定要在 publisher【回调机制】、consumer【手动确认】 俩端做消息确认机制

2、将消息的状态信息保存到数据库中,比如可以创建如下这张表

CREATE TABLE `mq_message`(
`message_id` CHAR(32) NOT NULL,
 `content` TEXT,
`to_exchane` VARCHAR(255) DEFAULT NULL,
 `routing_key` VARCHAR(255) DEFAULT NULL,
`class_type` VARCHAR(255) DEFAULT NULL,
`message_status` INT(1) DEFAULT '0' COMMENT '0-新建1-己发送2-错误抵达3-己抵达',
`create_time` DATETIME DEFAULT NULL,
`update_time` DATETIME DEFAULT NULL,
PRIMARY KEY (`message_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/334745.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Autowired注解源码解析

一、Autowired注解的原理的概览 我们都知道一个Bean的大致生命周期有这几个阶段&#xff0c;实例化--> 属性填充 --> 初始化 --> 销毁回调 其中Autowired作用的时间就是在属性填充阶段&#xff0c;而且是通过AutowiredAnnotation BeanPostProcessor类进行处理的。注…

android14预览版介绍及解读

​ 前言&#xff1a; android14快要来了&#xff0c;最近2月8日&#xff0c;android14的第一个开发者预览版发布了&#xff0c;正式版大约会和往常一样&#xff0c;大概率在六月份左右推出&#xff0c;八九月份时会有国内会有第一批手机支持安卓14。所以&#xff0c;本文就带…

为什么要用频谱分析仪测量频谱?

频谱分析仪是研究电信号频谱结构的仪器&#xff0c;用于信号失真度、调制度、谱纯度、频率稳定度和交调失真等信号参数的测量&#xff0c;可用以测量放大器和滤波器等电路系统的某些参数&#xff0c;是一种多用途的电子测量仪器。从事通信工程的技术人员&#xff0c;在很多时候…

免费下载学术文献的网站,好用!

推荐几款好用的免费下载学术文献网站&#xff0c;让你的查找文献环节更加事半功倍&#xff01; 1、Open Access Library&#xff08;OALib&#xff09;图书馆让学者可以免费下载学术文献和论文&#xff0c;并在这个平台上发表自己的论文。提供Open Access数据库资源。 2、文献…

Spring Security实现RBAC权限模型练习

1.Spring Security介绍 Spring Security的核心功能就是认证、授权、攻击防护&#xff0c;Spring Boot项目启动之后会自动进行配置&#xff0c;其核心就是一组链式过滤器。 如下图所示&#xff0c;对于一个用户请求&#xff0c;Username Password Authentication Filter验证用…

2022年API安全研究报告

导读 API应用的增速与其安全发展的不平衡,使其成为恶意攻击的首选目标,围绕API安全的攻防较量愈演愈烈。 2022年API安全风险概况 2022年平均每月遭受攻击的API数量超21万 2022年全年平均每月遭受攻击的API数量超过21万,第二季度(4-6月)遭受攻击的API数量达到高峰,月均…

经典文献阅读之--IGP2(可解释性目标的自动驾驶预测与规划)

0. 简介 对于自动驾驶的预测和规划而言&#xff0c;能够有效的对目标产生可解释性是非常有必要的&#xff0c;而《Interpretable Goal-based Prediction and Planning for Autonomous Driving》文中就提出了一种综合的自动驾驶预测和规划系统&#xff0c;它利用合理的逆规划来…

php mysql娱乐场所运营管理系统

目 录 1 背景与意义 3 1.1 研究背景 3 1.2 国内外发展状况研究 3 2 系统开发环境与技术 4 2.1 PHP介绍 4 2.2 MYSQL介绍 5 2.3 APACHE介绍 6 2.4 dreameaver介绍 7 2.5 wamp介绍 7 3 系统分析 8 3.1 系统可行性分析 8 3.1.1 技术可行性 …

【编程基础之Python】1、初始Python

【编程基础之Python】1、初始Python初始Python什么是PythonPython的运行过程Python的应用领域如何学好Python初始Python Python是一种跨平台的、开源免费的、解释型的、面向对象的高级编程语言。 Python的应用领域非常广泛&#xff0c;包括客户端程序、服务器程序、移动端程序…

Redis未授权漏洞蜜罐模拟与捕获分析

1.概述 文章主要分析Redis未授权漏洞的原理及形成原因&#xff0c;使用vulhub靶场进行漏洞复现&#xff0c;在了解漏洞原理并复现的基础上使用golang编写蜜罐代码进行模拟&#xff0c;开放端口在网上捕获真实存在的恶意攻击行为&#xff0c;对恶意样本进行分析&#xff0c;总结…

C++与Lua交互实例 -- 矩阵的加减乘除(版本二)

C与Lua交互实例 – 矩阵的加减乘除&#xff08;版本二&#xff09; TIPS&#xff1a;关于使用矩阵的加减乘除测试C与Lua的交互以及下面没讲述到的知识点可以阅读第一版&#xff1a; https://blog.csdn.net/qq135595696/article/details/128960951 同时下面两个方式矩阵的数据都…

爬虫JS逆向思路 - - 扣JS(data解密)

网络上几千块都学不到的JS逆向思路这里全都有&#x1f44f;&#x1f3fb;&#x1f44f;&#x1f3fb;&#x1f44f;&#x1f3fb; 本系列持续更新中&#xff0c;三连关注不迷路&#x1f44c;&#x1f3fb; 干货满满不看后悔&#x1f44d;&#x1f44d;&#x1f44d; ❌注意…

电机过流的一次bug排查记录

一、bug现象描述如下&#xff1a; 有一天&#xff0c;某员工给自己的组件换一个语音模块&#xff0c;其中电机和主板是通过单总线连接&#xff0c;据该员工回忆曾经在换语音芯片时曾将电源线不小心短路过。 电机已经DVT试产&#xff0c;功能和硬件测试已经通过&#xff0c;但是…

小白系列Vite-Vue3-TypeScript:007-配置axios并封装api

上一篇我们介绍了ViteVue3TypeScript项目中Element Plus的安装和配置&#xff0c;本篇我们来介绍一下如何配置axios并封装api。axios是一个基于promise的HTTP库&#xff0c;可以用在浏览器和node.js中&#xff0c;其最大的亮点就是支持了ES6里的Promise Api。废话不多说&#x…

Node =>Express学习

1.Express 能做什么 能快速构建web网站的服务器 或 Api接口的服务期 Web网站服务器&#xff0c;专门对外提供Web网页资源的服务器Api接口服务器&#xff1a;专门对外提供API接口的服务器 2.安装 在项目所处的目录中&#xff0c;运行以下命令&#xff0c;简装到项目中了 npm …

ChatGPT与马斯克 在 “ 遥感 ” 中的初探索

有人说&#xff1a;一个人从1岁活到80岁很平凡&#xff0c;但如果从80岁倒着活&#xff0c;那么一半以上的人都可能不凡。 生活没有捷径&#xff0c;我们踩过的坑都成为了生活的经验&#xff0c;这些经验越早知道&#xff0c;你要走的弯路就会越少。 1前言 文章开始前&#x…

栈和队列基本原理

栈和队列基本原理1.栈1.1 栈基本原理1.2. 栈操作步骤1.2.1 插入数据流程【压栈】1.2.2 移除数据流程【出栈】1.3. 栈代码实现2.队列2.1 队列基本原理2.2 队列操作步骤2.2.1 插入数据2.2.2 移除数据2.3. 队列代码实现3.栈与队列对比1.栈 1.1 栈基本原理 栈顶【末尾】&#xff…

突破边界:“超融合+”带来的商业化精益之路

相信大家都看了《流浪地球2》&#xff0c;其中人类一次次超越极限&#xff0c;以勇气和责任完成伟大征程的情节让我们深深感动。在现实的科技发展中&#xff0c;我们可能不会像科幻作品那样完成惊险万分地完成突破。但超越极限&#xff0c;却时时刻刻发生在科技产业当中。“超融…

K_A12_002 基于STM32等单片机采集光敏电阻传感器参数串口与OLED0.96双显示

K_A12_002 基于STM32等单片机采集光敏电阻传感器参数串口与OLED0.96双显示一、资源说明二、基本参数参数引脚说明三、驱动说明IIC地址/采集通道选择/时序对应程序:四、部分代码说明1、接线引脚定义1.1、STC89C52RC光敏电阻传感器模块1.2、STM32F103C8T6光敏电阻传感器模块五、基…

星河案例 | 冲量在线助力中国电信打造数据要素融通与AI能力开放外拓新范式

2022 年大数据“星河”案例征集活动由中国信息通信研究院、中国通信标准化协会大数据技术标准推进委员会(CCSA TC601)共同组织&#xff0c;旨在促进大数据技术产品及相关产业发展&#xff0c;加快培育数据要素市场、充分发挥数据作为生产要素的独特价值&#xff0c;树立行业标杆…