2023-09-13
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104336276

PoolArena构造函数

上篇讲到在PooledByteBufAllocator最终的构造函数中,有创建PoolArena

202309132201331311.png
我们先看看他的一些变量,后面会用到,所以这里先提一下,用到的时候忘记是干嘛的来看下就好了,我基本都注释了是干嘛的,默认值都是根据默认参数计算出来的:

    //大小类型 还有超过chunkSize的,huge
        enum SizeClass {
            Tiny,//[16-512)
            Small,//[512 - pageSize)
            Normal//[pageSize - chunkSize]
        }
    //TinySubpage个数32 tiny是512以下的,512除以16 为32, 第1个是个头结点 后面31个才是有用的
        static final int numTinySubpagePools = 512 >>> 4;
    
      	private final int maxOrder;//二叉树的最大深度 默认11
        final int pageSize;//页大小 默认8K
        final int pageShifts;//1左移多少位得到pageSize,页大小是8k = 1 << 13 所以默认13位
        final int chunkSize;//块大小 默认16m
        final int subpageOverflowMask;//用来判断是否小于一个页大小 即小于8k
        final int numSmallSubpagePools;//small类型的子页数组的个数
        final int directMemoryCacheAlignment;//对齐的缓存尺寸比如32 64
        final int directMemoryCacheAlignmentMask;//对齐遮罩,求余数用,2的幂次可以用
        private final PoolSubpage<T>[] tinySubpagePools;//tiny类型子页数组 默认32个
        private final PoolSubpage<T>[] smallSubpagePools;//small类型子页数组 默认4个
    //根据块内存使用率状态分的数组,因为百分比是正数,所以就直接取整了
        private final PoolChunkList<T> q050;//50-100
        private final PoolChunkList<T> q025;//25-75
        private final PoolChunkList<T> q000;//1-50
        private final PoolChunkList<T> qInit;//0-25
        private final PoolChunkList<T> q075;//75-100
        private final PoolChunkList<T> q100;//100-100
    //块列表的一些指标度量
        private final List<PoolChunkListMetric> chunkListMetrics;
    
    
        // We need to use the LongCounter here as this is not guarded via synchronized block.//通过原子操作记录大小
        private final LongCounter allocationsTiny = PlatformDependent.newLongCounter();//Tiny的分配个数
        private final LongCounter allocationsSmall = PlatformDependent.newLongCounter();//Small的分配个数
        private final LongCounter allocationsHuge = PlatformDependent.newLongCounter();//huge的分配个数
        private final LongCounter activeBytesHuge = PlatformDependent.newLongCounter();//huge的字节大小

其实是这个构造方法:

     protected PoolArena(PooledByteBufAllocator parent, int pageSize,
              int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
            this.parent = parent;
            this.pageSize = pageSize;
            this.maxOrder = maxOrder;
            this.pageShifts = pageShifts;
            this.chunkSize = chunkSize;
            directMemoryCacheAlignment = cacheAlignment;
            directMemoryCacheAlignmentMask = cacheAlignment - 1;
            subpageOverflowMask = ~(pageSize - 1);//-8192
            tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);//创建tiny子页数组
            for (int i = 0; i < tinySubpagePools.length; i ++) {
                tinySubpagePools[i] = newSubpagePoolHead(pageSize);//创建每个tiny子页链表的头结点
            }
            //剩余Small子页的个数 也就是[512-pageSize)范围内的尺寸类型的个数 pageSize=8192= 1<<<13 。512= 1<<<9 中间的尺寸类型个数是13-9=4
            numSmallSubpagePools = pageShifts - 9;//13-9=4  4种尺寸其实就是512 1k 2k 4k
            smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
            for (int i = 0; i < smallSubpagePools.length; i ++) {
                smallSubpagePools[i] = newSubpagePoolHead(pageSize);//创建每个small子页链表的头结点
            }
            //双向链表
            q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
            q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
            q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
            q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
            q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
            qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
    
            q100.prevList(q075);
            q075.prevList(q050);
            q050.prevList(q025);
            q025.prevList(q000);
            q000.prevList(null);//没有前一个列表,可以直接删除块
            qInit.prevList(qInit);
    
            List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);//使用率指标
            metrics.add(qInit);
            metrics.add(q000);
            metrics.add(q025);
            metrics.add(q050);
            metrics.add(q075);
            metrics.add(q100);
            chunkListMetrics = Collections.unmodifiableList(metrics);
        }

