1. 前言

Netty 自行封装了 FastThreadLocal 以替换 JDK 提供的 ThreadLocal。结合封装的 FastThreadLocalThread,在多线程环境下显著提高了变量的查询以及更新效率。

下文将通过对比 ThreadLocalFastThreadLocal,结合源码解析,探究 FastThreadLocalFastThreadLocalThread 搭配使用后性能提升的奥秘。

2. ThreadLocalMap

ThreadLocalMapThreadLocal 中定义的静态类,其作用是保存 Thread 中引用的 ThreadLocal 对象。

在 JDK 中,每一个 Thread 对象均包含以下两个变量:

public class Thread implements Runnable {

    // 此处省略若干代码

    // 存储 ThreadLocal 变量,通过每个 Thread 存储一个 ThreadLocalMap,实现了变量的线程隔离
    ThreadLocal.ThreadLocalMap threadLocals = null;

    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
}

在编程实践中,线程中可能包含多个 ThreadLocal 引用,它们均保存在 ThreadLocal.ThreadLocalMap threadLocals 中。每个线程包含自己的 ThreadLocalMap,从而避免了多线程争用。

static class ThreadLocalMap {

    // 需要注意,此处 Entry 使用 WeakReference (弱引用)
    // 这样在资源紧张的时候可以回收部分不再引用的 ThreadLocal 变量
    static class Entry extends WeakReference<ThreadLocal<?>> {
        /** The value associated with this ThreadLocal. */
        Object value;

        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }
    
    // ThreadLocal 对象存储数组的初始化长度
    private static final int INITIAL_CAPACITY = 16;
    
    // ThreadLocal 对象存储数组
    private Entry[] table;
    
    // 初始化 ThreadLocalMap,使用数组存放 ThreadLocal 资源
    // 使用 ThreadLocal 对象的 threadLocalHashCode 进行 hash 得到索引
    // 此处使用对象数组存放 ThreadLocal 对象,操作类似于 HashMap
    ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
        table = new Entry[INITIAL_CAPACITY];
        int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
        table[i] = new Entry(firstKey, firstValue);
        size = 1;
        setThreshold(INITIAL_CAPACITY);
    }
    
    // 获取 ThreadLocal 对象,此处需要根据 threadLocalHashCode 进行 hash 操作得到索引
    private Entry getEntry(ThreadLocal<?> key) {
        int i = key.threadLocalHashCode & (table.length - 1);
        Entry e = table[i];
        if (e != null && e.get() == key)
            return e;
        else
            return getEntryAfterMiss(key, i, e);
    }
}

由以上代码可知,在 ThreadLocalMap 初始化时,会创建一个对象数组。对象数组的初始长度为 16,在后续的扩张中,数组长度会保持在 2^n 级别,以便进行 hash 操作确定 ThreadLocal 对象的索引。

在每次获取 ThreadLocal 对象的时候,会根据对象的 threadLocalHashCode 与对象数组长度减一的求与值,确定对象索引,从而快速获取 value。

使用 hash 确定数组下标,存在以下几个问题:

  • 解决 hash 冲突;
  • 对象数组扩容带来的 rehash。

ThreadLocal 是 JDK 提供的通用类,在大部分场景下,线程中的 ThreadLocal 变量较少,因此 hash 冲突以及 rehash 较少。即使偶尔发生 hash 冲突以及 rehash,通常也不会给应用程序带来较大的性能损耗。

3. FastThreadLocalThread

Netty 对 ThreadLocal 改造为 FastThreadLocal,以应对自身大并发量、数据吞吐量大的应用场景。为了更好的使用,Netty 继承 Thread 构建了 FastThreadLocalThread

当且仅当 FastThreadLocalFastThreadLocalThread 合并使用时,方能真正起到提速的作用。

// 限于篇幅,省略较多函数
public class FastThreadLocalThread extends Thread {

    // 相对于 Thread 中使用 ThreadLocal.ThreadLocalMap 存放资源
    // FastThreadLocalThread 使用 InternalThreadLocalMap 存放资源
    private InternalThreadLocalMap threadLocalMap;

    public final InternalThreadLocalMap threadLocalMap() {
        return threadLocalMap;
    }

    public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
        this.threadLocalMap = threadLocalMap;
    }
    
    @UnstableApi
    public boolean willCleanupFastThreadLocals() {
        return cleanupFastThreadLocals;
    }

    @UnstableApi
    public static boolean willCleanupFastThreadLocals(Thread thread) {
        return thread instanceof FastThreadLocalThread &&
                ((FastThreadLocalThread) thread).willCleanupFastThreadLocals();
    }
}

