2023-07-31  阅读(32)
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/290

当Kafka集群中的一个Broker节点突然宕机,那么这个节点上的分区副本就已经处于功能失效的状态了,Kafka并不会自动将这些失效分区副本迁移到其它Broker节点上;或者说,当集群中新增一个Broker时,这个Broker上没有任何分区副本,Kafka也不会自动将其它Broker上已有的一些分区副本重分配到该新加入的Broker中。

所以,这就是涉及到了 分区重分配(reblance)问题 ,本章,我就对分区重分配的整体流程和底层原理进行讲解。

一、重分配策略

执行分区重分配,第一件事情就是要生成重分配方案。Kafka提供了 kafka-reassign-partitions.sh 脚本,用来生成重分配策略,并执行分区重分配工作,它可以在集群扩容、 Broker节点挂掉的场景下对分区副本进行迁移。

1.1 脚本使用

我们先来看下kafka-reassign-partitions.sh 脚本的基本使用,主要包含三个步骤:

  1. 创建一个包含要进行分区重分配的Topic清单的JSON 文件;
  2. 根据Topic清单和Broker节点清单生成一份 重分配方案
  3. 根据方案执行重分配动作。

我通过一个示例来讲解下,假设现有3个Broker组成的Kafka集群,一个名为topic-reassign的主题,包含4个分区,每个分区有2个副本,当前的分区副本分配状态如下:

    Topic : topic-reassign    Partition : 0    Leader: 0    Replicas: 0,2    Isr : 0,2
    Topic : topic-reassign    Partition : 1    Leader: 1    Replicas: 1,0    Isr : 1,0
    Topic : topic-reassign    Partition : 2    Leader: 2    Replicas: 2,1    Isr : 2,1
    Topic : topic-reassign    Partition : 3    Leader: 0    Replicas: 0,1    Isr : 0,1

Partition表示分区ID,Leader表示该分区的Leader副本所在的Broker ID,Replicas表示该分区的所有副本分配在哪些Broker上(即值为BrokerID),Isr表示该分区的ISR列表。

假设我现在需要将Broker ID为1的节点下线,那么首先要做的就是将它上面的分区副本迁移出去,使用kafka-reassign-partitions.sh脚本创建一个 JSON 文件,文件内容为要进行分区重分配的Topic清单 :

    # reassign.json 
    {
        "topics": [
            {
                "topic": "topic-reassign"
            }
        ],
        "version": 1
    }

接着,根据这个JSON 文件,然后指定所要分配的Broker节点列表,通过命令生成一份重分配方案 :

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --generate --topics-to-move-json-file reassign.json --broker-list 0,2

执行上述命令后,会生成两串JSON格式的内容。Current partition replica assignment表示当前的分区副本分配情况,Proposed partition reassignment configuration就是生成的一份供参考的重分配方案:

    Current partition replica assignment
    {
        "version": 1,
        "partitions": [{
            "topic": "topic-reassign",
            "partition": 0,
            "replicas": [0, 2],
            "log_dirs": ["any", "any"]
        }, {
            "topic": "topic-reassign",
            "partition": 1,
            "replicas": [1, 0],
            "log_dirs": ["any", "any"]
        }, {
            "topic": "topic-reassign",
            "partition": 2,
            "replicas": [2, 1],
            "log_dirs": ["any", "any"]
        }, {
            "topic": "topic-reassign",
            "partition": 3,
            "replicas": [0, 1],
            "log_dirs": ["any", "any"]
        }]
    }
    
    Proposed partition reassignment configuration
    {
        "version": 1,
        "partitions": [{
            "topic": "topic-reassign",
            "partition": 2,
            "replicas": [2, 0],
            "log_dirs": ["any", "any"]
        }, {
            "topic": "topic-reassign",
            "partition": 1,
            "replicas": [0, 2],
            "log_dirs": ["any", "any"]
        }, {
            "topic": "topic-reassign",
            "partition": 3,
            "replicas": [0, 2],
            "log_dirs": ["any", "any"]
        }, {
            "topic": "topic-reassign",
            "partition": 0,
            "replicas": [2, 0],
            "log_dirs": ["any", "any"]
        }]
    }

注意:这里只是生成一份可行性的方案,并没有真正执行重分配的动作。另外,一般要将Current partition replica assignment的内容保存起来,以备后续的回滚操作。

最后,将第二个 JSON 内容保存在一个 JSON 文件中,假定这个文件的名称为project.json,然后执行以下命令完成分区副本重分配:

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file project.json --execute

