RocketMQ生产问题实践


RocketMQ如何保证消息不丢失

RocketMQ的一个发送消息流程

生产者 => Broker => 消费者 Broker master => slave => 磁盘

  1. 生产者发送消息给到Broker
  2. Broker进行主从同步
  3. Broker不管Master还是Slave进行刷盘操作
  4. Broker将消息投递给消费者

在上面4个步骤任何一个步骤都将会出现消息丢失的情况,当然别的MQ也会有同样的情况出现。其实还有第5种情况NameServer挂了。

下面针对这5种情况进行分析

生产者发送消息

所有网络请求都是不可靠的,走网络请求都有一定的消息丢失的可能性。

同步发送是一个好办法,发送等Broker有应答了表示Broker接收成功进行下一步操作。

但这样有问题,我消息发送出去了,但本地代码执行异常了怎么办。或许有人说可以本地代码执行完成再去进行消息发送操作,但这样消息发送失败了怎么办。这些都有各种各样的问题。RocketMQ对于这些场景提供了事务消息的思路。

事务消息其实相当于给我们很多次反悔的操作。

事务消息实现机制

步骤:

  1. 发送方将half事务消息发送到broker
  2. broker把消息持久化成功,向发送方返回ACK确认收到消息。
  3. 发送方开始执行本地事务
  4. 发送方根据本地事务执行结果向broker发送二次确认
  5. broker收到发送方commit状态则将half事务消息标记为可投递,订阅方可以收到消息。。发送方rollback状态,则删除half事务消息。。如果步骤4的二次确认没有发送到broker则会过一段时间broker会对发送方进行消息回查,发送方收到消息回查,检查对应消息的本地事务执行结果。发送方根据回查结果继续步骤5。

Q&A:

半事务消息发送失败怎么办?

要想回答这个问题,首先得了解半事务消息有什么作用。半事务消息主要作用是一个探测功能,证明RocketMQ是可用的。如果没有启动事务消息我们通常的操作是执行本地然后发消息,这样就可能出现一个尴尬的情况,本地都执行成功了,写消息失败了。而引入了半事务消息机制,半事务消息写入失败,没事,我们这时一般认为MQ服务不可用,给要执行的本地代码一个标记,等MQ服务正常重新执行流程。

本地事务执行失败怎么办?

比如本地事务需要执行Mysql,刚好这时候Mysql宕了。我们一般可以把数据缓存起来,给Broker一个UNKNOW状态,等待Broker的回查,回查的时候吧缓存起来的数据继续执行本地事务。

Broker主从同步

之前2m-2s-async集群的搭建方式其实是不推荐的,生产上一般是使用Dledger搭建集群。因为2m-2s有个生来就有的问题就是不会主从切换,主节点挂了从节点不会顶上去。直到RocketMQ4.5以后的版本才支持Dledger集群。

Dledger集群会托管CommitLog文件,所以不是像之前配置brokerRole解决主从同步数据丢失的情况。

Dledger文件同步

Dledger会通过两阶段提交解决文件同步问题,具体流程如下

数据同步会通过两个阶段,一个是uncommitted阶段,一个是commited阶段

  1. Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件
  2. Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger。然后如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态
  3. Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态。这样,就基于Raft协议完成了两阶段的数据同步

Broker同步刷盘

RocketMQ的刷盘方式flushDiskType配置成同步刷盘就可以保证消息在刷盘过程中不会丢失了

Broker投递消息

Broker投递消息本身就是看你的ack来解决的,只要不使用异步消费,一般消息是不会丢失的。但返回ack给broker的时候可能失败,下次就会再次消费消息,出现了重复消费的问题,下面会对重复消费进行解读。

NameServer挂了

大家都知道RocketMQ是通过NameServer进行路由转发的,如果集群中所有NameServer都挂了怎么办。

在这种情况下,RocketMQ相当于整个服务都不可用了,那他本身肯定无法给我们保证消息不丢失了。我们只能自己设计一个降级方案来处理这个问题了。例如在订单系统中,如果多次尝试发送RocketMQ不成功,那就只能另外找给地方(Redis、文件或者内存等)把订单消息缓存下来,然后起一个线程定时的扫描这些失败的订单消息,尝试往RocketMQ发送。这样等RocketMQ的服务恢复过来后,就能第一时间把这些消息重新发送出去。

