如何利用Redis实现延迟队列?

news2025/5/19 11:22:27

延迟队列概念解析

延迟队列(Delay Queue)是一种特殊的消息队列,核心特性是允许消息在指定的延迟时间后被消费者处理,而非立即消费。它解决了传统队列(FIFO)无法处理“定时任务”或“超时任务”的问题,常见于需要异步延迟处理的场景(如订单超时取消、定时提醒、重试机制等)。


核心要素

  1. 延迟时间:消息入队时需指定“延迟时长”或“绝对执行时间”(如“5分钟后处理”或“2024-08-01 10:00执行”)。
  2. 任务存储:需可靠存储未到期的任务(避免宕机丢失),支持快速查询到期任务。
  3. 触发机制:能高效检测并提取已到期的任务(时间精度需满足业务需求)。
  4. 处理逻辑:消费者对到期任务进行业务处理(如调用接口、更新数据库)。

与普通队列的区别

特性普通队列(FIFO)延迟队列
消费时机消息入队后立即可被消费消息需等待指定延迟时间后才被消费
排序规则按入队顺序(先进先出)按到期时间排序(时间早的优先)
核心目标解耦、异步、削峰填谷解决“定时/超时”类异步任务需求

典型应用场景

  • 订单超时取消:用户下单后未支付,15分钟后自动取消订单。
  • 重试机制:接口调用失败后,延迟30秒重试(避免立即重试加重系统负担)。
  • 定时通知:活动开始前30分钟,向用户推送提醒消息。
  • 缓存预热:每日凌晨3点触发缓存数据加载任务。

关键设计挑战

  1. 延迟精度:需平衡性能与时间精度(如Redis轮询间隔过短会增加QPS,过长可能导致任务延迟处理)。
  2. 持久化:避免因服务宕机导致未到期任务丢失(如Redis通过RDB/AOF持久化,RabbitMQ通过消息持久化)。
  3. 分布式支持:多消费者场景下需避免任务重复消费(如Redis使用Lua脚本原子化取任务)。
  4. 内存/存储限制:单机方案(如JDK DelayQueue)受内存限制,需评估任务量上限。

一、Redis 实现延迟队列(Java 代码)

Redis 延迟队列通常利用 有序集合(ZSET) 存储任务,任务的执行时间作为 score,通过轮询或阻塞方式获取到期任务。以下是核心实现:

1. 依赖引入(Maven)
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.4.3</version>
</dependency>
2. 生产者(任务入队)
import redis.clients.jedis.Jedis;
import java.util.UUID;

public class RedisDelayQueueProducer {
    private final Jedis jedis;
    private final String queueKey = "delay_queue";

    public RedisDelayQueueProducer(Jedis jedis) {
        this.jedis = jedis;
    }

    // 添加延迟任务(score 为执行时间戳)
    public void addTask(String taskData, long executeTime) {
        String taskId = UUID.randomUUID().toString();
        jedis.zadd(queueKey, executeTime, taskId + ":" + taskData);
    }
}
3. 消费者(任务出队)

使用 Lua 脚本原子化获取并删除到期任务(避免多消费者竞态条件):

import redis.clients.jedis.Jedis;
import java.util.Arrays;
import java.util.List;

public class RedisDelayQueueConsumer {
    private final Jedis jedis;
    private final String queueKey = "delay_queue";
    // Lua 脚本:获取并删除 score <= 当前时间的任务(最多取 10 个)
    // 使用Lua保证删除时间和任务的原子性
    private final String luaScript = "" +
        "local tasks = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 10)\n" +
        "if #tasks > 0 then\n" +
        "    redis.call('zrem', KEYS[1], unpack(tasks))\n" +
        "end\n" +
        "return tasks";

    public RedisDelayQueueConsumer(Jedis jedis) {
        this.jedis = jedis;
    }

    public List<String> pollExpiredTasks() {
        long currentTime = System.currentTimeMillis();
        return jedis.eval(luaScript, Arrays.asList(queueKey), Arrays.asList(String.valueOf(currentTime)));
    }
}

方案缺点(消费者去消费这条消息只有轮询去消费,会导致大量线程空转,特别是高峰期,不太推荐使用):
由于 Redis ZSET 不支持原生的阻塞命令(如 BLPOP ),实际中需通过以下方式模拟阻塞:

  • 短轮询+休眠 :轮询间隔设置为较小值(如100ms),减少延迟但增加 Redis 压力。
  • 事件触发 :结合 Redis 的 PUBLISH/SUBSCRIBE 机制,生产者在添加任务时发布事件,消费者订阅事件后立即触发轮询(减少无效轮询)。

二、其他延迟队列实现方案(Java)

方案 1:JDK DelayQueue(单机版)