另外,我们还可以验证查看分区重分配的进度:

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file project.json --verify
    
    # 输出信息:
    Status of partition reassignment:
    Reassignment of partition topic-reassign completed successfully
    Reassignment of partition topic-reassign-2 completed successfully
    Reassignment of partition topic-reassign-1 completed successfully
    Reassignment of partition topic-reassign-0 completed successfully
    Reassignment of partition topic-reassign-3 completed successfully

二、底层原理

了解了分区副本重分配的流程,我就带大家来看看底层的源码。事实上, 分区副本重分配的本质,就是将新分配方案写入到Zookeeper的admin/reassign_partitions节点中,由Controller Broker监听变动并通知各个Broker进行数据复制迁移

202307312125322241.png

2.1 监听重分配

我们已经知道了最终执行分区重分配是通过脚本kafka-reassign-partitions.sh

    # kafka-reassign-partitions.sh
    exec $(dirname $0)/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand "$@"

我们看ReassignPartitionsCommand执行重分配的方法executeAssignment

    // ReassignPartitionsCommand.scala
    
    object ReassignPartitionsCommand extends Logging {
    
      def main(args: Array[String]): Unit = {
        val opts = validateAndParseArgs(args)
        val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
        val zkUtils = ZkUtils(zkConnect,
                              30000,
                              30000,
                              JaasUtils.isZkSecurityEnabled())
        try {
          // 验证
          if(opts.options.has(opts.verifyOpt))
            verifyAssignment(zkUtils, opts)
          // 生成方案
          else if(opts.options.has(opts.generateOpt))
            generateAssignment(zkUtils, opts)
          // 执行
          else if (opts.options.has(opts.executeOpt))
            executeAssignment(zkUtils, opts)
        } catch {
          case e: Throwable =>
            println("Partitions reassignment failed due to " + e.getMessage)
            println(Utils.stackTrace(e))
        } finally zkUtils.close()
      }
    }

最终调用了reassignPartitions方法,将分区分配方案写入admin/reassign_partitions节点:

    // ReassignPartitionsCommand.scala
    
    def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String, throttle: Long = -1) {
      val partitionsToBeReassigned = parseAndValidate(zkUtils, reassignmentJsonString)
      val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, partitionsToBeReassigned.toMap)
    
      // If there is an existing rebalance running, attempt to change its throttle
      if (zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)) {
        println("There is an existing assignment running.")
        reassignPartitionsCommand.maybeLimit(throttle)
      }
      else {
        if (throttle >= 0)
        // 执行命令
        if (reassignPartitionsCommand.reassignPartitions(throttle)) {
          println("Successfully started reassignment of partitions.")
        } else
          println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
      }
    }
    
    def reassignPartitions(throttle: Long = -1): Boolean = {
      maybeThrottle(throttle)
      try {
        val validPartitions = proposedAssignment.filter { case (p, _) => validatePartition(zkUtils, p.topic, p.partition) }
        if (validPartitions.isEmpty) false
        else {
          val jsonReassignmentData = ZkUtils.formatAsReassignmentJson(validPartitions)
          // 将分区分配方案写入/admin/reassign_partitions节点
          zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
          true
        }
      } catch {
          //...
      }
    }

当Controller Broker监听到/admin/reassign_partitions节点的变动后,就会开始执行分区重分配:

    // KafkaController.scala
    
    // 监听到`/admin/reassign_partitions`节点的变动后,执行该方法
    def doHandleDataChange(dataPath: String, data: AnyRef) {
      debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
        .format(dataPath, data))
      val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
      val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
        partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
      }
      // 遍历每一个分区,按照方案对该分区的副本进行重分配
      partitionsToBeReassigned.foreach { partitionToBeReassigned =>
        inLock(controllerContext.controllerLock) {
          if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
            error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
              .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
            controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
          } else {
            // 关键是这里,对该分区进行重分配
            val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
            controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
          }
        }
      }
    }
    
    // 针对指定的分区
    def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
                                          reassignedPartitionContext: ReassignedPartitionsContext) {
      val newReplicas = reassignedPartitionContext.newReplicas
      val topic = topicAndPartition.topic
      val partition = topicAndPartition.partition
      try {
        val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
        assignedReplicasOpt match {
          case Some(assignedReplicas) =>
            if (assignedReplicas == newReplicas) {
              throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
                " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
            } else {
              // 将分区的副本分配到各个新的Broker
              info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
              watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
              controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
              deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
              // 关键是这里
              onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
            }
          case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
            .format(topicAndPartition))
        }
      } catch {
        //...
      }
    }

2.2 执行重分配

最核心的其实就是KafkaController.onPartitionReassignment()方法,该方法包含了对某个分区的副本执行重分配的整个流程。

