Redisson - 实现延迟队列

news2026/2/18 10:07:51

Redisson 延迟队列

Redisson 是基于 Redis 的一款功能强大的 Java 客户端。它提供了诸如分布式锁、限流器、阻塞队列、延迟队列等高可用、高并发组件。

其中,RDelayedQueue 是对 Redis 数据结构的高阶封装,能让你将消息延迟一定时间后再进入消费队列。

延迟队列的组成

Redisson 延迟队列由两个 Redis 队列组成:

RDelayedQueue:延迟元素容器,暂存在 Redis 的 zset 中(按时间排序)
RBlockingQueue(目标队列):当延迟时间到达后,Redisson 会自动将元素移动到这个队列,供消费者消费。

Redisson 内部使用 定时轮询线程 来扫描延迟数据并迁移至目标队列。

原理

Redisson 的 RDelayedQueue 设计巧妙,它并非一个单一的 Redis 数据结构,而是结合了 Redis 的 ZSET (有序集合) 和 List (列表,具体实现为 RBlockingQueue) 来实现的

> 生产者  -- (元素, 延迟时间) -->  RDelayedQueue (API)
>                                      |
>                                      | (Redisson 内部)
>                                      V
>             +--------------------------------------------------+
>             |   ZSET (有序集合)                                |
>             |   Key: delayed_queue_name_zset                   |
>             |   Member: task_id_or_payload                     |
>             |   Score: execution_timestamp                     |
>             +--------------------------------------------------+
>                        ^
>                        |  (Eviction Scheduler 定期扫描)
>                        |  (到期任务)
>                        V
>             +--------------------------------------------------+
>             |   RBlockingQueue (目标队列,基于 Redis List)      |
>             |   Key: destination_queue_name                    |
>             |   Value: task_payload                            |
>             +--------------------------------------------------+
>                                      ^
>                                      |
>                                      | (消费者 take()/poll())
>                                      V 消费者  <---------------------------

Redisson 延迟队列的优势

分布式特性:基于 Redis,天然支持分布式环境,多个生产者和消费者实例可以共享同一个延迟队列系统。
高性能与持久化:依赖 Redis 的高性能特性。如果 Redis 配置了持久化 (AOF/RDB),延迟任务的元数据也能得到持久化保障。
易用性:Redisson 提供了简洁易懂的 API,屏蔽了底层 ZSET 和 List 的复杂操作。
精确度较高:任务的到期判断依赖 Redis 服务器的时间,通常比较准确。
支持任务取消:可以通过 RDelayedQueue.remove(object) 方法从延迟队列中移除尚未到期的任务(前提是任务对象能被正确 equals 比较)。

Redisson队列实践

添加依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.24.3</version>
</dependency>

Redisson 配置 (application.yml):

spring:

  application:
    name: redisson-delayed-order-app
  # Redisson 配置 (如果使用 redisson-spring-boot-starter,它会尝试自动配置)
  # 你也可以提供一个 redisson.yaml 文件或在 Java Config 中配置 RedissonClient Bean
  # 例如,指定单个 Redis 服务器:
  redis: # Spring Boot 2.x 使用 spring.redis, Spring Boot 3.x 使用 spring.data.redis
    # Redisson starter 会尝试使用这些配置,但更推荐使用 Redisson 自身的配置方式
    # address: redis://127.0.0.1:6379 # Redisson 推荐的格式
    host: 127.0.0.1
    port: 6379
    # database: 0
    # password:

# Redisson 自己的配置方式 (更灵活,可以放在 redisson.yaml 中)
# redisson:
#   singleServerConfig:
#     address: "redis://127.0.0.1:6379"
#     database: 0
#   # codec: org.redisson.codec.JsonJacksonCodec # 推荐使用 Jackson 序列化

定义延迟任务处理器(消费者)

@Component
public class OrderCloseConsumer implements InitializingBean {

    private static final String QUEUE_NAME = "order-close-queue";

    @Autowired
    private RedissonClient redissonClient;

