2024-04-03
原文作者:吴声子夜歌 原文地址: https://blog.csdn.net/cold___play/article/details/132323130

consumer端设计

1、consumer group状态机

如前所述,新版本consumer依赖于broker端的组协调者(coordinator)来管理组内的所有consumer实例并负责把分配方案下发到每个consumer上。分配方案是由组内的leader consumer根据指定的分区分配策略指定的。该分配策略必须是组内所有consumer都支持的。事实上,如果所有consumer协调在一起无法选出共同的分区策略,那么coordinator就会抛出异常。这种设计就确保了每个consumer group总有一个一致性的分配策略,同时还能确保每个consumer只能为它拥有的分区提交位移。

Kafka社区刚刚推出分区分配的操作是在consumer端执行的而非broker端,这样做的好处主要有如下两点。

  • 便于维护与升级:如果在broker端实现,那么分配策略的变动势必要重启整个Kafka集群。生产环境中重启服务器的代价是很高的。
  • 便于实现自定义策略:不同的策略由不同的逻辑实现。coordinator端代码不容易实现灵活可定制的分配逻辑。

基于以上两点优势,新版consumer对group下所有成员的分区分配工作由consumer代码实现。这样做的另一个好处就是,解耦了组管理与分区分配,coordinator负责组管理工作,而consumer程序负责分区分配。

当前,Kafka为每个consumer group定义了5个状态,它们分别如下。

  • Empty:该状态表明group下没有任何active consumer,但可能包含位移信息。每个group创建时便处于Empty状态。若group工作了一段时间后所有consumer都离开组,
    那么group也会处于该状态。由于可能包含位移信息,处于此状态下的group依然可以响应OffsetFetch请求,即返回clients端对应的位移信息。
  • PreparingRebalance:该状态表明group正在准备进行group rebalance。此时,group己经“受理”了部分成员发送过来的JoinGroup请求,同时等待其他成员发送JoinGroup请求,直到所有成员都成功加入组或超时。由于该状态下的group依然可能保存有位移信息,因此clients依然可以发起OffsetFetch请求去获取位移,甚至还可以发起
    OffsetCommit请求去提交位移。
  • AwaitingSync:该状态表明所有成员都已经加入组并等待leader consumer发送分区分配方案。同样地,此时依然可以获取位移,但若提交位移,coordinator将会抛出
    REBALANCE_IN_PROGRESS异常来表明该group正在进行rebalance。
  • Stable:该状态表明group开始正常消费。此时group必须响应clients发送过来的任何请求,比如位移提交请求、位移获取请求、心跳请求等。
  • Dead:该状态表明group已经彻底废弃,group内没有任何active成员并且group的所有元数据信息都已被删除。处于此状态的group不会响应任何请求。严格来说,coordinator会返回UNKNOWN MEMBER ID异常。

任意时刻,所有consumer group必然处于上面5个状态中的一个。这5个状态之间的状态流转如图所示。当group首次创建时,coordinator会设置该group状态为Empty,当有新的成员加入组时,组状态变更为PreparingRebalance。此时group会等待一段时间让更多的组成员加入(在Kafka0.l0.l.0之后,这段时间由consumer端参数max.poll.interval…ms指定)。如果所有成员都及时地加入了组,那么组状态变更为AwaitingSync。此时leader consumer开始分配消费方案。

在分配方案确定后,leader consumer将分配方案以SyncGroup请求的方式发送给coordinator,然后coordinator把方案下发给下面的所有组成员。此时组状态进入到Stable,表明group开始正常消费数据。

当group处于Stable时,若所有成员都离组,那么此时group状态会首先调整为PreparingRebalance,然后变更为Empty,最后等待元数据过期被移除后group变更为Dead,以上就是一个consumer group典型的状态生命周期流转。下面根据每个状态详细讨论一下状态流转的条件。

