2023-08-16
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/372

Buffer 是一种字节容器,在Java NIO、 Netty 等 NIO 框架中都有类似的设计。例如,Java NIO 中的ByteBuffer、Netty4 中的 ByteBuf。Dubbo 抽象出了 ChannelBuffer 接口,用于对底层各种不同的 NIO 框架中的 Buffer 进行统一,其子类如下图所示:

202308162141298511.png

上述这些Dubbo中的缓冲区相关接口、抽象类及实现类,都定义在dubbo-remoting-api模块中。org.apache.dubbo.remoting.buffer包在更高层面上,抽象了各个 NIO 框架中的缓冲区概念,同时也提供了一些基础实现:

202308162141305262.png

本章,我就按照 ChannelBuffer 的继承结构,从顶层的 ChannelBuffer 接口开始,逐个分析 ChannelBuffer 的各个实现类。

一、ChannelBuffer接口

ChannelBuffer 接口的设计与 Netty4 中 ByteBuf抽象类的设计基本一致,也有 readerIndexwriterIndex 指针的概念。

1.1 核心方法

我们先来看ChannelBuffer接口的核心方法,大部分方法和Netty中的ByteBuf是类似,都是围绕readerIndexwriterIndex 指针展开:

    public interface ChannelBuffer extends Comparable<ChannelBuffer> {
    
        // 从从参数指定的位置读取字节(不会修改readerIndex)
        void getBytes(int index, byte[] dst);
    
        // 向参数指定的位置写入字节(不会修改writerIndex)
        void setBytes(int index, byte[] src);
    
        // 从readerIndex指针开始,读取数据到dst,并移动readerIndex指针
        void readBytes(byte[] dst);
    
        // 从src读取数据,并从writerIndex指针位置开始写入,移动writerIndex指针
        void writeBytes(byte[] src);
    
        // 记录当前readerIndex指针,一般会和resetReaderIndex()配合使用
        void markReaderIndex();
    
        // 记录当前writerIndex指针,一般会和resetWriterIndex()配套使用
        void markWriterIndex();
    
        // 返回当前buffer的容量
        int capacity();
    
        // 将readerIndex和writerIndex清零
        void clear();
    
        // 返回创建 ChannelBuffer 的工厂对象
        ChannelBufferFactory factory();
    
        //...
    }

1.2 ChannelBufferFactory

ChannelBufferFactory是用于创建 ChannelBuffer 的工厂对象,它定义了很多 getBuffer() 方法重载来创建 ChannelBuffer:

    public interface ChannelBufferFactory {
    
        ChannelBuffer getBuffer(int capacity);
    
        ChannelBuffer getBuffer(byte[] array, int offset, int length);
    
        ChannelBuffer getBuffer(ByteBuffer nioBuffer);
    }

ChannelBufferFactory的类继承图如下:

202308162141312253.png

1.3 AbstractChannelBuffer

AbstractChannelBuffer 抽象类 实现了 ChannelBuffer 接口的大部分方法,其核心是维护了以下四个索引:

    public abstract class AbstractChannelBuffer implements ChannelBuffer {
    
        // 通过 readBytes() 方法及其重载读取数据时,会后移 readerIndex 索引
        private int readerIndex;
    
        // 通过 writeBytes() 方法及其重载写入数据的时候,会后移 writerIndex 索引
        private int writerIndex;
    
        // 实现记录以及回滚readerIndex的功能
        private int markedReaderIndex;
    
        // 实现记录以及回滚writerIndex的功能
        private int markedWriterIndex;
    
        //...
    }

AbstractChannelBuffer 的 readBytes()writeBytes() 方法(及其重载)内部会通过 getBytes() 方法和 setBytes() 方法实现数据的读写,这些方法在 AbstractChannelBuffer 子类中实现。以readBytes/writeBytes方法为例:

    // AbstractChannelBuffer.java
    
    public void readBytes(byte[] dst, int dstIndex, int length) {
        // 检测Channel中的可读字节数是否足够
        checkReadableBytes(length);
    
        // 将readerIndex之后的length个字节读取到dst数组中 dstIndex~dstIndex+length 的位置
        getBytes(readerIndex, dst, dstIndex, length);    // 抽象方法,子类实现
    
        // 将readerIndex后移length个字节
        readerIndex += length;
    }
    
    public void writeBytes(byte[] src, int srcIndex, int length) {
        // 将src数组中srcIndex~srcIndex+length的数据写入当前buffer的writerIndex~writerIndex+length的位置
        setBytes(writerIndex, src, srcIndex, length);    // 抽象方法,子类实现
    
        // 将writeIndex后移length个字节
        writerIndex += length;
    }

1.4 输入输出流

Dubbo 在 ChannelBuffer 的基础上,提供了一套输入输出流,如下图所示:

202308162141327594.png

