【Netty】RecvByteBufAllocator源码分析

 2023-01-28
原文作者:程序员小潘 原文地址:https://juejin.cn/post/6968383023888891934

前言

前面写过《Netty服务端启动全流程源码分析》,BossGroup获取到客户端连接SocketChannel后会将其注册到WorkerGroup,由WorkerGroup来驱动数据IO读写。WorkerGroup的EventLoop监听到Channel有OP_READ事件时,会调用Channel.Unsafe.read()方法,Netty会将读取到的数据包装成ByteBuf,然后触发回调pipeline.fireChannelRead(byteBuf)将事件传播出去。

整体流程是清楚了,但是对于详细的数据接收细节没有介绍,本篇文章会做介绍。

前置知识

熟悉Java Nio编程的同学应该知道,要想从SocketChannel读取数据,需要先创建一个ByteBuffer,然后调用SocketChannel.read(ByteBuffer)方法。但是由于读取操作并未实际发生,程序并不知道有多少数据需要接收,导致我们并不知道需要创建一个多大的ByteBuffer,大了会造成内存的浪费,小了又需要频繁扩容。而且ByteBuffer本身不支持扩容操作,你需要重新申请一个更大的ByteBuffer,然后进行内存的复制,开销就更大了。Netty是如何解决这个问题的呢?后面会介绍。

还有一个知识点,读者需要提前了解。对于注册到Selector多路复用器上,且监听OP_READ事件的Channel,Selector判断的其实就是Channel的有效可读字节数。意思就是说,对于有数据可读的Channel,如果你数据没有读完,下次select()多路复用器依然会再返回它。所以,Netty会进行循环读,前面说过了,你不知道对端会发送给你多少数据,默认单次最多读16次,超过16次数据还没读完,本次就不再继续处理了,因为Netty怕阻塞其他IO事件,后面会详细分析。

AbstractNioByteChannel.read()分析

之前的文章已经分析过,当Netty检测到Channel有可读事件时,会调用AbstractNioByteChannel.read()方法,下面是该方法整体的一个源码分析:

    /*
    客户端发送数据时触发。
    见 io.netty.channel.nio.NioEventLoop.processSelectedKey
     */
    @Override
    public final void read() {
        // 客户端Channel的配置
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        // 获取ByteBufAllocator,默认是 PooledByteBufAllocator
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        // 重置统计信息
        allocHandle.reset(config);
    
        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            /*
            当对端发送一个超大的数据包时,TCP会拆包。
            OP_READ事件只会触发一次,Netty需要循环读,默认最多读16次,因此ChannelRead()可能会触发多次,拿到的是半包数据。
            如果16次没把数据读完,没有关系,下次select()还会继续处理。
            对于Selector的可读事件,如果你没有读完数据,它会一直返回。
             */
            do {
                // 分配一个ByteBuf,大小能容纳可读数据,又不过于浪费空间。
                byteBuf = allocHandle.allocate(allocator);
                /*
                doReadBytes(byteBuf):ByteBuf内部有ByteBuffer,底层还是调用了SocketChannel.read(ByteBuffer)
                allocHandle.lastBytesRead()根据读取到的实际字节数,自适应调整下次分配的缓冲区大小。
                 */
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    // 没有读取到字节
                    byteBuf.release();//释放ByteBuf,空的,没有意义。
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    break;
                }
    
                // 递增已经读取的消息数量
                allocHandle.incMessagesRead(1);
                readPending = false;
                // 通过pipeline传播ChannelRead事件
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());//判断是否需要继续读
    
            // 读取完毕,pipeline传播ChannelReadComplete事件
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
    
            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }

这里我们重点关注一下recvBufAllocHandle()方法,很简单,就是对Channel绑定的recvHandle进行了判空校验,如果没绑定就创建一个。

    @Override
    public RecvByteBufAllocator.Handle recvBufAllocHandle() {
        if (recvHandle == null) {
            // 如果Channel对应的recvHandle是空的,则创建一个新实例
            recvHandle = config().getRecvByteBufAllocator().newHandle();
        }
        return recvHandle;
    }

