再均衡原理
新版的消费者客户端对此进行了重新设计,将全部消费组分成多个子集,每个消费组的子集在服务端对应一个GroupCoordinator对其进行管理,GroupCoordinator是Kafka服务端中用于管理消费组的组件。而消费者客户端中的ConsumerCoordinator组件负责与GroupCoordinator进行交互。
ConsumerCoordinator与GroupCoordinator之间最重要的职责就是负责执行消费者再均衡的操作,包括前面提及的分区分配的工作也是在再均衡期间完成的。就目前而言,一共有如下几种情形会触发再均衡的操作:
- 有新的消费者加入消费组。
- 有消费者宕机下线。消费者并不一定需要真正下线,例如遇到长时间的GC、网络延迟导致消费者长时间未向GroupCoordinator发送心跳等情况时,GroupCoordinator会认为消费者已经下线。
- 有消费者主动退出消费组(发送LeaveGroupRequest请求)。比如客户端调用了unsubscrible()方法取消对某些主题的订阅。
- 消费组所对应的GroupCoorinator节点发生了变更。
- 消费组内所订阅的任一主题或者主题的分区数量发生变化。
下面就以一个简单的例子来讲解一下再均衡操作的具体内容。当有消费者加入消费组时,消费者、消费组及组协调器之间会经历一下几个阶段。
1、第一阶段(FIND_COORDINATOR)
消费者需要确定它所属的消费组对应的GroupCoordinator所在的broker,并创建与该broker相互通信的网络连接。如果消费者已经保存了与消费组对应的GroupCoordinator节点的信息,并且与它之间的网络连接是正常的,那么就可以进入第二阶段。否则,就需要向集群中的某个节点发送FindCoordinatorRequest请求来查找对应的GroupCoordinator,这里的“某个节点”并非是集群中的任意节点,而是负载最小的节点。
如图所示,FindCoordinatorRequest请求体中只有两个域(Field):coordinator key和coordinator_type。coordinator key在这里就是消费组的名称,即groupld,coordinator type置为0。这个FindCoordinatorRequest请求还会在Kafka事务:
Kafka在收到FindCoordinatorRequest请求之后,会根据coordinator key(也就是groupId)查找对应的GroupCoordinator节点,如果找到对应的GroupCoordinator则会返回其相对应的node id、host和port信息。
具体查找GroupCoordinator的方式是先根据消费组groupId的哈希值计算_consumer_offsets中的分区编号,具体算法如下所示:
Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
其中groupId.hashCode就是使用Java中String类的hashCode()方法获得的,groupMetadataTopicPartitionCount为主题consumer offsets的分区个数,这个可以通过broker端参数offsets.topic.num.partitions来配置,默认值为50。
找到对应的_consumer_offsets中的分区之后,再寻找此分区leader副本所在的broker节点,该broker节点即为这个groupld所对应的GroupCoordinator节点。消费者groupId最终的分区分配方案及组内消费者所提交的消费位移信息都会发送给此分区leader副本所在的broker节点,让此broker节点既扮演GroupCoordinator的角色,又扮演保存分区分配方案和组内消费者位移的角色,这样可以省去很多不必要的中间轮转所带来的开销。
2、第二阶段(JOIN_GROUP)
在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。
JoinGroupRequest的结构包含多个域:
- group_id就是消费组的id,通常也表示为groupId。
- session timout对应消费端参数session.timeout.ms,默认值为l0000,即l0秒。GroupCoordinator超过session timeout指定的时间内没有收到心跳报文则认为此消费者已经下线。
- rebalance timeout对应消费端参数max.po1l.interval.ms,
默认值为300000,即5分钟。表示当消费组再平衡的时候,GroupCoordinator等待各个消费者重新加入的最长等待时间。 - member id表示GroupCoordinator分配给消费者的id标识。消费者第一次发送JoinGroupRequest请求的时候此字段设置为null。
- protocol_type表示消费组实现的协议,对于消费者而言此字段值为“consumer”
JoinGroupRequest中的group protocols域为数组类型,其中可以囊括多个分区分配策略,这个主要取决于消费者客户端参数partition.assignment.strategy的配置。如果配置了多种策略,那么JoinGroupRequest中就会包含多个protocol name和protocol metadata。其中protocol name对应于PartitionAssignor接口中的name()方法。而protocol metadata和PartitionAssignor接口中的subscription()方法有直接关系,protocol metadata是一个bytes类型,其实质上还可以更细粒度地划分为version、topics和user_data,如图所示。
version占2个字节,目前其值固定为0;topics对应PartitionAssignor接口的subscription()方法返回值类型Subscription中的topics,代表一个主题列表;user data对应Subscription中的userData,可以为空。
如果是原有的消费者重新加入消费组,那么在真正发送JoinGroupRequest请求之前还要执行一些准备工作:
- 如果消费端参数enable.auto.commit设置为true(默认值也为true),即开启自动提交位移功能,那么在请求加入消费组之前需要向GroupCoordinator提交消费位移。这个过
程是阻塞执行的,要么成功提交消费位移,要么超时。 - 如果消费者添加了自定义的再均衡监听器(ConsumerRebalanceListener),那么此时会调用onPartitionsRevoked()方法在重新加入消费组之前实施自定义的规则逻辑,比如清除一些状态,或者提交消费位移等。
- 因为是重新加入消费组,之前与GroupCoordinator节点之间的心跳检测也就不需要了,所以在成功地重新加入消费组之前需要禁止心跳检测的运作。
消费者在发送JoinGroupRequest请求之后会阻塞等待Kafka服务端的响应。服务端在收到JoinGroupRequest请求后会交由GroupCoordinator来进行处理。GroupCoordinator首先会对JoinGroupRequest请求做合法性校验,比如group id是否为空、当前broker节点是否是请求的消费者组所对应的组协调器、rebalance timeout的值是否在合理的范围之内。如果消费者是第一次请求加入消费组,那么JoinGroupRequest请求中的member id值为null,即没有
它自身的唯一标志,此时组协调器负责为此消费者生成一个member id。这个生成的算法很简单,具体如以下伪代码所示。
String memberId = clientId + "-" + UUID.randomUUID().toString();
其中clientId为消费者客户端的clientld,对应请求头中的client id。由此可见消费者的member id由clientId和UUID用“.”字符拼接而成。
2.1、选举分区分配策略
每个消费者都可以设置自己的分区分配策略,对消费组而言需要从各个消费者呈报上来的各个分配策略中选举一个彼此都“信服”的策略来进行整体上的分区分配。这个分区分配的选举并非由leader消费者决定,而是根据消费组内的各个消费者投票来决定的。这里所说的“根据组内的各个消费者投票来决定”不是指GroupCoordinator还要再与各个消费者进行进一步交互,而是根据各个消费者呈报的分配策略来实施。最终选举的分配策略基本上可以看作被各个
消费者支持的最多的策略,具体的选举过程如下:
- 收集各个消费者支持的所有分配策略,组成候选集candidates。
- 每个消费者从候选集candidates中找出第一个自身支持的策略,为这个策略投上一票。
- 计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略。
如果有消费者并不支持选出的分配策略,那么就会报出异常IllegalArgumentException:Member does not support protocol。需要注意的是,这里所说的“消费者所支持的分配策略”是指partition.assignment.strategy参数配置的策略,如果这个参数值只配置了RangeAssignor,那么这个消费者客户端只支持RangeAssignor分配策略,而不是消费者客户端代码中实现的3种分配策略及可能的自定义分配策略。
在此之后,Kafka服务端就要发送JoinGroupResponse响应给各个消费者,leader消费者和其他普通消费者收到的响应内容并不相同,首先我们看一下JoinGroupResponse的具体结构,如图所示。
JoinGroupResponse包含了多个域,其中generation id用来标识当前消费组的年代信息,避免受到过期请求的影响。leader id表示消费组leader消费者的member id。
Kafka发送给普通消费者的JoinGroupResponse中的members内容为空,而只有leader消费者的JoinGroupResponse中的members包含有效数据。members为数组类型,其中包含各个成员信息。member metadata为消费者的订阅信息,与JoinGroupRequest中的
protocol_metadata内容相同,不同的是JoinGroupRequest可以包含多个<protocol_name,protocol metadata>的键值对,在收到JoinGroupRequest之后,GroupCoordinator已经选举出唯一的分配策略。也就是说,protocol name已经确定(group protocol),那么对应的protocol metadata也就确定了,最终各个消费者收到的JoinGroupResponse响应中的member metadata就是这个确定了的protocol metadata。由此可见,Kafka把分区分配的具体分配交还给客户端,自身并不参与具体的分配细节,这样即使以后分区分配的策略发生了变更,也只需要重启消费端的应用即可,而不需要重启服务端。
3、第三阶段(SYNC_GROUP)
leader消费者根据在第二阶段中选举出来的分区分配策略来实施具体的分区分配,在此之后需要将分配的方案同步给各个消费者,此时leader消费者并不是直接和其余的普通消费者同步分配方案,而是通过GroupCoordinator这个“中间人”来负责转发同步分配方案的。在第三阶段,也就是同步阶段,各个消费者会向GroupCoordinator发送SyncGroupRequest请求来同步分配方案,如图所示。
我们再来看一下SyncGroupRequest请求的具体结构,如图所示。SyncGroupRequest中的group_id、generation_id和member_.id前面都有涉及,这里不再赘述。只有leader
消费者发送的SyncGroupRequest请求中才包含具体的分区分配方案,这个分配方案保存在group_assignment中,而其余消费者发送的SyncGroupRequest请求中的group_assignment为空。
group_assignment是一个数组类型,其中包含了各个消费者对应的具体分配方案:member id表示消费者的唯一标识,而member assignment是与消费者对应的分配方案,
它还可以做更具体的划分,member assignment的结构如图所示。
与JoinGroupRequest请求中的protocol metadata类似,都可以细分为3个更具体的字段,只不过protocol metadata存储的是主题的列表信息,而member assignment存储的是分区信息,member assignment中可以包含多个主题的多个分区信息。
服务端在收到消费者发送的SyncGroupRequest清求之后会交由GroupCoordinator来负责具体的逻辑处理。GroupCoordinator同样会先对SyncGroupRequest请求做合法性校验,在此之后会将从leader消费者发送过来的分配方案提取出来,连同整个消费组的元数据信息一起存入Kafka的consumer offsets主题中,最后发送响应给各个消费者以提供给各个消费者各自所属的分配方案。
这里所说的响应就是指SyncGroupRequest请求对应的SyncGroupResponse,SyncGroupResponse的内容很简单,里面包含的就是消费者对应的所属分配方案,SyncGroupResponse的结构如图所示,具体字段的释义可以从前面的内容中推测出来,这里就不赘述了。
当消费者收到所属的分配方案之后会调用PartitionAssignor中的onAssignment()方法。随后再调用ConsumerRebalanceListener中的OnPartitionAssignedO方法。之后开启心跳任务,消费者定期向服务端的GroupCoordinator发送HeartbeatRequest来确定彼此在线。
3.1、消费组元数据信息
我们知道消费者客户端提交的消费位移会保存在Kafka的consumer offsets主题中,这里也一样,只不过保存的是消费组的元数据信息(GroupMetadata)。具体来说,每个消费组的元数据信息都是一条消息,不过这类消息并不依赖于具体版本的消息格式,因为它只定义了消息中的key和value字段的具体内容,所以消费组元数据信息的保存可以做到与具体的消息格式无关。
图中对应的就是消费组元数据信息的具体内容格式,上面是消息的ky,下面是消息的value。可以看到key和value中都包含version字段,用来标识具体的key和value的版本信息,不同的版本对应的内容格式可能并不相同,就目前版本而言,key的version为2,而value的version为l,读者在理解时其实可以忽略这个字段而探究其他具备特定含义的内容。key中除了version就是group字段,它表示消费组的名称,和JoinGroupRequest或SyncGroupRequest请求中的group id是同一个东西。虽然key中包含了version字段,
但确定这条信息所要存储的分区还是根据单独的group字段来计算的,这样就可以保证消费组的元数据信息与消费组对应的GroupCoordinator处于同一个broker节点上,省去了中间轮转的开销。
value中包含的内容有很多,可以参照和JoinGroupRequest或SyncGroupRequest请求中的内容来理解,具体各个字段的释义如下:
protocol_type
:消费组实现的协议,这里的值为“consumer”。generation
:标识当前消费组的年代信息,避免收到过期请求的影响。protocol
:消费组选取的分区分配策略。leader
:消费组的leader消费者的名称。members
:数组类型,其中包含了消费组的各个消费者成员信息,图中右边部分就是消费者成员的具体信息,每个具体字段都比较容易辨别,需要着重说明的是subscription和assignment这两个字段,分别代码消费者的订阅信息和分配信息。
4、第四阶段(HEARTBEAT)
进入这个阶段之后,消费组中的所有消费者就会处于正常工作状态。在正式消费之前,消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交到了
GroupCoordinator,并且GroupCoordinator将其保存到了Kafka内部的consumer offsets主题中,
此时消费者可以通过OffsetFetchRequest请求获取上次提交的消费位移并从此处继续消费。消费者通过向GroupCoordinator发送心跳来维持它们与消费组的从属关系,以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读
取分区中的消息。心跳线程是一个独立的线程,可以在轮询消息的空档发送心跳。如果消费者停止发送心跳的时间足够长,则整个会话就被判定为过期,GroupCoordinator也会认为这个消费者已经死亡,就会触发一次再均衡行为。消费者的心跳间隔时间由参数heartbeat.interval.ms
指定,默认值为3000,即3秒,这个参数必须比session.timeout.ms参数设定的值要小,一般情况下heartbeat.interval.ms的配置值不能超过session.timeout.ms配置值的1/3。这个参数可以调整得更低,以控制正常重新平衡的预期时间。
如果一个消费者发生崩溃,并停止读取消息,那么GroupCoordinator会等待一小段时间,确认这个消费者死亡之后才会触发再均衡。在这一小段时间内,死掉的消费者并不会读取分区里的消息。这个一小段时间由session.timeout.ms参数控制,该参数的配置值必须在broker端参数group.min.session.timeout.ms(默认值为6000,即6秒)和group.max.session.timeout.ms(默认值为300000,即5分钟)允许的范围内。
还有一个参数max.poll.interva1.ms,它用来指定使用消费者组管理时poll()方法调用之间的最大延迟,也就是消费者在获取更多消息之前可以空闲的时间量的上限。如果此超时
时间期满之前po10没有调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。
除了被动退出消费组,还可以使用LeaveGroupRequest请求主动退出消费组,比如客户端调用了unsubscrible(0方法取消对某些主题的订阅,这个比较简单,这里就不再赘述了。