用Redis的List实现消息队列

news2025/5/15 6:57:31

介绍如何在 Spring Boot 中使用 Redis List 的 BRPOPLPUSH命令来实现一个线程安全且可靠的消息队列。

整合Redis

整合Redis

用Redis的List实现消息队列

Redis的List相关指令

**「LPUSH key element [element ...]」**把元素插入到 List 的首部,如果 List 不存在,会自动创建。

BRPOPLPUSH source destination timeout

移除并且返回 List (source)尾部的最后一个元素,并且同时会把这个元素插入到另一个 List (destination)的首部。

当 source List 中没有元素时,Redis 会阻塞连接,直到有其他客户端向其推送元素或超时。超时时间(秒)为 0 表示永远不超时。

注意,这个命令是 「原子性」 的,也就是说只要客户端获取到了返回的元素,那么这个元素一定就会在 destination List 有备份。这是实现可靠消息队列的关键!

**「RPOPLPUSH source destination」**同上,它是 BRPOPLPUSH 命令的 「非阻塞」 版,如果 List 中没有元素就会立即返回 null

**「LREM key count element」**从 List 中删除元素,count 的值不同,删除的方式也不同:

  • count > 0:从头到尾开始搜索,删除与 element 相等的元素,最多删除 count 个。
  • count < 0:从尾到头开始搜索,删除与 element 相等的元素,最多删除 count (绝对值)个。
  • count = 0:删除所有与元素相等的元素。

实现思路

一个简单易用且可靠的消息队列:

  1. 生产者使用 LPUSH 命令往消息队列生产消息
  2. 消费者使用 BRPOPLPUSH 命令从队列消费消息,并且还会在获取并返回消息的时候把该消息推送到另一个消息队列,也就是 Pending 队列,这个队列中存储的就是未被消费者 ACK 的消息
  3. 消费者成功消费完毕后,使用 LREM 命令从 Pending 队列中删除这条消息,整个消费过程结束
  4. 如果消费者在消费过程中出现异常、宕机,那么需要在恢复后从 Pending 队列中获取到这条消息,再进行重新消费,从而保证了消息队列的可靠性,不会丢失消息(可能存在重复消费,需要做好幂等处理)

在 Spring Boot 中实现

Redis 队列 Key 常量

/**
 * Redis 常量
 */
public class RedisConstants {

    // 消息队列
    public static final String QUEUE_NAME = "queue_orders";

    // pending 队列,即待确认消息的队列
    public static final String PENDINGQUEUE_NAME = "pending_queue_orders";
}

消费者

创建一个 OrderConsumer Bean 模拟从队列中消费订单 ID。

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import com.keepc.springredis.config.JsonRedisTemplate;
import com.keepc.springredis.constant.RedisConstants;

/**
 * 模拟从队列中消费订单 ID
 * OrderConsumer 实现了 ApplicationRunner 接口,在应用就绪后创建新的消费线程进行消费。
 */
@Component
public class OrderConsumer implements ApplicationRunner, Runnable {

    static final Logger log = LoggerFactory.getLogger(OrderConsumer.class);

    @Autowired
    JsonRedisTemplate jsonRedisTemplate;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 应用启动后,创建新的线程来执行消费任务
        Thread thread = new Thread(this);
        thread.setName("order-consumer-thread");
        thread.start();
    }

    @Override
    public void run() {
        while (true) {
            try {
                // 1:消费者,从队列未弹出消息,并推送到 pending 队列,整个过程是原子性的
                // 最多阻塞 5 秒,超过 5 秒后还没有消息,则返回 null
                // stringRedisTemplate.opsForList().rightPopAndLeftPush 该方法底层调用的正是 brpoplpush 命令
                Object item = jsonRedisTemplate.opsForList().rightPopAndLeftPush(RedisConstants.QUEUE_NAME,
                        RedisConstants.PENDINGQUEUE_NAME, 5,
                        TimeUnit.SECONDS);

                if (item == null) {
                    log.info("等待消息 ...");
                    continue;
                }

                try {

                    // 2:解析为 Long
                    Long orderId = Long.parseLong(String.valueOf(item));

                    // 模拟消息消费
                    log.info("消费消息: {}", orderId);

                } catch (Exception e) {
                    log.error("消费异常:{}", e.getMessage());
                    continue;
                }

                // 3:消费成功,从 pending 队列删除记录,相当于确认消费
                // 底层正是 LREM 命令
                jsonRedisTemplate.opsForList().remove(RedisConstants.PENDINGQUEUE_NAME, 0, item);
            } catch (Exception e) {
                log.error("队列监听异常:{}", e.getMessage());
                break;
            }
        }
        log.info("退出消费");
    }
}

