RabbitMQ学习(第二天)

news2025/5/10 1:55:51

文章目录

  • 1、生产者可靠性
    • ①、生产者重连
    • ②、生产者确认
  • 2、MQ可靠性
    • ①、数据持久化
    • ②、LazyQueue(惰性队列)
  • 3、消费者可靠性
    • ①、消费者确认
    • ②、失败重试机制
    • ③、保证业务幂等性
  • 总结

之前的学习中,熟悉了java中搭建和操作RabbitMQ发送接收消息,熟悉使用之后,重点要关注一下面试中常考的点,以及工作经常遇到的问题。今天的学习主要从保证消息可靠性出发,保证可靠性分三部分,分别是生产者可靠性、MQ可靠性、消费者可靠性三部分出发。

1、生产者可靠性

①、生产者重连

有时候因为网络波动,可能导致客户端连接MQ失败,通过配置可以开启连接失败后的重试机制。
yml配置文件:

spring:
  rabbitmq:
    connection-timeout: 1s # 连接MQ的连接超时间
    template:
      retry:
        enabled: true # 开启模板的重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

我们故意让它连接不上,结果如下:
在这里插入图片描述

当网络不稳定的时刻,利用重试机制可以有数据显示消息发送的成功率。不过SpringAMQP重试的重复机制是
阻塞式的重试,也就是说多次重试等待的过程是中,当前线程是被阻塞的,会影响到业务性能。

如果对于业务性能有要求,建议参考用重试机制。如果一定需要使用,请合理配置重试等待时长和重试次数,当然也
可以考虑异步用异步多线程来执行发送消息的代码。

②、生产者确认

RabbitMQ有Publisher Confirm和Publisher Return两种确认机制。开启确认机制后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 其它情况都会返回NACK,告知投递失败
spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,异步confirm类型
    publisher-returns: true # 开启publisher return机制

publisher-confim:

这里设置 correlated 表示MQ异步回调方式返回回执消息

**publisher-return: **
开启后,会返回路由失败消息。

@Slf4j
@Configuration
public class CommonConfig {
    @Resource
    RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void setRabbitTemplate() {
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            log.error("收到消息的return callback: msg:{} exchange:{}, test:{}, key:{}, code:{}",
                    returnedMessage.getMessage(), returnedMessage.getExchange(), returnedMessage.getReplyText(),
                    returnedMessage.getRoutingKey(), returnedMessage.getReplyCode());
        });
    }
}

我们定义一个config类,里面添加一个处理publish-return消息的处理,

测试代码:

    @Test
    public void testConfirmCallBack() throws InterruptedException {
        CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
        cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                log.debug("消息回调失败", ex);
            }
            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                log.debug("收到confim callback回执");
                if(result.isAck()) {
                    log.debug("消息发送成功,收到ack");
                } else{
                    log.debug("消息发送失败,收到nack, 原因:{}", result.getReason());
                }
            }
        });
        rabbitTemplate.convertAndSend("amq.direct", "red2", "hello", cd);
        Thread.sleep(2000);
    }

这里我们测试三种情况:
现有正确的交换机为 amq.direct,routingKey为red,cd逻辑如代码所示。

①、交换机与routingKey均正确,运行结果:

收到confim callback回执
消息发送成功,收到ack

②、交换机正确,routingKey不正确,运行结果:

收到confim callback回执
消息发送成功,收到ack
收到消息的return callback: msg:(Body:'"hello"' MessageProperties [headers={spring_returned_message_correlation=0a8a70fd-a66b-43a3-a681-324e46db7b79, __TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) exchange:amq.direct, test:NO_ROUTE, key:red2, code:312

③、交换机不正确,运行结果:

收到confim callback回执
消息发送失败,收到nack, 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'amq.direct.123' in vhost '/', class-id=60, method-id=40)

关于生产者确认消息要会的几点:

  • 生产者确认需要额外的网络和系统资源开销,尽量不要使用

  • 如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己业务问题

  • 对于nack消息可以有限次数重试,依然失败则记录异常消息

2、MQ可靠性

MO通常将消息保存在内存中,这样可以降低收发消息的延迟,但是会有一些问题:

  • 最常见的问题就是可能因为各种原因宕机重启,而这极大可能会导致消息丢失。
  • 消息堆积在内存中,如果消费者出故障,导致消息积压,引发MQ阻塞。

①、数据持久化

持久化分三部分:

  • 交换机持久化
  • 队列持久化,
  • 消息持久化

前两个的话,Spring中创建时默认将队列和交换机都创建为持久化,当然,手动设置也可以,设置为Durable:
在这里插入图片描述

消息持久化也可以设置:

在这里插入图片描述

我们向他发送了1百万条非持久化消息:
在这里插入图片描述
可以看到其中凹下去的地方就是消息积压导致的。

发送了1百万条持久化消息:
在这里插入图片描述
可以看到此时不存在消息积压。

每次下降的时候,是在将数据写入到磁盘中,但是不会出现宕机为0的情况。

②、LazyQueue(惰性队列)

惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息)

  • 消费者要消费消息时才会从磁盘中读取并加载到内存

  • 支持数百万条的消息存储

  • 在3.12版本后,所有队列都是Lazy Queue模式,无法更改。

