2023-07-31
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/290

了解了Broker集群的选举以及整体的集群管理机制,我们来看下Kafka创建Topic,以及对分区副本进行管理的流程。通常来说,我们会通过Kafka自带的kafka-topics.sh脚本来创建Topic。那么,当我们指定了一个Topic的分区数、每个分区的副本数之后,Controller(Leader Broker)是如何选择Leader副本?又是如何分配在Broker集群中分配这些副本的呢?

本章,我就对Topic的分区副本分配原理进行讲解。

如果Producer发送消息时指定了一个不存在的Topic,也会默认创建(分区1,副本1),可以通过Broker端的参数auto.create.topics.enable禁止默认创建的行为,生产环境建议禁止掉。

一、创建Topic

通过上一章的讲解,我们应该已经明白, 集群中的每个Broker都知道整个集群的元数据信息 。所谓元数据信息就是:集群中的每个Broker上有哪些Topic分区,每个Topic的分区信息,这些分区的Leader副本在哪,Follower副本在哪……

1.1 脚本使用

所以,Controller需要对这些Topic的分区进行管理,我以一个Topic的创建作为示例进行讲解,便于大家理解。首先,我们来看Topic的创建流程:

202307312125182701.png

创建Topic通过脚本kafka-topics.sh

    # kafka-topics.sh
    
    exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

本质是执行了 TopicCommand 命令:

    // TopicCommand.scala
    
    def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
      // Topic名称
      val topic = opts.options.valueOf(opts.topicOpt)
      // 配置
      val configs = parseTopicConfigsToBeAdded(opts)
      val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
      try {
        // 1.手动指定分区
        if (opts.options.has(opts.replicaAssignmentOpt)) {
          val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
          AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
        } else {
          // 2.自动分配分区
          CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
          // 配置的分区数
          val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
          // 配置的副本数
          val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
          val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
                              else RackAwareMode.Enforced
          // 创建主题
          AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
        }
        println("Created topic \"%s\".".format(topic))
      } catch  {
        case e: TopicExistsException => if (!ifNotExists) throw e
      }
    }

我们重点看它的自动分配分区分支,调用了AdminUtils.createTopic()来创建Topic并对分区副本进行分配:

    // AdminUtils.scala
    
    def createTopic(zkUtils: ZkUtils,
                    topic: String,
                    partitions: Int,
                    replicationFactor: Int,
                    topicConfig: Properties = new Properties,
                    rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
      // 1.从Zookeeper中获取Broker集群的元数据信息
      val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
      // 2.基于一定的算法,将分区副本分配给各个Broker
      val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, 
                                                                 partitions, replicationFactor)’
      // 3.将分配好最终策略,直接写入Zookeeper中的/brokers/topics/[Topic名称]节点中
      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
    }

可以看到, 创建Topic的本质就是根据Topic的分区数、每个分区的副本数,基于一定的算法把它们分配给各个Broker,然后把分配策略写入到Zookeeper中

所谓分区副本分配策略,我这里简单解释下,假设有个Topic设置3个分区,每个分区2个副本,那么分配结果可能就是下面这个样子:

    partition1 -> [0,1]        #分区1的Leader副本分配在Broker0,Follower副本分配在Broker1
    partition2 -> [2,0]        #分区2的Leader副本分配在Broker2,Follower副本分配在Broker0
    partition3 -> [1,2]        #分区3的Leader副本分配在Broker1,Follower副本分配在Broker2

至于具体的分区副本分配算法,我就不赘述了,读者可以自己去AdminUtils.assignReplicasToBrokers方法里瞅一瞅,无非就是类似负载均衡之类的策略,我重点关注分区副本分配的整体流程。

二、副本管理

创建Topic只是将 分区副本分配策略 写入到了Zookeeper的/brokers/topics/[Topic名称]节点中,那么接下来Controller如何根据策略来进行执行副本分配?如何对副本进行管理呢?

2.1 监听Topic创建

显然,Controller是可以感知到新Topic的创建的,也就是说它会去监听/brokers/topics节点的变化,整个监听的过程我用下面这张图来表示:

202307312125202322.png

我们来看下底层的源码:

    // KafkaController.scala
    
    // Broker选举成为Leader后,会调用该方法
    def onControllerFailover() {
      if(isRunning) {
        info("Broker %d starting become controller state transition".format(config.brokerId))
        readControllerEpochFromZookeeper()
        incrementControllerEpoch(zkUtils.zkClient)
    
        registerReassignedPartitionsListener()
        registerIsrChangeNotificationListener()
        registerPreferredReplicaElectionListener()
        // 关键看这里
        partitionStateMachine.registerListeners()
        //...
      }
      else
        info("Controller has been shut down, aborting startup/failover")
    }

onControllerFailover方法中调用了PartitionStateMachine.registerListeners(),它会去监听/brokers/topics/节点的变化:

    // PartitionStateMachine.scala
    
    def registerListeners() {
      registerTopicChangeListener()
      registerDeleteTopicListener()
    }
    
    private def registerTopicChangeListener() = {
      // 监听“/brokers/topics”节点的变化
      zkUtils.zkClient.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
    }
    
    private def registerDeleteTopicListener() = {
      // 监听“/admin/delete_topics”节点的变化
      zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
    }

我们来看下TopicChangeListener这个监听器,它的内部就 根据“/brokers/topics”节点下的子节点变化,筛选出新增的Topic, 然后按照分区维度维护成一个Map[TopicAndPartition, Seq[Int]]

    // PartitionStateMachine.scalaSS
    
    class TopicChangeListener(protected val controller: KafkaController) extends ControllerZkChildListener {
    
      protected def logName = "TopicChangeListener"
    
      // 当“/brokers/topics”节点下的子节点发生变化时,会触发Controller调用该方法
      def doHandleChildChange(parentPath: String, children: Seq[String]) {
        inLock(controllerContext.controllerLock) {
          if (hasStarted.get) {
            try {
              val currentChildren = {
                debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
                children.toSet
              }
              // 新创建的分区
              val newTopics = currentChildren -- controllerContext.allTopics
              // 删除的分区
              val deletedTopics = controllerContext.allTopics -- currentChildren
              controllerContext.allTopics = currentChildren
              // 获取分区副本分配策略
              val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
              controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
                !deletedTopics.contains(p._1.topic))
              controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
              info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
                deletedTopics, addedPartitionReplicaAssignment))
              if (newTopics.nonEmpty)
                // 关键看这里,
                controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
            } catch {
              case e: Throwable => error("Error while handling new topic", e)
            }
          }
        }
      }
    }

最后,上述代码会调用KafkaController的onNewTopicCreation方法,发送Topic的元数据信息给各个Broker,这个Broker就可以进行一些初始化操作,比如新建分区日志段,准备接受Producer发送过来的消息等等:

    // KafkaController.scala
    
    def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
      info("New topic creation callback for %s".format(newPartitions.mkString(",")))
      // subscribe to partition changes
      topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
      // 按照分配策略,发送Topic的元数据信息给各个Broker
      onNewPartitionCreation(newPartitions)
    }

三、总结

本章,我对Topic创建的整体流程和底层原理进行了讲解,Controller会监听新Topic的创建,同时对分区副本进行管理,向新的元数据信息发送给集群中的其它Broker。

阅读全文