位运算细节1

下面量行看起来好像不知道干嘛的,其实是用来做位运算的,这里涉及的都是2的幂次,所以可以用这种方式来求。

    directMemoryCacheAlignment = cacheAlignment;
    directMemoryCacheAlignmentMask = cacheAlignment - 1;

首先directMemoryCacheAlignment 这个就是缓存对齐的大小,比如64字节,也就是说要对齐到64,超过就要去下一个64,而directMemoryCacheAlignmentMask是用来获取对directMemoryCacheAlignment取余的余数,所以应该是63,二进制就是111111,跟任何数做&操作,都可以获取对64的取余的余数,其实对应的就是这个方法:

        int alignCapacity(int reqCapacity) {
            int delta = reqCapacity & directMemoryCacheAlignmentMask;//取出余数
            return delta == 0 ? reqCapacity : reqCapacity + directMemoryCacheAlignment - delta;//加上对齐标准,减去余数,就可以对齐了
        }

举个例子,directMemoryCacheAlignment=64,directMemoryCacheAlignmentMask=63,reqCapacity =100,那么delta =100-64=36。当然如果用位与的话应该这样,我把一些高位0删除了,只为了对齐做位与:

202309132201341952.png
如果余数是0,那reqCapacity就是已经对齐了,直接返回,否则的话就要减去余数,加上对齐大小,也就是这样,余数0

202309132201356633.png
余数36,对齐后为128

202309132201363314.png

位运算细节2

这个又是干嘛用的,又是一个位运算,好奇怪,其实这个跟上面那个的directMemoryCacheAlignmentMask差不多,是个遮罩掩码,可以取出数的高位。

    subpageOverflowMask = ~(pageSize - 1);

比如pageSize=8k,二进制就是10000000000000pageSize - 1的二进制就是01111111111111,再取反就会发现低13位全是0,高位全是1,刚好可以用来提取高位。任何一个高位是1,值就是大于等于8k(申请内存不可能是负数,前面会有检查,最高位不会是1),其实主要是判断是否小于8k,即一页大小,然后来用Tiny或者Small尺寸来处理,比如下面这个方法:

        boolean isTinyOrSmall(int normCapacity) {
            return (normCapacity & subpageOverflowMask) == 0;
        }

只要位与为0,就表示高位全是0,那就是小于8k,否则大于等于8k,用来判断分配容量是否是Tiny或者Small

newSubpagePoolArray

创建PoolSubpage数组,其实PoolSubpage是一个双向链表:

        private PoolSubpage<T>[] newSubpagePoolArray(int size) {
            return new PoolSubpage[size];
        }

newSubpagePoolHead(int pageSize)

创建了一个只记录页大小的子页的头结点,头结点的前驱和后继都指向自己:

        private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
            PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
            head.prev = head;
            head.next = head;
            return head;
        }

按chunk的内存使用率进行分组

内部会按照chunk的使用率分成6组,每个组都是PoolChunkList类型的数组,里面还维护着chunk链表,每个链表有最大能申请的容量,有内存使用率的范围,然后PoolChunkList也以链表的形式连接,只要chunk的内存使用率发生变化,就会判断是否超出范围,超出会进行移动,具体后续会讲:

     q050;//50-100
     q025;//25-75
     q000;//1-50
     qInit;//0-25
     q075;//75-100
     q100;//100-100

来个示意图就是这样:

202309132201371935.png

heapArena.allocate

接着我们上次讲到的heapArena.allocate,看看里面做了什么:

        PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
            PooledByteBuf<T> buf = newByteBuf(maxCapacity);
            allocate(cache, buf, reqCapacity);
            return buf;
        }

