前言
ThreadLocal被ThreadLocalMap中的entry的key弱引用,如果出现GC的情况时,
没有被其他对象引用,会被回收,但是ThreadLocal对应的value却不会回收,容易造成内存泄漏,这也间接导致了内存溢出以及数据假丢失;
那么问题来了,有没有更高效的ThreadLocal有;
今天我们就来分析一波FastThreadLocalThread
一、FastThreadLocalThread源码分析
Netty为了在某些场景下提高性能,改进了jdk ThreadLocal,Netty实现的FastThreadLocal 优化了Java 原生 ThreadLocal 的访问速度,存储速度。避免了检测弱引用带来的 value 回收难问题,和数组位置冲突带来的线性查找问题,解决这些问题并不是没有代价;
Netty实现的 FastThreadLocal 底层也是通过数组存储 value 对象,与Java原生ThreadLocal使用自身作为Entry的key不同,FastThreadLocal通过保存数组的全局唯一下标,实现了对value的快速访问。同时FastThreadLocal 也实现了清理对象的方法;
1、FastThreadLocalThread
在Netty中,要使用 FastThreadLocal 实现线程本地变量需要将线程包装成 FastThreadLocalThread ,如果不是 FastThreadLocalThread ,会使用 slowThreadLocalMap的 ThreadLocal 来存储变量副本;
io.netty.util.concurrent.DefaultThreadFactory @Override public Thread newThread(Runnable r) { Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet()); // 一般daemon为 false ,意思是不设置为守护线程 if (t.isDaemon() != daemon) { t.setDaemon(daemon); } // 优先级 默认为5 if (t.getPriority() != priority) { t.setPriority(priority); } return t; } protected Thread newThread(Runnable r, String name ) { return new FastThreadLocalThread(threadGroup, r, name ); }FastThreadLocalThread 继承自Thread类,有如下成员变量:
io.netty.util.concurrent.FastThreadLocalThread // 任务执行完,是否清除FastThreadLocal的标记 private final boolean cleanupFastThreadLocals; // 类似于Thread类中ThreadLocalMap,为了实现FastThreadLocal private InternalThreadLocalMap threadLocalMap;2、 InternalThreadLocalMap
FastThreadLocalThread.threadLocalMap 是 InternalThreadLocalMap 对象实例。在第一次获取FTL数据时,会初始化FastThreadLocalThread.threadLocalMap,调用的构造函数如下:
private InternalThreadLocalMap() { //为了简便,InternalThreadLocalMap父类 //UnpaddedInternalThreadLocalMap不展开介绍 super(newIndexedVariableTable()); } //默认的数组大小为32,且使用UNSET对象填充数组 //如果下标处数据为UNSET,则表示没有数据 private static Object[] newIndexedVariableTable() { Object[] array = new Object[32]; Arrays.fill(array, UNSET); return array; }为了避免写时候影响同一cpu缓冲行的其他数据并发访问,其使用了缓存行填充技术 (cpu 缓冲行填充),在类定义中声明了如下long字段进行填充;
//InternalThreadLocalMap // Cache line padding (must be public ) // With CompressedOops enabled, an instance of this class should occupy at least 128 bytes. public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;FTL使用的数组下标是InternalThreadLocalMap中的静态变量nextIndex统一递增生成的:
static final AtomicInteger nextIndex = new AtomicInteger(); public static int nextVariableIndex() { //Netty中所有FTL数组下标都是通过递增这个静态变量实现的 //采用静态变量生成所有FTL元素在数组中的下标会造成一个问题, //会造成InternalThreadLocalMap中数组不必要的自动扩容 int index = nextIndex.getAndIncrement(); if ( index < 0) { nextIndex.decrementAndGet(); throw new IllegalStateException( "too many thread-local indexed variables" ); } return index ; }InternalThreadLocalMap.nextVariableIndex()方法获取FTL在该FastThreadLocalThread.threadLocalMap数组下标,因为InternalThreadLocalMap.nextVariableIndex() 使用静态域 nextIndex 递增维护所有FTL的下标,会造成后面实例化的 FTL 下标过大,如果FTL下标大于其对应 FastThreadLocalThread.threadLocalMap 数组的长度,会进行数组的自动扩容,如下:
private void expandIndexedVariableTableAndSet( int index , Object value) { Object[] oldArray = indexedVariables; final int oldCapacity = oldArray.length; //下面复杂的实现是为了将newCapacity规范为最接近的一个2的指数, //这段代码在早期的 jdk HashMap 中见过 int newCapacity = index ; newCapacity |= newCapacity >>> 1; newCapacity |= newCapacity >>> 2; newCapacity |= newCapacity >>> 4; newCapacity |= newCapacity >>> 8; newCapacity |= newCapacity >>> 16; newCapacity ++; Object[] newArray = Arrays.copyOf(oldArray, newCapacity); Arrays.fill(newArray, oldCapacity, newArray.length, UNSET); newArray[ index ] = value; indexedVariables = newArray; }3、FastThreadLocal
构造函数:
有两个重要的下标域,FTL不仅在FastThreadLocalThread.threadLocalMap中保存了用户实际使用的value(在数组中的下标为index),还在数组中保存为了实现清理记录的相关数据,也即下标variablesToRemoveIndex,一般情况 variablesToRemoveIndex = 0;因为variablesToRemoveIndex 是静态变量,所以全局唯一;
//如果在该FTL中放入了数据,也就实际调用了其 set 或get函数,会在 //该FastThreadLocalThread.threadLocalMap数组的 // variablesToRemoveIndex下标处放置一个IdentityHashMap, //并将该FTL放入IdentityHashMap中,在后续清理时会取出 //variablesToRemoveIndex下标处的IdentityHashMap进行清理 private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex(); //在threadLocalMap数组中存放实际数据的下标 private final int index ; public FastThreadLocal() { index = InternalThreadLocalMap.nextVariableIndex(); }用户可扩展的函数:
//初始化 value 函数 protected V initialValue() throws Exception { return null ; } //让使用者在该FTL被移除时可以有机会做些操作。 protected void onRemoval(@SuppressWarnings( "UnusedParameters" ) V value) throws Exception { }FastThreadLocalThread
cleanupFastThreadLocals 字段在 4.1 的最新版本中已经没有在用到了
/** * true ,表示FTL会在线程结束时被主动清理 见 FastThreadLocalRunnable 类 * false ,需要将FTL放入后台清理线程的队列中 */ // This will be set to true if we have a chance to wrap the Runnable. //这个字段则用于标识该线程在结束时是否会主动清理FTL private final boolean cleanupFastThreadLocals; //次对象将在 第一次 FastThreadLocal.get 和 FastThreadLocal. set 时候创建 private InternalThreadLocalMap threadLocalMap; public FastThreadLocalThread(Runnable target) { super(FastThreadLocalRunnable.wrap(target)); cleanupFastThreadLocals = true ; }4、 set 方法
public final void set (V value) { //判断设置的 value 值是否是缺省值 if (value != InternalThreadLocalMap.UNSET) { //获取当前线程的 InternalThreadLocalMap , 如果当前线程为FastThreadLocalThread,那么直接通过threadLocalMap引用获取 //否则通过 jdk 原生的 threadLocal 获取 InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); //FastThreadLocal 对应的 index 下标的 value 替换成新的 value setKnownNotUnset(threadLocalMap, value); } else { //如果放置的对象为UNSET,则表示清理,会对该FTL进行清理,类似毒丸对象 remove(); } }这里扩容方会调用 InternalThreadLocalMap.expandIndexedVariableTableAndSet
private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) { //在数组下标 index 处放置实际对象,如果 index 大于数组length,会进行数组扩容. if (threadLocalMap.setIndexedVariable( index , value)) { //放置成功之后,将该FTL加入到 variablesToRemoveIndex 下标的 //IdentityHashMap,等待后续清理 addToVariablesToRemove(threadLocalMap, this); } } /** * 该FTL加入到variablesToRemoveIndex下标的IdentityHashMap * IdentityHashMap的特性可以保证同一个实例不会被多次加入到该位置 */ @SuppressWarnings( "unchecked" ) private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) { //获取 variablesToRemoveIndex下标处的 IdentityHashMap Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); Set <FastThreadLocal<?>> variablesToRemove; //如果是第一次获取,则 variablesToRemoveIndex下标处的值为 UNSET if (v == InternalThreadLocalMap.UNSET || v == null ) { //新建一个新的 IdentityHashMap 并 variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>()); //放入到下标variablesToRemoveIndex处 threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); } else { variablesToRemove = ( Set <FastThreadLocal<?>>) v; } //将该FTL放入该IdentityHashMap中 variablesToRemove. add (variable); }下面看InternalThreadLocalMap.get()实现:
public static InternalThreadLocalMap get() { Thread thread = Thread.currentThread(); //首先看当前 thread 是否为FastThreadLocalThread实例 //如果是的话,可以快速通过引用,获取到其 threadLocalMap if (thread instanceof FastThreadLocalThread) { return fastGet((FastThreadLocalThread) thread); } else { //如果不是,则 jdk 原生慢速获取到其 threadLocalMap return slowGet(); } }5、 get 方法
get方法极为简单,实现如下:
===========================FastThreadLocal========================== public final V get() { InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); Object v = threadLocalMap.indexedVariable( index ); if (v != InternalThreadLocalMap.UNSET) { return (V) v; } return initialize(threadLocalMap); }首先获取当前线程的map,然后根据 FastThreadLocal的index 获取value,然后返回,如果是空对象,则通过 initialize 返回,initialize 方法会将返回值设置到 map 的槽位中,并放进 Set 中;
initialize ============================FastThreadLocal========================== private V initialize(InternalThreadLocalMap threadLocalMap) { V v = null ; try { //1、获取初始值 v = initialValue(); } catch (Exception e) { throw new RuntimeException(e); } // 2、设置value到InternalThreadLocalMap中 threadLocalMap.setIndexedVariables( index , v); // 3、添加当前的FastThreadLocal到InternalThreadLocalMap的 Set <FastThreadLocal<?>>中 addToVariablesToRemove(threadLocalMap, this); return v; } //初始化参数:由子类复写 protected V initialValue() throws Exception { return null ; } 获取 ThreadLocalMap 直接通过索引取出对象 如果为空那么调用初始化方法初始化6、ftl的资源回收机制
netty中ftl的两种回收机制回收机制:
自动:使用ftlt执行一个被FastThreadLocalRunnable wrap的Runnable任务,在任务执行完毕后会自动进行ftl的清理;
手动:ftl和InternalThreadLocalMap都提供了remove方法,在合适的时候用户可以(有的时候也是必须,例如普通线程的线程池使用ftl)手动进行调用,进行显示删除;
FastThreadLocalRunnable final class FastThreadLocalRunnable implements Runnable { private final Runnable runnable; @Override public void run() { try { runnable.run(); } finally { FastThreadLocal.removeAll(); } } static Runnable wrap(Runnable runnable) { return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable); } }如果将线程执行的任务包装成 FastThreadLocalRunnable,那么在任务执行完后自动删除ftl的资源。
===============================FastThreadLocal=========================== public static void removeAll() { // 获取到map InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet(); if (threadLocalMap == null ) { return ; } try { // 获取到 Set <FastThreadLocal>集合 Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); if (v != null && v != InternalThreadLocalMap.UNSET) { @SuppressWarnings( "unchecked" ) Set <FastThreadLocal<?>> variablesToRemove = ( Set <FastThreadLocal<?>>) v; // 将 Set 转换为数组 FastThreadLocal<?>[] variablesToRemoveArray = variablesToRemove.toArray(new FastThreadLocal[variablesToRemove. size ()]); // 遍历数组,删除每一个FastThreadLocal对应的value for (FastThreadLocal<?> tlv: variablesToRemoveArray) { tlv.remove(threadLocalMap); } } } finally { // 删除当前线程的InternalThreadLocalMap InternalThreadLocalMap.remove(); } } public static void remove() { Thread thread = Thread.currentThread(); if (thread instanceof FastThreadLocalThread) { // 将FastThreadLocalThread 内部的map置位 null ((FastThreadLocalThread) thread).setThreadLocalMap( null ); } else { // 将 ThreadLocal内部ThreadLocalMap 中的value置位 null slowThreadLocalMap.remove(); } }remove方法:
===============================FastThreadLocal========================== private void remove() { remove(InternalThreadLocalMap.getIfSet()); } private void remove(InternalThreadLocalMap threadLocalMap) { if (threadLocalMap == null ) { return ; } // 从 InternalThreadLocalMap 中删除当前的FastThreadLocal对应的value并设UNSET Object v = threadLocalMap.removeIndexedVariable( index ); // 从 InternalThreadLocalMap 中的 Set <FastThreadLocal<?>>中删除当前的FastThreadLocal对象 removeFromVariablesToRemove(threadLocalMap, this); // 如果删除的是有效值,则进行onRemove方法的回调 if (v != InternalThreadLocalMap.UNSET) { try { // 回调子类复写的onRemoved方法,默认为空实现 onRemoved((V) v); } catch (Exception e) { throw new RuntimeException(e); } } }总结
只有不断的学习,不断的找到自己的缺点才可以进步;
一起加油;
原文地址:https://mp.weixin.qq.com/s/WNfFAgnXHyBSbCGwo1pyJQ
查看更多关于Java高进进阶之FastThreadLocal源码详解(修复ThreadLocal的缺陷)的详细内容...