模拟消息队列的消费逻辑-Java

news2026/4/30 16:27:40
分享一个大牛的人工智能教程。零基础通俗易懂风趣幽默希望你也加入到人工智能的队伍中来请轻击人工智能教程​​​​​https://www.captainai.net/troubleshooter这是一个生产级消息队列消费逻辑模拟重点突出线程模型、可靠性保证、背压控制、顺序消费、以及如何测试验证。一、需求澄清第一步你需要先问清楚场景消费语义最多一次At-most-once、至少一次At-least-once、精确一次Exactly-once顺序要求全局有序还是分区有序消息类型普通消息、延迟消息、事务消息异常处理消费失败后重试、死信、还是跳过性能指标预期QPS、消息大小例如我假设需要实现至少一次消费语义 分区有序支持重试死信用多线程模型提升吞吐。这是最经典的中间态设计。二、核心架构设计先画图再编码[生产者] → [Queue/Broker] → [Consumer Group] ↓ [Worker Pool] (多线程拉取) ↓ [Message Processor] [Ack机制] ↓ (失败时) [Retry Queue] → [Dead Letter Queue]三、完整Java实现1. 消息模型// Message.java import java.time.Instant; import java.util.UUID; public class Message { private final String id; private final String topic; private final String key; // 用于分区 private final byte[] body; private final Instant timestamp; private int retryCount; public Message(String topic, String key, byte[] body) { this.id UUID.randomUUID().toString(); this.topic topic; this.key key; this.body body; this.timestamp Instant.now(); this.retryCount 0; } // Getters and setters public String getId() { return id; } public String getTopic() { return topic; } public String getKey() { return key; } public byte[] getBody() { return body; } public Instant getTimestamp() { return timestamp; } public int getRetryCount() { return retryCount; } public void incrementRetry() { retryCount; } }2. 消费接口定义// MessageConsumer.java FunctionalInterface public interface MessageConsumer { /** * 消费消息 * return true表示消费成功false表示失败需重试 */ boolean consume(Message message); }3. 核心消费引擎// MessageQueueConsumer.java import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 模拟消息队列消费逻辑 * 特性 * - 线程池并发消费 * - 分区有序同一key的消息顺序处理 * - 失败重试 死信队列 * - 优雅关闭 */ public class MessageQueueConsumer { private static final Logger log LoggerFactory.getLogger(MessageQueueConsumer.class); private final String consumerGroup; private final MessageConsumer consumer; private final int concurrency; // 并发消费线程数 private final int maxRetries; // 最大重试次数 private final long retryDelayMs; // 重试延迟 private final int queueCapacity; // 内部队列容量 // 核心组件 private final BlockingQueueMessage messageQueue; private final ExecutorService workerPool; private final ScheduledExecutorService retryScheduler; private final AtomicBoolean running; private final ConcurrentHashMapString, ReentrantLock partitionLocks; // 死信队列 private final BlockingQueueMessage deadLetterQueue; // 监控指标 private final AtomicLong processedCount; private final AtomicLong successCount; private final AtomicLong failureCount; private final AtomicLong retryCount; public MessageQueueConsumer(String consumerGroup, MessageConsumer consumer, int concurrency, int maxRetries, long retryDelayMs) { this(consumerGroup, consumer, concurrency, maxRetries, retryDelayMs, 10000); } public MessageQueueConsumer(String consumerGroup, MessageConsumer consumer, int concurrency, int maxRetries, long retryDelayMs, int queueCapacity) { this.consumerGroup consumerGroup; this.consumer consumer; this.concurrency concurrency; this.maxRetries maxRetries; this.retryDelayMs retryDelayMs; this.queueCapacity queueCapacity; this.messageQueue new LinkedBlockingQueue(queueCapacity); this.workerPool Executors.newFixedThreadPool(concurrency); this.retryScheduler Executors.newScheduledThreadPool(concurrency / 2); this.running new AtomicBoolean(true); this.partitionLocks new ConcurrentHashMap(); this.deadLetterQueue new LinkedBlockingQueue(); this.processedCount new AtomicLong(0); this.successCount new AtomicLong(0); this.failureCount new AtomicLong(0); this.retryCount new AtomicLong(0); startWorkers(); } /** * 提交消息到消费队列 */ public void submit(Message message) { try { messageQueue.put(message); log.debug(Message submitted to queue: {}, message.getId()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error(Failed to submit message: {}, message.getId(), e); } } /** * 批量提交消息 */ public void submitAll(ListMessage messages) { for (Message msg : messages) { submit(msg); } } /** * 启动工作线程 */ private void startWorkers() { for (int i 0; i concurrency; i) { workerPool.submit(this::consumeLoop); } log.info(Consumer started with {} workers, concurrency); } /** * 主消费循环 */ private void consumeLoop() { while (running.get()) { try { Message message messageQueue.poll(100, TimeUnit.MILLISECONDS); if (message null) { continue; } // 保证分区有序 processWithPartitionOrder(message); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } } /** * 保证同一key的消息顺序处理 */ private void processWithPartitionOrder(Message message) { String lockKey message.getKey() ! null ? message.getKey() : message.getTopic(); ReentrantLock lock partitionLocks.computeIfAbsent(lockKey, k - new ReentrantLock()); lock.lock(); try { processMessage(message); } finally { lock.unlock(); } } /** * 实际消费逻辑带重试 */ private void processMessage(Message message) { processedCount.incrementAndGet(); try { boolean success consumer.consume(message); if (success) { successCount.incrementAndGet(); log.debug(Message consumed successfully: {}, message.getId()); } else { handleFailure(message); } } catch (Exception e) { log.error(Consumer threw exception for message: {}, message.getId(), e); handleFailure(message); } } /** * 处理消费失败 */ private void handleFailure(Message message) { failureCount.incrementAndGet(); if (message.getRetryCount() maxRetries) { // 需要重试 message.incrementRetry(); retryCount.incrementAndGet(); long delay calculateBackoffDelay(message.getRetryCount()); log.warn(Message {} failed (retry {}/{}), retrying in {}ms, message.getId(), message.getRetryCount(), maxRetries, delay); retryScheduler.schedule(() - { if (running.get()) { submit(message); } }, delay, TimeUnit.MILLISECONDS); } else { // 超过最大重试次数进死信队列 log.error(Message {} exceeded max retries, moving to DLQ, message.getId()); deadLetterQueue.offer(message); } } /** * 指数退避 抖动 */ private long calculateBackoffDelay(int retryCount) { long baseDelay retryDelayMs * (long) Math.pow(2, retryCount - 1); long jitter (long) (baseDelay * 0.1 * Math.random()); return Math.min(baseDelay jitter, 30000); // 最大30秒 } /** * 优雅关闭 */ public void shutdown() { log.info(Shutting down consumer...); running.set(false); workerPool.shutdown(); retryScheduler.shutdown(); try { if (!workerPool.awaitTermination(30, TimeUnit.SECONDS)) { workerPool.shutdownNow(); } if (!retryScheduler.awaitTermination(30, TimeUnit.SECONDS)) { retryScheduler.shutdownNow(); } } catch (InterruptedException e) { workerPool.shutdownNow(); retryScheduler.shutdownNow(); Thread.currentThread().interrupt(); } log.info(Consumer shutdown complete. Stats: processed{}, success{}, failure{}, retry{}, dlq{}, processedCount.get(), successCount.get(), failureCount.get(), retryCount.get(), deadLetterQueue.size()); } /** * 获取死信队列中的消息用于人工处理 */ public ListMessage getDeadLetters() { ListMessage deadLetters new ArrayList(); deadLetterQueue.drainTo(deadLetters); return deadLetters; } /** * 获取消费统计信息 */ public ConsumerStats getStats() { return new ConsumerStats( processedCount.get(), successCount.get(), failureCount.get(), retryCount.get(), deadLetterQueue.size(), messageQueue.size() ); } public static class ConsumerStats { private final long processed; private final long success; private final long failure; private final long retry; private final int deadLetterSize; private final int pendingSize; public ConsumerStats(long processed, long success, long failure, long retry, int deadLetterSize, int pendingSize) { this.processed processed; this.success success; this.failure failure; this.retry retry; this.deadLetterSize deadLetterSize; this.pendingSize pendingSize; } Override public String toString() { return String.format(Stats{processed%d, success%d, failure%d, retry%d, dlq%d, pending%d}, processed, success, failure, retry, deadLetterSize, pendingSize); } } }4. 背压控制版本高级特性// BackpressuredConsumer.java - 简化版 /** * 支持背压的消费者当下游处理不过来时自动限流 */ public class BackpressuredConsumer extends MessageQueueConsumer { private final Semaphore backpressureSemaphore; private final int maxInflight; public BackpressuredConsumer(String consumerGroup, MessageConsumer consumer, int concurrency, int maxRetries, long retryDelayMs, int maxInflight) { super(consumerGroup, consumer, concurrency, maxRetries, retryDelayMs); this.maxInflight maxInflight; this.backpressureSemaphore new Semaphore(maxInflight); } Override private void processMessage(Message message) { try { backpressureSemaphore.acquire(); try { super.processMessage(message); } finally { backpressureSemaphore.release(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }四、测试策略重点1. 基础功能测试Test void testBasicConsumption() throws InterruptedException { CountDownLatch latch new CountDownLatch(10); MessageConsumer mockConsumer msg - { latch.countDown(); return true; }; MessageQueueConsumer consumer new MessageQueueConsumer( test-group, mockConsumer, 2, 3, 100 ); for (int i 0; i 10; i) { consumer.submit(new Message(topic, key, (msg i).getBytes())); } assertTrue(latch.await(5, TimeUnit.SECONDS)); ConsumerStats stats consumer.getStats(); assertEquals(10, stats.getProcessed()); assertEquals(10, stats.getSuccess()); consumer.shutdown(); }2. 重试与死信测试Test void testRetryAndDeadLetter() throws InterruptedException { AtomicInteger attemptCount new AtomicInteger(0); MessageConsumer failingConsumer msg - { int attempt attemptCount.incrementAndGet(); return attempt 2; // 第3次成功 }; MessageQueueConsumer consumer new MessageQueueConsumer( test-group, failingConsumer, 1, 2, 10 // 最多重试2次 ); consumer.submit(new Message(topic, key, test.getBytes())); Thread.sleep(500); // 等待重试完成 ConsumerStats stats consumer.getStats(); assertEquals(1, stats.getProcessed()); // 只有1条消息 assertEquals(0, stats.getSuccess()); // 最终失败 assertEquals(1, stats.getDeadLetterSize()); // 进死信队列 assertEquals(2, stats.getRetry()); // 重试了2次 consumer.shutdown(); }3. 分区有序性测试关键Test void testPartitionOrdering() throws InterruptedException { ListInteger consumedOrder Collections.synchronizedList(new ArrayList()); MessageConsumer consumer msg - { int seq Integer.parseInt(new String(msg.getBody())); consumedOrder.add(seq); return true; }; MessageQueueConsumer queueConsumer new MessageQueueConsumer( test-group, consumer, 4, 0, 0 ); // 向两个不同key发送乱序消息 // key1: 1,2,3,4 // key2: 100,200,300 for (int i 1; i 4; i) { queueConsumer.submit(new Message(topic, key1, String.valueOf(i).getBytes())); } for (int i 100; i 300; i 100) { queueConsumer.submit(new Message(topic, key2, String.valueOf(i).getBytes())); } Thread.sleep(1000); // 验证每个分区内部的顺序 ListInteger key1Order consumedOrder.stream() .filter(n - n 4) .collect(Collectors.toList()); ListInteger key2Order consumedOrder.stream() .filter(n - n 100) .collect(Collectors.toList()); assertEquals(Arrays.asList(1,2,3,4), key1Order); assertEquals(Arrays.asList(100,200,300), key2Order); queueConsumer.shutdown(); }4. 背压与限流测试Test void testBackpressure() throws InterruptedException { AtomicInteger processingFlag new AtomicInteger(0); CountDownLatch processingStarted new CountDownLatch(1); MessageConsumer slowConsumer msg - { processingStarted.countDown(); processingFlag.incrementAndGet(); Thread.sleep(2000); // 模拟慢消费 return true; }; BackpressuredConsumer consumer new BackpressuredConsumer( test-group, slowConsumer, 1, 0, 0, 2 // 最多2个inflight ); // 快速提交100条消息 for (int i 0; i 100; i) { consumer.submit(new Message(topic, key, (msg i).getBytes())); } processingStarted.await(); // 验证内部队列已满背压生效 ConsumerStats stats consumer.getStats(); assertTrue(stats.getPendingSize() 100); // 队列应该有积压 consumer.shutdown(); }5. 并发安全性测试Test void testConcurrentSubmit() throws InterruptedException { MessageQueueConsumer consumer new MessageQueueConsumer( test-group, msg - true, 10, 0, 0 ); int threadCount 50; int messagesPerThread 1000; ExecutorService executor Executors.newFixedThreadPool(threadCount); CountDownLatch latch new CountDownLatch(threadCount); for (int t 0; t threadCount; t) { executor.submit(() - { for (int i 0; i messagesPerThread; i) { consumer.submit(new Message(topic, key i % 100, (msg i).getBytes())); } latch.countDown(); }); } latch.await(30, TimeUnit.SECONDS); Thread.sleep(2000); // 等待消费完成 ConsumerStats stats consumer.getStats(); assertEquals(threadCount * messagesPerThread, stats.getProcessed()); executor.shutdown(); consumer.shutdown(); }6. 性能基准测试Test void benchmarkThroughput() { MessageQueueConsumer consumer new MessageQueueConsumer( benchmark, msg - true, 8, 0, 0, 100000 ); int messageCount 500_000; long start System.nanoTime(); for (int i 0; i messageCount; i) { consumer.submit(new Message(topic, key (i % 100), (msg i).getBytes())); } // 等待消费完成 while (consumer.getStats().getProcessed() messageCount) { Thread.sleep(100); } long duration System.nanoTime() - start; double throughput messageCount * 1000.0 / (duration / 1_000_000); System.out.printf(Throughput: %.2f msg/sec%n, throughput); consumer.shutdown(); }五、进阶问题Q1如何保证Exactly-Once语义回答At-least-once 幂等消费。可以在Message中加入全局唯一ID下游系统用Redis或DB去重表记录已处理的ID。实现支持扩展示例如public class ExactlyOnceConsumer extends MessageQueueConsumer { private final SetString processedIds ConcurrentHashMap.newKeySet(); Override protected boolean tryConsume(Message message) { if (processedIds.contains(message.getId())) { return true; // 已处理过直接返回成功 } boolean success super.tryConsume(message); if (success) { processedIds.add(message.getId()); } return success; } }Q2消息积压时如何处理回答动态扩缩容监控队列长度超过阈值时自动增加worker线程数优先级队列高优消息优先处理流量控制发送端限流避免消费端被压垮分片扩展增加更多的分区通过水平扩展消费实例Q3如何测试消息不会丢失回答我会设计故障注入测试在消费过程中随机kill consumer线程模拟网络中断、DB连接超时验证重试机制是否正确触发消息最终是否被处理使用Chaos Monkey框架Q4如何评估这个消费系统的容量回答采用容量规划三步法单线程压测找出一条消息的平均处理时间线性推算需要多少并发达到目标QPS实际峰值模拟用生产流量回放找到拐点同时设置水位告警当队列深度 阈值或消费延迟 5秒时报警六、总结这个消费模型具备生产级可靠性分区有序保证业务正确性、重试死信处理异常、背压机制防止下游被打垮。同时提供了一套完整的测试策略覆盖功能、并发、性能和混沌场景。更重要的是这个设计是可观测的——暴露了关键的metrics方便监控和容量规划。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2565453.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…

网络编程(Modbus进阶)

思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…