Buffer 是一种字节容器,在Java NIO、 Netty 等 NIO 框架中都有类似的设计。例如,Java NIO 中的ByteBuffer、Netty4 中的 ByteBuf。Dubbo 抽象出了 ChannelBuffer 接口,用于对底层各种不同的 NIO 框架中的 Buffer 进行统一,其子类如下图所示:
上述这些Dubbo中的缓冲区相关接口、抽象类及实现类,都定义在dubbo-remoting-api
模块中。org.apache.dubbo.remoting.buffer
包在更高层面上,抽象了各个 NIO 框架中的缓冲区概念,同时也提供了一些基础实现:
本章,我就按照 ChannelBuffer 的继承结构,从顶层的 ChannelBuffer 接口开始,逐个分析 ChannelBuffer 的各个实现类。
一、ChannelBuffer接口
ChannelBuffer 接口的设计与 Netty4 中 ByteBuf
抽象类的设计基本一致,也有 readerIndex
和 writerIndex
指针的概念。
1.1 核心方法
我们先来看ChannelBuffer接口的核心方法,大部分方法和Netty中的ByteBuf是类似,都是围绕readerIndex
和 writerIndex
指针展开:
public interface ChannelBuffer extends Comparable<ChannelBuffer> {
// 从从参数指定的位置读取字节(不会修改readerIndex)
void getBytes(int index, byte[] dst);
// 向参数指定的位置写入字节(不会修改writerIndex)
void setBytes(int index, byte[] src);
// 从readerIndex指针开始,读取数据到dst,并移动readerIndex指针
void readBytes(byte[] dst);
// 从src读取数据,并从writerIndex指针位置开始写入,移动writerIndex指针
void writeBytes(byte[] src);
// 记录当前readerIndex指针,一般会和resetReaderIndex()配合使用
void markReaderIndex();
// 记录当前writerIndex指针,一般会和resetWriterIndex()配套使用
void markWriterIndex();
// 返回当前buffer的容量
int capacity();
// 将readerIndex和writerIndex清零
void clear();
// 返回创建 ChannelBuffer 的工厂对象
ChannelBufferFactory factory();
//...
}
1.2 ChannelBufferFactory
ChannelBufferFactory是用于创建 ChannelBuffer 的工厂对象,它定义了很多 getBuffer()
方法重载来创建 ChannelBuffer:
public interface ChannelBufferFactory {
ChannelBuffer getBuffer(int capacity);
ChannelBuffer getBuffer(byte[] array, int offset, int length);
ChannelBuffer getBuffer(ByteBuffer nioBuffer);
}
ChannelBufferFactory的类继承图如下:
1.3 AbstractChannelBuffer
AbstractChannelBuffer 抽象类 实现了 ChannelBuffer 接口的大部分方法,其核心是维护了以下四个索引:
public abstract class AbstractChannelBuffer implements ChannelBuffer {
// 通过 readBytes() 方法及其重载读取数据时,会后移 readerIndex 索引
private int readerIndex;
// 通过 writeBytes() 方法及其重载写入数据的时候,会后移 writerIndex 索引
private int writerIndex;
// 实现记录以及回滚readerIndex的功能
private int markedReaderIndex;
// 实现记录以及回滚writerIndex的功能
private int markedWriterIndex;
//...
}
AbstractChannelBuffer 的 readBytes()
和 writeBytes()
方法(及其重载)内部会通过 getBytes() 方法和 setBytes() 方法实现数据的读写,这些方法在 AbstractChannelBuffer 子类中实现。以readBytes/writeBytes方法为例:
// AbstractChannelBuffer.java
public void readBytes(byte[] dst, int dstIndex, int length) {
// 检测Channel中的可读字节数是否足够
checkReadableBytes(length);
// 将readerIndex之后的length个字节读取到dst数组中 dstIndex~dstIndex+length 的位置
getBytes(readerIndex, dst, dstIndex, length); // 抽象方法,子类实现
// 将readerIndex后移length个字节
readerIndex += length;
}
public void writeBytes(byte[] src, int srcIndex, int length) {
// 将src数组中srcIndex~srcIndex+length的数据写入当前buffer的writerIndex~writerIndex+length的位置
setBytes(writerIndex, src, srcIndex, length); // 抽象方法,子类实现
// 将writeIndex后移length个字节
writerIndex += length;
}
1.4 输入输出流
Dubbo 在 ChannelBuffer 的基础上,提供了一套输入输出流,如下图所示:
ChannelBufferInputStream 内部封装了一个 ChannelBuffer,其实现 InputStream 接口的 read() 相关方法时,全部委托 ChannelBuffer 读取数据。ChannelBufferInputStream 还维护了一个 startIndex
索引和endIndex
索引,作为读取数据的起止位置。
public class ChannelBufferInputStream extends InputStream {
private final ChannelBuffer buffer;
private final int startIndex;
private final int endIndex;
public int read(byte[] b, int off, int len) throws IOException {
int available = available();
if (available == 0) {
return -1;
}
len = Math.min(available, len);
buffer.readBytes(b, off, len);
return len;
}
//...
}
ChannelBufferOutputStream 与 ChannelBufferInputStream 类似,会向底层的 ChannelBuffer 写入数据,我这里就不赘述了。
1.5 ChannelBuffers
ChannelBuffers是一个门面类,包含了创建各类 ChannelBuffer 对象的方法:
public final class ChannelBuffers {
public static final ChannelBuffer EMPTY_BUFFER = new HeapChannelBuffer(0);
private ChannelBuffers() {
}
/...
}
- dynamicBuffer() 方法:创建 DynamicChannelBuffer 对象,初始化大小由第一个参数指定,默认为 256。
- buffer() 方法:创建指定大小的 HeapChannelBuffer 对象。
- wrappedBuffer() 方法:将传入的 byte[] 数字封装成 HeapChannelBuffer 对象。
- directBuffer() 方法:创建 ByteBufferBackedChannelBuffer 对象,需要注意的是,底层的 ByteBuffer 使用的是堆外内存,需要特别关注堆外内存的管理。
ChannelBuffers的equals方法比较特殊,用于比较两个 ChannelBuffer 是否相同。该方法会逐个比较两个 ChannelBuffer 中的前 7 个可读字节,只有两者完全一致,才算两个 ChannelBuffer 相同:
// ChannelBuffers.java
public static boolean equals(ChannelBuffer bufferA, ChannelBuffer bufferB) {
// 比较两个ChannelBuffer的可读字节数
final int aLen = bufferA.readableBytes();
if (aLen != bufferB.readableBytes()) {
return false;
}
// 只比较前7个字节
final int byteCount = aLen & 7;
int aIndex = bufferA.readerIndex();
int bIndex = bufferB.readerIndex();
for (int i = byteCount; i > 0; i--) {
// 前7个字节存在不同,则返回false
if (bufferA.getByte(aIndex) != bufferB.getByte(bIndex)) {
return false;
}
aIndex++;
bIndex++;
}
return true;
}
ChannelBuffers的compare() 方法,用于比较两个 ChannelBuffer 的大小,会逐个比较两个 ChannelBuffer 中的全部可读字节,具体实现与 equals() 方法类似,我就不赘述了。
二、ChannelBuffer实现类
了解了 ChannelBuffer 接口的核心方法以及 AbstractChannelBuffer 的公共实现之后,我们再来看 ChannelBuffer 的具体子类。
2.1 HeapChannelBuffer
HeapChannelBuffer 是基于字节数组的 ChannelBuffer 实现 ,它内部有一个array
字节数组,用于存储数据:
public class HeapChannelBuffer extends AbstractChannelBuffer {
protected final byte[] array;
//...
}
HeapChannelBuffer 的 setBytes()
和 getBytes()
方法都是通过调用 System.arraycopy()
方法完成数组操作的:
// HeapChannelBuffer.java
public void setBytes(int index, byte[] src, int srcIndex, int length) {
System.arraycopy(src, srcIndex, array, index, length);
}
public void getBytes(int index, byte[] dst, int dstIndex, int length) {
System.arraycopy(array, index, dst, dstIndex, length);
}
HeapChannelBuffer 对象的创建是通过工厂类 HeapChannelBufferFactory 实现的。HeapChannelBufferFactory.getBuffer()
方法的内部通过 ChannelBuffers
这个工具类,创建一个指定大小的 HeapChannelBuffer 对象:
// HeapChannelBufferFactory.java
public class HeapChannelBufferFactory implements ChannelBufferFactory {
@Override
public ChannelBuffer getBuffer(int capacity) {
// 新建一个HeapChannelBuffer,底层会新建一个长度为capacity的byte数组
return ChannelBuffers.buffer(capacity);
}
@Override
public ChannelBuffer getBuffer(byte[] array, int offset, int length) {
// 新建一个HeapChannelBuffer,并且会拷贝array数组中offset~offset+lenght的数据到新HeapChannelBuffer中
return ChannelBuffers.wrappedBuffer(array, offset, length);
}
@Override
public ChannelBuffer getBuffer(ByteBuffer nioBuffer) {
if (nioBuffer.hasArray()) {
return ChannelBuffers.wrappedBuffer(nioBuffer);
}
ChannelBuffer buf = getBuffer(nioBuffer.remaining());
int pos = nioBuffer.position();
buf.writeBytes(nioBuffer);
nioBuffer.position(pos);
return buf;
}
}
2.2 DynamicChannelBuffer
DynamicChannelBuffer 是其它 ChannelBuffer 的装饰器,它可以为其它 ChannelBuffer 添加 动态扩展容量 的功能。DynamicChannelBuffer 中有两个核心字段:
public class DynamicChannelBuffer extends AbstractChannelBuffer {
// 默认为 HeapChannelBufferFactory
private final ChannelBufferFactory factory;
// 被修饰的 ChannelBuffer,默认为 HeapChannelBuffer
private ChannelBuffer buffer;
}
DynamicChannelBuffer 需要关注的是 ensureWritableBytes()
方法,该方法实现了动态扩容的功能,在每次写入数据之前,都需要调用该方法确定当前可用空间是否足够:
// DynamicChannelBuffer.java
@Override
public void ensureWritableBytes(int minWritableBytes) {
if (minWritableBytes <= writableBytes()) {
return;
}
int newCapacity;
if (capacity() == 0) {
newCapacity = 1;
} else {
newCapacity = capacity();
}
// 空间扩大为原来的两倍
int minNewCapacity = writerIndex() + minWritableBytes;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
ChannelBuffer newBuffer = factory().getBuffer(newCapacity);
// 将旧 ChannelBuffer 中的数据拷贝到新 ChannelBuffer 中
newBuffer.writeBytes(buffer, 0, writerIndex());
// 指向新 ChannelBuffer 对象
buffer = newBuffer;
}
ensureWritableBytes() 方法如果检测到底层 ChannelBuffer 对象的空间不足,则会创建一个新的 ChannelBuffer(空间扩大为原来的两倍),然后将旧 ChannelBuffer 中的数据拷贝到新 ChannelBuffer 中,最后将 buffer 字段指向新 ChannelBuffer 对象,完成整个扩容操作。
2.3 ByteBufferBackedChannelBuffer
ByteBufferBackedChannelBuffer, 是基于 Java NIO 中 ByteBuffer 的 ChannelBuffer 实现 ,它的内部通过委托调用 ByteBuffer 的 API 实现了ChannelBuffer 的接口功能。以 getBytes() 方法和 setBytes() 方法为例:
// ByteBufferBackedChannelBuffer.java
public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
// Java NIO中的ByteBuffer
private final ByteBuffer buffer;
private final int capacity;
public void getBytes(int index, byte[] dst, int dstIndex, int length) {
ByteBuffer data = buffer.duplicate();
try {
// 移动ByteBuffer中的指针
data.limit(index + length).position(index);
} catch (IllegalArgumentException e) {
throw new IndexOutOfBoundsException();
}
// 通过ByteBuffer.get()方法实现读取
data.get(dst, dstIndex, length);
}
public void setBytes(int index, byte[] src, int srcIndex, int length) {
ByteBuffer data = buffer.duplicate();
// 移动ByteBuffer中的指针
data.limit(index + length).position(index);
// 将数据写入底层的ByteBuffer中
data.put(src, srcIndex, length);
}
//...
}
2.4 NettyBackedChannelBuffer
NettyBackedChannelBuffer , 是基于 Netty 中 ByteBuf 的 ChannelBuffer 实现 ,Netty 中的 ByteBuf 维护了 readerIndex 和 writerIndex 以及 markedReaderIndex、markedWriterIndex 这四个索引,所以 NettyBackedChannelBuffer 没有再继承 AbstractChannelBuffer 抽象类,而是直接实现了 ChannelBuffer 接口。
public class NettyBackedChannelBuffer implements ChannelBuffer {
// Netty 中的ByteBuf
private ByteBuf buffer;
public void getBytes(int index, byte[] dst, int dstIndex, int length) {
buffer.getBytes(index, dst, dstIndex, length);
}
public void setBytes(int index, byte[] src) {
buffer.setBytes(index, src);
}
//...
}
三、总结
本章,我对dubbo-remoting
模块中的 org.apache.dubbo.remoting.buffer
包中的核心实现进行了讲解。Dubbo底层基于各类网络通信框架完成通信,但是各个框架对于缓冲区的接口及实现都是不同的,所以Dubbo需要在这些框架之上再抽象出一层。
ChannelBuffer 就是这一抽象层的核心接口,Dubbo提供了 ChannelBuffer 的多种实现,包括 HeapChannelBuffer、DynamicChannelBuffer、ByteBufferBackedChannelBuffer 等。