当Kafka集群中的一个Broker节点突然宕机,那么这个节点上的分区副本就已经处于功能失效的状态了,Kafka并不会自动将这些失效分区副本迁移到其它Broker节点上;或者说,当集群中新增一个Broker时,这个Broker上没有任何分区副本,Kafka也不会自动将其它Broker上已有的一些分区副本重分配到该新加入的Broker中。
所以,这就是涉及到了 分区重分配(reblance)问题 ,本章,我就对分区重分配的整体流程和底层原理进行讲解。
一、重分配策略
执行分区重分配,第一件事情就是要生成重分配方案。Kafka提供了 kafka-reassign-partitions.sh
脚本,用来生成重分配策略,并执行分区重分配工作,它可以在集群扩容、 Broker节点挂掉的场景下对分区副本进行迁移。
1.1 脚本使用
我们先来看下kafka-reassign-partitions.sh
脚本的基本使用,主要包含三个步骤:
- 创建一个包含要进行分区重分配的Topic清单的JSON 文件;
- 根据Topic清单和Broker节点清单生成一份 重分配方案 ;
- 根据方案执行重分配动作。
我通过一个示例来讲解下,假设现有3个Broker组成的Kafka集群,一个名为topic-reassign
的主题,包含4个分区,每个分区有2个副本,当前的分区副本分配状态如下:
Topic : topic-reassign Partition : 0 Leader: 0 Replicas: 0,2 Isr : 0,2
Topic : topic-reassign Partition : 1 Leader: 1 Replicas: 1,0 Isr : 1,0
Topic : topic-reassign Partition : 2 Leader: 2 Replicas: 2,1 Isr : 2,1
Topic : topic-reassign Partition : 3 Leader: 0 Replicas: 0,1 Isr : 0,1
Partition表示分区ID,Leader表示该分区的Leader副本所在的Broker ID,Replicas表示该分区的所有副本分配在哪些Broker上(即值为BrokerID),Isr表示该分区的ISR列表。
假设我现在需要将Broker ID为1的节点下线,那么首先要做的就是将它上面的分区副本迁移出去,使用kafka-reassign-partitions.sh
脚本创建一个 JSON 文件,文件内容为要进行分区重分配的Topic清单 :
# reassign.json
{
"topics": [
{
"topic": "topic-reassign"
}
],
"version": 1
}
接着,根据这个JSON 文件,然后指定所要分配的Broker节点列表,通过命令生成一份重分配方案 :
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --generate --topics-to-move-json-file reassign.json --broker-list 0,2
执行上述命令后,会生成两串JSON格式的内容。Current partition replica assignment表示当前的分区副本分配情况,Proposed partition reassignment configuration就是生成的一份供参考的重分配方案:
Current partition replica assignment
{
"version": 1,
"partitions": [{
"topic": "topic-reassign",
"partition": 0,
"replicas": [0, 2],
"log_dirs": ["any", "any"]
}, {
"topic": "topic-reassign",
"partition": 1,
"replicas": [1, 0],
"log_dirs": ["any", "any"]
}, {
"topic": "topic-reassign",
"partition": 2,
"replicas": [2, 1],
"log_dirs": ["any", "any"]
}, {
"topic": "topic-reassign",
"partition": 3,
"replicas": [0, 1],
"log_dirs": ["any", "any"]
}]
}
Proposed partition reassignment configuration
{
"version": 1,
"partitions": [{
"topic": "topic-reassign",
"partition": 2,
"replicas": [2, 0],
"log_dirs": ["any", "any"]
}, {
"topic": "topic-reassign",
"partition": 1,
"replicas": [0, 2],
"log_dirs": ["any", "any"]
}, {
"topic": "topic-reassign",
"partition": 3,
"replicas": [0, 2],
"log_dirs": ["any", "any"]
}, {
"topic": "topic-reassign",
"partition": 0,
"replicas": [2, 0],
"log_dirs": ["any", "any"]
}]
}
注意:这里只是生成一份可行性的方案,并没有真正执行重分配的动作。另外,一般要将Current partition replica assignment的内容保存起来,以备后续的回滚操作。
最后,将第二个 JSON 内容保存在一个 JSON 文件中,假定这个文件的名称为project.json
,然后执行以下命令完成分区副本重分配:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file project.json --execute
另外,我们还可以验证查看分区重分配的进度:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file project.json --verify
# 输出信息:
Status of partition reassignment:
Reassignment of partition topic-reassign completed successfully
Reassignment of partition topic-reassign-2 completed successfully
Reassignment of partition topic-reassign-1 completed successfully
Reassignment of partition topic-reassign-0 completed successfully
Reassignment of partition topic-reassign-3 completed successfully
二、底层原理
了解了分区副本重分配的流程,我就带大家来看看底层的源码。事实上, 分区副本重分配的本质,就是将新分配方案写入到Zookeeper的admin/reassign_partitions
节点中,由Controller Broker监听变动并通知各个Broker进行数据复制迁移 :
2.1 监听重分配
我们已经知道了最终执行分区重分配是通过脚本kafka-reassign-partitions.sh
:
# kafka-reassign-partitions.sh
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand "$@"
我们看ReassignPartitionsCommand执行重分配的方法executeAssignment
:
// ReassignPartitionsCommand.scala
object ReassignPartitionsCommand extends Logging {
def main(args: Array[String]): Unit = {
val opts = validateAndParseArgs(args)
val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
val zkUtils = ZkUtils(zkConnect,
30000,
30000,
JaasUtils.isZkSecurityEnabled())
try {
// 验证
if(opts.options.has(opts.verifyOpt))
verifyAssignment(zkUtils, opts)
// 生成方案
else if(opts.options.has(opts.generateOpt))
generateAssignment(zkUtils, opts)
// 执行
else if (opts.options.has(opts.executeOpt))
executeAssignment(zkUtils, opts)
} catch {
case e: Throwable =>
println("Partitions reassignment failed due to " + e.getMessage)
println(Utils.stackTrace(e))
} finally zkUtils.close()
}
}
最终调用了reassignPartitions方法,将分区分配方案写入admin/reassign_partitions
节点:
// ReassignPartitionsCommand.scala
def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String, throttle: Long = -1) {
val partitionsToBeReassigned = parseAndValidate(zkUtils, reassignmentJsonString)
val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, partitionsToBeReassigned.toMap)
// If there is an existing rebalance running, attempt to change its throttle
if (zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)) {
println("There is an existing assignment running.")
reassignPartitionsCommand.maybeLimit(throttle)
}
else {
if (throttle >= 0)
// 执行命令
if (reassignPartitionsCommand.reassignPartitions(throttle)) {
println("Successfully started reassignment of partitions.")
} else
println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
}
}
def reassignPartitions(throttle: Long = -1): Boolean = {
maybeThrottle(throttle)
try {
val validPartitions = proposedAssignment.filter { case (p, _) => validatePartition(zkUtils, p.topic, p.partition) }
if (validPartitions.isEmpty) false
else {
val jsonReassignmentData = ZkUtils.formatAsReassignmentJson(validPartitions)
// 将分区分配方案写入/admin/reassign_partitions节点
zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
true
}
} catch {
//...
}
}
当Controller Broker监听到/admin/reassign_partitions
节点的变动后,就会开始执行分区重分配:
// KafkaController.scala
// 监听到`/admin/reassign_partitions`节点的变动后,执行该方法
def doHandleDataChange(dataPath: String, data: AnyRef) {
debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
.format(dataPath, data))
val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
}
// 遍历每一个分区,按照方案对该分区的副本进行重分配
partitionsToBeReassigned.foreach { partitionToBeReassigned =>
inLock(controllerContext.controllerLock) {
if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
.format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
} else {
// 关键是这里,对该分区进行重分配
val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
}
}
}
}
// 针对指定的分区
def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
reassignedPartitionContext: ReassignedPartitionsContext) {
val newReplicas = reassignedPartitionContext.newReplicas
val topic = topicAndPartition.topic
val partition = topicAndPartition.partition
try {
val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
assignedReplicasOpt match {
case Some(assignedReplicas) =>
if (assignedReplicas == newReplicas) {
throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
" %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
} else {
// 将分区的副本分配到各个新的Broker
info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
// 关键是这里
onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
}
case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
.format(topicAndPartition))
}
} catch {
//...
}
}
2.2 执行重分配
最核心的其实就是KafkaController.onPartitionReassignment()
方法,该方法包含了对某个分区的副本执行重分配的整个流程。
我们回顾一下Topic的创建流程:每次创建完一个Topic,都会将该Topic分区副本的默认分配方案写入Zookeeper的/brokers/topics/[Topic名称]
节点中,然后由Controller负责将新的集群元数据广播给其它Broker,其它Broker则开始新建日志段,接受Producer请求等操作……
分区重分配的流程本质其实也是相似的,只不过Controller会去/admin/reassign_partitions
目录下获取分区重分配方案,然后广播集群元数据,让各个Broker执行日志复制。
我举个例子来帮助大家理解下分区副本的Reblance流程。假设现在有一个名为reassign
的Topic,它的某个分区原先的副本分配方案为{1, 2, 3}
,最终要执行的新方案{4, 5, 6}
,那么对于原来Broker1、2、3上的副本,都需要进行重新分配到Broker4、5、6上,具体的过程如下:
- 首先,向Zookeeper中写入副本分配方案:{1, 2, 3, 4, 5, 6};
- 然后,向每个Broker都发送一个LeaderAndIsr请求,这样各个Broker上的副本都会向Leader副本请求数据进行复制;
- 等待直到步骤2中的所有Follower副本都同步完成,此时ISR={1, 2, 3, 4, 5, 6};
- 接着,在新分配副本的Broker{4, 5, 6}中选举一个Leader,比如这里是Broker4;
- 最后,从ISR中剔除{1, 2, 3}这三个副本,然后在Zookeeper中也删除{1, 2, 3}这三个副本,然后由Controller广播集群元数据。
上述步骤中,有几个比较重要的概念,我这里简单讲解下,关键是理解,对于每一个分区而言,都有下面几个集合:
- RAR: 重分配后的副本状态,比如{1, 2, 3}表示副本分别分布在Broker1、Broker2、Broker3,Leader副本在Broker1上;
- OAR: 初始的副本状态;
- AR: 当前的副本状态,随着重分配过程不断变化;
- RAR-OAR: RAR与OAR的差集,即需要创建、数据迁移的新副本;
- OAR-RAR: OAR与RAR的差集,即迁移后需要下线的副本。
可以用下面这张图来理解整个过程:
最后,给出onPartitionReassignment的源码,理解了整个流程,代码略读即可,关键是理解这个思路:
// KafkaController.scala
// 对分区topicAndPartition执行重分配方案
def onPartitionReassignment(topicAndPartition: TopicAndPartition,
reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignedReplicas = reassignedPartitionContext.newReplicas
// 如果新分配的副本没有全在ISR中
if (!areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) {
// 需要创建、数据迁移的新副本
val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
// 1.在Zookeeper的“/brokers/topics/[Topic名称]”中写入该主题分区的副本分配方案:OAR + RAR
updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
// 2.发送LeaderAndIsr请求给OAR + RAR中每一个副本
updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
newAndOldReplicas.toSeq)
// 3.新分配的副本状态更新为NewReplica,此时新副本会开始创建并同步数据
startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
} else { //等待所有的RAR都在ISR中
// 4.迁移后需要下线的副本: OAR - RAR
val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
// 5.将副本状态设置为OnlineReplica
reassignedReplicas.foreach { replica =>
replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
replica)), OnlineReplica)
}
// 6.将上下文中的AR设置为RAR
// 7.新加入的副本已经同步完成, LeaderAndIsr都更新到最新的结果
moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
// 8/9.将旧的副本下线
stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
// 10.将ZK中的AR设置为RAR
updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
// 11.分区重分配完成, 在ZK中删除/admin/reassign_partitions节点中的迁移方案
removePartitionFromReassignedPartitions(topicAndPartition)
info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
// 12.发送metadata更新请求给所有Broker
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
//...
}
}
三、总结
本章,我对Kafka的分区重分配流程和底层实现原理进行了讲解,分区重分配本质在于数据复制,先增加新的副本,然后进行数据同步,最后删除旧的副本来达到最终目的。
数据复制会占用额外的资源,所以,分区重分配一定要在低峰值时期执行。另外,可以减小重分配的粒度,以小批次的方式来操作是一种可行的解决思路 。但是,如果集群中某个分区的流量即使在低峰时期还是特别大,那么就需要采取限流机制,Kafka默认提供了两种复制限流的方式:通过脚本kafka-config. sh
和kafka-reassign-partitions.sh
实现。我就不赘述了,读者可以找相关资料了解。
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。