2023-09-13
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104341612

PoolChunk的newChunk

继续上篇的内存,如果没有分配成功,就创建一个新的块Chunk,这里就涉及到树的操作了,可能比较难理解,不会慢慢分析,以及画图来解释的。

            @Override
            protected PoolChunk<byte[]> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
                return new PoolChunk<byte[]>(this, newByteArray(chunkSize), pageSize, maxOrder, pageShifts, chunkSize, 0);
            }

PoolChunk的一些属性

因为涉及到的属性比较多,所以还是先列出来,回头可以忘记了可以看:

       private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;//31
    
        final PoolArena<T> arena;//所在的arena区域
        final T memory;//真正分配的内存,如果是堆内的话就是字节数组,否则就是直接缓冲区DirectByteBuffer,这个是真正操作分配的内存,其他的一些都是逻辑上分配内存
        final boolean unpooled;//是否要进行池化
        final int offset;//缓存行偏移,默认0
        private final byte[] memoryMap;//内存映射深度字节数组
        private final byte[] depthMap;//深度映射字节数组,这个数组不变,作为对照计算的
        private final PoolSubpage<T>[] subpages;//子页数组,也是满二叉树的叶子节点数组
        /** Used to determine if the requested capacity is equal to or greater than pageSize. */
        private final int subpageOverflowMask;//跟前面讲过的一样,这个用来判断是否小于页大小
        private final int pageSize;//页大小 8k
        private final int pageShifts;//页位移,也就是pageSize=1<<<pageShifts,8k就是13,即2的13次方是8k
        private final int maxOrder;//最大深度索引,默认11 从0开始的
        private final int chunkSize;//块大小,默认16m
        private final int log2ChunkSize;//ChunkSize取log2的值 24
        private final int maxSubpageAllocs;//最大子叶数,跟最大深度有关,最大深度上的叶子结点个数就是子页数
        /** Used to mark memory as unusable */
        private final byte unusable;//是否无法使用,最大深度索引+1,默认是12,表示不可用
    
        private final Deque<ByteBuffer> cachedNioBuffers;//池化用
    
        private int freeBytes;//可分配的字节数,默认是16m
    
        PoolChunkList<T> parent;//所在的块列表
        PoolChunk<T> prev;//前驱
        PoolChunk<T> next;//后继

PoolChunk构造方法

主要还是这个构造方法,此时memory因为是堆内存,所以传入的是个字节数组,如果是直接内存,则是直接缓冲区。

    PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) {
            unpooled = false;//池化
            this.arena = arena;
            this.memory = memory;
            this.pageSize = pageSize;//8k
            this.pageShifts = pageShifts;//13
            this.maxOrder = maxOrder;//11
            this.chunkSize = chunkSize;//16m
            this.offset = offset;//0
            unusable = (byte) (maxOrder + 1);//最大深度索引+1,表示不可用 默认是11+1=12
            log2ChunkSize = log2(chunkSize);//chunkSize是2的多少次数 24次
            subpageOverflowMask = ~(pageSize - 1);//比大小的掩码
            freeBytes = chunkSize;//初始可分配就是块大小16m
            //最大深度应该小于30
            assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;
            maxSubpageAllocs = 1 << maxOrder;//可分配子页的个数 2的11次方=2048,是最大深度索引为maxOrder的二叉树的个数,也就是满二叉树的叶子节点
    
            // 满二叉树的数组 4095个 总共有12层 根据等比公式 结果为4095个
            memoryMap = new byte[maxSubpageAllocs << 1];//叶子结点的两倍
            depthMap = new byte[memoryMap.length];//参照深度,固定的
            int memoryMapIndex = 1;//内存映射索引从1开始 到4095 总共4095个 第0个索引不用
            for (int d = 0; d <= maxOrder; ++ d) { //深度索引从0开始到maxOrder
                int depth = 1 << d;//深度为d的层上有depth个结点,depth是某一深度索引d的结点个数
                for (int p = 0; p < depth; ++ p) {//从左到右,从上到下,进行编号,从1开始,并且设置深度索引d               
                    memoryMap[memoryMapIndex] = (byte) d;//设置深度索引
                    depthMap[memoryMapIndex] = (byte) d;
                    memoryMapIndex ++;//内存映射索引+1
                }
            }
            //分配子页个数
            subpages = newSubpageArray(maxSubpageAllocs);
            cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);//创建性能比较好的队列
        }