由以上代码可以看出,相对于 ThreadFastThreadLocalThread 添加了 threadLocalMap 对象,以及 threadLocalMap 的清理标志获取函数。

ThreadLocal 即使使用了 WeakReference 以保证资源释放,但仍可能存在内存泄漏。FastThreadLocalThreadFastThreadLocal 均为 Netty 定制,可以在线程任务执行后,强制执行 InternalThreadLocalMap 的清理函数 removeAll(详情见下文)。

4. FastThreadLocal

4.1 InternalThreadLocalMap

前情提要:FastThreadLocalThread 中声明了 InternalThreadLocalMap 对象 threadLocalMap

public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
    
}

从以上代码可知,InternalThreadLocalMap 继承于 UnpaddedInternalThreadLocalMap。因此,我们需要先探究 UnpaddedInternalThreadLocalMap 的定义。

class UnpaddedInternalThreadLocalMap {

    // 如果在 Thread 中使用 FastThreadLocal,则实际上使用 ThreadLocal 存放资源
    static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
    
    // 资源索引,每一个 FastThreadLocal 对象都会有对应的 ID,即通过 nextIndex 自增得到
    static final AtomicInteger nextIndex = new AtomicInteger();

    // FastThreadLocal 的资源存放地址
    // ThreadLocal 中是通过 ThreadLocalMap 存放资源,索引是 ThreadLocal 对象的 threadLocalHashCode 进行 hash 得到
    // FastThreadLocal 使用 Object[] 数组,使用通过 nextIndex 自增得到的数值作为索引,保证每次查询数值都是 O(1) 操作
    // 需要注意,FastThreadLocal 对象为了避免伪共享带来的性能损耗,使用 padding 使得对象大小超过 128 byte
    // 避免伪共享的情况下,indexedVariables 的多个连续数值在不更新的前提下可以被缓存至 CPU Cache Line 中,大大提高查询效率
    Object[] indexedVariables;

    // Core thread-locals
    int futureListenerStackDepth;
    int localChannelReaderStackDepth;
    Map<Class<?>, Boolean> handlerSharableCache;
    IntegerHolder counterHashCode;
    ThreadLocalRandom random;
    Map<Class<?>, TypeParameterMatcher> typeParameterMatcherGetCache;
    Map<Class<?>, Map<String, TypeParameterMatcher>> typeParameterMatcherFindCache;

    // String-related thread-locals
    StringBuilder stringBuilder;
    Map<Charset, CharsetEncoder> charsetEncoderCache;
    Map<Charset, CharsetDecoder> charsetDecoderCache;

    // ArrayList-related thread-locals
    ArrayList<Object> arrayList;

    // 构造函数,后续需要关注
    UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
        this.indexedVariables = indexedVariables;
    }
}

以上代码中,需要注意 slowThreadLocalMap 的声明:

static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();

声明 slowThreadLocalMap 的原因在于,用户可能在 Thread 而非 FastThreadLocalThread 中调用 FastThreadLocal。因此,为了保证程序的兼容性,声明此变量保存普通的 ThreadLocal 相关变量。

以下是 InternalThreadLocalMap 的主要实现:

// 出于篇幅考虑,删除部分函数
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {

    private static final int DEFAULT_ARRAY_LIST_INITIAL_CAPACITY = 8;
    
    // 资源未赋值变量
    public static final Object UNSET = new Object();

    // 获取 ThreadLocal 对象,此处会判断当前调用线程的类型分别调用不同的资源
    public static InternalThreadLocalMap getIfSet() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            return ((FastThreadLocalThread) thread).threadLocalMap();
        }
        return slowThreadLocalMap.get();
    }

    // 获取 ThreadLocal 对象,此处会判断当前调用线程的类型,从而判断调用 fastGet 或是 slowGet
    public static InternalThreadLocalMap get() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            return fastGet((FastThreadLocalThread) thread);
        } else {
            return slowGet();
        }
    }

    // 如果当前调用 FastThreadLocal 对象的是 FastThreadLocalThread
    // 则调用 FastThreadLocalThread 的 threadLocalMap 对象获取相关资源
    private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
        if (threadLocalMap == null) {
            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
        }
        return threadLocalMap;
    }

    // 如果当前调用 FastThreadLocal 对象的是 Thread
    // 则调用 slowThreadLocalMap 对象获取相关资源 (slowThreadLocalMap 其实是调用 JDK 提供的 ThreadLocalMap)
    private static InternalThreadLocalMap slowGet() {
        ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
        InternalThreadLocalMap ret = slowThreadLocalMap.get();
        if (ret == null) {
            ret = new InternalThreadLocalMap();
            slowThreadLocalMap.set(ret);
        }
        return ret;
    }

    // 保证 FastThreadLocal 的实体对象大小超过 128 byte,以避免伪共享发生
    // 如果资源能够避免伪共享,则 FastThreadLocal 的实体对象能够部分缓存至 L1 缓存
    // 通过提高缓存命中率加快查询速度 (查询 L1 缓存的速度要远快于查询主存速度)
    public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;

    private InternalThreadLocalMap() {
        super(newIndexedVariableTable());
    }

    // 初始化资源,初始化的长度为 32,并初始化为 UNSET
    private static Object[] newIndexedVariableTable() {
        Object[] array = new Object[32];
        Arrays.fill(array, UNSET);
        return array;
    }
}

