ThreadLocal 又称线程本地变量,使用其能够将数据封闭在各自的线程中,每一个 ThreadLocal 能够存放一个线程级别的变量且它本身能够被多个线程共享使用,并且又能达到线程安全的目的,且绝对线程安全。本文主要介绍了 ThreadLocal、InheritableThreadLocal、FastThreadLocal、TransmittableThreadLocal 四种 ThreadLocal 的原理和应用方式。
ThreadLocal
数据结构
简单来说,ThreadLocal 使用的以 WeakReference 引用为键的 HASH 表进行存储。
关于引用的详解《JDK ReferenceObject》
/**
* ThreadLocalMap is a customized hash map suitable only for
* maintaining thread local values. No operations are exported
* outside of the ThreadLocal class. The class is package private to
* allow declaration of fields in class Thread. To help deal with
* very large and long-lived usages, the hash table entries use
* WeakReferences for keys. However, since reference queues are not
* used, stale entries are guaranteed to be removed only when
* the table starts running out of space.
*/
static class ThreadLocalMap {
/**
* The entries in this hash map extend WeakReference, using
* its main ref field as the key (which is always a
* ThreadLocal object). Note that null keys (i.e. entry.get()
* == null) mean that the key is no longer referenced, so the
* entry can be expunged from table. Such entries are referred to
* as "stale entries" in the code that follows.
*/
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
}
ThreadLocalMap 是 ThreadLocal 的静态内部类,当一个线程有多个 ThreadLocal 时,需要其来管理多个 ThreadLocal。
当发生 HASH 碰撞时,则 +1 向后寻址,直到找到空位置或垃圾回收位置进行存储。
if (key != null) {
Entry c = new Entry(key, value);
int h = key.threadLocalHashCode & (len - 1);
while (table[h] != null)
h = nextIndex(h, len);
table[h] = c;
size++;
}
当按 HASH 值寻找键时,也采用 MISS 后向后查找的思路。
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
if (k == null)
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
散列算法
因为使用的是 HASH 表进行储存,则一定会通过散列算法减少 HASH 碰撞。ThreadLocal 使用的是斐波那契(Fibonacci)散列法
,本质上是一个乘数散列法。
斐波那契数列
意大利数学家斐波那契,提出了一个著名的“兔子数列”,该数列从第 3 个数起,后面的每个数都是它前面那两个数的和。如果把斐波那契数列的任何一项除以前一项,将会得到一个比值极限约为0.618,俗称黄金分割点,因此斐波纳契数列又称黄金分割数列,用数列{Fn}
表示,则有:
对闭区间[a, b]
上的单峰函数f(t)
,按相邻两斐波那契数之比,使用对称规则进行搜索的方法。其特点是:逐步缩短所考察的区间,以尽量少的函数求值次数,达到预定的某一缩短率。
计算黄金分割最简单的方法,是计算斐波契数列1,1,2,3,5,8,13,21,…后二数之比 2/3,3/5,5/8,8/13,13/21,…近似值的。
神秘的数字 0x61c88647
ThreadLocal
执行设置元素时,通过以下算式计算哈希值的代码;
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
其实这是一个哈希值的黄金分割点,也就是 0.618
由于 int 类型占 32 位,,因此用2 ^ 32 * 0.6180339887
,得到的结果是:-1640531527
,也就是 16 进制的0x61c88647
。
散列计算方式
通过 Key 对 threadLocalHashCode 值的保存,当需要获取 Table 的下标时直接取出计算即可。
int i = key.threadLocalHashCode & (table.length - 1);
扩容机制
扩容条件
//只要使用到数组结构,就一定会有扩容
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
- 首先,进行
启发式清理 cleanSomeSlots
,把过期元素清理掉,看空间是否 - 之后,判断
sz >= threshold
,其中threshold = len * 2 / 3
,也就是说数组中天填充的元素,大于len * 2 / 3
,就需要扩容了。 - 最后,就是我们要分析的重点,
rehash();
,扩容重新计算元素位置。
探测式清理和校验
private void rehash() {
expungeStaleEntries();
// Use lower threshold for doubling to avoid hysteresis
if (size >= threshold - threshold / 4)
resize();
}
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null)
expungeStaleEntry(j);
}
}
- 这部分是主要是探测式清理过期元素,以及判断清理后是否满足扩容条件,size >= threshold * 3/4
- 满足后执行扩容操作,其实扩容完的核心操作就是重新计算哈希值,把元素填充到新的数组中。
rehash扩容
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null; // Help the GC
} else {
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
setThreshold(newLen);
size = count;
table = newTab;
}
扩容的整体操作,具体包括如下步骤;
- 首先把数组长度扩容到原来的2倍,
oldLen * 2
,实例化新数组。 - 遍历for,所有的旧数组中的元素,重新放到新数组中。
- 在放置数组的过程中,如果发生哈希碰撞,则链式法顺延。
- 同时这还有检测key值的操作
if (k == null)
,方便GC。
清理机制
探测式清理(expungeStaleEntry)
探测式清理,是以当前遇到的 GC 元素开始,向后不断的清理。直到遇到 null 为止,才停止 rehash 计算。
expungeStaleEntry
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// expunge entry at staleSlot
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// Rehash until we encounter null
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;
// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}
探测式清理在获取、设置元素中使用到; new ThreadLocal<>().get() -> map.getEntry(this) -> getEntryAfterMiss(key, i, e) -> expungeStaleEntry(i)
启发式清理(cleanSomeSlots)
启发式清理大概意思是;试探的扫描一些单元格,寻找过期元素,也就是被垃圾回收的元素。当添加新元素或删除另一个过时元素时,将调用此函数。它执行对数扫描次数,作为不扫描(快速但保留垃圾)和与元素数量成比例的扫描次数之间的平衡,这将找到所有垃圾,但会导致一些插入花费O(n)时间。
/**
* Heuristically scan some cells looking for stale entries.
* This is invoked when either a new element is added, or
* another stale one has been expunged. It performs a
* logarithmic number of scans, as a balance between no
* scanning (fast but retains garbage) and a number of scans
* proportional to number of elements, that would find all
* garbage but would cause some insertions to take O(n) time.
*
* @param i a position known NOT to hold a stale entry. The
* scan starts at the element after i.
*
* @param n scan control: {@code log2(n)} cells are scanned,
* unless a stale entry is found, in which case
* {@code log2(table.length)-1} additional cells are scanned.
* When called from insertions, this parameter is the number
* of elements, but when from replaceStaleEntry, it is the
* table length. (Note: all this could be changed to be either
* more or less aggressive by weighting n instead of just
* using straight log n. But this version is simple, fast, and
* seems to work well.)
*
* @return true if any stale entries have been removed.
*/
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);
Entry e = tab[i];
if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}
内存泄露和避免
内存泄漏的原因
实线箭头表示强引用,虚线箭头表示弱引用
从上图中可以看出,ThreadLocalMap 使用 ThreadLocal 的弱引用作为 Key,如果一个 ThreadLocal 不存在外部强引用时,Key(ThreadLocal)势必会被 GC 回收,这样就会导致 ThreadLocalMap 中 Key 为 null,而 Value 还存在着强引用,只有 Thread 线程退出以后,Value 的强引用链条才会断掉。
但如果当前线程再迟迟不结束的话,这些 Key 为 null 的 Entry 的 Value 就会一直存在一条强引用链:
Thread Ref -> Thread -> ThreaLocalMap -> Entry -> value
永远无法回收,造成内存泄漏。
由于 Thread 中包含变量 ThreadLocalMap,因此 ThreadLocalMap 与 Thread 的生命周期是一样长的,如果一直没有删除对应 Key,就会导致内存泄漏。
Key使用弱引用的原因
Key使用强引用的后果
当 ThreadLocalMap 的 Key 为强引用。回收 ThreadLocal 时,因为 ThreadLocalMap 还持有 ThreadLocal 的强引用,如果没有手动删除,ThreadLocal 不会被回收,导致 Entry 内存泄漏。
Key使用弱引用的好处
当 ThreadLocalMap 的 Key 为弱引用回收 ThreadLocal 时,由于 ThreadLocalMap 持有 ThreadLocal 的弱引用,即使没有手动删除,ThreadLocal 也会被回收。当 Key 为 null,在下一次 ThreadLocalMap 调用set()
、get()
、remove()
方法的时候会被清除 Value 值。
正确的使用方法
- 每次使用完 ThreadLocal 都调用它的
remove()
方法清除数据 - 将 ThreadLocal 变量定义成
private static
,这样就一直存在 ThreadLocal 的强引用,也就能保证任何时候都能通过 ThreadLocal 的弱引用访问到 Entry 的 Value 值,进而清除掉 。
InheritableThreadLocal
解决了 ThreadLocal 不能获取父线程的 ThreadLocal 内容的问题,也就是解决了父子间传递。
线程在创建时,将父类的 inheritableThreadLocals 深拷贝了一份。
//----成员变量
/*
* InheritableThreadLocal values pertaining to this thread. This map is
* maintained by the InheritableThreadLocal class.
*/
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
//----函数
/**
* Initializes a Thread.
*/
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc) {
Thread parent = currentThread();
//....省略部分代码
if (parent.inheritableThreadLocals != null)
//创建线程时,如果拥有可继承的 ThreadLocals 的话,自动创建
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
//....省略部分代码
}
/**
* Factory method to create map of inherited thread locals.
* Designed to be called only from Thread constructor.
*
* @param parentMap the map associated with parent thread
* @return a map containing the parent's inheritable bindings
*/
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
return new ThreadLocalMap(parentMap);
}
/**
* Construct a new map including all Inheritable ThreadLocals
* from given parent map. Called only by createInheritedMap.
*
* @param parentMap the map associated with parent thread.
*/
private ThreadLocalMap(ThreadLocalMap parentMap) {
Entry[] parentTable = parentMap.table;
int len = parentTable.length;
setThreshold(len);
table = new Entry[len];
for (int j = 0; j < len; j++) {
Entry e = parentTable[j];
if (e != null) {
@SuppressWarnings("unchecked")
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
if (key != null) {
Object value = key.childValue(e.value);
Entry c = new Entry(key, value);
int h = key.threadLocalHashCode & (len - 1);
while (table[h] != null)
h = nextIndex(h, len);
table[h] = c;
size++;
}
}
}
}
当使用get()
去获取时,根据对应的ThreadLocal<T>
寻找期望值。
Netty FastThreadLocal
ThreadLocal 使用了线性探测的方式解决 Hash 冲突,由于经常存在 Hash 冲突,影响效率。FastThreadLocal 直接使用数组避免了 Hash 冲突的发生,具体做法是:每一个 FastThreadLocal 实例创建时,分配一个下标 index;每个 FastThreadLocal 都能获取到一个不重复的下标。当调用FastThreadLocal.get()
方法获取值时,直接从数组获取返回,如return array[index]
。
NioEventLoopGroup的线程封装
NioEventLoopGroup 的父类 MultithreadEventExecutorGroup 的构造函数中对 Executor 的赋值语句如下:
protected MultithreadEventExecutorGroup(..., ThreadFactory threadFactory, ...) {
this(...,hreadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), ...);
}
封装了所调用的线程,将普通 Runnable 装为 FastThreadLocalRunnable。并且通过单独封装的 FastThreadLocalThread 来调用执行。
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
public class DefaultThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
//...
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
}
到现在,我们知道 EventLoop 中的线程都是 FastThreadLocalThread 类型的。并且在此类型的线程中有一个属性叫做 InternalThreadLocalMap,该属性是为了实现 FastThreadLocal。
FTL的使用及资源释放
public static void main(String[] args) throws Exception {
FastThreadLocal<String> local = new FastThreadLocal<String>(){
@Override
protected String initialValue() throws Exception {
return "Init";
}
//移除回调方法
@Override
protected void onRemoval(String value) throws Exception {
System.out.println("Value has be removed");
}
};
new DefaultThreadFactory("FastThread Pool")
.newThread(() -> {
System.out.println(local.get());
}).start();
new FastThreadLocalThread(() -> {
local.set("Modify");
System.out.println(local.get());
}).start();
}
这里 onRemoval 的回调是在 FastThreadLocalRunnable 体现的。
final class FastThreadLocalRunnable implements Runnable {
private final Runnable runnable;
@Override
public void run() {
try {
runnable.run();
} finally {
FastThreadLocal.removeAll();
}
}
static Runnable wrap(Runnable runnable) {
return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
}
}
//删除该线程下的所有FastThreadLocal
public static void removeAll() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
if (threadLocalMap == null) {
return;
}
try {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v != null && v != InternalThreadLocalMap.UNSET) {
@SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
FastThreadLocal<?>[] variablesToRemoveArray =
variablesToRemove.toArray(new FastThreadLocal[0]);
for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
tlv.remove(threadLocalMap);
}
}
} finally {
InternalThreadLocalMap.remove();
}
}
由于线程池中的线程由于会被复用,所以线程池中的每一条线程在执行 Runnable 结束后,要清理掉其 InternalThreadLocalMap 和其内的 FastThreadLocal 信息,否则 InternalThreadLocalMap 信息还存储着上一次被使用时的信息;另外,假设这条线程不再被使用,但是这个线程有可能不会被销毁(与线程池的类型和配置相关),那么线程上的 FastThreadLocal 将发生资源泄露。
ThreadLocalMap封装实现
InternalThreadLocalMap 的父类是 UnpaddedInternalThreadLocalMap,该类中指定了保存数据的结构。
UnpaddedInternalThreadLocalMap主要属性
//SlowMap
static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
//FastMap
static final AtomicInteger nextIndex = new AtomicInteger();
Object[] indexedVariables;
数组 indexedVariables 就是用来存储 ftl 的 value 的,使用下标的方式直接访问。nextIndex 在 ftl 实例创建时用来给每个 ftl 实例分配一个下标,slowThreadLocalMap 在线程不是 ftl 时使用到。由于是自增插入,因此不会有冲突产生。
InternalThreadLocalMap的主要属性
// 用于标识数组的槽位还未使用
public static final Object UNSET = new Object();
/**
* 用于标识ftl变量是否注册了cleaner
* BitSet简要原理:
* BitSet默认底层数据结构是一个long[]数组,开始时长度为1,即只有long[0],而一个long有64bit。
* 当BitSet.set(1)的时候,表示将long[0]的第二位设置为true,即0000 0000 ... 0010(64bit),则long[0]==2
* 当BitSet.get(1)的时候,第二位为1,则表示true;如果是0,则表示false
* 当BitSet.set(64)的时候,表示设置第65位,此时long[0]已经不够用了,扩容成long[]来,进行存储
*
* 存储类似 {index:boolean} 键值对,用于防止一个FastThreadLocal多次启动清理线程
* 将index位置的bit设为true,表示该InternalThreadLocalMap中对该FastThreadLocal已经启动了清理线程
*/
private BitSet cleanerFlags;
private InternalThreadLocalMap() {
super(newIndexedVariableTable());
}
private static Object[] newIndexedVariableTable() {
Object[] array = new Object[32];
Arrays.fill(array, UNSET);
return array;
}
比较简单,newIndexedVariableTable()
方法创建长度为 32 的数组,然后初始化为 UNSET,然后传给父类。之后 ftl 的值就保存到这个数组里面。注意,这里保存的直接是变量值,不是 entry,这是和 jdk ThreadLocal 不同的。。
Get方法实现分析
public final V get() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
return initialize(threadLocalMap);
}
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
} else {
return slowGet();
}
}
// 直接从ftlt线程获取threadLocalMap
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
// 父类的类型为 JDK ThreadLocal 的静态属性,从该 threadLocal 获取 InternalThreadLocalMap
private static InternalThreadLocalMap slowGet() {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}
FTL的资源回收机制
对于 FTL 提供了三种回收机制:
- 自动:使用 FTLT 执行一个被 FastThreadLocalRunnable wrap 的 Runnable 任务,在任务执行完毕后会自动进行 FTL 的清理。
- 手动:FTL 和 InternalThreadLocalMap 都提供了 remove 方法,在合适的时候用户可以(有的时候也是必须,例如普通线程的线程池使用 FTL)手动进行调用,进行显式删除。
FTL在Netty中的使用
FTL 在 Netty 中最重要的使用,就是分配 ByteBuf。基本做法是:每个线程都分配一块内存(PoolArena),当需要分配 ByteBuf 时,线程先从自己持有的 PoolArena 分配,如果自己无法分配,再采用全局分配。但是由于内存资源有限,所以还是会有多个线程持有同一块 PoolArena 的情况。不过这种方式已经最大限度地减轻了多线程的资源竞争,提高程序效率。
具体的代码在 PoolByteBufAllocator 的内部类 PoolThreadLocalCache 中:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
@Override
protected synchronized PoolThreadCache initialValue() {
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
Thread current = Thread.currentThread();
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
// PoolThreadCache即为各个线程持有的内存块的封装
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
// No caching so just use 0 as sizes.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
}
TransmittableThreadLocal
无法正确理解 TransmittableThreadLocal 的应用场景和作用,实现逻辑直接看代码也不是很能理清,在此占位。