这里要注意memoryMapdepthMap 两个数组,其实对应的是一棵满二叉树,把块大小16m,分成了4095个节点,最深层的叶子结点的尺寸就是一个页大小8k,因此,最大深度索引maxOrder=11的地方,根据满二叉树的特征,有211次方个节点,即2048。而16m除以8k也刚好是2048,刚好对应上,至于上面的父节点,都是作为管理的,可以标记叶子节点是否已经被分配了,而且二叉树查找的性能一般来说比直接数组查找要高,刚好也能体现伙伴算法的思想,而且查找的时候还用了深度优先,因为主要是要看叶子节点是否能分配,所以当然要上到下去判断啦,说了那么多,我们来看下这个数组怎么就是一棵树:

202309132201423201.png
memoryMapdepthMap刚开始就是这样初始化的,可以看到对应数组索引号是从1开始的,就是结点里面的编号,深度索引就是结点右边框里的数字,一棵深度为12的满二叉树刚好是4095个节点,从0开始,最大深度索引为11

newSubpageArray创建化子页数组

叶子节点的数组2048个,每个容量是8k

        private PoolSubpage<T>[] newSubpageArray(int size) {
            return new PoolSubpage[size];
        }

allocate分配空间

这里会判申请的规范容量是否大于等于一页的容量,使得话就用allocateRun分配,否则就allocateSubpage,之后会返回一个handle 句柄,这个东西是一个64位的标记,记载着很多内存地址信息,如果是allocateRun返回的就是内存映射索引,如果是allocateSubpage返回的就是一个包含内存映射索引和子页信息的值, 接下来马上我会讲他到底做什么用的。获取后先从缓存里获取,如果没有就进行initBuf初始化。

        boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
            final long handle;//一个唯一的值,根据叶子节点id,位图索引生成
            if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize 大于页大小
                handle =  allocateRun(normCapacity);//run来分配
            } else {
                handle = allocateSubpage(normCapacity);//子页来分 <pageSize
            }
    
            if (handle < 0) {
                return false;
            }
            ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;//从缓存的NIO缓冲区队列里取
            initBuf(buf, nioBuffer, handle, reqCapacity);
            return true;
        }

allocateRun大于页大小的分配,即Normal类型

此时的normCapacity都是2的倍数了。

        private long allocateRun(int normCapacity) {
            int d = maxOrder - (log2(normCapacity) - pageShifts);//这里的容量都是pageSize及以上的,log2(normCapacity) - pageShifts 表示容量是页大小的2的多少倍,最大深度索引再减去这个,刚好是定位到页大小倍数的深度索引
            int id = allocateNode(d);
            if (id < 0) {
                return id;
            }
            freeBytes -= runLength(id);//减去分配的大小
            return id;
        }

log2高效的取出2的次数

        private static int log2(int val) {
            // compute the (0-based, with lsb = 0) position of highest set bit i.e, log2
            return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);//用这种位运算代替直接取log,提高性能
        }

首先是用Integer.numberOfLeadingZeros(val)这个是通过位运算取出补码表示的最高非0位的前面还有多少个0,比如16=00010000最高非0位是1,前面还有30,如果是8位二进制的话,就是3,如果是32位的话,还得加上前面240,也就是270,然后INTEGER_SIZE_MINUS_ONE =32-1=31,最后值就为4,也就是1624次方,没问题。这个直接用位运算效率会高点。

深度d到底怎么算的

    int d = maxOrder - (log2(normCapacity) - pageShifts)

首先我们看下(log2(normCapacity) - pageShifts),因为在这里的normCapacity至少是8k,所以取了log2后至少是13,如果刚好是13,那么这个结果就是0d=maxOrder=11。意思就是说定位要了深度索引11,也就是最大深度索引,上面是2048个子页,大小也是8k。如果是16k呢,那结果就是11-(14-13)=10,深度索引是10。也就是内存映射为1024的地方开始。至于从这深度索引的地方开始做什么,马上就会讲,知道这个深度索引怎么来的就好了。这个方法确实效率很高,不用从头遍历来确定深度索引,直接用位运算来确定从哪个深度索引。

allocateNode

