快速ThreadLocal FastThreadLocal和快速ThreadLocal线程FastThreadLocalThread

 2023-02-07
原文作者:李意文 原文地址:https://juejin.cn/post/7019961178717683720

1 FastThreadLocal

快速ThreadLocal,当从FastThreadLocalThread获取对象时,可以比传统的jdk的ThreadLocal有更好的性能。

FastThreadLocal使用一个常量index去索引数组,这样可以比传统的使用hashcode和hash table 有轻微的性能提升,虽然说提升不是很明显,但是在高频的场景下,这样的提升非常有用。

1.1 set方法

1.1.1 方法签名

io.netty.util.concurrent.FastThreadLocal#set(V)

1.1.2 代码

    /**
     * Set the value for the current thread.
     */
    public final void set(V value) {
        if (value != InternalThreadLocalMap.UNSET) {
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
            setKnownNotUnset(threadLocalMap, value);
        } else {
            remove();
        }
    }

如上如果是value = InternalThreadLocalMap.UNSET,则表明是要做删除的,我们先看一下不删除的情况,

看一下InternalThreadLocalMap.get();里面做了啥?

    public static InternalThreadLocalMap get() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            return fastGet((FastThreadLocalThread) thread);
        } else {
            return slowGet();
        }
    }

如上我们看到如果当前的线程是FastThreadLocalThread,则会调用fastGet,于是看一下fastGet的代码:

    private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
        if (threadLocalMap == null) {
            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
        }
        return threadLocalMap;
    }

如上代码,如果FastThreadLocalThread中的InternalThreadLocalMap的对象是空的那么则会new一个出来;

看一下InternalThreadLocalMap的构造函数做了什么:

    private InternalThreadLocalMap() {
        indexedVariables = newIndexedVariableTable();
    }
    private static Object[] newIndexedVariableTable() {
        Object[] array = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE];
        Arrays.fill(array, UNSET);
        return array;
    }
    private static final int INDEXED_VARIABLE_TABLE_INITIAL_SIZE = 32;
    public static final Object UNSET = new Object();

如上new了一个32个元素的数组,并且用UNSET去填充它;

接着回去继续看io.netty.util.concurrent.FastThreadLocal#set(V),看一下

     setKnownNotUnset(threadLocalMap, value);的代码:
    /**
     * @see InternalThreadLocalMap#setIndexedVariable(int, Object).
     */
    private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
        if (threadLocalMap.setIndexedVariable(index, value)) {
            addToVariablesToRemove(threadLocalMap, this);
        }
    }
    
    
    private final int index;
    
    public FastThreadLocal() {
        index = InternalThreadLocalMap.nextVariableIndex();
    }
    
    
    @SuppressWarnings("unchecked")
    private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
        Set<FastThreadLocal<?>> variablesToRemove;
        if (v == InternalThreadLocalMap.UNSET || v == null) {
            variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
            threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
        } else {
            variablesToRemove = (Set<FastThreadLocal<?>>) v;
        }
    
        variablesToRemove.add(variable);
    }

如上代码,我们看到每个FastThreadLocal都会维护一个全局唯一的index;

    InternalThreadLocalMap中的相关代码:
    /**
     * @return {@code true} if and only if a new thread-local variable has been created
     */
    public boolean setIndexedVariable(int index, Object value) {
        Object[] lookup = indexedVariables;
        if (index < lookup.length) {
            Object oldValue = lookup[index];
            lookup[index] = value;
            return oldValue == UNSET;
        } else {
            expandIndexedVariableTableAndSet(index, value);
            return true;
        }
    }
    
    private void expandIndexedVariableTableAndSet(int index, Object value) {
        Object[] oldArray = indexedVariables;
        final int oldCapacity = oldArray.length;
        int newCapacity = index;
        newCapacity |= newCapacity >>>  1;
        newCapacity |= newCapacity >>>  2;
        newCapacity |= newCapacity >>>  4;
        newCapacity |= newCapacity >>>  8;
        newCapacity |= newCapacity >>> 16;
        newCapacity ++;
    
        Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
        Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
        newArray[index] = value;
        indexedVariables = newArray;
    }

