副本与ISR设计
一个Kafka分区本质上就是一个备份日志,即利用多份相同的备份共同提供冗余机制来保持系统高可用性。这些备份在Kafka中被称为副本(replica)。
Kafka把分区的所有副本均匀地分配到所有broker上,并从这些副本中挑选一个作为leader副本对外提供服务,而其他副本被称为follower副本,只能被动地向leader副本请求数据,从而保持与leader副本的同步。
假如leader副本永远工作正常,那么其实不需要follower副本。但现实总是残酷的,Kafka leader副本所在的broker可能因为各种各样的原因而随时宕机。一旦发生这种情况,
follower副本会竞相争夺成为新leader的权力。显然不是所有的follower都有资格去竞选leader。follower被动地向leader请求数据。对于那些落后leader进度太多的follower而言,它们是没有资格竞选leader的,毕竟它们手中握有的数据太旧了,如果允许它们成为leader,会造成数据丢失,而这对clients而言是灾难性的。鉴于这个原因,Kafka引入了ISR的概念。
所谓ISR,就是Kafka集群动态维护的一组同步副本集合(in-sync replicas)。每个topic分区都有自己的ISR列表,ISR中的所有副本都与leader保持同步状态。值得注意的是,leader副本总是包含在ISR中的,只有ISR中的副本才有资格被选举为leader。而producer写入的一条Kafka消息只有被ISR中的所有副本都接收到,才被视为“已提交”状态。由此可见,若ISR中有N个副本,那么该分区最多可以忍受N-1个副本崩溃而不丢失已提交消息。
1、follower副本同步
follower副本只做一件事情:向leader副本请求数据。相关概念如下:
起始位移(base offset)
:表示该副本当前所含第一条消息的offset。高水印值(high watermark,HW)
:副本高水印值。它保存了该副本最新一条己提交消息的位移。leader分区的HW值决定了副本中已提交消息的范围,也确定了consumer能够获取的消息上限,超过HW值的所有消息都被视为“未提交成功的”,因而consumer是看不到的。另外值得注意的是,不是只有leader副本才有HW值。实际上每个follower副本都有HW值,只不过只有leader副本的HW值才能决定clients能看到的消息数量罢了。日志末端位移(log end offset,.LEO)
:副本日志中下一条待写入消息的offset。所有副本都需要维护自己的LEO信息。每当leader副本接收到producer端推送的消息,它会更新自己的LEO(通常是加1)。同样,follower副本向leader副本请求到数据后也会增加自己的LEO。事实上只有ISR中的所有副本都更新了对应的LEO之后,leader副本才会向右移动HW值表明消息写入成功。整个流程如图所示。
为了形象化地说明图的流程,下面结合一个具体的示例来说明。假设图中的Kafka集群当前只有一个topic,该topic只有一个分区,分区共有3个副本,因此ISR中也是这3个副本。该topc当前没有任何数据。由于没有任何数据,因此3个副本的LEO都是0,HW值是0。
现有一个producer向broker1所在的leader副本发送了一条消息:
- broker1上的leader副本接收到消息,把自己的LEO值更新为1
- broker2和broker3上的follower副本各自发送请求给broker1
- broker1分别把该消息推送给follower副本
- follower副本接收到消息后各自更新自己的LEO为1
- leader副本接收到其他follower副本的数据请求响应(response)之后,更新HW值为1.此时位移为0的这条消息可以被consumer消费。
对于设置了acks=-1的producer而言,只有完整地做完上面所有的5步操作,producer才能正常返回。
2、ISR设计
对于如何界定ISR,不同Kafka版本的界定方法不同,大体上可分为0.9.0.0版本之前和0.9.0.0版本(含)之后两种方法。鉴于现在依然有很多用户在使用Kafka0.8.2.x版本,下面首
先介绍在这个版本中是如何判定ISR的。
2.1、0.9.0.0版本之前
0.9.0.0版本之前,Kafka提供了一个参数replica.lag.max.messages
,用于控制follower副本落后leader副本的消息数。一旦超过这个消息数,则视为该follower为“不同步”状态,从而需要被Kafka“踢出”ISR。
我们举一个实际的例子。假设有一个单分区的topc,副本数是3,这3个副本分别保存在broker1、broker2和broker3上。leader副本在brokerl上,其他两个broker上的副本都是
follower副本。现设置replica.lag.max.messages为4,此时有一个producer每次都给这个topic发送3条消息,那么初始状态如图所示。
初始状态下所有follower副本都是和leader副本同步的,所有follower都能追上leader的LEO。现在假设producer生产了1条消息给leader,而broker.3上的follower副本经历了一次
Full GC,这时日志的状态则如图所示。
此时,更新之后leader的LEO不再与HW值相等,但最新生产的这条消息不会被认为“已提交”,除非broker3.上的follower副本被“踢出”ISR或者追上leader的LEO。由于
replica.lag.max.messages被设置为4,而broker.3上的follower只落后1条消息,并不满足“不同步”条件,因此不会从ISR中移除。对于broker3.上的副本而言,只需要追上leader的LEO。如果我们假设broker3在执行Full GC停顿了l00毫秒之后重新追上了leader的进度,那么此时的日志状态如图所示。
好了,现在一切都恢复正常了。leader的HW值与LEO再次重叠,而两个follower也与leader同步。那么,除了上面举例说明的GC,还可能有哪些原因导致follower与leader不同步呢?归纳起来主要有如下3个原因。
- 请求速度追不上:follower副本在一段时间内都无法追上leader副本端的消息接收速度。比如follower副本所在broker的网络I/O开销过大导致备份消息的速度持续慢于从
leader处获取消息的速度。 - 进程卡住:follower在一段时间内无法向leader请求数据,比如之前提到的频繁GC或程序bug等。
- 新创建的副本:如果用户增加了副本数,那么新创建的follower副本在启动后全力追赶leader进度。在追赶进度这段时间内通常都是与leader不同步的。
上面的replica.lag.max.messages参数便是用于检测第一种情况的。另外,0.9.0.0版本之前还提供了另一个参数replica.lag.time.max.ms用于检测另外两种情况。比如设置replica.lag.time.max.ms为500毫秒,若follower副本无法在500毫秒内向leader请求数据,那么该follower就会被视为“不同步”,即会被踢出ISR。
0.9.0.0版本之前的这种IS方案在设计上有一些固有的缺陷。为了说明我们依然使用之前提到的例子。如果producer一次性发送消息的速度是2条/秒,即每个producer batch都包含2条消息。显然,此时设置replica…lag.max.messages=-4是相当安全且合适的数值。为什么?因为在leader副本接收到producer发送过来的消息之后且follower副本开始备份这些消息之前,follower副本落后leader的消息数不会超过3条。但如果follower副本落后leader的消息数超过3条,那么我们肯定希望leader把这个特别慢的follower副本踢出ISR以防止增加producer消息生产的延时。从这个简单的例子来看,这个参数似乎工作得很好,为什么说它是有缺陷的呢?
根本原因在于如果要正确设置这个参数的值,需要用户结合具体使用场景评估,而没有统一的设置方法。下面详细解释一下根本原因。首先,对于一个参数的设置,有一点是很重要的:用户应该对他们知道的参数进行设置,而不是对他们需要进行猜测的参数进行设置。对于该参数来
说,我们只能猜测应该设置成哪些值,而不是根据需要对其进行设置。为什么?举一个例子,假设在刚才那个topic的环境中,producer程序突然发起了一波消息生产的瞬时高峰流量,比如producer一次性发送4条消息,也就是说,消息数与replica.lag.max.messages值相等。此时,这两
个follower副本都会被认为与leader副本不同步,从而被踢出ISR,具体日志状态如图所示。
这两个follower副本与leader不再同步,但其实它们都处于存活状态(alive)且没有任何性能问题,下次FetchRequest时它们就能追上leader的LEO,并重新被加入ISR。于是就出现了这样的情况:它们不断地被踢出ISR,然后重新加回ISR,造成了与leader不同步、再同步、又不同步、再次同步的情况发生。想想就知道这是多大的开销!问题的关键就在
replica.lag.max.messages这个参数上。用户通过猜测设置该值,猜测producer的速度,猜测leader副本的入站流量。
可能有用户会说该参数默认值是4000就应该足够使用了。但有一点需要注意的是,这个参数是全局的!即所有topic都受到这个参数的影响。假设集群中有两个topic:tl和t2。若它们的流量差异非常巨大,t1的消息生产者一次性生产5000条消息,直接就突破了4000这个默认值;而另一个topict2,它的消息生产者一次性生产10条消息,那么Kafka就需要相当长的时间才能辨别出t2各个分区中那些滞后的副本。一旦出现有broker崩溃的情况,极易造成状态的不一致。
鉴于这些原因,在0.9.0.0版本及之后,Kafka社区优化了ISR方案的设计。
2.2、0.9.0.0版本之后
自0.9.0.0版本之后,Kafka去掉了之前的replica.lag.max.messages参数,改用统一的参数
同时检测由于慢以及进程卡壳而导致的滞后(lagging)一即follower副本落后leader副本的时间间隔。这个唯一的参数就是replica.lag.time.max.ms
,默认值是l0秒。对于“请求速度追不上”的情况,检测机制也发生了变化一如果一个follower副本落后leader的时间持续性地超过了这个参数值,那么该follower副本就是“不同步”的。这样即使出现刚刚提到的producer瞬时峰值流量,只要follower不是持续性落后,它就不会反复地在ISR中移进、移出。