2023-07-31
Original author: Ressmix. Original article: https://www.tpvlog.com/article/290

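Starting with this chapter, I will cover the Replication Subsystem of the Kafka Broker. Replicas are one of the most important concepts in Kafka, and the replication mechanism is the foundation of Kafka's high availability. Kafka spreads the replicas of each partition across different Broker machines. One of them is designated the Leader and serves client read and write requests. The others automatically become Followers, which send fetch requests to the Leader replica to synchronize newly written data.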

Over the next few chapters, I will walk through the replica synchronization function of the Kafka replication subsystem, covering:

  1. The full flow of a Follower replica fetching data from the Leader;
  2. How the Leader replica's LEO and HW are updated;
  3. How the ISR list is updated.

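In this chapter, I first explain the overall flow of Kafka partition replica synchronization. From the Follower's point of view, the whole process can be summarized in the figure below: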

[Figure: overall flow of partition replica synchronization, from the Follower's perspective]

1. Replica Synchronization

Before getting into the synchronization mechanism itself, let's review a few Kafka concepts: HW, LEO, AR, ISR, and OSR.

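  • AR (Assigned Replicas): all replicas of a partition;
  • ISR (In-Sync Replicas): the set of replicas that are in sync with the Leader; the Leader replica itself is also a member of the ISR;
  • OSR (Out-of-Sync Replicas): if a Follower in the ISR lags behind the Leader for longer than replica.lag.time.max.ms, it is considered out of sync, removed from the ISR, and added to the OSR, so that AR = ISR + OSR;
  • LEO (Log End Offset): the position just after the last message in the log, i.e. the offset the next message will receive; each replica of a partition has its own LEO;
  • HW (High Watermark): the smallest LEO in the ISR; consumers can only fetch messages below the HW.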
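To make the HW and ISR definitions concrete, here is a minimal Scala sketch, assuming a simplified model where each replica is just a broker id, an LEO, and the last time it caught up with the Leader. ReplicaState, ReplicaMath and their members are hypothetical names, not Kafka classes; the sketch only illustrates HW = min(LEO over the ISR) and the replica.lag.time.max.ms eviction rule described above.

```scala
// Hypothetical model of one replica; not a Kafka class.
final case class ReplicaState(brokerId: Int, leo: Long, lastCaughtUpTimeMs: Long)

object ReplicaMath {
  // HW = the smallest LEO among the ISR members (the Leader included).
  def highWatermark(isr: Seq[ReplicaState]): Long = {
    require(isr.nonEmpty, "the ISR always contains at least the Leader")
    isr.map(_.leo).min
  }

  // Split followers into (still in sync, out of sync): a follower stays in the ISR
  // only if it caught up with the Leader within the last lagTimeMaxMs milliseconds.
  def splitByLag(
      followers: Seq[ReplicaState],
      nowMs: Long,
      lagTimeMaxMs: Long): (Seq[ReplicaState], Seq[ReplicaState]) =
    followers.partition(r => nowMs - r.lastCaughtUpTimeMs <= lagTimeMaxMs)
}
```

For example, with a Leader at LEO 10, a follower f1 at LEO 8 that caught up recently and a follower f2 at LEO 3 that has been behind for too long, keeping f2 in the ISR would pin the HW at 3; evicting it lets the HW advance to 8. This is why a lagging replica is moved to the OSR rather than being allowed to hold consumers back.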

1.1 ReplicaFetcherManager

After each Broker starts, it creates a ReplicaManager, and while the ReplicaManager is being instantiated it internally creates an object named ReplicaFetcherManager:

    // ReplicaManager.scala
    
    class ReplicaManager(val config: KafkaConfig,
                         metrics: Metrics,
                         time: Time,
                         val zkUtils: ZkUtils,
                         scheduler: Scheduler,
                         val logManager: LogManager,
                         val isShuttingDown: AtomicBoolean,
                         quotaManager: ReplicationQuotaManager,
                         threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
      val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix,
                                                            quotaManager)
      //...
    }

ReplicaFetcherManager itself is very simple. It is a subclass of AbstractFetcherManager that creates replica fetcher threads, which pull data from the Brokers hosting the Leader partitions:

    // ReplicaFetcherManager.scala
    
    class ReplicaFetcherManager(brokerConfig: KafkaConfig, replicaMgr: ReplicaManager, metrics: Metrics, time: Time, threadNamePrefix: Option[String] = None, quotaManager: ReplicationQuotaManager)
          extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId,
            "Replica", brokerConfig.numReplicaFetchers) {
      override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
        val threadName = threadNamePrefix match {
          case None =>
            "ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id)
          case Some(p) =>
            "%s:ReplicaFetcherThread-%d-%d".format(p, fetcherId, sourceBroker.id)
        }
        // Create the replica fetcher thread
        new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig,
          replicaMgr, metrics, time, quotaManager)
      }
      //...
    }

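Let's focus on ReplicaFetcherManager's parent class, AbstractFetcherManager. It keeps a fetcherThreadMap keyed by Broker (more precisely, by BrokerAndFetcherId: the source Broker plus a fetcher id). What does that mean?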

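It means that ReplicaFetcherManager groups the Follower partitions on the current Broker by the Broker that hosts their Leader, and creates one ReplicaFetcherThread per group. A single ReplicaFetcherThread then fetches messages for a whole batch of Follower partitions from the same Broker, which saves network overhead. When brokerConfig.numReplicaFetchers (the num.replica.fetchers setting) is greater than 1, the partitions led by one Broker are further spread over several threads by fetcher id: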

    abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1)
      extends Logging with KafkaMetricsGroup {
      // Fetcher threads, keyed by (source Broker, fetcher id)
      private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
    
      // Create one fetcher thread per group of Follower partitions whose Leaders are on the same Broker (and share a fetcher id)
      def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, BrokerAndInitialOffset]) {
          mapLock synchronized {
            val partitionsPerFetcher = partitionAndOffsets.groupBy { case(topicPartition, brokerAndInitialOffset) =>
              BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicPartition.topic, topicPartition.partition))}
            for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
              var fetcherThread: AbstractFetcherThread = null
              fetcherThreadMap.get(brokerAndFetcherId) match {
                case Some(f) => fetcherThread = f
                case None =>
                  fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
                  // Cache the thread, keyed by Broker and fetcher id
                  fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
                  // Start the thread
                  fetcherThread.start
              }
    
              fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (tp, brokerAndInitOffset) =>
                tp -> brokerAndInitOffset.initOffset
              })
            }
          }
        }
    }
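The grouping done by addFetcherForPartitions can be sketched as follows. This is a simplified model, not Kafka's code: TopicPartition and FetcherKey are hypothetical stand-ins for Kafka's TopicPartition and BrokerAndFetcherId, and the hash used for the fetcher id is an assumption in the spirit of getFetcherId (topic hash combined with the partition number, modulo the number of fetchers).

```scala
// Hypothetical stand-ins for Kafka's TopicPartition and BrokerAndFetcherId.
final case class TopicPartition(topic: String, partition: Int)
final case class FetcherKey(leaderBrokerId: Int, fetcherId: Int)

object FetcherAssignment {
  // Assumed hash: spread partitions of the same Leader Broker over numFetchers threads.
  // floorMod keeps the result in [0, numFetchers) even when hashCode is negative.
  def fetcherId(tp: TopicPartition, numFetchers: Int): Int =
    Math.floorMod(31 * tp.topic.hashCode + tp.partition, numFetchers)

  // leaderOf: follower partition -> id of the Broker hosting its Leader.
  // Each resulting key corresponds to one fetcher thread.
  def group(leaderOf: Map[TopicPartition, Int], numFetchers: Int): Map[FetcherKey, Set[TopicPartition]] =
    leaderOf
      .groupBy { case (tp, leaderId) => FetcherKey(leaderId, fetcherId(tp, numFetchers)) }
      .map { case (key, members) => key -> members.keySet }
}
```

With a single fetcher (num.replica.fetchers defaults to 1), every Follower partition led by Broker 1 lands in one group and is served by one thread. Raising the fetcher count does not change which Broker a partition is fetched from; it only spreads the same partitions over more threads per source Broker.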

1.2 ReplicaFetcherThread

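Next, let's look at the replica fetcher thread, ReplicaFetcherThread. It extends AbstractFetcherThread and delegates most of its processing to the parent class. The run() loop shown below actually comes from ShutdownableThread, the base class of AbstractFetcherThread; it simply calls doWork() repeatedly until the thread is shut down: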

    // ReplicaFetcherThread.scala
    
    class ReplicaFetcherThread(name: String,
                               fetcherId: Int,
                               sourceBroker: BrokerEndPoint,
                               brokerConfig: KafkaConfig,
                               replicaMgr: ReplicaManager,
                               metrics: Metrics,
                               time: Time,
                               quota: ReplicationQuotaManager)
      extends AbstractFetcherThread(name = name,
                                    clientId = name,
                                    sourceBroker = sourceBroker,
                                    fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
                                    isInterruptible = false) {
    
        override def run(): Unit = {
          info("Starting ")
          try{
            while(isRunning.get()){
              // doWork() is implemented in AbstractFetcherThread: one round of replica data synchronization
              doWork()
            }
          } catch{
            case e: Throwable =>
              if(isRunning.get())
                error("Error due to ", e)
          }
          shutdownLatch.countDown()
          info("Stopped ")
        }
    }
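The thread lifecycle (loop on a running flag, call doWork(), count down a latch on exit so that shutdown can wait) can be illustrated with a small stand-alone class. LoopingThread is a hypothetical simplification of the ShutdownableThread pattern, not Kafka's class; it omits interruption handling and Kafka's logging.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical simplification of the ShutdownableThread pattern (not Kafka's class).
abstract class LoopingThread(threadName: String) extends Thread(threadName) {
  private val running = new AtomicBoolean(true)
  private val shutdownLatch = new CountDownLatch(1)

  // One unit of work per iteration; for a fetcher thread, one fetch round.
  def doWork(): Unit

  override def run(): Unit = {
    try {
      while (running.get()) doWork()
    } catch {
      // Only report errors that were not caused by an intentional shutdown.
      case e: Throwable => if (running.get()) System.err.println(s"$threadName: error due to $e")
    } finally {
      shutdownLatch.countDown() // signals shutdown() that the loop has exited
    }
  }

  // Ask the loop to stop after the current doWork() and block until it has exited.
  def shutdown(): Unit = {
    running.set(false)
    shutdownLatch.await()
  }
}
```

shutdown() only flips the flag, so the loop exits after the current doWork() round finishes; the latch lets the caller block until that has actually happened.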

Let's focus on AbstractFetcherThread.doWork(), which the loop above calls. It builds a FetchRequest and sends it to the target Broker to synchronize replica data:

    // AbstractFetcherThread.scala
    
    abstract class AbstractFetcherThread(
      name: String,                                          // Thread name
      clientId: String,                                      // Client id, used in logs and metrics
      val sourceBroker: BrokerEndPoint,                      // Address of the source (Leader) Broker
      fetchBackOffMs: Int = 0,                               // Back-off / retry interval for fetches, in ms
      isInterruptible: Boolean = true)                       // Whether the thread may be interrupted
      extends ShutdownableThread(name, isInterruptible) {
      // Concrete request / partition-data types, fixed by subclasses such as ReplicaFetcherThread
      type REQ <: FetchRequest
      type PD <: PartitionData
      private val partitionStates = new PartitionStates[PartitionFetchState]
      // Lock and condition guarding partitionStates; doWork() waits on the condition to back off
      private val partitionMapLock = new ReentrantLock
      private val partitionMapCond = partitionMapLock.newCondition()
      //...
    
        override def doWork() {
          val fetchRequest = inLock(partitionMapLock) {
            // Build one FetchRequest for the batch of Follower partitions whose Leaders are all on the same Broker
            val fetchRequest = buildFetchRequest(partitionStates.partitionStates.asScala.map { state =>
              state.topicPartition -> state.value
            })
            // No partitions to fetch: wait up to fetchBackOffMs before trying again
            if (fetchRequest.isEmpty) {
              partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
            }
            fetchRequest
          }
          if (!fetchRequest.isEmpty)
            // Send the request to synchronize replica data
            processFetchRequest(fetchRequest)
        }  
    }
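The back-off branch in doWork() relies on a lock plus a Condition: when there are no partitions to fetch, the thread waits up to fetchBackOffMs instead of spinning. Below is a hypothetical sketch of that shape, assuming partitions are tracked as a simple name-to-offset map; FetchLoop and addPartition are illustrative names, and waking a waiting thread with signalAll() in addPartition is a choice made by this sketch.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// Hypothetical sketch of the doWork() shape; not Kafka's AbstractFetcherThread.
final class FetchLoop(fetchBackOffMs: Long) {
  private val lock = new ReentrantLock()
  private val partitionsChanged = lock.newCondition()
  // Partition name -> next offset to fetch; guarded by lock.
  private var fetchOffsets = Map.empty[String, Long]

  def addPartition(tp: String, offset: Long): Unit = {
    lock.lock()
    try {
      fetchOffsets += tp -> offset
      partitionsChanged.signalAll() // wake a thread that is currently backing off
    } finally lock.unlock()
  }

  // Build the "request" under the lock; if it is empty, back off for up to fetchBackOffMs.
  // Returns the request a real fetcher would send, or None when there was nothing to fetch.
  def doWork(): Option[Map[String, Long]] = {
    lock.lock()
    val request =
      try {
        val built = fetchOffsets
        if (built.isEmpty) partitionsChanged.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
        built
      } finally lock.unlock()
    if (request.isEmpty) None else Some(request)
  }
}
```

As in the excerpt, the request is built before waiting, so partitions added during the back-off are picked up on the next doWork() round.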

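The key point here is the FetchRequest. A Broker hosts many Follower partitions, and the Leaders of many of them may sit on the same Broker. All the Follower partitions whose Leaders are on the same Broker (and that are handled by the same fetcher thread) need only one combined FetchRequest: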

    // ReplicaFetcherThread.scala
    
    protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
      val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
    
      // Iterate over the partitions and add each active, non-throttled one to the FetchRequest
      partitionMap.foreach { case (topicPartition, partitionFetchState) =>
        if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicPartition))
          requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))
      }
      // The request carries the max wait time, min bytes, max bytes, and each partition's fetch offset and size
      val requestBuilder = new JFetchRequest.Builder(maxWait, minBytes, requestMap).
          setReplicaId(replicaId).setMaxBytes(maxBytes)
      requestBuilder.setVersion(fetchRequestVersion)
      new FetchRequest(requestBuilder)
    }
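The filtering and ordering in buildFetchRequest can be sketched independently of Kafka's request classes. FetchState, PartitionFetch and FetchRequestSketch are hypothetical names; the throttled predicate stands in for shouldFollowerThrottle, and fetchSize for the per-partition byte limit.

```scala
import scala.collection.immutable.ListMap

// Hypothetical per-partition state and request entry; not Kafka's classes.
final case class FetchState(offset: Long, isActive: Boolean)
final case class PartitionFetch(fetchOffset: Long, maxBytes: Int)

object FetchRequestSketch {
  // Keep insertion order (like the LinkedHashMap in buildFetchRequest) and skip
  // partitions that are inactive or currently throttled by the replication quota.
  def build(
      states: Seq[(String, FetchState)],
      fetchSize: Int,
      throttled: String => Boolean): ListMap[String, PartitionFetch] =
    ListMap.empty[String, PartitionFetch] ++ states.collect {
      case (tp, st) if st.isActive && !throttled(tp) => tp -> PartitionFetch(st.offset, fetchSize)
    }
}
```

Each entry asks the Leader for data starting at the partition's current fetch offset, so one request covers every eligible partition this thread is responsible for.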

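Finally, let's look at how the request is sent and how the response is processed. The request is sent through the underlying NetworkClient component, and the fetch offset it carries for each partition is the Follower's LEO. After receiving the response, the Follower appends the data and updates its own LEO and HW: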

    // AbstractFetcherThread.scala
    
    private def processFetchRequest(fetchRequest: REQ) {
      val partitionsWithError = mutable.Set[TopicPartition]()
    
      def updatePartitionsWithError(partition: TopicPartition): Unit = {
        partitionsWithError += partition
        partitionStates.moveToEnd(partition)
      }
    
      var responseData: Seq[(TopicPartition, PD)] = Seq.empty
    
      try {
        // 1. Perform the fetch; under the hood this goes through the NIO-based NetworkClient
        responseData = fetch(fetchRequest)
      } catch {
        //...
      }
      fetcherStats.requestRate.mark()
    
      // Process the response
      if (responseData.nonEmpty) {
        inLock(partitionMapLock) {
          responseData.foreach { case (topicPartition, partitionData) =>
            val topic = topicPartition.topic
            val partitionId = topicPartition.partition
            Option(partitionStates.stateValue(topicPartition)).foreach(currentPartitionFetchState =>
              if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.offset) {
                Errors.forCode(partitionData.errorCode) match {
                  case Errors.NONE =>
                    try {
                      val records = partitionData.toRecords
                      val newOffset = records.shallowEntries.asScala.lastOption.map(_.nextOffset).getOrElse(
                        currentPartitionFetchState.offset)
    
                      fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
                      // Process this partition's data
                      processPartitionData(topicPartition, currentPartitionFetchState.offset, partitionData)
    
                      val validBytes = records.validBytes
                      if (validBytes > 0) {
                        partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
                        fetcherStats.byteRate.mark(validBytes)
                      }
                    } catch {
                     //...
                    }
                     //...
                }
              })
          }
        }
      }
      //...
    }
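The per-partition bookkeeping in processFetchRequest boils down to three rules: ignore a response whose request offset no longer matches the partition's current fetch offset, advance the fetch offset to the offset after the last fetched batch (or keep it if nothing came back), and record the lag as the Leader's HW minus that new offset, floored at 0. A minimal sketch under those assumptions, with hypothetical types FetchedData and ResponseHandling:

```scala
// Hypothetical, simplified view of one partition's fetched data.
final case class FetchedData(lastBatchNextOffset: Option[Long], leaderHighWatermark: Long)

object ResponseHandling {
  // Returns (new fetch offset, lag), or None if the response is stale.
  def handle(requestedOffset: Long, currentOffset: Long, data: FetchedData): Option[(Long, Long)] =
    if (requestedOffset != currentOffset) None // fetch state changed while the request was in flight
    else {
      val newOffset = data.lastBatchNextOffset.getOrElse(currentOffset)
      val lag = math.max(0L, data.leaderHighWatermark - newOffset)
      Some((newOffset, lag))
    }
}
```

The staleness check matters because the partition's fetch state can change (for example after a Leader change) while a request is in flight; applying an old response would move the offset incorrectly.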

The per-partition work is done in ReplicaFetcherThread.processPartitionData:

    // ReplicaFetcherThread.scala
    
    // Handle the fetched data for one partition
    def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
      try {
        val replica = replicaMgr.getReplica(topicPartition).get
        val records = partitionData.toRecords
    
        maybeWarnIfOversizedRecords(records, topicPartition)
    
        if (fetchOffset != replica.logEndOffset.messageOffset)
          throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(topicPartition, fetchOffset, replica.logEndOffset.messageOffset))
        // 1. Append the messages to the local log on disk (this advances the Follower's LEO)
        replica.log.get.append(records, assignOffsets = false)
        // 2. Update the replica's high watermark: HW = min(own LEO, Leader's HW)
        val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
        replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
    
        //...
      } catch {
        case e: KafkaStorageException =>
          fatal(s"Disk error while replicating data for $topicPartition", e)
          Runtime.getRuntime.halt(1)
      }
    }
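processPartitionData checks that the fetch offset equals the Follower's LEO, appends the records, and then sets the Follower's HW to min(own LEO, Leader's HW from the response). The sketch below models only those offsets with a hypothetical in-memory FollowerLog class, where the LEO is simply the number of stored records; it is not Kafka's Log API.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical in-memory follower log; models offsets only, not Kafka's Log class.
final class FollowerLog {
  private val entries = ArrayBuffer.empty[String]
  var highWatermark: Long = 0L

  // LEO = offset the next record will get = number of records stored so far.
  def logEndOffset: Long = entries.size.toLong

  // Append records fetched at fetchOffset, then clamp the Leader's HW to our own LEO.
  def appendFromLeader(fetchOffset: Long, records: Seq[String], leaderHw: Long): Unit = {
    if (fetchOffset != logEndOffset)
      throw new IllegalStateException(
        s"Offset mismatch: fetched offset = $fetchOffset, log end offset = $logEndOffset")
    entries ++= records
    highWatermark = math.min(logEndOffset, leaderHw)
  }
}
```

The clamp matters because the Leader's HW can be ahead of what this Follower has actually stored; a Follower must never expose an HW beyond its own LEO.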

2. Summary

In this chapter, I walked through the whole replica synchronization flow from the Follower replica's point of view. The key points are:

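  1. After a Broker starts, it creates background ReplicaFetcherThread threads that handle replica synchronization. These threads are created per source Broker (and fetcher id): Follower partitions whose Leaders are on the same Broker share one ReplicaFetcherThread;
  2. When fetching messages, a ReplicaFetcherThread essentially acts as a client sending Fetch requests (the same request type consumers use, tagged with the Follower's replicaId), over Kafka's own NIO-based networking layer;
  3. After fetching messages, the Follower writes them to disk through the same flow used by the log subsystem, and updates its own LEO and HW.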