RabbitMQ可靠传输——持久性、发送方确认

news2025/5/24 10:02:37

一、持久性

前面学习消息确认机制时,是为了保证Broker到消费者直接的可靠传输的,但是如果是Broker出现问题(如停止服务),如何保证消息可靠性?对此,RabbitMQ提供了持久化功能:

持久化分为三种:1. 交换机持久化   2. 队列持久化   3.消息持久化

1.1 交换机持久化

一、交换机持久化方法

声明交换机时,将durable置为true即可,如果不指定,默认为true

二、交换机持久化的作用

避免了当Broker重启时,未重新执行交换机声明代码,而导致生产者消息无法路由


1.2 队列持久化

一、队列持久化方法

在声明队列时,使用durable方法声明的队列为持久化队列,使用nonDurable声明的队列为非持久化队列

对应管理界面:

二、队列持久化的作用

在RabbitMQ服务器重启时,未持久化的队列将丢失,持久化队列保留


1.3 消息持久化

 一、消息持久化方法

前面在发送消息时,都是直接指定一个字符串来发送消息,如:


我们先进入convertAndSend方法,观察其源码:

接下来再进入MessageProperties,观察其源码:

可以看到,不指定消息是否持久化,默认为持久化

也就是说,如果要指定消息为非持久化,就选哟给convertAndSend传入一个Message对象而不是仅传一个消息字符串,接下来学习如何设置消息非持久化:

1> 创建一个Message对象

 Message message = new Message("persistent test...".getBytes(),new MessageProperties());

2> 获取MessageProperties对象,通过setDeliveryMode方法设置消息非持久化


/*
*如果要设置为持久化,可以直接换一个String的消息,也可以将这里的
*MessageDeliveryMode.NON_PERSISTENT改为MessageDeliveryMode.PERSISTENT
*/
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);

3> 将Message对象作为参数传递给converAndSend方法

rabbitTemplate.convertAndSend(Constants.NO_PERSISTENT_EXCHANGE,"",message);

整体代码:

二、消息持久化作用

RabbitMQ服务器重启时,未持久化的消息将丢失即使消息所在队列未持久化队列),持久化的消息将保留前提是消息所在的队列是持久化队列

但是,有了消息确认机制以及持久性就能保证消息传输的可靠性了吗?显然不是,因为消息确认机制保证的是Broker到消费者的可靠性 ,持久性保证的是Broker内部的可靠性,还有生产者到Broker的可靠性没有被保证,因此,RabbitMQ引入了publiser confirms(发送方确认)机制


二、发送方确认机制

前面已经学习了RabbitMQ核心机制之一——持久化,但是这就能保证消息传输的可靠性了吗?显然不是,如果发送方发送的消息没有到达Broker,又谈何持久化?因此,我们还需要了解RabbitMQ的发送方确认机制(通过事务也能解决,但是比较复杂,这里不谈)

publisher confirms 机制又包含两种模式

1> confirm确认模式

2> return退回模式

2.1 confirm确认模式

一、触发机制

Producer向Broker发送消息时,需要设置一个ConfirmCallback监听,这样消息无论是否到达exchange,这个监听都会触发,如果消息到达exchange,ACK为true,如果没有到达exchange,ACK为false


二、代码演示

1> 添加RabbitMQ配置

publisher-confirm-type: correlated #配置publisher confirm机制

2> 代码实现(队列、交换机随便声明一个就行,类型随意)

    @RequestMapping("/confirm")
    public String confirm(){
        //设置回调方法
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("执行confirm方法");
                if(ack){//ack为true,消息到达exchange
                    System.out.printf("接收到消息,消息ID:%s \n",correlationData==null ? null : correlationData.getId());
                }else {//ack为false,消息为到达exchange
                    System.out.printf("未接收到消息,消息ID:%s , cause: %s \n",correlationData==null ? null : correlationData.getId(),cause);
                }
            }
        });
        CorrelationData correlationData = new CorrelationData("1");
        //发送消息
        rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","confirm test...",correlationData);
        return "消息发送成功";
    }

3>运行程序,测试接口

    1.正确发送消息(交换机名、routingKey存在)

消息发送成功,接下来查看控制台信息:

可以看到,交换机成功接收到消息

  2.错误发送消息(改为一个不存在的交换机名)

再次运行程序,访问接口,发送消息:

消息发送成功,查看控制台:

可以看到,消息并没有到达指定交换机,原因是不存在这个交换机。

  3.错误发送消息(改为一个不存在的routingKey)

运行程序,测试接口:

消息发送成功,查看控制台:

可以看到,在routingKey不存在的情况下,消息还是到达了交换机,但是这个消息一定是无法路由到队列的,因此就需要通过publsher confirm的 return退回模式 来解决