基于 java.util.concurrent.DelayQueue,任务需实现 Delayed 接口。

代码实现
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// 延迟任务类
class DelayTask implements Delayed {
    private final String taskId;
    private final String data;
    private final long expireTime; // 绝对时间戳(毫秒)

    public DelayTask(String taskId, String data, long delayMs) {
        this.taskId = taskId;
        this.data = data;
        this.expireTime = System.currentTimeMillis() + delayMs;
    }

    // 剩余延迟时间
    @Override
    public long getDelay(TimeUnit unit) {
        long diff = expireTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    // 按到期时间排序
    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.expireTime, ((DelayTask) o).expireTime);
    }
}

// 生产者与消费者
public class JdkDelayQueueDemo {
    private static final DelayQueue<DelayTask> queue = new DelayQueue<>();

    public static void main(String[] args) {
        // 生产者:添加延迟 5 秒的任务
        new Thread(() -> {
            queue.put(new DelayTask("task1", "data1", 5000));
        }).start();

        // 消费者:阻塞获取到期任务
        new Thread(() -> {
            while (true) {
                try {
                    DelayTask task = queue.take();
                    System.out.println("处理任务:" + task.taskId + ", 数据:" + task.data);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }).start();
    }
}

方案缺点:轮询不推荐

方案 2:RabbitMQ 死信队列(分布式)

通过设置消息 TTL(过期时间),过期后消息转发到死信队列(DLX),消费者监听死信队列。

代码实现(需 RabbitMQ 环境)
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class RabbitMqDelayQueueDemo {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
    private static final String DEAD_LETTER_QUEUE = "dead_letter_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 1. 配置死信队列(DLX)
        channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.queueDeclare(DEAD_LETTER_QUEUE, true, false, false, null);
        channel.queueBind(DEAD_LETTER_QUEUE, DEAD_LETTER_EXCHANGE, "dead_letter_key");

        // 2. 配置普通队列(设置 TTL 和 DLX)
        Map<String, Object> normalQueueArgs = new HashMap<>();
        normalQueueArgs.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        normalQueueArgs.put("x-dead-letter-routing-key", "dead_letter_key");
        normalQueueArgs.put("x-message-ttl", 5000); // 消息 5 秒后过期
        channel.queueDeclare("normal_queue", true, false, false, normalQueueArgs);
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.queueBind("normal_queue", NORMAL_EXCHANGE, "normal_key");

        // 3. 生产者发送消息到普通队列
        String message = "延迟任务数据";
        channel.basicPublish(NORMAL_EXCHANGE, "normal_key", 
            MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

        // 4. 消费者监听死信队列(处理延迟任务)
        channel.basicConsume(DEAD_LETTER_QUEUE, false, (consumerTag, delivery) -> {
            String msg = new String(delivery.getBody());
            System.out.println("处理延迟任务:" + msg);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }, consumerTag -> {});
    }
}

方案特点:使用死信队列机制实现延迟队列,如果有RabbitMQ 推荐使用。


方案 3:RocketMq实现(推荐)
一、核心概念

RocketMQ 的延迟时间并非任意值,而是通过「延迟级别」控制(由 Broker 配置决定)。默认延迟级别对应的时间为:

1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h

对应级别为 1~18(级别 0 表示不延迟)。

二、实现步骤
1. 生产者发送延迟消息

在发送消息时,通过 setDelayTimeLevel(int level) 方法设置延迟级别。

示例代码(Java)

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class DelayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        // 创建消息并设置延迟级别(例如级别 3,对应 10s 延迟)
        Message msg = new Message(
            "DelayTopic",  // 主题
            "TagA",        // 标签
            "Hello RocketMQ".getBytes("UTF-8")  // 消息内容
        );
        msg.setDelayTimeLevel(3);  // 设置延迟级别为 3(10秒)

        // 发送消息
        producer.send(msg);
        producer.shutdown();
    }
}
2. 消费者消费消息

消费者无需特殊配置,正常订阅主题即可,Broker 会在延迟时间到达后投递消息。

示例代码(Java)

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class DelayConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("DelayTopic", "*");  // 订阅主题

        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                System.out.printf("收到消息:%s,延迟时间:%ds%n", 
                    new String(msg.getBody()), 
                    msg.getStoreTimestamp() - msg.getBornTimestamp() / 1000);  // 计算实际延迟时间(毫秒转秒)
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
        System.out.println("消费者启动");
    }
}
三、注意事项
  1. 延迟级别限制:Broker 默认仅支持 18 个延迟级别,如需自定义延迟时间,需修改 Broker 配置文件(broker.conf)中的 messageDelayLevel 参数(格式:1s 5s 10s ...)。
  2. 消息时效性:延迟消息的存储和投递依赖 Broker 稳定性,需确保 Broker 有足够资源处理延迟队列。
  3. 版本兼容性:RocketMQ 4.2.0 及以上版本支持延迟消息,低版本需升级。

