Spring Boot 与 RabbitMQ 的深度集成实践(三)

news2025/5/20 19:06:22

高级特性实现

消息持久化

在实际的生产环境中,消息的可靠性是至关重要的。消息持久化是确保 RabbitMQ 在发生故障或重启后,消息不会丢失的关键机制。它涉及到消息、队列和交换机的持久化配置。

首先,配置队列持久化。在创建队列时,将durable参数设置为true,表示该队列是持久化队列。当 RabbitMQ 服务器重启时,持久化队列会从磁盘中恢复,而不是被重新创建。例如,在之前创建队列的配置类中:

 

@Bean

public Queue directQueue() {

return new Queue("direct.queue", true);

}

这里创建的direct.queue队列通过true参数设置为持久化队列。这样,即使服务器出现故障,队列中的消息也不会丢失。

对于交换机,同样可以通过durable参数来设置持久化。以直连交换机为例:

 

@Bean

public DirectExchange directExchange() {

return new DirectExchange("direct.exchange", true, false);

}

direct.exchange交换机被设置为持久化,保证了在服务器重启后,交换机的配置信息仍然存在,不会影响消息的路由。

在消息层面,Spring AMQP 默认会将消息设置为持久化。当使用RabbitTemplate发送消息时,消息的MessageProperties中的deliveryMode属性默认被设置为MessageDeliveryMode.PERSISTENT,表示消息是持久化的。这意味着消息不仅会被存储在内存中,还会被写入磁盘,从而在服务器重启后仍然可用。例如,在生产者类RabbitMQProducer中发送消息时:

 

public void sendMessage(String exchange, String routingKey, String message) {

rabbitTemplate.convertAndSend(exchange, routingKey, message);

System.out.println("Sent message: " + message);

}

这里发送的消息会自动被标记为持久化,确保了消息在传输过程中的可靠性。通过配置消息、队列和交换机的持久化,可以大大提高消息系统的可靠性,避免因服务器故障导致的消息丢失问题,为企业级应用提供了坚实的消息保障。

消息确认机制

消息确认机制是保证消息在生产者和消费者之间可靠传递的重要手段,它分为生产者消息确认和消费者消息确认。

在生产者端,RabbitMQ 提供了confirm回调机制,用于确认消息是否成功发送到交换机。首先,在application.yml中开启publisher - confirm - type配置:

 

spring:

rabbitmq:

publisher - confirm - type: correlated

这表示开启了发布确认模式,当消息发送到交换机后,会触发回调方法。

然后,在配置类中为RabbitTemplate设置ConfirmCallback回调:

 

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class RabbitMQConfig {

@Bean

public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

if (ack) {

System.out.println("Message sent to exchange successfully, correlation data: " + correlationData);

} else {

System.out.println("Failed to send message to exchange, correlation data: " + correlationData + ", cause: " + cause);

}

});

return rabbitTemplate;

}

}

在这个回调中,correlationData包含了消息发送时的相关数据,如消息 ID 等;ack表示消息是否成功发送到交换机,true表示成功,false表示失败;cause则是失败的原因。通过这种方式,生产者可以根据回调结果来判断消息的发送状态,以便进行相应的处理,如记录日志、重新发送消息等。

对于消费者,RabbitMQ 支持自动确认和手动确认两种方式。自动确认是默认的方式,当消费者接收到消息后,RabbitMQ 会自动将消息从队列中移除。然而,这种方式存在一定的风险,如果消费者在处理消息过程中出现异常,消息已经被确认移除,可能会导致数据丢失。

手动确认则更加安全可靠。在application.yml中,可以将消费者的确认模式设置为手动确认:

 

spring:

rabbitmq:

listener:

simple:

acknowledge - mode: manual

在消费者类中,通过Channel对象来手动确认消息:

 

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

@Component

public class RabbitMQConsumer implements ChannelAwareMessageListener {

@RabbitListener(queues = "direct.queue")

@Override

public void onMessage(Message message, Channel channel) throws Exception {

try {

long deliveryTag = message.getMessageProperties().getDeliveryTag();

// 处理消息逻辑

System.out.println("Received message: " + new String(message.getBody()));

// 手动确认消息,multiple为false表示只确认当前消息

channel.basicAck(deliveryTag, false);

} catch (Exception e) {

long deliveryTag = message.getMessageProperties().getDeliveryTag();

// 处理异常,例如将消息放入死信队列或记录日志

System.out.println("Error processing message: " + e.getMessage());

// 拒绝消息,requeue为false表示不重新入队,消息会进入死信队列(如果配置了死信队列)

channel.basicNack(deliveryTag, false, false);

}

}

}

在手动确认模式下,消费者在成功处理消息后,通过channel.basicAck方法来确认消息;如果处理过程中出现异常,则通过channel.basicNack方法来拒绝消息,并可以根据业务需求决定是否将消息重新放入队列。这种方式确保了消息在被正确处理后才会从队列中移除,提高了消息处理的可靠性。

