2023-07-31
Original author: Ressmix | Original article: https://www.tpvlog.com/article/290

In the previous chapter, I explained how Kafka handles packet splitting when sending requests. The Kafka client keeps one long-lived TCP connection to each Broker and fully sends one request before sending the next, so sticky packets are generally not a problem on the sending side.

But things are different when the Kafka client reads a Broker's responses. Two situations can arise:

  1. The bytes read contain the complete responses of multiple requests, which is the sticky packet case. For example, two responses with 5-byte and 7-byte bodies can arrive as one contiguous 20-byte read, each response carrying its own 4-byte size header;
  2. The bytes read contain only part of one response, so the client must read several times and merge the results, which is the split packet case. For example, even the 4-byte size header may arrive split across two reads.

In this chapter, I will first explain how the Kafka client handles the OP_READ event, and then analyze the split packet and sticky packet problems when reading responses.

1. Reading Responses

Let's first review how the Kafka client reads response data at the lowest level. Everything still goes through the Selector.poll() method, which you should be quite familiar with by now:

    // Selector.java
    
    private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;    // completed responses staged per Broker
    private final List<NetworkReceive> completedReceives;                     // one completed response per Broker (per poll)
    public void poll(long timeout) throws IOException {
        //...
    
        // 1. Iterate over the SelectionKeys and process them
        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }
        // 2. Move the first fully-read response of each Broker to completedReceives
        addToCompletedReceives();
    }
    
    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        // ...
        // An OP_READ event fired on the channel, and no completed response is staged for it yet
        if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
            NetworkReceive networkReceive;    // holds the bytes read so far
            // Keep reading from the channel; a non-null networkReceive means one complete response was read
            while ((networkReceive = channel.read()) != null)
                // Stage the completed response
                addToStagedReceives(channel, networkReceive);
        }
    }
    
    private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
        // Create the deque on first use
        if (!stagedReceives.containsKey(channel))
            stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
        // Append the response to the queue
        Deque<NetworkReceive> deque = stagedReceives.get(channel);
        deque.add(receive);
    }
    
    private void addToCompletedReceives() {
        if (!this.stagedReceives.isEmpty()) {
            Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
                KafkaChannel channel = entry.getKey();
                if (!channel.isMute()) {
                    Deque<NetworkReceive> deque = entry.getValue();
                    addToCompletedReceives(channel, deque);
                    if (deque.isEmpty())
                        iter.remove();
                }
            }
        }
    }
    
    private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
        // Take the first (oldest) fully-read response off the deque
        NetworkReceive networkReceive = stagedDeque.poll();
        this.completedReceives.add(networkReceive);
        this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
    }

1.1 The Read Flow

The KafkaChannel.read() method called from the Selector.poll() main flow reads bytes from the underlying SocketChannel and wraps them into a NetworkReceive object, which is cached in KafkaChannel's receive field and represents the response currently being read. Once a response has been read in full, read() returns it and clears the receive cache. This mirrors the request-sending logic explained in the previous chapter, and exists mainly because a response can arrive split across multiple reads:

    // KafkaChannel.java
    
    private NetworkReceive receive;        // the response currently being read
    public NetworkReceive read() throws IOException {
        NetworkReceive result = null;
    
        // Initialize the NetworkReceive: maxReceiveSize is the largest response accepted, id identifies the Broker
        if (receive == null) {
            receive = new NetworkReceive(maxReceiveSize, id);
        }
    
        // Read bytes
        receive(receive);
    
        // If the response has been read in full, return it and clear the receive cache
        if (receive.complete()) {
            receive.payload().rewind();    // reset the body's position to 0
            result = receive;
            receive = null;    // clear the cache
        }
        return result;
    }
    
    // Delegate to NetworkReceive, which reads bytes from the underlying SocketChannel
    private long receive(NetworkReceive receive) throws IOException {
        return receive.readFrom(transportLayer);
    }

As the code above shows, the actual byte reading is done by NetworkReceive.readFrom(). So what is a NetworkReceive? Let's keep going.

1.2 NetworkReceive

A NetworkReceive represents one complete response. It consists of a 4-byte size header plus a body, where the header holds the size of the body. It is the receiving-side counterpart of the NetworkSend object explained in the previous chapter.
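
To make the framing concrete, here is a minimal, hypothetical sketch (not Kafka's code; the FrameDemo class is invented for illustration) of how such a length-prefixed frame can be built and parsed with plain NIO buffers:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    
    public class FrameDemo {
        // Encode: [4-byte size header][body]
        static ByteBuffer encode(byte[] body) {
            ByteBuffer frame = ByteBuffer.allocate(4 + body.length);
            frame.putInt(body.length);    // the header stores the body size
            frame.put(body);
            frame.flip();                 // switch the buffer to reading mode
            return frame;
        }
    
        // Decode: read the size first, then exactly that many body bytes
        static byte[] decode(ByteBuffer frame) {
            int size = frame.getInt();
            byte[] body = new byte[size];
            frame.get(body);
            return body;
        }
    
        public static void main(String[] args) {
            ByteBuffer frame = encode("hello".getBytes(StandardCharsets.UTF_8));
            System.out.println(new String(decode(frame), StandardCharsets.UTF_8));    // prints: hello
        }
    }

With the wire format in mind, here is the actual NetworkReceive source: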

    // NetworkReceive.java
    
    public class NetworkReceive implements Receive {
    
        public final static String UNKNOWN_SOURCE = "";
        public final static int UNLIMITED = -1;
        // Broker ID
        private final String source;
        // size-header Buffer
        private final ByteBuffer size;
        // maximum allowed body size
        private final int maxSize;
        // body Buffer
        private ByteBuffer buffer;
    
        public NetworkReceive(int maxSize, String source) {
            this.source = source;
            this.size = ByteBuffer.allocate(4);
            this.buffer = null;
            this.maxSize = maxSize;
        }
    
        @Override
        public boolean complete() {
            // A response is complete only when the 4-byte header and the body are both fully read
            // (the buffer != null check matters: before the header is parsed, no body Buffer exists yet)
            return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
        }
    
        public long readFrom(ScatteringByteChannel channel) throws IOException {
            return readFromReadableChannel(channel);
        }
    
        public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
            int read = 0;
            if (size.hasRemaining()) {    // the size-header Buffer still has room
                // Read bytes into the header Buffer; with a split packet, fewer than 4 bytes may arrive
                int bytesRead = channel.read(size);
                if (bytesRead < 0)
                    throw new EOFException();
                read += bytesRead;
                if (!size.hasRemaining()) {    // the header Buffer is full (all 4 bytes read)
                    // Compute the body size
                    size.rewind();
                    int receiveSize = size.getInt();
                    if (receiveSize < 0)
                        throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                    if (maxSize != UNLIMITED && receiveSize > maxSize)
                        throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
                    // Allocate the body Buffer
                    this.buffer = ByteBuffer.allocate(receiveSize);
                }
            }
    
            if (buffer != null) {
                // Read the body
                int bytesRead = channel.read(buffer);
                if (bytesRead < 0)
                    throw new EOFException();
                read += bytesRead;
            }
    
            return read;
        }
    
        // Return the body
        public ByteBuffer payload() {
            return this.buffer;
        }
    }

The code above neatly handles both the split packet and sticky packet problems.

Let's first look at the sticky packet case, where the bytes read contain the complete responses of several different requests:

  1. The channel fills the 4-byte size header in one read;
  2. The header is parsed to obtain the body size, size;
  3. A body of exactly size bytes is read;
  4. A complete response has now been read, so channel.read() returns it and it is staged; the while loop in Selector.pollSelectionKeys() then simply loops again and consumes the next response from the remaining bytes in the same way, until read() returns null. The staged responses are later moved into the completedReceives list.

Now the split packet case, where a single read does not yield a complete response, perhaps only part of the size header or part of the body:

  1. The channel tries to fill the 4-byte header Buffer in one read but falls short, because the packet was split;
  2. channel.read() returns null, so the while loop in Selector.pollSelectionKeys() exits without staging anything;
  3. The partially-read response object stays cached in KafkaChannel's receive field, so when the next OP_READ event fires, read() reuses that object and calls its readFromReadableChannel() method again to pick up where it left off;
  4. Once the response has been read in full, it is staged and eventually added to the completedReceives list (the sketch below replays both cases).
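
To make the two cases tangible, here is a small, self-contained simulation, a hypothetical sketch rather than Kafka code (DrippingChannel and Frame are invented names). One byte stream holds two length-prefixed frames back to back (the sticky case), and a toy channel hands out at most 3 bytes per read() so every frame also arrives in pieces (the split case):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;
    import java.nio.charset.StandardCharsets;
    
    public class StickySplitDemo {
    
        // Toy channel: returns at most 'chunk' bytes per read() to force split packets
        static class DrippingChannel implements ReadableByteChannel {
            private final ByteBuffer data;
            private final int chunk;
            DrippingChannel(ByteBuffer data, int chunk) { this.data = data; this.chunk = chunk; }
            @Override public int read(ByteBuffer dst) {
                if (!data.hasRemaining()) return 0;    // nothing more to deliver right now
                int n = Math.min(chunk, Math.min(dst.remaining(), data.remaining()));
                for (int i = 0; i < n; i++) dst.put(data.get());
                return n;
            }
            @Override public boolean isOpen() { return true; }
            @Override public void close() {}
        }
    
        // Minimal NetworkReceive-like state machine: a 4-byte size header, then the body
        static class Frame {
            final ByteBuffer size = ByteBuffer.allocate(4);
            ByteBuffer body = null;
            boolean complete() { return !size.hasRemaining() && body != null && !body.hasRemaining(); }
            void readFrom(ReadableByteChannel ch) throws IOException {
                if (size.hasRemaining()) {
                    ch.read(size);                              // may read fewer than 4 bytes (split)
                    if (!size.hasRemaining()) {
                        size.rewind();
                        body = ByteBuffer.allocate(size.getInt());
                    }
                }
                if (body != null)
                    ch.read(body);                              // may also stop mid-body (split)
            }
        }
    
        public static void main(String[] args) throws IOException {
            // Two frames back to back in one stream: the sticky case
            byte[] a = "hello".getBytes(StandardCharsets.UTF_8);
            byte[] b = "world!!".getBytes(StandardCharsets.UTF_8);
            ByteBuffer stream = ByteBuffer.allocate(8 + a.length + b.length);
            stream.putInt(a.length).put(a).putInt(b.length).put(b);
            stream.flip();
    
            // 3 bytes per read: every frame needs several reads, i.e. the split case
            ReadableByteChannel ch = new DrippingChannel(stream, 3);
            Frame current = new Frame();    // plays the role of KafkaChannel's cached 'receive' field
            while (stream.hasRemaining()) {
                current.readFrom(ch);
                if (current.complete()) {
                    System.out.println(new String(current.body.array(), StandardCharsets.UTF_8));
                    current = new Frame();  // one complete frame done; start the next one
                }
            }
            // prints: hello, then world!!
        }
    }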

2. Processing Responses

After responses have been read via NetworkReceive, the Selector buffers each Broker's completed responses in a Map of per-channel deques, then iterates over that Map and moves the first (oldest) buffered response of each Broker into the completedReceives list, one per Broker per poll, preserving the order in which they arrived.

2.1 Staging Completed Responses

The completed responses handed to the upper layer are kept in the Selector's completedReceives field:

    // Selector.java
    
    private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;    // completed responses staged per Broker
    private final List<NetworkReceive> completedReceives;                     // one completed response per Broker (per poll)
    public void poll(long timeout) throws IOException {
        //...
    
        // 1. Iterate over the SelectionKeys and process them
        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }
        // 2. Move the first fully-read response of each Broker to completedReceives
        addToCompletedReceives();
    }
    
    private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
        // Take the first (oldest) fully-read response off the deque
        NetworkReceive networkReceive = stagedDeque.poll();
        this.completedReceives.add(networkReceive);
        this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
    }

2.2 Parsing Responses

Now that every fully-read response is sitting in the completedReceives list, let's look at how the Kafka client processes them:

    // NetworkClient.java
    
    public List<ClientResponse> poll(long timeout, long now) {
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }
    
        // Process the responses
        handleCompletedReceives(responses, updatedNow);
    
        // Trigger the callbacks
        for (ClientResponse response : responses) {
            try {
                response.onComplete();
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
        return responses;
    }
    
    private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        // Iterate over the completed responses
        for (NetworkReceive receive : this.selector.completedReceives()) {
            String source = receive.source();    // Broker ID
            // 1. Take the earliest (oldest) in-flight request sent to that Broker
            InFlightRequest req = inFlightRequests.completeNext(source);
    
            // 2. Parse the response
            AbstractResponse body = parseResponse(receive.payload(), req.header);
    
            if (req.isInternalRequest && body instanceof MetadataResponse)    // metadata update response
                metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
            else if (req.isInternalRequest && body instanceof ApiVersionsResponse)    // API versions response
                handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
            else
                // 3. Add it to the responses list
                responses.add(req.completed(body, now));
        }
    }
    
    public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
        // Parse the response header
        ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
        short apiKey = requestHeader.apiKey();
        short apiVer = requestHeader.apiVersion();
        // Parse the response body
        Struct responseBody = ProtoUtils.responseSchema(apiKey, apiVer).read(responseBuffer);
        // Match the response against its request
        correlate(requestHeader, responseHeader);
        // Build an AbstractResponse object and return it
        return AbstractResponse.getResponse(apiKey, responseBody);
    }
    
    // Check that the request and response are paired correctly
    private static void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
        // correlation_id uniquely identifies one request on the connection
        if (requestHeader.correlationId() != responseHeader.correlationId())
            throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
                                            + ") does not match request (" + requestHeader.correlationId() + "), request header: " + requestHeader);
    }

The handleCompletedReceives method above works as follows:

  1. Iterate over the completed response list completedReceives and process each response with the steps below;
  2. Determine which Broker the response came from, and remove the earliest (oldest) request cached for that Broker from InFlightRequests;
  3. Parse the response body, and match the correlationId in the response header against the correlationId in the request header, throwing an exception if they do not pair up;
  4. Build a ClientResponse object from the parsed body and add it to the result list.

The key point to understand here: requests to a Broker travel over a single TCP connection and the Broker processes them in order, so under normal circumstances the earliest (oldest) request cached in InFlightRequests pairs exactly with the first unprocessed response from that Broker.
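
As a minimal, hypothetical sketch of this FIFO pairing (the InFlightPairingDemo class and its methods are invented; Kafka's real InFlightRequests is more elaborate), requests are appended at the tail when sent, and each arriving response is checked against the head of the queue:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;
    
    public class InFlightPairingDemo {
        // Per-Broker FIFO queues holding the correlation ids of requests awaiting a response
        static final Map<String, Deque<Integer>> inFlight = new HashMap<>();
    
        static void onSend(String broker, int correlationId) {
            // the newest request goes to the tail
            inFlight.computeIfAbsent(broker, k -> new ArrayDeque<>()).addLast(correlationId);
        }
    
        static void onResponse(String broker, int responseCorrelationId) {
            // the oldest in-flight request sits at the head and must pair with this response
            int oldest = inFlight.get(broker).pollFirst();
            if (oldest != responseCorrelationId)
                throw new IllegalStateException("Correlation id for response (" + responseCorrelationId
                                                + ") does not match request (" + oldest + ")");
        }
    
        public static void main(String[] args) {
            onSend("broker-0", 1);
            onSend("broker-0", 2);
            onResponse("broker-0", 1);    // responses come back in send order over TCP
            onResponse("broker-0", 2);
            System.out.println("all paired");
        }
    }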

2.3 Handling Callbacks

After the responses are parsed, the client iterates over the ClientResponse list and invokes the RequestCompletionHandler callback for each one:

    // NetworkClient.java
    
    public List<ClientResponse> poll(long timeout, long now) {
        //...
    
        // Invoke the callback handlers
        for (ClientResponse response : responses) {
            try {
                response.onComplete();
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
        return responses;
    }

A ClientResponse carries the request header together with the response body:

    public class ClientResponse {
        private final RequestHeader requestHeader;          // request header
        private final RequestCompletionHandler callback;    // callback handler
        private final String destination;                   // Broker ID
        private final long receivedTimeMs;
        private final long latencyMs;
        private final boolean disconnected;
        private final RuntimeException versionMismatch;
        private final AbstractResponse responseBody;        // response body
    
        public void onComplete() {
            if (callback != null)
                callback.onComplete(this);
        }
    }

The RequestCompletionHandler is created inside the Sender thread. Looking at its onComplete() method, it essentially walks the response, maps each partition's result back to the RecordBatch that was assembled at send time, and finally triggers our user-defined callback:

    // Sender.java
    
    private void sendProduceRequest(long now, int destination, short acks, 
                                    int timeout, List<RecordBatch> batches) {
        RequestCompletionHandler callback = new RequestCompletionHandler() {
            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };
    }
    
    private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches,
                                       long now) {
        // Branch on the state of the response
        if (response.wasDisconnected()) {
            //...
        } else if (response.versionMismatch() != null) {
            //...        
        } else {
            // Normal response
            if (response.hasResponse()) {    
                ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
                // Iterate over the per-partition responses
                for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry :
                     produceResponse.responses().entrySet()) {
                    TopicPartition tp = entry.getKey();        // the partition
                    ProduceResponse.PartitionResponse partResp = entry.getValue();
                    // batches was assembled at send time, so the partition maps back to its RecordBatch
                    RecordBatch batch = batches.get(tp);
                    // Process it
                    completeBatch(batch, partResp, correlationId, now);
                }
            }
            // No response expected (acks == 0)
            else {
                for (RecordBatch batch : batches.values()) {
                    completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now);
                }
            }
        }
    }
    
    private void completeBatch(RecordBatch batch, ProduceResponse.PartitionResponse response,
                               long correlationId, long now) {
        Errors error = response.error;
        // 1. Error case that can still be retried
        if (error != Errors.NONE && canRetry(batch, error)) {
            // Re-enqueue the batch at the head of the accumulator so it gets resent later
            this.accumulator.reenqueue(batch, now);
        } 
        else {
            RuntimeException exception;
            if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
                exception = new TopicAuthorizationException(batch.topicPartition.topic());
            else
                exception = error.exception();
            // 2. Terminal case (success, or a non-retriable error): run the callbacks
            batch.done(response.baseOffset, response.logAppendTime, exception);
            // Release the batch's Buffer back to the accumulator's buffer pool
            this.accumulator.deallocate(batch);
            if (error != Errors.NONE)
                this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
        }
        // 3. Metadata error: the cached metadata is stale, request a refresh
        // (the original source additionally logs a warning when the error is UnknownTopicOrPartitionException)
        if (error.exception() instanceof InvalidMetadataException) {
            metadata.requestUpdate();
        }
    
        if (guaranteeMessageOrder)
            this.accumulator.unmutePartition(batch.topicPartition);
    }

Finally, RecordBatch.done(): it essentially sets a few fields and then triggers our user-defined callbacks:

    // RecordBatch.java
    
    public void done(long baseOffset, long logAppendTime, RuntimeException exception) {
        log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
                  topicPartition, baseOffset, exception);
    
        if (completed.getAndSet(true))
            throw new IllegalStateException("Batch has already been completed");
    
        // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
        produceFuture.set(baseOffset, logAppendTime, exception);
    
        // Execute the callbacks
        for (Thunk thunk : thunks) {
            try {
                // 1. Success
                if (exception == null) {
                    // the metadata of the produced records
                    RecordMetadata metadata = thunk.future.value();
                    // invoke the user-defined callback
                    thunk.callback.onCompletion(metadata, null);
                } 
                // 2. Failure (possibly after all retries were exhausted)
                else {
                    thunk.callback.onCompletion(null, exception);
                }
            } catch (Exception e) {
                log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
            }
        }
        produceFuture.done();
    }
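
The thunk.callback invoked above is exactly the Callback a user passes to KafkaProducer.send(). As a usage example (the topic name and broker address are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    public class CallbackDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");    // placeholder address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("demo-topic", "key", "value"), new Callback() {
                    // Invoked from RecordBatch.done() via thunk.callback.onCompletion(...)
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null)
                            System.out.println("written to " + metadata.topic() + "-"
                                               + metadata.partition() + " @ " + metadata.offset());
                        else
                            exception.printStackTrace();    // e.g. all retries were exhausted
                    }
                });
            }
        }
    }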

3. Summary

In this chapter, I analyzed the Kafka client's response-processing flow and its underlying principles in detail. The points worth focusing on are how the client deals with the split packet and sticky packet problems when reading responses, and the follow-up processing after a response is parsed: callbacks, error handling, buffer management, and so on.

The Kafka client's KafkaChannel caches the NetworkReceive object of the response currently being read, and that cache is not cleared until the entire response has been read in full.
