Redis队列与Pub/Sub方案全解析:原理、对比与实战性能测试

news2025/5/22 23:37:38

一、为什么选择Redis实现消息队列?
Redis凭借其内存级操作(微秒级响应)、丰富的数据结构以及持久化能力,成为构建高性能消息队列的热门选择。相比传统消息队列(如Kafka/RabbitMQ),Redis在以下场景表现突出:
• 轻量级任务调度:毫秒级任务分发

• 实时数据处理:日志采集、事件驱动架构

• 高并发队列:电商秒杀、API限流

• 实时广播:即时通知、实时数据推送


二、主流实现方案对比

方案对比维度

特性List结构队列Stream类型队列Sorted Set队列Pub/Sub
消息持久化❌(依赖Redis配置,配置RDB或AOF后可以持久化)✔️(内置持久化)❌(依赖Redis配置)❌(纯内存)
消息广播✔️(一对多)
离线消息✔️(存储未ACK消息)❌(立即丢弃)
订阅模式✔️(频道/模式匹配)
典型延迟0.8ms1.2ms2.7ms0.2ms
适用场景任务队列可靠消息处理定时任务实时通知

三、核心方案实现详解

方案1:List结构队列(简单队列)

核心原理

// 生产者
jedis.lpush("task_queue", taskJson);

// 消费者(阻塞模式)
List<String> result = jedis.brpop(0, "task_queue");
String task = result.get(1);

持久化机制
• RDB持久化:定时生成内存快照(需配置save参数)

