2023-07-31
Original author: Ressmix. Original article: https://www.tpvlog.com/article/290

In this chapter, I will explain the delayed read/write scenarios that arise during replica synchronization. What exactly are delayed reads and writes?

There are two cases:

  1. Delayed reads: the producer may be sending messages only infrequently, so the Leader and Followers of each partition are fully in sync. When the Leader has no new messages to hand out but a Follower still sends a Fetch request to synchronize, the Leader delays the read;
  2. Delayed writes: when the Producer sets the acks parameter to all or -1, the Leader must wait until every Follower in the ISR has pulled the batch before it can respond (see the producer sketch right after this list).
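
As a quick illustration of the second case, here is a minimal producer sketch (the bootstrap address and topic name are placeholders): with acks=all, each send() only succeeds once every ISR replica has the record.

    // Minimal producer sketch: acks=all (equivalent to "-1") makes the broker
    // hold the response until every in-sync replica has fetched the batch.
    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder address
    props.put(ProducerConfig.ACKS_CONFIG, "all")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // get() blocks until the broker responds, i.e. until all ISR replicas have the record
    producer.send(new ProducerRecord[String, String]("demo-topic", "key", "value")).get()
    producer.close()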

In this chapter, I will walk through both cases.

1. Delayed Reads

First, let's look at what a Broker does when it receives a Fetch request but has no new messages to serve.

1.1 Waiting

In fact, the Kafka Broker handles this with its time wheel mechanism: it inserts a DelayedFetch task into the time-wheel bucket 500 milliseconds ahead (the fetch request's max-wait, replica.fetch.wait.max.ms, defaults to 500 ms), so as the wheel turns the task runs after at most 500 milliseconds. If new data reaches the Leader within that window, the Fetch task is triggered early:

    // ReplicaManager.scala
    
    def fetchMessages(timeout: Long,
                      replicaId: Int,
                      fetchMinBytes: Int,
                      fetchMaxBytes: Int,
                      hardMaxBytesLimit: Boolean,
                      fetchInfos: Seq[(TopicPartition, PartitionData)],
                      quota: ReplicaQuota = UnboundedQuota,
                      responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit) {
      val isFromFollower = replicaId >= 0
      val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
      val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
    
      // read messages from the local log
      val logReadResults = readFromLocalLog(
        replicaId = replicaId,
        fetchOnlyFromLeader = fetchOnlyFromLeader,
        readOnlyCommitted = fetchOnlyCommitted,
        fetchMaxBytes = fetchMaxBytes,
        hardMaxBytesLimit = hardMaxBytesLimit,
        readPartitionInfo = fetchInfos,
        quota = quota)
      // ... (bytesReadable and errorReadingData are derived from logReadResults here)
      if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
        val fetchPartitionData = logReadResults.map { case (tp, result) =>
          tp -> FetchPartitionData(result.error, result.hw, result.info.records)
        }
        responseCallback(fetchPartitionData)
      } else {
        // reaching this branch means there are no new messages to read right now
        val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) =>
          val fetchInfo = fetchInfos.collectFirst {
            case (tp, v) if tp == topicPartition => v
          }.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos"))
          (topicPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
        }
        val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
          fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus)
    
        // create a DelayedFetch task for delayed scheduling
        val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)
        val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }
        // hand the task to the time wheel for scheduling
        delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
      }
    }

The key is the delayedFetchPurgatory above, the core component built on the time wheel: Kafka wraps the request in a DelayedFetch task and hands it to a DelayedOperationPurgatory for scheduling.

I won't dive into the DelayedOperationPurgatory code itself; readers can follow this line of thought and study it on their own.
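
To make the mechanism concrete, here is a minimal sketch of the purgatory idea (illustrative only, with simplified semantics; it is not the real Kafka implementation, which additionally registers every operation with a hierarchical timing wheel so it still expires after its timeout): an operation is tried immediately, and if it cannot complete it is parked under per-partition watch keys until some event completes it.

    // A minimal purgatory sketch (NOT the real Kafka implementation):
    // tryCompleteElseWatch parks an operation under watch keys if it cannot
    // complete now; checkAndComplete retries every operation watching a key.
    import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
    import scala.jdk.CollectionConverters._

    abstract class DelayedOp {
      @volatile private var completed = false
      def canComplete(): Boolean   // e.g. "enough bytes readable" for a fetch
      def onComplete(): Unit       // invokes the response callback
      def forceComplete(): Boolean = synchronized {
        if (completed) false else { completed = true; onComplete(); true }
      }
    }

    class SimplePurgatory[K] {
      private val watchers = new ConcurrentHashMap[K, ConcurrentLinkedQueue[DelayedOp]]()

      def tryCompleteElseWatch(op: DelayedOp, keys: Seq[K]): Boolean =
        if (op.canComplete()) op.forceComplete()
        else {
          // the real purgatory would also hand `op` to the timing wheel here,
          // so it still expires after its timeout even if no event arrives
          keys.foreach { k =>
            watchers.computeIfAbsent(k, _ => new ConcurrentLinkedQueue[DelayedOp]()).add(op)
          }
          false
        }

      // called when something happens for `key`, e.g. the leader appends messages
      def checkAndComplete(key: K): Int = {
        val q = watchers.get(key)
        if (q == null) 0 else q.asScala.count(op => op.canComplete() && op.forceComplete())
      }
    }

The real DelayedFetch and DelayedProduce are exactly such operations: their completion check tests the fetch/produce condition, and completing them fires the responseCallback.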

1.2 Wake-up

When the DelayedFetch task above is created, a responseCallback function is passed in. So once the Broker receives new messages and finishes writing them to the partition, it wakes the delayed task and fires the callback:

    // Partition.scala
    
    def appendRecordsToLeader(records: MemoryRecords, requiredAcks: Int = 0) = {
      val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
        leaderReplicaIfLocal match {
          case Some(leaderReplica) =>
            //...

            // 1. append the messages to the partition's log
            val info = log.append(records, assignOffsets = true)
            // 2. wake up any pending DelayedFetch tasks for this partition
            replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
            (info, maybeIncrementLeaderHW(leaderReplica))
          case None =>
            throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
              .format(topicPartition, localBrokerId))
        }
      }
      //...
    }
    // ReplicaManager.scala
    def tryCompleteDelayedFetch(key: DelayedOperationKey) {
      // delegate to the time wheel component (purgatory) to complete watched fetches
      val completed = delayedFetchPurgatory.checkAndComplete(key)
      debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed))
    }
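
Putting 1.1 and 1.2 together with the SimplePurgatory sketch above (again purely illustrative, with placeholder names), the flow looks like this: the fetch parks because nothing is readable, and a later append wakes it.

    // Illustrative only: a fetch parks until the leader appends new bytes.
    import java.util.concurrent.atomic.AtomicLong

    val readableBytes = new AtomicLong(0)      // stands in for "new data at the leader"
    val purgatory = new SimplePurgatory[String]()

    val fetch = new DelayedOp {
      def canComplete(): Boolean = readableBytes.get() >= 1 // fetchMinBytes = 1
      def onComplete(): Unit = println(s"fetch response sent, ${readableBytes.get()} bytes")
    }
    purgatory.tryCompleteElseWatch(fetch, Seq("demo-topic-0")) // no data yet: parked

    readableBytes.addAndGet(64)                // appendRecordsToLeader writes messages...
    purgatory.checkAndComplete("demo-topic-0") // ...and wakes the parked fetch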

2. Delayed Writes

Now let's look at the delayed-write case.

2.1 Waiting

When ReplicaManager writes messages, it runs the logic below. In essence it again uses the time wheel, this time creating a DelayedProduce task:

    // ReplicaManager.scala
    
    def appendRecords(timeout: Long,
                      requiredAcks: Short,
                      internalTopicsAllowed: Boolean,
                      entriesPerPartition: Map[TopicPartition, MemoryRecords],
                      responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {
    
      if (isValidRequiredAcks(requiredAcks)) {
        val sTime = time.milliseconds
        val localProduceResults = appendToLocalLog(internalTopicsAllowed, entriesPerPartition, requiredAcks)
        debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
    
        val produceStatus = localProduceResults.map { case (topicPartition, result) =>
          topicPartition ->
                  ProducePartitionStatus(
                    result.info.lastOffset + 1, // required offset
                    new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
        }
    
        // if the write needs to be delayed
        if (delayedRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
          // 1. create the DelayedProduce task for delayed scheduling
          val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
          val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
          val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
          // 2. hand it to the time wheel for scheduling
          delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
    
        } else {
          // we can respond immediately
          val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
          responseCallback(produceResponseStatus)
        }
      } else {
        // If required.acks is outside accepted range, something is wrong with the client
        // Just return an error and don't handle the request at all
        val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
          topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
            LogAppendInfo.UnknownLogAppendInfo.firstOffset, Record.NO_TIMESTAMP)
        }
        responseCallback(responseStatus)
      }
    }

Now look at delayedRequestRequired: a produce is delayed only when acks == -1 (all ISR Followers must receive the batch), the request actually carries data, and at least one partition was appended locally without error; otherwise the Broker can respond immediately:

    // ReplicaManager.scala
    private def delayedRequestRequired(requiredAcks: Short,
                                       entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                       localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
      requiredAcks == -1 &&
      entriesPerPartition.nonEmpty &&
      localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
    }
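
What does the created DelayedProduce then wait for? Conceptually (a simplified sketch with assumed names, not the real DelayedProduce.tryComplete), each partition stays pending until its high watermark reaches the last offset of the appended batch, which by definition means every ISR Follower has fetched it:

    // Simplified sketch (assumed names) of the DelayedProduce completion check.
    import org.apache.kafka.common.TopicPartition

    case class ProduceStatus(requiredOffset: Long, var acksPending: Boolean = true)

    def tryCompleteProduce(statuses: Map[TopicPartition, ProduceStatus],
                           highWatermarkOf: TopicPartition => Long): Boolean = {
      statuses.foreach { case (tp, status) =>
        // the HW reaching requiredOffset means all ISR followers have the batch
        if (status.acksPending && highWatermarkOf(tp) >= status.requiredOffset)
          status.acksPending = false
      }
      statuses.values.forall(!_.acksPending) // complete only when nothing is pending
    }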

2.2 Wake-up

Finally, where does the Broker wake the DelayedProduce task? Since what it waits for is the Followers' data synchronization, the wake-up must sit in the synchronization code path: every time a Follower finishes fetching a batch, the Broker checks whether the DelayedProduce task can now complete.

So we are back in ReplicaManager.fetchMessages(), the handler for Fetch requests; a few calls down, the check happens in ReplicaManager.updateFollowerLogReadResults():

    // ReplicaManager.scala
    def fetchMessages(timeout: Long,
                      replicaId: Int,
                      fetchMinBytes: Int,
                      fetchMaxBytes: Int,
                      hardMaxBytesLimit: Boolean,
                      fetchInfos: Seq[(TopicPartition, PartitionData)],
                      quota: ReplicaQuota = UnboundedQuota,
                      responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit) {
      val isFromFollower = replicaId >= 0
      val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
      val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
    
      // read from local logs
      val logReadResults = readFromLocalLog(
        replicaId = replicaId,
        fetchOnlyFromLeader = fetchOnlyFromLeader,
        readOnlyCommitted = fetchOnlyCommitted,
        fetchMaxBytes = fetchMaxBytes,
        hardMaxBytesLimit = hardMaxBytesLimit,
        readPartitionInfo = fetchInfos,
        quota = quota)
    
      // the Fetch request comes from a Follower replica (valid broker id)
      if(Request.isValidBrokerId(replicaId))
        // update the Follower's log read results (its LEO)
        updateFollowerLogReadResults(replicaId, logReadResults)
      //...
    }
    // ReplicaManager.scala
    private def updateFollowerLogReadResults(replicaId: Int, readResults: Seq[(TopicPartition, LogReadResult)]) {
      debug("Recording follower broker %d log read results: %s ".format(replicaId, readResults))
      readResults.foreach { case (topicPartition, readResult) =>
        getPartition(topicPartition) match {
          case Some(partition) =>
            partition.updateReplicaLogReadResult(replicaId, readResult)
    
            // for acks = -1: check whether all ISR Followers have caught up so the DelayedProduce can complete
            tryCompleteDelayedProduce(new TopicPartitionOperationKey(topicPartition))
          case None =>
            warn("While recording the replica LEO, the partition %s hasn't been created.".format(topicPartition))
        }
      }
    }
    
    def tryCompleteDelayedProduce(key: DelayedOperationKey) {
      // check whether the delayed task can now be completed (woken up)
      val completed = delayedProducePurgatory.checkAndComplete(key)
      debug("Request key %s unblocked %d producer requests.".format(key.keyLabel, completed))
    }
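
Why does a Follower fetch unblock the produce? updateReplicaLogReadResult() records the Follower's new log end offset (LEO), which may advance the partition's high watermark, and the high watermark is exactly what the DelayedProduce condition checks. A minimal sketch of the rule (illustrative, not the real Partition.maybeIncrementLeaderHW): the HW is the minimum LEO across the ISR, so it only moves once the slowest in-sync replica has caught up.

    // Minimal sketch (illustrative): the leader's high watermark is the minimum
    // log end offset across the in-sync replicas, so it only advances after the
    // slowest ISR follower has fetched the pending records.
    def leaderHighWatermark(isrLogEndOffsets: Iterable[Long]): Long =
      isrLogEndOffsets.min

    // leader LEO = 105, followers at 103 and 105: HW stays at 103, so a produce
    // whose requiredOffset is 105 keeps waiting for the slow follower
    assert(leaderHighWatermark(Seq(105L, 103L, 105L)) == 103L)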

3. Summary

In this chapter, I covered the last piece of replica synchronization: delayed reads and writes. Kafka handles both cases with its time wheel mechanism, parking each pending request as a DelayedFetch or DelayedProduce task and completing it when its condition is met or its timeout expires.
