2023-07-31
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/290

通过前面章节的学习,我们已经知道了Kafka客户端在发送消息时,最终是通过底层的NIO组件将消息写入到Channel的ByteBuffer中的。

那么,这里就要思考一个问题:如果Kafka客户端的一次Write操作没有把全部的数据都发送给Broker,此时该如何处理的呢?其实这就是典型的 拆包问题 ,也就是说本来应该一个数据包就把所有数据发送给Server的,结果拆成了很多个包。

操作系统在发送TCP数据的时候,底层会有一个缓冲区(例如1024字节),如果一次请求发送的数据量没达到缓冲区大小,TCP会将本来多个不同的请求合并为一个请求发送,这就形成了 粘包问题 ;如果一次请求发送的数据量较大,超过了缓冲区大小,TCP就会将本来是一个的请求拆分为多个发送,这就是 拆包问题

Kafka客户端解决拆包和粘包问题的思路是:将消息分为 头部消息体 ,在头部中保存整个消息体的大小,发送消息时先发送消息头,再发送消息体;在读取消息时,只有读取到足够长度的消息之后才算是读到了一个完整的消息。

一、ByteBuffer

为了更好的讲解Kafka客户端NIO组件的“拆包”问题,我得先来讲解下Java NIO中的ByteBuffer,纯粹是照顾下新手童鞋,你们也可以自己去找本Java方面的书去看下Java NIO部分,比如《Core Java》,或者参考我的另一个专栏——《透彻理解Java网络编程》。

1.1 Buffer

缓冲区是用来和NIO通道Channel进行交互的,包括从通道读取数据,或者将数据写入到Channel中。

缓冲区的本质上是一块既可以写入数据也可以从中读取数据的内存,这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问这块内存。 对于每种Java原始类型,都对应一种Buffer类型,我们最常用的就是其中的ByteBuffer:

202307312122541831.png

Buffer有四个重要的属性: capacitypositionlimitmark ,其中positionlimit在读模式和写模式下含义有所不同:

  • capacity :Buffer的固定大小值;
  • position :表示当前的读/写位置的索引(从0开始),当将Buffer从写模式切换到读模式时(flip方法),position会被重置为0;
  • limit :读写操作都不能越过这个下标。写模式下,默认等于capacity,表示能写入的数据上限;读模式下,表示最多能读多少数据;
  • mark :为某一读过的位置做标记,便于某些时候回退到该位置。调用mark()会将mark设为当前的position的值,以后调用reset()会将position属性设置为该mark的值。

上述属性值恒满足以下条件:

    0 <= mark <= position <= limit <= capacity

举个例子理解下,假设一个缓冲区容量capacity是10,开始指针position指向0。然后写入6个字节数据,写完后,下标0、1、2、3、4、5有数据,指针指向6,即当前position=6。此时,用limit(6)方法将当前位置设为EOF位置,那么,读数据的时候,读到EOF位置就结束了。

1.2 核心方法

我们再来看下Buffer的核心方法:

clear()
把position设为0,把limit设为capacity,一般在把数据写入Buffer前或清空Buffer时调用。

flip()
把limit设为当前position,把position设为0,一般在从Buffer读出数据前调用。

rewind()
把position设为0,limit不变,一般在需要重写数据时调用。

compact()
将position与 limit之间的数据复制到Buffer的开始位置,复制后position = limit - positionlimit = capacity。但如果position 与limit 之间没有数据的话,就不会进行复制。

mark()
标记Buffer中的一个特定position。

reset()
通过调用Buffer.reset()方法恢复到上一次mark标记的position。

1.3 读写过程

最后,用两张网上的图来说明下一般情况下,Buffer的读写过程。

(1)写模式下,往buffer里写一个字节,并把postion移动一位,写模式下默认limit与capacity相等:

202307312122547032.png

(2)写完数据,需要开始读的时候,将postion复位到0,并将limit设为当前postion。

(3)从buffer里读一个字节,并把postion移动一位,上限是limit:

202307312122551833.png

