rabbitMQ 消息顺序性、消息幂等性、消息不丢失、最终一致性、补偿机制、消息队列设计
1.消息顺序性
- 溯源:
消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常。 - 举例:
比如通过mysql binlog进行两个数据库的数据同步,由于对数据库的数据操作是具有顺序性的,如果操作顺序搞反,就会造成不可估量的错误。
比如数据库对一条数据依次进行了 插入->更新->删除操作,这个顺序必须是这样,如果在同步过程中,消息的顺序变成了删除->插入->更新,那么原本应该被删除的数据,就没有被删除,造成数据的不一致问题。
RabbitMQ的消息顺序问题,需要分三个环节看待,发送消息的顺序、队列中消息的顺序、消费消息的顺序。
1.1 发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓。
如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。
1.2 队列中消息的顺序
RabbitMQ中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由Rabbitmq保证,通常也不需要开发关心。
提示:不同队列中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站
1.3 消费消息的顺序
多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的。
虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致.
生产者 通过 channel 把消息 通过 exchange 路由到对一个的quque 的 过程中,MQ 本身保证消息的有序性,quque 是有序的,在业务上只要保证生产者发送到mq上的消息是有序的,那么MQ ,quque就能保证生产者发送到消息的有序性;但是生产者保证了消息的有序性并不能保证消费者消费到的消息就是有序的.
这主要体现在以下两点:
- 一个quque 上有多个consumer,由于每个消费者处理消息的快慢不一样,因此并不能保证每个consumer都顺序消费消息.
- 一个quque上只有一个consumer,但是这个consumer 是多线程异步处理,因此并不能保证这个consumer消费消息的处理是顺序处理;
1.出现顺序错乱的场景
错乱场景一
一个queue,有多个consumer去消费,这样就会造成顺序的错误,consumer从MQ里面读取数据是有序的,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。
错乱场景二
一个queue对应一个consumer,但是consumer里面进行了多线程消费,这样也会造成消息消费顺序错误。
2.保证消息的消费顺序
解决方案一
解决消费顺序的问题,通常就是一个队列只有一个消费者
拆分成多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;这样也会造成吞吐量下降,可以在消费者内部采用多线程的方式取消费。保证一个quque 只有一个consumer 这样便保证了消费者消费MQ 消息的有序性;这样就可以一个个消息按顺序处理,缺点就是并发能力下降了,无法并发消费消息,这是个取舍问题。
提示:如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度,例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。
解决方案二
解决一个quque一个consumer异步处理的顺序问题
一个queue对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理。
2.消息幂等性
一条数据(消息)重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。
幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。从业务上来说重复调用多次产生的业务结果与调用一次产生的业务结果相同;
为了避免相同消息的重复处理,必须要采取一定的措施。RabbitMQ服务端是没有这种控制的(同一批的消息有个递增的DeliveryTag),它不知道你是不是就要把一条消息发送两次,只能在消费端控制。
2.1 消息幂等的场景
业务场景1
消费者重复消息
假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。
业务场景2
消费者,内部处理消息的时候的重复
如果消费者每一次接收生产者的消息都成功了,只是在响应或者调用API的时候出了问题,会不会出现消息的重复处理?例如:存款100元,ATM重发了5次,核心系统一共处理了6次,余额增加了600元。
业务场景3
生产者、消费者手动确认消息。
1.生产者在消息发送消息后。再收到mq的确认后,还未更改数据发送状态结果挂掉了,导致消息的重复发送;
2.消费者,在消费消息后,还未给mq发送ack确认标志,消费者挂掉了,导致mq会将消息重复推送给消费者;
2.2 产生消费重复的原因
如何避免消息的重复消费?消息出现重复可能会有两个原因:
1.生产者问题
比如在开启了Confirm模式但未收到确认,生产者重新发送消息;或者生产者在消息发送消息后,在收到mq的确认后,还未更改数据发送状态结果挂掉了,导致消息的重复发送。
2.消费者问题
由于消费者未发送ACK或者其他原因,消息重复投递
2.3 消息幂等的解决方案
对于重复发送的消息,可以对每一条消息生成一个唯一的业务ID,通过日志或者消息落库来做重复控制。或者利用redis、mysql等中间工具的特性解决幂等性问题
- 1.保证生产者者发送到mq的消息幂等性
- 2.保证消费者消费mq消息的幂等性
常用解决方案:
- 1.比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
- 2.比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
- 3.比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
- 4.比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据;
3.消息不丢失
消息丢失原因场景以及解决方案
3.1 生产者发送消息至MQ的数据丢失
生产者发送消息,自动ack,或者发生发生网络异常导致,未做重发处理,导致推送mq的消息丢失;
解决方法: 在生产者端开启comfirm 确认模式,你每次写的消息都会分配一个唯一的 id,如果发生异常的情况下,做好消息的重发机制.
3.2 MQ挂掉导致消息丢失
MQ收到消息,暂存内存中,还没消费,自己挂掉,数据会都丢失;
如exchange、quque 未设置消息的持久化,在消费者消息未消费或者未确认的情况下导致消息丢失
解决方式:MQ设置为持久化。将内存数据持久化到磁盘中
3.3 消费者自动确认ack下的消息丢失
消费者刚拿到消息,先给mq 发送 ack确认标志 ,再处理业务,解过还未处理业务挂掉了或发生异常
解决方式: 用 RabbitMQ 提供的 ack 机制
4.最终一致性
消息的最终一致性需要结合消息的生产者、消费者的、mq的消息不丢失一块考虑。
5.消息积压处理
消息不积压需要总体上保持消费者的消息消费速率rate 大于生产者的生产速率rate,这样在设计上不会出现消息积压;
消息积压处理需要考虑消息的幂等性,保证消息不被重复消费
5.1 消息堆积的判定
- 在消费者未消费到消息、或者消费收到的消息延迟比较大的情况下需要消息是否积压;
可以通过mq的管理界面查看消费者的消息消费情况。
5.2 消息积压的原因场景
1.消息队列的延时以及消息过期失效导致消息队列满了.
2.消息太多导致消息队列磁盘集群满了.
3.积压时间太长了,导致比如RabbitMQ设置了过期时间后,消息就没了。
1.快速处理大量数据积压设计方案
1.存在问题
几千万条数据在MQ里,积压了七八个小时。这个时候就是恢复consumer的问题。让它恢复消费速度,然后傻傻地等待几个小时消费完毕。这个肯定不能再面试的时候说。1个消费者1秒时1000条,1秒3个消费者是3000条。1分钟是18万条。1个小时是1000多万条。如果积压了上万条数据,即使消费者恢复了,也大概需要1个多小时才能恢复过来。
原来3个消费者1个小时。现在30个消费者,需要10分钟搞定。
2.解决方案
临时扩容:相当于将queue资源和consume资源扩大10倍,以10倍的速度来消费数据
具体操作步骤和思路如下:
1.先修改consumer的问题,确保其恢复消费速度。然后先将现有consumer都停掉。
2.临时创建新的consumer进行快速消费:
- 2.1:先新建1个topic订阅模型,根据RoutingKey(分割成原来的10倍key),临时建立好原来10倍或者20倍的Queue。
- 2.2:然后写一个临时consumer程序,部署上去,去消费积压的数据。
- 2.3:消费之后,不做耗时的处理。直接均匀轮训写入临时建立好的10倍数量的Queue。
- 2.4:接着征用10倍的机器来部署consume。每一批consumer消费1个临时的queue。
3.等快速消费完积压数据之后,恢复原来的部署架构,重新用原先的consumer来消费消息。
2.过期失效了怎么办?
过期失效就是TTL。如果消息在Queue中积压超过一定的时间就会被RabbitMQ给清理掉。这个数据就没了。这就不是数据积压MQ中了,而是大量的数据会直接搞丢。
1.在这种情况下,增加consume消费积压就不起作用了。此时,只能将丢失的那批数据,写个临时的程序,一点一点查出来,然后再灌入MQ中,把白天丢失的数据补回来。
2.或者将过去的消息放入死信对列(同样也会出现大量数据问题)
6.消息队列设计
如何让你来设计消息队列中间件,如何设计?
主要考察两块。
① 有没有对某个消息队列做过较为深入的原理的了解。或者从整体把握一个mq的架构原理。
② 设计能力,给你一个常见的系统,就是消息队列系统,看能够从全局把握一下整体架构设计的关键点。
比如说,这个消息队列,我们从以下几个方面来了解下:
1、首先MQ得支持可伸缩性吧。就是需要的时候增加吞吐量和容量?
2、其次,需要考虑一下MQ的数据是不是要持久化到磁盘
3、再次,考虑一下MQ的可用性。
4、最后,考虑一下能不能支持数据零丢失