Channel会依赖RecvByteBufAllocator.Handle来创建ByteBuf,为什么不直接创建呢?因为前面说过的,不知道真正有多少数据要接收,不知道该创建多大的ByteBuf,大了浪费空间,小了又要频繁扩容。基于这个原因,Channel会把创建ByteBuf的任务交给RecvByteBufAllocator.Handle处理,希望它可以基于历史数据做统计分析,分配出一个容量大到足够容纳所有的数据,又小到不会浪费太多的空间。

RecvByteBufAllocator.Handle的细节后面分析,这里先把read()流程分析完。

创建好一个大小合适的ByteBuf之后,Channel会调用doReadBytes(byteBuf)将数据写入到Bytebuf:

    @Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        // 设置 尝试读取的字节数,尽量把ByteBuf填满
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());
        // 将SocketChannel读取的数据写入到byteBuf
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
    }

byteBuf.writeBytes(ScatteringByteChannel in, int length)其实就是将ByteBuf转换成JDK的ByteBuffer,然后通过JDK原生的SocketChannel进行读取,它会返回实际的读取字节数。

如果实际读取的字节数小于等于0,说明没有数据可读了,本次OP_READ事件处理完毕,触发pipeline.fireChannelReadComplete()回调,否则触发pipeline.fireChannelRead(byteBuf)将读取到的ByteBuf传递给其他ChannelHandler处理。

如果对端发送的数据包很大,很可能创建的ByteBuf不能一次性读完所有数据,所以Channel这里会进行循环读,整个读取的逻辑会放在一个while循环里,通过allocHandle.continueReading()判断是否需要继续读取数据。因此,即使TCP没有发生拆包,如果创建的ByteBuf过小,ChannelHandler的channelRead()也会被触发多次,所以, 切记不可错误的理解为「channelRead()是因为TCP拆包导致的」

RecvByteBufAllocator分析

RecvByteBufAllocator是Netty的「数据接收缓冲区分配器」,Channel依赖它来创建大小合适的ByteBuf,提升性能和节省内存。

RecvByteBufAllocator是一个很简单的接口,它的工作由内部接口Handle完成,所以直接看Handle接口就行了。

    interface Handle {
        /*
        通过ByteBufAllocator分配一个大小合适的ByteBuf。
        太大:浪费空间。
        太小:频繁扩容,内存复制开销。
         */
        ByteBuf allocate(ByteBufAllocator alloc);
    
        // 猜测需要分配的字节数
        int guess();
    
        // 重置已累积的任何计数器,并建议为下一个读循环应读取多少消息字节。
        void reset(ChannelConfig config);
    
        // 增加已读的消息数量
        void incMessagesRead(int numMessages);
    
        // 设置上一次读取到的字节数,AdaptiveRecvByteBufAllocator会根据该值 自适应调整下次分配的缓冲区大小。
        void lastBytesRead(int bytes);
    
        // 获取上一次读取的字节数
        int lastBytesRead();
    
        // 设置尝试读取的字节数
        void attemptedBytesRead(int bytes);
    
        // 获取尝试读取的字节数
        int attemptedBytesRead();
    
        // 是否还能继续读取
        boolean continueReading();
    
        // 读取完成
        void readComplete();
    }

默认使用的RecvByteBufAllocator实现是AdaptiveRecvByteBufAllocator,它可以自适应调整分配的ByteBuf大小,我们重点分析。

AdaptiveRecvByteBufAllocator类图如下:

202212302216302031.png

从上往下看吧,MaxMessagesRecvByteBufAllocator很简单,在顶级接口的基础之上,限制了循环读的次数:

    /*
    当有可读事件时,Netty是循环读的,通过continueReading()判断是否需要继续读取。
    该类主要是用来限制循环读取的次数的,默认是16.
     */
    public interface MaxMessagesRecvByteBufAllocator extends RecvByteBufAllocator {
    
        // 返回循环读的最大次数
        int maxMessagesPerRead();
    
        // 设置循环读的最大次数
        MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead);
    }

DefaultMaxMessagesRecvByteBufAllocator是MaxMessagesRecvByteBufAllocator的默认实现,先看属性:

    // 最大读取消息数量
    private volatile int maxMessagesPerRead;
    /*
    是否尊重/关心 还有更多的数据可读?
    如果为true,则无条件认为还有数据可读,直到下次循环读取到0字节为止。
    这可能会导致多执行一次无效读,无意义的创建一个ByteBuf。
     */
    private volatile boolean respectMaybeMoreData = true;