如上我们看到,FastThreadLocal会调用InternalThreadLocalMap的setIndexedVariable的方法,把对象value存入存入InternalThreadLocalMap中的数组的index这个位置;

并且如果index超过了数组的大小,数组会扩大2倍,以保证value可以存入index这个位置。

并且我们还可以看到InternalThreadLocalMap中的数组的第一个位置维护了一个Set,这个Set存的是FastThreadLocal。如果FastThreadLocal第一次在线程中存放变量则会被放入其中。

其中用到了IdentityHashMap这个和hashMap的区别是说,IdentityHashMap用的是引用相等去比较key,而HashMap用的equals。

1.2 get方法

1.2.1 方法签名

    @SuppressWarnings("unchecked")
    public final V get()

1.2.2 代码

    
    /**
     * Returns the current value for the current thread
     */
    @SuppressWarnings("unchecked")
    public final V get() {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        Object v = threadLocalMap.indexedVariable(index);
        if (v != InternalThreadLocalMap.UNSET) {
            return (V) v;
        }
        return initialize(threadLocalMap);
    }
    public Object indexedVariable(int index) {
        Object[] lookup = indexedVariables;
        return index < lookup.length? lookup[index] : UNSET;
    }
    
    private V initialize(InternalThreadLocalMap threadLocalMap) {
        V v = null;
        try {
            v = initialValue();
        } catch (Exception e) {
            PlatformDependent.throwException(e);
        }
    
        threadLocalMap.setIndexedVariable(index, v);
        addToVariablesToRemove(threadLocalMap, this);
        return v;
    }
    
    
    
    /**
     * Returns the initial value for this thread-local variable.
     */
    protected V initialValue() throws Exception {
        return null;
    }

如上我们看到如果线程中的InternalThreadLocalMap的数组,相应的位置的值为UNSET或者数组的长度还没有index大,则表示的是目前FastThreadLocal还没有在当前的FastThreadLocalThread去放置对象,

这个会调用initialize去初始化对象,当然默认的只是是返回null,同时当前的FastThreadLocal会被加入到VariablesToRemove集合中。

1.3 remove

1.3.1 方法签名

    public final void remove()

1.3.2 代码

    /**
     * Sets the value to uninitialized for the specified thread local map.
     * After this, any subsequent call to get() will trigger a new call to initialValue().
     */
    public final void remove() {
        remove(InternalThreadLocalMap.getIfSet());
    }
    
    
    /**
     * Sets the value to uninitialized for the specified thread local map.
     * After this, any subsequent call to get() will trigger a new call to initialValue().
     * The specified thread local map must be for the current thread.
     */
    @SuppressWarnings("unchecked")
    public final void remove(InternalThreadLocalMap threadLocalMap) {
        if (threadLocalMap == null) {
            return;
        }
    
        Object v = threadLocalMap.removeIndexedVariable(index);
        removeFromVariablesToRemove(threadLocalMap, this);
    
        if (v != InternalThreadLocalMap.UNSET) {
            try {
                onRemoval((V) v);
            } catch (Exception e) {
                PlatformDependent.throwException(e);
            }
        }
    }
    
    
    private static void removeFromVariablesToRemove(
            InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    
        if (v == InternalThreadLocalMap.UNSET || v == null) {
            return;
        }
    
        @SuppressWarnings("unchecked")
        Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
        variablesToRemove.remove(variable);
    }
    
    
    protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception { }
    InternalThreadLocalMap中的相应代码:
    public static InternalThreadLocalMap getIfSet() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            return ((FastThreadLocalThread) thread).threadLocalMap();
        }
        return slowThreadLocalMap.get();
    }
    
    
    public Object removeIndexedVariable(int index) {
        Object[] lookup = indexedVariables;
        if (index < lookup.length) {
            Object v = lookup[index];
            lookup[index] = UNSET;
            return v;
        } else {
            return UNSET;
        }
    }

