前言
前面写过《Netty服务端启动全流程源码分析》,BossGroup获取到客户端连接SocketChannel后会将其注册到WorkerGroup,由WorkerGroup来驱动数据IO读写。WorkerGroup的EventLoop监听到Channel有OP_READ
事件时,会调用Channel.Unsafe.read()
方法,Netty会将读取到的数据包装成ByteBuf,然后触发回调pipeline.fireChannelRead(byteBuf)
将事件传播出去。
整体流程是清楚了,但是对于详细的数据接收细节没有介绍,本篇文章会做介绍。
前置知识
熟悉Java Nio编程的同学应该知道,要想从SocketChannel读取数据,需要先创建一个ByteBuffer,然后调用SocketChannel.read(ByteBuffer)
方法。但是由于读取操作并未实际发生,程序并不知道有多少数据需要接收,导致我们并不知道需要创建一个多大的ByteBuffer,大了会造成内存的浪费,小了又需要频繁扩容。而且ByteBuffer本身不支持扩容操作,你需要重新申请一个更大的ByteBuffer,然后进行内存的复制,开销就更大了。Netty是如何解决这个问题的呢?后面会介绍。
还有一个知识点,读者需要提前了解。对于注册到Selector
多路复用器上,且监听OP_READ
事件的Channel,Selector
判断的其实就是Channel的有效可读字节数。意思就是说,对于有数据可读的Channel,如果你数据没有读完,下次select()
多路复用器依然会再返回它。所以,Netty会进行循环读,前面说过了,你不知道对端会发送给你多少数据,默认单次最多读16次,超过16次数据还没读完,本次就不再继续处理了,因为Netty怕阻塞其他IO事件,后面会详细分析。
AbstractNioByteChannel.read()分析
之前的文章已经分析过,当Netty检测到Channel有可读事件时,会调用AbstractNioByteChannel.read()
方法,下面是该方法整体的一个源码分析:
/*
客户端发送数据时触发。
见 io.netty.channel.nio.NioEventLoop.processSelectedKey
*/
@Override
public final void read() {
// 客户端Channel的配置
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// 获取ByteBufAllocator,默认是 PooledByteBufAllocator
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
// 重置统计信息
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
/*
当对端发送一个超大的数据包时,TCP会拆包。
OP_READ事件只会触发一次,Netty需要循环读,默认最多读16次,因此ChannelRead()可能会触发多次,拿到的是半包数据。
如果16次没把数据读完,没有关系,下次select()还会继续处理。
对于Selector的可读事件,如果你没有读完数据,它会一直返回。
*/
do {
// 分配一个ByteBuf,大小能容纳可读数据,又不过于浪费空间。
byteBuf = allocHandle.allocate(allocator);
/*
doReadBytes(byteBuf):ByteBuf内部有ByteBuffer,底层还是调用了SocketChannel.read(ByteBuffer)
allocHandle.lastBytesRead()根据读取到的实际字节数,自适应调整下次分配的缓冲区大小。
*/
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// 没有读取到字节
byteBuf.release();//释放ByteBuf,空的,没有意义。
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
// 递增已经读取的消息数量
allocHandle.incMessagesRead(1);
readPending = false;
// 通过pipeline传播ChannelRead事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());//判断是否需要继续读
// 读取完毕,pipeline传播ChannelReadComplete事件
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
这里我们重点关注一下recvBufAllocHandle()
方法,很简单,就是对Channel绑定的recvHandle
进行了判空校验,如果没绑定就创建一个。
@Override
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
if (recvHandle == null) {
// 如果Channel对应的recvHandle是空的,则创建一个新实例
recvHandle = config().getRecvByteBufAllocator().newHandle();
}
return recvHandle;
}
Channel会依赖RecvByteBufAllocator.Handle来创建ByteBuf,为什么不直接创建呢?因为前面说过的,不知道真正有多少数据要接收,不知道该创建多大的ByteBuf,大了浪费空间,小了又要频繁扩容。基于这个原因,Channel会把创建ByteBuf的任务交给RecvByteBufAllocator.Handle处理,希望它可以基于历史数据做统计分析,分配出一个容量大到足够容纳所有的数据,又小到不会浪费太多的空间。
RecvByteBufAllocator.Handle的细节后面分析,这里先把read()
流程分析完。
创建好一个大小合适的ByteBuf之后,Channel会调用doReadBytes(byteBuf)
将数据写入到Bytebuf:
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
// 设置 尝试读取的字节数,尽量把ByteBuf填满
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
// 将SocketChannel读取的数据写入到byteBuf
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
byteBuf.writeBytes(ScatteringByteChannel in, int length)
其实就是将ByteBuf转换成JDK的ByteBuffer,然后通过JDK原生的SocketChannel进行读取,它会返回实际的读取字节数。
如果实际读取的字节数小于等于0,说明没有数据可读了,本次OP_READ
事件处理完毕,触发pipeline.fireChannelReadComplete()
回调,否则触发pipeline.fireChannelRead(byteBuf)
将读取到的ByteBuf传递给其他ChannelHandler处理。
如果对端发送的数据包很大,很可能创建的ByteBuf不能一次性读完所有数据,所以Channel这里会进行循环读,整个读取的逻辑会放在一个while
循环里,通过allocHandle.continueReading()
判断是否需要继续读取数据。因此,即使TCP没有发生拆包,如果创建的ByteBuf过小,ChannelHandler的channelRead()
也会被触发多次,所以, 切记不可错误的理解为「channelRead()是因为TCP拆包导致的」 。
RecvByteBufAllocator分析
RecvByteBufAllocator是Netty的「数据接收缓冲区分配器」,Channel依赖它来创建大小合适的ByteBuf,提升性能和节省内存。
RecvByteBufAllocator是一个很简单的接口,它的工作由内部接口Handle完成,所以直接看Handle接口就行了。
interface Handle {
/*
通过ByteBufAllocator分配一个大小合适的ByteBuf。
太大:浪费空间。
太小:频繁扩容,内存复制开销。
*/
ByteBuf allocate(ByteBufAllocator alloc);
// 猜测需要分配的字节数
int guess();
// 重置已累积的任何计数器,并建议为下一个读循环应读取多少消息字节。
void reset(ChannelConfig config);
// 增加已读的消息数量
void incMessagesRead(int numMessages);
// 设置上一次读取到的字节数,AdaptiveRecvByteBufAllocator会根据该值 自适应调整下次分配的缓冲区大小。
void lastBytesRead(int bytes);
// 获取上一次读取的字节数
int lastBytesRead();
// 设置尝试读取的字节数
void attemptedBytesRead(int bytes);
// 获取尝试读取的字节数
int attemptedBytesRead();
// 是否还能继续读取
boolean continueReading();
// 读取完成
void readComplete();
}
默认使用的RecvByteBufAllocator实现是AdaptiveRecvByteBufAllocator,它可以自适应调整分配的ByteBuf大小,我们重点分析。
AdaptiveRecvByteBufAllocator类图如下:
从上往下看吧,MaxMessagesRecvByteBufAllocator很简单,在顶级接口的基础之上,限制了循环读的次数:
/*
当有可读事件时,Netty是循环读的,通过continueReading()判断是否需要继续读取。
该类主要是用来限制循环读取的次数的,默认是16.
*/
public interface MaxMessagesRecvByteBufAllocator extends RecvByteBufAllocator {
// 返回循环读的最大次数
int maxMessagesPerRead();
// 设置循环读的最大次数
MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead);
}
DefaultMaxMessagesRecvByteBufAllocator是MaxMessagesRecvByteBufAllocator的默认实现,先看属性:
// 最大读取消息数量
private volatile int maxMessagesPerRead;
/*
是否尊重/关心 还有更多的数据可读?
如果为true,则无条件认为还有数据可读,直到下次循环读取到0字节为止。
这可能会导致多执行一次无效读,无意义的创建一个ByteBuf。
*/
private volatile boolean respectMaybeMoreData = true;
核心逻辑都在Handle里,所以直接看Handle即可:
public abstract class MaxMessageHandle implements ExtendedHandle {
private ChannelConfig config;
// 最大读取多少次消息,默认16次,没读完,下次select接着读。
private int maxMessagePerRead;
// 读取的总消息数
private int totalMessages;
// 读取的字节总数
private int totalBytesRead;
// 尝试读取的字节数,默认是ByteBuf的可写字节数,即尽量把ByteBuf填满。
private int attemptedBytesRead;
// 上次读取的字节数,根据它调整下次分配的缓冲区大小。
private int lastBytesRead;
private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
/*
是否还有更多数据可读的默认判断:attemptedBytesRead == lastBytesRead。
即:本次读取的数据有没有填满ByteBuf,如果填满了,说明可能还有数据要读。否则就不读了,直接触发ChannelReadComplete()。
*/
private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
@Override
public boolean get() {
return attemptedBytesRead == lastBytesRead;
}
};
// 根据config重置数据,每次处理新的Read事件时触发。
@Override
public void reset(ChannelConfig config) {
this.config = config;
maxMessagePerRead = maxMessagesPerRead();
totalMessages = totalBytesRead = 0;
}
// 根据猜测的字节数,分配一个ByteBuf。
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
// 递增统计消息的读取数量,默认超过16就不读了,防止阻塞IO线程,其他事件得不到处理。
@Override
public final void incMessagesRead(int amt) {
totalMessages += amt;
}
// 根据上次读取的字节数,累加总读取到的字节数
@Override
public void lastBytesRead(int bytes) {
lastBytesRead = bytes;
if (bytes > 0) {
totalBytesRead += bytes;
}
}
@Override
public final int lastBytesRead() {
return lastBytesRead;
}
// 是否还要继续循环读取消息
@Override
public boolean continueReading() {
/*
判断依据:
1.认为还有可读数据
2.读取的消息数没有达到上限
*/
return continueReading(defaultMaybeMoreSupplier);
}
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) // 认为还有可读数据
&&
totalMessages < maxMessagePerRead && totalBytesRead > 0;// 读取的消息数没有达到上限
}
@Override
public void readComplete() {
}
@Override
public int attemptedBytesRead() {
return attemptedBytesRead;
}
// 设置尝试读取的字节数,默认为ByteBuf的可写字节数
@Override
public void attemptedBytesRead(int bytes) {
attemptedBytesRead = bytes;
}
protected final int totalBytesRead() {
return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
}
}
MaxMessageHandle会根据每次读取的字节数是否填满ByteBuf为依据,判断是否还要继续循环读。如果填满了说明Channel可能还有数据等待读取,反之已无数据可读,直接跳出循环即可。
说完父类,接下来看核心的AdaptiveRecvByteBufAllocator,源码也不是很长:
/*
自适应的,接收对端数据的ByteBuf分配器,分配的ByteBuf有合适的初试容量。
避免太小导致频繁扩容,太大导致内存浪费,GC压力。
基于历史的数据采集做预测:
1.前一次接收的数据完全读满了ByteBuf,则下次会增大缓冲区。
2.连续两次接收的数据小于指定值,则会缩小下次分配的缓冲区。
*/
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
// 默认最小值
static final int DEFAULT_MINIMUM = 64;
// 默认初试值
static final int DEFAULT_INITIAL = 2048;
// 默认最大值
static final int DEFAULT_MAXIMUM = 65536;
/*
如果需要扩容下次分配的缓冲区大小,这个是扩容的索引步长。
*/
private static final int INDEX_INCREMENT = 4;
/*
如果需要缩容下次分配的缓冲区大小,这个是缩容的索引步长。
*/
private static final int INDEX_DECREMENT = 1;
// 扩容表
private static final int[] SIZE_TABLE;
static {
List<Integer> sizeTable = new ArrayList<Integer>();
// 512字节内,以16字节为步长,递增
for (int i = 16; i < 512; i += 16) {
sizeTable.add(i);
}
// 512字节后,成倍扩容
for (int i = 512; i > 0; i <<= 1) { // lgtm[java/constant-comparison]
sizeTable.add(i);
}
// List转数组
SIZE_TABLE = new int[sizeTable.size()];
for (int i = 0; i < SIZE_TABLE.length; i ++) {
SIZE_TABLE[i] = sizeTable.get(i);
}
}
/**
* @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.
*/
@Deprecated
public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
/*
通过给定size查找下标,二分查找法。
如果size不在SIZE_TABLE内,返回最接近它的一个稍小值的索引
*/
private static int getSizeTableIndex(final int size) {
for (int low = 0, high = SIZE_TABLE.length - 1;;) {
if (high < low) {
return low;
}
if (high == low) {
return high;
}
int mid = low + high >>> 1;
int a = SIZE_TABLE[mid];
int b = SIZE_TABLE[mid + 1];
if (size > b) {
low = mid + 1;
} else if (size < a) {
high = mid - 1;
} else if (size == a) {
return mid;
} else {
return mid + 1;
}
}
}
private final class HandleImpl extends MaxMessageHandle {
// 最小容量的索引
private final int minIndex;
// 最大容量的索引
private final int maxIndex;
// 默认容量的索引
private int index;
// 下次接受的缓冲区大小
private int nextReceiveBufferSize;
// 是否需要立即缩容?因为需要连续两次读取的字节数小于阈值,第一次设为true,第二次才缩容。
private boolean decreaseNow;
HandleImpl(int minIndex, int maxIndex, int initial) {
this.minIndex = minIndex;
this.maxIndex = maxIndex;
// 根据初始值得到SIZE_TABLE的下标
index = getSizeTableIndex(initial);
// 第一次分配的缓冲区大小就是initial默认值
nextReceiveBufferSize = SIZE_TABLE[index];
}
/*
Unsafe.read()循环读数据时会调用该方法,bytes是上一次实际读取到的字节数。
*/
@Override
public void lastBytesRead(int bytes) {
/*
attemptedBytesRead():尝试读取的字节数。
NioSocketChannel.doReadBytes()会将attemptedBytesRead设置为ByteBuf.writableBytes(),
即只要有数据可读,Netty会尽量将ByteBuf写满。
*/
if (bytes == attemptedBytesRead()) {
// 实际读取的字节数填满了缓冲区,则扩容。
record(bytes);
}
// 调用父类方法,将数值累加到 totalBytesRead
super.lastBytesRead(bytes);
}
/*
预测下次需要分配的缓冲区大小
*/
@Override
public int guess() {
return nextReceiveBufferSize;
}
/*
根据实际读取到的字节数,自适应调整 下次应该分配的缓冲区大小
*/
private void record(int actualReadBytes) {
// 如果连续两次,读取的字节数 小于等于 前一个容量大小
if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
// 根据-1的索引步长进行缩容,需要连续两次触发才缩容,所以有一个decreaseNow
if (decreaseNow) {
index = max(index - INDEX_DECREMENT, minIndex);//确保不低于最小值
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {
/*
根据+4的索引步长进行扩容,但不能超过最大值。
因此默认情况下的扩容逻辑:
2048 > 32768 > 65536 > 65536(不变...)
2k > 32K > 64K ...
*/
index = min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
}
@Override
public void readComplete() {
// 数据读取完毕,根据本次读取的总字节数,自适应调整下次应该分配的缓冲区大小
record(totalBytesRead());
}
}
// 最小、默认、最大容量在SIZE_TABLE中的下标
private final int minIndex;
private final int maxIndex;
private final int initial;
// 根据默认值创建一个:自适应接受缓冲区分配器:64、2048、65535
public AdaptiveRecvByteBufAllocator() {
this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}
// 根据指定最小、默认、最大容量创建一个:自适应接受缓冲区分配器
public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
checkPositive(minimum, "minimum");
if (initial < minimum) {
throw new IllegalArgumentException("initial: " + initial);
}
if (maximum < initial) {
throw new IllegalArgumentException("maximum: " + maximum);
}
int minIndex = getSizeTableIndex(minimum);
if (SIZE_TABLE[minIndex] < minimum) {
this.minIndex = minIndex + 1;
} else {
this.minIndex = minIndex;
}
int maxIndex = getSizeTableIndex(maximum);
if (SIZE_TABLE[maxIndex] > maximum) {
this.maxIndex = maxIndex - 1;
} else {
this.maxIndex = maxIndex;
}
this.initial = initial;
}
@SuppressWarnings("deprecation")
@Override
public Handle newHandle() {
return new HandleImpl(minIndex, maxIndex, initial);
}
@Override
public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
super.respectMaybeMoreData(respectMaybeMoreData);
return this;
}
}
AdaptiveRecvByteBufAllocator根据名字就能看出来,它是一个自适应的数据接收缓冲区分配器,这个「自适应」体现在ByteBuf空间的分配上。
它使用三个属性分别定义ByteBuf分配的默认值、最小值、最大值,使用一个int数组SIZE_TABLE
来定义扩/缩容的容量表,512字节内,以16字节为步长扩容,512字节后,成倍扩容。lastBytesRead()
会记录下每次循环读取的实际字节数,如果读取的字节填满了ByteBuf,则会调用record(bytes)
进行扩容,扩容策略为:扩容索引+4,即16倍扩容,在不超过最大值的前提下,默认的扩容策略如下:
2048 > 32768 > 65536 > 65536(不变...)
2k > 32K > 64K ...
如果连续两次读取的字节数小于等于前一个缩容容量,则会进行缩容,缩容的策略是容量索引位-1,即512字节后,成倍缩容,512字节前,缩小16字节。
由于需要连续两次读取的字节数少才会缩容,所以加了一个属性decreaseNow
来记录是否需要立即缩容,第一次触发它为true,第二次才缩容。
为什么需要连续两次呢?因为一次读取的字节数少可能是因为读到了上一个数据包的末尾,数据包本身还是很大的,所以不能缩容。连续两次才能说明一个完整的数据包很小,下次分配的ByteBuf可以小些以节省内存。
循环结束后,会调用allocHandle.readComplete()
,它会根据此次读取的总字节数去做动态调整,为下次分配ByteBuf提供预测。 例如,本次处理OP_READ
事件,循环读3次,每次读取了100KB,那么下次就会直接分配一个300KB的ByteBuf,争取一次性读完。相反,如果读取的字节数少,就缩容节省内存。
总结
Channel在接收对端数据时,因为不知道该分配多大的ByteBuf来接收,所以会将ByteBuf的分配任务交给RecvByteBufAllocator,期望它能分配一个容量大到可以足够容纳数据,又小到不会浪费太多内存的ByteBuf,默认的实现是AdaptiveRecvByteBufAllocator,它会根据前面实际读取的字节数,自适应的调整下次分配的ByteBuf大小。
虽然AdaptiveRecvByteBufAllocator会尽量去预测下次分配ByteBuf的大小,但是预测会有不准的时候,因此Channel还是会进行循环读,防止ByteBuf分配的过小无法容纳所有数据。但是为了避免IO线程阻塞,其他Channel的事件得不到处理,默认会限制单次最多循环读16次,如果发送的数据包真的非常大,16次都没有读完,Netty本次也会放弃处理,等待下次select()
轮询时再处理。
最后,再提醒一句,不要错误的理解为:channelRead()
的触发是因为TCP拆包导致的!!!