对于使用者来说,需要关注以下几个函数:getIfSet()get()fastGet()slowGet()。存在以下两种情况:

  1. Thread 中调用 FastThreadLocal
  2. FastThreadLocalThread 中调用 FastThreadLocal

因为存在以上两种调用场景,在获取 InternalThreadLocalMap 时,会使用 instanceof 进行判断:

if (thread instanceof FastThreadLocalThread) {
    // 对应 fastGet 等操作
} else {
    // 对应 slowGet 等操作
}
  • 如果调用线程是 Thread:调用 UnpaddedInternalThreadLocalMap 中的 slowThreadLocalMap 变量;
  • 如果调用线程是 FastThreadLocalThread:调用 FastThreadLocalThread 中的 threadLocalMap 变量。

因为 InternalThreadLocalMap 构造函数为私有函数,所以在 getIfSet/fastGet 函数中均是获取 FastThreadLocalThreadthreadLocalMap 变量。若变量为空,则调用私有构造函数进行赋值操作。

// Cache line padding (must be public)
// With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.
public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;

private InternalThreadLocalMap() {
    super(newIndexedVariableTable());
}

private static Object[] newIndexedVariableTable() {
    Object[] array = new Object[32];
    Arrays.fill(array, UNSET);
    return array;
}

构造函数会创建一个 Object 数组(初始化长度为 32),并逐个初始化数值为 UNSET,为后续的赋值操作提供判断依据(详见 removeIndexedVariable 以及 isIndexedVariableSet 函数)。

Tips:
构造函数存在一段代码 public long rp1, ... rp9;。此段代码无实际实用意义,其存在是为了保证 InternalThreadLocalMap 的实例大小超过 128 字节(以上 long 变量 72 字节,基类 UnpaddedInternalThreadLocalMap 亦存在若干变量)。

CPU Cache Line 的大小一般为 64 字节,变量的大小超过 128 byte,则会极大的减少伪共享情况。
(当前 Netty 的版本号是 4.1.38,InternalThreadLocalMap 的实例大小是 136 byte,这是因为在 Netty 的 4.0.33 版本后,引入了 cleanerFlags 以及 arrayList 变量,忘记去除 rp9 变量导致的)。
关于伪共享,可关注《JAVA 拾遗 — CPU Cache 与缓存行》一文。

4.2 FastThreadLocal 初始化

public class FastThreadLocal<V> {
    
    private final int index;

    // 原子变量自增,获取 ID,作为 FastThreadLocal 的存放索引
    public FastThreadLocal() {
        index = InternalThreadLocalMap.nextVariableIndex();
    }
    
    // 设置 FastThreadLocal 资源
    public final void set(V value) {
        if (value != InternalThreadLocalMap.UNSET) {
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
            setKnownNotUnset(threadLocalMap, value);
        } else {
            // 如果设置的资源为 UNSET,则销毁当前 FastThreadLocal 对应的资源对象
            remove();
        }
    }
    