• AOF持久化:记录所有写操作命令(需配置appendonly yes

• 验证方法:

# 查看当前持久化配置
CONFIG GET save
CONFIG GET appendonly

特性分析
• 优点:实现简单,性能极高(TPS 10万+)

• 缺点:无ACK机制,持久化依赖Redis配置

• 适用场景:日志采集、非关键任务队列


方案2:Stream类型队列(企业级队列)

核心原理

// 生产者
String messageId = jedis.xadd("order_stream", "*", 
    "status", "created",
    "amount", "99.9");

// 消费者组消费
Map.Entry<String, String> entry = jedis.xreadGroup(
    "order_group", 
    "consumer1", 
    XReadGroupParams.xReadGroupParams().count(1).streamOffset("order_stream", ">"),
    "order_stream"
).get(0);

// 确认消息
jedis.xack("order_stream", "order_group", entry.getKey());

核心优势
• 消费者组:支持多消费者并行处理

• 消息确认:ACK机制保证消息不丢失

• 消息回溯:可查看历史消息(7天默认)


方案3:Sorted Set延迟队列

核心原理

// 投递延迟任务(延迟30分钟)
long delaySeconds = 1800;
jedis.zadd("delay_queue", 
    System.currentTimeMillis() + delaySeconds*1000, 
    taskJson);

// 轮询处理
Set<String> tasks = jedis.zrangeByScore(
    "delay_queue", 
    0, 
    System.currentTimeMillis()
);

应用场景
• 订单超时处理

• 支付回调重试

• 定时任务调度


方案4:Pub/Sub实时消息系统

核心原理

// 发布者
jedis.publish("stock_updates", 
    JSON.toJSONString(stockData));

// 订阅者
JedisPubSub subscriber = new JedisPubSub() {
    @Override
    public void onMessage(String channel, String message) {
        handleRealTimeUpdate(message);
    }
};
jedis.subscribe(subscriber, "stock_updates");

核心特性
• 广播模式:一对多实时消息推送

• 模式匹配:支持通配符订阅(如news.*

• 低延迟:微秒级消息传递


四、Java实战代码示例

4.1 List队列完整实现

public class ListQueue {
    private static final String KEY = "list_queue";
    private Jedis jedis;

    public ListQueue() {
        this.jedis = new Jedis("localhost", 6379);
    }

    // 生产者
    public void produce(String task) {
        jedis.lpush(KEY, task);
    }

    // 消费者(阻塞模式)
    public String consume() {
        while (true) {
            List<String> result = jedis.brpop(0, KEY);
            if (result != null && !result.isEmpty()) {
                return result.get(1);
            }
        }
    }
}

4.2 Stream队列消费者组

public class StreamQueue {
    private static final String STREAM_KEY = "stream_queue";
    private static final String GROUP_NAME = "order_group";
    private Jedis jedis;

    public StreamQueue() {
        this.jedis = new Jedis("localhost", 6379);
        createConsumerGroup();
    }

    private void createConsumerGroup() {
        try {
            jedis.xgroupCreate(STREAM_KEY, GROUP_NAME, "0");
        } catch (Exception e) {
            // 组已存在
        }
    }

    // 消费者处理
    public void processMessages() {
        while (true) {
            Map.Entry<String, String> entry = jedis.xreadGroup(
                GROUP_NAME, 
                "consumer1", 
                XReadGroupParams.xReadGroupParams().count(1).streamOffset(STREAM_KEY, ">"),
                STREAM_KEY
            ).get(0);

            String msgId = entry.getKey();
            Map<String, String> fields = EntryToMap(entry.getValue());

            processTask(fields);
            jedis.xack(STREAM_KEY, GROUP_NAME, msgId);
        }
    }

    private Map<String, String> EntryToMap(String value) {
        // 解析Stream消息格式
        return Arrays.stream(value.split(","))
            .map(entry -> entry.split("="))
            .collect(Collectors.toMap(a -> a[0], a -> a[1]));
    }
}

4.3 Pub/Sub实时通知

public class PubSubDemo {
    public static void main(String[] args) {
        // 发布者线程
        new Thread(() -> {
            try (Jedis jedis = new Jedis("localhost")) {
                for (int i = 0; i < 1000; i++) {
                    jedis.publish("realtime_alerts", 
                        String.format("{\"event\":\"alert\",\"id\":%d}", i));
                    Thread.sleep(100);
                }
            }
        }).start();

        // 订阅者线程
        new Thread(() -> {
            Jedis jedis = new Jedis("localhost");
            jedis.subscribe(new JedisPubSub() {
                @Override
                public void onMessage(String channel, String message) {
                    System.out.printf("[实时通知] %s: %s%n", channel, message);
                }
            }, "realtime_alerts");
        }).start();
    }
}

五、性能测试对比

测试环境
• 硬件:4核8G CentOS 7.9

• Redis版本:6.2.6(混合持久化)

• 客户端:Jedis 4.2.3

• 并发量:500线程

测试结果(单位:TPS)

方案吞吐量平均延迟CPU占用消息可靠性
List队列(无持久化)122,3000.8ms38%❌(重启丢失)
List队列(AOF)98,5001.5ms45%✔️(AOF每秒同步)
Stream队列85,6001.2ms45%✔️(ACK机制)
Sorted Set队列38,4002.7ms29%✔️(定时轮询)
Pub/Sub182,4500.4ms32%❌(离线丢失)

六、生产环境配置建议

  1. List队列持久化配置
# Redis.conf 配置示例
save 900 1     # 900秒内至少1次修改触发保存
save 300 10    # 300秒内至少10次修改
save 60 10000  # 60秒内至少10000次修改
appendonly yes
appendfsync everysec  # 每秒同步(性能与安全平衡)
  1. 混合持久化方案
// 关键业务数据双写保障
jedis.lpush("critical_task", taskJson);  // 写List
jedis.xadd("critical_stream", "*", "data", taskJson);  // 写Stream

七、选型决策树

需要持久化?
需要可靠消费?
需要广播消息?
Stream队列
Sorted Set队列
List队列
Pub/Sub

八、关键注意事项

  1. List队列持久化陷阱
    • 大Key风险:单List超过1GB会显著降低性能

• 持久化阻塞:AOF重写期间可能延迟飙升

• 解决方案:

// 拆分大List为多个子List
String listKey = "task_list_" + (taskId % 10);
jedis.lpush(listKey, taskJson);
  1. Stream消息过期策略
# 自动清理旧消息(保留最近1000条)
XTRIM order_stream MAXLEN ~ 1000

通过本文的完整分析,开发者可以明确:
• List队列的持久化能力完全依赖Redis服务端配置,需显式启用AOF/RDB

• Stream队列是唯一内置可靠持久化的方案,适合核心业务场景

• Pub/Sub仅适用于实时广播场景,需配合其他方案实现消息持久化

生产环境建议采用混合架构:
• 用Pub/Sub处理实时通知

• 用Stream处理关键业务数据

• 用List处理高吞吐量日志(需配置持久化)

• 用Sorted Set处理定时任务

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2383466.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OBOO鸥柏丨AI数字人触摸屏查询触控人脸识别语音交互一体机上市

OBOO鸥柏丨AI数字人触摸屏查询触控人脸识别语音交互一体机上市分析 OBOO鸥柏品牌推出的AI数字人触摸屏查询触控人脸识别语音交互一体机&#xff0c;是其在智能交互设备领域的又一创新产品。该一体机整合了触摸屏查询、AI人脸识别、AI声源定位语音麦克风&#xff0c;触控交互以…

第5天-python饼图绘制

一、基础饼图绘制(Matplotlib) 1. 环境准备 python 复制 下载 pip install matplotlib numpy 2. 基础饼图代码 python 复制 下载 import matplotlib.pyplot as plt# 数据准备 labels = [1, 2, 3, 4] sizes = [30, 25, 15, 30] # 各部分占比(总和建议100) colors…

2023 睿抗机器人开发者大赛CAIP-编程技能赛-本科组(国赛) 解题报告 | 珂学家

前言 题解 2023 睿抗机器人开发者大赛CAIP-编程技能赛-本科组(国赛)。 vp了下&#xff0c;题目挺好的&#xff0c;难度也适中&#xff0c;但是彻底红温了。 第二题&#xff0c;题意不是那么清晰&#xff0c; M i n ( K 1 , K 2 ) Min(K_1, K_2) Min(K1​,K2​)容易求&#x…

LabVIEW风机状态实时监测

在当今电子设备高度集成化的时代&#xff0c;设备散热成为关键问题。许多大型设备机箱常采用多个风机协同散热&#xff0c;确保系统稳定运行。一旦风机出现故障&#xff0c;若不能及时察觉&#xff0c;可能导致设备损坏&#xff0c;造成巨大损失。为满足对机箱内风机状态实时监…

十一、面向对象底层逻辑-Dubbo过滤器Filter接口

一、引言&#xff1a;分布式系统中的可观测性与治理基石 在分布式服务调用链路中&#xff0c;如何在服务调用前后植入通用逻辑&#xff08;如日志记录、权限校验、性能监控等&#xff09;&#xff0c;是构建可观测、可治理系统的关键需求。Dubbo通过Filter接口实现了面向切面编…

linux安装nginx和前端部署vue项目

1、打包前端项目 npm run build 执行完后会在根目录下生成一个dist文件夹&#xff0c;这个dist文件夹就是我们后面要部署到nginx的东西。 2、将dist文件夹上传到服务器中 自己建一个目录&#xff0c;上传即可&#xff08;尽量不要在root目录下&#xff0c;可能涉及权限问题…

软件设计师“数据流图”真题考点分析——求三连

数据流图考点分析 1. 考点分值占比与趋势分析 综合知识题分值统计表 年份考题数量分值分值占比考察重点2018111.33%数据流图基本元素2019222.67%数据流图绘制原则2020111.33%数据流图与控制流图的区别2021334.00%数据字典与数据流图的关系2022222.67%分层数据流图的分解原则…

基于R语言的贝叶斯网络模型实践技术应用:开启科研新视角

在现代科研领域&#xff0c;变量间的因果关系推断是生态学、环境科学、医学等多学科研究的核心问题。然而&#xff0c;传统的统计学方法往往只能揭示变量间的相关关系&#xff0c;而非因果关系。贝叶斯网络作为一种结合图论与统计学理论的新型模型&#xff0c;不仅能够统合多种…

【Git】远程操作

Git 是一个分布式版本控制系统 可以简单理解为&#xff0c;每个人的电脑上都是一个完整的版本库&#xff0c;这样在工作时&#xff0c;就不需要联网 了&#xff0c;因为版本库就在自己的电脑上。 因此&#xff0c; 多个人协作的方式&#xff0c;譬如说甲在自己的电脑上改了文件…

DeepSpeed简介及加速模型训练

DeepSpeed是由微软开发的开源深度学习优化框架&#xff0c;专注于大规模模型的高效训练与推理。其核心目标是通过系统级优化技术降低显存占用、提升计算效率&#xff0c;并支持千亿级参数的模型训练。 官网链接&#xff1a;deepspeed 训练代码下载&#xff1a;git代码 一、De…

openlayer:10点击地图上某些省份利用Overlay实现提示省份名称

实现点击地图上的省份&#xff0c;在点击经纬度坐标位置附近利用Overlay实现提示框提示相关省份名称。本文介绍了如何通过OpenLayers库实现点击地图上的省份&#xff0c;并在点击的经纬度坐标位置附近显示提示框&#xff0c;提示相关省份名称。首先&#xff0c;定义了两个全局变…

upload-labs通关笔记-第13关 文件上传之白名单POST法

目录 一、白名单过滤 二、%00截断 1.截断原理 2、截断条件 &#xff08;1&#xff09;PHP版本 < 5.3.4 &#xff08;2&#xff09;magic_quotes_gpc配置为Off &#xff08;3&#xff09;代码逻辑存在缺陷 三、源码分析 1、代码审计 &#xff08;1&#xff09;文件…

数据库健康监测器(BHM)实战:如何通过 HTML 报告识别潜在问题

在数据库运维中,健康监测是保障系统稳定性与性能的关键环节。通过 HTML 报告,开发者可以直观查看数据库的运行状态、资源使用情况与潜在风险。 本文将围绕 数据库健康监测器(Database Health Monitor, BHM) 的核心功能展开分析,结合 Prometheus + Grafana + MySQL Export…

Oracle 11g 单实例使用+asm修改主机名导致ORA-29701 故障分析

解决 把服务器名修改为原来的&#xff0c;重启服务器。 故障 建表空间失败。 分析 查看告警日志 ORA-1119 signalled during: create tablespace splex datafile ‘DATA’ size 2000M… Tue May 20 18:04:28 2025 create tablespace splex datafile ‘DATA/option/dataf…

OpenCV CUDA模块图像过滤------用于创建一个最大值盒式滤波器(Max Box Filter)函数createBoxMaxFilter()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 createBoxMaxFilter()函数创建的是一个 最大值滤波器&#xff08;Maximum Filter&#xff09;&#xff0c;它对图像中每个像素邻域内的像素值取最…

Redis数据库-消息队列

一、消息队列介绍 二、基于List结构模拟消息队列 总结&#xff1a; 三、基于PubSub实现消息队列 (1)PubSub介绍 PubSub是publish与subscribe两个单词的缩写&#xff0c;见明知意&#xff0c;PubSub就是发布与订阅的意思。 可以到Redis官网查看通配符的书写规则&#xff1a; …

破解充电安全难题:智能终端的多重防护体系构建

随着智能终端的普及&#xff0c;充电安全问题日益凸显。从电池过热到短路起火&#xff0c;充电过程中的安全隐患不仅威胁用户的生命财产安全&#xff0c;也制约了行业的发展。如何构建一套高效、可靠的多重防护体系&#xff0c;成为破解充电安全难题的关键。通过技术创新和系统…

apptrace 三大策略,助力电商 App 在 618 突围

随着 5 月 13 日 “618” 电商大促预售战的打响&#xff0c;各大平台纷纷祭出百亿补贴、消费券等大招&#xff0c;投入超百亿流量与数十亿现金&#xff0c;意图在这场年度商战中抢占先机。但这场流量争夺战远比想象中艰难&#xff0c;中国互联网络信息中心数据显示&#xff0c;…

SuperVINS:应对挑战性成像条件的实时视觉-惯性SLAM框架【全流程配置与测试!!!】【2025最新版!!!!】

一、项目背景及意义 SuperVINS是一个改进的视觉-惯性SLAM&#xff08;同时定位与地图构建&#xff09;框架&#xff0c;旨在解决在挑战性成像条件下的定位和地图构建问题。该项目基于经典的VINS-Fusion框架&#xff0c;但通过引入深度学习方法进行了显著改进。 视觉-惯性导航系…

Node-Red通过开疆智能Profinet转ModbusTCP采集西门子PLC数据配置案例

一、内容简介 本篇内容主要介绍Node-Red通过node-red-contrib-modbus插件与开疆智能ModbusTCP转Profinet设备进行通讯&#xff0c;这里Profinet转ModbusTCP网关作为从站设备&#xff0c;Node-Red作为主站分别从0地址开始读取10个线圈状态和10个保持寄存器&#xff0c;分别用Mo…