ChannelBufferInputStream 内部封装了一个 ChannelBuffer,其实现 InputStream 接口的 read() 相关方法时,全部委托 ChannelBuffer 读取数据。ChannelBufferInputStream 还维护了一个 startIndex 索引和endIndex 索引,作为读取数据的起止位置。

    public class ChannelBufferInputStream extends InputStream {
    
        private final ChannelBuffer buffer;
        private final int startIndex;
        private final int endIndex;
    
        public int read(byte[] b, int off, int len) throws IOException {
            int available = available();
            if (available == 0) {
                return -1;
            }
    
            len = Math.min(available, len);
            buffer.readBytes(b, off, len);
            return len;
        }
        //...
    }

ChannelBufferOutputStream 与 ChannelBufferInputStream 类似,会向底层的 ChannelBuffer 写入数据,我这里就不赘述了。

1.5 ChannelBuffers

ChannelBuffers是一个门面类,包含了创建各类 ChannelBuffer 对象的方法:

    public final class ChannelBuffers {
    
        public static final ChannelBuffer EMPTY_BUFFER = new HeapChannelBuffer(0);
    
        private ChannelBuffers() {
        }
        /...
    }

202308162141345755.png

  • dynamicBuffer() 方法:创建 DynamicChannelBuffer 对象,初始化大小由第一个参数指定,默认为 256。
  • buffer() 方法:创建指定大小的 HeapChannelBuffer 对象。
  • wrappedBuffer() 方法:将传入的 byte[] 数字封装成 HeapChannelBuffer 对象。
  • directBuffer() 方法:创建 ByteBufferBackedChannelBuffer 对象,需要注意的是,底层的 ByteBuffer 使用的是堆外内存,需要特别关注堆外内存的管理。

ChannelBuffers的equals方法比较特殊,用于比较两个 ChannelBuffer 是否相同。该方法会逐个比较两个 ChannelBuffer 中的前 7 个可读字节,只有两者完全一致,才算两个 ChannelBuffer 相同:

    // ChannelBuffers.java
    
    public static boolean equals(ChannelBuffer bufferA, ChannelBuffer bufferB) {
        // 比较两个ChannelBuffer的可读字节数
        final int aLen = bufferA.readableBytes();
        if (aLen != bufferB.readableBytes()) {
            return false;
        }
    
        // 只比较前7个字节
        final int byteCount = aLen & 7;
    
        int aIndex = bufferA.readerIndex();
        int bIndex = bufferB.readerIndex();
    
        for (int i = byteCount; i > 0; i--) {
            // 前7个字节存在不同,则返回false
            if (bufferA.getByte(aIndex) != bufferB.getByte(bIndex)) {
                return false;
            }
            aIndex++;
            bIndex++;
        }
        return true;
    }

ChannelBuffers的compare() 方法,用于比较两个 ChannelBuffer 的大小,会逐个比较两个 ChannelBuffer 中的全部可读字节,具体实现与 equals() 方法类似,我就不赘述了。

二、ChannelBuffer实现类

了解了 ChannelBuffer 接口的核心方法以及 AbstractChannelBuffer 的公共实现之后,我们再来看 ChannelBuffer 的具体子类。

2.1 HeapChannelBuffer

HeapChannelBuffer 是基于字节数组的 ChannelBuffer 实现 ,它内部有一个array字节数组,用于存储数据:

    public class HeapChannelBuffer extends AbstractChannelBuffer {
    
        protected final byte[] array;
    
        //...
    }

HeapChannelBuffer 的 setBytes()getBytes() 方法都是通过调用 System.arraycopy() 方法完成数组操作的:

    // HeapChannelBuffer.java
    
    public void setBytes(int index, byte[] src, int srcIndex, int length) {
        System.arraycopy(src, srcIndex, array, index, length);
    }
    
    public void getBytes(int index, byte[] dst, int dstIndex, int length) {
        System.arraycopy(array, index, dst, dstIndex, length);
    }

HeapChannelBuffer 对象的创建是通过工厂类 HeapChannelBufferFactory 实现的。HeapChannelBufferFactory.getBuffer() 方法的内部通过 ChannelBuffers 这个工具类,创建一个指定大小的 HeapChannelBuffer 对象:

    // HeapChannelBufferFactory.java
    
    public class HeapChannelBufferFactory implements ChannelBufferFactory {
    
        @Override
        public ChannelBuffer getBuffer(int capacity) {
            // 新建一个HeapChannelBuffer,底层会新建一个长度为capacity的byte数组
            return ChannelBuffers.buffer(capacity);
        }
    
        @Override
        public ChannelBuffer getBuffer(byte[] array, int offset, int length) {
            // 新建一个HeapChannelBuffer,并且会拷贝array数组中offset~offset+lenght的数据到新HeapChannelBuffer中
            return ChannelBuffers.wrappedBuffer(array, offset, length);
        }
    
        @Override
        public ChannelBuffer getBuffer(ByteBuffer nioBuffer) {
            if (nioBuffer.hasArray()) {
                return ChannelBuffers.wrappedBuffer(nioBuffer);
            }
    
            ChannelBuffer buf = getBuffer(nioBuffer.remaining());
            int pos = nioBuffer.position();
            buf.writeBytes(nioBuffer);
            nioBuffer.position(pos);
            return buf;
        }
    }