二、写消息

分析完了Buffer,接着我再带大家来回顾下Kafka消息发送的流程,这次主要关注底层通讯组件。

2.1 ByteBuffer封装

首先,我们来看发送给Broker的Send请求对象的结构。

(1)Sender线程将要发送到某个Broker的所有消息封装成一个Send请求对象;

    // Sender.java
    
    private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
        // ...
        String nodeId = Integer.toString(destination);
        ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
        client.send(clientRequest, now);
    }
    // NetworkClient.java
    
    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
        //...
        String nodeId = clientRequest.destination();
    
        AbstractRequest request = null;
        AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
        request = builder.build();
    
        // 封装成Send请求对象
        Send send = request.toSend(nodeId, header);
    }

(2)Kafka客户端每一次向Broker发送请求,本质就是发送这个Send对象。而Send对象内部封装了ByteBuffer:

    // AbstractRequest.java
    
    public Send toSend(String destination, RequestHeader header) {
        return new NetworkSend(destination, serialize(header, this));    //Send的实现类NetworkSend
    }
    
    // 根据请求头和请求体,创建一个ByteBuffer
    public static ByteBuffer serialize(AbstractRequestResponse header, AbstractRequestResponse body) {
        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());    //分配空间
        header.writeTo(buffer);
        body.writeTo(buffer);
        buffer.rewind();    //postition置0,limit不变
        return buffer;
    }

可以看到,ByteBuffer的大小是根据消息的数据大小确定的,写完数据后,会调用rewind()方法,也就是说将position置0。

(3)接着,计算ByteBuffer的大小,将size结果封装成一个新的ByteBuffer,然后封装成一个NetworkSend对象。

    // NetworkSend.java
    public class NetworkSend extends ByteBufferSend {
    
        public NetworkSend(String destination, ByteBuffer buffer) {
            super(destination, sizeDelimit(buffer));
        }
    
        private static ByteBuffer[] sizeDelimit(ByteBuffer buffer) {
            return new ByteBuffer[] {sizeBuffer(buffer.remaining()), buffer};
        }
    
        // 新建一个ByteBuffer,总共4个字节,值就是之前写入的数据总大小
        private static ByteBuffer sizeBuffer(int size) {
            ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
            sizeBuffer.putInt(size);
            sizeBuffer.rewind();
            return sizeBuffer;
        }
    
    }
    
    // ByteBufferSend.java
    public class ByteBufferSend implements Send {
    
        private final String destination;
        private final int size;
        protected final ByteBuffer[] buffers;
        private int remaining;
        private boolean pending = false;
    
        public ByteBufferSend(String destination, ByteBuffer... buffers) {
            this.destination = destination;
            this.buffers = buffers;
            for (ByteBuffer buffer : buffers)
                remaining += buffer.remaining();
            this.size = remaining;
        }
    }

从上述的代码可以看出, 最终发送出去的消息的前4个字节表示消息内容的大小,后面跟着的才是消息内容 。这个非常重要,和拆包有关。

2.2 消息发送

消息的发送是通过Selector.poll()方法,内部调用了KafkaChannel.write()方法:

    // Selector.java
    
    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        if (channel.ready() && key.isWritable()) {
            // 调KafkaChannel.write()方法
            Send send = channel.write();
        }
    }
    // KafkaChannel.java
    
    public Send write() throws IOException {
        Send result = null;
        // 注意,如果发生拆包,send方法返回false,也就是result返回false
        if (send != null && send(send)) {
            result = send;
            send = null;
        }
        return result;
    }
    
    private boolean send(Send send) throws IOException {
        // 写消息到SocketChannel
        send.writeTo(transportLayer);
    
        // 如果消息写入完成
        if (send.completed())
            // 取消对OP_WRITE事件的关注
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);    
    
        return send.completed();
    }