死信队列和延迟队列

死信队列(Dead Letter Queue,DLQ)是一种特殊的队列,用于处理那些无法被正常消费的消息。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换器中,这个交换器就是死信交换器(DLX),绑定 DLX 的队列就称为死信队列。

导致消息成为死信的常见原因有以下几种:

  • 消息被拒绝:当消费者使用basic.reject或basic.nack方法拒绝消息,并且设置requeue参数为false时,消息会成为死信。这通常发生在消息内容不符合预期,或者消费者处理消息时出现严重错误,无法继续处理该消息的情况下。例如,在处理订单消息时,如果消息格式错误,无法解析订单信息,消费者可以拒绝该消息并将其标记为死信。
  • 消息过期:如果为消息或队列设置了生存时间(TTL,Time To Live),当消息在队列中的存活时间超过了 TTL 值时,消息就会过期成为死信。例如,在电商场景中,用户下单后生成的订单消息,如果在一定时间内未被处理(如 30 分钟),可以将其设置为过期,进入死信队列进行后续处理,如取消订单、通知用户等。
  • 队列达到最大长度:当队列中的消息数量达到了其设置的最大长度限制时,新进入队列的消息会被视为死信。这在一些对队列容量有限制的场景中很有用,例如,为了防止队列无限增长导致内存耗尽,可以设置队列的最大长度,当队列满时,新消息进入死信队列,以便进行特殊处理。

死信队列在实际应用中有广泛的场景。例如,在订单处理系统中,当订单消息处理失败(如库存不足、支付失败等)时,可以将订单消息放入死信队列,由专门的处理程序对死信队列中的消息进行分析和处理,如重新尝试处理订单、通知管理员等。在消息重试机制中,也可以利用死信队列,当消息多次重试仍未成功时,将其放入死信队列,避免消息在正常队列中无限循环重试,占用资源。

在 Spring Boot 中配置死信队列,首先需要创建正常队列、死信队列和死信交换机。以下是一个配置示例:

 

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class DeadLetterQueueConfig {

public static final String NORMAL_QUEUE = "normal.queue";

public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";

public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";

public static final String ROUTING_KEY = "routing.key";

// 创建正常队列,并配置死信交换机和路由键

@Bean

public Queue normalQueue() {

Map<String, Object> args = new HashMap<>();

args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);

args.put("x-dead-letter-routing-key", ROUTING_KEY);

return QueueBuilder.durable(NORMAL_QUEUE).withArguments(args).build();

}

// 创建死信队列

@Bean

public Queue deadLetterQueue() {

return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();

}

// 创建死信交换机

@Bean

public DirectExchange deadLetterExchange() {

return new DirectExchange(DEAD_LETTER_EXCHANGE);

}

// 绑定死信队列和死信交换机

@Bean

public Binding bindingDeadLetterQueue(@Qualifier("deadLetterQueue") Queue queue,

@Qualifier("deadLetterExchange") DirectExchange exchange) {

return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);

}

}

在这个配置中,normalQueue方法创建了一个正常队列,并通过x-dead-letter-exchange和x-dead-letter-routing-key参数配置了死信交换机和路由键。当正常队列中的消息成为死信时,会根据这些配置被发送到死信交换机,再由死信交换机路由到死信队列。deadLetterQueue方法创建了死信队列,deadLetterExchange方法创建了死信交换机,最后通过bindingDeadLetterQueue方法将死信队列和死信交换机进行绑定,建立起死信消息的路由通道。

延迟队列是一种特殊的队列,它允许消息在指定的延迟时间后才被消费者消费。在 AMQP 协议中,RabbitMQ 本身并没有直接支持延迟队列的功能,但可以通过死信队列和 TTL(Time To Live)来模拟实现延迟队列的效果。

具体实现原理是:生产者将消息发送到一个设置了 TTL 的正常队列中,当消息在正常队列中的存活时间超过了 TTL 值时,消息会成为死信,并被发送到死信队列中。由于死信队列有消费者监听,所以当消息进入死信队列时,就相当于延迟了 TTL 时间后被消费,从而实现了延迟队列的功能。

在实际应用中,延迟队列有很多场景。例如,在电商系统中,用户下单后,如果在一定时间内(如 30 分钟)未支付,订单将被自动取消。可以将取消订单的消息发送到延迟队列,设置延迟时间为 30 分钟,当 30 分钟后,消息从延迟队列中被消费,系统可以检查订单状态,如果仍未支付,则取消订单。在定时任务场景中,也可以利用延迟队列来实现定时执行任务的功能,如定时发送邮件、定时生成报表等。

以电商订单超时取消为例,展示如何配置延迟队列。首先,在上述死信队列配置的基础上,为正常队列设置 TTL:

 

