重平衡
1、rebalance概览
consumer group的rebalance本质上是一组协议,它规定了一个consumer group是如何达成一致来分配订阅topic的所有分区的。假设某个组下有20个consumer实例,该组订阅了一个有着l00个分区的topic。正常情况下,Kafka会为每个consumer平均分配5个分区。这个分配过程就被称为rebalance。当consumer成功地执行rebalance后,组订阅topic的每个分区只会分配给组内的一个consumer实例。
和旧版本consumer依托于ZooKeeper进行rebalance不同,新版本consumer使用了Kafka内置的一个全新的组协调协议(group coordination protocol)。对于每个组而言,Kafka的某个broker会被选举为组协调者(group coordinator)。coordinator负责对组的状态进行管理,它的主要职责就是当新成员到达时促成组内所有成员达成新的分区分配方案,即coordinator负责对组执行rebalance操作。
2、rebalance触发条件
- 组成员发生变更,比如新consumer加入组,或已有consumer主动离开组,再或是已有consumer崩溃时则触发rebalance。
- 组订阅opc数发生变更,比如使用基于正则表达式的订阅,当匹配正则表达式的新topic被创建时则会触发rebalance。
- 组订阅topic的分区数发生变更,比如使用命令行脚本增加了订阅topic的分区数。
真实应用场景中引发rebalance最常见的原因就是违背了第一个条件,特别是consumer崩溃的情况。这里的崩溃不一定就是指consumer进程“挂掉”或consumer进程所在的机器宕机。当consumer无法在指定的时间内完成消息的处理,那么coordinator就认为该consumer已经崩溃,从而引发新一轮rebalance。生产环境中用户一定要结合自身业务特点仔细调优consumer参数request.timeout.ms
、max.poll.records
和max.poll.interval.ms
,以避免不必要的rebalance出现。
3、rebalance分区分配
在rebalance时group下所有的consumer都会协调在一起共同参与分区分配。Kafka新版本consumer默认提供了3种分配策略,分别是range策略、round-robin策略和sticky策略。
所谓的分配策略决定了订阅topic的每个分区会被分配给哪个consumer。
- range策略主要是基于范围的思想。它将单个topc的所有分区按照顺序排列,然后把这些分区划分成固定大小的分区段并依次分配给每个consumer;
- round-robin策略则会把所有topic的所有分区顺序摆开,然后轮询式地分配给各个consumer。
- sticky策略有效地避免了上述两种策略完全无视历史分配方案的缺陷,采用了“有黏性”的策略对所有consumer实例进行分配,可以规避极端情况下的数据倾斜并且在两次rebalance间最大限度地维持了之前的分配方案。
通常意义上认为,如果group下所有consumer实例的订阅是相同,那么使用round-robin会带来更公平的分配方案,否则使用range策略的效果更好。此外,sticky策略在0.11.0.0版本才被引入,故目前使用的用户并不多。新版本consumer默认的分配策略是range。用户根据consumer参数partition.assignment.strategy
来进行设置。另外Kafka支持自定义的分配策略,用户可以创建自己的consumer分配器(assignor)。
针对rebalance过程中的分区分配,下面举一个简单的例子,加以说明。假设目前某个consumer group下有两个consumer:A和B。当第3个成员C加入时,满足了前面谈到的第一个触发条件,因此coordinator会执行rebalance,并根据range分配策略重新为A、B和C分配分区,如图所示。
由此可见,原先A和B分别处理3个分区的数据,rebalance之后A、B和C各自承担2个分区的消费,可以说这个分配方案非常公平,每个consumer上的负载是相同的。
4、rebalance generation
某个consumer group可以执行任意次rebalance。为了更好地隔离每次rebalance上的数据,新版本consumer设计了rebalance generation用于标识某次rebalance。generation这个词类似于JVM分代垃圾收集器中“分代”(严格来说,JVM GC使用的是generational)的概念。表示rebalance之后的一届成员,在consumer中它是一个整数,通常从0开始。
Kafka引入consumer generation主要是为了保护consumer group的,特别是防止无效offset提交。比如上一届的consumer成员由于某些原因延迟提交了offset,但rebalance之后该group产生了新一届的group成员,而这次延迟的offset提交携带的是旧的generation信息,因此这次提交会
被consumer group拒绝。很多Kafka用户在使用consumer时经常碰到的ILLEGAL GENERATION异常就是这个原
因导致的。事实上,每个group进行rebalance之后,generation号都会加l,表示group进入了一个新的版本。如图所示,Generation1时group有3个成员,随后成员2退出组,coordinator触发rebalance,consumer group进入到Generation2时代,之后成员4加入,再次触发rebalance,group进入到Generation3时代。
5、rebalance协议
前面提到过rebalance本质上是一组协议。group与coordinator共同使用这组协议完成group的rebalance。最新版本Kafka中提供了下面5个协议来处理rebalance相关事宜。
JoinGroup请求
:consumer请求加入组SyncGroup请求
:group leader把分配方案同步更新到组内所有成员中Heartbeat请求
:consumer定期向coordinator汇报心跳表明自己依然存货LeaveGroup请求
:consumer主动通知coordinator该consumer即将离组DescribeGroup请求
:查看组的所有信息,包括成员信息、协议信息、分配方案以及订阅信息等。该请求类型主要供管理员使用。coordinator不使用该请求执行rebalance。
在rebalance过程中,coordinator主要处理consumer发过来的JoinGroup和SyncGroup请求。当consumer主动离组时会发送LeaveGroup请求给coordinator。
在成功rebalance之后,组内所有consumer都需要定期地向coordinator发送Heartbeat请求。而每个consumer也是根据Heartbeat请求的响应中是否包含REBALANCE IN PROGRESS来判断当前group是否开启了新一轮rebalance。
6、rebalance流程
consumer group在执行rebalance之前必须首先确定coordinator所在的broker,并创建与该broker相互通信的Socket连接。确定coordinator的算法与确定offset被提交到__consumer_offsets
目标分区的算法是相同的。算法如下:
- 计算
Math.abs(groupID.hashCode) % offsets.topic.num.partitions
参数值(默认是50),假设是10 - 寻找
__consumer_offsets
分区10的leader副本所在的broker,该即为这个group的coordinator。
成功连接coordinator之后便可以执行rebalance操作。目前rebalance主要分为两步:加入组和同步更新分配方案。
加入组
:这一步中组内所有consumer(即group.id相同的所有consumer实例)向coordinator发送JoinGroup请求。当收集全JoinGroup请求后,coordinator从中选择一个consumer担任group的leader,并把所有成员信息以及它们的订阅信息发送给leader。特别需要注意的是,group的leader和coordinator不是一个概念。leader是某个
consumer实例,coordinator通常是Kafka集群中的一个broker。另外leader而非coordinator负责为整个group的所有成员制定分配方案。同步更新分配方案
:这一步中leader开始制定分配方案,即根据前面提到的分配策略决定每个consumer都负责哪些topic的哪些分区。一旦分配完成,leader会把这个分配方案封装进SyncGroup请求并发送给coordinator。比较有意思的是,组内所有成员都会发送SyncGroup请求,不过只有leader发送的SyncGroup请求中包含了分配方案。
coordinator接收到分配方案后把属于每个consumer的方案单独抽取出来作为SyncGroup请求的response返还给各自的consumer。
consumer group分配方案是在consumer端执行的。Kafka将这个权力下放给客户端主要是因为这样做可以有更好的灵活性。比如在这种机制下用户可以自行实现类似于Hadoop那样的机架感知(rack-aware)分配方案。同一个机架上的分区数据被分配给相同机架上的consumer,减少网络传输的开销。而且,即使以后分区策略发生了变更,也只需要重启consumer应用即可,不必重启Kafka服务器。