方案特点:原生api支持延迟队列,推荐此方案,实现简单易配置。


三、方案对比

方案优势劣势内聚耦合扩展性
Redis 延迟队列支持分布式、持久化(RDB/AOF)、高性能(O(logN) 插入/查询)需要维护 Redis 集群;需处理网络抖动(如 Lua 脚本原子性)低(依赖 Redis)高(可通过集群扩展)
JDK DelayQueue无额外依赖、实现简单、单机性能高单机限制(无法分布式)、无持久化(宕机任务丢失)、任务数受内存限制高(纯 JDK)低(仅单机)
RabbitMQ 死信队列天然分布式、支持持久化、消息可靠(ACK 机制)依赖 RabbitMQ 集群;配置复杂(需设置 TTL/DLX);延迟精度受 TTL 限制中(依赖 MQ)中(需扩展 MQ 集群)

总结

  • Redis:适合需要分布式、高吞吐的延迟任务(如订单超时取消)。
  • JDK DelayQueue:适合单机、小规模、对延迟精度要求不高的场景(如本地缓存清理)。
  • RabbitMQ:适合需要严格消息可靠、已集成 MQ 的分布式系统(如电商促销活动通知)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2379241.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【刚下赛场!】2025年江西省电子专题赛 - 现场制作:简易数控直流电流源原题

一、题目要求 二、赛场注意事项 1、一定要用铜柱将板子升起来&#xff0c;不然我们剪下来的引脚在测试的时候放在桌子上非常容易导致我们的板子短路&#xff08;记得把铜柱卸下来再上交作品&#xff0c;不然会被认为是做标记判0分&#xff09;&#xff1b; 2、发下来器件之后…

材料×工艺×AI:猎板PCB重构汽车电子四层板技术逻辑

一、汽车电子四层板的三大核心挑战 1. 极端环境下的可靠性保障 汽车电子需在-40℃至150℃的剧烈温变、高湿振动等环境中稳定运行。例如&#xff0c;电池管理系统&#xff08;BMS&#xff09;要求PCB在高温下阻抗漂移率低于8%&#xff0c;且镀层需具备抗腐蚀能力。猎板PCB通…

MCP(一)——QuickStart

目录 1. MCP简介2. MCP的优势3. MCP核心4. QuickStart For Server Developers(仅具参考)4.1 MCP核心概念4.2 构建MCP服务器的代码4.2.1 设置MCP服务器实例4.2.2 辅助函数4.2.3 实现工具执行4.2.4 在Cherry-Studio中添加MCP服务器4.2.5 演示4.2.5.1 测试工具get_alerts4.2.5.2 测…

Spring AOP从0到1

Spring有两大核心&#xff1a; 1、IoC 控制反转 2、AOP 面向切面编程 AOP&#xff1a;切面就是指某⼀类特定问题, 所以AOP也可以理解为面向特定⽅法编程. 引入AOP依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spri…

软考IPSEC案例分析

要回忆IPSEC点击这里 题目 5/21 某全国连锁企业的总部和分布在全国各地的30家分公司之间经常需要传输各种内部数据&#xff0c;因此公司决定在总部和各分公司之间建立VPN技术。具体拓扑如下&#xff1a; 配置部分只显示了与总部与分公司1的配置。 根据拓扑完成问题1-问题2。…

C++(23):容器类<vector>

目录 一、核心概念 二、基本语法 1. 头文件 2. 声明与初始化 三、常用操作 四、具体实例 1、size()、front()、back() 2、push_back()、pop_back()、capacity() 3、reserve&#xff08;&#xff09; 一、核心概念 Vectors 包含着一系列连续存储的元素,其行为…

Hugo 安装保姆级教程(搭建个人blog)

Hogo 安装保姆级教程 友链 参考文章&#xff1a; https://blog.csdn.net/xianyun_0355/article/details/140261279 前言 Hugo 是 Go 编写的静态网站生成器&#xff0c;速度快&#xff0c;易用&#xff0c;可配置。作为一款跨平台开源建站系统&#xff0c;当前提供 Windows&…

tomcat查看状态页及调优信息

准备工作 先准备一台已经安装好tomcat的虚拟机&#xff0c;tomcat默认是状态页是默认被禁用的 1.添加授权用户 vim /usr/local/tomcat/conf/tomcat-users.xml22 <role rolename"manager-gui"/>23 <user username"admin" password"tomcat&q…

从坏道扫描到错误修复:HD Tune实战指南