// 创建正常队列,并配置死信交换机、路由键和TTL

@Bean

public Queue normalQueue() {

Map<String, Object> args = new HashMap<>();

args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);

args.put("x-dead-letter-routing-key", ROUTING_KEY);

// 设置队列中消息的TTL为30分钟(30 * 60 * 1000毫秒)

args.put("x-message-ttl", 30 * 60 * 1000);

return QueueBuilder.durable(NORMAL_QUEUE).withArguments(args).build();

}

在生产者类中,发送订单消息到正常队列:

 

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

@Service

public class OrderProducer {

@Autowired

private RabbitTemplate rabbitTemplate;

public void sendOrderMessage(String orderInfo) {

rabbitTemplate.convertAndSend("", DeadLetterQueueConfig.NORMAL_QUEUE, orderInfo);

System.out.println("Sent order message: " + orderInfo);

}

}

在消费者类中,监听死信队列,处理超时订单:

 

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class OrderConsumer {

@RabbitListener(queues = DeadLetterQueueConfig.DEAD_LETTER_QUEUE)

public void handleTimeoutOrder(String orderInfo) {

System.out.println("Received timeout order message: " + orderInfo);

// 处理超时订单逻辑,如取消订单、通知用户等

}

}

通过以上配置和代码,实现了利用死信队列和 TTL 模拟延迟队列,实现电商订单超时自动取消的功能。这种方式充分利用了 RabbitMQ 的特性,为分布式系统中的定时任务和延迟处理提供了灵活可靠的解决方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2380185.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

进阶-数据结构部分:1、数据结构入门

飞书文档https://x509p6c8to.feishu.cn/wiki/HRLkwznHiiOgZqkqhLrcZNqVnLd 一、存储结构 顺序存储 链式存储 二、常用数据结构 2.1、栈 先进后出 场景&#xff1a; 后退/前进功能&#xff1a;网页浏览器中的后退和前进按钮可以使用栈来实现。在浏览网页时&#xff0c;每次…

React 19中useContext不需要Provider了。

文章目录 前言一、React 19中useContext移除了Provider&#xff1f;二、使用步骤总结 前言 在 React 19 中&#xff0c;useContext 的使用方式有所更新。开发者现在可以直接使用 作为提供者&#xff0c;而不再需要使用 <Context.Provider>。这一变化简化了代码结构&…

Json schema校验json字符串(networknt/json-schema-validator库)

学习链接 json-schema官网 - 英文 jsonschemavalidator 可在线校验网站 networknt的json-schema-validator github地址 networknt的json-schema-validator 个人gitee地址 - 里面有md文档说明和代码示例 JSON Schema 入门指南&#xff1a;如何定义和验证 JSON 数据结构 JS…

交易所开发:构建功能完备的金融基础设施全流程指南

交易所开发&#xff1a;构建功能完备的金融基础设施全流程指南 ——从技术架构到合规安全的系统性解决方案 一、开发流程&#xff1a;从需求分析到运维优化 开发一款功能完备的交易所需要遵循全生命周期管理理念&#xff0c;涵盖市场定位、技术实现、安全防护和持续迭代四大阶…

Axure疑难杂症:统计分析页面引入Echarts示例动态效果

亲爱的小伙伴,在您浏览之前,烦请关注一下,在此深表感谢! Axure产品经理精品视频课已登录CSDN可点击学习https://edu.csdn.net/course/detail/40420 课程主题:统计分析页面引入Echarts示例动态效果 主要内容:echart示例引入、大小调整、数据导入 应用场景:统计分析页面…

展锐Android14及更新版本split_build编译方法

更改split_build.py文件内容后按照下面方法编译&#xff1a; zip -r sys/vendor/sprd/release/split_build.zip sys/vendor/sprd/release/split_build/ rm -r sys/vendor/sprd/release/split_build/ cp -r vnd/vendor/sprd/release/split_build/ sys/vendor/sprd/release/cd s…

青少年ctf平台应急响应-应急响应2

题目&#xff1a; 当前服务器被创建了一个新的用户&#xff0c;请提交新用户的用户名&#xff0c;得到的结果 ssh rootchallenge.qsnctf.com -p 30327 这个命令用于通过 SSH 协议连接到指定的远程服务器。具体解释如下&#xff1a; ssh&#xff1a;这是在 Unix-like 系统中…

k8s监控方案实践补充(二):使用kube-state-metrics获取资源状态指标

k8s监控方案实践补充&#xff08;二&#xff09;&#xff1a;使用kube-state-metrics获取资源状态指标 文章目录 k8s监控方案实践补充&#xff08;二&#xff09;&#xff1a;使用kube-state-metrics获取资源状态指标一、Metrics Server简介二、kube-state-metrics实战部署1. 创…

