[Redis小技巧10]深入 Redis Stream:从原理到生产级实践

news2026/3/14 18:35:25
一、Stream 是什么为什么需要它Redis Stream 是 Redis 5.0 引入的一种持久化、可追加、支持消费者组的消息队列数据结构。它解决了传统LIST缺乏消息确认和PUB/SUB非持久化、无重试机制在构建可靠消息系统时的短板。1. 与 List 和 Pub/Sub 的对比特性LISTPUB/SUBSTREAM消息持久化✅但无元数据❌✅带 ID、时间戳、字段值多消费者支持❌竞争消费✅广播✅通过消费组实现负载均衡消息确认ACK❌❌✅消息回溯❌需自行维护❌✅按 ID 或时间范围阻塞读取✅BLPOP✅✅XREAD BLOCK结论Stream 是 Redis 中唯一原生支持“可靠消息队列”语义的数据结构。二、Stream 底层原理Stream 基于Radix Tree Listpack实现Entry ID格式为毫秒时间戳-序列号如1710234567890-0保证全局有序。内部存储每个节点是一个Listpack紧凑型内存结构存储多个字段-值对。索引优化Radix Tree 快速定位 ID 范围支持高效范围查询XRANGE。这种设计在高吞吐写入与低内存占用之间取得平衡适合日志、事件等高频写场景。三、核心命令详解下表归纳了最常用命令及其复杂度命令作用时间复杂度典型用途XADD key id field value [field value ...]向 Stream 追加消息O(1)生产者写入事件XREAD [BLOCK ms] STREAMS key id读取消息支持阻塞O(NM)N流数M返回消息数消费者拉取消息XRANGE key start end [COUNT n]按 ID 范围查询O(N)N返回消息数调试、回溯XDEL key id [id ...]删除消息仅标记不释放内存O(1) per ID清理敏感数据XGROUP CREATE key groupname id [MKSTREAM]创建消费组O(1)初始化消费者组XREADGROUP GROUP group consumer STREAMS key 从消费组读取新消息O(NM)消费组消费XACK key group id [id ...]确认消息已处理O(1) per ID避免重复消费XPENDING key group [start end count] [consumer]查看挂起消息O(N)监控未 ACK 消息XCLAIM key group new_consumer min_idle_time id [id ...] [IDLE ms] [TIME unix-time-ms]将消费组中处于 Pending Entries ListPEL中的消息从原消费者转移给新消费者常用于故障恢复或消息重试O(N M)其中 N 是待认领的消息数量M 是 PEL 中需更新的元数据开销通常视为 O(1) 每条消息当某个消费者宕机或处理超时时由其他消费者主动接管其未 ACK 的消息实现高可用消费也可用于手动重试积压消息提示表示“只读取新消息”0表示“从头开始”。四、消费组Consumer Group机制详解消费组是 Redis Stream 实现多消费者协作消费的核心。1. 关键概念Group逻辑分组每个 Stream 可有多个 Group。Consumer组内具体消费者由名字标识自动注册。Pending Entries List (PEL)记录已分发但未 ACKAcknowledgment确认 的消息。Last Delivered ID组内最后分发的 ID用于恢复消费位点。2. 消息生命周期生产者XADD写入消息。消费者调用XREADGROUP获取消息消息进入 PEL。消费成功 →XACK消息从 PEL 移除。消费失败/超时 → 其他消费者可通过XPENDINGXCLAIM接管消息。3. 故障恢复若消费者宕机其 PEL 中的消息可被其他消费者通过XCLAIM接管。重启后可通过XREADGROUP从0或继续消费取决于业务需求。4. Stream 消费组消息流转5. 消息确认与重试机制五、典型应用场景1. 微服务异步通信场景订单服务 → 库存服务 → 通知服务优势解耦、削峰、失败重试架构每个服务作为独立 Consumer Group确保消息不丢失2. 实时日志收集场景前端埋点 → Stream → 日志分析服务优势高吞吐写入、按时间回溯、支持多分析任务并行消费3. 事件溯源Event Sourcing场景用户操作流注册→登录→支付作为不可变事件存入 Stream优势天然有序、可重放、支持状态重建六、核心命令实操记录1. 创建 Stream 并写入初始数据首先创建一个名为app_logs的 Stream并向其中写入几条日志消息# 写入 3 条日志消息XADD app_logs * levelINFOserviceusereventloginuser_id1001XADD app_logs * levelERRORserviceordereventtimeoutorder_id5001XADD app_logs * levelWARNservicecacheeventmisskeyprofile:1001假设返回的 Entry ID 分别为1710432000000-01710432000001-01710432000002-0可以使用XRANGE app_logs - 查看所有已写入的消息以确认数据正确无误。2. 创建消费组为了实现多消费者的负载均衡与消息确认机制我们需要为app_logs创建一个消费组# 删除旧组如果存在XGROUP DESTROY app_logs alert_group# 重新创建从头开始读取所有消息XGROUP CREATE app_logs alert_group0注意使用0表示从第一条消息开始消费若想仅处理新消息则应使用$。3. 使用XREADGROUP拉取消息接下来我们可以用XREADGROUP从消费组中拉取消息。这里我们将模拟consumer-A消费者的行为XREADGROUP ... 返回结果示例XREADGROUP GROUP alert_group consumer-A COUNT2STREAMS app_logs1)1)app_logs# Stream 名称2)1)1)1710432000000-0# 消息 ID 12)1)level2)INFO3)service4)user5)event6)login7)user_id8)10012)1)1710432000001-0# 消息 ID 22)1)level2)ERROR3)service4)order5)event6)timeout7)order_id8)5001数据结构解析XREADGROUP的返回是一个嵌套数组包含Stream 名称如app_logs消息列表每条消息由一个 ID 和字段-值对组成例如1)1710432000000-0# 消息 ID2)1)level2)INFO3)service4)user...这意味着每条消息都带有一个唯一的 ID 和若干键值对字段。XREADGROUP ... 0返回结果示例XREADGROUP GROUP alert_group consumer-A COUNT2STREAMS app_logs01)1)app_logs# Stream 名称2)1)1)1710432000000-0# 消息 ID 12)1)level2)INFO3)service4)user5)event6)login7)user_id8)10012)1)1710432000001-0# 消息 ID 22)1)level2)ERROR3)service4)order5)event6)timeout7)order_id8)5001特性XREADGROUP ... XREADGROUP ... 0或具体 ID消息来源Stream 中尚未被该消费组消费过的新消息消费组 PELPending Entries List中已分发但未 ACK 的消息是否进入 PEL✅ 是新消息首次分配自动加入 PEL❌ 否消息已在 PEL 中只是重新读取是否支持负载均衡✅ 是Redis 自动分配给不同消费者❌ 否只能读取属于指定消费者的 PEL 消息典型用途正常消费流程主路径故障恢复 / 重试异常路径能否读到历史消息取决于XGROUP CREATE时的起始 ID- 若为0→ 能- 若为$→ 不能不能除非之前已用拉取过并未 ACK重复调用结果每次返回新的未消费消息每次返回相同的未 ACK 消息4. 查看 Pending Entries List (PEL)执行完XREADGROUP后这两条消息已被加入 PELPending Entries List表示它们正在被处理但尚未确认。XPENDING app_logs alert_group返回1)(integer)2# 共 2 条未 ACK2)1710432000000-0# 最早 ID3)1710432000001-0# 最晚 ID4)1)1)consumer-A2)2# consumer-A 有 2 条挂起再查看具体挂起的消息详情XPENDING app_logs alert_group - 10返回1)1)1710432000000-02)consumer-A3)(integer)125000# 空闲毫秒数约 125 秒4)(integer)1# 已投递 1 次2)1)1710432000001-02)consumer-A3)(integer)1250004)(integer)15. 成功处理后调用XACK假设第一条消息处理成功我们可以调用XACK来确认这条消息XACK app_logs alert_group1710432000000-0返回(integer) 1表示 1 条确认成功再次检查 PELXPENDING app_logs alert_group现在只剩一条未确认消息1)(integer)12)1710432000001-03)1710432000001-04)1)1)consumer-A2)16. 模拟失败 —— 使用XCLAIM接管假设consumer-A宕机可以让consumer-B接管超时未处理的消息# 接管空闲超过 100 秒的消息XCLAIM app_logs alert_group consumer-B1000001710432000001-0返回1)1)1710432000001-02)1)level2)ERROR3)service4)order5)event6)timeout7)order_id8)5001此时consumer-B应该处理这条消息并在完成后调用XACKXACK app_logs alert_group1710432000001-0最终PEL 应为空XPENDING app_logs alert_group# 返回: (integer) 0总结通过上述步骤展示了如何使用XREADGROUP及其相关命令来实现高效的 Redis Stream 消息消费流程。关键点包括创建 Stream 和消费组确保 Stream 存在且配置正确的消费组。使用XREADGROUP拉取消息每次拉取时消息会进入 PEL等待确认。监控 Pending Entries List定期运行XPENDING及时发现并处理积压消息。故障恢复与重试利用XCLAIM实现消费者宕机后的消息接管保障系统高可用性。七、高频面试题Q1Stream 的消息 ID 是如何生成的可以自定义吗答默认格式为毫秒时间戳-序列号如1710234567890-0。可通过XADD key * ...自动生成也可手动指定但必须大于当前最大 ID否则报错。Q2消费组中的消息未 ACK 会怎样答消息会保留在 Pending Entries List (PEL) 中不会被再次分发给同一组的其他消费者除非使用XCLAIM主动接管。长期未 ACK 可能导致内存堆积。Q3如何监控 Stream 的积压情况答使用XPENDING key group查看挂起消息数量和分布结合XINFO STREAM key查看总长度和消费者组信息。Q4Stream 支持消息 TTL 吗答不直接支持。但可通过XADD ... MAXLEN ~ N限制长度近似滑动窗口或定期用XTRIM手动清理旧消息。Q5XREAD 和 XREADGROUP 有什么区别答XREAD是普通读取无消费组语义XREADGROUP必须指定 Group 和 Consumer会将消息加入 PEL 并支持 ACK适用于多消费者协作场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2411998.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…

网络编程(Modbus进阶)

思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…