    // 设置资源,并将设置好的 FastThreadLocal 变量添加至待销毁资源列表中,待后续进行销毁操作
    private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
        if (threadLocalMap.setIndexedVariable(index, value)) {
            addToVariablesToRemove(threadLocalMap, this);
        }
    }
    
    // 根据 FastThreadLocal 初始化的 index,确定其在资源列表中的位置
    // 后续查询资源就可以根据索引快速确定位置
    public boolean setIndexedVariable(int index, Object value) {
        Object[] lookup = indexedVariables;
        if (index < lookup.length) {
            Object oldValue = lookup[index];
            lookup[index] = value;
            return oldValue == UNSET;
        } else {
            expandIndexedVariableTableAndSet(index, value);
            return true;
        }
    }
    
    // 按照 2 的倍数,扩张资源池数组长度
    private void expandIndexedVariableTableAndSet(int index, Object value) {
        Object[] oldArray = indexedVariables;
        final int oldCapacity = oldArray.length;
        int newCapacity = index;
        newCapacity |= newCapacity >>>  1;
        newCapacity |= newCapacity >>>  2;
        newCapacity |= newCapacity >>>  4;
        newCapacity |= newCapacity >>>  8;
        newCapacity |= newCapacity >>> 16;
        newCapacity ++;

        Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
        Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
        newArray[index] = value;
        indexedVariables = newArray;
    }
}

以上是 FastThreadLocal 的部分函数节选。由构造函数可知,FastThreadLocal 在初始化的时候,会使用 InternalThreadLocalMapnextVariableIndex 获取一个唯一 ID。

此 ID 为原子变量自增获取,后续对此变量的更新或者删除操作,均是通过此 index 进行操作。在设置变量的时候,存在 indexedVariables 空间不足的情况(初始化长度为 32),则会对此数组通过 expandIndexedVariableTableAndSet 进行扩容操作(>>> 为无符号右移,即若该数为正,则高位补 0,而若为负数,则右移后高位同样补 0)。通过这样的位移操作,每次数组均会乘 2(保持 2^n)。

因为使用常数索引 index,因此 Netty 中查询 FastThreadLocal 变量的速度为 O(1),扩容时采用 Arrays.copy 也很简单(相较于 JDK 的 ThreadLocal 的 rehash 操作)。

4.3 FastThreadLocal 变量获取及删除

public class FastThreadLocal<V> {

    private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
    
    // 在线程执行完资源之后,需要根据业务场景,确定是否调用此函数以销毁线程中存在的 FastThreadLocal 资源
    public static void removeAll() {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
        if (threadLocalMap == null) {
            return;
        }

        try {
            Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
            if (v != null && v != InternalThreadLocalMap.UNSET) {
                @SuppressWarnings("unchecked")
                Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
                FastThreadLocal<?>[] variablesToRemoveArray =
                        variablesToRemove.toArray(new FastThreadLocal[0]);
                for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
                    tlv.remove(threadLocalMap);
                }
            }
        } finally {
            // 实际上仅仅是将 FastThreadLocalThread 中的 threadLocalMap 置为 null
            // 或者是将 slowThreadLocalMap 销毁
            InternalThreadLocalMap.remove();
        }
    }
    
    @SuppressWarnings("unchecked")
    public final V get(InternalThreadLocalMap threadLocalMap) {
        Object v = threadLocalMap.indexedVariable(index);
        if (v != InternalThreadLocalMap.UNSET) {
            return (V) v;
        }

        // 如果当前待获取资源为空,则进行初始操作,返回相应资源
        return initialize(threadLocalMap);
    }

    // 根据用户重载的 initialValue 函数,初始化待获取资源
    private V initialize(InternalThreadLocalMap threadLocalMap) {
        V v = null;
        try {
            v = initialValue();
        } catch (Exception e) {
            PlatformDependent.throwException(e);
        }

        threadLocalMap.setIndexedVariable(index, v);
        addToVariablesToRemove(threadLocalMap, this);
        return v;
    }
    
    // 将 FastThreadLocal 变量,添加至待删除的资源列表中
    @SuppressWarnings("unchecked")
    private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
        Set<FastThreadLocal<?>> variablesToRemove;
        // 如果待删除资源列表为空,则初始化待删除资源列表 (Set)
        if (v == InternalThreadLocalMap.UNSET || v == null) {
            variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
            threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
        } else {
            variablesToRemove = (Set<FastThreadLocal<?>>) v;
        }

        variablesToRemove.add(variable);
    }
    
    @SuppressWarnings("unchecked")
    public final void remove(InternalThreadLocalMap threadLocalMap) {
        if (threadLocalMap == null) {
            return;
        }

        Object v = threadLocalMap.removeIndexedVariable(index);
        removeFromVariablesToRemove(threadLocalMap, this);
    
        // FastThreadLocal 变量已经被赋值,则需要调用用户重载的 onRemoval 函数,销毁资源
        if (v != InternalThreadLocalMap.UNSET) {
            try {
                onRemoval((V) v);
            } catch (Exception e) {
                PlatformDependent.throwException(e);
            }
        }
    }
    
    // 确定资源的初始化函数 (如果用户不进行重载,则返回 null)
    protected V initialValue() throws Exception {
        return null;
    }

    // 用户需要重载此函数,以便销毁申请的资源
    protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception { }
}

