RocketMQ
- 基本架构
- 消息模型
- 消费者消费消息模式
- 顺序消息机制
- 延迟消息
- 批量消息
- 事务消息
- 消息重试
- 最佳实践
基本架构
nameServer: 维护broker列表信息,客户端连接时只需要连接nameServer。可配置成集群。
broker:broker分为master和slave,master负责消息的发送和消费,slave是master的备份。master-slaver的集群方式,master挂掉时候slave不能主动转换为master提供服务(5.X版本后可以通过配置实现mater挂掉后slave转为master提供服务)。
leader-follower的集群方式,即高可用集群,各个broker是对等的,通过选举产生leader(在dashboart中显示为master),如果leader挂掉,在剩下的follower(显示为slave)中选举再产生新的leader。注意,只有超过半数的几点存活,才能选举出leader。
消息模型
⽣产者和消费者都可以指定⼀个Topic发送消息或者拉取消息。⽽Topic是⼀个逻辑概念。
Topic中的消息会分布在后⾯多个MessageQueue当中。这些MessageQueue会分布到⼀个或者多个broker中。
消费者消费消息模式
广播模式:所有关注topic的消费者都收到消息。广播模式下消息队列的消费位点由客户端自己维护,消费失败服务端不会重发。
集群模式:同一个消费者组只有一个成员收到消息。集群模式下消费点位由服务端维护,消费者组的所有成员共用一个位点,消费失败服务端会重发。
顺序消息机制
- ⽣产者只有将⼀批有顺序要求的消息,放到同⼀个MesasgeQueue上,通过MessageQueue的FIFO特性保证这⼀批消息的顺序。如果不指定MessageSelector对象,
那么⽣产者会采⽤轮询的⽅式将多条消息依次发送到不同的MessageQueue上。 - 消费者需要实现MessageListenerOrderly接⼝,实际上在服务端,处理MessageListenerOrderly时,会给⼀个MessageQueue加锁,拿到MessageQueue上所有的消息,然后再去读取下⼀个MessageQueue的消息。
- 消费消息失败时,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代。因为消费者端只进⾏有限次数的重试。如果⼀条消息处理失败,RocketMQ会将后续消息阻塞住,让消费者进⾏重试。但是,如果消费者⼀直处理失败,超过最⼤重试次数,那么RocketMQ就会跳过这⼀条消息,处理后⾯的消息,这会造成消息乱序。
延迟消息
- 定固定的延迟级别:对于指定固定延迟级别的延迟消息,RocketMQ的实现⽅式是预设⼀个系统Topic,名字叫做SCHEDULE_TOPIC_XXXXX。在这个Topic下,预设了18个MessageQueue。这⾥每个对列就对应了⼀种延迟级别。然后每次扫描这18个队列⾥的消息,进⾏延迟操作就可以了。
- 指定消息发送时间:RocketMQ是通过时间轮算法实现。
批量消息
⽣产者要发送的消息⽐较多时,可以将多条消息合并成⼀个批量消息,⼀次性发送出去。这样可以减少⽹络IO,提升消息发送的吞吐量。同⼀批消息的Topic必须相同,另外,不⽀持延迟消息。还有批量消息的⼤⼩不要超过1M,如果太⼤就需要⾃⾏分割。
事务消息
- ⽣产者将消息发送⾄ApacheRocketMQ服务端。
- ApacheRocketMQ服务端将消息持久化成功之后,向⽣产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
- ⽣产者开始执⾏本地事务逻辑。
- ⽣产者根据本地事务执⾏结果向服务端提交⼆次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:⼆次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。⼆次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
- 在断⽹或者是⽣产者应⽤重启的特殊情况下,若服务端未收到发送者提交的⼆次确认结果,或服务端收到的⼆次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息⽣产者即⽣产者集群中任⼀⽣产者实例发起消息回查。
- ⽣产者收到消息回查后,需要检查对应消息的本地事务执⾏的最终结果。
- ⽣产者根据检查到的本地事务的最终状态再次提交⼆次确认,服务端仍按照步骤4对半事务消息进⾏处理。
消息重试
RocketMQ的消费者端,如果处理消息失败了,Broker是会将消息重新进⾏投送的。⽽在重试时,RocketMQ实际上会为每个消费者组创建⼀个对应的重试队列。重试的消息会进⼊⼀个“%RETRY%”+ConsumeGroup的队列中。
RocketMQ默认允许每条消息最多重试16次,每次重试的间隔时间如下:
如果消息重试16次后仍然失败,消息将不再投递,转为进⼊死信队列。重试次数可以通过consumer.setMaxReconsumeTimes(20);将重试次数设定为20次。当定制的重试次数超过16次后,消息的重试时间间隔均为2⼩时。
如果消息超过最⼤重试次数,RocketMQ会将消息发送到死信队列。⼀个死信队列对应⼀个消费组。死信队列的默认权限为2(禁读)。如果需要处理死信队列的消息,需要把权限修改为6(可读可写后)消费该Topic的消息进行处理。队列中超过有效期(默认3天)的消息会被删除,不管有没有消费。
最佳实践
- ⼀个应⽤尽可能⽤⼀个Topic,⽽消息⼦类型则可以⽤tags来标识。tags过滤消息的性能很高,相当于索引。
- 消费端幂等控制:RocketMQ的每条消息都有⼀个唯⼀的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以⽤这个MessageId来作为判断幂等的关键依据。但是,这个MessageId是⽆法保证全局唯⼀的,也会有冲突的情况。所以在⼀些对幂等性要求严格的场景,最好是使⽤业务上唯⼀的⼀个标识⽐较靠谱。例如订单ID。⽽这个业务标识可以使⽤Message的Key来进⾏传递。