PoolChunk的newChunk
继续上篇的内存,如果没有分配成功,就创建一个新的块Chunk
,这里就涉及到树的操作了,可能比较难理解,不会慢慢分析,以及画图来解释的。
@Override
protected PoolChunk<byte[]> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
return new PoolChunk<byte[]>(this, newByteArray(chunkSize), pageSize, maxOrder, pageShifts, chunkSize, 0);
}
PoolChunk的一些属性
因为涉及到的属性比较多,所以还是先列出来,回头可以忘记了可以看:
private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;//31
final PoolArena<T> arena;//所在的arena区域
final T memory;//真正分配的内存,如果是堆内的话就是字节数组,否则就是直接缓冲区DirectByteBuffer,这个是真正操作分配的内存,其他的一些都是逻辑上分配内存
final boolean unpooled;//是否要进行池化
final int offset;//缓存行偏移,默认0
private final byte[] memoryMap;//内存映射深度字节数组
private final byte[] depthMap;//深度映射字节数组,这个数组不变,作为对照计算的
private final PoolSubpage<T>[] subpages;//子页数组,也是满二叉树的叶子节点数组
/** Used to determine if the requested capacity is equal to or greater than pageSize. */
private final int subpageOverflowMask;//跟前面讲过的一样,这个用来判断是否小于页大小
private final int pageSize;//页大小 8k
private final int pageShifts;//页位移,也就是pageSize=1<<<pageShifts,8k就是13,即2的13次方是8k
private final int maxOrder;//最大深度索引,默认11 从0开始的
private final int chunkSize;//块大小,默认16m
private final int log2ChunkSize;//ChunkSize取log2的值 24
private final int maxSubpageAllocs;//最大子叶数,跟最大深度有关,最大深度上的叶子结点个数就是子页数
/** Used to mark memory as unusable */
private final byte unusable;//是否无法使用,最大深度索引+1,默认是12,表示不可用
private final Deque<ByteBuffer> cachedNioBuffers;//池化用
private int freeBytes;//可分配的字节数,默认是16m
PoolChunkList<T> parent;//所在的块列表
PoolChunk<T> prev;//前驱
PoolChunk<T> next;//后继
PoolChunk构造方法
主要还是这个构造方法,此时memory
因为是堆内存,所以传入的是个字节数组,如果是直接内存,则是直接缓冲区。
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) {
unpooled = false;//池化
this.arena = arena;
this.memory = memory;
this.pageSize = pageSize;//8k
this.pageShifts = pageShifts;//13
this.maxOrder = maxOrder;//11
this.chunkSize = chunkSize;//16m
this.offset = offset;//0
unusable = (byte) (maxOrder + 1);//最大深度索引+1,表示不可用 默认是11+1=12
log2ChunkSize = log2(chunkSize);//chunkSize是2的多少次数 24次
subpageOverflowMask = ~(pageSize - 1);//比大小的掩码
freeBytes = chunkSize;//初始可分配就是块大小16m
//最大深度应该小于30
assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;
maxSubpageAllocs = 1 << maxOrder;//可分配子页的个数 2的11次方=2048,是最大深度索引为maxOrder的二叉树的个数,也就是满二叉树的叶子节点
// 满二叉树的数组 4095个 总共有12层 根据等比公式 结果为4095个
memoryMap = new byte[maxSubpageAllocs << 1];//叶子结点的两倍
depthMap = new byte[memoryMap.length];//参照深度,固定的
int memoryMapIndex = 1;//内存映射索引从1开始 到4095 总共4095个 第0个索引不用
for (int d = 0; d <= maxOrder; ++ d) { //深度索引从0开始到maxOrder
int depth = 1 << d;//深度为d的层上有depth个结点,depth是某一深度索引d的结点个数
for (int p = 0; p < depth; ++ p) {//从左到右,从上到下,进行编号,从1开始,并且设置深度索引d
memoryMap[memoryMapIndex] = (byte) d;//设置深度索引
depthMap[memoryMapIndex] = (byte) d;
memoryMapIndex ++;//内存映射索引+1
}
}
//分配子页个数
subpages = newSubpageArray(maxSubpageAllocs);
cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);//创建性能比较好的队列
}
这里要注意memoryMap
和depthMap
两个数组,其实对应的是一棵满二叉树,把块大小16m
,分成了4095
个节点,最深层的叶子结点的尺寸就是一个页大小8k
,因此,最大深度索引maxOrder=11
的地方,根据满二叉树的特征,有2
的11
次方个节点,即2048
。而16m
除以8k
也刚好是2048
,刚好对应上,至于上面的父节点,都是作为管理的,可以标记叶子节点是否已经被分配了,而且二叉树查找的性能一般来说比直接数组查找要高,刚好也能体现伙伴算法的思想,而且查找的时候还用了深度优先,因为主要是要看叶子节点是否能分配,所以当然要上到下去判断啦,说了那么多,我们来看下这个数组怎么就是一棵树:
memoryMap
和depthMap
刚开始就是这样初始化的,可以看到对应数组索引号是从1
开始的,就是结点里面的编号,深度索引就是结点右边框里的数字,一棵深度为12
的满二叉树刚好是4095
个节点,从0
开始,最大深度索引为11
。
newSubpageArray创建化子页数组
叶子节点的数组2048
个,每个容量是8k
。
private PoolSubpage<T>[] newSubpageArray(int size) {
return new PoolSubpage[size];
}
allocate分配空间
这里会判申请的规范容量是否大于等于一页的容量,使得话就用allocateRun
分配,否则就allocateSubpage
,之后会返回一个handle
句柄,这个东西是一个64
位的标记,记载着很多内存地址信息,如果是allocateRun
返回的就是内存映射索引,如果是allocateSubpage
返回的就是一个包含内存映射索引和子页信息的值, 接下来马上我会讲他到底做什么用的。获取后先从缓存里获取,如果没有就进行initBuf
初始化。
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
final long handle;//一个唯一的值,根据叶子节点id,位图索引生成
if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize 大于页大小
handle = allocateRun(normCapacity);//run来分配
} else {
handle = allocateSubpage(normCapacity);//子页来分 <pageSize
}
if (handle < 0) {
return false;
}
ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;//从缓存的NIO缓冲区队列里取
initBuf(buf, nioBuffer, handle, reqCapacity);
return true;
}
allocateRun大于页大小的分配,即Normal类型
此时的normCapacity
都是2
的倍数了。
private long allocateRun(int normCapacity) {
int d = maxOrder - (log2(normCapacity) - pageShifts);//这里的容量都是pageSize及以上的,log2(normCapacity) - pageShifts 表示容量是页大小的2的多少倍,最大深度索引再减去这个,刚好是定位到页大小倍数的深度索引
int id = allocateNode(d);
if (id < 0) {
return id;
}
freeBytes -= runLength(id);//减去分配的大小
return id;
}
log2高效的取出2的次数
private static int log2(int val) {
// compute the (0-based, with lsb = 0) position of highest set bit i.e, log2
return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);//用这种位运算代替直接取log,提高性能
}
首先是用Integer.numberOfLeadingZeros(val)
这个是通过位运算取出补码表示的最高非0
位的前面还有多少个0
,比如16=00010000
最高非0
位是1,前面还有3
个0
,如果是8
位二进制的话,就是3,如果是32
位的话,还得加上前面24
个0
,也就是27
个0
,然后INTEGER_SIZE_MINUS_ONE =32-1=31
,最后值就为4
,也就是16
是2
的4
次方,没问题。这个直接用位运算效率会高点。
深度d到底怎么算的
int d = maxOrder - (log2(normCapacity) - pageShifts)
首先我们看下(log2(normCapacity) - pageShifts)
,因为在这里的normCapacity
至少是8k
,所以取了log2后至少是13
,如果刚好是13
,那么这个结果就是0
,d=maxOrder=11
。意思就是说定位要了深度索引11
,也就是最大深度索引,上面是2048
个子页,大小也是8k
。如果是16k
呢,那结果就是11-(14-13)=10
,深度索引是10
。也就是内存映射为1024
的地方开始。至于从这深度索引的地方开始做什么,马上就会讲,知道这个深度索引怎么来的就好了。这个方法确实效率很高,不用从头遍历来确定深度索引,直接用位运算来确定从哪个深度索引。
allocateNode
这个就是取出内存映射索引的关键了,也是核心的分配算法:
private int allocateNode(int d) {
int id = 1;//从内存映射索引为1的开始 也就是深度索引为0开始
int initial = - (1 << d); // 用于比较id所在深度索引是否小于d
byte val = value(id);//获取内存映射的深度值
if (val > d) { // unusable 大于此深度索引就不可用了
return -1;
}//从头开始深度优先,遍历完所有深度索引小于d的可用的子结点,只有到id的深度索引是d的时候才会结束,而且是遍历一次都是深度索引+1,即是深度优先的算法,先找出对应的深度,然后从左到右看是否有内存可分配。
while (val < d || (id & initial) == 0) {
id <<= 1;//得到下一深度索引的左节点
val = value(id);//获取对应深度索引值
if (val > d) {//如果大于深度索引 即左节点不能用了
id ^= 1;//异或位运算,获取右结点
val = value(id);//再取出对应深度索引值
}
}
byte value = value(id);
//获取深度索引值,这里的value>=d 下面还要断言,如果是=d才是可以用的,>d即被设置了unusable,表示不可用了
assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
value, id & initial, d);//断言id保存的深度索引值为d 且id所在深度索引为d,否则就会输出错误信息
setValue(id, unusable); // mark as unusable 设置id深度索引值,为最大深度索引+1,即不可用了
updateParentsAlloc(id);//更新父节点值
return id;//返回内存映射索引
}
setValue
如果找到了可以分配内存的,则把内存映射的深度值改了,改成了不可用unusable
,表示已经分配。
private void setValue(int id, byte val) {
memoryMap[id] = val;
}
updateParentsAlloc
同时要更新父节点,一直到根节点,改变是否可用的状态,如果仅仅一个子节点分配了,那就父节点的深度值就改成子节点里最小的那个,子节点深度比父节点大1
,所以其实也就是深度值+1
,如果两个都被用了,那就改成unusable
。
private void updateParentsAlloc(int id) {
while (id > 1) {//从id开始直到跟节点
int parentId = id >>> 1;//获取父节点
byte val1 = value(id);//获id节点的深度索引值
byte val2 = value(id ^ 1);//获取另一个节点的深度索引值,即是左节点就获取右节点,是右节点就获取左节点
byte val = val1 < val2 ? val1 : val2;//取最小的
setValue(parentId, val);//设置父节点的深度索引值为子节点最小的那一个
id = parentId;//继续遍历父节点
}
}
整个分配的过程我画个图演示下吧,比如我就简单点申请8192
,d
算出来刚好是11
,于是从内存映射索引1
开始找,最后找到d=11
的2048
( 深度2的那层序号写错了,应该从4开始的 ):
然后把他的深度设置成12
,并且更新父节点:
看下debug
,设置了值,还没更新父节点:
跟你更新父节点后,内存映射索引2048
不可用了,父节点递归上去都取了可用子节点的深度值:
看到了吧,父节点的改变,就是我前面说的,其实简单的规则就是 有两个子节点可用,深度值不变,如果只有一个可用,深度值变成另一个的,或者说是深度值+1,如果两个都不可用,深度就是最大深度+1
。
我再弄个两个都不可用的看看父类的情况:
这次2048
和2049
都被用了,然后父节点1024
因为子节点都不可用也被设置不可用了,不过上面的父节点还是可用的,只是深度值+1
。
runLength
计算内存映射索引所在深度能分配的内存大小 即内存映射索引为id
节点的可分配内存的尺寸 叶子节点默认是一个页的大小8192
,父类尺寸就x2
,直到根节点,即是一个块ChunkSize
的大小16m ,比如深度值是0
,就是根节点,直接就16m
,如果深度值是11
,就是1<<24-11=8k
刚好是页大小。说白了,就是你这个id
的节点可以分配多少内存。比如2048-4095
对应8k
,1024-2047
对应16k
,512-1023
对应32k
等,
//初始化的内存映射的深度值
private byte depth(int id) {
return depthMap[id];
}
private int runLength(int id) {
// log2ChunkSize =24
return 1 << log2ChunkSize - depth(id); //深度值越小,说明能分配的越多,越大能分配的越少
}
这里讲了大于页大小的分配allocateRun
,接下去讲小于的分配方式allocateSubpage
,里面也有很多细节的东西。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。