RecvByteBufAllocator
首先在说这个之前,先来点预备知识,不然不好理解。我们要读数据,当然是从通道里读,那我们是不是应该有个放数据的地方啊,这个就是接受缓冲区,那这个放数据的地方要多大呢,太大了浪费,太小了又不够,可能要涉及扩容,性能不好,所以netty
设计了一个RecvByteBufAllocator
分配接口。我们来看下这个接口。
public interface RecvByteBufAllocator {
//处理器,也就是做一些统计的
Handle newHandle();
@Deprecated
interface Handle {
//分配缓冲区,实际是交给ByteBufAllocator的
ByteBuf allocate(ByteBufAllocator alloc);
//猜下次该用多大的接收缓冲区
int guess();
//重置一些统计参数
void reset(ChannelConfig config);
//统计读取的消息数
void incMessagesRead(int numMessages);
//设置上一次读取的字节数
void lastBytesRead(int bytes);
//获取上一次读取的字节数
int lastBytesRead();
//设置尝试读取的字节数
void attemptedBytesRead(int bytes);
//获取尝试读取字节数
int attemptedBytesRead();
//是否还能继续读
boolean continueReading();
//读取完成
void readComplete();
}
@SuppressWarnings("deprecation")
@UnstableApi
interface ExtendedHandle extends Handle {
boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier);
}
这个接口就是定义了一些统计的参数,是为了猜下一次可能读取的数据大小,这样就可以对接受缓冲区的大小进行调节。
MaxMessagesRecvByteBufAllocator
这个又加了两个方法,其实为了限制读的次数,默认配置读到16
个消息就不读了,不然可能事件长了就阻塞IO
线程去做别的事了。
public interface MaxMessagesRecvByteBufAllocator extends RecvByteBufAllocator {
// 每次读循环,读多少个消息
int maxMessagesPerRead();
//设置最大消息数
MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead);
}
DefaultMaxMessagesRecvByteBufAllocator
这个是上面接口的抽象实现,还加了一个是否停止读的标记respectMaybeMoreData
,默认就是认为没有更多数据,不读了。
public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {
private volatile int maxMessagesPerRead;//最多读多少个消息
private volatile boolean respectMaybeMoreData = true;//是否没有更多的数据,停止读了
public DefaultMaxMessagesRecvByteBufAllocator() {
this(1);//这里设置是只读1个消息
}
public DefaultMaxMessagesRecvByteBufAllocator(int maxMessagesPerRead) {
maxMessagesPerRead(maxMessagesPerRead);
}
@Override
public int maxMessagesPerRead() {
return maxMessagesPerRead;
}
@Override
public MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead) {
checkPositive(maxMessagesPerRead, "maxMessagesPerRead");
this.maxMessagesPerRead = maxMessagesPerRead;
return this;
}
public DefaultMaxMessagesRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
this.respectMaybeMoreData = respectMaybeMoreData;
return this;
}
public final boolean respectMaybeMoreData() {
return respectMaybeMoreData;
}
}
MaxMessageHandle处理器
这个就是前面RecvByteBufAllocator
里处理器的抽象实现。里面有个一个布尔值判别器,主要是说如果把申请的接收缓冲区填满了,那就说明可能还要读,否则就是不读了,因为数据都填不满缓冲区。
public abstract class MaxMessageHandle implements ExtendedHandle {
private ChannelConfig config;
private int maxMessagePerRead;//每次读的最大消息数
private int totalMessages;//总共读了多少次消息
private int totalBytesRead;//总共读的字节数
private int attemptedBytesRead;//尝试读的字节数
private int lastBytesRead;//上一次读的字节数
private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
//一个布尔值判别器
private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
@Override
public boolean get() {
return attemptedBytesRead == lastBytesRead;//是否把缓冲区内可写的空间全填满
}
};
/** 重置属性
* Only {@link ChannelConfig#getMaxMessagesPerRead()} is used.
*/
@Override
public void reset(ChannelConfig config) {
this.config = config;
maxMessagePerRead = maxMessagesPerRead();
totalMessages = totalBytesRead = 0;
}
//分配缓冲区
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
//增加接受读消息的数量
@Override
public final void incMessagesRead(int amt) {
totalMessages += amt;
}
//保存上一次读取的字节数
@Override
public void lastBytesRead(int bytes) {
lastBytesRead = bytes;//记录上次读取的字节数
if (bytes > 0) {//先判断后加,0就不加了,将性能提高到极致啊
totalBytesRead += bytes;//统计总的字节数
}
}
//获取上一次读取的字节数
@Override
public final int lastBytesRead() {
return lastBytesRead;
}
//是否继续读
@Override
public boolean continueReading() {
return continueReading(defaultMaybeMoreSupplier);
}
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&//配置了自动读
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&//如果还有可读的,或者把缓冲区可写的全填满了
totalMessages < maxMessagePerRead &&//没超过最大读取消息数
totalBytesRead > 0;//已经有数据读取
}
@Override
public void readComplete() {
}
//尝试读取的尺寸,默认是缓冲区可写的尺寸
@Override
public int attemptedBytesRead() {
return attemptedBytesRead;
}
@Override
public void attemptedBytesRead(int bytes) {
attemptedBytesRead = bytes;
}
//总读的大小
protected final int totalBytesRead() {
return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
}
}
AdaptiveRecvByteBufAllocator
可调节的缓冲区分配器,这个是有限制的,默认最小尺寸64
,最大是65536
,初始是1024
,初始化时候会创建一个数组,里面有所有可分配的尺寸,从16
到2^30
。16-496
是间隔16
分的,512
以上是2
倍分的。然后会将最小最大初始尺寸都转化为索引,方便操作。
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
static final int DEFAULT_MINIMUM = 64;//最小
static final int DEFAULT_INITIAL = 1024;//初始
static final int DEFAULT_MAXIMUM = 65536;//最大
//增加索引+4 减少索引-1
private static final int INDEX_INCREMENT = 4;
private static final int INDEX_DECREMENT = 1;
private static final int[] SIZE_TABLE;//尺寸数组
//16-496间隔16 512到2的31次-1 间隔2倍
static {
List<Integer> sizeTable = new ArrayList<Integer>();
for (int i = 16; i < 512; i += 16) {
sizeTable.add(i);
}
for (int i = 512; i > 0; i <<= 1) {
sizeTable.add(i);
}
SIZE_TABLE = new int[sizeTable.size()];
for (int i = 0; i < SIZE_TABLE.length; i ++) {
SIZE_TABLE[i] = sizeTable.get(i);
}
}
@Deprecated
public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
//二分查找
private static int getSizeTableIndex(final int size) {
for (int low = 0, high = SIZE_TABLE.length - 1;;) {
if (high < low) {
return low;
}
if (high == low) {
return high;
}
int mid = low + high >>> 1;
int a = SIZE_TABLE[mid];
int b = SIZE_TABLE[mid + 1];
if (size > b) {
low = mid + 1;
} else if (size < a) {
high = mid - 1;
} else if (size == a) {
return mid;
} else {
return mid + 1;//返回最近大于size的尺寸
}
}
}
private final int minIndex;//记录最小尺寸索引
private final int maxIndex;//记录最大尺寸索引
private final int initial;//记录初始尺寸索引
public AdaptiveRecvByteBufAllocator() {
this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}
public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
checkPositive(minimum, "minimum");
if (initial < minimum) {
throw new IllegalArgumentException("initial: " + initial);
}
if (maximum < initial) {
throw new IllegalArgumentException("maximum: " + maximum);
}
//设置最小索引
int minIndex = getSizeTableIndex(minimum);
if (SIZE_TABLE[minIndex] < minimum) {
this.minIndex = minIndex + 1;
} else {
this.minIndex = minIndex;
}
//设置最大索引
int maxIndex = getSizeTableIndex(maximum);
if (SIZE_TABLE[maxIndex] > maximum) {
this.maxIndex = maxIndex - 1;
} else {
this.maxIndex = maxIndex;
}
this.initial = initial;
}
@SuppressWarnings("deprecation")
@Override
public Handle newHandle() {
return new HandleImpl(minIndex, maxIndex, initial);
}
@Override
public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
super.respectMaybeMoreData(respectMaybeMoreData);
return this;
}
}
HandleImpl
这个是真正进行尺寸伸缩的处理器。每次设置上一次读取的字节数时,会判断真实读取的字节数是不是把分配的缓冲区给填满了,如果满了就要进行缓冲区尺寸的伸缩。伸缩算法就是如果真实读取的字节数小于等于当前尺寸的前一个尺寸大小,且要连续两次,那就会把空间缩小成前一个尺寸大小。如果真实读取字节数大于等于预测的接收缓冲区大小,那就扩容,每次扩容是当前尺寸的后4
个尺寸大小,但是不超过最大尺寸。
举个例子16,32,48,64,80,96
.6
个尺寸,开始是在32
,如果两次发现真实的读取数据都小于等于16
,那就设置成16
,如果发现数据大于等于32
,就跳4
个位置,就是96
。为什么要扩容的时候跳那么多呢,我想可能是 因为扩容太小的话会可能会有多次扩容,多次申请直接缓冲区,直接缓冲区的创建和释放是有性能消耗的。
private final class HandleImpl extends MaxMessageHandle {
private final int minIndex;
private final int maxIndex;
private int index;//当前在尺寸表的索引
private int nextReceiveBufferSize;//下一次接受缓冲区大小
private boolean decreaseNow;
HandleImpl(int minIndex, int maxIndex, int initial) {
this.minIndex = minIndex;
this.maxIndex = maxIndex;
index = getSizeTableIndex(initial);
nextReceiveBufferSize = SIZE_TABLE[index];
}
//设置上次读取的字节数
@Override
public void lastBytesRead(int bytes) {
if (bytes == attemptedBytesRead()) {//如果真实读取的字节数等于读取尝试的字节数,也就是将接受缓冲区的可写位置全部填满了
record(bytes);//要进行尺寸伸缩了
}
super.lastBytesRead(bytes);
}
//猜测下一个接受缓冲区的大小
@Override
public int guess() {
return nextReceiveBufferSize;
}
//记录,根据这次真实接受的尺寸,进行下一次接受缓冲区的大小伸缩
private void record(int actualReadBytes) {
if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {//连续两次小,才会进行缩减
if (decreaseNow) {//减少,索引-1,不小于最小索引
index = max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {//扩大,索引+4,不超过最大索引
index = min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
}
//读取完成
@Override
public void readComplete() {
record(totalBytesRead());
}
}
看看分配大内存的复用情况吧,这样就可以减少内存不足老去对外申请内存的情况,减少性能消耗,读取一次:
再次读取,又分配了同块内存:
读取完成:
好像有点长了,下一篇来讲讲这个在哪里用。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。