这个就是取出内存映射索引的关键了,也是核心的分配算法:

     private int allocateNode(int d) {
            int id = 1;//从内存映射索引为1的开始 也就是深度索引为0开始
            int initial = - (1 << d); // 用于比较id所在深度索引是否小于d
            byte val = value(id);//获取内存映射的深度值
            if (val > d) { // unusable 大于此深度索引就不可用了
                return -1;
            }//从头开始深度优先,遍历完所有深度索引小于d的可用的子结点,只有到id的深度索引是d的时候才会结束,而且是遍历一次都是深度索引+1,即是深度优先的算法,先找出对应的深度,然后从左到右看是否有内存可分配。
            while (val < d || (id & initial) == 0) {
                id <<= 1;//得到下一深度索引的左节点
                val = value(id);//获取对应深度索引值
                if (val > d) {//如果大于深度索引 即左节点不能用了
                    id ^= 1;//异或位运算,获取右结点
                    val = value(id);//再取出对应深度索引值
                }
            }
            byte value = value(id);
            //获取深度索引值,这里的value>=d 下面还要断言,如果是=d才是可以用的,>d即被设置了unusable,表示不可用了
            assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
                    value, id & initial, d);//断言id保存的深度索引值为d 且id所在深度索引为d,否则就会输出错误信息
            setValue(id, unusable); // mark as unusable 设置id深度索引值,为最大深度索引+1,即不可用了
            updateParentsAlloc(id);//更新父节点值
            return id;//返回内存映射索引
        }

setValue

如果找到了可以分配内存的,则把内存映射的深度值改了,改成了不可用unusable,表示已经分配。

        private void setValue(int id, byte val) {
            memoryMap[id] = val;
        }

updateParentsAlloc

同时要更新父节点,一直到根节点,改变是否可用的状态,如果仅仅一个子节点分配了,那就父节点的深度值就改成子节点里最小的那个,子节点深度比父节点大1,所以其实也就是深度值+1,如果两个都被用了,那就改成unusable

        private void updateParentsAlloc(int id) {
            while (id > 1) {//从id开始直到跟节点
                int parentId = id >>> 1;//获取父节点
                byte val1 = value(id);//获id节点的深度索引值
                byte val2 = value(id ^ 1);//获取另一个节点的深度索引值,即是左节点就获取右节点,是右节点就获取左节点
                byte val = val1 < val2 ? val1 : val2;//取最小的
                setValue(parentId, val);//设置父节点的深度索引值为子节点最小的那一个
                id = parentId;//继续遍历父节点
            }
        }

整个分配的过程我画个图演示下吧,比如我就简单点申请8192d算出来刚好是11,于是从内存映射索引1开始找,最后找到d=112048( 深度2的那层序号写错了,应该从4开始的 ):

202309132201430112.png
然后把他的深度设置成12,并且更新父节点:

202309132201437233.png
看下debug,设置了值,还没更新父节点:

202309132201450824.png

202309132201458585.png

202309132201467416.png
跟你更新父节点后,内存映射索引2048不可用了,父节点递归上去都取了可用子节点的深度值:

202309132201482337.png

202309132201490208.png

202309132201499189.png
看到了吧,父节点的改变,就是我前面说的,其实简单的规则就是 有两个子节点可用,深度值不变,如果只有一个可用,深度值变成另一个的,或者说是深度值+1,如果两个都不可用,深度就是最大深度+1

我再弄个两个都不可用的看看父类的情况:

2023091322015137910.png

2023091322015212811.png

2023091322015291412.png
这次20482049都被用了,然后父节点1024因为子节点都不可用也被设置不可用了,不过上面的父节点还是可用的,只是深度值+1

runLength

计算内存映射索引所在深度能分配的内存大小 即内存映射索引为id节点的可分配内存的尺寸 叶子节点默认是一个页的大小8192 ,父类尺寸就x2,直到根节点,即是一个块ChunkSize的大小16m ,比如深度值是0,就是根节点,直接就16m,如果深度值是11,就是1<<24-11=8k刚好是页大小。说白了,就是你这个id的节点可以分配多少内存。比如2048-4095对应8k1024-2047对应16k512-1023对应32k等,

    
    	//初始化的内存映射的深度值
        private byte depth(int id) {
            return depthMap[id];
        }
    
    
        private int runLength(int id) {
            // log2ChunkSize =24
            return 1 << log2ChunkSize - depth(id); //深度值越小,说明能分配的越多,越大能分配的越少
        }

这里讲了大于页大小的分配allocateRun,接下去讲小于的分配方式allocateSubpage,里面也有很多细节的东西。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文