控制台创建:
在这里插入图片描述
spring中创建:

@Bean
public Queue lazyQueue() {
    return QueueBuilder
        .durable("lazy.queue")
        .lazy() // 开启Lazy模式
        .build();
}

@RabbitListener(queuesToDeclare = @Queue(
    name = "lazy.queue",
    durable = "true",
    arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg) {
    log.info("接收到 lazy.queue 的消息: {}", msg);
}

向lazy.queue发送1百万条非持久消息:
在这里插入图片描述
可以看到,性能特别好,因为数据都是直接通过page out到磁盘。

3、消费者可靠性

①、消费者确认

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

ack:成功处理消息,RabbitMQ从队列中删除该消息
nack:消息处理失败,RabbitMQ需要再次投递消息
reject:消息处理失败并且拒绝该消息,RabbitMQ从队列中删除消息

我们在Spring中配置,auto代表Spring帮我们自动处理这三种情况:

spring:
  rabbitmq:
      simple:
        acknowledge-mode: auto

一般都是直接配置auto。

但是这样就会导致一个问题,如果程序中有异常,那么mq一直重试,这样的话,会导致资源的消耗,对系统造成大的压力,因此我们后面引入了失败重试机制。

②、失败重试机制

配置以下yml内容可以开启失败重试机制:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        retry:
          enabled: true #开启失败重试机制
          initial-interval: 1000ms #初始的失败重试等待时长为1秒
          multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
          max-attempts: 3
          stateless: true # true无状态,false有状态,如果业务中包含事务,这里为false

失败消息处理策略:
在开启重试模式后,重试次数耗尽,如果消息仍然失败,则需要MessageRecoverer接口来处理,它包含三种不同的实现:

RejectAndDontRequeueRecoverer: 重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer: 重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer: 重试耗尽后,将失败消息投递到指定的交换机

收发消息定义:


    @RabbitListener(queues = {"object.queue"})
    public void ObjectConsumer(String msg) {
        System.out.println(msg);
        throw new RuntimeException("......");
    }

    @Test
    public void testMessage() {
        Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();
        rabbitTemplate.convertAndSend("object.queue", message);
    }

java代码中实现失败消息处理:


@Slf4j
@Configuration
public class CommonConfig {
    @Resource
    RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void setRabbitTemplate() {
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            log.error("收到消息的return callback: msg:{} exchange:{}, test:{}, key:{}, code:{}",
                    returnedMessage.getMessage(), returnedMessage.getExchange(), returnedMessage.getReplyText(),
                    returnedMessage.getRoutingKey(), returnedMessage.getReplyCode());
        });
    }
}

运行结果:
在这里插入图片描述
可以看到报错信息被发送到了error交换机。

③、保证业务幂等性

幂等是一个数学概念,用函数表达式来描述是:f(x) = f(f(x))。在程序开发中,则是指同一个业务,执行一次或多次的结果是一致的。

像查询一般都是幂等的,因为它的执行对于业务状态没有影响,差多少次都没事,就是幂等的。
像用户下单,退单涉及扣钱,退款操作,如果执行多次,会造成经济损失,就是非幂等的。

为了保证幂等性,我们通过一些方法来实现:

(1)、 唯一消息id

是给每个消息都设置一个唯一id,利用id区分是否重复消费:

  • 每一条消息都生成一个唯一的id,与消息一起投递给消费者。

  • 消费者接收到消息后根据自己的业务,业务处理成功后将消息ID保存到数据库库

  • 如果下次又收到相同消息,去数据库查找判断是否存档,存档则为重复消费放弃处理。