总结

完整分析过后,整个RocketMQ消息零丢失的方案就是

  • 生产者使用事务消息机制
  • Broker配置同步刷盘+Dledger主从架构
  • 消费者不要使用异步消费
  • 整个MQ挂了之后准备降级方案

重复消费

消费者消费消息的时候业务代码执行完毕正准备给Broker一个ack的时候突然就崩了,这时候ack没有给Broker,offset没有变,之前消费的消息一定会被再次消费,这样就带来了重复消费的问题。解决这种问题的根本就是通过一个全局唯一的编号,记录在Mysql中,消费了增加一条记录,消费前查询一下数据路有没有消费过这条消息。

消息顺序

在有些业务场景下可能要保证消息的顺序性。比如消息之间有依赖关系,只有前一个消费过了,后一个才能消费。

如何保证消息顺序

消息顺序有2种,全局有序,局部有序

  • 全局有序:整个MQ系统的所有消息严格按照队列先入先出顺序进行消费。
  • 局部有序:部分消息有序

在大部分的MQ业务场景,我们只需要能够保证局部有序

解决办法:

发送者发送消息时,会通过MessageQueue轮询的方式保证消息尽量均匀的分布到所有的MessageQueue上,而消费者也就同样需要从多个MessageQueue上消费消息。而MessageQueue是RocketMQ存储消息的最小单元,他们之间的消息都是互相隔离的,在这种情况下,是无法保证消息全局有序

局部有序:只需要将有序的一组消息都存入同一个MessageQueue里,这样MessageQueue的FIFO设计天生就可以保证这一组消息的有序。RocketMQ中,可以在发送者发送消息时指定一个MessageSelector对象,让这个对象来决定消息发入哪一个MessageQueue。这样就可以保证一组有序的消息能够发到同一个MessageQueue里。

全局有序:Topic配置成只有一个MessageQueue队列。这样天生就能保证消息全局有序了

消息积压问题

在正常情况下,使用MQ都会要尽量保证他的消息生产速度和消费速度整体上是平衡的,但是如果部分消费者系统出现故障,就会造成大量的消息积累。这类问题通常在实际工作中会出现得比较隐蔽。由于网络波动或数据库故障等情况,会导致消息大量的积累。这在一些大型的互联网项目中,消息积压的速度是相当恐怖的。所以消息积压是个需要时时关注的问题。

如何处理消息积压问题

如果Topic下的MessageQueue配置得是足够多的,那每个Consumer实际上会分配多个MessageQueue来进行消费。这个时候,就可以简单的通过增加Consumer的服务节点数量来加快消息的消费,等积压消息消费完了,再恢复成正常情况。最极限的情况是把Consumer的节点个数设置成跟MessageQueue的个数相同。但是如果此时再继续增加Consumer的服务节点就没有用了。
而如果Topic下的MessageQueue配置得不够多的话,那就不能用上面这种增加Consumer节点个数的方法了。这时怎么办呢? 这时如果要快速处理积压的消息,可以创建一个新的Topic,配置足够多的MessageQueue。然后把所有消费者节点的目标Topic转向新的Topic,并紧急上线一组新的消费者,只负责消费旧Topic中的消息,并转储到新的Topic中,这个速度是可以很快的。然后在新的Topic上,就可以通过增加消费者个数来提高消费速度了。之后再根据情况恢复成正常情况。


文章作者: dm
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 dm !
评论
 上一篇
Kafka基本使用及设计原理 Kafka基本使用及设计原理
背景Kafka是一个分布式基于Zookeeper的分布式消息系统,支持多分区,多副本。是由Scala语言开发,现在是Apache基金会顶级开源项目。它有着高吞吐、低延迟的特性。这种特性主要应用于大数据场景,日志收集场景。不适合一些复杂的业务
2024-02-01
下一篇 
RocketMQ整体理解和消息样例 RocketMQ整体理解和消息样例
RocketMQ架构设计 在RocketMQ上主要分为4部分,Producer、Consumer、NameServer、Broker Producer:消息生产者,Producer通过NameServer拉取所有Broker集群,通过负载
2023-12-28
  目录