newByteBuf

         @Override
            protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
                return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
                        : PooledHeapByteBuf.newInstance(maxCapacity);
            }

首先需要一个新的PooledByteBuf,不管是不是unsafe,都是从一个RECYCLER的对象池里取得,然后返回:

        static PooledUnsafeHeapByteBuf newUnsafeInstance(int maxCapacity) {
            PooledUnsafeHeapByteBuf buf = RECYCLER.get();//池里获取
            buf.reuse(maxCapacity);
            return buf;
        }

核心方法allocate

这里就是分配的重点啦,看我慢慢分析:

     private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
            final int normCapacity = normalizeCapacity(reqCapacity);//进行容量的规范化
            if (isTinyOrSmall(normCapacity)) { // capacity < pageSize 小于pageSize
                int tableIdx;
                PoolSubpage<T>[] table;
                boolean tiny = isTiny(normCapacity);
                if (tiny) { // < 512
                    if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                        // was able to allocate out of the cache so move on
                        return;//从缓存中可以拿到就返回
                    }
                    tableIdx = tinyIdx(normCapacity);//获取tiny数组下标
                    table = tinySubpagePools;//获取数组
                } else {
                    if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {//从缓存中获取
                        // was able to allocate out of the cache so move on
                        return;
                    }
                    tableIdx = smallIdx(normCapacity);
                    table = smallSubpagePools;
                }
    
                final PoolSubpage<T> head = table[tableIdx];//获取头结点
    
                /** 因为一个区域可能有多个线程操作,所以链表操作需要同步
                 * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
                 * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
                 */
                synchronized (head) {
                    final PoolSubpage<T> s = head.next;
                    if (s != head) {//不是头结点就直接拿出来分配,头结点初始化的时候next和prea指向自己
                        assert s.doNotDestroy && s.elemSize == normCapacity;
                        long handle = s.allocate();
                        assert handle >= 0;
                        s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                        incTinySmallAllocation(tiny);
                        return;
                    }
                }
                synchronized (this) {//多线程共享的区域需要同步
                    allocateNormal(buf, reqCapacity, normCapacity);
                }
    
                incTinySmallAllocation(tiny);//增加分配次数
                return;
            }
            if (normCapacity <= chunkSize) {
                if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                synchronized (this) {
                    allocateNormal(buf, reqCapacity, normCapacity);
                    ++allocationsNormal;
                }
            } else {
                // Huge allocations are never served via the cache so just call allocateHuge
                allocateHuge(buf, reqCapacity);//超过chunkSize的huge
            }
        }

normalizeCapacity规范化申请容量

这个就是规范化容量,因为请求的容量不一定是我们规定的尺寸,我们要根据不同的尺寸范围,对请求的容量进行规范化,比如我们最小单位是16,如果你请求小于16,那就会被规范化为16。然后是按照大到小的顺序进行容量类型的判定,返回规范化后的容量。

如果申请容量大于等于chunkSize,再看是否要对齐,然后直接返回了。
如果大于等于512,就规范化到大于等于申请容量的规范类型,比如申请513,规范化到1k,申请1.5k,规范化到2k
否则就是小于512的,如果小于16就补齐到16,否则就规范到16的倍数。

    int normalizeCapacity(int reqCapacity) {
            checkPositiveOrZero(reqCapacity, "reqCapacity");//检查非负
            //大于等于块大小就判断是否要对齐 ,返回处理后的大小
            if (reqCapacity >= chunkSize) {
                return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);//对齐前面已经说过啦
            }
            //不是Tiny类型的 向上取到大于等于申请容量的规范类型 512 1k 2k 4k 这4个类型
            if (!isTiny(reqCapacity)) { // >= 512
                // Doubled
    
                int normalizedCapacity = reqCapacity;
                normalizedCapacity --;
                normalizedCapacity |= normalizedCapacity >>>  1;
                normalizedCapacity |= normalizedCapacity >>>  2;
                normalizedCapacity |= normalizedCapacity >>>  4;
                normalizedCapacity |= normalizedCapacity >>>  8;
                normalizedCapacity |= normalizedCapacity >>> 16;
                normalizedCapacity ++;
    
                if (normalizedCapacity < 0) {
                    normalizedCapacity >>>= 1;
                }
                assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;
    
                return normalizedCapacity;
            }
    //对齐处理
            if (directMemoryCacheAlignment > 0) {
                return alignCapacity(reqCapacity);
            }
    
            // Quantum-spaced Tiny类型又是16的倍数
            if ((reqCapacity & 15) == 0) {//位运算,取出余数,为0就表示是16的倍数
                return reqCapacity;
            }
            //非16倍数的 向上取成16的倍数 比如要100,(reqCapacity & ~15)先减去余数4,然后+16,即变成了112,16的7倍
            return (reqCapacity & ~15) + 16;
        }

