0%

JDK ThreadLocal

ThreadLocal 又称线程本地变量,使用其能够将数据封闭在各自的线程中,每一个 ThreadLocal 能够存放一个线程级别的变量且它本身能够被多个线程共享使用,并且又能达到线程安全的目的,且绝对线程安全。本文主要介绍了 ThreadLocal、InheritableThreadLocal、FastThreadLocal、TransmittableThreadLocal 四种 ThreadLocal 的原理和应用方式。

ThreadLocal

数据结构

简单来说,ThreadLocal 使用的以 WeakReference 引用为键的 HASH 表进行存储。

关于引用的详解《JDK ReferenceObject》

/**
 * ThreadLocalMap is a customized hash map suitable only for
 * maintaining thread local values. No operations are exported
 * outside of the ThreadLocal class. The class is package private to
 * allow declaration of fields in class Thread.  To help deal with
 * very large and long-lived usages, the hash table entries use
 * WeakReferences for keys. However, since reference queues are not
 * used, stale entries are guaranteed to be removed only when
 * the table starts running out of space.
 */
static class ThreadLocalMap {

    /**
     * The entries in this hash map extend WeakReference, using
     * its main ref field as the key (which is always a
     * ThreadLocal object).  Note that null keys (i.e. entry.get()
     * == null) mean that the key is no longer referenced, so the
     * entry can be expunged from table.  Such entries are referred to
     * as "stale entries" in the code that follows.
     */
    static class Entry extends WeakReference<ThreadLocal<?>> {
        /** The value associated with this ThreadLocal. */
        Object value;

        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }
}

ThreadLocalMap 是 ThreadLocal 的静态内部类,当一个线程有多个 ThreadLocal 时,需要其来管理多个 ThreadLocal。

当发生 HASH 碰撞时,则 +1 向后寻址,直到找到空位置或垃圾回收位置进行存储。

if (key != null) {
    Entry c = new Entry(key, value);
    int h = key.threadLocalHashCode & (len - 1);
    while (table[h] != null)
        h = nextIndex(h, len);
    table[h] = c;
    size++;
}

当按 HASH 值寻找键时,也采用 MISS 后向后查找的思路。

private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
    Entry[] tab = table;
    int len = tab.length;

    while (e != null) {
        ThreadLocal<?> k = e.get();
        if (k == key)
            return e;
        if (k == null)
            expungeStaleEntry(i);
        else
            i = nextIndex(i, len);
        e = tab[i];
    }
    return null;
}

散列算法

因为使用的是 HASH 表进行储存,则一定会通过散列算法减少 HASH 碰撞。ThreadLocal 使用的是斐波那契(Fibonacci)散列法,本质上是一个乘数散列法。

斐波那契数列

意大利数学家斐波那契,提出了一个著名的“兔子数列”,该数列从第 3 个数起,后面的每个数都是它前面那两个数的和。如果把斐波那契数列的任何一项除以前一项,将会得到一个比值极限约为0.618,俗称黄金分割点,因此斐波纳契数列又称黄金分割数列,用数列{Fn}表示,则有:

对闭区间[a, b]上的单峰函数f(t),按相邻两斐波那契数之比,使用对称规则进行搜索的方法。其特点是:逐步缩短所考察的区间,以尽量少的函数求值次数,达到预定的某一缩短率。

计算黄金分割最简单的方法,是计算斐波契数列1,1,2,3,5,8,13,21,…后二数之比 2/3,3/5,5/8,8/13,13/21,…近似值的。

神秘的数字 0x61c88647

ThreadLocal 执行设置元素时,通过以下算式计算哈希值的代码;

private static final int HASH_INCREMENT = 0x61c88647;

private static int nextHashCode() {
    return nextHashCode.getAndAdd(HASH_INCREMENT);
}

其实这是一个哈希值的黄金分割点,也就是 0.618

由于 int 类型占 32 位,,因此用2 ^ 32 * 0.6180339887,得到的结果是:-1640531527,也就是 16 进制的0x61c88647

散列计算方式

通过 Key 对 threadLocalHashCode 值的保存,当需要获取 Table 的下标时直接取出计算即可。

int i = key.threadLocalHashCode & (table.length - 1);

扩容机制

扩容条件

