RocketMQ如何保证消息不丢失
RocketMQ的一个发送消息流程
生产者 => Broker => 消费者 Broker master => slave => 磁盘
- 生产者发送消息给到Broker
- Broker进行主从同步
- Broker不管Master还是Slave进行刷盘操作
- Broker将消息投递给消费者
在上面4个步骤任何一个步骤都将会出现消息丢失的情况,当然别的MQ也会有同样的情况出现。其实还有第5种情况NameServer挂了。
下面针对这5种情况进行分析
生产者发送消息
所有网络请求都是不可靠的,走网络请求都有一定的消息丢失的可能性。
同步发送是一个好办法,发送等Broker有应答了表示Broker接收成功进行下一步操作。
但这样有问题,我消息发送出去了,但本地代码执行异常了怎么办。或许有人说可以本地代码执行完成再去进行消息发送操作,但这样消息发送失败了怎么办。这些都有各种各样的问题。RocketMQ对于这些场景提供了事务消息的思路。
事务消息其实相当于给我们很多次反悔的操作。
事务消息实现机制

步骤:
- 发送方将half事务消息发送到broker
- broker把消息持久化成功,向发送方返回ACK确认收到消息。
- 发送方开始执行本地事务
- 发送方根据本地事务执行结果向broker发送二次确认
- broker收到发送方commit状态则将half事务消息标记为可投递,订阅方可以收到消息。。发送方rollback状态,则删除half事务消息。。如果步骤4的二次确认没有发送到broker则会过一段时间broker会对发送方进行消息回查,发送方收到消息回查,检查对应消息的本地事务执行结果。发送方根据回查结果继续步骤5。
Q&A:
半事务消息发送失败怎么办?
要想回答这个问题,首先得了解半事务消息有什么作用。半事务消息主要作用是一个探测功能,证明RocketMQ是可用的。如果没有启动事务消息我们通常的操作是执行本地然后发消息,这样就可能出现一个尴尬的情况,本地都执行成功了,写消息失败了。而引入了半事务消息机制,半事务消息写入失败,没事,我们这时一般认为MQ服务不可用,给要执行的本地代码一个标记,等MQ服务正常重新执行流程。
本地事务执行失败怎么办?
比如本地事务需要执行Mysql,刚好这时候Mysql宕了。我们一般可以把数据缓存起来,给Broker一个UNKNOW状态,等待Broker的回查,回查的时候吧缓存起来的数据继续执行本地事务。
Broker主从同步
之前2m-2s-async集群的搭建方式其实是不推荐的,生产上一般是使用Dledger搭建集群。因为2m-2s有个生来就有的问题就是不会主从切换,主节点挂了从节点不会顶上去。直到RocketMQ4.5以后的版本才支持Dledger集群。
Dledger集群会托管CommitLog文件,所以不是像之前配置brokerRole解决主从同步数据丢失的情况。
Dledger文件同步
Dledger会通过两阶段提交解决文件同步问题,具体流程如下
数据同步会通过两个阶段,一个是uncommitted阶段,一个是commited阶段
- Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件
- Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger。然后如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态
- Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态。这样,就基于Raft协议完成了两阶段的数据同步
Broker同步刷盘
RocketMQ的刷盘方式flushDiskType配置成同步刷盘就可以保证消息在刷盘过程中不会丢失了
Broker投递消息
Broker投递消息本身就是看你的ack来解决的,只要不使用异步消费,一般消息是不会丢失的。但返回ack给broker的时候可能失败,下次就会再次消费消息,出现了重复消费的问题,下面会对重复消费进行解读。
NameServer挂了
大家都知道RocketMQ是通过NameServer进行路由转发的,如果集群中所有NameServer都挂了怎么办。
在这种情况下,RocketMQ相当于整个服务都不可用了,那他本身肯定无法给我们保证消息不丢失了。我们只能自己设计一个降级方案来处理这个问题了。例如在订单系统中,如果多次尝试发送RocketMQ不成功,那就只能另外找给地方(Redis、文件或者内存等)把订单消息缓存下来,然后起一个线程定时的扫描这些失败的订单消息,尝试往RocketMQ发送。这样等RocketMQ的服务恢复过来后,就能第一时间把这些消息重新发送出去。
总结
完整分析过后,整个RocketMQ消息零丢失的方案就是
- 生产者使用事务消息机制
- Broker配置同步刷盘+Dledger主从架构
- 消费者不要使用异步消费
- 整个MQ挂了之后准备降级方案
重复消费
消费者消费消息的时候业务代码执行完毕正准备给Broker一个ack的时候突然就崩了,这时候ack没有给Broker,offset没有变,之前消费的消息一定会被再次消费,这样就带来了重复消费的问题。解决这种问题的根本就是通过一个全局唯一的编号,记录在Mysql中,消费了增加一条记录,消费前查询一下数据路有没有消费过这条消息。
消息顺序
在有些业务场景下可能要保证消息的顺序性。比如消息之间有依赖关系,只有前一个消费过了,后一个才能消费。
如何保证消息顺序
消息顺序有2种,全局有序,局部有序
- 全局有序:整个MQ系统的所有消息严格按照队列先入先出顺序进行消费。
- 局部有序:部分消息有序
在大部分的MQ业务场景,我们只需要能够保证局部有序
解决办法:
发送者发送消息时,会通过MessageQueue轮询的方式保证消息尽量均匀的分布到所有的MessageQueue上,而消费者也就同样需要从多个MessageQueue上消费消息。而MessageQueue是RocketMQ存储消息的最小单元,他们之间的消息都是互相隔离的,在这种情况下,是无法保证消息全局有序
局部有序:只需要将有序的一组消息都存入同一个MessageQueue里,这样MessageQueue的FIFO设计天生就可以保证这一组消息的有序。RocketMQ中,可以在发送者发送消息时指定一个MessageSelector对象,让这个对象来决定消息发入哪一个MessageQueue。这样就可以保证一组有序的消息能够发到同一个MessageQueue里。
全局有序:Topic配置成只有一个MessageQueue队列。这样天生就能保证消息全局有序了
消息积压问题
在正常情况下,使用MQ都会要尽量保证他的消息生产速度和消费速度整体上是平衡的,但是如果部分消费者系统出现故障,就会造成大量的消息积累。这类问题通常在实际工作中会出现得比较隐蔽。由于网络波动或数据库故障等情况,会导致消息大量的积累。这在一些大型的互联网项目中,消息积压的速度是相当恐怖的。所以消息积压是个需要时时关注的问题。
如何处理消息积压问题
如果Topic下的MessageQueue配置得是足够多的,那每个Consumer实际上会分配多个MessageQueue来进行消费。这个时候,就可以简单的通过增加Consumer的服务节点数量来加快消息的消费,等积压消息消费完了,再恢复成正常情况。最极限的情况是把Consumer的节点个数设置成跟MessageQueue的个数相同。但是如果此时再继续增加Consumer的服务节点就没有用了。
而如果Topic下的MessageQueue配置得不够多的话,那就不能用上面这种增加Consumer节点个数的方法了。这时怎么办呢? 这时如果要快速处理积压的消息,可以创建一个新的Topic,配置足够多的MessageQueue。然后把所有消费者节点的目标Topic转向新的Topic,并紧急上线一组新的消费者,只负责消费旧Topic中的消息,并转储到新的Topic中,这个速度是可以很快的。然后在新的Topic上,就可以通过增加消费者个数来提高消费速度了。之后再根据情况恢复成正常情况。