2023-07-31
Original author: Ressmix. Original article: https://www.tpvlog.com/article/290

A Kafka cluster consists of one or more Brokers. How does Kafka manage them?

Starting with this chapter, I will cover Kafka's cluster management. Strictly speaking this material belongs to the Replication Subsystem, but it is fairly self-contained, so I treat it on its own.

A Kafka cluster elects one Broker to act as the controller (Kafka Controller), which is responsible for managing the state of all partitions and replicas in the cluster:

  • When a partition's Leader replica fails, the Controller elects a new Leader replica for that partition;
  • When the Controller detects a change in a partition's ISR set, it notifies every Broker to update its metadata;
  • When the partition count of a Topic is increased, the Controller likewise takes care of redistributing the partitions.

Let's begin with the Controller election that takes place once the Brokers have started.

1. Controller Election

After starting up, each Broker creates two important cluster-management components: KafkaController and KafkaHealthcheck. Through these two components, the Broker cluster carries out the Controller election:

    // KafkaServer.scala
    class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None, kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List()) extends Logging with KafkaMetricsGroup {
      var kafkaController: KafkaController = null
      var kafkaHealthcheck: KafkaHealthcheck = null
    
      def startup() {
        try {
          info("starting")
          //...
    
          // Start the KafkaController
          kafkaController = new KafkaController(config, zkUtils, brokerState, time, metrics, threadNamePrefix)
          kafkaController.startup()
    
          // Start the KafkaHealthcheck
          kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
              config.interBrokerProtocolVersion)
          kafkaHealthcheck.startup()
          //...
        }
        catch {
          //...
        }
      }
    }

1.1 Initialization

KafkaController

Let's first look at how KafkaController is created and initialized:

  1. It first registers a ZooKeeper listener that watches for changes in the ZK session state;
  2. It then starts the ZookeeperLeaderElector component and enters the controller election, i.e. one Broker will be elected as the cluster-management (Controller) node.
    // KafkaController.scala
    
    private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,onControllerResignation, config.brokerId, time)
    
    def startup() = {
      inLock(controllerContext.controllerLock) {
        info("Controller starting up")
        // 1. Register a listener that watches the ZooKeeper session state
        registerSessionExpirationListener()
        isRunning = true
        // 2. Start the ZookeeperLeaderElector and enter the leader (Controller) election
        controllerElector.startup
        info("Controller startup complete")
      }
    }
    
    private def registerSessionExpirationListener() = {
      zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener())
    }
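
What does the session listener registered in step 1 actually do? When the ZooKeeper session expires and a new one is established, any ephemeral nodes owned by the old session are gone, so the Broker gives up whatever Controller duties it held and rejoins the election. A condensed view of this inner class of KafkaController (logging and minor details trimmed):

    // KafkaController.scala (condensed)
    class SessionExpirationListener() extends IZkStateListener with Logging {
      @throws[Exception]
      def handleStateChanged(state: KeeperState) {
        // Nothing to do here: ZkClient reconnects for us
      }
    
      @throws[Exception]
      def handleNewSession() {
        // The old session (and its ephemeral nodes) is gone:
        // step down as Controller, then re-enter the election
        inLock(controllerContext.controllerLock) {
          onControllerResignation()
          controllerElector.elect
        }
      }
    }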

KafkaHealthcheck

Next, the creation and initialization of KafkaHealthcheck:

  1. It first registers a ZooKeeper listener that watches for changes in the ZK session state;
  2. It then registers the current Broker's information under a ZooKeeper node.
    // KafkaHealthcheck.scala
    
    class KafkaHealthcheck(brokerId: Int,
                           advertisedEndpoints: Seq[EndPoint],
                           zkUtils: ZkUtils,
                           rack: Option[String],
                           interBrokerProtocolVersion: ApiVersion) extends Logging {
      private[server] val sessionExpireListener = new SessionExpireListener
    
      def startup() {
        // 1. Register a listener that watches the ZooKeeper session state
        zkUtils.zkClient.subscribeStateChanges(sessionExpireListener)
        register()
      }
    
      def register() {
        //...
        // Register the current Broker's info on a ZK node
        zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort, rack,
          interBrokerProtocolVersion)
      }
    }

KafkaHealthcheck registers the current Broker's information as an ephemeral ZooKeeper node: /brokers/ids/[0...N]. For example, if the current Broker ID is 1, it registers a node /brokers/ids/1 whose content includes the Broker's IP, port, and other details.
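
The data stored at /brokers/ids/1 then looks roughly like the following (the exact field set varies across Kafka versions, and the values here are made up):

    // Example content of /brokers/ids/1 (illustrative only; fields vary by version)
    {
      "version": 4,
      "host": "192.168.0.101",
      "port": 9092,
      "jmx_port": -1,
      "timestamp": "1690810000000",
      "endpoints": ["PLAINTEXT://192.168.0.101:9092"]
    }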

1.2 Election

Now let's see how a Broker completes the election through KafkaController. The actual work is done by a component named ZookeeperLeaderElector:

  1. When ZookeeperLeaderElector is constructed, it is told which ZooKeeper node to watch: /controller.
  2. Once started, ZookeeperLeaderElector registers a listener, LeaderChangeListener, on the /controller node; whenever the content of /controller changes, the Broker is notified.
    class ZookeeperLeaderElector(controllerContext: ControllerContext,
                                 electionPath: String,
                                 onBecomingLeader: () => Unit,
                                 onResigningAsLeader: () => Unit,
                                 brokerId: Int,
                                 time: Time) extends LeaderElector with Logging {
      // -1 means there is no leader Broker yet
      var leaderId = -1        
      // electionPath is supplied when ZookeeperLeaderElector is constructed: /controller
      val index = electionPath.lastIndexOf("/")        
      if (index > 0)
        controllerContext.zkUtils.makeSurePersistentPathExists(electionPath.substring(0, index))
      val leaderChangeListener = new LeaderChangeListener
    
      def startup {
        inLock(controllerContext.controllerLock) {
          // Register LeaderChangeListener, which is notified whenever /controller changes
          controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
          elect
        }
      }
    }

ZookeeperLeaderElector's elect method performs the actual election:

  1. Initially the /controller node does not exist, so getControllerID resolves to -1, meaning there is no leader Broker yet. The current Broker then tries to create the /controller node and writes its own information into it: Broker ID, version number, timestamp, and so on;
  2. If the Broker loses the race to create /controller, whichever Broker created the node first becomes the leader.
    // ZookeeperLeaderElector.scala
    def elect: Boolean = {
      val timestamp = time.milliseconds.toString
      val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
      // 1. Parse the content of /controller to obtain the leader ID
      leaderId = getControllerID
    
      // 2. A leader Broker already exists, so return right away
      if(leaderId != -1) {
         debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
         return amILeader
      }
    
      try {
        // 3. Create the ephemeral /controller node and write the current Broker's info
        val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
                                                        electString,
                                                        controllerContext.zkUtils.zkConnection.getZookeeper,
                                                        JaasUtils.isZkSecurityEnabled())
        zkCheckedEphemeral.create()
        info(brokerId + " successfully elected as leader")
        // The leader ID becomes this Broker's ID
        leaderId = brokerId
        onBecomingLeader()
      } catch {
        case _: ZkNodeExistsException =>
          // Reaching here means another Broker created the node first and became the leader
          leaderId = getControllerID 
    
          if (leaderId != -1)
            debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
          else
            warn("A leader has been elected but just resigned, this will result in another round of election")
        // Reaching here means an unexpected exception occurred; delete the ephemeral /controller node, which will trigger a new round of election
        case e2: Throwable =>
          error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
          resign()
      }
      amILeader
    }
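
The two helpers used above are short. getControllerID reads /controller and parses the brokerid field, while resign deletes the ephemeral node; both come from the same file (lightly trimmed):

    // ZookeeperLeaderElector.scala (lightly trimmed)
    private def getControllerID(): Int = {
      // Read /controller; if it does not exist yet, report -1 (no leader)
      controllerContext.zkUtils.readDataMaybeNull(electionPath)._1 match {
        case Some(controller) => KafkaController.parseControllerId(controller)
        case None => -1
      }
    }
    
    def resign(): Boolean = {
      // Forget the leader and delete /controller, which triggers a re-election
      leaderId = -1
      controllerContext.zkUtils.deletePath(electionPath)
    }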

The entire election flow is summarized in the figure below:

[Figure: Controller election flow]

1.3 Re-election

Now, what happens if the Broker that became leader (say, Broker1 in the figure above) suddenly crashes?

As described in the previous section, ZookeeperLeaderElector holds a listener, LeaderChangeListener, that watches the /controller node. Because /controller is an ephemeral node, it disappears once Broker1 goes down:

    // ZookeeperLeaderElector.scala
    class LeaderChangeListener extends IZkDataListener with Logging {
      // Fired when the node's data changes
      @throws[Exception]
      def handleDataChange(dataPath: String, data: Object) {
        val shouldResign = inLock(controllerContext.controllerLock) {
          val amILeaderBeforeDataChange = amILeader
          leaderId = KafkaController.parseControllerId(data.toString)
          // Was the leader before the data change, but no longer is afterwards
          amILeaderBeforeDataChange && !amILeader
        }
    
        if (shouldResign)
          onResigningAsLeader()
      }
    
      // Fired when the node is deleted
      @throws[Exception]
      def handleDataDeleted(dataPath: String) { 
        val shouldResign = inLock(controllerContext.controllerLock) {
          amILeader    // leaderId == brokerId ?
        }
        // If this Broker was the leader, it must step down first
        if (shouldResign)
          onResigningAsLeader()
    
        inLock(controllerContext.controllerLock) {
          elect
        }
      }
    }

The moment the node disappears, a re-election is triggered. The onResigningAsLeader callback above steps the Broker down from the Controller role, mainly by unregistering the components managed by the Controller; the key part is the final call to elect, which kicks off a new round of election exactly as described in the previous section.
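
In KafkaController, onResigningAsLeader is wired to onControllerResignation, which tears down what onControllerFailover set up. An abridged view (the full method also stops the topic-deletion manager, the auto-rebalance scheduler, and more):

    // KafkaController.scala (abridged)
    def onControllerResignation() {
      // Unregister the ZK listeners this Controller installed
      deregisterIsrChangeNotificationListener()
      deregisterReassignedPartitionsListener()
      deregisterPreferredReplicaElectionListener()
    
      inLock(controllerContext.controllerLock) {
        // Shut down the state machines and the channel manager
        partitionStateMachine.shutdown()
        replicaStateMachine.shutdown()
        if (controllerContext.controllerChannelManager != null) {
          controllerContext.controllerChannelManager.shutdown()
          controllerContext.controllerChannelManager = null
        }
        // Fall back to acting as an ordinary Broker
        brokerState.newState(RunningAsBroker)
      }
    }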

2. Broker Cluster Scaling

With the leader Broker elected, the Controller now has to manage the entire cluster. This raises a question: how does the Controller know which other Brokers are in the cluster, and how does it detect Brokers joining and leaving (cluster scaling)?

As mentioned in the previous section, every Broker initializes a KafkaHealthcheck component at startup, and it is this component that announces the Broker's own arrival and departure.

2.1 Brokers Going Online and Offline

When a Broker starts up and joins the cluster, KafkaHealthcheck registers an ephemeral node in ZooKeeper: /brokers/ids/[current Broker ID]:

    // KafkaHealthcheck.scala
    
    class KafkaHealthcheck(brokerId: Int,
                           advertisedEndpoints: Seq[EndPoint],
                           zkUtils: ZkUtils,
                           rack: Option[String],
                           interBrokerProtocolVersion: ApiVersion) extends Logging {
      private[server] val sessionExpireListener = new SessionExpireListener
    
      def startup() {
        // 1. Register a listener that watches the ZooKeeper session state
        zkUtils.zkClient.subscribeStateChanges(sessionExpireListener)
        register()
      }
    
      def register() {
        //...
        // Register the current Broker's info on a ZK node
        zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort, rack,
          interBrokerProtocolVersion)
      }
    }

[Figure: Broker registration under /brokers/ids]

So how does the cluster's Controller (i.e. the leader Broker) become aware of this?

2.2 Detecting Broker Changes

After a Broker wins the election and becomes the leader, it watches the /brokers/ids/ node for changes:

[Figure: Controller watching /brokers/ids]

Recall this one line of code in the election flow: onBecomingLeader.

    // ZookeeperLeaderElector.scala
    
    def elect: Boolean = {
      //...
      try {
        val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
                                                        electString,
                                                        controllerContext.zkUtils.zkConnection.getZookeeper,
                                                        JaasUtils.isZkSecurityEnabled())
        zkCheckedEphemeral.create()
        info(brokerId + " successfully elected as leader")
        // This Broker becomes the leader Broker
        leaderId = brokerId
        // Invoke the leadership callback (wired to onControllerFailover)
        onBecomingLeader()
      } catch {
        //...
      }
      amILeader
    }

onBecomingLeader does quite a lot of work, essentially the Controller's takeover duties; here we focus on replicaStateMachine.registerListeners():

    // KafkaController.scala
    
    def onControllerFailover() {
      if(isRunning) {
        info("Broker %d starting become controller state transition".format(config.brokerId))
        readControllerEpochFromZookeeper()
        incrementControllerEpoch(zkUtils.zkClient)
    
        registerReassignedPartitionsListener()
        registerIsrChangeNotificationListener()
        registerPreferredReplicaElectionListener()
        partitionStateMachine.registerListeners()
        // The key call for our purposes
        replicaStateMachine.registerListeners()
        //...
      }
      else
        info("Controller has been shut down, aborting startup/failover")
    }

The Controller watches for Broker changes through the ReplicaStateMachine component, using a listener named BrokerChangeListener:

    // ReplicaStateMachine.scala
    
    def registerListeners() {
      // Register the broker-change listener
      registerBrokerChangeListener()
    }
    
    private def registerBrokerChangeListener() = {
      // BrokerIdsPath is "/brokers/ids"
      zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
    }

Let's look at BrokerChangeListener: whenever child nodes are added under /brokers/ids or removed from it, the doHandleChildChange method below is invoked:

    // ReplicaStateMachine.scala
    
    class BrokerChangeListener(protected val controller: KafkaController) extends ControllerZkChildListener {
      protected def logName = "BrokerChangeListener"
    
      def doHandleChildChange(parentPath: String, currentBrokerList: Seq[String]) {
        info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.sorted.mkString(",")))
        inLock(controllerContext.controllerLock) {
          if (hasStarted.get) {
            ControllerStats.leaderElectionTimer.time {
              try {
                // 1. Fetch the info of all child nodes under "/brokers/ids"
                val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
                val curBrokerIds = curBrokers.map(_.id)
                val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
                // 2. Compute the newly added Broker IDs, i.e. Brokers that just joined the cluster
                val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
                // 3. Compute the departed Broker IDs, i.e. Brokers that left the cluster
                val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
                val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
                controllerContext.liveBrokers = curBrokers
                val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
                val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
                val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
                info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
                  .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
                // 4. Add channels for the Brokers that came online
                newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
                // 5. Remove channels for the Brokers that went offline
                deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
                // 6. Handle the newly added Brokers
                if(newBrokerIds.nonEmpty)
                  controller.onBrokerStartup(newBrokerIdsSorted)
                // 7. Handle the departed Brokers
                if(deadBrokerIds.nonEmpty)
                  controller.onBrokerFailure(deadBrokerIdsSorted)
              } catch {
                case e: Throwable => error("Error while handling broker changes", e)
              }
            }
          }
        }
      }
    }
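
The detection in steps 2 and 3 is nothing more than set arithmetic over Broker IDs; a standalone toy illustration:

    // Toy illustration of the diffing done in doHandleChildChange
    val known   = Set(1, 2, 3)   // Brokers the Controller already tracks
    val current = Set(2, 3, 4)   // children of /brokers/ids right now
    
    val newBrokerIds  = current -- known    // Set(4): Broker 4 just came online
    val deadBrokerIds = known -- current    // Set(1): Broker 1 just went offline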

2.3 Pushing Cluster Metadata

Once a Broker change is detected, the leader Broker (Controller) handles Brokers going online and offline differently. Here I walk through the metadata push for the case where a new Broker joins the cluster:

    // KafkaController.scala
    
    def onBrokerStartup(newBrokers: Seq[Int]) {
        info("New broker startup callback for %s".format(newBrokers.mkString(",")))
        // The set of newly added Brokers
        val newBrokersSet = newBrokers.toSet
        // Push cluster metadata to the Brokers
        sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
        // ...      
    
    }

The metadata push itself relies on the network communication layer covered earlier in this series, i.e. Kafka's custom NIO framework:

    // KafkaController.scala
    
    def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
      try {
        // 1. Start a new batch of update requests
        brokerRequestBatch.newBatch()
        // 2. Add the target Brokers to the batch
        brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
        // 3. Send the requests to the Brokers
        brokerRequestBatch.sendRequestsToBrokers(epoch)
      } catch {
        case e : IllegalStateException => {
          // Resign if the controller is in an illegal state
          error("Forcing the controller to resign")
          brokerRequestBatch.clear()
          controllerElector.resign()
    
          throw e
        }
      }
    }
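
On the receiving side, each Broker handles the resulting UpdateMetadataRequest in KafkaApis and refreshes its local MetadataCache. This is a simplified sketch only: authorization and response plumbing are elided, and the exact signatures vary across Kafka versions:

    // KafkaApis.scala (simplified sketch; details vary by version)
    def handleUpdateMetadataRequest(request: RequestChannel.Request) {
      val updateMetadataRequest = request.body.asInstanceOf[UpdateMetadataRequest]
      // Refresh this Broker's local view of the cluster metadata
      replicaManager.maybeUpdateMetadataCache(request.header.correlationId,
        updateMetadataRequest, metadataCache)
      // ... then acknowledge back to the Controller
    }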

The overall flow is roughly as shown in the figure below; readers who want the details of the metadata push can study the source themselves, so I won't belabor it:

[Figure: Cluster metadata push flow]

3. Summary

In this chapter I walked through the election flow of a Kafka Broker cluster. After the Kafka server starts, every Broker holds a KafkaController component; the essence of the election is that these components race to write a node into ZooKeeper, and whichever Broker writes it first becomes the leader, i.e. the Controller.

The Controller manages the entire Broker cluster: sensing Brokers going online and offline, assigning and electing partition replicas, broadcasting cluster-metadata updates, and so on. I'll pick up from there in the next chapter.