OrderConsumer 实现了 ApplicationRunner 接口,在应用就绪后创建新的消费线程进行消费。

stringRedisTemplate.opsForList().rightPopAndLeftPush 方法从 queue 队列消费一条消息,同时把消息添加到 pendingQueue 队列。该方法底层调用的正是 brpoplpush 命令,最多阻塞 5 秒,超时后返回 null

得到消息后解析为 Long 类型,模拟消费,即输出到日志。如果消费成功,则调用 stringRedisTemplate.opsForList().remove 方法(底层正是 LREM 命令)从 pendingQueue 队列中删除消息。如果消费失败,失败的消息会在 pendingQueue 队列中继续存在,不会丢失,可以重新投递消费或者是人工处理。

生产者

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.keepc.springredis.config.JsonRedisTemplate;
import com.keepc.springredis.constant.RedisConstants;

/**
 * 订单生产者
 */
@Component
public class OrderProducer {

    static final Logger log = LoggerFactory.getLogger(OrderProducer.class);

    @Autowired
    JsonRedisTemplate jsonRedisTemplate;

    /**
     * 发送订单到队列
     *
     * @param orderId 订单号
     */
    public void send(Long orderId) {
        if (orderId == null) {
            return;
        }
        log.info("发送订单到队列:{}", orderId);
        jsonRedisTemplate.opsForList().rightPush(RedisConstants.QUEUE_NAME, orderId);
    }
}

测试

@SpringBootTest
class SpringRedisApplicationTests {

	static final Logger logger = LoggerFactory.getLogger(SpringRedisApplicationTests.class);

	// 注入 JsonRedisTemplate
	@Autowired
	JsonRedisTemplate jsonRedisTemplate;

	@Autowired
	OrderProducer orderProducer;

	/**
	 * 进行消息队列测试的函数。
	 * 测试中,向消息队列发送四个随机长整型消息,然后等待3秒。
	 * 使用了JUnit的@Test注解标识这是一个测试方法,并通过@Timeout注解设置了方法执行的最长时间为10秒。
	 */
	@Test
	@Timeout(value = 10, unit = TimeUnit.SECONDS)
	public void mqTest() throws InterruptedException {
		// 发送一个空消息到消息队列
		orderProducer.send(null);
		// 创建一个随机数生成器
		Random random = new Random();
		// 分别发送三个随机生成的长整型消息到消息队列
		orderProducer.send(random.nextLong(0L, 10000000L));
		orderProducer.send(random.nextLong(0L, 10000000L));
		orderProducer.send(random.nextLong(0L, 10000000L));
		// 暂停3秒,以便观察消息队列处理情况
		TimeUnit.SECONDS.sleep(3L);
	}

}

结果:

2024-04-26T16:47:52.626+08:00  INFO 5456 --- [           main] c.k.springredis.mqByList.OrderProducer   : 发送订单到队列:6514813
2024-04-26T16:47:52.706+08:00  INFO 5456 --- [           main] c.k.springredis.mqByList.OrderProducer   : 发送订单到队列:1380282
2024-04-26T16:47:52.742+08:00  INFO 5456 --- [           main] c.k.springredis.mqByList.OrderProducer   : 发送订单到队列:5429620
2024-04-26T16:47:52.905+08:00  INFO 5456 --- [consumer-thread] c.k.springredis.mqByList.OrderConsumer   : 消费消息: 6514813
2024-04-26T16:47:52.925+08:00  INFO 5456 --- [consumer-thread] c.k.springredis.mqByList.OrderConsumer   : 消费消息: 5429620
2024-04-26T16:47:52.935+08:00  INFO 5456 --- [consumer-thread] c.k.springredis.mqByList.OrderConsumer   : 消费消息: 1380282
2024-04-26T16:47:55.934+08:00  WARN 5456 --- [ionShutdownHook] d.r.c.l.LettucePoolingConnectionProvider : LettucePoolingConnectionProvider contains unreleased connections
2024-04-26T16:47:55.973+08:00 ERROR 5456 --- [consumer-thread] c.k.springredis.mqByList.OrderConsumer   : 队列监听异常:Redis exception
2024-04-26T16:47:55.973+08:00  INFO 5456 --- [consumer-thread] c.k.springredis.mqByList.OrderConsumer   : 退出消费