    @Autowired
    private OrderService orderService;

    @Override
    public void afterPropertiesSet() {
        RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(QUEUE_NAME);
        RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);

        // 启动消费线程
        Executors.newSingleThreadExecutor().submit(() -> {
            while (true) {
                try {
                    String orderSn = blockingQueue.take(); // 阻塞等待
                    orderService.closeOrder(orderSn); // 业务处理
                    log.info("订单超时关闭成功:{}", orderSn);
                } catch (Exception e) {
                    log.error("延迟关单处理异常", e);
                }
            }
        });
    }
}

在下单时设置延迟任务

public void createOrder(String orderSn) {
    // 1. 业务入库逻辑...

    // 2. 加入延迟队列
    RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("order-close-queue");
    RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);

    delayedQueue.offer(orderSn, 30, TimeUnit.MINUTES); // 30分钟后执行
    log.info("订单加入延迟关闭队列:{}", orderSn);
}

关键注意事项

1、序列化 (Codec):
Redisson 默认使用 MarshallingCodec,它要求任务对象实现 java.io.Serializable。
推荐配置 RedissonClient 使用 org.redisson.codec.JsonJacksonCodec,这样任务对象无需实现 Serializable,更灵活且跨语言友好。在 RedissonClient Bean 配置中设置:config.setCodec(new JsonJacksonCodec());

2、幂等性: 消费者的处理逻辑(如 processOrderTimeout)必须是幂等的。即使同一个任务被重复消费(如消费者处理中断后任务重投,或 remove 失败后任务仍到期),最终结果也应一致。通过检查订单当前状态是保证幂等性的关键。

3、任务取消的 equals() 和 hashCode(): RDelayedQueue.remove(object) 依赖于任务对象的 equals() 和 hashCode() 方法来查找并删除。确保任务载体类 (如 OrderTimeoutTask) 正确实现了这两个方法,通常基于任务的唯一标识(如订单号)。

4、消费者线程管理: 使用 InitializingBean 和 DisposableBean 来管理消费者线程的生命周期。消费者逻辑应在单独的线程中执行,避免阻塞主线程。可以使用 Executors 创建线程池来并发处理任务。

5、错误处理与重试: 消费者循环中应有完善的 try-catch 块,处理单个任务失败的情况,避免整个消费者线程挂掉。可以考虑引入失败任务记录、告警或简单的延时重试(但要注意避免无限重试)。

6、Redis 持久化: 为保证系统重启后延迟任务不丢失(ZSET 中的元数据),Redis 服务器应配置 RDB 或 AOF 持久化。

7、消费者并发与伸缩:
可以启动多个消费者实例(不同JVM进程),它们会从同一个目标 RBlockingQueue 中竞争获取任务,实现负载均衡。
在单个消费者实例内部,也可以使用线程池来并发处理从队列中获取的任务。

8、监控: 关注 Redis 中 ZSET 和 List 的长度、Redisson 调度线程的健康状况、任务处理的成功率和耗时等指标,以便及时发现和处理问题。

9、Redisson 版本: 确保使用较新的稳定版 Redisson,因为早期版本可能在延迟队列的某些边缘场景下存在问题。

总结

Redisson 的 RDelayedQueue 通过巧妙地结合 Redis ZSET 和 List,提供了一个强大、易用且高效的分布式延迟队列解决方案。它非常适合如订单超时关闭、延时通知、定时任务等场景。在实践中,务必注意序列化、幂等性、任务取消的正确实现以及消费者端的健壮性设计,才能充分发挥其优势,构建稳定可靠的分布式应用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2401484.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

软件工程的定义与发展历程

文章目录 一、软件工程的定义二、软件工程的发展历程1. 前软件工程时期(1940s-1960s)2. 软件工程诞生(1968)3. 结构化方法时期(1970s)4. 面向对象时期(1980s)5. 现代软件工程(1990s-至今) 三、软件工程的发展趋势 一、软件工程的定义 软件工程是应用系统化、规范化、可量化的方…

