consumer端设计
1、consumer group状态机
如前所述,新版本consumer依赖于broker端的组协调者(coordinator)来管理组内的所有consumer实例并负责把分配方案下发到每个consumer上。分配方案是由组内的leader consumer根据指定的分区分配策略指定的。该分配策略必须是组内所有consumer都支持的。事实上,如果所有consumer协调在一起无法选出共同的分区策略,那么coordinator就会抛出异常。这种设计就确保了每个consumer group总有一个一致性的分配策略,同时还能确保每个consumer只能为它拥有的分区提交位移。
Kafka社区刚刚推出分区分配的操作是在consumer端执行的而非broker端,这样做的好处主要有如下两点。
- 便于维护与升级:如果在broker端实现,那么分配策略的变动势必要重启整个Kafka集群。生产环境中重启服务器的代价是很高的。
- 便于实现自定义策略:不同的策略由不同的逻辑实现。coordinator端代码不容易实现灵活可定制的分配逻辑。
基于以上两点优势,新版consumer对group下所有成员的分区分配工作由consumer代码实现。这样做的另一个好处就是,解耦了组管理与分区分配,coordinator负责组管理工作,而consumer程序负责分区分配。
当前,Kafka为每个consumer group定义了5个状态,它们分别如下。
Empty
:该状态表明group下没有任何active consumer,但可能包含位移信息。每个group创建时便处于Empty状态。若group工作了一段时间后所有consumer都离开组,
那么group也会处于该状态。由于可能包含位移信息,处于此状态下的group依然可以响应OffsetFetch请求,即返回clients端对应的位移信息。PreparingRebalance
:该状态表明group正在准备进行group rebalance。此时,group己经“受理”了部分成员发送过来的JoinGroup请求,同时等待其他成员发送JoinGroup请求,直到所有成员都成功加入组或超时。由于该状态下的group依然可能保存有位移信息,因此clients依然可以发起OffsetFetch请求去获取位移,甚至还可以发起
OffsetCommit请求去提交位移。AwaitingSync
:该状态表明所有成员都已经加入组并等待leader consumer发送分区分配方案。同样地,此时依然可以获取位移,但若提交位移,coordinator将会抛出
REBALANCE_IN_PROGRESS异常来表明该group正在进行rebalance。Stable
:该状态表明group开始正常消费。此时group必须响应clients发送过来的任何请求,比如位移提交请求、位移获取请求、心跳请求等。Dead
:该状态表明group已经彻底废弃,group内没有任何active成员并且group的所有元数据信息都已被删除。处于此状态的group不会响应任何请求。严格来说,coordinator会返回UNKNOWN MEMBER ID异常。
任意时刻,所有consumer group必然处于上面5个状态中的一个。这5个状态之间的状态流转如图所示。当group首次创建时,coordinator会设置该group状态为Empty,当有新的成员加入组时,组状态变更为PreparingRebalance。此时group会等待一段时间让更多的组成员加入(在Kafka0.l0.l.0之后,这段时间由consumer端参数max.poll.interval…ms指定)。如果所有成员都及时地加入了组,那么组状态变更为AwaitingSync。此时leader consumer开始分配消费方案。
在分配方案确定后,leader consumer将分配方案以SyncGroup请求的方式发送给coordinator,然后coordinator把方案下发给下面的所有组成员。此时组状态进入到Stable,表明group开始正常消费数据。
当group处于Stable时,若所有成员都离组,那么此时group状态会首先调整为PreparingRebalance,然后变更为Empty,最后等待元数据过期被移除后group变更为Dead,以上就是一个consumer group典型的状态生命周期流转。下面根据每个状态详细讨论一下状态流转的条件。
Empty与PreparingRebalance
:Empty状态的group下没有任何active consumer。此时当有一个consumer加入时,Empty变为PreparingRebalance。反之,处于PreparingRebalance状态的group中,当所有成员都离开组时,PreparingRebalance变为Empty。Empty与Dead
:Empty状态下的group依然可能保存group元数据信息甚至位移信息。Kafka默认会在1天(这个时间可通过broker端参数offsets.retention.minutes配置)后
删除这些数据。一旦这些数据被删除,则group进入Dead状态。PreparingRebalance与AwaitingSync
:在PreparingRebalance状态时,若成员在规定时间内(max.poll.interval.ms)完成加入组操作,那么group进入AwaitingSync状态。若有的组
成员很慢,没能在这段时间内加入组,那么规定时间一过goup依然会进入AwaitingSync,当那个慢consumer加入组时,group又会重新变更为PreparingRebelance。因此在实际应用中一定要谨慎设置max.poll.interval.ms参数。对于处于AwaitingSync状态的group而言,当己加入成员崩溃、主动离组或元数据信息发生变更时,group也会重新进入PreparingRebalance.AwaitingSync与Stable
:在coordinator成功下发了leader consumer所做的分配方案后,group进入到Stable状态开始正常工作。Stable与PreparingRebalance
:group正常工作时,当有成员发生崩溃或主动离组,抑或是leader consumer重新加入组,再或是有成员元数据发生变更,则group会直接进入
PreparingRebalance开启新一轮rebalance。在实际场景中,最常见的rebalance都是因为处于Stable状态的group下的consumer消费处理逻辑过重且session超时,从而被
“踢出”group而导致的。其他状态与Dead
:前面说过新版本consumer group的位移保存在内部topic-consumer offsets下的某个分区。当这个分区leader所在的broker发生了崩溃时,就必须对该分区进行迁移,从而导致coordinator的变更。当出现这种情况时,不论group处于哪种状态,都必须直接将group设置为Dead状态。
2、group管理协议
介绍完consumer的状态机,下面看看coordinator的组管理是如何工作的。coordinator的组管理协议由两个阶段构成,即组成员加入阶段和状态同步阶段。第一个阶段用于为goup指定active成员并从它们之中选出leader consumer第二个阶段则让leader consumer制定分配方案并同步到其他组成员中。从consumer的角度来看,第一个阶段是收集所有consumer的topic订阅信息,而第二个阶段则利用这些信息给每个consumer分配要消费的分区。每个group下
的leader consumer通常都是第一个加入group consumer。
2.1、加入组
这一阶段的主要目的就是让成员加入group。所用到的协议请求类型是JoinGroup。当确定了该group对应的coordinator之后,每个成员都要显式地发送JoinGroup请求给coordinator。该请求封装了consumer各自的订阅信息、成员id等元数据。coordinator会持有这些JoinGroup请求一段时间,直到所有组成员都发送了JoinGroup请求。此时,coordinator选择其中的一个consumer作为leader,然后给所有组成员发送对应的response.
coordinator拿到所有成员的JoinGroup请求后会去获取所有成员都支持的协议类型。如果有成员指定了一个与其他成员都不兼容的协议类型,则该成员将被拒绝加入组。值得注意的是,这里的协议类型不是通信协议,而是consumer group端支持的分配策略。举一个例子说明一下,l.0.0版本的Kafka默认支持3种分配策略(range、round-robin和sticky),如果有一个consumer指定了自定义的策略而其他consumer都不支持该策略,那么这个consumer就不被允许加入组。
coordinator处理JoinGroup请求后会把所有consum㎡er成员的元数据信息封装进一个数组,然后以JoinGroup response的方式发给group的leader consumer。切记,它只会给leader consumer发送这样的信息,给其他成员只会发送一个空数组。这样做的好处在于:①非leader
consumer本来也不需要知晓这部分信息;②极大地减少了网络I/O传输的开销。
leader consumer通过JoinGroup response知晓了group下所有成员的订阅情况,这样它就有足够的信息开始制定分配方案了。
2.2、同步组状态信息
group所有成员都加入组之后,leader consumer根据指定的分配策略进行分区的分配。当前最新版本的Kafka 1.0.0默认提供了3种分配策略,即,range、round-robin和sticky。sticky策略是Kafka社区于0.11.0.0版本最新推出的,可以最大限度地实现分区负载的均匀分配以及rebalance之后最少的分配变动。
在这一阶段中,group所有成员都需要显式地给coordinator发送SyncGroup请求。不过只有leader consumer的SyncGroup请求中会包含它的分配方案。coordinator接收到leader的SyncGroup请求后取出分配方案并单独抽取出每个consumer对应的分区,然后把分区封装进SyncGroup的response,发送给各个consumer。.这样每个consumer都只会得到属于自己的那一部分分区,而不会知晓其他consumer的分配方案。
在所有consumer成员都收到SyncGroup response之后,coordinator将group状态设置为Stable,此时组开始正常工作,每个成员按照coordinator发过来的方案开始消费指定的分区。
3、rebalance场景
3.1、新成员加入组
3.2、成员发生崩溃
组成员崩溃和组成员主动离开是两种不同的场景。因为在崩溃时成员并不会主动告知coordinator此事,coordinator有可能需要一个完整的session.timeout周期才检测到这一崩溃,这必然会造成consumer的滞后。可以说离开组是主动地发起rebalance,而崩溃则是被动地发起rebalance,如图所示。