2023-08-01  阅读(16)
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/126

我曾经在分布式系列中讲解过消息丢失,当时主要从RabbitMQ和kafka是如何应对消息丢失的。本章,我们先来回顾下消息丢失的场景,然后看看RocketMQ是如何解决消息丢失的问题的。

一、消息丢失场景

《系统改造:异步、解耦、削峰》一章,我们通过引入RocketMQ使得系统之间解耦,如下图:

202308012101203831.png

物流、积分、促销、通知系统通过订阅RocketMQ中的消息与订单系统进行交互。那么,这个过程中如果消息丢失了怎么办呢?消息丢失可能发生在以下几个场景。

1.1 生产者消息丢失

订单系统,发送支付成功的消息到MQ中,这个时候恰巧可能网络发生了抖动,也就是网络突然有点问题,导致这次网络通信失败了,消息就丢失了:

202308012101212962.png

1.2 中间件消息丢失

当消息写入MQ之后,MQ可能仅仅是把这个消息给写入到PageCache中,然后就返回成功响应了,但此时消息还在OS缓存中,如果此时机器宕机了,消息就丢失了:

202308012101221213.png

此外,就算数据刷到磁盘上去了,但如果你的磁盘出现故障,比如磁盘坏了,上面存储的数据还是会丢失。

1.3 消费者消息丢失

接着往下看,假设积分系统从RocketMQ拉取到了一个消息,偏移量是offset=10,正常情况下,积分系统处理完自身逻辑后会响应给RocketMQ,告诉它offset=10这个消息已经消息成功了。

但是,有的消息中间件,比如Kafka,会消费者一消费成功就会自动提交offset,此时如果消息的自身逻辑处理异常或宕机,就会导致MQ认为已经消费成功,但事实上消费失败了,从而出现了消息丢失的现象。

202308012101228274.png

二、消息零丢失方案

了解了消息丢失的三类基本场景,我们就来看看,RocketMQ怎么才能保证全链路的消息零丢失。

2.1 生产者侧

我们先来看下生产者侧怎么保证消息不丢失,也就是 生产者100%将消息投递成功

在RocketMQ中,提供了 事务消息机制 ,凭借这个事务级的消息机制,就可以确保生产者能将消息投递成功。这里其实涉及到了分布式事务的概念,但是我们本章不会展开,有兴趣的童鞋可以参考我写的分布式系列

half消息

以我们的订单系统为例,假设它收到了一个订单支付成功的通知,它首先会发送一条half消息到RocketMQ中去。这个half消息,你可以理解为消息的状态是half状态,这个时候积分系统是看不见这个half消息的。

202308012101237245.png

为什么要先发送一个half消息呢?

因为订单系统收到支付成功的通知后,如果直接操作本地数据库操作成功了,但是投递消息失败了(比如MQ挂了或网络有问题),会造成积分增加不成功,出现数据不一致的情况。所以,先投递一个half消息,初步跟MQ做个沟通,确保网络和MQ暂时都没问题。

接着,发送half消息成功并得到响应后,订单系统就开始执行自己的本地操作,比如更新订单状态等等:

202308012101244716.png

如果本地事务执行失败了,订单系统就发送一个rollback请求给MQ,告诉它将刚才那个half消息给删除掉,不需要走后续流程了:

202308012101251107.png

如果本地事务执行成功了,订单系统就发送一个commit请求给MQ,告诉它将刚才那个half消息进行commit操作,这时积分系统就可以看到并消费这个消息了:

202308012101260768.png

回调接口

RocketMQ有一个针对Half消息的补偿机制,它会去扫描所有未commit或删除的half消息,也就是说这些half消息没有被生产者确认到底是删除掉还是提交掉。因为可能因为网络原因,RocketMQ并没有收到订单系统发送过来的commit/rollback请求。

所以,针对这些状态不确定的half消息,超过一定时间后,RocketMQ就会回调订单系统的一个接口,确认下到底是打算commit这个half消息还是要delete掉?此时,订单系统可以根据订单状态做相应的处理,然后告知RabbitMQ结果。

202308012101276399.png

通过上面这样一套消息事务机制,就可以保证我们的订单系统一旦成功执行了数据库操作,就一定能将消息投递到MQ中,这样就保证了生产者发送消息一定不会丢失。

2.2 MQ侧

我们再来看下RocketMQ侧怎么保证消息不丢失。

其实通过第一节的分析,我们已经知道,RocketMQ侧存在两种消息丢失的情况:

  1. 消息写入到了OS Cache中,并已返回给Producer写入成功的响应,数据还未刷到磁盘,但MQ挂了;
  2. 数据已经刷到了磁盘上,Producer也收到了成功响应,但是磁盘坏掉了。

刷盘方式

针对第一种情况,默认情况下RocketMQ是 异步刷盘 ,也就是说只要消息写入到了OS Cache,就会响应生产者成功。 所以如果一定要确保MQ侧零丢失的话,可以调整RocketMQ的刷盘策略,修改Broker的配置文件,将其中的flushDiskType配置设置为:SYNC_FLUSH,也就是 同步刷盘

