PoolThreadLocalCache
要讲PoolThreadCache
,得先知道PoolThreadLocalCache
是什么,其实他就是FastThreadLocal
的子类,里面存的类型才是PoolThreadCache
。
initialValue
initialValue
是获取初始化值的,也就是创建PoolThreadCache
并返回。这里主要是useCacheForAllThreads
标签,是否是所有线程都可以用缓存,还是只能FastThreadLocalThread
用缓存。当然如果有调度参数,可能还要启动调度任务,这个不是重点,暂时不说。
@Override
protected synchronized PoolThreadCache initialValue() {
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);//获取使用率最少的一个
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
final Thread current = Thread.currentThread();
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {//使用了useCacheForAllThreads或者是FastThreadLocalThread线程
final PoolThreadCache cache = new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) {//有间隔的调度任务
final EventExecutor executor = ThreadExecutorMap.currentExecutor();
if (executor != null) {
executor.scheduleAtFixedRate(trimTask, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS,
DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
}
}
return cache;
}
// No caching so just use 0 as sizes.否则没有缓存
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
leastUsedArena
获取线程使用数量最少的PoolArena
,这样才能实现线程对内存的负载均衡,否则一个内存被多个线程用,其他的内存没有线程用,那使用效率就低了,还会引起严重的线程竞争问题。
private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
if (arenas == null || arenas.length == 0) {
return null;
}
PoolArena<T> minArena = arenas[0];
for (int i = 1; i < arenas.length; i++) {
PoolArena<T> arena = arenas[i];
if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
minArena = arena;
}
}
return minArena;
}
PoolThreadCache缓存什么
因为缓存的类型就是这个,只是里面放着各种缓存数据。
里面缓存这一堆MemoryRegionCache
类型的数组。
MemoryRegionCache
这个才是缓存包装,看看里面有什么:
里面有缓存的类型:
有能缓存的数量size
,分别对应三个类型是512,256,64
,其实是从这里来的:
还有一个allocations
表示缓存已经分配出去了多少个,这个在后面释放的时候会用到。
最后一个就是queue
队列了,他是一个MpscArrayQueue
多生产者单消费者的队列,也就是说可以多个线程放入数据,只有一个线程可以取数据。里面存的是Entry
类型,这个有点让我想起了写操作的时候,也是将数据封装成Entry
类型,我们来看看。
Entry
一个回收处理器recyclerHandle
,一个块信息,一个句柄,一个nioBuffer
(一般都是null
)。可见主要还是存哪个块的哪部分内存被缓存了。handle
里就描述了很多信息,可以获得块内的偏移地址,子页偏移地址,子页等等。所以我们现在知道了, 缓存就是缓存块内对应的内存信息 。
PoolThreadCache构造函数
前面说到PoolThreadLocalCache
的initialValue
创建了PoolThreadCache
,那说明时候调用的呢,在创建缓冲区的方法中尝试从PoolThreadLocalCache
获取PoolThreadCache
:
里面就是FastThreadLocal
的get
方法,前几篇讲过,就不多说了,如果没获取到就会调用initialValue
创建一个默认的:
然后我们看构造函数,其实没什么特别的,就是一些缓存数组的初始化,和属性设置,看注释应该就可以懂:
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {//直接缓冲区的缓存
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalDirect = log2(directArena.pageSize);//获取2的次数
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);
directArena.numThreadCaches.getAndIncrement();//directArena线程缓存数+1
} else {//没有就设置默认值
// No directArea is configured so just null out all caches
tinySubPageDirectCaches = null;
smallSubPageDirectCaches = null;
normalDirectCaches = null;
numShiftsNormalDirect = -1;
}
if (heapArena != null) {//堆内缓冲区
// Create the caches for the heap allocations
tinySubPageHeapCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageHeapCaches = createSubPageCaches(
smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, heapArena);
heapArena.numThreadCaches.getAndIncrement();
} else {
// No heapArea is configured so just null out all caches
tinySubPageHeapCaches = null;
smallSubPageHeapCaches = null;
normalHeapCaches = null;
numShiftsNormalHeap = -1;
}
// Only check if there are caches in use.
if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
|| tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
&& freeSweepAllocationThreshold < 1) {
throw new IllegalArgumentException("freeSweepAllocationThreshold: "
+ freeSweepAllocationThreshold + " (expected: > 0)");
}
}
createSubPageCaches
这个就是创建子页类型的缓存数组,tiny
类型默认是32
个SubPageMemoryRegionCache
,每个内部可以缓存512
个Entry
,small
类型默认是4
个SubPageMemoryRegionCache
,每个内部可以缓存256
个Entry
。数组个数就是PoolArena
中的,内存大小也是一一对应的。
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
int cacheSize, int numCaches, SizeClass sizeClass) {
if (cacheSize > 0 && numCaches > 0) {
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
// TODO: maybe use cacheSize / cache.length
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
}
return cache;
} else {
return null;
}
createNormalCaches
对normal
类型进行了控制,最多只能存maxCachedBufferCapacity 默认是32K
的大小,也就是说可以存8K,16K,32K
这3
个档次,所以数组个数是3
。
private static <T> MemoryRegionCache<T>[] createNormalCaches(
int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
int max = Math.min(area.chunkSize, maxCachedBufferCapacity);//默认最多缓存maxCachedBufferCapacity=32K 不然太大了
int arraySize = Math.max(1, log2(max / area.pageSize) + 1);//默认是3
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
for (int i = 0; i < cache.length; i++) {
cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
}
return cache;
} else {
return null;
}
}
PoolThreadCache分配内存allocate
前面讲了PoolThreadCache
创建好了,解析来看看怎么用。在PoolArena
的allocate
方法理有3
处:
首先是tiny
和small
类型的。
然后是normal
类型的:
我们进去看看,其实内部逻辑都差不多:
cacheForTiny
其中cacheForTiny,cacheForSmall,cacheForNormal
也类似,我就拿cacheForTiny
来说,其实就是获取对应类型的索引,看数组中是否存在该缓存,如果存在,就返回,否则就是null
:
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity);
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
if (cache == null || idx > cache.length - 1) {
return null;
}
return cache[idx];
}
allocate
然后就是尝试让缓存来分配,如果有分配过,无论成功失败,都会使得allocations
增加,如果分配的数量超过阈值后,就会清0
,并且对缓存进行清除trim
,估计是避免长时间缓存着又没用到,等于说是内存泄露了,trim
后面讲,里面涉及东西比较多:
@SuppressWarnings({ "unchecked", "rawtypes" })
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
// no cache found so just return false here
return false;
}
boolean allocated = cache.allocate(buf, reqCapacity);
if (++ allocations >= freeSweepAllocationThreshold) {//已分配次数是否大于清除次数阈值
allocations = 0;//分配次数清0
trim();
}
return allocated;
}
MemoryRegionCache的allocate
关键就是看缓存怎么分配啦,其实就是从队列queue
里取出Entry
实体,然后进行initBuf
初始化。
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
Entry<T> entry = queue.poll();//取出实体
if (entry == null) {
return false;
}
initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
entry.recycle();//回收实体
// allocations is not thread-safe which is fine as this is only called from the same thread all time.
++ allocations;//已分配出去的+1
return true;
}
initBuf
这个初始化是关键,其实他是子类实现的,我们看看,一种是SubPageMemoryRegionCache
的,对应子页tiny
和small
类型:
一种是NormalMemoryRegionCache
的,对应normal
类型:
这个跟我们正常内存分配流程最后的一样,子页的是调用PoolChunk
的initBufWithSubpage
,normal
类型是initBuf
:
当然最开始的时候队列里面没有实体,就返回false
啦,然后走正常的内存分配流程。
PoolThreadCache尝试缓存内存
这个是在缓冲区调用了release
的时候会尝试缓存:
如果能回收缓冲区的话,最终调用deallocate
:
PooledByteBuf的deallocate
其实就是释放内存资源,属性重新设置回默认,自己也回收到对象池里。
@Override
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
tmpNioBuf = null;
chunk = null;
recycle();//放进池里面
}
}
后面就会用到PoolArena
的free
方法啦,里面也比较复杂,所以下一篇再讲吧。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。