我们回顾一下Topic的创建流程:每次创建完一个Topic,都会将该Topic分区副本的默认分配方案写入Zookeeper的/brokers/topics/[Topic名称]节点中,然后由Controller负责将新的集群元数据广播给其它Broker,其它Broker则开始新建日志段,接受Producer请求等操作……

分区重分配的流程本质其实也是相似的,只不过Controller会去/admin/reassign_partitions目录下获取分区重分配方案,然后广播集群元数据,让各个Broker执行日志复制。

我举个例子来帮助大家理解下分区副本的Reblance流程。假设现在有一个名为reassign的Topic,它的某个分区原先的副本分配方案为{1, 2, 3},最终要执行的新方案{4, 5, 6},那么对于原来Broker1、2、3上的副本,都需要进行重新分配到Broker4、5、6上,具体的过程如下:

  1. 首先,向Zookeeper中写入副本分配方案:{1, 2, 3, 4, 5, 6};
  2. 然后,向每个Broker都发送一个LeaderAndIsr请求,这样各个Broker上的副本都会向Leader副本请求数据进行复制;
  3. 等待直到步骤2中的所有Follower副本都同步完成,此时ISR={1, 2, 3, 4, 5, 6};
  4. 接着,在新分配副本的Broker{4, 5, 6}中选举一个Leader,比如这里是Broker4;
  5. 最后,从ISR中剔除{1, 2, 3}这三个副本,然后在Zookeeper中也删除{1, 2, 3}这三个副本,然后由Controller广播集群元数据。

上述步骤中,有几个比较重要的概念,我这里简单讲解下,关键是理解,对于每一个分区而言,都有下面几个集合:

  • RAR: 重分配后的副本状态,比如{1, 2, 3}表示副本分别分布在Broker1、Broker2、Broker3,Leader副本在Broker1上;
  • OAR: 初始的副本状态;
  • AR: 当前的副本状态,随着重分配过程不断变化;
  • RAR-OAR: RAR与OAR的差集,即需要创建、数据迁移的新副本;
  • OAR-RAR: OAR与RAR的差集,即迁移后需要下线的副本。

可以用下面这张图来理解整个过程:

202307312125350602.png

最后,给出onPartitionReassignment的源码,理解了整个流程,代码略读即可,关键是理解这个思路:

    // KafkaController.scala
    
    // 对分区topicAndPartition执行重分配方案
    def onPartitionReassignment(topicAndPartition: TopicAndPartition, 
                                reassignedPartitionContext: ReassignedPartitionsContext) {
      val reassignedReplicas = reassignedPartitionContext.newReplicas
      // 如果新分配的副本没有全在ISR中
      if (!areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) {
        // 需要创建、数据迁移的新副本
        val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
        val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
        // 1.在Zookeeper的“/brokers/topics/[Topic名称]”中写入该主题分区的副本分配方案:OAR + RAR
        updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
        // 2.发送LeaderAndIsr请求给OAR + RAR中每一个副本
        updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
          newAndOldReplicas.toSeq)
        // 3.新分配的副本状态更新为NewReplica,此时新副本会开始创建并同步数据
        startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
      } else { //等待所有的RAR都在ISR中
    
          // 4.迁移后需要下线的副本: OAR - RAR  
        val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
        // 5.将副本状态设置为OnlineReplica
        reassignedReplicas.foreach { replica =>
          replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
            replica)), OnlineReplica)
        }
        // 6.将上下文中的AR设置为RAR
        // 7.新加入的副本已经同步完成, LeaderAndIsr都更新到最新的结果
        moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
        // 8/9.将旧的副本下线
        stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
        // 10.将ZK中的AR设置为RAR
        updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
        // 11.分区重分配完成, 在ZK中删除/admin/reassign_partitions节点中的迁移方案
        removePartitionFromReassignedPartitions(topicAndPartition)
        info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
        controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
        // 12.发送metadata更新请求给所有Broker
        sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
        //...
      }
    }

三、总结

本章,我对Kafka的分区重分配流程和底层实现原理进行了讲解,分区重分配本质在于数据复制,先增加新的副本,然后进行数据同步,最后删除旧的副本来达到最终目的。

数据复制会占用额外的资源,所以,分区重分配一定要在低峰值时期执行。另外,可以减小重分配的粒度,以小批次的方式来操作是一种可行的解决思路 。但是,如果集群中某个分区的流量即使在低峰时期还是特别大,那么就需要采取限流机制,Kafka默认提供了两种复制限流的方式:通过脚本kafka-config. shkafka-reassign-partitions.sh实现。我就不赘述了,读者可以找相关资料了解。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文