2023-07-31
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/290

KafkaProducer通过组件RecordAccumulator将消息按照批次缓存后,就立马返回了,也就是说消息的发送是 异步 的。那么,到底是谁负责从缓存中获取各个batch消息,然后通过网络组件发送给Broker呢?

没错,就是Sender线程。本章,我就来讲解Sender线程是如何完成消息发送的。

注:前面章节我讲解过KafkaProdcer的初始化流程,我们应该已经知道Sender本质是一个Runnable任务,实际创建的线程是“KafkaThread”,我这里习惯叫做”Sender线程“。

一、整体流程

我们先来回顾下Sender线程的整体处理流程,然后再逐节分析内部的细节。Sender线程启动后,会在一个Loop循环中不断执行run方法:

    // Sender.java
    
    void run(long now) {
        // Cluster包含了元数据
        Cluster cluster = metadata.fetch();
    
        // 1.获取已经有消息就绪的Partition对应的Broker
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    
        // 2.如果有Partition对应的元数据都没拉取到,就标识一下,后续需要尝试拉取元数据
        if (!result.unknownLeaderTopics.isEmpty()) {
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();
        }
    
        // 3.检查与Broker的连接状态
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            // 没有建立好连接就移除
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }
    
        // 4.按照Broker维度,对Partition进行分组
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
                                                                         this.maxRequestSize, now);
        if (guaranteeMessageOrder) {
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }
    
        // 5.剔除超时的batch(默认60s)
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
    
        sensors.updateProduceRequestMetrics(batches);
    
        // 6.计算超时时间
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            pollTimeout = 0;
        }
    
        // 7.创建ClientReqeust对象,包括了多个Batch,形成一个请求
        sendProduceRequests(batches, now);
    
        // 8.负责真正的网络请求发送
        this.client.poll(pollTimeout, now);
    }

我来解释下上述的各个步骤:

  1. 首先,获取Cluster对象,里面包含了缓存的Broker集群的元数据信息(也可能没有元数据);

  2. 接着,需要判断哪些Broker上的Partition已经准备好发送数据了,比如:

    • 已经有写满的batch(16kb)的Partition;
    • batch创建时间已经超过了linger.ms的Partition。
  3. 第二步筛选完后,可能有一些Partition不知道Leader信息,对于这种情况,后面需要重新拉取元数据;

  4. 接着,检查这些数据已经就绪的Broker,看看客户端是否已经与它们建立了长连接,如果没有则建立连接;

  5. 接着,还需要对数据重新按照Broker维度分组:<Integer, List<RecordBatch>>,一个Broker可以对应多个Partition的Batch;

  6. 剔除超时的batch(默认60s),也就是说如果有batch在内存缓冲区里停留超过60s,就丢弃掉;

  7. 然后,针对每个要发送的Broker,创建一个ClientReqeust对象,里面包括了多个Batch,形成一个请求,后面会将它发送给Broker;

  8. 最后,通过 NetWorkClient 网络通信组件,发送实际的网络I/O通信请求,同时也读取响应结果;

以上就是Sender线程运行的整体流程,接下来我们分步骤来看内部实现细节。

二、就绪Batch筛选

首先来看RecordAccumulator是如何筛选哪些Broker上的Partition已经准备好发送数据了:

    // Sender.java
    
    // 1.获取已经可以发送消息的Partition
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

核心就是RecordAccumulator的ready方法,该方法会筛选出就绪的Leader Partition所在的Broker节点,ReadyCheckResult本质是一个包装Bean对象,封装了以下内容:

    public final static class ReadyCheckResult {
        // 就绪Partition Leader所在的Broker
        public final Set<Node> readyNodes;
        // 下一次进行就绪筛选要等待的时间
        public final long nextReadyCheckDelayMs;
        // 未知Leader的Topic
        public final Set<String> unknownLeaderTopics;
    }

2.1 流程

我们再来看RecordAccumulator.ready()的处理流程,它的核心思路就是:

  1. 遍历缓存的map<TopicPartition, Deque<RecordBatch>>,针对每一个Partition,获取队列头的batch,检查该batch 是否符合就绪条件
  2. 如果符合就绪条件,就把Partition对应的Leader Partition所在的Broker加入结果集;
  3. 如果不符合就绪条件,就计算出下一次处理的时间nextReadyCheckDelayMs,那么Sender线程后续会等待该时间之后再来检查。
    // RecordAccumulator.java
    
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        // 定义各类返回对象
        Set<Node> readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        Set<String> unknownLeaderTopics = new HashSet<>();
    
        // 判断BufferPool可用内存是否已经耗尽
        boolean exhausted = this.free.queued() > 0;
        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<RecordBatch> deque = entry.getValue();
            // Leader Partition
            Node leader = cluster.leaderFor(part);
            synchronized (deque) {
                // 不存在Leader,说明要重新拉取元数据
                if (leader == null && !deque.isEmpty()) {
                    unknownLeaderTopics.add(part.topic());
                } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                    // 从队列头获取batch,因为入队是从队尾入的
                    RecordBatch batch = deque.peekFirst();
                    if (batch != null) {
                        // 判断是否有就绪的Batch
                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                        long waitedTimeMs = nowMs - batch.lastAttemptMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        boolean full = deque.size() > 1 || batch.isFull();
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);
                        } else {
                            // 计算下一次ready check的时间
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
    }

上述流程中,还需要关注的一点是:判断缓冲池BufferPool是否已经耗尽。当BufferPool没用可用空间时,条件等待队列 waiters 不为空:

    // BufferPool.java
    
    private final Deque<Condition> waiters;
    
    public int queued() {
        lock.lock();
        try {
            return this.waiters.size();
        } finally {
            lock.unlock();
        }
    }

2.2 筛选条件

那么,哪些batch才算是已经就绪的batch呢?判断逻辑在如下代码中,每个变量我都加了详尽注释:

    // RecordAccumulator.java
    
    // 是否属于重试batch
    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
    
    // 表示这个batch目前已经等待了多久:waitedTimeMs = 当前时间 - 上一次重试时间,默认情况下batch.lastAttemptMs为创建时间
    long waitedTimeMs = nowMs - batch.lastAttemptMs;
    
    // 表示这个batch最多要等待多久才被发送:timeToWaitMs = 重试时间间隔 或 最大可逗留时间,
    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
    
    // 表示这个batch的剩余等待时间
    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
    
    // 表示batch空间是否已经满了
    boolean full = deque.size() > 1 || batch.isFull();
    
    // 表示batch是否到期,到期就是就绪了
    boolean expired = waitedTimeMs >= timeToWaitMs;
    boolean sendable = full || expired || exhausted || closed || flushInProgress();
    
    if (sendable && !backingOff) {
        // 把就绪batch所在的Leader Partition加入readyNodes
        readyNodes.add(leader);
    }

从上述代码可以看出,以下情况就会认为已经就绪:

  1. BufferPool缓冲区满了,默认32MB;
  2. Batch空间满了,默认16KB;
  3. RecordAccumulator已经关闭,因为当客户端关闭时,必须立马把内存缓冲区中的Batch发送出去;
  4. Batch的缓存时间超过了linger.ms

三、总结

本章,我对Sender线程Loop处理流程中, 就绪Batch消息筛选 的底层原理进行了讲解。我们需要主要关注的是,在哪些情况下,缓冲区中的消息会被立马发送。

阅读全文