第十三节:第五部分:集合框架:集合嵌套

集合嵌套案例分析 代码&#xff1a; package com.itheima.day27_Collection_nesting;import java.util.*;/*目标:理解集合的嵌套。 江苏省 "南京市","扬州市","苏州市","无锡市","常州市" 湖北省 "武汉市","…

Java设计模式之观察者模式详解

一、观察者模式简介 观察者模式&#xff08;Observer Pattern&#xff09;是一种行为型设计模式&#xff0c;它定义了对象之间的一对多依赖关系。当一个对象&#xff08;主题&#xff09;的状态发生改变时&#xff0c;所有依赖于它的对象&#xff08;观察者&#xff09;都会自…

freeRTOS 消息队列之一个事件添加到消息队列超时怎么处理

一 消息队列的结构框图 xTasksWaitingToSend‌&#xff1a;这个列表存储了所有因为队列已满而等待发送消息的任务。当任务尝试向一个已满的队列发送消息时&#xff0c;该任务会被挂起并加入到xTasksWaitingToSend列表中&#xff0c;直到队列中有空间可用‌&#xff0c; xTasksW…

Authpf(OpenBSD)认证防火墙到ssh连接到SSH端口转发技术栈 与渗透网络安全的关联 (RED Team Technique )

目录 &#x1f50d; 1. Authpf概述与Shell设置的作用 什么是Authpf&#xff1f; Shell设置为/usr/sbin/authpf的作用与含义 &#x1f6e0;️ 2. Authpf工作原理与防火墙绕过机制 技术栈 工作原理 防火墙绕过机制 Shell关联 &#x1f310; 3. Authpf与SSH认证及服务探测…

组合与排列

组合与排列主要有两个区别&#xff0c;区别在于是否按次序排列和符号表示不同。 全排列&#xff1a; 从n个不同元素中任取m&#xff08;m≤n&#xff09;个元素&#xff0c;按照一定的顺序排列起来&#xff0c;叫做从n个不同元素中取出m个元素的一个排列。当mn时所有的排列情况…

Apache Druid

目录 Apache Druid是什么&#xff1f; CVE-2021-25646(Apache Druid代码执行漏洞) Apache Druid是什么&#xff1f; Apache Druid是一个高性能、分布式的数据存储和分析系统。设计用于处理大量实时数据&#xff0c;并进行低延迟的查询。它特别适合用于分析大规模日志、事件数据…

使用深蓝词库软件导入自定义的词库到微软拼音输入法

我这有一个人员名单&#xff0c;把它看作一个词库&#xff0c;下面我演示一下如何把这个词库导入微软输入法 首先建一个text文件&#xff0c;一行写一个词条 下载深蓝词库 按照我这个配置&#xff0c;点击转换&#xff0c;然后在桌面微软输入法那右键&#xff0c;选择设置 点…

使用Node.js分片上传大文件到阿里云OSS

阿里云OSS的分片上传&#xff08;Multipart Upload&#xff09;是一种针对大文件优化的上传方式&#xff0c;其核心流程和关键特性如下&#xff1a; 1. ‌核心流程‌ 分片上传分为三个步骤&#xff1a; 初始化任务‌&#xff1a;调用InitiateMultipartUpload接口创建上传任务…

复变函数中的对数函数及其MATLAB演示

复变函数中的对数函数及其MATLAB演示 引言 在实变函数中&#xff0c;对数函数 ln ⁡ x \ln x lnx定义在正实数集上&#xff0c;是一个相对简单的概念。然而&#xff0c;当我们进入复变函数领域时&#xff0c;对数函数展现出更加丰富和复杂的性质。本文将介绍复变函数中对数函…

【Linux】Linux程序地址基础

参考博客&#xff1a;https://blog.csdn.net/sjsjnsjnn/article/details/125533127 一、地址空间的阐述 1.1 程序地址空间 下面的图片展示了程序地址空间的组成结构 我们通过代码来验证一下 int g_unval; int g_val 100;int main(int argc, char *argv[]);void test1() {i…

