从0到1搭建可靠消息链路:RocketMQ重试 + Redis幂等实战
三方消息对接为什么总翻车一套 RocketMQ Redis 幂等 的工业级解法含架构图伪代码 适合收藏三方平台对接、商品/订单同步、消息中台治理✅ 你将收获一套可直接落地的“拉取-发送-消费-重试-幂等-补偿”完整方案目录1. 真实痛点为什么“跑通了”还会出事故2. 一张图看全链路架构3. 从发送到消费代码级实现方案4. RocketMQ 在这套方案里的关键作用5. Redis 幂等到底怎么做才靠谱6. 三方 Token 过期的工程化兜底7. 最容易踩坑的 7 个点8. 可直接复用的伪代码模板9. FAQ高频问题10. 结语1. 真实痛点为什么“跑通了”还会出事故三方对接系统最常见的线上事故不是“接口不通”而是下游偶发超时整条链路阻塞同一条消息重复消费业务重复执行部分成功部分失败导致状态混乱Token 过期任务随机失败三方消息删除太早回溯困难。本质上是缺少一套“可靠消息 幂等治理”的系统设计。2. 一张图看全链路架构是否Scheduler/Task 定时任务ChannelAdapter 拉取三方消息分页结果 SearchResponseProducer 分片发送 MQRocketMQ BrokerConsumer 并发监听Transfer 统一入口Redis 幂等过滤模型转换 业务处理处理成功?写入幂等版本时间RECONSUME_LATER3. 从发送到消费代码级实现方案3.1 发送端分页拉取 双重分片 同步发送设计策略每页拉取结果立即发 MQ避免大事务堆积先按条数切分再按字节大小切分防超大消息同步发送失败立即可感知。伪代码voidexecuteTask(){RequestreqnewRequest(page1,pageSize100);while(true){SearchResponserespchannel.search(req);assertresp.success:channel call failed;// 双重分片数量 大小ListListItemchunkssplitByCountAndSize(resp.items,50,3*MB);for(ListItemchunk:chunks){mqProducer.send(topicTOPIC_ITEM,tagCHANNEL_X,bodywrap(chunk,accountInfo));}if(!resp.hasNext)break;req.page;req.extendParamresp.extendParam;// 透传上下文例如待删消息ID}}3.2 消费端失败即重试消费设计策略并发消费提高吞吐单条失败返回RECONSUME_LATERBroker 重投递同一条消息不依赖生产端重发。伪代码ConsumeStatusconsume(ListMessagemsgs){for(Messagemsg:msgs){if(!process(msg)){returnRECONSUME_LATER;}}returnCONSUME_SUCCESS;}4. RocketMQ 在这套方案里的关键作用4.1 解耦把“拉取三方”与“下游处理”拆开避免互相阻塞。4.2 弹性缓冲下游短时抖动时消息先堆在 Broker系统仍能继续拉取并入队。4.3 自动重试消费失败后由 Broker 重投递业务侧只需正确返回状态。4.4 至少一次投递语义这也是为什么必须做消费端幂等否则重复消费必出问题。5. Redis 幂等到底怎么做才靠谱5.1 推荐键模型脱敏idempotent:{tenant}:{account}:{topic}:{bizKey} - {bizKey}:{versionTs} TTL 24hbizKey业务唯一键如 SKU、订单号versionTs业务变更时间推荐用三方业务时间5.2 判定规则Redis 无记录处理有记录且currentTs redisTs跳过有记录且currentTs redisTs处理新版本5.3 伪代码MapString,LongcurrentbuildBizKeyVersionMap(items);ListStringskipKeysidempotentStore.calcSkip(current);ListItemtoProcessitems.stream().filter(i-!skipKeys.contains(i.bizKey())).toList();if(toProcess.isEmpty())returntrue;booleanokdoBusiness(toProcess);if(ok){idempotentStore.saveProcessed(versionMapOf(toProcess),Duration.ofHours(24));}returnok;6. 三方 Token 过期的工程化兜底线上常见错误码-100示例表示 token 过期。建议统一做“刷新 重试 1 次”。伪代码ResponsecallWithRetry(SupplierResponsecall){Responser1call.get();if(!isTokenExpired(r1))returnr1;if(!tokenService.refresh())returnr1;returncall.get();// only one retry}建议只重试一次避免无限循环放大故障。7. 最容易踩坑的 7 个点把字符串msgId当 Long 处理导致删除 ID 为空平台消息一直删不掉。只在任务开始检查 token运行中仍可能过期必须支持接口级自动刷新重试。幂等只按 bizKey不按版本时间会吞掉真实增量更新。versionTs用系统 now() 而非业务时间在部分场景会弱化幂等语义建议优先使用三方业务变更时间。误以为消费失败会触发“生产端重发”实际是 Broker 重投递同一条消息。删除三方消息时机过早建议延迟删除并与成功发送链路关联。没有 DLQ 回放机制超过最大重试后没有补偿能力线上恢复成本高。8. 可直接复用的伪代码模板8.1 幂等存储接口publicinterfaceIdempotentStore{ListStringcalcSkip(MapString,LongkeyToVersion);voidsaveProcessed(MapString,LongkeyToVersion,Durationttl);}8.2 消费处理器接口publicinterfaceMessageProcessorT{StringbizKey(Titem);LongversionTs(Titem);// 优先使用三方业务时间booleanhandle(ListTitems);}8.3 通用处理骨架booleanprocess(ListTitems){MapString,Longcurrentitems.stream().collect(toMap(this::bizKey,this::versionTs,Math::max));ListStringskipidempotentStore.calcSkip(current);ListTtodoitems.stream().filter(i-!skip.contains(bizKey(i))).toList();if(todo.isEmpty())returntrue;booleanokhandle(todo);if(ok){MapString,Longdonetodo.stream().collect(toMap(this::bizKey,this::versionTs,Math::max));idempotentStore.saveProcessed(done,Duration.ofHours(24));}returnok;}9. FAQ高频问题Q1三方消息删了MQ 消费失败还能重试吗能。只要消息已入 MQ重试依赖 Broker不依赖再去三方拉取。Q2为什么有时会看到重复消费RocketMQ 是至少一次投递重复是正常现象。消费端必须幂等。Q3幂等 TTL 设多久合适看业务重放窗口。常见 24h/48h若跨天补偿多建议更长并配合回放策略。Q4部分成功部分失败怎么办建议“成功项写幂等失败项走重试”并确保状态可追踪。10. 结语很多系统的问题不在“业务逻辑”而在“链路治理能力”。一套可长期稳定的三方同步系统核心是这 5 个词解耦、重试、幂等、补偿、可观测。如果你正在做三方对接这套 RocketMQ Redis 幂等方案基本可以作为你的默认基线。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2505069.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!