所以,调整为同步刷盘之后,生产者写入MQ的每条消息,只要MQ响应成功了,那么消息就是已经进入磁盘文件了。

数据同步

针对第二种情况,必须有Slave-Broker从Master-Slave同步数据,而且只有当Slave-Broker同步数据成功,才能认为消息写入成功。

这样一来,但凡一条消息写入成功,此时主从Broker上都有这条数据了,即使Master-Broker的磁盘坏了,Slave-Broker上至少还是有数据的,那数据也不会因为磁盘故障而丢失了。

对于基于DLedger技术和Raft协议构建的主从同步架构,只要MQ侧响应消息写入成功了,那就代表着数据已经通过Raft协议同步给了超过半数的Follower,所以在这种场景上,RocketMQ天然保证消息不会丢失。

2.3 消费者侧

最后,我们来看下消费者侧是怎么保证消息不丢失的。

之前说过了,如果消息者消费完消息,立即提交了消息的offset,然后还没处理完自身的逻辑就挂掉了,那么就会出现消息丢失的现象。

但是,对于RocketMQ而言,是消费者处理完自身业务逻辑后,再提交给Broker响应。所以即使此时系统崩溃了,没有响应成功,也没有关系,因为MQ侧会认为没有消费成功,会再次发送消息。

此时,消息者侧要做的就是控制好消费接口的幂等性,这个我们后面会讲到。

Consumer默认的消费模式,是必须处理完一批消息,才会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS这个状态标识,告诉Broker消息都处理结束了,提交offset到Broker去。 所以需要特别注意, 不能在代码中对消息进行异步的处理 ,也就是不能没处理完自身业务逻辑,就返回响应。

三、消息事务原理

RocketMQ的消息零丢失方案,最核心的就是生产者侧的事务消息机制,其本质是通过CommitLog、CosumeQueue这套存储机制来做的,half消息其实是写入到了Broker内部的Topic中。

3.1 Half消息原理

以订单系统为例,正常情况下,当订单系统发送一个通知消息给RocketMQ时,是指定这个通知消息的Topic为order_topic,然后最终定位到的是某台Broker服务器上的MessageQueue对应的ConsumeQueue。

但是如果这个通知消息是half消息,那么这个消息的offset并不会保存到order_topic对应的ConsumeQueue中,而是保存到一个名为RMQ_SYS_TRANS_HALF_TOPIC的内部Topic的ConsumeCQueue中,如下图:

2023080121012905810.png

可以看到,消费者(积分系统)订阅的是order_topic,而由于“支付成功通知消息”是个half消息,所以它在CommitLog中的偏移量offset是写入到内部RMQ_SYS_TRANS_HALF_TOPIC对应的ConsumeQueue中,所以half消息对于消费者实例来说是不可见的。

3.2 回调原理

如果订单系统没有对half消息进行commit或rollback,RocketMQ会去回调生产者的接口。那么RocketMQ怎么知道要回调哪些half消息呢?

事实上,RocketMQ后台有个定时任务,会去扫描RMQ_SYS_TRANS_HALF_TOPIC这个Topic中的half消息,如果发现某个half消息超过了一定时间还没被commit或rollback,它就会去回调生产者的接口,确认到底是commit还是rollback:

2023080121013062411.png

RocketMQ最多回调15次,如果15次之后那个half消息还没办法确认最终状态,就会默认对其rollback。

3.3 commit原理

如果生产者最终对half消息执行commit请求,那RocketMQ的底层是如何实现的呢?

事实上,RocketMQ接受到commit请求后,会往一个名为OP_TOPIC的内部Topic中写入一条OP记录,标记half消息已经是commit状态了。接着,会把RMQ_SYS_TRANS_HALF_TOPIC中的half消息offset写入到order_topic的ConsumeQueue里去,然后积分系统就可以看到这条消息,并进行消费了,如下图:

2023080121013329212.png

3.4 rollback原理

rollback原理和commit原理是一样的,如果生产者最终对half消息执行了rollback请求,RocketMQ的底层并不会真的把half消息给删除掉,而是在内部OP_TOPIC对应的ConsumeQueue中写入一条OP记录,标记half消息已经是rollback状态了:

2023080121013498513.png

四、总结

本章,我们对消息丢失的三类场景进行分析,并给出了一套消息零丢失的全链路解决方案,对RocketMQ中的事务消息机制原理进行了详细讲解。

在业务系统中,使用事务消息机制会大大降低吞吐量,所以一般建议,对于涉账类以及核心数据相关的交易链路或系统,可以采用上述这套消息零丢失方案。而对于其它大部分没那么核心的场景和系统,其实即使丢失一些数据,也不会导致太大的问题,不建议采用这套方案。

一般来说,对于大多数系统,生产者通过同步发送消息+反复重试的方式,就可以做到99%场景下的消息可靠投递。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文