核心逻辑都在Handle里,所以直接看Handle即可:

    public abstract class MaxMessageHandle implements ExtendedHandle {
        private ChannelConfig config;
        // 最大读取多少次消息,默认16次,没读完,下次select接着读。
        private int maxMessagePerRead;
        // 读取的总消息数
        private int totalMessages;
        // 读取的字节总数
        private int totalBytesRead;
        // 尝试读取的字节数,默认是ByteBuf的可写字节数,即尽量把ByteBuf填满。
        private int attemptedBytesRead;
        // 上次读取的字节数,根据它调整下次分配的缓冲区大小。
        private int lastBytesRead;
        private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
        /*
        是否还有更多数据可读的默认判断:attemptedBytesRead == lastBytesRead。
        即:本次读取的数据有没有填满ByteBuf,如果填满了,说明可能还有数据要读。否则就不读了,直接触发ChannelReadComplete()。
         */
        private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
            @Override
            public boolean get() {
                return attemptedBytesRead == lastBytesRead;
            }
        };
    
        // 根据config重置数据,每次处理新的Read事件时触发。
        @Override
        public void reset(ChannelConfig config) {
            this.config = config;
            maxMessagePerRead = maxMessagesPerRead();
            totalMessages = totalBytesRead = 0;
        }
    
        // 根据猜测的字节数,分配一个ByteBuf。
        @Override
        public ByteBuf allocate(ByteBufAllocator alloc) {
            return alloc.ioBuffer(guess());
        }
    
        // 递增统计消息的读取数量,默认超过16就不读了,防止阻塞IO线程,其他事件得不到处理。
        @Override
        public final void incMessagesRead(int amt) {
            totalMessages += amt;
        }
    
        // 根据上次读取的字节数,累加总读取到的字节数
        @Override
        public void lastBytesRead(int bytes) {
            lastBytesRead = bytes;
            if (bytes > 0) {
                totalBytesRead += bytes;
            }
        }
    
        @Override
        public final int lastBytesRead() {
            return lastBytesRead;
        }
    
        // 是否还要继续循环读取消息
        @Override
        public boolean continueReading() {
            /*
            判断依据:
                1.认为还有可读数据
                2.读取的消息数没有达到上限
             */
            return continueReading(defaultMaybeMoreSupplier);
        }
    
        @Override
        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
            return config.isAutoRead() &&
                   (!respectMaybeMoreData || maybeMoreDataSupplier.get()) // 认为还有可读数据
                    &&
                   totalMessages < maxMessagePerRead && totalBytesRead > 0;// 读取的消息数没有达到上限
        }
    
        @Override
        public void readComplete() {
        }
    
        @Override
        public int attemptedBytesRead() {
            return attemptedBytesRead;
        }
    
        // 设置尝试读取的字节数,默认为ByteBuf的可写字节数
        @Override
        public void attemptedBytesRead(int bytes) {
            attemptedBytesRead = bytes;
        }
    
        protected final int totalBytesRead() {
            return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
        }
    }

MaxMessageHandle会根据每次读取的字节数是否填满ByteBuf为依据,判断是否还要继续循环读。如果填满了说明Channel可能还有数据等待读取,反之已无数据可读,直接跳出循环即可。

