好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Java高进进阶之FastThreadLocal源码详解(修复ThreadLocal的缺陷)

前言

ThreadLocal被ThreadLocalMap中的entry的key弱引用,如果出现GC的情况时,

没有被其他对象引用,会被回收,但是ThreadLocal对应的value却不会回收,容易造成内存泄漏,这也间接导致了内存溢出以及数据假丢失;

那么问题来了,有没有更高效的ThreadLocal有;

今天我们就来分析一波FastThreadLocalThread

一、FastThreadLocalThread源码分析

Netty为了在某些场景下提高性能,改进了jdk ThreadLocal,Netty实现的FastThreadLocal 优化了Java 原生 ThreadLocal 的访问速度,存储速度。避免了检测弱引用带来的 value 回收难问题,和数组位置冲突带来的线性查找问题,解决这些问题并不是没有代价;

Netty实现的 FastThreadLocal 底层也是通过数组存储 value 对象,与Java原生ThreadLocal使用自身作为Entry的key不同,FastThreadLocal通过保存数组的全局唯一下标,实现了对value的快速访问。同时FastThreadLocal 也实现了清理对象的方法;

1、FastThreadLocalThread

在Netty中,要使用 FastThreadLocal 实现线程本地变量需要将线程包装成 FastThreadLocalThread ,如果不是 FastThreadLocalThread ,会使用 slowThreadLocalMap的 ThreadLocal 来存储变量副本;

io.netty.util.concurrent.DefaultThreadFactory  @Override  public  Thread newThread(Runnable r) {      Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());      // 一般daemon为 false ,意思是不设置为守护线程      if (t.isDaemon() != daemon) {          t.setDaemon(daemon);      }      // 优先级 默认为5      if (t.getPriority() != priority) {          t.setPriority(priority);      }       return  t;  }  protected Thread newThread(Runnable r, String  name ) {       return  new FastThreadLocalThread(threadGroup, r,  name );  } 

FastThreadLocalThread 继承自Thread类,有如下成员变量:

io.netty.util.concurrent.FastThreadLocalThread  // 任务执行完,是否清除FastThreadLocal的标记  private final boolean cleanupFastThreadLocals;  // 类似于Thread类中ThreadLocalMap,为了实现FastThreadLocal  private InternalThreadLocalMap threadLocalMap; 

2、 InternalThreadLocalMap

FastThreadLocalThread.threadLocalMap 是 InternalThreadLocalMap 对象实例。在第一次获取FTL数据时,会初始化FastThreadLocalThread.threadLocalMap,调用的构造函数如下:

private InternalThreadLocalMap() {              //为了简便,InternalThreadLocalMap父类              //UnpaddedInternalThreadLocalMap不展开介绍              super(newIndexedVariableTable());          }          //默认的数组大小为32,且使用UNSET对象填充数组          //如果下标处数据为UNSET,则表示没有数据          private  static  Object[] newIndexedVariableTable() {              Object[] array = new Object[32];              Arrays.fill(array, UNSET);               return  array;          } 

为了避免写时候影响同一cpu缓冲行的其他数据并发访问,其使用了缓存行填充技术 (cpu 缓冲行填充),在类定义中声明了如下long字段进行填充;

//InternalThreadLocalMap  // Cache line padding (must be  public )   //  With  CompressedOops enabled, an instance  of  this class should occupy  at  least 128 bytes.   public  long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9; 

FTL使用的数组下标是InternalThreadLocalMap中的静态变量nextIndex统一递增生成的:

static  final AtomicInteger nextIndex = new AtomicInteger();  public   static   int  nextVariableIndex() {      //Netty中所有FTL数组下标都是通过递增这个静态变量实现的      //采用静态变量生成所有FTL元素在数组中的下标会造成一个问题,      //会造成InternalThreadLocalMap中数组不必要的自动扩容       int   index  = nextIndex.getAndIncrement();      if ( index  < 0) {          nextIndex.decrementAndGet();          throw new IllegalStateException( "too many thread-local indexed variables" );      }       return   index ;  } 

InternalThreadLocalMap.nextVariableIndex()方法获取FTL在该FastThreadLocalThread.threadLocalMap数组下标,因为InternalThreadLocalMap.nextVariableIndex() 使用静态域 nextIndex 递增维护所有FTL的下标,会造成后面实例化的 FTL 下标过大,如果FTL下标大于其对应 FastThreadLocalThread.threadLocalMap 数组的长度,会进行数组的自动扩容,如下:

private void expandIndexedVariableTableAndSet( int   index , Object value) {      Object[] oldArray = indexedVariables;      final  int  oldCapacity = oldArray.length;      //下面复杂的实现是为了将newCapacity规范为最接近的一个2的指数,       //这段代码在早期的 jdk HashMap 中见过       int  newCapacity =  index ;      newCapacity |= newCapacity >>>  1;      newCapacity |= newCapacity >>>  2;      newCapacity |= newCapacity >>>  4;      newCapacity |= newCapacity >>>  8;      newCapacity |= newCapacity >>> 16;      newCapacity ++;      Object[] newArray = Arrays.copyOf(oldArray, newCapacity);      Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);      newArray[ index ] = value;      indexedVariables = newArray;  } 

3、FastThreadLocal

构造函数:

有两个重要的下标域,FTL不仅在FastThreadLocalThread.threadLocalMap中保存了用户实际使用的value(在数组中的下标为index),还在数组中保存为了实现清理记录的相关数据,也即下标variablesToRemoveIndex,一般情况 variablesToRemoveIndex = 0;因为variablesToRemoveIndex 是静态变量,所以全局唯一;

//如果在该FTL中放入了数据,也就实际调用了其 set 或get函数,会在         //该FastThreadLocalThread.threadLocalMap数组的         // variablesToRemoveIndex下标处放置一个IdentityHashMap,         //并将该FTL放入IdentityHashMap中,在后续清理时会取出         //variablesToRemoveIndex下标处的IdentityHashMap进行清理          private  static  final  int  variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();         //在threadLocalMap数组中存放实际数据的下标          private final  int   index ;           public  FastThreadLocal() {               index  = InternalThreadLocalMap.nextVariableIndex();          } 

用户可扩展的函数:

//初始化 value 函数  protected V initialValue() throws Exception {       return   null ;  }  //让使用者在该FTL被移除时可以有机会做些操作。  protected void onRemoval(@SuppressWarnings( "UnusedParameters" ) V value) throws Exception { } 

FastThreadLocalThread

cleanupFastThreadLocals 字段在 4.1 的最新版本中已经没有在用到了

/**   *  true ,表示FTL会在线程结束时被主动清理 见  FastThreadLocalRunnable 类   *  false ,需要将FTL放入后台清理线程的队列中   */  // This will be  set   to   true  if we have a chance  to  wrap the Runnable.  //这个字段则用于标识该线程在结束时是否会主动清理FTL  private final boolean cleanupFastThreadLocals;  //次对象将在 第一次 FastThreadLocal.get 和 FastThreadLocal. set  时候创建  private InternalThreadLocalMap threadLocalMap;  public  FastThreadLocalThread(Runnable target) {      super(FastThreadLocalRunnable.wrap(target));      cleanupFastThreadLocals =  true ;  } 

4、 set 方法

public  final void  set (V value) {      //判断设置的 value 值是否是缺省值      if (value != InternalThreadLocalMap.UNSET) {          //获取当前线程的 InternalThreadLocalMap , 如果当前线程为FastThreadLocalThread,那么直接通过threadLocalMap引用获取          //否则通过 jdk 原生的 threadLocal 获取          InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();          //FastThreadLocal 对应的  index  下标的 value 替换成新的 value          setKnownNotUnset(threadLocalMap, value);      }  else  {          //如果放置的对象为UNSET,则表示清理,会对该FTL进行清理,类似毒丸对象          remove();      }  } 

这里扩容方会调用 InternalThreadLocalMap.expandIndexedVariableTableAndSet

private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {      //在数组下标 index 处放置实际对象,如果 index 大于数组length,会进行数组扩容.      if (threadLocalMap.setIndexedVariable( index , value)) {          //放置成功之后,将该FTL加入到 variablesToRemoveIndex 下标的          //IdentityHashMap,等待后续清理          addToVariablesToRemove(threadLocalMap, this);      }  }  /**   * 该FTL加入到variablesToRemoveIndex下标的IdentityHashMap   * IdentityHashMap的特性可以保证同一个实例不会被多次加入到该位置   */  @SuppressWarnings( "unchecked" )  private  static  void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {      //获取 variablesToRemoveIndex下标处的 IdentityHashMap      Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);       Set <FastThreadLocal<?>> variablesToRemove;      //如果是第一次获取,则 variablesToRemoveIndex下标处的值为 UNSET      if (v == InternalThreadLocalMap.UNSET || v ==  null ) {          //新建一个新的 IdentityHashMap 并          variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());          //放入到下标variablesToRemoveIndex处          threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);      }  else  {          variablesToRemove = ( Set <FastThreadLocal<?>>) v;      }      //将该FTL放入该IdentityHashMap中      variablesToRemove. add (variable);  } 

下面看InternalThreadLocalMap.get()实现:

public   static  InternalThreadLocalMap get() {      Thread thread = Thread.currentThread();      //首先看当前 thread 是否为FastThreadLocalThread实例      //如果是的话,可以快速通过引用,获取到其 threadLocalMap      if (thread instanceof FastThreadLocalThread) {           return  fastGet((FastThreadLocalThread) thread);      }  else  {          //如果不是,则 jdk 原生慢速获取到其 threadLocalMap           return  slowGet();      }  } 

5、 get 方法

get方法极为简单,实现如下:

===========================FastThreadLocal==========================  public  final V get() {          InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();          Object v = threadLocalMap.indexedVariable( index );          if (v != InternalThreadLocalMap.UNSET) {               return  (V) v;          }           return  initialize(threadLocalMap);      } 

首先获取当前线程的map,然后根据 FastThreadLocal的index 获取value,然后返回,如果是空对象,则通过 initialize 返回,initialize 方法会将返回值设置到 map 的槽位中,并放进 Set 中;

initialize  ============================FastThreadLocal==========================  private V initialize(InternalThreadLocalMap threadLocalMap) {      V v =  null ;      try {          //1、获取初始值          v = initialValue();      } catch (Exception e) {          throw new RuntimeException(e);      }      // 2、设置value到InternalThreadLocalMap中      threadLocalMap.setIndexedVariables( index , v);      // 3、添加当前的FastThreadLocal到InternalThreadLocalMap的 Set <FastThreadLocal<?>>中      addToVariablesToRemove(threadLocalMap, this);       return  v;  }  //初始化参数:由子类复写  protected V initialValue() throws Exception {       return   null ;  }  获取 ThreadLocalMap 直接通过索引取出对象 如果为空那么调用初始化方法初始化

6、ftl的资源回收机制

netty中ftl的两种回收机制回收机制:

自动:使用ftlt执行一个被FastThreadLocalRunnable wrap的Runnable任务,在任务执行完毕后会自动进行ftl的清理;

手动:ftl和InternalThreadLocalMap都提供了remove方法,在合适的时候用户可以(有的时候也是必须,例如普通线程的线程池使用ftl)手动进行调用,进行显示删除;

FastThreadLocalRunnable  final class FastThreadLocalRunnable implements Runnable {      private final Runnable runnable;      @Override       public  void run() {          try {              runnable.run();          } finally {              FastThreadLocal.removeAll();          }      }       static  Runnable wrap(Runnable runnable) {           return  runnable instanceof FastThreadLocalRunnable                   ? runnable : new FastThreadLocalRunnable(runnable);      }  } 

如果将线程执行的任务包装成 FastThreadLocalRunnable,那么在任务执行完后自动删除ftl的资源。

===============================FastThreadLocal===========================  public   static  void removeAll() {      // 获取到map      InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();      if (threadLocalMap ==  null ) {           return ;      }      try {          // 获取到 Set <FastThreadLocal>集合          Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);          if (v !=  null  && v != InternalThreadLocalMap.UNSET) {              @SuppressWarnings( "unchecked" )               Set <FastThreadLocal<?>> variablesToRemove = ( Set <FastThreadLocal<?>>) v;              // 将 Set 转换为数组              FastThreadLocal<?>[] variablesToRemoveArray =                      variablesToRemove.toArray(new FastThreadLocal[variablesToRemove. size ()]);              // 遍历数组,删除每一个FastThreadLocal对应的value               for  (FastThreadLocal<?> tlv: variablesToRemoveArray) {                  tlv.remove(threadLocalMap);              }          }      } finally {          // 删除当前线程的InternalThreadLocalMap          InternalThreadLocalMap.remove();      }  }  public   static  void remove() {      Thread thread = Thread.currentThread();      if (thread instanceof FastThreadLocalThread) {           // 将FastThreadLocalThread 内部的map置位 null           ((FastThreadLocalThread) thread).setThreadLocalMap( null );      }  else  {          // 将 ThreadLocal内部ThreadLocalMap 中的value置位 null           slowThreadLocalMap.remove();      }  } 

remove方法:

===============================FastThreadLocal==========================  private void remove() {      remove(InternalThreadLocalMap.getIfSet());  }  private void remove(InternalThreadLocalMap threadLocalMap) {      if (threadLocalMap ==  null ) {           return ;      }      // 从 InternalThreadLocalMap 中删除当前的FastThreadLocal对应的value并设UNSET      Object v = threadLocalMap.removeIndexedVariable( index );      // 从 InternalThreadLocalMap 中的 Set <FastThreadLocal<?>>中删除当前的FastThreadLocal对象      removeFromVariablesToRemove(threadLocalMap, this);      // 如果删除的是有效值,则进行onRemove方法的回调      if (v != InternalThreadLocalMap.UNSET) {          try {              // 回调子类复写的onRemoved方法,默认为空实现              onRemoved((V) v);          } catch (Exception e) {              throw new RuntimeException(e);          }      }  } 

总结

只有不断的学习,不断的找到自己的缺点才可以进步;

一起加油;

原文地址:https://mp.weixin.qq.com/s/WNfFAgnXHyBSbCGwo1pyJQ

查看更多关于Java高进进阶之FastThreadLocal源码详解(修复ThreadLocal的缺陷)的详细内容...

  阅读:12次