//只要使用到数组结构,就一定会有扩容
if (!cleanSomeSlots(i, sz) && sz >= threshold)
    rehash();
  • 首先,进行启发式清理 cleanSomeSlots,把过期元素清理掉,看空间是否
  • 之后,判断sz >= threshold,其中 threshold = len * 2 / 3,也就是说数组中天填充的元素,大于 len * 2 / 3,就需要扩容了。
  • 最后,就是我们要分析的重点,rehash();,扩容重新计算元素位置。

探测式清理和校验

private void rehash() {
    expungeStaleEntries();
    
    // Use lower threshold for doubling to avoid hysteresis
    if (size >= threshold - threshold / 4)
        resize();
}

private void expungeStaleEntries() {
    Entry[] tab = table;
    int len = tab.length;
    for (int j = 0; j < len; j++) {
        Entry e = tab[j];
        if (e != null && e.get() == null)
            expungeStaleEntry(j);
    }
}
  • 这部分是主要是探测式清理过期元素,以及判断清理后是否满足扩容条件,size >= threshold * 3/4
  • 满足后执行扩容操作,其实扩容完的核心操作就是重新计算哈希值,把元素填充到新的数组中。

rehash扩容

private void resize() {
    Entry[] oldTab = table;
    int oldLen = oldTab.length;
    int newLen = oldLen * 2;
    Entry[] newTab = new Entry[newLen];
    int count = 0;
    for (int j = 0; j < oldLen; ++j) {
        Entry e = oldTab[j];
        if (e != null) {
            ThreadLocal<?> k = e.get();
            if (k == null) {
                e.value = null; // Help the GC
            } else {
                int h = k.threadLocalHashCode & (newLen - 1);
                while (newTab[h] != null)
                    h = nextIndex(h, newLen);
                newTab[h] = e;
                count++;
            }
        }
    }
    setThreshold(newLen);
    size = count;
    table = newTab;
}

扩容的整体操作,具体包括如下步骤;

  1. 首先把数组长度扩容到原来的2倍,oldLen * 2,实例化新数组。
  2. 遍历for,所有的旧数组中的元素,重新放到新数组中。
  3. 在放置数组的过程中,如果发生哈希碰撞,则链式法顺延。
  4. 同时这还有检测key值的操作 if (k == null),方便GC。

清理机制

探测式清理(expungeStaleEntry)

探测式清理,是以当前遇到的 GC 元素开始,向后不断的清理。直到遇到 null 为止,才停止 rehash 计算。

expungeStaleEntry

private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;
    // expunge entry at staleSlot
    tab[staleSlot].value = null;
    tab[staleSlot] = null;
    size--;
    // Rehash until we encounter null
    Entry e;
    int i;
    for (i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();
        if (k == null) {
            e.value = null;
            tab[i] = null;
            size--;
        } else {
            int h = k.threadLocalHashCode & (len - 1);
            if (h != i) {
                tab[i] = null;
                // Unlike Knuth 6.4 Algorithm R, we must scan until
                // null because multiple entries could have been stale.
                while (tab[h] != null)
                    h = nextIndex(h, len);
                tab[h] = e;
            }
        }
    }
    return i;
}

探测式清理在获取、设置元素中使用到; new ThreadLocal<>().get() -> map.getEntry(this) -> getEntryAfterMiss(key, i, e) -> expungeStaleEntry(i)

启发式清理(cleanSomeSlots)

启发式清理大概意思是;试探的扫描一些单元格,寻找过期元素,也就是被垃圾回收的元素。当添加新元素或删除另一个过时元素时,将调用此函数。它执行对数扫描次数,作为不扫描(快速但保留垃圾)和与元素数量成比例的扫描次数之间的平衡,这将找到所有垃圾,但会导致一些插入花费O(n)时间。

/**
    * Heuristically scan some cells looking for stale entries.
    * This is invoked when either a new element is added, or
    * another stale one has been expunged. It performs a
    * logarithmic number of scans, as a balance between no
    * scanning (fast but retains garbage) and a number of scans
    * proportional to number of elements, that would find all
    * garbage but would cause some insertions to take O(n) time.
    *
    * @param i a position known NOT to hold a stale entry. The
    * scan starts at the element after i.
    *
    * @param n scan control: {@code log2(n)} cells are scanned,
    * unless a stale entry is found, in which case
    * {@code log2(table.length)-1} additional cells are scanned.
    * When called from insertions, this parameter is the number
    * of elements, but when from replaceStaleEntry, it is the
    * table length. (Note: all this could be changed to be either
    * more or less aggressive by weighting n instead of just
    * using straight log n. But this version is simple, fast, and
    * seems to work well.)
    *
    * @return true if any stale entries have been removed.
    */