说完父类,接下来看核心的AdaptiveRecvByteBufAllocator,源码也不是很长:

    /*
        自适应的,接收对端数据的ByteBuf分配器,分配的ByteBuf有合适的初试容量。
        避免太小导致频繁扩容,太大导致内存浪费,GC压力。
    
        基于历史的数据采集做预测:
            1.前一次接收的数据完全读满了ByteBuf,则下次会增大缓冲区。
            2.连续两次接收的数据小于指定值,则会缩小下次分配的缓冲区。
     */
    public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
    
        // 默认最小值
        static final int DEFAULT_MINIMUM = 64;
        // 默认初试值
        static final int DEFAULT_INITIAL = 2048;
        // 默认最大值
        static final int DEFAULT_MAXIMUM = 65536;
    
        /*
        如果需要扩容下次分配的缓冲区大小,这个是扩容的索引步长。
         */
        private static final int INDEX_INCREMENT = 4;
        /*
        如果需要缩容下次分配的缓冲区大小,这个是缩容的索引步长。
         */
        private static final int INDEX_DECREMENT = 1;
    
        // 扩容表
        private static final int[] SIZE_TABLE;
    
        static {
            List<Integer> sizeTable = new ArrayList<Integer>();
            // 512字节内,以16字节为步长,递增
            for (int i = 16; i < 512; i += 16) {
                sizeTable.add(i);
            }
    
            // 512字节后,成倍扩容
            for (int i = 512; i > 0; i <<= 1) { // lgtm[java/constant-comparison]
                sizeTable.add(i);
            }
    
            // List转数组
            SIZE_TABLE = new int[sizeTable.size()];
            for (int i = 0; i < SIZE_TABLE.length; i ++) {
                SIZE_TABLE[i] = sizeTable.get(i);
            }
        }
    
        /**
         * @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.
         */
        @Deprecated
        public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
    
        /*
        通过给定size查找下标,二分查找法。
        如果size不在SIZE_TABLE内,返回最接近它的一个稍小值的索引
         */
        private static int getSizeTableIndex(final int size) {
            for (int low = 0, high = SIZE_TABLE.length - 1;;) {
                if (high < low) {
                    return low;
                }
                if (high == low) {
                    return high;
                }
    
                int mid = low + high >>> 1;
                int a = SIZE_TABLE[mid];
                int b = SIZE_TABLE[mid + 1];
                if (size > b) {
                    low = mid + 1;
                } else if (size < a) {
                    high = mid - 1;
                } else if (size == a) {
                    return mid;
                } else {
                    return mid + 1;
                }
            }
        }
    
        private final class HandleImpl extends MaxMessageHandle {
            // 最小容量的索引
            private final int minIndex;
            // 最大容量的索引
            private final int maxIndex;
            // 默认容量的索引
            private int index;
    
            // 下次接受的缓冲区大小
            private int nextReceiveBufferSize;
            // 是否需要立即缩容?因为需要连续两次读取的字节数小于阈值,第一次设为true,第二次才缩容。
            private boolean decreaseNow;
    
            HandleImpl(int minIndex, int maxIndex, int initial) {
                this.minIndex = minIndex;
                this.maxIndex = maxIndex;
    
                // 根据初始值得到SIZE_TABLE的下标
                index = getSizeTableIndex(initial);
                // 第一次分配的缓冲区大小就是initial默认值
                nextReceiveBufferSize = SIZE_TABLE[index];
            }
    
            /*
            Unsafe.read()循环读数据时会调用该方法,bytes是上一次实际读取到的字节数。
             */
            @Override
            public void lastBytesRead(int bytes) {
                /*
                attemptedBytesRead():尝试读取的字节数。
                NioSocketChannel.doReadBytes()会将attemptedBytesRead设置为ByteBuf.writableBytes(),
                即只要有数据可读,Netty会尽量将ByteBuf写满。
                 */
                if (bytes == attemptedBytesRead()) {
                    // 实际读取的字节数填满了缓冲区,则扩容。
                    record(bytes);
                }
                // 调用父类方法,将数值累加到 totalBytesRead
                super.lastBytesRead(bytes);
            }
    
            /*
            预测下次需要分配的缓冲区大小
             */
            @Override
            public int guess() {
                return nextReceiveBufferSize;
            }
    
            /*
             根据实际读取到的字节数,自适应调整 下次应该分配的缓冲区大小
             */
            private void record(int actualReadBytes) {
                // 如果连续两次,读取的字节数 小于等于 前一个容量大小
                if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
                    // 根据-1的索引步长进行缩容,需要连续两次触发才缩容,所以有一个decreaseNow
                    if (decreaseNow) {
                        index = max(index - INDEX_DECREMENT, minIndex);//确保不低于最小值
                        nextReceiveBufferSize = SIZE_TABLE[index];
                        decreaseNow = false;
                    } else {
                        decreaseNow = true;
                    }
                } else if (actualReadBytes >= nextReceiveBufferSize) {
                    /*
                    根据+4的索引步长进行扩容,但不能超过最大值。
                    因此默认情况下的扩容逻辑:
                    2048 > 32768 > 65536 > 65536(不变...)
                    2k   > 32K   > 64K ...
                     */
                    index = min(index + INDEX_INCREMENT, maxIndex);
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                }
            }
    
            @Override
            public void readComplete() {
                // 数据读取完毕,根据本次读取的总字节数,自适应调整下次应该分配的缓冲区大小
                record(totalBytesRead());
            }
        }
    
        // 最小、默认、最大容量在SIZE_TABLE中的下标
        private final int minIndex;
        private final int maxIndex;
        private final int initial;
    
        // 根据默认值创建一个:自适应接受缓冲区分配器:64、2048、65535
        public AdaptiveRecvByteBufAllocator() {
            this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
        }
    
        // 根据指定最小、默认、最大容量创建一个:自适应接受缓冲区分配器
        public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
            checkPositive(minimum, "minimum");
            if (initial < minimum) {
                throw new IllegalArgumentException("initial: " + initial);
            }
            if (maximum < initial) {
                throw new IllegalArgumentException("maximum: " + maximum);
            }
    
            int minIndex = getSizeTableIndex(minimum);
            if (SIZE_TABLE[minIndex] < minimum) {
                this.minIndex = minIndex + 1;
            } else {
                this.minIndex = minIndex;
            }
    
            int maxIndex = getSizeTableIndex(maximum);
            if (SIZE_TABLE[maxIndex] > maximum) {
                this.maxIndex = maxIndex - 1;
            } else {
                this.maxIndex = maxIndex;
            }
    
            this.initial = initial;
        }
    
        @SuppressWarnings("deprecation")
        @Override
        public Handle newHandle() {
            return new HandleImpl(minIndex, maxIndex, initial);
        }
    
        @Override
        public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
            super.respectMaybeMoreData(respectMaybeMoreData);
            return this;
        }
    }

