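In this chapter I explain how delayed reads and writes are handled during replica synchronization. What are delayed reads and writes? There are two cases:
- Delayed read (DelayedFetch): the producer may send messages infrequently, so the Leader and Followers of each partition are already fully in sync. When a Follower's Fetch request arrives, the Leader has no new messages to return, so instead of answering right away with an empty response it holds the request back for a while;
- Delayed write (DelayedProduce): when the Producer's acks parameter is set to all (or -1), the Leader cannot respond until every Follower in the ISR has fetched the batch.
This chapter walks through both cases.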
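I. Delayed Reads
First, what does the Broker do when it receives a Fetch request but has no new messages to return?
1.1 Waiting
The Kafka Broker handles this case with a timing wheel. It wraps the request in a DelayedFetch task and places it in the time slot that expires at the request's maximum wait time (the timeout parameter below; for follower fetches this comes from replica.fetch.wait.max.ms, 500 ms by default). As the wheel turns, the task fires after at most 500 ms. If new data is written to the Leader within those 500 ms, the Fetch task is completed early: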
// ReplicaManager.scala
def fetchMessages(timeout: Long,
                  replicaId: Int,
                  fetchMinBytes: Int,
                  fetchMaxBytes: Int,
                  hardMaxBytesLimit: Boolean,
                  fetchInfos: Seq[(TopicPartition, PartitionData)],
                  quota: ReplicaQuota = UnboundedQuota,
                  responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit) {
  val isFromFollower = replicaId >= 0
  val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
  val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
  // Read messages from the local log
  val logReadResults = readFromLocalLog(
    replicaId = replicaId,
    fetchOnlyFromLeader = fetchOnlyFromLeader,
    readOnlyCommitted = fetchOnlyCommitted,
    fetchMaxBytes = fetchMaxBytes,
    hardMaxBytesLimit = hardMaxBytesLimit,
    readPartitionInfo = fetchInfos,
    quota = quota)
  //... (elided; among other things, computes bytesReadable and errorReadingData)
  if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
    // Respond immediately
    val fetchPartitionData = logReadResults.map { case (tp, result) =>
      tp -> FetchPartitionData(result.error, result.hw, result.info.records)
    }
    responseCallback(fetchPartitionData)
  } else {
    // Reaching this branch means fewer than fetchMinBytes bytes of new data are available yet
    val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) =>
      val fetchInfo = fetchInfos.collectFirst {
        case (tp, v) if tp == topicPartition => v
      }.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos"))
      (topicPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
    }
    val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
      fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus)
    // Wrap the request in a DelayedFetch task for delayed scheduling
    val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)
    val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) =>
      new TopicPartitionOperationKey(tp) }
    // Complete it now if possible; otherwise watch it under each partition key and hand it to the timer
    delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
  }
}
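The if condition above decides between answering at once and parking the request. The sketch below isolates just that decision, assuming a hypothetical FetchDecision object that takes plain numbers in place of Kafka's LogReadResult values; it is an illustration of the branch, not Kafka code.

```scala
// Hypothetical helper that isolates the branch condition of fetchMessages.
object FetchDecision {
  /** Returns true when the Leader should answer the Fetch request right away,
    * false when it should park the request as a DelayedFetch. */
  def respondImmediately(timeoutMs: Long,
                         partitionsRequested: Int,
                         bytesReadable: Int,
                         fetchMinBytes: Int,
                         errorReadingData: Boolean): Boolean =
    timeoutMs <= 0 ||                   // the client is not willing to wait
      partitionsRequested == 0 ||       // nothing to fetch
      bytesReadable >= fetchMinBytes || // enough data is already available
      errorReadingData                  // report errors without delay

  def main(args: Array[String]): Unit = {
    // An in-sync follower: 500 ms max wait, min 1 byte, nothing new on the leader.
    println(respondImmediately(500L, 1, 0, 1, errorReadingData = false))   // false: park it
    // The same request after the producer has written 100 bytes.
    println(respondImmediately(500L, 1, 100, 1, errorReadingData = false)) // true
  }
}
```

With the follower defaults (replica.fetch.min.bytes = 1, replica.fetch.wait.max.ms = 500), an in-sync follower whose fetch finds zero new bytes always lands in the else branch and waits.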
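The key piece in fetchMessages() is delayedFetchPurgatory, an instance of DelayedOperationPurgatory. Kafka wraps the request in a DelayedFetch task and hands it to the purgatory, which combines a timer built on a hierarchical timing wheel (to expire the task once its timeout passes) with watcher lists keyed by partition (so the task can be completed early when new data arrives).
I won't go into the DelayedOperationPurgatory code here; readers can follow this outline to study it on their own.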
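To make the "time slot" idea concrete, here is a minimal single-level timing wheel. It is a simplified stand-in, not Kafka's kafka.utils.timer.TimingWheel: tasks whose delay exceeds one revolution carry a lap counter (the approach Netty's HashedWheelTimer takes), whereas Kafka pushes them into coarser overflow wheels and drives the clock with a DelayQueue. The class name SimpleTimingWheel is hypothetical; the tickMs = 1 and wheelSize = 20 used in the example match the defaults of Kafka's SystemTimer.

```scala
// Illustrative single-level timing wheel; NOT Kafka's hierarchical TimingWheel.
final class SimpleTimingWheel(tickMs: Long, wheelSize: Int) {
  require(tickMs > 0 && wheelSize > 0, "tickMs and wheelSize must be positive")

  // A scheduled task plus the number of full revolutions it must still wait.
  private final class Entry(var rounds: Long, val action: () => Unit)

  private val slots = Array.fill(wheelSize)(List.empty[Entry])
  private var currentTick = 0L // ticks elapsed since the wheel was created

  /** Schedules `action` to run once roughly `delayMs` milliseconds have elapsed. */
  def add(delayMs: Long)(action: => Unit): Unit = {
    val ticks = math.max(1L, (delayMs + tickMs - 1) / tickMs) // round up to whole ticks
    val slot = ((currentTick + ticks) % wheelSize).toInt       // the slot that will expire it
    val rounds = (ticks - 1) / wheelSize                       // laps to skip before it is due
    val entry = new Entry(rounds, () => action)
    slots(slot) = entry :: slots(slot)
  }

  /** Advances the clock by one tick and runs every task in the current slot that is due. */
  def tick(): Unit = {
    currentTick += 1
    val idx = (currentTick % wheelSize).toInt
    val (due, waiting) = slots(idx).partition(_.rounds == 0)
    waiting.foreach(e => e.rounds -= 1) // one more lap has passed for these tasks
    slots(idx) = waiting
    due.foreach(_.action())
  }
}
```

With these values a 500 ms DelayedFetch needs 24 extra laps of the 20-slot wheel. Kafka avoids lap counters by placing such a task in an overflow wheel whose slots are 20 ms wide (and above that 400 ms), moving it down to finer wheels as time advances.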
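1.2 Waking Up
When the DelayedFetch task above is created, a responseCallback function is passed in. So once the Broker receives new messages and finishes appending them to the partition, it tries to complete the delayed task, which in turn invokes the callback: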
// Partition.scala
def appendRecordsToLeader(records: MemoryRecords, requiredAcks: Int = 0) = {
  val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
    leaderReplicaIfLocal match {
      case Some(leaderReplica) =>
        //...
        // 1. Append the messages to the partition's log
        val info = log.append(records, assignOffsets = true)
        // 2. Try to complete DelayedFetch tasks waiting on this partition
        replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
        (info, maybeIncrementLeaderHW(leaderReplica))
      case None =>
        throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
          .format(topicPartition, localBrokerId))
    }
  }
  //...
}
// ReplicaManager.scala
def tryCompleteDelayedFetch(key: DelayedOperationKey) {
  // Ask the purgatory to complete any DelayedFetch watched under this key
  val completed = delayedFetchPurgatory.checkAndComplete(key)
  debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed))
}
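The two halves, parking with tryCompleteElseWatch and waking with checkAndComplete, can be sketched with a toy purgatory. The classes ToyDelayedOp and ToyPurgatory are hypothetical simplifications. Like Kafka's DelayedOperation, the toy uses an atomic flag so that onComplete runs exactly once, whether the operation is finished by new data or by expiration; the real classes additionally handle locking, watcher purging and the timer.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

/** Toy delayed operation: completes when `canComplete()` holds or when it expires. */
abstract class ToyDelayedOp {
  private val completed = new AtomicBoolean(false)

  /** Condition checked on every wake-up, e.g. "enough bytes are now readable". */
  def canComplete(): Boolean

  /** Sends the response; guaranteed to run at most once. */
  def onComplete(): Unit

  def isCompleted: Boolean = completed.get()

  /** Completes the operation if nobody else has; returns true only for the winner. */
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true } else false

  def tryComplete(): Boolean = if (canComplete()) forceComplete() else false
}

/** Toy purgatory: watches operations by key and completes them on demand. */
final class ToyPurgatory[K] {
  private val watchers = mutable.Map.empty[K, List[ToyDelayedOp]]

  /** Completes the op immediately if possible, otherwise watches it under every key. */
  def tryCompleteElseWatch(op: ToyDelayedOp, keys: Seq[K]): Boolean =
    if (op.tryComplete()) true
    else {
      keys.foreach(k => watchers(k) = op :: watchers.getOrElse(k, Nil))
      false
    }

  /** Called after new data arrives for `key`; returns how many ops completed now. */
  def checkAndComplete(key: K): Int = {
    val ops = watchers.getOrElse(key, Nil)
    val completedNow = ops.count(_.tryComplete())
    watchers(key) = ops.filterNot(_.isCompleted) // drop finished operations
    completedNow
  }

  /** What the timer does when an operation's deadline passes. */
  def expire(op: ToyDelayedOp): Boolean = op.forceComplete()
}
```

The atomic flag is what makes the race between "new data arrived" and "timeout fired" harmless: whichever path loses simply sees forceComplete() return false.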
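II. Delayed Writes
Next, let's look at delayed writes.
2.1 Waiting
When ReplicaManager appends messages, it runs the logic below. Again it relies on the timing wheel, this time by creating a DelayedProduce task: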
// ReplicaManager.scala
def appendRecords(timeout: Long,
                  requiredAcks: Short,
                  internalTopicsAllowed: Boolean,
                  entriesPerPartition: Map[TopicPartition, MemoryRecords],
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {
  if (isValidRequiredAcks(requiredAcks)) {
    val sTime = time.milliseconds
    val localProduceResults = appendToLocalLog(internalTopicsAllowed, entriesPerPartition, requiredAcks)
    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
    val produceStatus = localProduceResults.map { case (topicPartition, result) =>
      topicPartition ->
        ProducePartitionStatus(
          result.info.lastOffset + 1, // required offset
          new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
    }
    // If the response must be delayed
    if (delayedRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
      // 1. Create a DelayedProduce task
      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
      val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
      // 2. Complete it now if possible; otherwise watch it and schedule it on the timer
      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
    } else {
      // we can respond immediately
      val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
      responseCallback(produceResponseStatus)
    }
  } else {
    // If required.acks is outside accepted range, something is wrong with the client
    // Just return an error and don't handle the request at all
    val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
      topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
        LogAppendInfo.UnknownLogAppendInfo.firstOffset, Record.NO_TIMESTAMP)
    }
    responseCallback(responseStatus)
  }
}
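In the code above, each ProducePartitionStatus stores a required offset of lastOffset + 1. A DelayedProduce can finish for a partition once the Leader's high watermark (HW) has reached that offset, which means every ISR replica has fetched past the batch. The sketch below is a simplification with a hypothetical ProduceCheck object: it treats the HW as the minimum log end offset (LEO) across the ISR and ignores min.insync.replicas and error handling.

```scala
// Hypothetical, simplified model of the per-partition DelayedProduce condition.
object ProduceCheck {
  /** Offset the ISR must reach so that the batch ending at `lastOffset` is replicated. */
  def requiredOffset(lastOffset: Long): Long = lastOffset + 1

  /** Simplified high watermark: the smallest LEO in the ISR, leader included. */
  def highWatermark(isrLeos: Seq[Long]): Long = isrLeos.min

  /** True once every ISR replica holds the whole batch (HW >= required offset). */
  def enoughReplicasReached(isrLeos: Seq[Long], required: Long): Boolean =
    highWatermark(isrLeos) >= required
}
```

For a batch whose last offset is 9, the required offset is 10; with ISR LEOs of 10, 10 and 7 the slowest follower still lacks offsets 7 to 9, so the request keeps waiting.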
Now look at delayedRequestRequired. A delayed write is needed only when acks == -1 (that is, the response must wait until every Follower in the ISR has the data), the request contains at least one partition, and at least one partition was appended to the local log successfully:
// ReplicaManager.scala
private def delayedRequestRequired(requiredAcks: Short,
                                   entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                   localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
  requiredAcks == -1 &&
  entriesPerPartition.nonEmpty &&
  localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
}
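The three conditions can be checked in isolation. The self-contained sketch below uses a hypothetical AppendOutcome case class as a stand-in for LogAppendResult (only its exception field matters here) and a partition count in place of entriesPerPartition.

```scala
// Hypothetical stand-alone version of the delayedRequestRequired rule.
object DelayedProduceRule {
  /** Stand-in for LogAppendResult: only whether the local append failed matters. */
  final case class AppendOutcome(exception: Option[Throwable])

  /** Same three conditions: acks = -1, at least one partition in the request,
    * and fewer failed local appends than partitions (so something is left to replicate). */
  def delayedRequestRequired(requiredAcks: Short,
                             partitionCount: Int,
                             outcomes: Seq[AppendOutcome]): Boolean =
    requiredAcks == -1 &&
      partitionCount > 0 &&
      outcomes.count(_.exception.isDefined) < partitionCount
}
```

Requests with acks=0 or acks=1 never go through the purgatory; the Leader's local append is all they wait for. With acks=-1, if every partition failed locally there is nothing to replicate, so the error response is returned immediately.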
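2.2 Waking Up
Finally, where does the Broker wake up a DelayedProduce task? Since the task is waiting for the Followers to replicate the data, the check must live in the replication path: every time a Follower fetches, the Leader checks whether a DelayedProduce task can now be completed.
That brings us back to ReplicaManager.fetchMessages(), which handles Fetch requests. When the request comes from a Follower, it calls ReplicaManager.updateFollowerLogReadResults(), and that is where the check happens: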
// ReplicaManager.scala
def fetchMessages(timeout: Long,
                  replicaId: Int,
                  fetchMinBytes: Int,
                  fetchMaxBytes: Int,
                  hardMaxBytesLimit: Boolean,
                  fetchInfos: Seq[(TopicPartition, PartitionData)],
                  quota: ReplicaQuota = UnboundedQuota,
                  responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit) {
  val isFromFollower = replicaId >= 0
  val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
  val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
  // read from local logs
  val logReadResults = readFromLocalLog(
    replicaId = replicaId,
    fetchOnlyFromLeader = fetchOnlyFromLeader,
    readOnlyCommitted = fetchOnlyCommitted,
    fetchMaxBytes = fetchMaxBytes,
    hardMaxBytesLimit = hardMaxBytesLimit,
    readPartitionInfo = fetchInfos,
    quota = quota)
  // The request comes from a Follower (replicaId is a valid broker ID)
  if(Request.isValidBrokerId(replicaId))
    // Record the Follower's read results, which updates its LEO on the Leader
    updateFollowerLogReadResults(replicaId, logReadResults)
  //...
}
// ReplicaManager.scala
private def updateFollowerLogReadResults(replicaId: Int, readResults: Seq[(TopicPartition, LogReadResult)]) {
  debug("Recording follower broker %d log read results: %s ".format(replicaId, readResults))
  readResults.foreach { case (topicPartition, readResult) =>
    getPartition(topicPartition) match {
      case Some(partition) =>
        partition.updateReplicaLogReadResult(replicaId, readResult)
        // For acks=-1 (all): check whether the ISR has now replicated enough to complete a DelayedProduce
        tryCompleteDelayedProduce(new TopicPartitionOperationKey(topicPartition))
      case None =>
        warn("While recording the replica LEO, the partition %s hasn't been created.".format(topicPartition))
    }
  }
}

def tryCompleteDelayedProduce(key: DelayedOperationKey) {
  // Check whether any DelayedProduce watched under this key can now be completed
  val completed = delayedProducePurgatory.checkAndComplete(key)
  debug("Request key %s unblocked %d producer requests.".format(key.keyLabel, completed))
}
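Putting the wake-up path together: each Follower fetch reports the Follower's position, the Leader recomputes its high watermark, and pending produce requests are re-checked. The toy model below assumes a hypothetical ToyLeader with one partition and a fixed ISR, treats the high watermark as the minimum LEO over the Leader and Followers, and collapses the purgatory into a list of pending callbacks. It illustrates the ordering of events, not Kafka's actual classes.

```scala
import scala.collection.mutable

/** Toy single-partition leader used only to illustrate the wake-up order. */
final class ToyLeader(followerIds: Seq[Int]) {
  private var leaderLeo = 0L
  // Follower LEOs as last reported through their fetch offsets.
  private val followerLeo = mutable.Map.from(followerIds.map(_ -> 0L))
  // Pending acks=-1 produce requests: (required offset, response callback).
  private val pending = mutable.ListBuffer.empty[(Long, () => Unit)]

  /** Simplified high watermark: minimum LEO over the leader and every ISR follower. */
  def highWatermark: Long = (leaderLeo +: followerLeo.values.toSeq).min

  /** Leader appends `count` records; with acks=-1 the response is parked. */
  def produce(count: Int)(respond: => Unit): Unit = {
    leaderLeo += count
    pending += ((leaderLeo, () => respond)) // required offset = lastOffset + 1 = new LEO
  }

  /** A follower fetching at `fetchOffset` proves it holds every record below it. */
  def onFollowerFetch(replicaId: Int, fetchOffset: Long): Unit = {
    followerLeo(replicaId) = fetchOffset
    tryCompleteDelayedProduce()
  }

  private def tryCompleteDelayedProduce(): Unit = {
    val hw = highWatermark
    val (ready, waiting) = pending.partition { case (required, _) => required <= hw }
    pending.clear()
    pending ++= waiting
    ready.foreach { case (_, respond) => respond() }
  }
}
```

The model shares one property with Kafka: a Follower's fetch offset only tells the Leader what the Follower already holds, so the produce is acknowledged on the Follower's next fetch after it has written the batch, not on the fetch that carried the data.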
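III. Summary
This chapter covered the last piece of replica synchronization: delayed reads and writes. Kafka handles both with the same mechanism. The request is wrapped in a delayed operation (DelayedFetch or DelayedProduce) and handed to a DelayedOperationPurgatory, whose timing-wheel-based timer expires it once the timeout passes, while new appends (for fetches) and Follower fetches (for produces) can complete it early.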