202404032128016221.png

  • Empty与PreparingRebalance:Empty状态的group下没有任何active consumer。此时当有一个consumer加入时,Empty变为PreparingRebalance。反之,处于PreparingRebalance状态的group中,当所有成员都离开组时,PreparingRebalance变为Empty。
  • Empty与Dead:Empty状态下的group依然可能保存group元数据信息甚至位移信息。Kafka默认会在1天(这个时间可通过broker端参数offsets.retention.minutes配置)后
    删除这些数据。一旦这些数据被删除,则group进入Dead状态。
  • PreparingRebalance与AwaitingSync:在PreparingRebalance状态时,若成员在规定时间内(max.poll.interval.ms)完成加入组操作,那么group进入AwaitingSync状态。若有的组
    成员很慢,没能在这段时间内加入组,那么规定时间一过goup依然会进入AwaitingSync,当那个慢consumer加入组时,group又会重新变更为PreparingRebelance。因此在实际应用中一定要谨慎设置max.poll.interval.ms参数。对于处于AwaitingSync状态的group而言,当己加入成员崩溃、主动离组或元数据信息发生变更时,group也会重新进入PreparingRebalance.
  • AwaitingSync与Stable:在coordinator成功下发了leader consumer所做的分配方案后,group进入到Stable状态开始正常工作。
  • Stable与PreparingRebalance:group正常工作时,当有成员发生崩溃或主动离组,抑或是leader consumer重新加入组,再或是有成员元数据发生变更,则group会直接进入
    PreparingRebalance开启新一轮rebalance。在实际场景中,最常见的rebalance都是因为处于Stable状态的group下的consumer消费处理逻辑过重且session超时,从而被
    “踢出”group而导致的。
  • 其他状态与Dead:前面说过新版本consumer group的位移保存在内部topic-consumer offsets下的某个分区。当这个分区leader所在的broker发生了崩溃时,就必须对该分区进行迁移,从而导致coordinator的变更。当出现这种情况时,不论group处于哪种状态,都必须直接将group设置为Dead状态。

2、group管理协议

介绍完consumer的状态机,下面看看coordinator的组管理是如何工作的。coordinator的组管理协议由两个阶段构成,即组成员加入阶段和状态同步阶段。第一个阶段用于为goup指定active成员并从它们之中选出leader consumer第二个阶段则让leader consumer制定分配方案并同步到其他组成员中。从consumer的角度来看,第一个阶段是收集所有consumer的topic订阅信息,而第二个阶段则利用这些信息给每个consumer分配要消费的分区。每个group下
的leader consumer通常都是第一个加入group consumer。

2.1、加入组

这一阶段的主要目的就是让成员加入group。所用到的协议请求类型是JoinGroup。当确定了该group对应的coordinator之后,每个成员都要显式地发送JoinGroup请求给coordinator。该请求封装了consumer各自的订阅信息、成员id等元数据。coordinator会持有这些JoinGroup请求一段时间,直到所有组成员都发送了JoinGroup请求。此时,coordinator选择其中的一个consumer作为leader,然后给所有组成员发送对应的response.

coordinator拿到所有成员的JoinGroup请求后会去获取所有成员都支持的协议类型。如果有成员指定了一个与其他成员都不兼容的协议类型,则该成员将被拒绝加入组。值得注意的是,这里的协议类型不是通信协议,而是consumer group端支持的分配策略。举一个例子说明一下,l.0.0版本的Kafka默认支持3种分配策略(range、round-robin和sticky),如果有一个consumer指定了自定义的策略而其他consumer都不支持该策略,那么这个consumer就不被允许加入组。

coordinator处理JoinGroup请求后会把所有consum㎡er成员的元数据信息封装进一个数组,然后以JoinGroup response的方式发给group的leader consumer。切记,它只会给leader consumer发送这样的信息,给其他成员只会发送一个空数组。这样做的好处在于:①非leader
consumer本来也不需要知晓这部分信息;②极大地减少了网络I/O传输的开销。

leader consumer通过JoinGroup response知晓了group下所有成员的订阅情况,这样它就有足够的信息开始制定分配方案了。

2.2、同步组状态信息

group所有成员都加入组之后,leader consumer根据指定的分配策略进行分区的分配。当前最新版本的Kafka 1.0.0默认提供了3种分配策略,即,range、round-robin和sticky。sticky策略是Kafka社区于0.11.0.0版本最新推出的,可以最大限度地实现分区负载的均匀分配以及rebalance之后最少的分配变动。

在这一阶段中,group所有成员都需要显式地给coordinator发送SyncGroup请求。不过只有leader consumer的SyncGroup请求中会包含它的分配方案。coordinator接收到leader的SyncGroup请求后取出分配方案并单独抽取出每个consumer对应的分区,然后把分区封装进SyncGroup的response,发送给各个consumer。.这样每个consumer都只会得到属于自己的那一部分分区,而不会知晓其他consumer的分配方案。

在所有consumer成员都收到SyncGroup response之后,coordinator将group状态设置为Stable,此时组开始正常工作,每个成员按照coordinator发过来的方案开始消费指定的分区。

3、rebalance场景

3.1、新成员加入组

202404032128024662.png

3.2、成员发生崩溃

组成员崩溃和组成员主动离开是两种不同的场景。因为在崩溃时成员并不会主动告知coordinator此事,coordinator有可能需要一个完整的session.timeout周期才检测到这一崩溃,这必然会造成consumer的滞后。可以说离开组是主动地发起rebalance,而崩溃则是被动地发起rebalance,如图所示。

202404032128034113.png

3.3、成员主动离组

202404032128044684.png

3.4、成员提交位移

202404032128056285.png

阅读全文