isTinyOrSmall申请容量是否小于页大小8k

就是前面说的取余掩码,等于0说明小于8k,否则大于8k.

        boolean isTinyOrSmall(int normCapacity) {
            return (normCapacity & subpageOverflowMask) == 0;
        }

isTiny申请容量是否小于512

如果是小于8k的还要判断是Tiny还是Small类型,这个也是位运算,取了大于等于512的所有高位,看是否
0,是的话就说明小于512,否则就大于等于512

        static boolean isTiny(int normCapacity) {
            return (normCapacity & 0xFFFFFE00) == 0;//因为normCapacity是正数,normCapacity最高位肯定是0,不用担心会是负数
        }

tinyIdx获取tiny的索引

tiny数组长度是32,是512>>>4,范围是16-496,间隔16,所以容量>>>4可以获取对应索引,因为前面容量规范化过了,tiny最小是16,所以实际能获取对应的索引1-31,索引0给头结点了。

        static int tinyIdx(int normCapacity) {
            return normCapacity >>> 4;//这里跟tiny数组大小为512>>>4=32 是一个道理,获取数组下标
        }

allocateNormal

如果此时子页PoolSubpage只有头结点的情况,就会进行allocateNormal,负责申请内存:

     private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
            if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
                q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
                q075.allocate(buf, reqCapacity, normCapacity)) {
                return;//分配成功就返回
            }
    
            // Add a new chunk.不成功就增加一个块
            PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
            boolean success = c.allocate(buf, reqCapacity, normCapacity);//分配空间
            assert success;
            qInit.add(c);//加入到初始块列表里
        }

首先会根据顺序q050,q025,q000,qInit,q075的顺序进行内存申请,之所以是这个顺序,我想可能是因为可以申请内存的使用率大吧,前3个使用率都有50%,而且都是相邻的,移动的时候也方便点,后两个是25%,我们看看里面做了什么。

PoolChunkList的allocate

先判断有没有超过块列表中块的的最大容量,比如q050,他里面的块的最大容量是16m50%,也就是8m,超过就返回,进行下一个块列表的申请。没超过就从头结点块开始,尝试申请内存,如果申请成功了,就判断块的使用率是否大于等于块列表的使用率,是的话就移动给下一个块列表,并从当前块列表中删除后返回,否则直接返回。具体申请和移动后面详细会讲。

    boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
            if (normCapacity > maxCapacity) {//超过了最大分配容量
                return false;
            }
    //遍历里面的块,块存在就尝试分配,块用满了就往下一个放
            for (PoolChunk<T> cur = head; cur != null; cur = cur.next) {
                if (cur.allocate(buf, reqCapacity, normCapacity)) {
                    if (cur.usage() >= maxUsage) {//使用率已经大于等于最大的了
                        remove(cur);//从当前块列表移除
                        nextList.add(cur);//放到下一个块列表里
                    }
                    return true;
                }
            }
            return false;
        }

篇幅有点长了,接下去newChunk到后面讲吧,里面也有很多细节的东西。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文