Redis 的持久化方式

Redis 是一个内存数据库,为了保证数据的安全不丢失,它提供了两种数据备份(持久化)方式,即 「RDB」「AOF」

  • 「RDB」:生成某一时刻的数据快照,通过子进程进行备份,数据可能不完整(取决于备份周期)。RDB 是 Redis 默认的持久化方式。它会在特定的时间间隔内将内存中的数据集快照写入磁盘,生成一个dump.rdb文件。
  • 「AOF」:通过记录执行的指令到文件来实现数据备份,相对完整性较高,但是会记录每一条执行命令,性能会有一定影响。

这就需要根据你的业务场景来选择合适的持久化方式,也可以同时配合使用 「RDB」「AOF」 两种方式,兼顾性能和数据安全。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2375890.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C++】类与对象【下】

文章目录 再谈构造函数构造函数的赋值构造函数体赋值&#xff1a;初始化列表explicit关键字 static成员概念特性 C11中成员初始化的新玩法友元友元类 内部类概念 再谈构造函数 构造函数的赋值 构造函数体赋值&#xff1a; 在创建对象时&#xff0c;编译器会通过调用构造函数…

无人机避障——如何利用MinumSnap进行对速度、加速度进行优化的轨迹生成(附C++python代码)

&#x1f525;轨迹规划领域的 “YYDS”——minimum snap&#xff01;作为基于优化的二次规划经典&#xff0c;它是无人机、自动驾驶轨迹规划论文必引的 “开山之作”。从优化目标函数到变量曲线表达&#xff0c;各路大神疯狂 “魔改”&#xff0c;衍生出无数创新方案。 &#…

Llama:开源的急先锋

Llama:开源的急先锋 Llama1&#xff1a;开放、高效的基础语言模型 Llama1使用了完全开源的数据&#xff0c;性能媲美GPT-3&#xff0c;可以在社区研究开源使用&#xff0c;只是不能商用。 Llama1提出的Scaling Law 业内普遍认为如果要达到同一个性能指标&#xff0c;训练更…

“redis 目标计算机积极拒绝,无法连接” 解决方法,每次开机启动redis

如果遇到以上问题 先打开“服务” 找到App Readiness 右击-启动 以管理员身份运行cmd&#xff0c;跳转到 安装redis的目录 运行&#xff1a;redis-server.exe redis.windows.conf 以管理员身份打开另一cmd窗口&#xff0c;跳转到安装redis的目录 运行&#xff1a;redis-…

LeetCode 热题 100 35.搜索插入位置

目录 题目&#xff1a; 题目描述&#xff1a; 题目链接&#xff1a; 思路&#xff1a; 核心思路&#xff1a; 思路详解&#xff1a; 代码&#xff1a; Java代码&#xff1a; 题目&#xff1a; 题目描述&#xff1a; 题目链接&#xff1a; 35. 搜索插入位置 - 力扣&…

从 “学会学习” 到高效适应:元学习技术深度解析与应用实践

一、引言&#xff1a;当机器开始 “学会学习”—— 元学习的革命性价值 在传统机器学习依赖海量数据训练单一任务模型的时代&#xff0c;元学习&#xff08;Meta Learning&#xff09;正掀起一场范式革命。 这项旨在让模型 “学会学习” 的技术&#xff0c;通过模仿人类基于经验…

AI开发者的算力革命:GpuGeek平台全景实战指南(大模型训练/推理/微调全解析)

目录 背景一、AI工业化时代的算力困局与破局之道1.1 中小企业AI落地的三大障碍1.2 GpuGeek的破局创新1.3 核心价值 二、GpuGeek技术全景剖析2.1 核心架构设计 三、核心优势详解‌3.1 优势1&#xff1a;工业级显卡舰队‌‌‌3.2 优势2&#xff1a;开箱即用生态‌3.2.1 预置镜像库…

AWS SNS:解锁高并发消息通知与系统集成的云端利器

导语 在分布式系统架构中&#xff0c;如何实现高效、可靠的消息通知与跨服务通信&#xff1f;AWS Simple Notification Service&#xff08;SNS&#xff09;作为全托管的发布/订阅&#xff08;Pub/Sub&#xff09;服务&#xff0c;正在成为企业构建弹性系统的核心组件。本文深度…