最终,通过ByteBufferSend.writeTo完成数据往SocketChannel的写入。注意,由于拆包问题的存在,可能只发送了部分数据,此时remaining>0,ByteBufferSend用一个remaining字段标识剩余字节数:

    // ByteBufferSend.java
    
    public class ByteBufferSend implements Send {
    
        private final String destination;        // 目标Broker
        private final int size;                    // 数据大小,即buffers中每一项的大小
        protected final ByteBuffer[] buffers;    // 包含两部分:数据头(4字节)和数据体
        private int remaining;                    // 剩余待写入字节
        private boolean pending = false;
    
        public ByteBufferSend(String destination, ByteBuffer... buffers) {
            this.destination = destination;
            this.buffers = buffers;
            for (ByteBuffer buffer : buffers)
                remaining += buffer.remaining();
            this.size = remaining;
        }
    
        @Override
        public long writeTo(GatheringByteChannel channel) throws IOException {
            // 写入channel,返回写入成功的字节数
            long written = channel.write(buffers);
            if (written < 0)
                throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
            // 剩余待写字节数
            remaining -= written;
            pending = TransportLayers.hasPendingWrites(channel);
    
            // 返回本次写入成功的字节
            return written;
        }
    }

而在KafkaChannel完成消息写入后,有一个判断,判断Send请求里的数据是否全部发送完了,如果发送完了就取消对OP_WRITE事件的关注:

    // 如果消息写入完成
    if (send.completed())
        // 取消对OP_WRITE事件的关注
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    // ByteBufferSend.java
    
    public boolean completed() {
        return remaining <= 0 && !pending;
    }

所以,如果发生了拆包,KafkaChannel的write方法会返回null,并且KafkaChannel并不会把自己缓存的Send请求置为null:

    // KafkaChannel.java
    
    private Send send;
    public Send write() throws IOException {
        Send result = null;
        // 注意,如果发生拆包,send方法返回false,也就是result返回false
        if (send != null && send(send)) {
            result = send;
            send = null;
        }
        return result;
    }

那么,在下一轮的Sender.run()中,会判断上一次针对某个Broker的请求是否已经发送完毕:

    // Sender.java
    
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {    // 判断是否Broker就绪
            iter.remove();    //没有就绪就移除掉
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }
    // NetworkClient.java
    
    public boolean isReady(Node node, long now) {
        return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
    }
    
    private boolean canSendRequest(String node) {
        return connectionStates.isReady(node)     // Broker连接状态就绪
            && selector.isChannelReady(node)     // Channel连接就绪
            && inFlightRequests.canSendMore(node);    // 未响应请求判断
    }

核心是三个条件的判断,我们看关键的最后一个判断inFlightRequests.canSendMore(node),意思是如果“未响应请求队列”中的最近发送的那个请求没有completed(),就认为不能往该Broker继续发送请求:

    // InFlightRequests.java
    
    public boolean canSendMore(String node) {
        // 获取未响应请求队列
        Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
        return queue == null 
            || queue.isEmpty() 
            || (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
    }

为什么不能继续往该Broker发送?因为我们最近发送的那个Send请求发生了拆包,Send请求还缓存在KafkaChannel里面,如果再发送会导致重复请求 ,所以直接要把这个Broker节点移除点,那么在本轮的Selector.poll()中就会发送缓存的那个Send请求,直接它的所有数据包发送完成。

三、总结

本章,我对Kafka客户端对消息发送的“拆包”处理的底层原理进行了讲解。总结一下,Kafka的核心处理思路有以下几点:

  1. 对每一个Broker要发送的消息,封装成一个NetworkSend请求,请求包含两部分:请求头和请求体,请求头共4字节,标识着请求体的大小;
  2. Kafka通过计算NetworkSend请求的remaining字段——待发送字节数,判断是否发送完了所有数据;
  3. 如果针对该Broker的请求没有一次性发送完成,那么就不会移除KafkaChannel对该请求的缓存,同时在下一次Sender线程的轮询过程中,不会针对该Broker生成新的NetworkSend请求,而是利用KafkaChannel缓存的NetworkSend请求,再次发送剩余字节,直接全部发送完成。
阅读全文