private boolean cleanSomeSlots(int i, int n) {
    boolean removed = false;
    Entry[] tab = table;
    int len = tab.length;
    do {
        i = nextIndex(i, len);
        Entry e = tab[i];
        if (e != null && e.get() == null) {
            n = len;
            removed = true;
            i = expungeStaleEntry(i);
        }
    } while ( (n >>>= 1) != 0);
    return removed;
}

内存泄露和避免

内存泄漏的原因

实线箭头表示强引用,虚线箭头表示弱引用

从上图中可以看出,ThreadLocalMap 使用 ThreadLocal 的弱引用作为 Key,如果一个 ThreadLocal 不存在外部强引用时,Key(ThreadLocal)势必会被 GC 回收,这样就会导致 ThreadLocalMap 中 Key 为 null,而 Value 还存在着强引用,只有 Thread 线程退出以后,Value 的强引用链条才会断掉。

但如果当前线程再迟迟不结束的话,这些 Key 为 null 的 Entry 的 Value 就会一直存在一条强引用链:

Thread Ref -> Thread -> ThreaLocalMap -> Entry -> value

永远无法回收,造成内存泄漏。

由于 Thread 中包含变量 ThreadLocalMap,因此 ThreadLocalMap 与 Thread 的生命周期是一样长的,如果一直没有删除对应 Key,就会导致内存泄漏。

Key使用弱引用的原因

Key使用强引用的后果

当 ThreadLocalMap 的 Key 为强引用。回收 ThreadLocal 时,因为 ThreadLocalMap 还持有 ThreadLocal 的强引用,如果没有手动删除,ThreadLocal 不会被回收,导致 Entry 内存泄漏。

Key使用弱引用的好处

当 ThreadLocalMap 的 Key 为弱引用回收 ThreadLocal 时,由于 ThreadLocalMap 持有 ThreadLocal 的弱引用,即使没有手动删除,ThreadLocal 也会被回收。当 Key 为 null,在下一次 ThreadLocalMap 调用set()get()remove()方法的时候会被清除 Value 值。

正确的使用方法

  • 每次使用完 ThreadLocal 都调用它的remove()方法清除数据
  • 将 ThreadLocal 变量定义成private static,这样就一直存在 ThreadLocal 的强引用,也就能保证任何时候都能通过 ThreadLocal 的弱引用访问到 Entry 的 Value 值,进而清除掉 。

InheritableThreadLocal

解决了 ThreadLocal 不能获取父线程的 ThreadLocal 内容的问题,也就是解决了父子间传递。

线程在创建时,将父类的 inheritableThreadLocals 深拷贝了一份。

//----成员变量
/*
 * InheritableThreadLocal values pertaining to this thread. This map is
 * maintained by the InheritableThreadLocal class.
 */
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
//----函数
/**
 * Initializes a Thread.
 */
private void init(ThreadGroup g, Runnable target, String name,
                  long stackSize, AccessControlContext acc) {
   
    Thread parent = currentThread();
    //....省略部分代码
    if (parent.inheritableThreadLocals != null)
        //创建线程时,如果拥有可继承的 ThreadLocals 的话,自动创建
        this.inheritableThreadLocals =
            ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
    //....省略部分代码
}
/**
 * Factory method to create map of inherited thread locals.
 * Designed to be called only from Thread constructor.
 *
 * @param  parentMap the map associated with parent thread
 * @return a map containing the parent's inheritable bindings
 */
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
    return new ThreadLocalMap(parentMap);
}
/**
 * Construct a new map including all Inheritable ThreadLocals
 * from given parent map. Called only by createInheritedMap.
 *
 * @param parentMap the map associated with parent thread.
 */
