Netty源码之对象池
概述
池化是我们提升性能的手段之一,比如线程池、内存池以及接下来讲的对象池。池化的作用目的是减少创建和销毁对象的开销,提高并发处理能力,而且利用池化也可以降低 GC 压力。Netty 设计了一个简易的对象池,在 Netty 内存使用广泛,比如对 PooledUnsafeDirectByteBuf 对象进行池化、比如对 io.netty.buffer.PoolThreadCache.MemoryRegionCache.Entry
对象(这个对象用于本地缓存封装内存块信息)进行池化。并不是所有场景都需要用到对象池,应该结合实际情况。使用方式也特别简单,下面给出了示例代码。
对象池 (英语:object pool pattern)是一种设计模式。一个 对象池 包含一组已经初始化过且可以使用的对象,而可以在有需求时创建和销毁对象。池的用户可以从池子中取得对象,对其进行操作处理,并在不需要时归还给池子而非直接销毁它。这是一种特殊的工厂对象。 若初始化、实例化的代价高,且有需求需要经常实例化,但每次实例化的数量较少的情况下,使用对象池可以获得显著的效能提升。从池子中取得对象的时间是可预测的,但新建一个实例所需的时间是不确定。
示例代码
class User {
private String name;
private Integer age;
private Recycler.Handle<User> handler;
public User() {
}
public User(Recycler.Handle<User> handler) {
this.handler = handler;
}
// SETTER & GETTER
public Recycler.Handle<User> getHandler() {
return handler;
}
}
public class RecyclerTest {
private static final Recycler<User> userRecycler = new Recycler<User>() {
@Override
protected User newObject(Handle<User> handle) {
return new User(handle);
}
};
@Test
public void testRecycleDemo() {
// 从对象池中获取User对象
User userFromCache1 = userRecycler.get();
System.out.println(userFromCache1);
// 从对象池中获取User对象
User userFromCache2 = userRecycler.get();
System.out.println(userFromCache2);
// 回收userFromCache1对象
userFromCache1.getHandler().recycle(userFromCache1);
// 发现现存获取的对象和userFromCache1对象一样的
User userFromCache3 = userRecycler.get();
System.out.println(userFromCache3);
}
}
// OUTPUT
// User@1963006a
// User@7fbe847c
// User@1963006a
上面的代码向我们展示了如何快速创建一个对象池,核心类是 Recycler。可以把它看成是一个工具类,可以在日常开发中使用。当我们实例化 Recycler 对象时,需要重写 newObject(Handle<T> handle)
方法,这个方法的核心是将对象与某个 Handle<T>
关联起来(持有 Handle 对象的引用),因为在底层是通过这个 Handle 对象包装待回收对象。
推理实现
在阅读源码之前,我们先尝试自己设计一个对象池。
单线程
单线程模型是最简单的,我们只需要构造一个栈(或队列)出栈、入栈操作分别对应分配和回收。如果栈中没有对象,新建一个全新对象即可。为了安全起见,我们需要设定栈的大小以及阈值。回收对象时需要判断已回收对象数量是否超过阈值,如果超出则丢弃,避免池化对象过多造成内存浪费。
多线程
对于多线程,最简单的办法是给相关栈操作加锁,但这样势必会影响性能。因此,我们可以通过 ThreadLocal 设计成无锁方式。但我们还没有考虑一个问题就是异线程回收(对象 A 本是线程 Thread_1 创建以及回收,但是在 Thread_2 调用回收方法,这种情况就属于异线程回收),我们秉承 谁创建谁回收 的准则,此时应该调用 Thread_1 相关的方法回收。但是这样方法必须加锁,也可能导致性能问题。那我们转变一下思路,线程 Thread_2 不主要推送回收对象(因为主要推送势必会导致某个方法必须加锁),本地线程暂存回收对象,等待 Thread_1 主动回收各个线程的属于 Thread_1 的回收对象,这样就可以避免加锁问题,而且 Thread_1 有更多的自由度,比如当超过某一阈值时,舍弃部分其他线程的回收对象,就完全是由 Thread_1 自己控制的。线程 Thread_2 也并非无脑存储其他线程的回收对象,它中会对部分线程才缓存对象,其余线程会直接丢弃。但是还有一点比较麻烦,就是对象 A 是由 Thread_1 分配的,但是回收是有两个线程 Thread_2 和 Thread_3 参与(多线程的世界复杂多变),这就有可能导致一个对象可能会被回收两次,Netty 并没有解决这种情况,而是直接抛出异常。
认识相关的类
在阅读源码之前,我们需要对相关的类做一个说明。请看大屏幕:
上面这幅图基本展示我这篇文章关注的核心的类。在理解这幅图之前,我先进行简单说明:
- 待回收对象使用 DefautlHandler 类包装。
- 每个回收对象的类型对应一个 Stack 对象,也对应一个全局静态的 Recycler 和 一类 WeakOrderQueue,这些对象都是为这个类型对象服务的。
- Recycler 内部有一个
FastThreadLocal<Map<Stack, WeakOrderQueue>>
与异线程回收相关,因为一个线程面对的不只一个 Stack。 - WeakOrderQueue 也是与异线程回收相关,内部是一个链表结构。这个后面详细讲解。
DefaultHandle
在图的右下角,它是 Recycler 对回收对象的包装类,Recycler 底层操作的对象是 DefaultHandle,而非直接的回收的对象。它实现 Handle 接口,里面包含 recycle(Object obj)
回收方法。内部有几个重要的变量,解释如下:
// io.netty.util.Recycler.DefaultHandle
private static final class DefaultHandle<T> implements Handle<T> {
// 上次回收此Handle的RecycleId
int lastRecycledId;
// 创建此Handle的RecycleId。和 lastRecycledId 配合使用进行重复回收检测
int recycleId;
// 该对象是否被回收过
boolean hasBeenRecycled;
// 创建「DefaultHandle」的Stack对象
Stack<?> stack;
// 待回收对象
Object value;
DefaultHandle(Stack<?> stack) {
this.stack = stack;
}
/**
* 回收此「Handle」所持有的对象「value」
* 如果对象不相等,抛出异常
*/
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
Stack<?> stack = this.stack;
if (lastRecycledId != recycleId || stack == null) {
throw new IllegalStateException("recycled already");
}
// 将回收对象入栈,将自己会给Stack,剩下的交给Stack就好了
stack.push(this);
}
}
DefaultHandler
包装待回收对象,同时也添加了部分信息。比如用于重复回收检测的 recycledId 和 lastRecycleId。lastRecycleId 用来存储最后一次回收对象的线程ID,recycledId 用来存 Recycler ID,这个值是通过 Recycler 全局变量 ID_GENERATOR 创建,每当回收一个 DefaultHandle 对象(该对象内部包装我们真正的回收对象),都会从 Recycler 对象中获得唯一 ID 值,一开始 recycledId 和 lastRecycledId 相等。后续通过判断两个值是否相等从而得出是否存在重复回收对象的现象并抛出异常。
Stack
这个对象非常重要,它是 Netty 对象池的核心类,内部定义了对象池数组结构以及获取、回收(包含本线程回收和异线程回收方法)核心方法。先认识认识相关变量:
// io.netty.util.Recycler.Stack
private static final class Stack<T> {
// Stack是被哪个「Recycler」创建的
final Recycler<T> parent;
// 我们将线程存储在一个 WeakReference 中,
// 否则我们可能是唯一一个在线程死亡后仍然持有对线程本身的强引用的引用,因为 DefaultHandle 将持有对栈的引用。
// 最大的问题是,如果我们不使用 WeakReference,那么如果用户在某个地方存储了对 DefaultHandle 的引用,
// 并且永远不会清除这个引用(或者不能及时清除它) ,那么 Thread 可能根本不能被收集。
final WeakReference<Thread> threadRef;
// 该「Stack」所对应其他线程剩余可缓存对象实例个数(就是其他线程此时可缓存对象数是多少)
// 默认值: 2048
final AtomicInteger availableSharedCapacity;
// 一个线程可同时缓存多少个「Stack」对象。这个「Stack」可以理解为其他线程的「Stack」
// 毕竟不可能缓存所有的「Stack」吧,所以需要做一点限制
// 默认值: 8
private final int maxDelayedQueues;
// 数组最大容量。默认值: 4096
private final int maxCapacity;
// 可以理解为对回收动作限流。默认值: 8
// 并非到阻塞时才限流,而是一开始就这样做
private final int interval;
// 存储缓存数据的数组。默认值: 256
DefaultHandle<?>[] elements;
// 数组中非空元素数量。默认值: 0
int size;
// 跳过回收对象数量。从0开始计数,每跳过一个回收对象+1。
// 当handleRecycleCount>interval时重置handleRecycleCount=0
// 默认值: 8。初始值和「interval」,以便第一个元素能被回收
private int handleRecycleCount;
// 与异线程回收的相关三个节点
private WeakOrderQueue cursor, prev;
private volatile WeakOrderQueue head;
}
我们看到了用来存储数据的 DefaultHandler[]
数组,它的类型是上面讲的包装类 DefaultHandler。内部定义了与异线程回收相关的 WeakOrderQueue,还有一些安全变量和信息变量。 Stack 对象是从 Recycle 内部的 FastThreadLocal 对象中获得,因此每个线程拥有属于自己的 Stack 对象,创造了无锁的环境,并通过 weakOrderQueue 与其他线程建立沟通的桥梁。相关的方法会在实现逻辑时讲述。
WeakOrderQueue
WeakOrderQueue 用于存储异线程回收本线程所分配的对象,这就是我们前面讲的桥梁。比如对象 A 是由线程 Thread_1 分配,但是在 Thread_2 回收,本着谁分配谁回收的原则,Thread_2 是不能回收对象的,所以 Thread_2 会把对象放入对应的 WeakOrderQueue 链表中,这个链表是由 Thread_2 创建,那怎么与 Thread_1 关联呢? 这里就有一个技巧了,异线程会维护一个 Map<Stack<?> WeakOrderQueue>
本地线程缓存,Thread_2 会根据 Stack(因为内部是使用 DefaultHandler 包装回收对象,而这个 DefaultHandler 也持有创建它的 Stack 引用)获取对应的 WeakOrderQueue,如果没有,则新建并更新 Stack 的 Head 节点(加锁)。这样就建立了Thread_1 和 Thread_2 关于对象 A 之间的关联,后续 Thread_1 就可以从 WeakOrderQueue 中回收对象了。 相关变量解释如下:
/**
* 「WorkOrderQueue」 内部链表是由「Head」和「tail」节点组成的。
* 队内数据并非立即对其他线程可见,采用最终一致性思想。
* 不需要进行保证立即可见,只需要保证最终可见就好了
*/
// io.netty.util.Recycler.WeakOrderQueue
private static final class WeakOrderQueue extends WeakReference<Thread> {
// Head节点管理「Link」对象的创建。内部next指向下一个「Link」节点,构成链表结构
private final Head head;
// 数据存储节点
private Link tail;
// 指向其他异线程的WorkOrderQueue链表
private WeakOrderQueue next;
// 唯一ID
private final int id = ID_GENERATOR.getAndIncrement();
// 可以理解为对回收动作限流。默认值: 8
// 并非到阻塞时才限流,而是一开始就这样做
private final int interval;
// 已丢弃回收对象数量
private int handleRecycleCount;
// LINK 节点继承「AtomicInteger」,内部还有一个「readIndex」指针
static final class Link extends AtomicInteger {
final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];
int readIndex;
Link next;
}
}
WeakOrderQueue 继承 WeakReference,当所属线程被回收时,相应的 WeakOrderQueue 也会被回收。 内部通过 Link 对象构成链表结构,Link 内部维护一个 DefaultHandle[] 数组用来暂存异线程回收对象。添加时会判断是否会超出设置的阈值(默认值: 16),没有则添加成功,否则创建一个新的 Link 节点并添加回收对象,接着更新链表结构,让 tail 指针指向新建的 Link 对象。由于线程不止一个,所以对应的 WeakOrderQueue 也会有多个,WeakOrderQueue 之间则构成链表结构。 变量 interval 作用是回收**"限流"**,它从一开始就限制回收速率,每经过8 个对象才会回收一个,其余则丢弃。
Recycler
Recycler 是用户直接接触的对象,它是我们构建一个简易的对象池的入口。该类的主要功能是读取配置信息并初始化,并构建了两个非常重要的 FastThreadLocal 对象,分别是 FastThreadLocal<Stack<T>>
以及 FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>
。相关变量解释如下:
// io.netty.util.Recycler
public abstract class Recycler<T> {
// 全局唯一ID,会有两个地方使用到,一个是每个Recycler初始化OWN_THREAD_ID
// 另一个是每个WeakOrderQueue初始化id
private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
// 全局ID生成器
private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
// 每线程最大可缓存对象容量大小,默认值:4096
private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024;
// 每线程最大可缓存对象大小,默认值:4096
private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
// 初始容量,默认值:256
private static final int INITIAL_CAPACITY;
// 最大共享容量因子,默认值: 2
private static final int MAX_SHARED_CAPACITY_FACTOR;
// 每线程最多延迟回收队列,默认值: 8
private static final int MAX_DELAYED_QUEUES_PER_THREAD;
// Link用来存储异线程回收的对象,内部有一个数组,数组长度=LINK_CAPACITY
// 默认值: 16
private static final int LINK_CAPACITY;
// 异线程丢弃对象比例,默认值:8
// 表示在异线程中,每8个回收对象只回收其中一个,其余丢弃
private static final int RATIO;
// 每个线程所持有的State<T> 数据结构内的数组的最大长度
private final int maxCapacityPerThread;
// 共享容量因子。该值越大,在非本线程之外的待回收对象总数越小。
// 因为 非本线程之外的待回收对象总数 = maxCapacityPerThread/maxSharedCapacityFactor
private final int maxSharedCapacityFactor;
// 对于从未回收过的对象,Netty选择按一定比例(当)抛弃,避免池内缓存对象速度增长过快,从而影响主线程业务功能。
// 默认值: 8
private final int interval;
// 每线程对象池最多可缓存多少实例对象
private final int maxDelayedQueuesPerThread;
// 每个线程有对应的「Stack」
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
interval, maxDelayedQueuesPerThread);
}
// 移除后回调方法
@Override
protected void onRemoval(Stack<T> value) {
// Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
if (value.threadRef.get() == Thread.currentThread()) {
if (DELAYED_RECYCLED.isSet()) {
// 安全移除「WeakOrderQueue」
DELAYED_RECYCLED.get().remove(value);
}
}
}
};
// 用于异线程回收,每个线程保存其他线程的「WeakOrderQueue」
private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
// 为每个线程初始化一个「WeakHashMap」对象,保证在没有强引用的情况下能回收对象
// key=>Stack,
// value=>WeakOrderQueue,
@Override
protected Map<Stack<?>, WeakOrderQueue> initialValue() {
return new WeakHashMap<Stack<?>, WeakOrderQueue>();
}
};
}
在讲源码之前,我们先回顾上面推理实现多线程那一小节,好好通过源码感受一下 Netty 关于对象池的无锁设计思想。
从池内获取对象
Recycler#get
这个方法是 Netty 是轻量级对象池的入口,主要逻辑是获取线程私有对象 Stack,通过它来获取对象池内的 DefaultHandle 对象。如果从 Stack 获取对象为空,则通过 newObject(Default)
方法新建一个目标对象。newObject(Default)
这个方法需要用户自己实现。
/**
* 从对象池获取池化对象
*/
// io.netty.util.Recycler#get
public final T get() {
if (maxCapacityPerThread == 0) {
return newObject((Handle<T>) NOOP_HANDLE);
}
// #1 获取当前线程缓存的「Stack」对象,
Stack<T> stack = threadLocal.get();
// #2 从Stack中弹出一个DefaultHandle对象
DefaultHandle<T> handle = stack.pop();
// #3 如果弹出的对象为空,表明内部没有缓存好的对象,
// 需要创建新的handle以及新的Object()
// newObject(Handler): 这个方法是抽象方法,需要用户自定义实现
if (handle == null) {
handle = stack.newHandle();
// Handle和对象之间互相持有对方的引用
// 更重要的是Handle对象持有Stack引用,因此在进行回收时就可以把对象直接push到Stack里
handle.value = newObject(handle);
}
// #5 返回
return (T) handle.value;
}
Stack#pop
Stack#pop()
还是比较简洁的,首先会判断 size
是否大于 0(大于 0 表示存在缓存对象),如果大于 0 则从 DefaultHandle[]
数组中获取,size 的值也是数组的下标。如果等于 0,则会走进回收异线程世界: 尝试从**异线程( 其他线程统称为异线程 )**的 WeakOrderQueue
队列中获取待回收对象。这个方法还包含了简单的重复回收校验逻辑判断,如果创建线程和回收线程不相等,则抛出异常。
// io.netty.util.Recycler.Stack#pop
/**
* 从栈中弹出一个「DefaultHandler」对象
* ① 如果size>0,则弹出elements[]数组中的元素
* ② 如果size=0,尝试从其他线程缓存对象中窃取部分缓存对象
*/
// io.netty.util.Recycler.Stack#pop
DefaultHandle<T> pop() {
int size = this.size;
if (size == 0) {
// 当前缓存缓存的对象数为0
// 尝试从异线程相关的「WeakOrderQueue」队列中获取可回收对象
if (!scavenge()) {
// 异线程也没有可回收对象,返回null
return null;
}
size = this.size;
if (size <= 0) {
// double check, avoid races
return null;
}
}
// 栈中有缓存对象
size --;
// 内部使用数组模拟栈
DefaultHandle ret = elements[size];
elements[size] = null;
// 更新size
this.size = size;
// 重复回收校验,如果回收ID不相等,表示不是同一个
if (ret.lastRecycledId != ret.recycleId) {
// 如果上一次对象回收ID和这次的不一样,抛出异常
throw new IllegalStateException("recycled multiple times");
}
// 重置「DefaultHandler」回收ID信息
ret.recycleId = 0;
ret.lastRecycledId = 0;
return ret;
}
Stack#scavenge
会调用 Stack#scavengeSome() 尝试从异线程相关的 WeakOrderQueue 链表中回收部分对象,如果回收成功,则返回 true,回收失败,重置 curson 指针,重新回到头结点。
/**
* scavenge有清扫的意思,这个方法用于从「WeakOrderQueue」中回收部分对象
*/
// io.netty.util.Recycler.Stack#scavenge
private boolean scavenge() {
// 尝试从「WeakOrderQueue」中获取部分待回收对象并转换到「Stack#DefaultHandler[]」数组中
if (scavengeSome()) {
// 转移成功
return true;
}
// 获取失败,重置prev、cursor指针
prev = null;
cursor = head;
return false;
}
Stack#scavengeSome
Stack#scavengeSome()
方法尝试从异线程中回收对象,请注意,这个方法是在 Stack 对象中执行的,而 Stack 之前讲过保存了与异线程相关的 WeakOrderQueue 链表,所以我们可以通过遍历链表寻找回收对象。 还有一个比较重要的点是如果 WeakOrderQueue 所绑定的线程已经死亡,意味着对应的 WeakOrderQueue 不会新增回收对象了,所以我们需要对这个 WeakOrderQueue 进行相应清理,保证能被 GC,避免内存泄漏,对应的清理方法 cursor.reclaimAllSpaceAndUnlink()
。scavengeSome()
只要成功数量大于 1 就直接返回 true,否则继续遍历其他异线程的 WeakOrderQueue,直接遍历所有的 WeakOrderQueue 或找到可用的回收对象为止。
// io.netty.util.Recycler.Stack#scavengeSome
private boolean scavengeSome() {
WeakOrderQueue prev;
WeakOrderQueue cursor = this.cursor;
// 当前cursor指针为空
if (cursor == null) {
prev = null;
// 指向头结点
cursor = head;
if (cursor == null) {
// 头结点为空,表明没有任何异线程存入,直接返回false
return false;
}
} else {
prev = this.prev;
}
boolean success = false;
// 存在异线程「WeakOrderQueue」,准备回收对象
do {
// 尝试从异线程「WeakOrderQueue」尽可能多地转移回收对象。但并不会更新指针
// 当转移对象数量>0,就返回true
if (cursor.transfer(this)) {
// 转移成功,跳出循环
success = true;
break;
}
// 保存下一个「WeakOrderQueue」的引用,因为本次的「WeakOrderQueue」可能会被回收
WeakOrderQueue next = cursor.getNext();
if (cursor.get() == null) {
// 如果与队列相关的线程终结了(cursor.get()==null),
// 需要判断是否还有可回收对象,若有转移后解除它的连接
// 我们永远不会断开第一个连接,因为不想在更新头部时进行同步
if (cursor.hasFinalData()) {
// 按一定比例转移对象
for (;;) {
if (cursor.transfer(this)) {
success = true;
} else {
break;
}
}
}
if (prev != null) {
// 确保在删除 WeakOrderQueue 以便进行GC之前收回所有空间。
cursor.reclaimAllSpaceAndUnlink();
// 更新节点信息,删除cursor节点,
prev.setNext(next);
}
} else {
prev = cursor;
}
// 更新cursor指针,指向「WeakOrderQueue」下一个对象
cursor = next;
} while (cursor != null && !success);
this.prev = prev;
this.cursor = cursor;
return success;
}
WeakOrderQueue#transfer
WeakOrderQueue#transfer()
是从异线程 WeakOrderQueue 队列中转移部分对象到 Stack 数组中。WeakOrderQueue 内部是由 Link 对象构成的链表,而 Link 链表底层使用长度为 16 的数组存储回收对象,并且维护一个读指针 readIndex
,记录读操作的位置。transfer()
方法只会处理一个 Link 节点(并非所有待回收对象都能成功写入 Stack,会按一定比例丢弃),当遇到期望容量超出 Stack 数组长度时,需要进行扩容。 如果读指针到达 Link 节点末尾,判断 Link.next 对象是否为空,不为空则更新 head 的指针,以便回收当前的 Link 对象(已经用完了)。
/**
* 把「WeakOrderQueue」内存缓存的待回收对象转移到「Stack」数组中
* 记住,调用这个方法的是准备从异线程回收对象的线程,所以不需要加锁
* 一次性转移一个Link节点(长度: 16)
*/
// io.netty.util.Recycler.WeakOrderQueue#transfer
@SuppressWarnings("rawtypes")
boolean transfer(Stack<?> dst) {
// 获取链表的头结点
Link head = this.head.link;
if (head == null) {
// 头结点为空,表示没有数据,直接返回
return false;
}
// LINK_CAPACITY表示一个Link节点的容量大小,默认值: 16
// 判断读指针「readIndex」是否到达边界了,如果到达边界且指向下一个节点为NULL
if (head.readIndex == LINK_CAPACITY) {
if (head.next == null) {
return false;
}
head = head.next;
// 更新头结点,指向next
this.head.relink(head);
}
// 「Head」对象有两个指针变量,分别是「readIndex」和 「AtomicInteger」
// srcStart: 表示可读起点
final int srcStart = head.readIndex;
// head:表示自从创建开始已经添加个数
int srcEnd = head.get();
// 两者相减,表示当前可回收对象数量
final int srcSize = srcEnd - srcStart;
if (srcSize == 0) {
return false;
}
final int dstSize = dst.size;
// 计算期望值
final int expectedCapacity = dstSize + srcSize;
// 超出「Stack」数组长度需要扩容
if (expectedCapacity > dst.elements.length) {
// 对「Stack」数组扩容,实际扩容规则按「Stack」规则计算的,所以扩容后的实际容量
// 不一定与期望容量相等,有可能不扩容
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
// 根据「Stack」实际容量最终确定回收终点索引
srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
}
if (srcStart != srcEnd) {
// 循环遍历并复制
final DefaultHandle[] srcElems = head.elements;
final DefaultHandle[] dstElems = dst.elements;
int newDstSize = dstSize;
for (int i = srcStart; i < srcEnd; i++) {
DefaultHandle<?> element = srcElems[i];
if (element.recycleId == 0) {
element.recycleId = element.lastRecycledId;
} else if (element.recycleId != element.lastRecycledId) {
// 简单做一下重复回收判断,如果现值不相等,表示创建线程和回收线程并非同一个线程
// 抛出异常
throw new IllegalStateException("recycled already");
}
// 便于GC回收对象Link对象
srcElems[i] = null;
// 本次回收需要按照一定比例丢弃对象,并不是全盘接收
if (dst.dropHandle(element)) {
// 丢弃
continue;
}
// 不需要丢弃,复制回收对象到Stack目标数组中
element.stack = dst;
dstElems[newDstSize ++] = element;
}
if (srcEnd == LINK_CAPACITY && head.next != null) {
// 当前的Link的对象已经跑了一遍回收,且所指向的下一个节点不为NULL
// 更新head指向下一个Link节点,当前的Link需要被GC
this.head.relink(head.next);
}
// 更新读节点指针
head.readIndex = srcEnd;
// 再次判断目标数组是否写入新数据
if (dst.size == newDstSize) {
// 什么都没有回收,返回false
return false;
}
// 更新目标Stack的已使用容量大小的值
dst.size = newDstSize;
return true;
} else {
// The destination stack is full already.
return false;
}
}
从对象池中获取对象,先判断本地线程 Stack 是否有存在的对象,如果有则直接返回,如果没有,需要尝试从异线程链表 WeakOrderQueue 回收对象。
异线程暂存对象
前面我们讲的是主线程分配内存,包含从异线程 WeakOrderQueue 链表中回收部分对象。现在我们站在异线程的角度,看它们是如何把不属于自己线程的待回收对象暂存于 WeakOrderQueue 链表中。 对象 A 是由 Thread_1 创建,但在 Thread_2 被回收,这种情况 Thread_2 会把对象 A暂存在本线程的 WeakOrderQueue 链表,而这个 WeakOrderQueue 会与 Thread_1 的 Stack#head 节点构成一个链表,Thread_1 遍历这个链表从而可以回收对象。
DefaultHandle#recycle
DefaultHandle#recycle()
会做一些关于身份校验,比如目标对象、回收线程等。然后调用 Stack#push()
方法放入内存池。
// io.netty.util.Recycler.DefaultHandle#recycle
/**
* 回收对象,对象内部持有「DefaultHandle」引用,
* 而且「DefaultHandle」只有回收自己
*/
@Override
public void recycle(Object object) {
if (object != value) {
如果目标回收对象不是自己,抛出异常
throw new IllegalArgumentException("object does not belong to handle");
}
Stack<?> stack = this.stack;
if (lastRecycledId != recycleId || stack == null) {
// 发现回收线程和分配线程不是同一个,抛出异常
throw new IllegalStateException("recycled already");
}
// 压入「Stack」
stack.push(this);
}
Stack#push
根据本地线程或异线程选择不同回收策略。本地线程直接回收即可,但异线程回收需要写入 WeakOrderQueue 链表。
void push(DefaultHandle<?> item) {
Thread currentThread = Thread.currentThread();
if (threadRef.get() == currentThread) {
// 本线程回收,直接写入内存池
pushNow(item);
} else {
// 异线程回收对象,推迟回收
pushLater(item, currentThread);
}
}
Stack#pushLater
Stack#pushLater()
方法主要的目的是从本地线程缓存中获取线程私有的 WeakOrderQueue 对象,并将回收队列写入队列中。
/**
* 将回收对象写入异线程关联的「WeakOrderQueue」链表中。
* 在新建「WeakOrderQueue」的过程中有两道门槛会导致创建失败:
* ① WeakOrderQueue数量超出了maxDelayedQueues限制,创建失败
* ② 向目标对象「Stack」申请长度为16的容量时失败
* 将新创建的对象放入Map数组以供下次使用,
* 且将item写入「WeakOrderQueue」
*/
// io.netty.util.Recycler.Stack#pushLater
private void pushLater(DefaultHandle<?> item, Thread thread) {
if (maxDelayedQueues == 0) {
// 不支持异线程回收,直接丢弃
return;
}
// 使用「FastThreadLocal」本地线程缓存Stack与WeakOrderQueue的关系
// 因为一个线程会对应多个Stack(即对应多个不同类型的对象池)
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(this);
if (queue == null) {
// 当前线程没有与之关联的「WeakOrderQueue」链表,需要新创建一个
// 但是创建之前判断是否已经满额了
if (delayedRecycled.size() >= maxDelayedQueues) {
// 满额,添加一个占位对象,表示以后回收这类对象直接丢弃
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
// Stack对象可用的availableSharedCapacity<LINK_CAPACITY,表示连一个LINK节点空间不够了
// 这种情况会导致返回NULL。
// 如果对象不为NULl,表明创建成功,并更新「Stack」的head指针指向新的「WeakOrderQueue」
if ((queue = newWeakOrderQueue(thread)) == null) {
// 直接丢弃
return;
}
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
// drop object
return;
}
// 将回收对象添加到 WeakOrderQueue 队列中
queue.add(item);
}
Stack#newWeakOrderQueue()
创建一个全新的 WeakOrderQueue
并注册到对应的 Stack#head
头结点中。 .png&originHeight=1272&originWidth=2048&size=304273&status=done&style=none&width=2048)
// io.netty.util.Recycler.Stack#newWeakOrderQueue
private WeakOrderQueue newWeakOrderQueue(Thread thread) {
return WeakOrderQueue.newQueue(this, thread);
}
// io.netty.util.Recycler.WeakOrderQueue#newQueue
static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
// We allocated a Link so reserve the space
if (!Head.reserveSpaceForLink(stack.availableSharedCapacity)) {
return null;
}
final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
// Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so
// may be accessed while its still constructed.
stack.setHead(queue);
return queue;
}
// 加把锁
// io.netty.util.Recycler.Stack#setHead
synchronized void setHead(WeakOrderQueue queue) {
queue.setNext(head);
head = queue;
}
WeakOrderQueue#add
将回收对象写入 WeakOrderQueue 队列。WeakOrderQueue 是本地线程缓存的私有变量,线程对自己 WeakOrderQueue 写入并不需要加锁,Link 节点的写索引(writeIndex)、读索引(readIndex)也不存在并发问题,但存在数据可见性的问题,这个可见性是指对回收线程可见,因为回收线程需要通过 writeIndex 确定终止序号,源码还有一个关于 volatile
延迟更新写索引的用法。相当于回收线程只管读,至于从哪里读是由异线程告诉你(通过writeIndex),如果它延迟更新,相当于延迟告诉你,数据也并没有冲突或丢失,你可以从其他线程的 WeakOrderQueue 链表中获取,不必吊死在这一棵树上,等你下次来的时候就可以看到了,这样就提升了写入时的性能。
/**
* 向链表中添加数据实际是向「tail」节点添加数据。
* 如果「tail」指向的节点的写指针(tail.get()获取,通过AtomicInteger保存写指针)超出LINK_CAPACITY
* 则需要新创建一个Link对象
*/
// io.netty.util.Recycler.WeakOrderQueue#add
void add(DefaultHandle<?> handle) {
handle.lastRecycledId = id;
// 每隔8个对象只回收其中一个
if (handleRecycleCount < interval) {
handleRecycleCount++;
return;
}
handleRecycleCount = 0;
Link tail = this.tail;
int writeIndex;
// 写指针到达Link节点的末尾,需要申请一个新的「Link」节点
if ((writeIndex = tail.get()) == LINK_CAPACITY) {
// 尝试创建,因为可能会超出availableSharedCapacity大小导致创建失败
Link link = head.newLink();
if (link == null) {
// 创建失败,丢弃
return;
}
// 更新节点信息,
this.tail = tail = tail.next = link;
// 获取写节点
writeIndex = tail.get();
}
// 更新值
tail.elements[writeIndex] = handle;
// 「DefaultHandle」的handle置为空,因为有可能这个「Stack」会被回收掉,如果这里存在强引用,
// 会导致「Stack」对象不能回收,后面待Stack回收时会重新设置
handle.stack = null;
// 延迟更新索引(索引被更新,队列元素才可见)
// 这是「volatile」高级用法,由于存在多个生产者一个消费者,所以不需要在更新「WeakOrderQueue」链表扣
// 立即可见,它可以从其他「WeakOrderQueue」队列中获取回收对象。使用lazySet保持最终可见性,但会存在延迟,
// 这有利于提升写入时的性能(本质是减少内存屏障的开销)
tail.lazySet(writeIndex + 1);
}
总结
我们可以从 Netty 的 Recycle
轻量级对象池中学到很多东西,其中最重要的是无锁设计方案。整个 Recycle 类就只有一个 synchronized 关键字,其他的都是通过 CAS 或原子变量实现。其中无锁设计的核心是 FastThreadLocal 类,它是 Netty 自己设计的适配 Netty 环境的高性能本地线程缓存变量,采用空间换时间的设计策略。还有一个人认为非常优秀的设计思路是异线程对象回收策略: 异线程并不是主动推送待回收对象给目标线程,而是通过 WeakOrderQueue 缓存待回收对象,这样就不需要加锁,然后将写指针设置为 AtomicInteger
对目标线程可见,但是 Netty 并不满足,它还采用了 AtomicInteger 的高级用法 AtomicInteger#lazySet
延迟更新写指针,因为根据实际情况,目标线程并不只是回收当前异线程的 WeakOrderQueue,还有其他异线程也可以被回收,所以对目标线程来说写指针可以不及时回显,这样就能提高异线程写入效率。