用户在使用 FastThreadLocal 时,需要继承 initialValue 以及 onRemoval 函数(FastThreadLocal 对象的初始化及销毁交由用户控制)。

  • initialValue: 在获取 FastThreadLocal 对象时,若对象未设置,则调用 initialValue 初始化资源(get 等函数中判断对象为空,则调用 initialize 初始化资源);
  • onRemoval: 在 FastThreadLocal 更新对象或最终销毁资源时,调用 onRemoval 销毁资源(set 等函数中判断待设置对象已被设置过,则调用 onRemoval 销毁资源)。

以下是 Recycler 调用 FastThreadLocal 的使用示范(Recycler 是 Netty 的轻量级对象池):

this.threadLocal = new FastThreadLocal<Recycler.Stack<T>>() {
    protected Recycler.Stack<T> initialValue() {
        return new Recycler.Stack(Recycler.this, Thread.currentThread(), Recycler.this.maxCapacityPerThread, Recycler.this.maxSharedCapacityFactor, Recycler.this.ratioMask, Recycler.this.maxDelayedQueuesPerThread);
    }

    protected void onRemoval(Recycler.Stack<T> value) {
        if (value.threadRef.get() == Thread.currentThread() && Recycler.DELAYED_RECYCLED.isSet()) {
            ((Map)Recycler.DELAYED_RECYCLED.get()).remove(value);
        }
    }
};

需要注意,在 FastThreadLocal 中,存在一个静态变量 variablesToRemoveIndex,其作用是在对象池中占据一个固定位置,存放一个集合 Set<FastThreadLocal<?>> variablesToRemove

每次初始化变量的时候,均会将对应的 FastThreadLocal 存放至 variablesToRemove 中。在更新对象的时候(set 等函数)或者清理 FastThreadLocalThread 中的变量时(removeAll 函数),程序就会根据 variablesToRemove 进行相应的清理工作。

这样,用户在使用 FastThreadLocalThread 时,就无须花费过多的精力关注线程安全问题。在 Netty 中,线程池的生命周期较长,无需过多的关注内存清理;然而如果用户在线程池等场景使用 FastThreadLocalThread,就需要在执行完任务后,清理 FastThreadLocal 参数,以免对后续的业务产生影响。

总结

通过以上源码分析,可以得知 Netty 为了提升 ThreadLocal 性能,做了很多改善操作:

  1. 定制 FastThreadLocalThread 以及 FastThreadLocal
  2. 使用 padding 手段扩充 FastThreadLocal 的实例大小,避免伪共享;
  3. 使用原子变量自增获取的 ID 作为常数索引,优化查询速度至 O(1),避免了 hash 冲突以及扩容导致的 rehash 操作;
  4. 提供 initialValue 以及 onRemoval 函数,用户可以自行重载函数,实现 FastThreadLocal 资源的高度定制化操作;
  5. FastThreadLocal 对象数组的扩容(expandIndexedVariableTableAndSet)采用位操作,计算数组长度;
  6. 针对在 Thread 中调用 FastThreadLocal 以及在 FastThreadLocalThread 中调用 FastThreadLocal,分别采用不同的获取方式,增强了兼容性。

更多细节,读者可以自己参照源码进行进一步分析。

对于采用 Object[] 数组存放 FastThreadLocal 变量,是否存在牺牲空间换取性能,个人理解如下:

  • Netty 的默认启动线程是 2 * cpu core,也就是两倍 CPU 核数,且此线程组会在 Netty 的生命周期中持续存在。
  • Netty 不存在创建过多线程导致内存占用过多的现象(用户手动调节 Netty 的 boss group 以及 worker group 线程数量都会很慎重)。
  • 此外,Netty 中对于 FastThreadLocal 存在较大的读取以及更新需求量,确实存在优化 ThreadLocal 的需求。

因此,适当的浪费一些空间,换取查询和更新的性能提升,是恰当的操作。

说明:本文基于 Netty 4.1.38 版本源码进行分析,不同版本内部实现细节可能存在差异。