将图形可视化工具的 Python 脚本打包为 Windows 应用程序

前文我们已经写了一个基于python的tkinter库和matplotlib库的图形可视化工具。 基于Python的tkinter库的图形可视化工具&#xff08;15种图形的完整代码&#xff09;:基于Python的tkinter库的图形可视化工具&#xff08;15种图形的完整代码&#xff09;-CSDN博客 在前文基础上&…

无人机军用与民用技术对比分析

一、材料区别 军用无人机&#xff1a; 1. 高强度特种材料&#xff1a; 大量使用钛合金、碳纤维复合材料&#xff0c;兼顾轻量化与高强度&#xff0c;提升抗冲击性和隐身性能。 关键部件依赖进口材料。 2. 隐身涂层&#xff1a; 采用雷达吸波材料和低红外特征涂料&#xf…

刷leetcode hot100--矩阵6/1

1.螺旋矩阵【很久】6/1【感觉就是思路的搬运工&#xff0c;没完全理解】 54. 螺旋矩阵 - 力扣&#xff08;LeetCode&#xff09; 原来想 但是如果是奇数矩阵&#xff0c;遍历不到中间 解决思路&#xff1a; 用left,right,top,down标记/限定每次遍历的元素&#xff0c;每次从…

Docker轻松搭建Neo4j+APOC环境

Docker轻松搭建Neo4jAPOC环境 一、简介二、Docker部署neo4j三、Docker安装APOC插件四、删除数据库/切换数据库 一、简介 Neo4j 是一款高性能的 原生图数据库&#xff0c;采用 属性图模型 存储数据&#xff0c;支持 Cypher查询语言&#xff0c;适用于复杂关系数据的存储和分析。…

定制开发开源AI智能名片S2B2C商城小程序在无界零售中的应用与行业智能升级示范研究

摘要&#xff1a;本文聚焦无界零售背景下京东从零售产品提供者向零售基础设施提供者的转变&#xff0c;探讨定制开发开源AI智能名片S2B2C商城小程序在这一转变中的应用。通过分析该小程序在商业运营成本降低、效率提升、用户体验优化等方面的作用&#xff0c;以及其与京东AI和冯…

【大模型:知识图谱】--5.neo4j数据库管理(cypher语法2)

目录 1.节点语法 1.1.CREATE--创建节点 1.2.MATCH--查询节点 1.3.RETURN--返回节点 1.4.WHERE--过滤节点 2.关系语法 2.1.创建关系 2.2.查询关系 3.删除语法 3.1.DELETE 删除 3.2.REMOVE 删除 4.功能补充 4.1.SET &#xff08;添加属性&#xff09; 4.2.NULL 值 …

贪心算法应用:装箱问题(BFD算法)详解

贪心算法应用&#xff1a;装箱问题(BFD算法)详解 1. 装箱问题与BFD算法概述 1.1 装箱问题定义 装箱问题(Bin Packing Problem)是组合优化中的经典问题&#xff0c;其定义为&#xff1a; 给定n个物品&#xff0c;每个物品有大小wᵢ (0 < wᵢ ≤ C)无限数量的箱子&#xf…

编程技能:格式化打印05,格式控制符

专栏导航 本节文章分别属于《Win32 学习笔记》和《MFC 学习笔记》两个专栏&#xff0c;故划分为两个专栏导航。读者可以自行选择前往哪个专栏。 &#xff08;一&#xff09;WIn32 专栏导航 上一篇&#xff1a;编程技能&#xff1a;格式化打印04&#xff0c;sprintf 回到目录…

MPLAB X IDE ​软件安装与卸载

1、下载MPLAB X IDE V6.25 MPLAB X IDE | Microchip Technology 正常选Windows&#xff0c;点击Download&#xff0c;等待自动下载完成&#xff1b; MPLAB X IDE 一台电脑上可以安装多个版本&#xff1b; 2、安装MPLAB X IDE V6.25 右键以管理员运行&#xff1b;next; 勾选 I a…