PoolArena构造函数
上篇讲到在PooledByteBufAllocator
最终的构造函数中,有创建PoolArena
:
我们先看看他的一些变量,后面会用到,所以这里先提一下,用到的时候忘记是干嘛的来看下就好了,我基本都注释了是干嘛的,默认值都是根据默认参数计算出来的:
//大小类型 还有超过chunkSize的,huge
enum SizeClass {
Tiny,//[16-512)
Small,//[512 - pageSize)
Normal//[pageSize - chunkSize]
}
//TinySubpage个数32 tiny是512以下的,512除以16 为32, 第1个是个头结点 后面31个才是有用的
static final int numTinySubpagePools = 512 >>> 4;
private final int maxOrder;//二叉树的最大深度 默认11
final int pageSize;//页大小 默认8K
final int pageShifts;//1左移多少位得到pageSize,页大小是8k = 1 << 13 所以默认13位
final int chunkSize;//块大小 默认16m
final int subpageOverflowMask;//用来判断是否小于一个页大小 即小于8k
final int numSmallSubpagePools;//small类型的子页数组的个数
final int directMemoryCacheAlignment;//对齐的缓存尺寸比如32 64
final int directMemoryCacheAlignmentMask;//对齐遮罩,求余数用,2的幂次可以用
private final PoolSubpage<T>[] tinySubpagePools;//tiny类型子页数组 默认32个
private final PoolSubpage<T>[] smallSubpagePools;//small类型子页数组 默认4个
//根据块内存使用率状态分的数组,因为百分比是正数,所以就直接取整了
private final PoolChunkList<T> q050;//50-100
private final PoolChunkList<T> q025;//25-75
private final PoolChunkList<T> q000;//1-50
private final PoolChunkList<T> qInit;//0-25
private final PoolChunkList<T> q075;//75-100
private final PoolChunkList<T> q100;//100-100
//块列表的一些指标度量
private final List<PoolChunkListMetric> chunkListMetrics;
// We need to use the LongCounter here as this is not guarded via synchronized block.//通过原子操作记录大小
private final LongCounter allocationsTiny = PlatformDependent.newLongCounter();//Tiny的分配个数
private final LongCounter allocationsSmall = PlatformDependent.newLongCounter();//Small的分配个数
private final LongCounter allocationsHuge = PlatformDependent.newLongCounter();//huge的分配个数
private final LongCounter activeBytesHuge = PlatformDependent.newLongCounter();//huge的字节大小
其实是这个构造方法:
protected PoolArena(PooledByteBufAllocator parent, int pageSize,
int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
this.parent = parent;
this.pageSize = pageSize;
this.maxOrder = maxOrder;
this.pageShifts = pageShifts;
this.chunkSize = chunkSize;
directMemoryCacheAlignment = cacheAlignment;
directMemoryCacheAlignmentMask = cacheAlignment - 1;
subpageOverflowMask = ~(pageSize - 1);//-8192
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);//创建tiny子页数组
for (int i = 0; i < tinySubpagePools.length; i ++) {
tinySubpagePools[i] = newSubpagePoolHead(pageSize);//创建每个tiny子页链表的头结点
}
//剩余Small子页的个数 也就是[512-pageSize)范围内的尺寸类型的个数 pageSize=8192= 1<<<13 。512= 1<<<9 中间的尺寸类型个数是13-9=4
numSmallSubpagePools = pageShifts - 9;//13-9=4 4种尺寸其实就是512 1k 2k 4k
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
for (int i = 0; i < smallSubpagePools.length; i ++) {
smallSubpagePools[i] = newSubpagePoolHead(pageSize);//创建每个small子页链表的头结点
}
//双向链表
q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
q100.prevList(q075);
q075.prevList(q050);
q050.prevList(q025);
q025.prevList(q000);
q000.prevList(null);//没有前一个列表,可以直接删除块
qInit.prevList(qInit);
List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);//使用率指标
metrics.add(qInit);
metrics.add(q000);
metrics.add(q025);
metrics.add(q050);
metrics.add(q075);
metrics.add(q100);
chunkListMetrics = Collections.unmodifiableList(metrics);
}
位运算细节1
下面量行看起来好像不知道干嘛的,其实是用来做位运算的,这里涉及的都是2的幂次,所以可以用这种方式来求。
directMemoryCacheAlignment = cacheAlignment;
directMemoryCacheAlignmentMask = cacheAlignment - 1;
首先directMemoryCacheAlignment
这个就是缓存对齐的大小,比如64
字节,也就是说要对齐到64
,超过就要去下一个64
,而directMemoryCacheAlignmentMask
是用来获取对directMemoryCacheAlignment
取余的余数,所以应该是63
,二进制就是111111
,跟任何数做&
操作,都可以获取对64
的取余的余数,其实对应的就是这个方法:
int alignCapacity(int reqCapacity) {
int delta = reqCapacity & directMemoryCacheAlignmentMask;//取出余数
return delta == 0 ? reqCapacity : reqCapacity + directMemoryCacheAlignment - delta;//加上对齐标准,减去余数,就可以对齐了
}
举个例子,directMemoryCacheAlignment=64,directMemoryCacheAlignmentMask=63,reqCapacity =100
,那么delta =100-64=36
。当然如果用位与的话应该这样,我把一些高位0
删除了,只为了对齐做位与:
如果余数是0
,那reqCapacity
就是已经对齐了,直接返回,否则的话就要减去余数,加上对齐大小,也就是这样,余数0
:
余数36
,对齐后为128
:
位运算细节2
这个又是干嘛用的,又是一个位运算,好奇怪,其实这个跟上面那个的directMemoryCacheAlignmentMask
差不多,是个遮罩掩码,可以取出数的高位。
subpageOverflowMask = ~(pageSize - 1);
比如pageSize=8k
,二进制就是10000000000000
。pageSize - 1
的二进制就是01111111111111
,再取反就会发现低13
位全是0
,高位全是1
,刚好可以用来提取高位。任何一个高位是1
,值就是大于等于8k
(申请内存不可能是负数,前面会有检查,最高位不会是1
),其实主要是判断是否小于8k
,即一页大小,然后来用Tiny
或者Small
尺寸来处理,比如下面这个方法:
boolean isTinyOrSmall(int normCapacity) {
return (normCapacity & subpageOverflowMask) == 0;
}
只要位与为0
,就表示高位全是0
,那就是小于8k
,否则大于等于8k
,用来判断分配容量是否是Tiny
或者Small
。
newSubpagePoolArray
创建PoolSubpage
数组,其实PoolSubpage
是一个双向链表:
private PoolSubpage<T>[] newSubpagePoolArray(int size) {
return new PoolSubpage[size];
}
newSubpagePoolHead(int pageSize)
创建了一个只记录页大小的子页的头结点,头结点的前驱和后继都指向自己:
private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
head.prev = head;
head.next = head;
return head;
}
按chunk的内存使用率进行分组
内部会按照chunk
的使用率分成6
组,每个组都是PoolChunkList
类型的数组,里面还维护着chunk
链表,每个链表有最大能申请的容量,有内存使用率的范围,然后PoolChunkList
也以链表的形式连接,只要chunk
的内存使用率发生变化,就会判断是否超出范围,超出会进行移动,具体后续会讲:
q050;//50-100
q025;//25-75
q000;//1-50
qInit;//0-25
q075;//75-100
q100;//100-100
来个示意图就是这样:
heapArena.allocate
接着我们上次讲到的heapArena.allocate
,看看里面做了什么:
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
allocate(cache, buf, reqCapacity);
return buf;
}
newByteBuf
@Override
protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
: PooledHeapByteBuf.newInstance(maxCapacity);
}
首先需要一个新的PooledByteBuf
,不管是不是unsafe
,都是从一个RECYCLER
的对象池里取得,然后返回:
static PooledUnsafeHeapByteBuf newUnsafeInstance(int maxCapacity) {
PooledUnsafeHeapByteBuf buf = RECYCLER.get();//池里获取
buf.reuse(maxCapacity);
return buf;
}
核心方法allocate
这里就是分配的重点啦,看我慢慢分析:
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);//进行容量的规范化
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize 小于pageSize
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;//从缓存中可以拿到就返回
}
tableIdx = tinyIdx(normCapacity);//获取tiny数组下标
table = tinySubpagePools;//获取数组
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {//从缓存中获取
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];//获取头结点
/** 因为一个区域可能有多个线程操作,所以链表操作需要同步
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {//不是头结点就直接拿出来分配,头结点初始化的时候next和prea指向自己
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
synchronized (this) {//多线程共享的区域需要同步
allocateNormal(buf, reqCapacity, normCapacity);
}
incTinySmallAllocation(tiny);//增加分配次数
return;
}
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);//超过chunkSize的huge
}
}
normalizeCapacity规范化申请容量
这个就是规范化容量,因为请求的容量不一定是我们规定的尺寸,我们要根据不同的尺寸范围,对请求的容量进行规范化,比如我们最小单位是16
,如果你请求小于16
,那就会被规范化为16
。然后是按照大到小的顺序进行容量类型的判定,返回规范化后的容量。
如果申请容量大于等于chunkSize
,再看是否要对齐,然后直接返回了。
如果大于等于512
,就规范化到大于等于申请容量的规范类型,比如申请513
,规范化到1k
,申请1.5k
,规范化到2k
。
否则就是小于512
的,如果小于16
就补齐到16
,否则就规范到16
的倍数。
int normalizeCapacity(int reqCapacity) {
checkPositiveOrZero(reqCapacity, "reqCapacity");//检查非负
//大于等于块大小就判断是否要对齐 ,返回处理后的大小
if (reqCapacity >= chunkSize) {
return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);//对齐前面已经说过啦
}
//不是Tiny类型的 向上取到大于等于申请容量的规范类型 512 1k 2k 4k 这4个类型
if (!isTiny(reqCapacity)) { // >= 512
// Doubled
int normalizedCapacity = reqCapacity;
normalizedCapacity --;
normalizedCapacity |= normalizedCapacity >>> 1;
normalizedCapacity |= normalizedCapacity >>> 2;
normalizedCapacity |= normalizedCapacity >>> 4;
normalizedCapacity |= normalizedCapacity >>> 8;
normalizedCapacity |= normalizedCapacity >>> 16;
normalizedCapacity ++;
if (normalizedCapacity < 0) {
normalizedCapacity >>>= 1;
}
assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;
return normalizedCapacity;
}
//对齐处理
if (directMemoryCacheAlignment > 0) {
return alignCapacity(reqCapacity);
}
// Quantum-spaced Tiny类型又是16的倍数
if ((reqCapacity & 15) == 0) {//位运算,取出余数,为0就表示是16的倍数
return reqCapacity;
}
//非16倍数的 向上取成16的倍数 比如要100,(reqCapacity & ~15)先减去余数4,然后+16,即变成了112,16的7倍
return (reqCapacity & ~15) + 16;
}
isTinyOrSmall申请容量是否小于页大小8k
就是前面说的取余掩码,等于0
说明小于8k
,否则大于8k
.
boolean isTinyOrSmall(int normCapacity) {
return (normCapacity & subpageOverflowMask) == 0;
}
isTiny申请容量是否小于512
如果是小于8k
的还要判断是Tiny
还是Small
类型,这个也是位运算,取了大于等于512的所有高位,看是否
是0
,是的话就说明小于512
,否则就大于等于512
。
static boolean isTiny(int normCapacity) {
return (normCapacity & 0xFFFFFE00) == 0;//因为normCapacity是正数,normCapacity最高位肯定是0,不用担心会是负数
}
tinyIdx获取tiny的索引
tiny
数组长度是32
,是512>>>4
,范围是16-496
,间隔16
,所以容量>>>4
可以获取对应索引,因为前面容量规范化过了,tiny
最小是16
,所以实际能获取对应的索引1-31
,索引0
给头结点了。
static int tinyIdx(int normCapacity) {
return normCapacity >>> 4;//这里跟tiny数组大小为512>>>4=32 是一个道理,获取数组下标
}
allocateNormal
如果此时子页PoolSubpage
只有头结点的情况,就会进行allocateNormal
,负责申请内存:
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
return;//分配成功就返回
}
// Add a new chunk.不成功就增加一个块
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
boolean success = c.allocate(buf, reqCapacity, normCapacity);//分配空间
assert success;
qInit.add(c);//加入到初始块列表里
}
首先会根据顺序q050,q025,q000,qInit,q075
的顺序进行内存申请,之所以是这个顺序,我想可能是因为可以申请内存的使用率大吧,前3
个使用率都有50%
,而且都是相邻的,移动的时候也方便点,后两个是25%
,我们看看里面做了什么。
PoolChunkList的allocate
先判断有没有超过块列表中块的的最大容量,比如q050
,他里面的块的最大容量是16m
的50%
,也就是8m
,超过就返回,进行下一个块列表的申请。没超过就从头结点块开始,尝试申请内存,如果申请成功了,就判断块的使用率是否大于等于块列表的使用率,是的话就移动给下一个块列表,并从当前块列表中删除后返回,否则直接返回。具体申请和移动后面详细会讲。
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
if (normCapacity > maxCapacity) {//超过了最大分配容量
return false;
}
//遍历里面的块,块存在就尝试分配,块用满了就往下一个放
for (PoolChunk<T> cur = head; cur != null; cur = cur.next) {
if (cur.allocate(buf, reqCapacity, normCapacity)) {
if (cur.usage() >= maxUsage) {//使用率已经大于等于最大的了
remove(cur);//从当前块列表移除
nextList.add(cur);//放到下一个块列表里
}
return true;
}
}
return false;
}
篇幅有点长了,接下去newChunk
到后面讲吧,里面也有很多细节的东西。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。