AdaptiveRecvByteBufAllocator根据名字就能看出来,它是一个自适应的数据接收缓冲区分配器,这个「自适应」体现在ByteBuf空间的分配上。

它使用三个属性分别定义ByteBuf分配的默认值、最小值、最大值,使用一个int数组SIZE_TABLE来定义扩/缩容的容量表,512字节内,以16字节为步长扩容,512字节后,成倍扩容。lastBytesRead()会记录下每次循环读取的实际字节数,如果读取的字节填满了ByteBuf,则会调用record(bytes)进行扩容,扩容策略为:扩容索引+4,即16倍扩容,在不超过最大值的前提下,默认的扩容策略如下:

    2048 > 32768 > 65536 > 65536(不变...)
    2k   > 32K   > 64K ...

如果连续两次读取的字节数小于等于前一个缩容容量,则会进行缩容,缩容的策略是容量索引位-1,即512字节后,成倍缩容,512字节前,缩小16字节。

由于需要连续两次读取的字节数少才会缩容,所以加了一个属性decreaseNow来记录是否需要立即缩容,第一次触发它为true,第二次才缩容。

为什么需要连续两次呢?因为一次读取的字节数少可能是因为读到了上一个数据包的末尾,数据包本身还是很大的,所以不能缩容。连续两次才能说明一个完整的数据包很小,下次分配的ByteBuf可以小些以节省内存。

循环结束后,会调用allocHandle.readComplete(),它会根据此次读取的总字节数去做动态调整,为下次分配ByteBuf提供预测。 例如,本次处理OP_READ事件,循环读3次,每次读取了100KB,那么下次就会直接分配一个300KB的ByteBuf,争取一次性读完。相反,如果读取的字节数少,就缩容节省内存。

总结

Channel在接收对端数据时,因为不知道该分配多大的ByteBuf来接收,所以会将ByteBuf的分配任务交给RecvByteBufAllocator,期望它能分配一个容量大到可以足够容纳数据,又小到不会浪费太多内存的ByteBuf,默认的实现是AdaptiveRecvByteBufAllocator,它会根据前面实际读取的字节数,自适应的调整下次分配的ByteBuf大小。

虽然AdaptiveRecvByteBufAllocator会尽量去预测下次分配ByteBuf的大小,但是预测会有不准的时候,因此Channel还是会进行循环读,防止ByteBuf分配的过小无法容纳所有数据。但是为了避免IO线程阻塞,其他Channel的事件得不到处理,默认会限制单次最多循环读16次,如果发送的数据包真的非常大,16次都没有读完,Netty本次也会放弃处理,等待下次select()轮询时再处理。

最后,再提醒一句,不要错误的理解为:channelRead()的触发是因为TCP拆包导致的!!!