private ThreadLocalMap(ThreadLocalMap parentMap) {
    Entry[] parentTable = parentMap.table;
    int len = parentTable.length;
    setThreshold(len);
    table = new Entry[len];

    for (int j = 0; j < len; j++) {
        Entry e = parentTable[j];
        if (e != null) {
            @SuppressWarnings("unchecked")
            ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
            if (key != null) {
                Object value = key.childValue(e.value);
                Entry c = new Entry(key, value);
                int h = key.threadLocalHashCode & (len - 1);
                while (table[h] != null)
                    h = nextIndex(h, len);
                table[h] = c;
                size++;
            }
        }
    }
}

当使用get()去获取时,根据对应的ThreadLocal<T>寻找期望值。

Netty FastThreadLocal

ThreadLocal 使用了线性探测的方式解决 Hash 冲突,由于经常存在 Hash 冲突,影响效率。FastThreadLocal 直接使用数组避免了 Hash 冲突的发生,具体做法是:每一个 FastThreadLocal 实例创建时,分配一个下标 index;每个 FastThreadLocal 都能获取到一个不重复的下标。当调用FastThreadLocal.get()方法获取值时,直接从数组获取返回,如return array[index]

NioEventLoopGroup的线程封装

NioEventLoopGroup 的父类 MultithreadEventExecutorGroup 的构造函数中对 Executor 的赋值语句如下:

protected MultithreadEventExecutorGroup(..., ThreadFactory threadFactory, ...) {
    this(...,hreadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), ...);
}

封装了所调用的线程,将普通 Runnable 装为 FastThreadLocalRunnable。并且通过单独封装的 FastThreadLocalThread 来调用执行。

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
public class DefaultThreadFactory implements ThreadFactory {

    @Override
    public Thread newThread(Runnable r) {
        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
        //...
        return t;
    }

    protected Thread newThread(Runnable r, String name) {
        return new FastThreadLocalThread(threadGroup, r, name);
    }
}

到现在,我们知道 EventLoop 中的线程都是 FastThreadLocalThread 类型的。并且在此类型的线程中有一个属性叫做 InternalThreadLocalMap,该属性是为了实现 FastThreadLocal。

FTL的使用及资源释放

public static void main(String[] args) throws Exception {
    FastThreadLocal<String> local = new FastThreadLocal<String>(){
        @Override
        protected String initialValue() throws Exception {
            return "Init";
        }

        //移除回调方法
        @Override
        protected void onRemoval(String value) throws Exception {
            System.out.println("Value has be removed");
        }
    };

    new DefaultThreadFactory("FastThread Pool")
            .newThread(() -> {
                System.out.println(local.get());
            }).start();

    new FastThreadLocalThread(() -> {
        local.set("Modify");
        System.out.println(local.get());
    }).start();
}

这里 onRemoval 的回调是在 FastThreadLocalRunnable 体现的。

final class FastThreadLocalRunnable implements Runnable {
    private final Runnable runnable;

    @Override
    public void run() {
        try {
            runnable.run();
        } finally {
            FastThreadLocal.removeAll();
        }
    }

    static Runnable wrap(Runnable runnable) {
        return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
    }
}

//删除该线程下的所有FastThreadLocal
public static void removeAll() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
    if (threadLocalMap == null) {
        return;
    }

    try {
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
        if (v != null && v != InternalThreadLocalMap.UNSET) {
            @SuppressWarnings("unchecked")
            Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
            FastThreadLocal<?>[] variablesToRemoveArray =
                variablesToRemove.toArray(new FastThreadLocal[0]);
            for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
                tlv.remove(threadLocalMap);
            }
        }
    } finally {
        InternalThreadLocalMap.remove();
    }
}

由于线程池中的线程由于会被复用,所以线程池中的每一条线程在执行 Runnable 结束后,要清理掉其 InternalThreadLocalMap 和其内的 FastThreadLocal 信息,否则 InternalThreadLocalMap 信息还存储着上一次被使用时的信息;另外,假设这条线程不再被使用,但是这个线程有可能不会被销毁(与线程池的类型和配置相关),那么线程上的 FastThreadLocal 将发生资源泄露。

ThreadLocalMap封装实现

InternalThreadLocalMap 的父类是 UnpaddedInternalThreadLocalMap,该类中指定了保存数据的结构。

UnpaddedInternalThreadLocalMap主要属性

