2023-07-31
Original author: Ressmix. Original article: https://www.tpvlog.com/article/290

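The previous chapter showed how the Sender thread wraps each request in a ClientRequest, caches it in InFlightRequests and in the KafkaChannel, and finally registers interest in the OP_WRITE event on that KafkaChannel.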

In this chapter we look at the last step of each iteration of the Sender thread's loop:

    // Sender.java
    
    this.client.poll(pollTimeout, now);

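Earlier chapters already covered the internals of NetworkClient.poll(), but the focus there was connection establishment (OP_CONNECT). This chapter concentrates on how the Selector handles read and write events (OP_READ/OP_WRITE) on a Channel.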

1. Listening for events

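Under the hood, the Kafka client wraps Java NIO, but the fundamentals are unchanged. A Java NIO Selector lets a single thread manage multiple sockets: we register multiple Channels with one Selector (each Channel represents one socket; in Java NIO this is a SocketChannel) and declare which events each Channel is interested in.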

Then, when Selector.select() finds that an event of interest has occurred on a Channel, the Selector records it (as a SelectionKey in its selected-key set), and we can handle the event accordingly.

Each SelectionKey is associated with one KafkaChannel. The Kafka Selector listens for the following events:

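  • OP_CONNECT: the connection is ready to be completed;
  • OP_WRITE: the channel is ready for writing, i.e. the client can write data into the buffer;
  • OP_READ: the channel is ready for reading, i.e. the client can read data from the buffer.

Polling for these events starts in NetworkClient.poll(), which delegates to the Kafka Selector:

    // NetworkClient.java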
    
    public List<ClientResponse> poll(long timeout, long now) {
        //...
        try {
            // Poll the Channels via org.apache.kafka.common.network.Selector.poll()
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }
        //...
    }
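
The register-then-select pattern described in this section can be tried out with plain Java NIO. The following is a minimal sketch, not Kafka code: it assumes an in-process java.nio.channels.Pipe in place of a network socket, and the names SelectorEventDemo and roundTrip are made up for illustration. It registers OP_WRITE on one end and OP_READ on the other, then lets a single thread drive both from the selected keys.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical demo class: one thread, one Selector, two channels.
final class SelectorEventDemo {

    // Sends the message through an in-process pipe, driven only by selector events.
    static String roundTrip(String message) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                // Channels must be non-blocking before they can be registered.
                sink.configureBlocking(false);
                source.configureBlocking(false);
                sink.register(selector, SelectionKey.OP_WRITE);
                source.register(selector, SelectionKey.OP_READ);

                ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                ByteBuffer in = ByteBuffer.allocate(out.remaining());

                while (in.hasRemaining()) {
                    selector.select(1000);
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove(); // selected keys must be removed by hand
                        if (key.isWritable()) {
                            sink.write(out);
                            if (!out.hasRemaining()) {
                                // Nothing left to send: stop watching for write readiness.
                                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                            }
                        }
                        if (key.isReadable()) {
                            source.read(in);
                        }
                    }
                }
                in.flip();
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling SelectorEventDemo.roundTrip("hello kafka") should hand back the same string. Interest in OP_WRITE is dropped as soon as nothing is left to write, because a writable channel that stays registered for OP_WRITE would make select() return immediately on every call.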

2. Selector polling

Let's look at the Selector's poll method:

    // Selector.java
    
    public void poll(long timeout) throws IOException {
        //...
        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }
        //...
    }

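The core is pollSelectionKeys: the Selector iterates over a batch of SelectionKeys and, for each one on which an event of interest has occurred, runs the corresponding handling: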

    // Selector.java
    
    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        // Iterate over the SelectionKeys
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            KafkaChannel channel = channel(key);
    
            try {
                if (isImmediatelyConnected || key.isConnectable()) {
                    // 1. Finish connecting: OP_CONNECT -> OP_READ
                    if (channel.finishConnect()) {
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                    } else
                        continue;
                }
    
                // 2. Read data (OP_READ)
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null)
                        addToStagedReceives(channel, networkReceive);
                }
    
                // 3. Write data (OP_WRITE)
                if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }
                //...
            } catch (Exception e) {
                String desc = channel.socketDescription();
                if (e instanceof IOException)
                    log.debug("Connection with {} disconnected", desc, e);
                else
                    log.warn("Unexpected error from {}; closing connection", desc, e);
                close(channel, true);
            }
        }
    }

2.1 OP_CONNECT

Interest in the OP_CONNECT event is registered when the client first tries to connect to a Broker:

[Figure: 202307312122495861.png]

In the end, it is Selector.connect() that registers interest in OP_CONNECT:

    // Selector.java
    
    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        //...
    
        // Register interest in OP_CONNECT on the SocketChannel
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    
        //...
    }
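
To see what registering OP_CONNECT looks like outside Kafka, here is a small sketch in plain Java NIO. It assumes a throwaway ServerSocketChannel bound to the loopback address as the "Broker"; the class NonBlockingConnectDemo, the method connectToLocalServer, and the five-second deadline are all hypothetical. A non-blocking connect() may report success straight away, in which case no OP_CONNECT event is delivered; that appears to be the case the immediatelyConnectedKeys set in Selector.poll() exists for.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo: non-blocking connect completed through an OP_CONNECT event.
final class NonBlockingConnectDemo {

    static boolean connectToLocalServer() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            // Port 0 lets the OS pick a free port; the server never needs to accept().
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            client.configureBlocking(false);
            // May return true at once (e.g. on loopback); then no OP_CONNECT will fire.
            boolean connected = client.connect(server.getLocalAddress());
            SelectionKey key = client.register(selector, SelectionKey.OP_CONNECT);

            long deadline = System.nanoTime() + 5_000_000_000L; // assumed 5 s limit
            while (!connected && System.nanoTime() < deadline) {
                selector.select(1000);
                if (selector.selectedKeys().remove(key) && key.isConnectable()) {
                    // Completes the handshake; throws IOException if the connect failed.
                    connected = client.finishConnect();
                }
            }
            return client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```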

Once interest is registered, a later iteration of the Sender thread's loop calls Selector.poll(), which eventually completes the connection through KafkaChannel.finishConnect():

    // Selector.java
    
    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        //...
    
        if (channel.finishConnect()) {
            this.connected.add(channel.id());
            this.sensors.connectionCreated.record();
        } else
            continue;
    }
    
    // KafkaChannel.java
    
    public boolean finishConnect() throws IOException {
        return transportLayer.finishConnect();
    }
    
    // SslTransportLayer.java
    
    public boolean finishConnect() throws IOException {
        // Call SocketChannel.finishConnect() to complete the connection
        boolean connected = socketChannel.finishConnect();
        if (connected)
            // On success, drop interest in OP_CONNECT and add interest in OP_READ
            key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
        return connected;
    }

As the code above shows, once the connection is established, interest in OP_CONNECT is dropped and interest in OP_READ is added.
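
The interest-set update is ordinary bit arithmetic, and it is easy to misread because it relies on operator precedence: in Java, & binds tighter than |, so the expression means (ops & ~OP_CONNECT) | OP_READ. A small sketch, with a made-up class InterestOpsDemo and helper names, makes this concrete:

```java
import java.nio.channels.SelectionKey;
import java.util.StringJoiner;

// Hypothetical helper that replays the interest-set arithmetic on plain ints.
final class InterestOpsDemo {

    // Same expression as in finishConnect(): clear OP_CONNECT, then set OP_READ.
    static int afterConnect(int ops) {
        return ops & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
    }

    // Equivalent form with the precedence written out.
    static int afterConnectExplicit(int ops) {
        return (ops & ~SelectionKey.OP_CONNECT) | SelectionKey.OP_READ;
    }

    // Renders an interest set as readable names, e.g. "OP_READ|OP_WRITE".
    static String describe(int ops) {
        StringJoiner names = new StringJoiner("|");
        if ((ops & SelectionKey.OP_READ) != 0) names.add("OP_READ");
        if ((ops & SelectionKey.OP_WRITE) != 0) names.add("OP_WRITE");
        if ((ops & SelectionKey.OP_CONNECT) != 0) names.add("OP_CONNECT");
        return names.toString();
    }
}
```

Starting from an interest set that contains only OP_CONNECT, describe(afterConnect(SelectionKey.OP_CONNECT)) yields "OP_READ".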

The full call chain for establishing a connection is shown in the figure below:

[Figure: 202307312122500412.png]

2.2 OP_READ

Interest in the OP_READ event is registered once the client has finished connecting to the Broker:

    key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);

Once interest is registered, a later iteration of the Sender thread's loop calls Selector.pollSelectionKeys(); when an OP_READ event occurs, the data is read through KafkaChannel.read():

    // Selector.java
    
    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        //...
        if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
            NetworkReceive networkReceive;
            // Read data
            while ((networkReceive = channel.read()) != null)
                addToStagedReceives(channel, networkReceive);
        }
        //...
    }

2.3 OP_WRITE

Interest in the OP_WRITE event is registered while the client is sending a message:

[Figure: 202307312122508733.png]

In the end, the KafkaChannel caches the request to be sent and adds interest in the OP_WRITE event on that Channel:

    // KafkaChannel.java
    
    public void setSend(Send send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        this.send = send;    // cached inside the KafkaChannel
        // Add interest in the OP_WRITE event
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }

Once interest is registered, a later iteration of the Sender thread's loop calls Selector.pollSelectionKeys(); when an OP_WRITE event occurs, the data is written through KafkaChannel.write():

    // Selector.java
    
    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        //...
        if (channel.ready() && key.isWritable()) {
            // Write data
            Send send = channel.write();
            if (send != null) {
                // Add the sent request to the completedSends list
                this.completedSends.add(send);
                this.sensors.recordBytesSent(channel.id(), send.size());
            }
        }
        //...
    }

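One last point: as soon as a send has been written completely, interest in OP_WRITE is cancelled, so the channel no longer watches for write readiness and keeps only its interest in OP_READ. Interest in OP_WRITE is registered again only when the next request to send is cached: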

    // KafkaChannel.java
    
    private boolean send(Send send) throws IOException {
        send.writeTo(transportLayer);
        if (send.completed())
            // Once the send has been fully written, drop interest in OP_WRITE
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    
        return send.completed();
    }
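
The OP_WRITE lifecycle described in this section (cache one send, write it across as many polls as needed, then stop watching for writability) can be sketched as a toy class. This is a simplified stand-in, not KafkaChannel itself: it assumes a ByteBuffer in place of a Send, a ByteArrayOutputStream in place of the socket, and a maxBytes parameter that simulates a socket accepting only part of the data. All names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;

// Hypothetical stand-in for a channel that holds at most one in-progress send.
final class PendingSendDemo {
    private ByteBuffer send;                          // current send, or null
    private int interestOps = SelectionKey.OP_READ;   // state after connecting
    private final ByteArrayOutputStream wire = new ByteArrayOutputStream();

    // Caches the send and starts watching for write readiness.
    void setSend(ByteBuffer newSend) {
        if (send != null)
            throw new IllegalStateException("prior send operation still in progress");
        send = newSend;
        interestOps |= SelectionKey.OP_WRITE;
    }

    // Writes at most maxBytes. Returns the send once it is fully written,
    // or null while bytes remain (the caller will retry on the next poll).
    ByteBuffer write(int maxBytes) {
        if (send == null) return null;
        int n = Math.min(maxBytes, send.remaining());
        for (int i = 0; i < n; i++) wire.write(send.get());
        if (send.hasRemaining()) return null;
        ByteBuffer completed = send;
        send = null;
        interestOps &= ~SelectionKey.OP_WRITE;        // done: keep only OP_READ
        return completed;
    }

    boolean interestedInWrite() { return (interestOps & SelectionKey.OP_WRITE) != 0; }

    int bytesOnWire() { return wire.size(); }
}
```

With a 10-byte send and write(4) called three times, the first two calls return null and leave OP_WRITE set; only the third returns the completed send and clears it. This is why pollSelectionKeys checks send != null before adding anything to completedSends.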

3. Post-send processing

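Sending the request is not the end of the flow, because the client has not yet received the Broker's response. The client still has two things to do: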

  1. handle the response;
  2. handle the callback.

3.1 Callback handling

After Selector.poll() has written the data, it adds the request that was just sent to the completedSends list:

    public class Selector implements Selectable {
        private final List<Send> completedSends;
    
        private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                       boolean isImmediatelyConnected,
                                       long currentTimeNanos) {
             //...
            if (channel.ready() && key.isWritable()) {
                // Write the message (send the request)
                Send send = channel.write();
                if (send != null) {
                    // Add it to the "completed" list
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }
        }
    }

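Next, NetworkClient processes the requests that have been sent. If a request does not expect a response, NetworkClient directly removes the most recently sent request from the inFlightRequests cache of requests awaiting a response, and then runs the callback through ClientResponse.onComplete(). The callback is a RequestCompletionHandler, which is what ultimately triggers the callback we supply when sending a message: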

    // NetworkClient.java
    
    public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }
        //...
    
        // Handle the requests that have been sent
        handleCompletedSends(responses, updatedNow);
        //...
    
        // Run the callbacks
        for (ClientResponse response : responses) {
            try {
                response.onComplete();
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
        return responses;
    }
    
    private void handleCompletedSends(List<ClientResponse> responses, long now) {
        // Iterate over the sends that have completed
        for (Send send : this.selector.completedSends()) {
            // The most recent request sent to this Broker
            InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
            // No response expected, which means acks == 0
            if (!request.expectResponse) {
                // Remove the most recently sent request from inFlightRequests
                this.inFlightRequests.completeLastSent(send.destination());
                // Add it to the response list
                responses.add(request.completed(null, now));
            }
        }
    }
    
    // ClientResponse.java
    
    public void onComplete() {
        if (callback != null)
            callback.onComplete(this);
    }
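
The bookkeeping behind handleCompletedSends can be pictured as one queue of in-flight requests per Broker. The sketch below is an illustration under assumptions, not the real InFlightRequests class: it assumes a per-node Deque with the newest request at the head, and the names InFlightDemo, Request, and handleCompletedSend are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of per-node in-flight request tracking.
final class InFlightDemo {

    static final class Request {
        final String node;
        final int id;
        final boolean expectResponse; // false corresponds to acks == 0

        Request(String node, int id, boolean expectResponse) {
            this.node = node;
            this.id = id;
            this.expectResponse = expectResponse;
        }
    }

    private final Map<String, Deque<Request>> inFlight = new HashMap<>();

    // Newest request goes to the head of the node's queue.
    void send(Request request) {
        inFlight.computeIfAbsent(request.node, k -> new ArrayDeque<>()).addFirst(request);
    }

    // Called once the newest request for this node has been fully written.
    // Returns the ids that can be completed right away (no response expected).
    List<Integer> handleCompletedSend(String node) {
        List<Integer> completed = new ArrayList<>();
        Deque<Request> queue = inFlight.get(node);
        Request lastSent = (queue == null) ? null : queue.peekFirst();
        if (lastSent != null && !lastSent.expectResponse) {
            queue.pollFirst();          // nothing to wait for: drop it now
            completed.add(lastSent.id);
        }
        return completed;
    }

    int inFlightCount(String node) {
        Deque<Request> queue = inFlight.get(node);
        return queue == null ? 0 : queue.size();
    }
}
```

A request that expects a response stays queued after its bytes are written, while a request sent with expectResponse set to false is removed and completed in the same poll.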

By default, the callback is defined inside the Sender thread:

    // Sender.java
    
    private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
        //...
    
        // Create the callback
        RequestCompletionHandler callback = new RequestCompletionHandler() {
            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };
    
        String nodeId = Integer.toString(destination);
        // Note: acks != 0 is the value passed as expectResponse
        ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
        client.send(clientRequest, now);
        log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
    }

Handling the response:

    // Sender.java
    
    private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
    
        //...
        if (response.hasResponse()) {
            ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                TopicPartition tp = entry.getKey();
                ProduceResponse.PartitionResponse partResp = entry.getValue();
                RecordBatch batch = batches.get(tp);
                // Handle the response for each batch of messages
                completeBatch(batch, partResp, correlationId, now);
            }
            this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
            this.sensors.recordThrottleTime(produceResponse.getThrottleTime());
        } else {
            // this is the acks = 0 case, just complete all requests
            for (RecordBatch batch : batches.values()) {
                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now);
            }
        }
    }
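
The two branches of handleProduceResponse can be reduced to a small sketch. It assumes plain strings in place of TopicPartition, RecordBatch, and error codes, and the names ProduceResponseDemo and completeBatches are invented for illustration; a null response map stands for the acks = 0 case where no response body exists.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reduction of the response-handling branches to strings.
final class ProduceResponseDemo {

    // batches: partitions with a pending batch.
    // responseErrors: per-partition result from the Broker, or null when acks = 0.
    // Returns the outcome recorded for each completed batch.
    static Map<String, String> completeBatches(List<String> batches,
                                               Map<String, String> responseErrors) {
        Map<String, String> outcomes = new LinkedHashMap<>();
        if (responseErrors != null) {
            // A response arrived: complete each batch with its own result.
            for (Map.Entry<String, String> entry : responseErrors.entrySet()) {
                if (batches.contains(entry.getKey()))
                    outcomes.put(entry.getKey(), entry.getValue());
            }
        } else {
            // acks = 0: no response will ever come, so complete everything as success.
            for (String partition : batches)
                outcomes.put(partition, "NONE");
        }
        return outcomes;
    }
}
```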

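I won't go into the details of how the response is processed from here; readers can follow the same train of thought through the source code.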

4. Summary

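In this chapter I explained the last step of the Sender thread's loop, the Selector's polling, which is also the most central step. The Kafka client handles requests entirely in NIO style, and the way the Sender thread processes requests asynchronously is well worth studying.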

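With this chapter, the analysis of the Kafka client's overall message-sending flow is complete, along with a first look at how the underlying NIO components work. Starting from the next chapter, I will dig into how Kafka's underlying NIO communication components work, including the handling of "sticky packets" and "split packets" (TCP message framing), exception handling, and more.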
