我在《整体架构》这一章中,已经比较清楚的介绍了Kafka的ISR机制。每个分区中的Leader 会负责维护和跟踪 ISR 集合中所有 follower的滞后状态(Leader会维护每个Follower的LEO,Follower来拉取消息时会带上自己的LEO),当follower副本落后太多或失效时,leader 会把它从 ISR 集合中剔除,转移到OSR。默认情况下, 当 Leader 发生故障时,只有在 ISR 集合中的副本才有资格被选举为新的 leader。
但是,在一些极端情况下,ISR机制可能会引发一些问题,本章我就对这些问题进行一一分析。
一、存在的问题
在老版本的Kafka中,ISR机制可能引发 数据丢失 和 数据不一致 问题。
1.1 数据丢失
我们先来看第一种情况——数据丢失。什么情况下会出现数据丢失呢?
举个例子,假设某分区一共有2个副本,一个Leader,一个Follower。生产者(Producer)的min.insync.replicas
参数设置为1,也就是说,生产者发送消息给Leader,Leader写Log成功后,生产者就会认为发送成功了:
- 生产者先发送了1条消息给Leader,此时Leader的信息:
LEO = 1, HW = 0
; - Follower发送Fetch请求同步数据,上送
LEO = 0
; - Leader会维护所有Follower的LEO信息,取最小值作为HW,此时
HW = 0
并响应; - Follower同步到消息,更新自身信息:
LEO = 1, HW = 0
; - Follower再次发送Fetch请求,上送
LEO = 1
; - Leader维护所有Follower的LEO信息,取最小值作为HW,此时
HW = 1
并响应; - 正常情况下,Follower会接收到响应,更新自身信息:
LEO = 1, HW = 1
;
上述第7步,如果Follower还没接收到响应就挂掉了,此时它的LEO = 1, HW = 0
,那么Follower重启后,会依据HW来调整LEO,LEO会自动被调整为0,也就是说已经同步的消息会被从日志文件里删除。
接着,如果此时Leader也挂了,然后Follower当选为新Leader,由于它的HW = 0
,那么原来的Leader同步到数据后,会截断自己的日志,发生数据丢失。
1.2 数据不一致
我们再来看第二种情况——数据不一致,假设某分区一共有2个副本,一个Leader,一个Follower。生产者(Producer)的min.insync.replicas
参数设置为1,也就是说,生产者发送消息给Leader,Leader写Log成功后,生产者就会认为发送成功了:
- 生产者发送了2条消息给Leader,此时Leader的信息:
LEO = 2, HW = 0
; - Follower发送Fetch请求同步数据,先同步第一条,上送
LEO = 0
; - Leader会维护所有Follower的LEO信息,取最小值作为HW,此时
HW = 0
并响应; - Follower同步到消息,更新自身信息:
LEO = 1, HW = 0
; - Follower再次发送Fetch请求,同步第二条消息,上送
LEO = 1
; - Leader维护所有Follower的LEO信息,取最小值作为HW,此时
HW = 1
并响应; - 正常情况下,Follower会接受到响应,更新自身信息:
LEO = 1, HW = 1
;
上述第7步完成后,假设Leader宕机了,Follower成为新的Leader,此时它的HW = 1
,就一条数据,然后生产者又发了一条数据给新leader,此时HW = 2
,但是第二条数据是新的数据。接着老leader重启变为follower,这个时候发现两者的HW都是2。这个时候他俩数据是不一致的,本来合理的应该是新的follower要删掉自己原来的第二条数据,跟新leader同步的,让他们俩的数据一致,但是因为HW一样,所以就不会截断数据了。
二、解决方案
上述数据丢失的场景是一种非常极端的场景,一般只会在0.11.x
版本之前出现。0.11.x
版本时,Kafka引入了 Leader Epoch 机制。所谓Leader Epoch,大致可以理解为每个Leader的版本号,以及自己是从哪个offset开始写数据的,类似[epoch = 0, offset = 0]
。
三、ISR工作原理
Kafka到底是如何维护ISR列表的?什么样的Follower才有资格放到ISR列表里呢?
3.1 replica.lag.max.messages
在0.9.x
之前的版本里,Kafka Broker有一个核心的参数:replica.lag.max.messages
,默认值4000,表示如果Follower落后Leader的消息数量超过了这个参数值,就认为Follower是 out-of-sync,就会从ISR列表里移除。
我们通过一个例子来理解下,假设一个分区有3个副本:一个Leader,两个Follower,配置是:replica.lag.max.messages = 3
,min.insync.replicas = 2
,ack = -1
。
也就是说,生产者发送一条消息后,当ISR中至少存在2个副本(包含Leader)且这些副本都写成功后,生产者才会收到写入成功的响应。那么每个Follower会不断地发送Fetch请求拉取消息(上送自己的LEO),此时Kafka会判断Leader和Follower的LEO相差多少,如果差的数量超过了replica.lag.max.messages
参数值,就会把Follower踢出ISR列表。
存在的问题
replica.lag.max.messages
这一机制,在瞬间高并发访问的情况下会出现问题:比如Leader瞬间接收到几万条消息,然后所有Follower还没来得及同步过去,此时所有follower都会被踢出ISR列表,然后同步完成之后,又会再被加入ISR列表。
也就是说,这种依靠同步消息数量来判断Follower是否落后的机制,可能会导致在系统高峰时期,Follower被频繁踢出ISR列表,然后再回到ISR列表,这种操作是完全无意义的。
3.2 replica.lag.time.max.ms
Kafka从0.9.x
版本开始,引入了replica.lag.max.ms
参数,默认值10秒,表示如果某个Follower的LEO一直落后Leader超过了10秒,那么才判定这个Follower是 out-of-sync,就会从ISR列表里移除。
这样的话,即使出现瞬间的流量洪峰,一下子导致几个Follower都落后了不少数据,但是只要在限定的时间内尽快追上来,别一直落后,就不会认为是out-of-sync。
上面就是ISR的核心工作机制了,一般导致Follower同步数据较慢的原因主要有以下三种:
- Follower所在机器的性能变差,比如网络负载过高,I/O负载过高,CPU负载过高等等;
- Follower所在机器的Kafka Broker进程出现卡顿,最常见的就是发生了Full GC;
- 动态增加了Partition的副本,此时新加入的Follower会拼命从Leader上同步数据,但是这个是需要时间的,所以如果参数配置不当,会导致生产者hang等待同步完成。