2023-07-31
Original author: Ressmix. Original article: https://www.tpvlog.com/article/290

While sending messages, KafkaProducer can run into message timeouts. In this chapter I explain the problem from the internals of the Kafka client.

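1. Timeout scenarios

Let's first look at the situations in which a timeout can occur:

  • A RecordBatch sits in the accumulator's buffer (memory allocated from BufferPool) for a long time and is never picked up by the Sender thread;
  • The Sender thread has sent the message but never receives a response, so the NetworkSend request stays queued in InFlightRequests for a long time.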

1.1 BufferPool timeout

Let's start with the first case. The main loop of the Sender thread contains this line:

    // Sender.java
    
    void run(long now) {
        //...
    
        // 5. Abort batches that have expired (the timeout is request.timeout.ms, 30s by default for the producer)
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    }

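Now look at the abortExpiredBatches method of RecordAccumulator. It skips muted partitions, walks each remaining partition's deque from the oldest batch, and stops at the first batch that has not expired: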

    // RecordAccumulator.java
    
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
    public List<RecordBatch> abortExpiredBatches(int requestTimeout, long now) {
        List<RecordBatch> expiredBatches = new ArrayList<>();
        int count = 0;
        // 1. Walk the deques looking for expired RecordBatches
        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            Deque<RecordBatch> dq = entry.getValue();
            TopicPartition tp = entry.getKey();
            if (!muted.contains(tp)) {
                synchronized (dq) {
                    RecordBatch lastBatch = dq.peekLast();
                    Iterator<RecordBatch> batchIterator = dq.iterator();
                    while (batchIterator.hasNext()) {
                        RecordBatch batch = batchIterator.next();
                        boolean isFull = batch != lastBatch || batch.isFull();
                        // Check whether the batch has expired
                        if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) {
                            expiredBatches.add(batch);
                            count++;
                            batchIterator.remove();    // remove it from the deque
                        } else {
                            break;
                        }
                    }
                }
            }
        }
        // 2. Fire the callbacks
        if (!expiredBatches.isEmpty()) {
            log.trace("Expired {} batches in accumulator", count);
            for (RecordBatch batch : expiredBatches) {
                // Invoke the callbacks with a TimeoutException
                batch.expirationDone();
                // Return the allocated buffer to the pool
                deallocate(batch);
            }
        }
    
        return expiredBatches;
    }
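
The body of maybeExpire is not shown above. As a rough sketch, and assuming the 0.10.x client distinguishes three cases (a full batch that has waited too long since its last append, a batch that has waited too long since creation plus linger time, and a retried batch that has waited too long since its last attempt plus backoff), the decision could be modelled as follows. BatchExpirySketch and all of its fields are hypothetical names used for illustration; this is not Kafka's code.

```java
/** Simplified, hypothetical model of a batch-expiry check; not Kafka's actual implementation. */
final class BatchExpirySketch {
    final long createdMs;      // when the batch was created
    final long lastAppendMs;   // when a record was last appended to it
    final long lastAttemptMs;  // when it was last sent (only meaningful while retrying)
    final boolean inRetry;     // true if the batch was sent before and re-queued

    BatchExpirySketch(long createdMs, long lastAppendMs, long lastAttemptMs, boolean inRetry) {
        this.createdMs = createdMs;
        this.lastAppendMs = lastAppendMs;
        this.lastAttemptMs = lastAttemptMs;
        this.inRetry = inRetry;
    }

    /** Returns a reason if the batch should be treated as expired, or null if it should not. */
    String expiryReason(long requestTimeoutMs, long retryBackoffMs, long lingerMs,
                        long now, boolean isFull) {
        // Case 1: a full batch that has not been appended to for longer than the timeout
        if (!inRetry && isFull && now - lastAppendMs > requestTimeoutMs)
            return (now - lastAppendMs) + " ms has passed since last append";
        // Case 2: any first-attempt batch older than linger time plus the timeout
        if (!inRetry && now - (createdMs + lingerMs) > requestTimeoutMs)
            return (now - (createdMs + lingerMs)) + " ms has passed since batch creation plus linger time";
        // Case 3: a retried batch that has waited longer than backoff plus the timeout
        if (inRetry && now - (lastAttemptMs + retryBackoffMs) > requestTimeoutMs)
            return (now - (lastAttemptMs + retryBackoffMs)) + " ms has passed since last attempt plus backoff time";
        return null;
    }
}
```

Under this model a batch that is still being filled is judged against its creation time plus linger.ms, which is why the isFull flag passed in by abortExpiredBatches matters.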

RecordBatch's expirationDone method ultimately calls the internal done method, which runs the callbacks and passes them a TimeoutException:

    // RecordBatch.java
    
    void expirationDone() {
        if (expiryErrorMessage == null)
            throw new IllegalStateException("Batch has not expired");
        this.done(-1L, Record.NO_TIMESTAMP,
                  new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage));
    }
    
    public void done(long baseOffset, long logAppendTime, RuntimeException exception) {
        //...
        // execute callbacks
        for (Thunk thunk : thunks) {
            try {
                if (exception == null) {
                    RecordMetadata metadata = thunk.future.value();
                    thunk.callback.onCompletion(metadata, null);
                } else {
                    thunk.callback.onCompletion(null, exception);
                }
            } catch (Exception e) {
                log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
            }
        }
        produceFuture.done();
    }
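
To make the callback behaviour concrete, here is a minimal self-contained sketch of the dispatch loop. SendCallback and CallbackDispatchSketch are hypothetical stand-ins for Kafka's Callback and RecordBatch, and metadata is reduced to a String. The sketch only illustrates two points from the code above: each callback receives either metadata or an exception, and a callback that throws does not stop the remaining ones from running.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a user callback: exactly one argument is non-null. */
interface SendCallback {
    void onCompletion(String metadata, RuntimeException exception);
}

/** Hypothetical stand-in for a batch that holds the callbacks of its records. */
final class CallbackDispatchSketch {
    private final List<SendCallback> callbacks = new ArrayList<>();

    void register(SendCallback callback) {
        callbacks.add(callback);
    }

    /** Invokes every registered callback and returns how many of them threw. */
    int done(String metadata, RuntimeException exception) {
        int failures = 0;
        for (SendCallback callback : callbacks) {
            try {
                if (exception == null) {
                    callback.onCompletion(metadata, null);   // success path
                } else {
                    callback.onCompletion(null, exception);  // failure path, e.g. a timeout
                }
            } catch (Exception e) {
                // A faulty user callback is only counted here (the real client logs it);
                // the loop carries on with the next callback.
                failures++;
            }
        }
        return failures;
    }
}
```

In application code this means a send callback should check its exception argument first, since an expired batch reports the timeout there rather than by throwing from send.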

1.2 InFlightRequests timeout

Now for the other timeout scenario. The main loop of the Sender thread calls NetworkClient's poll method:

    // Sender.java
    
    void run(long now) {
        //...
        this.client.poll(pollTimeout, now);
    }

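NetworkClient's poll method contains a timeout check: if a request to a Broker has gone unanswered for longer than request.timeout.ms (30s by default for the producer), the client closes the connection to that Broker and treats the Broker as failed. It then cleans up the in-memory data structures and flags the metadata for another refresh: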

    // NetworkClient.java
    
    public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }
    
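        // ... (completed sends, receives and disconnections are handled here)
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
    
        // Handle requests that have timed out
        handleTimedOutRequests(responses, updatedNow);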
    
        return responses;
    }
    
    private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
        // Find the Brokers that have timed-out requests
        List<String> nodeIds = this.inFlightRequests
            .getNodesWithTimedOutRequests(now, this.requestTimeoutMs);
        for (String nodeId : nodeIds) {
            // Close the connection to that Broker
            this.selector.close(nodeId);
            processDisconnection(responses, nodeId, now);
        }
    
        // Request a metadata update
        if (!nodeIds.isEmpty())
            metadataUpdater.requestUpdate();
    }
    
    private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
        connectionStates.disconnected(nodeId, now);
        nodeApiVersions.remove(nodeId);
        nodesNeedingApiVersionsFetch.remove(nodeId);
        // Clear the requests cached in InFlightRequests for that Broker
        for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
            if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA.id)
                metadataUpdater.handleDisconnection(request.destination);
            else
                responses.add(request.disconnected(now));
        }
    }

The timeout check itself:

    // InFlightRequests.java
    
    public List<String> getNodesWithTimedOutRequests(long now, int requestTimeout) {
        List<String> nodeIds = new LinkedList<>();
        for (Map.Entry<String, Deque<NetworkClient.InFlightRequest>> requestEntry : requests.entrySet()) {
            String nodeId = requestEntry.getKey();
            Deque<NetworkClient.InFlightRequest> deque = requestEntry.getValue();
            if (!deque.isEmpty()) {
                NetworkClient.InFlightRequest request = deque.peekLast();
                // Elapsed time since the request was sent; timed out if it exceeds `request.timeout.ms`
                long timeSinceSend = now - request.sendTimeMs;
                if (timeSinceSend > requestTimeout)
                    nodeIds.add(nodeId);
            }
        }
        return nodeIds;
    }
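
The check only looks at the request at the tail of each deque. Assuming new requests are pushed at the head, the tail is the oldest outstanding request, so one comparison per Broker is enough. The sketch below models that idea with plain timestamps; InFlightSketch and its methods are hypothetical and much simpler than Kafka's InFlightRequests.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of per-Broker in-flight request tracking. */
final class InFlightSketch {
    // nodeId -> send timestamps; newest at the head, oldest at the tail
    private final Map<String, Deque<Long>> sendTimes = new LinkedHashMap<>();

    /** Records a request sent to the given node at the given time. */
    void add(String nodeId, long sendTimeMs) {
        sendTimes.computeIfAbsent(nodeId, k -> new ArrayDeque<>()).addFirst(sendTimeMs);
    }

    /** Returns the nodes whose oldest outstanding request is older than the timeout. */
    List<String> nodesWithTimedOutRequests(long now, long requestTimeoutMs) {
        List<String> nodeIds = new ArrayList<>();
        for (Map.Entry<String, Deque<Long>> entry : sendTimes.entrySet()) {
            Long oldest = entry.getValue().peekLast(); // null if the deque is empty
            if (oldest != null && now - oldest > requestTimeoutMs) {
                nodeIds.add(entry.getKey());
            }
        }
        return nodeIds;
    }

    /** Drops everything queued for a node, as after closing its connection; returns the count. */
    int clearAll(String nodeId) {
        Deque<Long> removed = sendTimes.remove(nodeId);
        return removed == null ? 0 : removed.size();
    }
}
```

Note that one timed-out request causes every request queued for that Broker to be cleared, including newer ones that have not yet reached the timeout.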

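2. Summary

In this chapter I explained the message timeout problems that can occur while KafkaProducer sends messages. They fall into two cases:

  1. Batches are stuck in the BufferPool-backed buffer;
  2. Requests are stuck in InFlightRequests.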

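In both cases the timeout check depends on the request.timeout.ms parameter, which defaults to 30s for the producer. Once the timeout fires, the callbacks are eventually executed.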
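
Because both paths are governed by request.timeout.ms, tuning usually starts with that setting and the two values that enter the batch-expiry check, linger.ms and retry.backoff.ms. The snippet below builds producer properties with java.util.Properties. The keys are Kafka producer configuration names; the values and the localhost:9092 address are illustrative assumptions, not recommendations, and ProducerTimeoutConfigSketch is a hypothetical helper.

```java
import java.util.Properties;

/** Hypothetical helper that collects the timeout-related producer settings in one place. */
final class ProducerTimeoutConfigSketch {
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("request.timeout.ms", "30000");         // timeout used by both checks
        props.setProperty("linger.ms", "5");                      // added to the batch creation time
        props.setProperty("retry.backoff.ms", "100");             // added to the last attempt time
        return props;
    }
}
```

The resulting Properties object can be passed to the KafkaProducer constructor together with the required serializer settings.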