基于SpringBoot的小型民营加油站管理系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;…

技术架构缺乏灵活性,如何应对变化需求?

技术架构缺乏灵活性会导致企业在面临市场变化、用户需求演化或新技术出现时难以及时响应&#xff0c;直接影响产品更新速度与竞争力。要有效应对变化需求&#xff0c;需要从引入模块化架构设计、推动微服务拆分、加强架构治理与决策机制、构建中台与平台化能力等方面系统推进。…

【AI时代】Java程序员大模型应用开发详细教程(上)

目录 一、大模型介绍 1. 大模型介绍 1.1 什么是大模型 1.2 技术储备 1.3 大模型的分类 2. 入门案例 3.Token的介绍 二、提示词工程 1. 好玩的提示词案例 1.1 翻译软件 1.2 让Deepseek绘画 1.3 生成数据 1.4 代码生成 2. 提示词介绍 3. Prompt Engineering最佳实…

虚拟网络编辑器

vmnet1 仅主机模式 hostonly 功能&#xff1a;虚拟机只能和宿主机通过vmnet1通信&#xff0c;不可连接其他网络&#xff08;包括互联网&#xff09; vmnet8 地址转换模式 NAT 功能&#xff1a;虚拟机可以和宿主通过vmnet8通信&#xff0c;并且可以连接其他网络&#xff0c;但是…

第31讲 循环缓冲区与命令解析

串口在持续接收数据时容易发生数据黏包&#xff08;先接收的数据尚未被处理&#xff0c;后面的数据已经将内存覆盖&#xff09;的情况&#xff0c;循环缓冲区的本质就是将串口接受到的数据马上拷贝到另外一块内存之中。为了避免新来的数据覆盖掉尚未处理的数据&#xff0c;一方…

数据结构(十)——排序

一、选择排序 1.简单选择排序 基本思想&#xff1a;假设排序表为[1,…,n]&#xff0c;第i趟排序即从[i,…,n]中选择关键字最小的元素与L[i]交换 eg&#xff1a;给定关键字序列{87&#xff0c;45&#xff0c;78&#xff0c;32&#xff0c;17&#xff0c;65&#xff0c;53&…

美蛋工具箱:一站式解决图片、视频、音频和文档处理需求的聚合神器

先放下载链接:夸克网盘下载 宝子们&#xff0c;今天不啰嗦&#xff0c;直接给大家安利一款超好用的聚合工具&#xff0c;有需要的小伙伴赶紧码住&#xff01; 今天要介绍的这款工具叫美蛋工具箱&#xff0c;它是一款聚合类工具。这个软件是绿色版的&#xff0c;聚合了图片工具…

python打卡day16

NumPy 数组基础 因为前天说了shap&#xff0c;这里涉及到数据形状尺寸问题&#xff0c;所以需要在这一节说清楚&#xff0c;后续的神经网络我们将要和他天天打交道。 知识点&#xff1a; numpy数组的创建&#xff1a;简单创建、随机创建、遍历、运算numpy数组的索引&#xff1a…

Redis 学习笔记 5:分布式锁

Redis 学习笔记 5&#xff1a;分布式锁 在前文中学习了如何基于 Redis 创建一个简单的分布式锁。虽然在大多数情况下这个锁已经可以满足需要&#xff0c;但其依然存在以下缺陷&#xff1a; 事实上一般而言&#xff0c;我们可以直接使用 Redisson 提供的分布式锁而非自己创建。…

游戏开发实战(一):Python复刻「崩坏星穹铁道」嗷呜嗷呜事务所---源码级解析该小游戏背后的算法与设计模式【纯原创】

文章目录 奇美拉项目游戏规则奇美拉(Chimeras)档案领队成员 结果展示&#xff1a; 奇美拉项目 由于项目工程较大&#xff0c;并且我打算把我的思考过程和实现过程中踩过的坑都分享一下&#xff0c;因此会分3-4篇博文详细讲解本项目。本文首先介绍下游戏规则并给出奇美拉档案。…

02- 浏览器运行原理

文章目录 1. 网页的解析过程浏览器内核 2. 浏览器渲染流程2.1 解析html2.2 生成css规则2.3 构建render tree2.4 布局(Layout)2.5 绘制(Paint) 3. 回流和重绘3.1 回流reflow&#xff08;1&#xff09;理解&#xff1a;&#xff08;2&#xff09;出现情况 3.2 重绘repaint&#x…

移除链表元素数据结构oj题(力扣题206)

目录 题目描述&#xff1a; 题目解读&#xff08;分析&#xff09; 解决代码 题目描述&#xff1a; 给你一个链表的头节点 head 和一个整数 val &#xff0c;请你删除链表中所有满足 Node.val val 的节点&#xff0c;并返回 新的头节点 。 题目解读&#xff08;分析&#…