如上,可以看到我们可以通过覆写onRemoval,来监听哪些对象被删掉了,同时将VariablesToRemove set中删掉当前FastThreadLocal。

2 FastThreadLocalThread

    
    public class FastThreadLocalThread extends Thread {
        // This will be set to true if we have a chance to wrap the Runnable.
        private final boolean cleanupFastThreadLocals;
    
        private InternalThreadLocalMap threadLocalMap;
    //... 以下省略,因为都是get set和构造函数

3 QA

3.1 为什么要维护VariablesToRemove 这个set?

答:我们已经将FastThreadLocal和FastThreadLocalThread的核心代码都看完了,你感觉和传统的jdk的ThreadLocal相比还少了什么?对了还少了弱引用这个元素。

传统的ThrealLocal的

.ThreadLocalMap中的Entry是一个软引用,指向的是ThreadLocal,它能保证说如果一个ThreadLocal已经全局弱可达了,那么它可以保证这个ThreadLocal对象和ThreadLocal在所有线程所创建的对象,都被及时被回收掉。

[背景知识——java 引用总结][java]。

所以我们猜想VariablesToRemove 这个set也是用来干这个工作的。

然后我们开设来验证我们的这个猜想看成立不。

我们先回顾下,FastThreadLocal什么时候会被加入到VariablesToRemove中:

  • FastThreadLocal第一次在线程中存放变量则会被放入其中。 *``` initialize的时候

FastThreadLocal什么时候从VariablesToRemove中被删除:

remove的时候

因此,我们就看到了VariablesToRemove set的语义了,它存放到是当前有在线程中存放对象的FastThreadLocal的集合。

然后我们看一下我们漏掉的一个点FastThreadLocalThread中的变量cleanupFastThreadLocals,

看一下它怎么起作用的
public FastThreadLocalThread(Runnable target) {
    super(FastThreadLocalRunnable.wrap(target));
    cleanupFastThreadLocals = true;
}

看一下FastThreadLocalRunnable的代码:
final class FastThreadLocalRunnable implements Runnable {
    private final Runnable runnable;

    private FastThreadLocalRunnable(Runnable runnable) {
        this.runnable = ObjectUtil.checkNotNull(runnable, "runnable");
    }

    @Override
    public void run() {
        try {
            runnable.run();
        } finally {
            FastThreadLocal.removeAll();
        }
    }

    static Runnable wrap(Runnable runnable) {
        return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
    }
}

看下FastThreadLocal.removeAll();
public static void removeAll() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
    if (threadLocalMap == null) {
        return;
    }

    try {
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
        if (v != null && v != InternalThreadLocalMap.UNSET) {
            @SuppressWarnings("unchecked")
            Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
            FastThreadLocal<?>[] variablesToRemoveArray =
                    variablesToRemove.toArray(new FastThreadLocal[0]);
            for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
                tlv.remove(threadLocalMap);
            }
        }
    } finally {
        InternalThreadLocalMap.remove();
    }
}


@SuppressWarnings("unchecked")
public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap == null) {
        return;
    }

    Object v = threadLocalMap.removeIndexedVariable(index);
    removeFromVariablesToRemove(threadLocalMap, this);

    if (v != InternalThreadLocalMap.UNSET) {
        try {
            onRemoval((V) v);
        } catch (Exception e) {
            PlatformDependent.throwException(e);
        }
    }
}
InternalThreadLocalMap.remove();
public static void remove() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) {
        ((FastThreadLocalThread) thread).setThreadLocalMap(null);
    } else {
        slowThreadLocalMap.remove();
    }
}
如上我们看到FastThreadLocal.removeAll();是将线程中的InternalThreadLocalMap的内容都清掉了。

所以回到开头的问题:为什么要维护VariablesToRemove 这个set?

是因为在Runnable结束的时候,我们要清掉这个任务在线程中留下的所有的FastThreadLocal的对象。

所以一开始我们猜想是不对的,维护VariablesToRemove 这个set只是在Runnable结束的时候要清掉所有的FastThreadLocal的对象而已。和软引用没有关系。


[java]: https://link.juejin.cn?target=