@Bean
public MessageConverter messageConverter() {
    // 1. 定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2. 配置自动创建消息id,用于识别不同消息,也可以自己在业务中生成ID判断是否重复消费
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

但是,这种方案对数据库压力太大,同时速度也相对慢很多,因此只做了解,只需要知道这种方式是jjmc内部帮你创建了一个UUID的随机值作id。

其它方式的话,需要我们结合实际项目在业务逻辑中进行操作来保证幂等性。

总结

最后,用两个问题来总结:

如何保证对服务与交易服务之间的订单状态一致性?

  • 首先,支付服务会在用户支付成功后利用MQ消息通知交易服务,完成订单状态同步。

  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消息重确认、消费者失败重试等策略,确保消息投递和处理可靠性。同时也开启了MQ的持久化,避免因服务宕机导致消息丢失。

  • 最后,我们还在交易服务端新增了业务幂等判断,避免重复消费。

如果交易服务消息处理失败,有没有什么兜底方案?

  • 我们可以在交易服务设置定时任务,定期查询订单支付状态。这样即使用MQ通知失败,还可以用定时任务作为兜底方案,确保订单状态最终一致性。

即Spring Task中的@Schedule注解,定时扫描订单状态,用来弥补消息失败,兜底方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2371950.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【JS逆向基础】爬虫核心模块:request模块与包的概念

前言&#xff1a;这篇文章主要介绍JS逆向爬虫中最常用的request模块&#xff0c;然后引出一系列的模块的概念&#xff0c;当然Python中其他比较常用的还有很多模块&#xff0c;正是这些模块也可以称之为库的东西构成了Python强大的生态&#xff0c;使其几乎可以实现任何功能。下…

LabVIEW燃气轮机测控系统

在能源需求不断增长以及生态环境保护备受重视的背景下&#xff0c;微型燃气轮机凭借其在经济性、可靠性、维护性及排放性等方面的显著优势&#xff0c;在航空航天、分布式发电等众多领域得到广泛应用。随着计算机技术的快速发展&#xff0c;虚拟仪器应运而生&#xff0c;LabVIE…

QT | 常用控件

前言 &#x1f493; 个人主页&#xff1a;普通young man-CSDN博客 ⏩ 文章专栏&#xff1a;C_普通young man的博客-CSDN博客 ⏩ 本人giee: 普通小青年 (pu-tong-young-man) - Gitee.com 若有问题 评论区见&#x1f4dd; &#x1f389;欢迎大家点赞&#x1f44d;收藏⭐文章 —…

LLM论文笔记 28: Universal length generalization with Turing Programs

Arxiv日期&#xff1a;2024.10.4机构&#xff1a;Harvard University 关键词 图灵机 CoT 长度泛化 核心结论 Turing Programs 的提出 提出 Turing Programs&#xff0c;一种基于图灵机计算步骤的通用 CoT 策略。通过将算法任务分解为逐步的“磁带更新”&#xff08;类似图灵…

AI日报 · 2025年5月07日|谷歌发布 Gemini 2.5 Pro 预览版 (I/O 版本),大幅提升编码与视频理解能力

1、谷歌发布 Gemini 2.5 Pro 预览版 (I/O 版本)&#xff0c;大幅提升编码与视频理解能力 谷歌于5月6日提前发布 Gemini 2.5 Pro 预览版 (I/O 版本)&#xff0c;为开发者带来更强编码能力&#xff0c;尤其优化了前端与UI开发、代码转换及智能体工作流构建&#xff0c;并在WebDe…

指定Docker镜像源,使用阿里云加速异常解决

yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo异常贴图 yum-config-manager&#xff1a;找不到命令 因为系统默认没有安装这个命令&#xff0c;这个命令在yum-utils 包里&#xff0c;可以通过命令yum -y install yum-util…

VITA STANDARDS LIST,VITA 标准清单下载

VITA STANDARDS LIST&#xff0c;VITA 标准清单下载 DesignationTitleAbstractStatusVMEbus Handbook, 4th EditionA users guide to the VME, VME64 and VME64x bus specifications - features over 70 product photos and over 160 circuit diagrams, tables and graphs. The…

Python从入门到高手8.3节-元组的常用操作方法

目录 11.3.1 元组的常用操作方法 11.3.2 元组的查找 11.3.3 祈祷明天不再打雷下雨 11.3.1 元组的常用操作方法 元组类型是一种抽象数据类型&#xff0c;抽象数据类型定义了数据类型的操作方法&#xff0c;在本节的内容中&#xff0c;着重介绍元组类型的操作方法。 ​ 元组是…

Linux系统安装PaddleDetection

一、安装cuda 1. 查看设备 先输入nvidia-smi&#xff0c;查看设备支持的最大cuda版本&#xff0c;选择官网中支持的cuda版本 https://www.paddlepaddle.org.cn/install/quick?docurl/documentation/docs/zh/install/conda/linux-conda.html 2. 下载CUDA并安装 使用快捷键…

【漫话机器学习系列】239.训练错误率(Training Error Rate)

机器学习基础概念 | 训练错误率&#xff08;Training Error Rate&#xff09;详解 在机器学习模型训练过程中&#xff0c;评估模型性能是至关重要的一个环节。其中&#xff0c;训练错误率&#xff08;Training Error Rate&#xff09; 是最基础也最重要的性能指标之一。 本文将…

OpenCV 图形API(80)图像与通道拼接函数-----仿射变换函数warpAffine()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 对图像应用仿射变换。 函数 warpAffine 使用指定的矩阵对源图像进行变换&#xff1a; dst ( x , y ) src ( M 11 x M 12 y M 13 , M 21 x M…

数据结构与算法:图论——最短路径

最短路径 先给出一些leetcode算法题&#xff0c;以后遇见了相关题目再往上增加 最短路径的4个常用算法是Floyd、Bellman-Ford、SPFA、Dijkstra。不同应用场景下&#xff0c;应有选择地使用它们&#xff1a; 图的规模小&#xff0c;用Floyd。若边的权值有负数&#xff0c;需要…

提示词工程:通向AGI时代的人机交互艺术

‌引言&#xff1a;从基础到精通的提示词学习之旅‌ 欢迎来到 ‌"AGI时代核心技能"‌ 系列课程的第二模块——‌提示词工程‌。在这个模块中&#xff0c;我们将系统性地探索如何通过精心设计的提示词&#xff0c;释放大型语言模型的全部潜力&#xff0c;实现高效、精…

是更换Window资源管理器的时候了-> Files-community/Files

Files • 主页https://files.community/ 它已经做到了 云盘文件集成、标签页和多种布局、丰富的文件预览…… 您想要的一切现代文件管理器的强大功能&#xff0c; Files 都能做到。 概述 Files 是一个现代文件管理器&#xff0c;可帮助用户组织他们的文件和文件夹。Files 的…

基于windows安装MySQL8.0.40

基于windows安装MySQL8.0.40 基于windows 安装 MySQL8.0.40&#xff0c;解压文件到D:\mysql-8.0.40-winx64 在D:\mysql-8.0.40-winx64目录下创建my.ini文件&#xff0c;并更新一下内容 [client] #客户端设置&#xff0c;即客户端默认的连接参数 # 设置mysql客户端连接服务…

【Vue】组件自定义事件 TodoList 自定义事件数据传输

目录 一、绑定 二、解绑 组件自定义事件总结 TodoList案例对数据传输事件的修改 总结不易~ 本章节对我有很大收获&#xff0c; 希望对你也是&#xff01;&#xff01;&#xff01; 本章节素材已上传Gitee&#xff1a;yihaohhh/我爱Vue - Gitee.com 前面我们学习的clikc、…

基于Centos7的DHCP服务器搭建

一、准备实验环境&#xff1a; 克隆两台虚拟机 一台作服务器&#xff1a;DHCP Server 一台作客户端&#xff1a;DHCP Clinet 二、部署服务器 在网络模式为NAT下使用yum下载DHCP 需要管理员用户权限才能下载&#xff0c;下载好后关闭客户端&#xff0c;改NAT模式为仅主机模式…

LabVIEW超声波液位计检定

在工业生产、运输和存储等环节&#xff0c;液位计的应用十分广泛&#xff0c;其中超声波液位计作为非接触式液位测量设备备受青睐。然而&#xff0c;传统立式水槽式液位计检定装置存在受建筑高度影响、量程范围受限、流程耗时长等问题&#xff0c;无法满足大量程超声波液位计的…

[STM32] 4-2 USART与串口通信(2)

文章目录 前言4-2 USART与串口通信(2)数据发送过程双缓冲与连续发送数据发送过程中的问题 数据接收过程TXE标志位&#xff08;发送数据寄存器空&#xff09;TC标志位&#xff08;发送完成标志位&#xff09;单个数据的发送数据的连续发送 接收过程中遇到的问题问题描述&#xf…

基于Python+MongoDB猫眼电影 Top100 数据爬取与存储

前言&#xff1a;从猫眼电影排行榜页面&#xff08;TOP100榜 - 猫眼电影 - 一网打尽好电影 &#xff09;爬取 Top100 电影的电影名称、图片地址、主演、上映时间和评分等关键信息&#xff0c;并将这些信息存储到本地 MongoDB 数据库中&#xff0c;&#x1f517; 相关链接Xpath&…