4> 上述代码编写存在的问题

  上面我们设置了ConfirmCallback监听经过测试,似乎并没有问题,但是仔细思考就会发现,我们在上面的代码中是通过rabbitTemplate这个对象来设置的,那岂不是前面所有使用rabbitTemplate的接口都被设置了监听?访问其它接口也一样会打印回调方法中的信息?

下面我们测试一下下面的方法:

运行程序,测试接口:

消息发送成功,查看控制台:

可以看到,同样会触发监听,为了避免这个问题,我们可以在config包中自己配置一个RabbitTemplate对象并注入进来:

@Configuration
public class RabbitTemplateConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

    @Bean
    public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        //消息到达exchange时的回调方法
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("执行confirm方法");
                if(ack){//ack为true,表示消息到达交换机
                    System.out.printf("接收到消息,消息ID:%s \n",correlationData==null ? null : correlationData.getId());

                }else{//ack为false,表示消息未到达交换机
                    System.out.printf("未接收到消息,消息ID:%s , cause: %s \n",correlationData==null ? null : correlationData.getId(),cause);
                    //业务逻辑,如重发等
                }
            }
        });

        //消息退回时的回调方法
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println("消息退回: " + returned);
            }
        });
        return rabbitTemplate;
    }
}

2.2 return退回模式

 一、触发机制

当消息到达exchange后,需要路由到queue中,如果一条消息无法被任何queue消费(routingKey不存在或队列不存在),可以把消息退回给producer,退回时可以设置一个回调方法ReturnCallback,对消息进行处理


二、代码演示

   //消息退回时的回调方法
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println("消息退回: " + returned);
            }
        });

修啊routingKey为一个不存在的routingKey:

运行程序,测试接口:

查看控制台:

可以看到,消息被退回


2.3 总结

publisher confirms 机制可以保证消息从生产者到Broker的可靠性,其中confirm模式工作在生产者到exchange之间,return模式工作在exchange到queue之间 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2384516.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

无人机开启未来配送新篇章

低空物流(无人机物流)是利用无人机等低空飞行器进行货物运输的物流方式,依托低空空域(通常在120-300米)实现快速、高效、灵活的配送服务。它是低空经济的重要组成部分,广泛应用于快递配送、医疗物资运输、农…

Qt状态机QStateMachine

QStateMachine QState 提供了一种强大且灵活的方式来表示状态机中的状态,通过与状态机类(QStateMachine)和转换类(QSignalTransition, QEventTransition)结合,可以实现复杂的状态逻辑和用户交互。合理使用嵌套状态机、信号转换、动作与动画、…

Java详解LeetCode 热题 100(20):LeetCode 48. 旋转图像(Rotate Image)详解

文章目录 1. 题目描述2. 理解题目3. 解法一:转置 翻转3.1 思路3.2 Java代码实现3.3 代码详解3.4 复杂度分析3.5 适用场景 4. 解法二:四点旋转法4.1 思路4.2 Java代码实现4.3 代码详解4.4 复杂度分析4.5 适用场景 5. 详细步骤分析与示例跟踪5.1 解法一&a…

CAU人工智能class4 批次归一化

归一化 在对输入数据进行预处理时会用到归一化,将输入数据的范围收缩到0到1之间,这有利于避免纲量对模型训练产生的影响。 但当模型过深时会产生下述问题: 当一个学习系统的输入分布发生变化时,这种现象称之为“内部协变量偏移”…

Android11以上通过adb复制文件到内置存储让文件管理器可见

之前Android版本如果需要将文件通过adb push放到内置存储,push到/data/media/10下的目录即可,直接放/sdcard/文件管理器是看不到的。 现在最新的Android版本直接将文件放在/sdcard或/data/media/10下文件管理器也看不到 可以将文件再复制一份到一下路径…

篇章二 需求分析(一)

目录 1.知名MQ 2.需求分析 2.1 核心概念 2.2 生产者消费者模型的类别 2.3 BrokerServer 内部的关键概念(MQ) 1.虚拟主机(Virtual Host) 2.交换机(Exchange) 3.队列(Queue) 4…

图解深度学习 - 机器学习简史

前言 深度学习并非总是解决问题的最佳方案:缺乏足够数据时,深度学习难以施展;某些情况下,其他机器学习算法可能更为高效。 若初学者首次接触的是深度学习,可能会形成一种偏见,视所有机器学习问题为深度学…

Gmsh 代码深度解析与应用实例

在科学计算与工程仿真领域,Gmsh 是一款广受欢迎的开源有限元网格生成器,它不仅支持复杂的几何建模,还能高效生成高质量的网格,并具备强大的后处理功能。本文将深入解析几段具有代表性的 Gmsh 代码,从基础几何创建到高级…

