KafkaProducer通过组件RecordAccumulator将消息按照批次缓存后,就立马返回了,也就是说消息的发送是 异步 的。那么,到底是谁负责从缓存中获取各个batch消息,然后通过网络组件发送给Broker呢?
没错,就是Sender线程。本章,我就来讲解Sender线程是如何完成消息发送的。
注:前面章节我讲解过KafkaProdcer的初始化流程,我们应该已经知道Sender本质是一个Runnable任务,实际创建的线程是“KafkaThread”,我这里习惯叫做”Sender线程“。
一、整体流程
我们先来回顾下Sender线程的整体处理流程,然后再逐节分析内部的细节。Sender线程启动后,会在一个Loop循环中不断执行run
方法:
// Sender.java
void run(long now) {
// Cluster包含了元数据
Cluster cluster = metadata.fetch();
// 1.获取已经有消息就绪的Partition对应的Broker
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// 2.如果有Partition对应的元数据都没拉取到,就标识一下,后续需要尝试拉取元数据
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}
// 3.检查与Broker的连接状态
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
// 没有建立好连接就移除
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
// 4.按照Broker维度,对Partition进行分组
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
this.maxRequestSize, now);
if (guaranteeMessageOrder) {
for (List<RecordBatch> batchList : batches.values()) {
for (RecordBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
// 5.剔除超时的batch(默认60s)
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
for (RecordBatch expiredBatch : expiredBatches)
this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
sensors.updateProduceRequestMetrics(batches);
// 6.计算超时时间
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
pollTimeout = 0;
}
// 7.创建ClientReqeust对象,包括了多个Batch,形成一个请求
sendProduceRequests(batches, now);
// 8.负责真正的网络请求发送
this.client.poll(pollTimeout, now);
}
我来解释下上述的各个步骤:
-
首先,获取Cluster对象,里面包含了缓存的Broker集群的元数据信息(也可能没有元数据);
-
接着,需要判断哪些Broker上的Partition已经准备好发送数据了,比如:
- 已经有写满的batch(16kb)的Partition;
- batch创建时间已经超过了
linger.ms
的Partition。
-
第二步筛选完后,可能有一些Partition不知道Leader信息,对于这种情况,后面需要重新拉取元数据;
-
接着,检查这些数据已经就绪的Broker,看看客户端是否已经与它们建立了长连接,如果没有则建立连接;
-
接着,还需要对数据重新按照Broker维度分组:
<Integer, List<RecordBatch>>
,一个Broker可以对应多个Partition的Batch; -
剔除超时的batch(默认60s),也就是说如果有batch在内存缓冲区里停留超过60s,就丢弃掉;
-
然后,针对每个要发送的Broker,创建一个ClientReqeust对象,里面包括了多个Batch,形成一个请求,后面会将它发送给Broker;
-
最后,通过 NetWorkClient 网络通信组件,发送实际的网络I/O通信请求,同时也读取响应结果;
以上就是Sender线程运行的整体流程,接下来我们分步骤来看内部实现细节。
二、就绪Batch筛选
首先来看RecordAccumulator是如何筛选哪些Broker上的Partition已经准备好发送数据了:
// Sender.java
// 1.获取已经可以发送消息的Partition
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
核心就是RecordAccumulator的ready方法,该方法会筛选出就绪的Leader Partition所在的Broker节点,ReadyCheckResult本质是一个包装Bean对象,封装了以下内容:
public final static class ReadyCheckResult {
// 就绪Partition Leader所在的Broker
public final Set<Node> readyNodes;
// 下一次进行就绪筛选要等待的时间
public final long nextReadyCheckDelayMs;
// 未知Leader的Topic
public final Set<String> unknownLeaderTopics;
}
2.1 流程
我们再来看RecordAccumulator.ready()
的处理流程,它的核心思路就是:
- 遍历缓存的map
<TopicPartition, Deque<RecordBatch>>
,针对每一个Partition,获取队列头的batch,检查该batch 是否符合就绪条件 ; - 如果符合就绪条件,就把Partition对应的Leader Partition所在的Broker加入结果集;
- 如果不符合就绪条件,就计算出下一次处理的时间
nextReadyCheckDelayMs
,那么Sender线程后续会等待该时间之后再来检查。
// RecordAccumulator.java
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
// 定义各类返回对象
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();
// 判断BufferPool可用内存是否已经耗尽
boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
TopicPartition part = entry.getKey();
Deque<RecordBatch> deque = entry.getValue();
// Leader Partition
Node leader = cluster.leaderFor(part);
synchronized (deque) {
// 不存在Leader,说明要重新拉取元数据
if (leader == null && !deque.isEmpty()) {
unknownLeaderTopics.add(part.topic());
} else if (!readyNodes.contains(leader) && !muted.contains(part)) {
// 从队列头获取batch,因为入队是从队尾入的
RecordBatch batch = deque.peekFirst();
if (batch != null) {
// 判断是否有就绪的Batch
boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
long waitedTimeMs = nowMs - batch.lastAttemptMs;
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
boolean full = deque.size() > 1 || batch.isFull();
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
// 计算下一次ready check的时间
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
上述流程中,还需要关注的一点是:判断缓冲池BufferPool是否已经耗尽。当BufferPool没用可用空间时,条件等待队列 waiters 不为空:
// BufferPool.java
private final Deque<Condition> waiters;
public int queued() {
lock.lock();
try {
return this.waiters.size();
} finally {
lock.unlock();
}
}
2.2 筛选条件
那么,哪些batch才算是已经就绪的batch呢?判断逻辑在如下代码中,每个变量我都加了详尽注释:
// RecordAccumulator.java
// 是否属于重试batch
boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
// 表示这个batch目前已经等待了多久:waitedTimeMs = 当前时间 - 上一次重试时间,默认情况下batch.lastAttemptMs为创建时间
long waitedTimeMs = nowMs - batch.lastAttemptMs;
// 表示这个batch最多要等待多久才被发送:timeToWaitMs = 重试时间间隔 或 最大可逗留时间,
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
// 表示这个batch的剩余等待时间
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
// 表示batch空间是否已经满了
boolean full = deque.size() > 1 || batch.isFull();
// 表示batch是否到期,到期就是就绪了
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
// 把就绪batch所在的Leader Partition加入readyNodes
readyNodes.add(leader);
}
从上述代码可以看出,以下情况就会认为已经就绪:
- BufferPool缓冲区满了,默认32MB;
- Batch空间满了,默认16KB;
- RecordAccumulator已经关闭,因为当客户端关闭时,必须立马把内存缓冲区中的Batch发送出去;
- Batch的缓存时间超过了
linger.ms
。
三、总结
本章,我对Sender线程Loop处理流程中, 就绪Batch消息筛选 的底层原理进行了讲解。我们需要主要关注的是,在哪些情况下,缓冲区中的消息会被立马发送。