【PmHub后端篇】PmHub集成 Sentinel+OpenFeign实现网关流量控制与服务降级

在微服务架构中&#xff0c;保障服务的稳定性和高可用性至关重要。本文将详细介绍在 PmHub 中如何利用 Sentinel Gateway 进行网关限流&#xff0c;以及集成 Sentinel OpenFeign 实现自定义的 fallback 服务降级。 1 熔断降级的必要性 在微服务架构中&#xff0c;服务间的调…

2025最新出版 Microsoft Project由入门到精通(八)

目录 查找关键路径方法 方法1:格式->关键任务 方法2:插入关键属性列 方法3&#xff1a;插入“可宽延的总时间”进行查看&#xff0c;>0不是关键路径&#xff0c;剩余的全是关键路径 方法4:设置关键路径的工作表的文本样式​编辑 方法5&#xff1a;突出显示/筛选器…

3.0/Q2,Charls最新文章解读

文章题目&#xff1a;Development of a visualized risk prediction system for sarcopenia in older adults using machine learning: a cohort study based on CHARLS DOI&#xff1a;10.3389/fpubh.2025.1544894 中文标题&#xff1a;使用机器学习开发老年人肌肉减少症的可视…

使用matlab进行数据拟合

目录 一、工作区建立数据 二、曲线拟合器(在"APP"中) 三、曲线拟合函数及参数 四、 在matlab中编写代码 一、工作区建立数据 首先&#xff0c;将数据在matlab工作区中生成。如图1所示&#xff1a; 图 1 二、曲线拟合器(在"APP"中) 然后&#xff0c;…

分布式1(cap base理论 锁 事务 幂等性 rpc)

目录 分布式系统介绍 一、定义与概念 二、分布式系统的特点 三、分布式系统面临的挑战 四、分布式系统的常见应用场景 CAP 定理 BASE 理论 BASE理论是如何保证最终一致性的 分布式锁的常见使用场景有哪些&#xff1f; 1. 防止多节点重复操作 2. 资源互斥访问 3. 分…

Myshell与清华联合开源TTS模型OpenVoiceV2,多语言支持,风格控制进一步增强~

项目背景 开发团队与发布 OpenVoice2 由 MyShell AI&#xff08;加拿大 AI 初创公司&#xff09;与 MIT 和清华大学的研究人员合作开发&#xff0c;技术报告于 2023 年 12 月发布 &#xff0c;V2 版本于 2024 年 4 月发布 。 项目目标是提供一个高效、灵活的语音克隆工具&…

YOLO11解决方案之热力图探索

概述 Ultralytics提供了一系列的解决方案,利用YOLO11解决现实世界的问题,包括物体计数、模糊处理、热力图、安防系统、速度估计、物体追踪等多个方面的应用。 使用YOLO11生成的热力图把复杂的数据转换成生动的彩色编码矩阵。这种可视化工具采用色谱来表示不同的数据值,暖色…

如何在终端/命令行中把PDF的每一页转换成图片(PNG)

今天被对象安排了一个任务&#xff1a; 之前自己其实也有这个需要&#xff0c;但是吧&#xff0c;我懒&#xff1a;量少拖拽&#xff0c;量大就放弃。但这次躲不过去了&#xff0c;所以研究了一下有什么工具可以做到这个需求。 本文记录我这次发现的使用 XpdfReader 的方法。…

计算机系统结构——Cache性能分析

一、实验目的 加深对Cache的基本概念、基本组织结构以及基本工作原理的理解。掌握Cache容量、相联度、块大小对Cache性能的影响。掌握降低Cache不命中率的各种方法以及这些方法对提高Cache性能的好处。理解LRU与随机法的基本思想以及它们对Cache性能的影响。 二、实验平台 实…

GESP2023年12月认证C++八级( 第三部分编程题(2)大量的工作沟通)

参考程序&#xff1a; #include <cstdio> #include <cstdlib> #include <cstring> #include <algorithm> #include <string> #include <map> #include <iostream> #include <cmath> #include <vector> #include <qu…

015枚举之滑动窗口——算法备赛

滑动窗口 最大子数组和 题目描述 给你一个整数数组 nums &#xff0c;请你找出一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 原题链接 思路分析 见代码注解 代码 int maxSubArray(vector<int>& num…