//SlowMap
static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
//FastMap
static final AtomicInteger nextIndex = new AtomicInteger();
Object[] indexedVariables;

数组 indexedVariables 就是用来存储 ftl 的 value 的,使用下标的方式直接访问。nextIndex 在 ftl 实例创建时用来给每个 ftl 实例分配一个下标,slowThreadLocalMap 在线程不是 ftl 时使用到。由于是自增插入,因此不会有冲突产生。

InternalThreadLocalMap的主要属性

// 用于标识数组的槽位还未使用 
public static final Object UNSET = new Object();
/**
 * 用于标识ftl变量是否注册了cleaner
 * BitSet简要原理:
 * BitSet默认底层数据结构是一个long[]数组,开始时长度为1,即只有long[0],而一个long有64bit。
 * 当BitSet.set(1)的时候,表示将long[0]的第二位设置为true,即0000 0000 ... 0010(64bit),则long[0]==2
 * 当BitSet.get(1)的时候,第二位为1,则表示true;如果是0,则表示false
 * 当BitSet.set(64)的时候,表示设置第65位,此时long[0]已经不够用了,扩容成long[]来,进行存储
 *
 * 存储类似 {index:boolean} 键值对,用于防止一个FastThreadLocal多次启动清理线程
 * 将index位置的bit设为true,表示该InternalThreadLocalMap中对该FastThreadLocal已经启动了清理线程
 */
private BitSet cleanerFlags;	

private InternalThreadLocalMap() {
    super(newIndexedVariableTable());
}

private static Object[] newIndexedVariableTable() {
        Object[] array = new Object[32];
        Arrays.fill(array, UNSET);
        return array;
}

比较简单,newIndexedVariableTable()方法创建长度为 32 的数组,然后初始化为 UNSET,然后传给父类。之后 ftl 的值就保存到这个数组里面。注意,这里保存的直接是变量值,不是 entry,这是和 jdk ThreadLocal 不同的。。

Get方法实现分析

public final V get() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    Object v = threadLocalMap.indexedVariable(index);
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }

    return initialize(threadLocalMap);
}

public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) {
        return fastGet((FastThreadLocalThread) thread);
    } else {
        return slowGet();
    }
}

// 直接从ftlt线程获取threadLocalMap
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
    InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
    if (threadLocalMap == null) {
        thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
    }
    return threadLocalMap;
}   
// 父类的类型为 JDK ThreadLocal 的静态属性,从该 threadLocal 获取 InternalThreadLocalMap
private static InternalThreadLocalMap slowGet() {
    ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
    InternalThreadLocalMap ret = slowThreadLocalMap.get();
    if (ret == null) {
        ret = new InternalThreadLocalMap();
        slowThreadLocalMap.set(ret);
    }
    return ret;
}

FTL的资源回收机制

对于 FTL 提供了三种回收机制:

  • 自动:使用 FTLT 执行一个被 FastThreadLocalRunnable wrap 的 Runnable 任务,在任务执行完毕后会自动进行 FTL 的清理。
  • 手动:FTL 和 InternalThreadLocalMap 都提供了 remove 方法,在合适的时候用户可以(有的时候也是必须,例如普通线程的线程池使用 FTL)手动进行调用,进行显式删除。

FTL在Netty中的使用

FTL 在 Netty 中最重要的使用,就是分配 ByteBuf。基本做法是:每个线程都分配一块内存(PoolArena),当需要分配 ByteBuf 时,线程先从自己持有的 PoolArena 分配,如果自己无法分配,再采用全局分配。但是由于内存资源有限,所以还是会有多个线程持有同一块 PoolArena 的情况。不过这种方式已经最大限度地减轻了多线程的资源竞争,提高程序效率。

具体的代码在 PoolByteBufAllocator 的内部类 PoolThreadLocalCache 中:

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {

    @Override
    protected synchronized PoolThreadCache initialValue() {
        final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
        final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

        Thread current = Thread.currentThread();
        if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
          // PoolThreadCache即为各个线程持有的内存块的封装  
          return new PoolThreadCache(
                    heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                    DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
        }
        // No caching so just use 0 as sizes.
        return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
    }
}   

TransmittableThreadLocal

无法正确理解 TransmittableThreadLocal 的应用场景和作用,实现逻辑直接看代码也不是很能理清,在此占位。