2.2 DynamicChannelBuffer

DynamicChannelBuffer 是其它 ChannelBuffer 的装饰器,它可以为其它 ChannelBuffer 添加 动态扩展容量 的功能。DynamicChannelBuffer 中有两个核心字段:

    public class DynamicChannelBuffer extends AbstractChannelBuffer {
    
        // 默认为 HeapChannelBufferFactory
        private final ChannelBufferFactory factory;
    
        // 被修饰的 ChannelBuffer,默认为 HeapChannelBuffer
        private ChannelBuffer buffer;
    }

DynamicChannelBuffer 需要关注的是 ensureWritableBytes() 方法,该方法实现了动态扩容的功能,在每次写入数据之前,都需要调用该方法确定当前可用空间是否足够:

    // DynamicChannelBuffer.java
    
    @Override
    public void ensureWritableBytes(int minWritableBytes) {
        if (minWritableBytes <= writableBytes()) {
            return;
        }
    
        int newCapacity;
        if (capacity() == 0) {
            newCapacity = 1;
        } else {
            newCapacity = capacity();
        }
        // 空间扩大为原来的两倍
        int minNewCapacity = writerIndex() + minWritableBytes;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }
    
        ChannelBuffer newBuffer = factory().getBuffer(newCapacity);
        // 将旧 ChannelBuffer 中的数据拷贝到新 ChannelBuffer 中
        newBuffer.writeBytes(buffer, 0, writerIndex());
        // 指向新 ChannelBuffer 对象
        buffer = newBuffer;
    }

ensureWritableBytes() 方法如果检测到底层 ChannelBuffer 对象的空间不足,则会创建一个新的 ChannelBuffer(空间扩大为原来的两倍),然后将旧 ChannelBuffer 中的数据拷贝到新 ChannelBuffer 中,最后将 buffer 字段指向新 ChannelBuffer 对象,完成整个扩容操作。

2.3 ByteBufferBackedChannelBuffer

ByteBufferBackedChannelBuffer, 是基于 Java NIO 中 ByteBuffer 的 ChannelBuffer 实现 ,它的内部通过委托调用 ByteBuffer 的 API 实现了ChannelBuffer 的接口功能。以 getBytes() 方法和 setBytes() 方法为例:

    // ByteBufferBackedChannelBuffer.java
    
    public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
        // Java NIO中的ByteBuffer
        private final ByteBuffer buffer;
    
        private final int capacity;
    
        public void getBytes(int index, byte[] dst, int dstIndex, int length) {
            ByteBuffer data = buffer.duplicate();
            try {
                // 移动ByteBuffer中的指针
                data.limit(index + length).position(index);
            } catch (IllegalArgumentException e) {
                throw new IndexOutOfBoundsException();
            }
            // 通过ByteBuffer.get()方法实现读取
            data.get(dst, dstIndex, length);
        }
    
        public void setBytes(int index, byte[] src, int srcIndex, int length) {
            ByteBuffer data = buffer.duplicate();
            // 移动ByteBuffer中的指针
            data.limit(index + length).position(index);
            // 将数据写入底层的ByteBuffer中
            data.put(src, srcIndex, length);
        }
        //...
    }

2.4 NettyBackedChannelBuffer

NettyBackedChannelBuffer , 是基于 Netty 中 ByteBuf 的 ChannelBuffer 实现 ,Netty 中的 ByteBuf 维护了 readerIndex 和 writerIndex 以及 markedReaderIndex、markedWriterIndex 这四个索引,所以 NettyBackedChannelBuffer 没有再继承 AbstractChannelBuffer 抽象类,而是直接实现了 ChannelBuffer 接口。

    public class NettyBackedChannelBuffer implements ChannelBuffer {
        // Netty 中的ByteBuf
        private ByteBuf buffer;
    
        public void getBytes(int index, byte[] dst, int dstIndex, int length) {
            buffer.getBytes(index, dst, dstIndex, length);
        }
    
        public void setBytes(int index, byte[] src) {
            buffer.setBytes(index, src);
        }
        //...
    }

三、总结

本章,我对dubbo-remoting 模块中的 org.apache.dubbo.remoting.buffer包中的核心实现进行了讲解。Dubbo底层基于各类网络通信框架完成通信,但是各个框架对于缓冲区的接口及实现都是不同的,所以Dubbo需要在这些框架之上再抽象出一层。

ChannelBuffer 就是这一抽象层的核心接口,Dubbo提供了 ChannelBuffer 的多种实现,包括 HeapChannelBuffer、DynamicChannelBuffer、ByteBufferBackedChannelBuffer 等。

阅读全文