SpringBoot与RocketMQ深度整合:多连接配置与动态Topic处理实战
1. 为什么需要多连接与动态Topic处理在实际的企业级项目中我们使用消息队列的场景往往不是单一的。比如你的订单服务可能需要向一个RocketMQ集群发送订单创建消息同时你的物流服务又需要从另一个独立的RocketMQ集群订阅物流状态变更的消息。如果只用一套连接配置代码就会变得僵化难以维护和扩展。我遇到过不少项目初期为了图省事把所有消息都往一个连接里塞。结果业务发展起来不同业务线的消息量、重要性和SLA要求都不一样混在一起导致监控困难、故障隔离性差甚至一个业务的消息积压拖垮了整个应用的连接。所以多连接配置的核心价值在于隔离与解耦让不同的业务消息走独立的通道互不影响。而动态Topic处理则是为了解决另一个痛点业务逻辑的无限膨胀。想象一下如果你的系统有几十个甚至上百个Topic难道要为每一个Topic都写一个独立的消费者类然后在里面用一堆if-else来判断该执行哪段业务逻辑吗这样的代码不仅臃肿每次新增一个Topic都要改代码、发版违反了开闭原则。动态Topic处理的目标就是实现一种“约定大于配置”的机制让Topic与业务处理类自动关联新增业务只需新增一个实现类即可核心框架代码无需改动。简单来说这套组合拳打下来你的SpringBoot应用就能像搭积木一样灵活地接入和管理多个RocketMQ集群并且能优雅地应对业务Topic的快速增长。下面我就带你一步步实现它。2. 项目环境与依赖准备首先我们得把基础环境搭好。这里我假设你已经有一个SpringBoot项目了版本2.3.x以上或3.x均可并且本地或远程有一个可用的RocketMQ服务NameServer地址通常是127.0.0.1:9876。第一步引入核心依赖。我们不直接使用官方的rocketmq-spring-boot-starter因为它对多连接和高度自定义的动态路由支持不够灵活。我们选择更底层的rocketmq-client这样掌控力更强。在你的pom.xml里加入dependency groupIdorg.apache.rocketmq/groupId artifactIdrocketmq-client/artifactId version4.9.4/version !-- 建议使用较新稳定版 -- /dependency第二步准备配置文件。我们将采用YAML格式来定义多个生产者和消费者连接。下面是一个application.yml的示例清晰地定义了两套生产者连接和两套消费者连接rocketmq: producer: # 生产者连接列表支持多个 producer-list: - producer-id: order-producer # 生产者唯一标识用于代码中获取 group-name: ${spring.application.name}-order-group # 生产者组名 namesrv-addr: 192.168.1.100:9876 # 订单业务专用的NameServer地址 max-message-size: 4096 # 消息最大长度单位字节 send-msg-timeout: 3000 # 发送超时时间毫秒 retry-times-when-send-failed: 2 # 发送失败重试次数 - producer-id: logistics-producer group-name: ${spring.application.name}-logistics-group namesrv-addr: 192.168.1.101:9876 # 物流业务专用的另一个集群 max-message-size: 8192 # 物流消息可能更大 send-msg-timeout: 5000 retry-times-when-send-failed: 3 consumer: # 消费者连接列表支持多个 consumer-list: - group-name: order-consumer-group namesrv-addr: 192.168.1.100:9876 # 订阅的主题列表每个topic对应一个业务处理类 topics: - topic-name: ORDER_CREATED tag-name: PAY_SUCCESS || SHIPPED # 支持Tag过滤* 代表所有Tag - topic-name: ORDER_CANCELLED tag-name: * consume-thread-min: 4 # 消费线程池最小线程数 consume-thread-max: 8 # 消费线程池最大线程数 consume-message-batch-max-size: 1 # 单次拉取最大消息数 orderly: false # 是否顺序消费 message-model: CLUSTERING # 消费模式集群 CLUSTERING / 广播 BROADCASTING - group-name: logistics-consumer-group namesrv-addr: 192.168.1.101:9876 topics: - topic-name: LOGISTICS_STATUS_UPDATE tag-name: * consume-thread-min: 2 consume-thread-max: 4 orderly: true # 物流状态更新需要顺序消费 message-model: CLUSTERING这个配置结构一目了然producer-list和consumer-list都是数组每个元素代表一个独立的连接配置。producer-id是我们自己定义的业务标识后面会用它来获取对应的生产者实例。topics下面定义了该消费者连接需要监听哪些Topic以及对应的Tag过滤规则。3. 多连接生产者配置实战配置写好了接下来就是怎么让SpringBoot把这些配置读进去并初始化对应的RocketMQ生产者实例。首先定义配置映射类。我们需要创建两个类来对应YAML中的结构。import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import java.util.List; Data Component ConfigurationProperties(prefix rocketmq.producer) public class MqProducerConfig { private ListProducerConfig producerList; Data public static class ProducerConfig { private String producerId; // 对应配置中的 producer-id private String groupName; private String namesrvAddr; private Integer maxMessageSize 4096; // 默认值 private Integer sendMsgTimeout 3000; private Integer retryTimesWhenSendFailed 2; } }然后在应用启动时初始化所有生产者。这里我们用ApplicationListener监听ApplicationReadyEvent事件确保Spring容器完全启动后再初始化MQ连接。import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.annotation.PreDestroy; import java.util.HashMap; import java.util.List; import java.util.Map; Slf4j Component public class MqProducerInitializer implements ApplicationListenerApplicationReadyEvent { Autowired private MqProducerConfig mqProducerConfig; // 全局生产者Mapkey为producerIdvalue为生产者实例 public static final MapString, DefaultMQProducer PRODUCER_MAP new HashMap(); Override public void onApplicationEvent(ApplicationReadyEvent event) { ListMqProducerConfig.ProducerConfig configList mqProducerConfig.getProducerList(); if (CollectionUtils.isEmpty(configList)) { log.warn(未配置任何RocketMQ生产者连接跳过初始化。); return; } for (MqProducerConfig.ProducerConfig config : configList) { try { DefaultMQProducer producer new DefaultMQProducer(config.getGroupName()); producer.setNamesrvAddr(config.getNamesrvAddr()); producer.setMaxMessageSize(config.getMaxMessageSize()); producer.setSendMsgTimeout(config.getSendMsgTimeout()); producer.setRetryTimesWhenSendFailed(config.getRetryTimesWhenSendFailed()); // 关闭VIP通道避免某些网络环境下的连接问题 producer.setVipChannelEnabled(false); producer.start(); PRODUCER_MAP.put(config.getProducerId(), producer); log.info(RocketMQ生产者启动成功: producerId{}, groupName{}, namesrvAddr{}, config.getProducerId(), config.getGroupName(), config.getNamesrvAddr()); } catch (MQClientException e) { log.error(RocketMQ生产者启动失败: producerId{}, groupName{}, config.getProducerId(), config.getGroupName(), e); // 这里可以根据业务决定是抛出异常终止启动还是记录日志继续初始化其他生产者 // throw new RuntimeException(MQ生产者初始化失败, e); } } } // 应用关闭时优雅地关闭所有生产者 PreDestroy public void destroy() { log.info(正在关闭所有RocketMQ生产者连接...); PRODUCER_MAP.forEach((producerId, producer) - { if (producer ! null) { producer.shutdown(); log.info(生产者已关闭: producerId{}, producerId); } }); } }最后提供一个工具类方便业务代码使用。我们不应该让业务代码直接去操作静态的PRODUCER_MAP而是封装一个工具方法。import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.util.CollectionUtils; public class RocketMqProducerUtil { public static DefaultMQProducer getProducer(String producerId) { if (StringUtils.isBlank(producerId)) { throw new IllegalArgumentException(producerId不能为空); } DefaultMQProducer producer MqProducerInitializer.PRODUCER_MAP.get(producerId); if (producer null) { throw new RuntimeException(未找到对应的RocketMQ生产者producerId producerId); } return producer; } // 提供一个便捷的发送方法示例实际可根据需要扩展 public static SendResult sendMessage(String producerId, String topic, String tags, String messageBody) throws Exception { DefaultMQProducer producer getProducer(producerId); Message msg new Message(topic, tags, messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET)); return producer.send(msg); } }这样在业务代码中当你需要给订单集群发消息时就调用RocketMqProducerUtil.getProducer(order-producer)拿到对应的生产者实例进行发送。不同的业务线使用不同的producerId实现了连接的物理隔离。4. 动态Topic消费者与业务路由设计消费者端的配置比生产者稍复杂因为它涉及到消息的监听和业务逻辑的分发。我们的目标是根据消息的Topic自动找到对应的业务处理类来消费。第一步同样是定义配置映射类。import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import java.util.List; Data Component ConfigurationProperties(prefix rocketmq.consumer) public class MqConsumerConfig { private ListConsumerConfig consumerList; Data public static class ConsumerConfig { private String groupName; private String namesrvAddr; private ListTopicConfig topics; private Integer consumeThreadMin 4; private Integer consumeThreadMax 8; private Integer consumeMessageBatchMaxSize 1; private Boolean orderly false; private String messageModel CLUSTERING; // 默认集群模式 } Data public static class TopicConfig { private String topicName; private String tagName *; // 默认监听所有Tag } }第二步设计业务处理接口。这是实现动态路由的关键。我们定义一个统一的接口所有处理具体Topic消息的类都必须实现它。import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public interface MqMessageHandler { /** * 处理消息 * param messageExtList 消息列表RocketMQ保证同一个队列的消息是顺序的 * param context 消费上下文包含重试次数等信息 * return 消费结果 */ ConsumeResult handle(ListMessageExt messageExtList, ConsumeContext context); }同时定义消费结果和上下文import lombok.Data; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; Data public class ConsumeResult { private boolean success; // 消费是否成功 private boolean needReconsumeLater; // 消费失败后是否需要稍后重试 private String remark; // 备注信息 public static ConsumeResult success() { ConsumeResult result new ConsumeResult(); result.setSuccess(true); result.setNeedReconsumeLater(false); return result; } public static ConsumeResult fail(boolean reconsume) { ConsumeResult result new ConsumeResult(); result.setSuccess(false); result.setNeedReconsumeLater(reconsume); return result; } } Data public class ConsumeContext { private boolean isOrderly; // 是否为顺序消费 private ConsumeConcurrentlyContext concurrentlyContext; private ConsumeOrderlyContext orderlyContext; // 可以扩展其他上下文信息如traceId等 }第三步实现消费者启动与动态路由。这是最核心的部分。我们同样在应用启动时初始化消费者并在监听器中根据Topic名称从Spring容器中动态获取对应的MqMessageHandler实现类。import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import javax.annotation.PreDestroy; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; Slf4j Component public class MqConsumerInitializer implements ApplicationListenerApplicationReadyEvent { Autowired private MqConsumerConfig mqConsumerConfig; Autowired private ApplicationContext applicationContext; private final MapString, DefaultMQPushConsumer consumerMap new ConcurrentHashMap(); Override public void onApplicationEvent(ApplicationReadyEvent event) { ListMqConsumerConfig.ConsumerConfig configList mqConsumerConfig.getConsumerList(); if (CollectionUtils.isEmpty(configList)) { log.warn(未配置任何RocketMQ消费者连接跳过初始化。); return; } for (MqConsumerConfig.ConsumerConfig config : configList) { try { DefaultMQPushConsumer consumer new DefaultMQPushConsumer(config.getGroupName()); consumer.setNamesrvAddr(config.getNamesrvAddr()); consumer.setConsumeThreadMin(config.getConsumeThreadMin()); consumer.setConsumeThreadMax(config.getConsumeThreadMax()); consumer.setConsumeMessageBatchMaxSize(config.getConsumeMessageBatchMaxSize()); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 设置消费模式 if (BROADCASTING.equalsIgnoreCase(config.getMessageModel())) { consumer.setMessageModel(MessageModel.BROADCASTING); } else { consumer.setMessageModel(MessageModel.CLUSTERING); } // 注册消息监听器根据配置决定是顺序消费还是并发消费 if (Boolean.TRUE.equals(config.getOrderly())) { consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) - processMessage(msgs, context, true)); } else { consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) - processMessage(msgs, context, false)); } // 订阅Topic for (MqConsumerConfig.TopicConfig topicConfig : config.getTopics()) { String tag StringUtils.hasText(topicConfig.getTagName()) ? topicConfig.getTagName() : *; consumer.subscribe(topicConfig.getTopicName(), tag); log.info(消费者订阅: group{}, topic{}, tag{}, config.getGroupName(), topicConfig.getTopicName(), tag); } consumer.start(); consumerMap.put(config.getGroupName(), consumer); log.info(RocketMQ消费者启动成功: groupName{}, namesrvAddr{}, config.getGroupName(), config.getNamesrvAddr()); } catch (MQClientException e) { log.error(RocketMQ消费者启动失败: groupName{}, config.getGroupName(), e); } } } /** * 统一的消息处理入口实现动态路由 */ private Object processMessage(ListMessageExt msgs, Object context, boolean isOrderly) { if (CollectionUtils.isEmpty(msgs)) { return isOrderly ? ConsumeOrderlyStatus.SUCCESS : ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } // RocketMQ保证一次拉取的消息属于同一个Topic String topic msgs.get(0).getTopic(); // 关键步骤根据Topic名称从Spring容器中获取对应的处理器Bean // 这里我们约定处理器的Bean名称格式为 topicHandler.{topicName} String beanName topicHandler. topic; MqMessageHandler handler applicationContext.getBean(beanName, MqMessageHandler.class); if (handler null) { log.error(未找到Topic [{}] 对应的消息处理器请检查Bean定义。消息将被挂起。, topic); // 返回稍后重试避免消息丢失同时给开发者修复问题的时间 return isOrderly ? ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT : ConsumeConcurrentlyStatus.RECONSUME_LATER; } // 构建消费上下文 ConsumeContext consumeContext new ConsumeContext(); consumeContext.setOrderly(isOrderly); if (isOrderly) { consumeContext.setOrderlyContext((ConsumeOrderlyContext) context); } else { consumeContext.setConcurrentlyContext((ConsumeConcurrentlyContext) context); } try { ConsumeResult result handler.handle(msgs, consumeContext); if (result.isSuccess()) { return isOrderly ? ConsumeOrderlyStatus.SUCCESS : ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } else { // 业务处理失败根据结果决定是否重试 if (result.isNeedReconsumeLater()) { log.warn(业务处理失败要求重试。Topic: {}, MsgId: {}, topic, msgs.get(0).getMsgId()); return isOrderly ? ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT : ConsumeConcurrentlyStatus.RECONSUME_LATER; } else { // 业务处理失败但明确要求不重试例如参数错误重试无意义返回成功避免进入死信队列 log.error(业务处理失败放弃重试。Topic: {}, MsgId: {}, Remark: {}, topic, msgs.get(0).getMsgId(), result.getRemark()); return isOrderly ? ConsumeOrderlyStatus.SUCCESS : ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } } catch (Exception e) { log.error(消息处理过程中发生未捕获异常。Topic: {}, MsgId: {}, topic, msgs.get(0).getMsgId(), e); // 系统异常返回重试 return isOrderly ? ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT : ConsumeConcurrentlyStatus.RECONSUME_LATER; } } PreDestroy public void destroy() { log.info(正在关闭所有RocketMQ消费者连接...); consumerMap.forEach((group, consumer) - { if (consumer ! null) { consumer.shutdown(); log.info(消费者已关闭: group{}, group); } }); } }第四步编写具体的业务处理器。现在对于任何一个新的Topic你只需要创建一个实现了MqMessageHandler接口的Spring Bean并按照约定的命名规则topicHandler.{topicName}进行命名即可。例如处理ORDER_CREATED主题的消息import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.stereotype.Component; import java.util.List; Slf4j Component(topicHandler.ORDER_CREATED) // 注意Bean名称的约定 public class OrderCreatedHandler implements MqMessageHandler { Override public ConsumeResult handle(ListMessageExt messageExtList, ConsumeContext context) { for (MessageExt messageExt : messageExtList) { try { String messageBody new String(messageExt.getBody(), UTF-8); log.info(收到订单创建消息: MsgId{}, Body{}, messageExt.getMsgId(), messageBody); // 在这里编写你的核心业务逻辑比如更新订单状态、通知库存系统等 // 模拟业务处理 boolean businessSuccess processOrder(messageBody); if (!businessSuccess) { // 业务逻辑失败比如库存不足返回失败并要求重试 return ConsumeResult.fail(true); } } catch (Exception e) { log.error(处理订单创建消息异常MsgId: {}, messageExt.getMsgId(), e); return ConsumeResult.fail(true); // 系统异常重试 } } return ConsumeResult.success(); } private boolean processOrder(String orderInfo) { // 模拟业务处理 // ... return true; } }再比如处理需要顺序消费的物流状态更新import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.stereotype.Component; import java.util.List; Slf4j Component(topicHandler.LOGISTICS_STATUS_UPDATE) public class LogisticsStatusUpdateHandler implements MqMessageHandler { Override public ConsumeResult handle(ListMessageExt messageExtList, ConsumeContext context) { // 顺序消费场景下通常需要根据消息的Key如订单号来保证同一个Key的消息被顺序处理 // 这里RocketMQ已经保证了同一个Queue的消息顺序投递我们只需要按顺序处理即可 for (MessageExt messageExt : messageExtList) { try { String messageBody new String(messageExt.getBody(), UTF-8); String orderId messageExt.getKeys(); // 假设消息Key是订单ID log.info(顺序处理物流状态更新: OrderId{}, Body{}, orderId, messageBody); // 更新物流状态这里必须是幂等操作因为可能会重试 updateLogisticsStatus(orderId, messageBody); } catch (Exception e) { log.error(顺序处理物流状态更新失败MsgId: {}, messageExt.getMsgId(), e); // 顺序消费中如果某条消息处理失败会阻塞该队列后续消息直到成功或达到重试上限。 // 这里返回失败并要求重试。 return ConsumeResult.fail(true); } } return ConsumeResult.success(); } private void updateLogisticsStatus(String orderId, String status) { // 实现幂等的状态更新逻辑 // ... } }看到这里整个动态路由的机制就清晰了。MqConsumerInitializer在启动时会根据配置订阅Topic。当消息到达时processMessage方法会根据消息的Topic动态地从Spring容器中查找名为topicHandler.{topicName}的Bean并调用其handle方法。这样一来每新增一个Topic的业务处理你只需要新增一个实现了MqMessageHandler的类并正确命名即可核心的消费者启动代码完全不用动。5. 高级特性与生产环境调优基础框架搭好了但要上生产环境还得考虑更多细节。这里我分享几个实战中总结的关键点。连接池与线程池调优生产者和消费者客户端内部都有网络连接和线程池。对于生产者主要关注发送线程池。RocketMQ客户端的DefaultMQProducer内部有异步发送的线程池默认大小是CPU核数。如果发送量巨大且是异步发送可以适当调大通过setAsyncSenderExecutor方法设置。对于消费者我们在配置里已经设置了consumeThreadMin和consumeThreadMax这直接影响消费能力。我的经验是对于CPU密集型业务如复杂计算线程数可以设置为CPU核数对于IO密集型业务如调用外部API、写数据库可以设置为CPU核数的1.5到2倍。但要注意线程不是越多越好太多会导致频繁上下文切换。消息发送的重试与超时在生产者配置中我们设置了retryTimesWhenSendFailed和sendMsgTimeout。对于核心业务消息重试次数可以设高一点比如3-5次超时时间也可以适当延长。但要注意同步发送的重试会阻塞调用线程。对于非核心的日志类消息可以设置为快速失败重试1次。另外RocketMQ还提供了异步发送和单向发送的模式在高吞吐量场景下可以考虑。顺序消费的注意事项在配置中我们将物流消费者的orderly设为了true。顺序消费的实现依赖于生产者将同一组消息发送到同一个Message Queue以及消费者从同一个Queue顺序拉取。生产者发送时需要指定一个MessageQueueSelector例如根据订单ID哈希选择队列。在消费者端MessageListenerOrderly会锁定当前正在消费的Queue确保同一时间只有一个线程消费该Queue从而保证顺序。但这也带来了吞吐量的下降所以只有真正需要严格顺序的场景如订单状态流转、库存扣减才使用它。消费幂等性与死信队列这是消息中间件避不开的话题。因为网络抖动、消费者重启等原因同一条消息可能会被多次投递Exactly-Once在分布式环境下很难保证RocketMQ保证At-Least-Once。所以你的MqMessageHandler中的业务逻辑必须是幂等的。常见的做法是利用数据库唯一约束比如消息表里把msgId或业务唯一ID状态设为主键。使用Redis等缓存记录处理前检查msgId是否已处理过。乐观锁更新数据时带版本号或状态条件。如果一条消息重试了最大次数默认16次后仍然失败RocketMQ会将其投递到死信队列Topic名为%DLQ%{ConsumerGroup}。你需要有另一个消费者来监控和处理死信队列的消息进行人工干预或记录告警。监控与运维建议日志记录在MqConsumerInitializer的processMessage方法中我们已经记录了关键日志。建议将消息的MsgId、Topic、消费状态、耗时等信息结构化输出方便接入ELK等日志系统。指标埋点可以在消息处理前后打点统计消息量、消费成功率、平均耗时等指标接入Prometheus等监控系统。优雅停机我们已经在初始化类中使用了PreDestroy来关闭连接确保应用关闭时不会丢失消息。在K8s等容器环境中要确保在收到终止信号后留出足够时间让消费者完成当前消息的处理再关闭。配置分离生产环境的NameServer地址、线程池大小等配置应该与代码分离使用配置中心如Nacos、Apollo管理实现不同环境dev/test/prod的差异化配置。6. 常见“坑点”与排查技巧这条路我踩过不少坑这里给你提个醒。坑点一消费者组名冲突。RocketMQ通过Consumer Group来标识一组消费者集群消费模式下同一个Group内的消费者共同消费一个Topic。如果你在测试环境把groupName写死了然后部署了多个相同的服务它们会负载均衡消费消息这没问题。但如果你把同一个服务部署到另一个环境比如预发布却忘了改groupName那么两个环境的消费者就会属于同一个Group消息会在两个环境间被随机消费导致业务混乱。切记不同环境、不同应用实例的消费者组名应该用变量区分比如${spring.application.name}-${spring.profiles.active}。坑点二Tag过滤的误用。在配置tagName时*表示订阅所有Tagtag1 || tag2表示订阅tag1或tag2。但要注意Tag是消息生产者发送时指定的如果消费者订阅了一个不存在的Tag是收不到任何消息的而且不会报错。排查“为什么没收到消息”时这是第一个要检查的点。坑点三顺序消费的阻塞。如果你的顺序消费者(orderly: true)在处理某条消息时抛出了异常并不断重试会导致该消息所在的整个Queue被阻塞后面的消息都无法处理。所以在顺序消费的业务逻辑里一定要做好异常捕获和容错处理。对于确实处理不了的“毒药消息”要在达到一定重试次数后记录日志并返回成功或转移到死信队列让队列能继续前进。可以在ConsumeContext中获取当前重试次数context.getReconsumeTimes()来做判断。坑点四消息体大小限制。RocketMQ默认的消息最大大小是4MB由maxMessageSize控制。如果你发送的消息包括Topic、Tag、Key、Body和属性超过了这个限制发送会失败。对于大消息可以考虑压缩、分片存储如存到OSS消息体中只放一个引用地址。排查技巧看日志RocketMQ客户端日志默认在~/logs/rocketmqlogs/下查看rocketmq_client.log可以找到连接、发送、订阅失败的详细原因。用控制台RocketMQ ConsoleDashboard是个神器可以查看Topic、Consumer Group的堆积情况、消费进度、连接客户端等是定位问题的第一现场。查网络NamesrvAddr连接不上是最常见的问题。用telnet命令测试端口是否通畅检查防火墙规则。模拟发送当你怀疑消费者没收到消息时可以用RocketMQ Console或者写个简单的测试程序往Topic里发一条消息看消费者端是否有日志打印快速定位是发送问题还是消费问题。这套基于SpringBoot的RocketMQ多连接与动态Topic处理方案在我经历的几个中大型项目中都稳定运行过。它最大的好处是结构清晰、扩展性强新业务接入成本极低。当然没有银弹你还需要根据自己项目的实际情况在消息可靠性、吞吐量和系统复杂度之间做出权衡。希望这些实战经验能帮你少走弯路。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2415005.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!