49页 @《人工智能生命体 新启点》中國龍 原创连载

《 人工智能生命体 新启点 》一书,以建立意识来建立起生命体,让其成为独立、自主的活动个体;也就可以理解为建立生命体的思想指导。 让我们能够赋予他灵魂!

量化研究---bigquant策略交易api研究

api接口来平台的代码整理,原理是读取bigquant的模拟测试信号,下单,可以完美的对接qmt交易,我优化了交易api的部分内容 我开发对接qmt的交易系统 看api源代码 源代码 # 导入系统包 import os import json import requests from ty…

编译原理 期末速成

一、基本概念 1. 翻译程序 vs 编译程序 翻译程序的三种方式 编译:将高级语言编写的源程序翻译成等价的机器语言或汇编语言。(生成文件,等价)解释:将高级语言编写的源程序翻译一句执行一句,不生成目标文件…

echarts之漏斗图

vue3echarts实现漏斗图 echarts中文官网&#xff1a;https://echarts.apache.org/examples/zh/index.html 效果图如下&#xff1a; 整体代码如下&#xff1a; <template><div id"funnelChart" style"width:100%;height:400px;"></div&g…

零基础设计模式——第二部分:创建型模式 - 原型模式

第二部分&#xff1a;创建型模式 - 5. 原型模式 (Prototype Pattern) 我们已经探讨了单例、工厂方法、抽象工厂和生成器模式。现在&#xff0c;我们来看创建型模式的最后一个主要成员——原型模式。这种模式关注的是通过复制现有对象来创建新对象&#xff0c;而不是通过传统的…

java 进阶 1.0.3

Thread API说明 自己滚去看文档 CPU线程调度 每一个线程的优先使用权都是系统随机分配的&#xff0c;人人平等 谁先分配到就谁先用 也可以耍赖&#xff0c;就是赋予某一个线程拥有之高使用权&#xff1a;优先级 这样的操作就叫做线程调度 最基本的是系统轮流获得 java的做法是抢…

从 Docker 到 runC

从 Docker 到 runC:容器底层原理详解 目录 1. Docker 与 runC 的关系 2. Docker 的核心组件 3. runC 的核心功能 4. 实战示例:从 Docker 到 runC 4.1 示例场景:运行一个简单容器 4.2 Docker 底层调用 runC 的流程 4.3 查看 runC 的调用 4.4 直接调用 runC 创建容器 …

PET,Prompt Tuning,P Tuning,Lora,Qlora 大模型微调的简介

概览 到2025年&#xff0c;虽然PET&#xff08;Pattern-Exploiting Training&#xff09;和Prompt Tuning在学术界仍有探讨&#xff0c;但在工业和生产环境中它们已基本被LoRA/QLoRA等参数高效微调&#xff08;PEFT&#xff09;方法取代 。LoRA因其实现简单、推理零开销&#…

02-jenkins学习之旅-基础配置

0 配置主路径 jenkins安装目录下找到jenkins.xml文件&#xff0c;C:\ProgramData\Jenkins\.jenkins目录下会存放jenkins相关的配置信息。 1 jdk配置 jenkins是java开发开源的项目&#xff0c;进而服务器需要jdk环境 1.1 服务器安装jdk 1.2 jenkins jdk配置 2 git配置 在je…

Appium+python自动化(三)- SDK Manager

简介 一开始打算用真机做的&#xff0c;所以在前边搭建环境时候就没有下载SDK&#xff0c;但是考虑到绝大多数人都没有真机&#xff0c;所以顺应民意整理一下模拟器。SDK顾名思义&#xff0c;Android SDK Manager就是一个Android软件开发工具包管理器&#xff0c;就像一个桥梁&…

3D Gaussian Splatting for Real-Time Radiance Field Rendering——文章方法精解

SfM → Point-NeRF → 3D Gaussian Splatting &#x1f7e6;SfM Structure-from-Motion&#xff08;运动恢复结构&#xff0c;简称 SfM&#xff09;是一种计算机视觉技术&#xff0c;可以&#xff1a; 利用多张从不同角度拍摄的图像&#xff0c;恢复出场景的三维结构和相机的…

【Unity实战笔记】第二十四 · 使用 SMB+Animator 实现基础战斗系统

转载请注明出处&#xff1a;&#x1f517;https://blog.csdn.net/weixin_44013533/article/details/146409453 作者&#xff1a;CSDN|Ringleader| 1 结构 1.1 状态机 1.2 SMB 2 代码实现 2.1 核心控制 Player_Base_SMB 继承 StateMachineBehaviour &#xff0c;控制变量初始…