一、硬盘检测的必要性 随着计算机使用时间的增加&#xff0c;机械硬盘和固态硬盘都会出现不同程度的性能衰减。定期进行硬盘健康检查可以&#xff1a;及时发现潜在故障&#xff1b;预防数据丢失风险&#xff1b;掌握存储设备实际状态。 二、HD Tune功能解析 性能测试&#x…

将嵌入映射到 Elasticsearch 字段类型:semantic_text、dense_vector、sparse_vector

作者&#xff1a; Andre Luiz 讨论如何以及何时使用 semantic_text、dense_vector 或 sparse_vector&#xff0c;以及它们与嵌入生成的关系。 通过这个自定进度的 Search AI 实践学习亲自体验向量搜索。你可以开始免费云试用&#xff0c;或者在本地机器上尝试 Elastic。 多年来…

解决uni-app开发中的“TypeError: Cannot read property ‘0‘ of undefined“问题

问题背景 在使用uni-app开发小程序或App时&#xff0c;你可能会遇到这样一个错误: TypeError: Cannot read property 0 of undefinedat uni.promisify.adaptor.js:7这个错误看起来很唬人&#xff0c;但它实际上与uni-app框架中的Promise适配器有关。今天&#xff0c;我们将深…

翻译:20250518

翻译题 文章目录 翻译题一带一路中国结 一带一路 The “One Belt and One Road” Initiative aims to achieve win-win and shared development. China remains unchanged in its commitment to foster partnerships. China pursues an independent foreign policy of peace, …

西门子1200/1500博图(TIA Portal)寻址方式详解

西门子博图&#xff08;TIA Portal&#xff09;是西门子公司推出的自动化工程软件平台&#xff0c;广泛应用于工业自动化领域。在编写PLC程序时&#xff0c;寻址方式是一个非常重要的概念&#xff0c;它决定了如何访问和操作PLC中的数据和资源。本文将详细介绍西门子博图中的寻…

记录一次win11本地部署deepseek的过程

20250518 win11 docker安装部署 ollama安装 ragflow部署 deepseek部署 文章目录 1 部署Ollama下载安装ollama配置环境变量通过ollama下载模型deepseek-r1:7b 2 部署docker2.1 官网下载amd版本安装2.2 配置wsl2.3 Docker配置&#xff1a;位置代理镜像源 3 部署RAGFlow更换ragfl…

嵌入式STM32学习——外部中断EXTI与NVIC的基础练习⭐

按键控制LED灯 按键控制LED的开发流程&#xff1a; 第一步&#xff1a;使能功能复用时钟 第二布&#xff0c;配置复用寄存器 第三步&#xff0c;配置中断屏蔽寄存器 固件库按键控制LED灯 外部中断EXTI结构体&#xff1a;typedef struct{uint32_t EXTI_Line; …

<前端小白> 前端网页知识点总结

HTML 标签 1. 标题标签 h1到h6 2. 段落标签 p 3. 换行 br 水平线 hr 4. 加粗 strong 倾斜 em 下划线 ins 删除 del 5. 图像标签 img src-图像的位置 alt- 图片加载失败显示的文字 替换文本 title--- 鼠标放到图片上显示的文字 提示…

历史数据分析——宁波海运

运输服务 运输服务板块简介: 运输服务板块主要是为货物与人员流动提供核心服务的企业的集合,涵盖铁路、公路、航空、海运、物流等细分领域。该板块具有强周期属性,与经济复苏、政策调控、供需关系密切关联,尤其是海运领域。有不少国内股市的铁路、公路等相关的上市公司同…

小结:jvm 类加载过程

类加载过程 是Java虚拟机&#xff08;JVM&#xff09;将字节码文件&#xff08;.class文件&#xff09;加载到内存中&#xff0c;并转换为运行时数据结构的过程。这个过程可以分为多个步骤&#xff0c;每个步骤都有其特定的任务和目的。根据你提供的信息&#xff0c;以下是类加…

OpenCv高阶(八)——摄像头调用、摄像头OCR

文章目录 前言一、摄像头调用通用方法1、导入必要的库2、创建摄像头接口 二、摄像头OCR1.引入库2、定义函数&#xff08;1&#xff09;定义显示opencv显示函数&#xff08;2&#xff09;保持宽高比的缩放函数&#xff08;3&#xff09;坐标点排序函数&#xff08;4&#xff09;…

Java开发经验——阿里巴巴编码规范实践解析3

摘要 本文深入解析了阿里巴巴编码规范中关于错误码的制定与管理原则&#xff0c;强调错误码应便于快速溯源和沟通标准化&#xff0c;避免过于复杂。介绍了错误码的命名与设计示例&#xff0c;推荐采用模块前缀、错误类型码和业务编号的结构。同时&#xff0c;探讨了项目错误信…