好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

调度线程池ScheduledThreadPoolExecutor源码解析

前言

ScheduledThreadPoolExecutor可以用来很方便实现我们的调度任务,具体使用可以参考调度线程池ScheduledThreadPoolExecutor的正确使用姿势这篇文章,那大家知道它是怎么实现的吗,本文就带大家来揭晓谜底。

实现机制分析

我们先思考下,如果让大家去实现ScheduledThreadPoolExecutor可以周期性执行任务的功能,需要考虑哪些方面呢?

ScheduledThreadPoolExecutor的整体实现思路是什么呢?

答:我们是不是可以继承线程池类,按照线程池的思路,将任务先丢到阻塞队列中,等到时间到了,工作线程就从阻塞队列获取任务执行。

如何实现等到了未来的时间点就开始执行呢?

答:我们可以根据参数获取这个任务还要多少时间执行,那么我们是不是可以从阻塞队列中获取任务的时候,通过条件队列的的awaitNanos(delay)方法,阻塞一定时间。

如何实现 任务的重复性执行呢?

答:这就更加简单了,任务执行完成后,把它再次加入到队列不就行了吗。

源码解析

类结构图

ScheduledThreadPoolExecutor的类结构图如上图所示,很明显它是在我们的线程池ThreadPoolExecutor框架基础上扩展的。

ScheduledExecutorService:实现了该接口,封装了调度相关的API ThreadPoolExecutor:继承了该类,保留了线程池的能力和整个实现的框架 DelayedWorkQueue:内部类,延迟阻塞队列。 ScheduledFutureTask:延迟任务对象,包含了任务、任务状态、剩余的时间、结果等信息。

重要属性

通过ScheduledThreadPoolExecutor类的成员属性,我们可以了解它的数据结构。

shutdown 后是否继续执行周期任务(重复执行)
private volatile  boolean  continueExistingPeriodicTasksAfterShutdown ; 
shutdown 后是否继续执行延迟任务(只执行一次)
private volatile  boolean  executeExistingDelayedTasksAfterShutdown  =   true  ; 
调用cancel()方法后,是否将该任务从队列中移除,默认false
private volatile  boolean  removeOnCancel  =   false  ; 
任务的序列号,保证FIFO队列的顺序,用来比较优先级
private static final AtomicLong sequencer  =  new AtomicLong (  ) 
ScheduledFutureTask延迟任务类

ScheduledFutureTask 继承 FutureTask,实现 RunnableScheduledFuture 接口,无论是 runnable 还是 callable,无论是否需要延迟和定时,所有的任务都会被封装成 ScheduledFutureTask。

该类具有延迟执行的特点, 覆盖FutureTask 的 run 方法来实现对延时执行、周期执行的支持。

对于延时任务调用FutureTask#run,而对于周期性任务则调用FutureTask#runAndReset 并且在成功之后根据 fixed-delay/fixed-rate模式来设置下次执行时间并重新将任务塞到工作队列。

成员属性如下:

 //  任务序列号
private final  long  sequenceNumber ;   //  任务可以被执行的时间,交付时间,以纳秒表示
private  long   time  ;   //   0  表示非周期任务  //  正数表示 fixed - rate(两次开始启动的间隔)模式的周期,  //  负数表示 fixed - delay(一次执行结束到下一次开始启动) 模式
private final  long  period ;   //  执行的任务对象
RunnableScheduledFuture < V >  outerTask  =  this ;   //  任务在队列数组中的索引下标,  -  1 表示删除  int  heapIndex ; 
DelayedWorkQueue延迟队列

DelayedWorkQueue 是支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue(小根堆、满二叉树)存储元素。

内部数据结构是数组,所以延迟队列出队头元素后需要让其他元素(尾)替换到头节点,防止空指针异常。

成员属性如下:

 //  初始容量
private static final  int  INITIAL_CAPACITY  =   16  ;   //  节点数量
private  int  size  =   0  ;   //  存放任务的数组
private RunnableScheduledFuture < ? >  [  ]  queue  =  new RunnableScheduledFuture < ? >  [ INITIAL_CAPACITY ]  ;   //  控制并发用的锁
private final ReentrantLock lock  =  new ReentrantLock (  )  ;   //  条件队列
private final Condition available  =  lock .newCondition  (  )  ;   // 指定用于等待队列头节点任务的线程
private Thread leader  =   null  ; 
提交延迟任务schedule()原理

延迟执行方法,并指定延迟执行的时间,只会执行一次。

schedule()方法是延迟任务方法的入口。
public ScheduledFuture < ? >  schedule ( Runnable command ,   long  delay ,  TimeUnit unit )   {   //  判空处理
    if  ( command  ==   null   ||  unit  ==   null  )  throw new NullPointerException (  )  ;   //  将外部传入的任务封装成延迟任务对象ScheduledFutureTask
    RunnableScheduledFuture < ? >  t  =  decorateTask ( command ,  new ScheduledFutureTask < Void >  ( command ,   null  ,  triggerTime ( delay ,  unit )  )  )  ;   //  执行延迟任务
    delayedExecute ( t )  ;  return t ;   } 
decorateTask(...) 该方法是封装延迟任务

调用triggerTime(delay, unit)方法计算延迟的时间。

 //  返回【当前时间  +  延迟时间】,就是触发当前任务执行的时间
private  long  triggerTime (  long  delay ,  TimeUnit unit )   {   //  设置触发的时间
    return triggerTime ( unit .toNanos  (  ( delay  <   0  )   ?   0   :  delay )  )  ;   }   long  triggerTime (  long  delay )   {   //  如果 delay  <   Long  .Max_VALUE  /  2 ,则下次执行时间为当前时间  + delay  //  否则为了避免队列中出现由于溢出导致的排序紊乱 , 需要调用overflowFree来修正一下delay
    return now (  )   +   (  ( delay  <   (  Long  .MAX_VALUE   >>   1  )  )   ?  delay  :  overflowFree ( delay )  )  ;   }   //  下面这种情况很少,大家看不懂可以不用强行理解  //  如果某个任务的 delay 为负数,说明当前可以执行(其实早该执行了)。  //  阻塞队列中维护任务顺序是基于 compareTo 比较的,比较两个任务的顺序会用  time  相减。  //  那么可能出现一个 delay 为正数减去另一个为负数的 delay,结果上溢为负数,则会导致 compareTo 产生错误的结果
private  long  overflowFree (  long  delay )   {  Delayed head  =   ( Delayed )  super .getQueue  (  )  .peek  (  )  ;  if  ( head  !=   null  )   {   long  headDelay  =  head .getDelay  ( NANOSECONDS )  ;   //  判断一下队首的delay是不是负数,如果是正数就不用管,怎么减都不会溢出  //  否则拿当前 delay 减去队首的 delay 来比较看,如果不出现上溢,排序不会乱  //  不然就把当前 delay 值给调整为  Long  .MAX_VALUE   +  队首 delay
        if  ( headDelay  <   0   &&   ( delay  -  headDelay  <   0  )  )  delay  =   Long  .MAX_VALUE   +  headDelay ;   }  return delay ;   } 
调用RunnableScheduledFuture的构造方法封装为延迟任务
ScheduledFutureTask ( Runnable r ,  V result ,   long  ns )   {  super ( r ,  result )  ;   //  任务的触发时间
    this .time   =  ns ;   //  任务的周期, 延迟任务的为0,因为不需要重复执行
    this .period   =   0  ;   //  任务的序号  +   1  this .sequenceNumber   =  sequencer .getAndIncrement  (  )  ;   } 
调用decorateTask()方法装饰延迟任务
 //  没有做任何操作,直接将 task 返回,该方法主要目的是用于子类扩展
protected  < V >  RunnableScheduledFuture < V >  decorateTask (  Runnable runnable ,  RunnableScheduledFuture < V >  task )   {  return task ;   } 
提交周期任务scheduleAtFixedRate()原理

按照固定的频率周期性的执行任务,捕手renwu,一次任务的启动到下一次任务的启动的间隔

public ScheduledFuture < ? >  scheduleAtFixedRate ( Runnable command ,   long  initialDelay ,   long  period ,  TimeUnit unit )   {  if  ( command  ==   null   ||  unit  ==   null  )  throw new NullPointerException (  )  ;  if  ( period  <=   0  )  throw new IllegalArgumentException (  )  ;   //  任务封装,【指定初始的延迟时间和周期时间】
    ScheduledFutureTask < Void >  sft  = new ScheduledFutureTask < Void >  ( command ,   null  ,  triggerTime ( initialDelay ,  unit )  ,  unit .toNanos  ( period )  )  ;   //  默认返回本身
    RunnableScheduledFuture < Void >  t  =  decorateTask ( command ,  sft )  ;  sft .outerTask   =  t ;   //  开始执行这个任务
    delayedExecute ( t )  ;  return t ;   } 
提交周期任务scheduleWithFixedDelay()原理

按照指定的延时周期性执行任务,上一个任务执行完毕后,延时一定时间,再次执行任务。

public ScheduledFuture < ? >  scheduleWithFixedDelay ( Runnable command ,   long  initialDelay ,   long  delay ,  TimeUnit unit )   {  if  ( command  ==   null   ||  unit  ==   null  )  throw new NullPointerException (  )  ;  if  ( delay  <=   0  )  throw new IllegalArgumentException (  )  ;   //  任务封装,【指定初始的延迟时间和周期时间】,周期时间为  -  表示是 fixed - delay 模式
    ScheduledFutureTask < Void >  sft  =  new ScheduledFutureTask < Void >  ( command ,   null  ,  triggerTime ( initialDelay ,  unit )  ,  unit .toNanos  (  - delay )  )  ;  RunnableScheduledFuture < Void >  t  =  decorateTask ( command ,  sft )  ;  sft .outerTask   =  t ;   //  开始执行这个任务
    delayedExecute ( t )  ;  return t ;   } 
执行任务delayedExecute(t)原理

上面多种提交任务的方式,殊途同归,最终都会调用delayedExecute()方法执行延迟或者周期任务。

delayedExecute()方法是执行延迟任务的入口

private void delayedExecute ( RunnableScheduledFuture < ? >  task )   {   //  线程池是 SHUTDOWN 状态,执行拒绝策略
    if  ( isShutdown (  )  )   //  调用拒绝策略的方法
        reject ( task )  ;  else  {   //  把当前任务放入阻塞队列
        super .getQueue  (  )  .add  ( task )  ;   //  线程池状态为 SHUTDOWN 并且不允许执行任务了,就从队列删除该任务,并设置任务的状态为取消状态  //  非主流程,可以跳过,不重点看了
        if  ( isShutdown (  )   &&   ! canRunInCurrentRunState ( task .isPeriodic  (  )  )   &&  remove ( task )  )  task .cancel  (  false  )  ;  else  //  开始执行了哈
            ensurePrestart (  )  ;   }   } 
ensurePrestart()方法开启线程执行
 //  ThreadPoolExecutor#ensurePrestart
void ensurePrestart (  )   {   int  wc  =  workerCountOf ( ctl .get  (  )  )  ;   //  worker数目小于corePoolSize,则添加一个worker。
    if  ( wc  <  corePoolSize )   //  第二个参数  true  表示采用核心线程数量限制,false 表示采用 maximumPoolSize
        addWorker (  null  ,   true  )  ;   //  corePoolSize  =   0 的情况,至少开启一个线程,【担保机制】
    else if  ( wc  ==   0  )  addWorker (  null  ,   false  )  ;   } 

addWorker()方法实际上父类ThreadPoolExecutor的方法,这个方法在该文章 Java线程池源码深度解析中详细介绍过,这边做个总结:

如果线程池中工作线程数量小于最大线程数,创建工作线程,执行任务。 如果线程池中工作线程数量大于最大线程数,直接返回。 获取延迟任务take()原理

目前工作线程已经创建好了,工作线程开始工作了,它会从阻塞队列中获取延迟任务执行,这部分也是线程池里面的原理,不做展开,那我们看下它是如何实现延迟执行的? 主要关注如何从阻塞队列中获取任务。

DelayedWorkQueue#take()方法获取延迟任务

该方法会在上面的addWoker()方法创建工作线程后,工作线程中循环持续调用workQueue.take()方法获取延迟任务。

该方法主要获取延迟队列中任务延迟时间小于等于0 的任务。

如果延迟时间不小于0,那么调用条件队列的awaitNanos(delay)阻塞方法等待一段时间,等时间到了,延迟时间自然小于等于0了。

获取到任务后,工作线程就可以开始执行调度任务了。

 //  DelayedWorkQueue#take (  )  public RunnableScheduledFuture < ? >  take (  )  throws InterruptedException  {  final ReentrantLock lock  =  this .lock  ;   //  加可中断锁
    lock .lockInterruptibly  (  )  ;  try  {   //  自旋
        for  (  ;;  )   {   //  获取阻塞队列中的头结点
            RunnableScheduledFuture < ? >  first  =  queue [  0  ]  ;   //  如果阻塞队列没有数据,为空
            if  ( first  ==   null  )   //  等待队列不空,直至有任务通过 offer 入队并唤醒
                available .await  (  )  ;  else  {   //  获取头节点的的任务还剩余多少时间才执行  long  delay  =  first .getDelay  ( NANOSECONDS )  ;  if  ( delay  <=   0  )   //  到达触发时间,获取头节点并调整堆,重新选择延迟时间最小的节点放入头部
                    return finishPoll ( first )  ;   //  逻辑到这说明头节点的延迟时间还没到
                first  =   null  ;   //  说明有 leader 线程在等待获取头节点,当前线程直接去阻塞等待
                if  ( leader  !=   null  )   //  当前线程阻塞
                    available .await  (  )  ;  else  {   //  没有 leader 线程,【当前线程作为leader线程,并设置头结点的延迟时间作为阻塞时间】
                    Thread thisThread  =  Thread .currentThread  (  )  ;  leader  =  thisThread ;  try  {   //  当前线程通过awaitNanos方法等待delay时间后,会自动唤醒,往后面继续执行
                        available .awaitNanos  ( delay )  ;   //  到达阻塞时间时,当前线程会从这里醒来,进入下一轮循环,就有可能执行了  }  finally  {   //  t堆顶更新,leader 置为  null ,offer 方法释放锁后,  //  有其它线程通过 take / poll 拿到锁 , 读到 leader  ==   null ,然后将自身更新为leader。
                        if  ( leader  ==  thisThread )   //  leader 置为  null  用以接下来判断是否需要唤醒后继线程
                            leader  =   null  ;   }   }   }   }   }  finally  {   //  没有 leader 线程并且头结点不为  null ,唤醒阻塞获取头节点的线程,  //  【如果没有这一步,就会出现有了需要执行的任务,但是没有线程去执行】
        if  ( leader  ==   null   &&  queue [  0  ]   !=   null  )  available .signal  (  )  ;   //  解锁
        lock .unlock  (  )  ;   }   } 
finishPoll()方法获取到任务后执行

该方法主要做两个事情, 获取头节点并调整堆,重新选择延迟时间最小的节点放入头部。

private RunnableScheduledFuture < ? >  finishPoll ( RunnableScheduledFuture < ? >  f )   {   //  获取尾索引  int  s  =   --size;   //  获取尾节点
    RunnableScheduledFuture < ? >  x  =  queue [ s ]  ;   //  将堆结构最后一个节点占用的 slot 设置为  null ,因为该节点要尝试升级成堆顶,会根据特性下调
    queue [ s ]   =   null  ;   //  s  ==   0  说明 当前堆结构只有堆顶一个节点,此时不需要做任何的事情
    if  ( s  !=   0  )   //  从索引处  0  开始向下调整
        siftDown (  0  ,  x )  ;   //  出队的元素索引设置为  -  1  setIndex ( f ,   -  1  )  ;  return f ;   } 
延迟任务运行的原理

从延迟队列中获取任务后,工作线程会调用延迟任务的run()方法执行任务。

ScheduledFutureTask#run()方法运行任务

调用isPeriodic()方法判断任务是否是周期性任务还是非周期性任务

如果任务是非周期任务,就调用父类的FutureTask#run()执行一次

如果任务是非周期任务,就调用父类的FutureTask#runAndReset(), 返回true会设置下一次的执行时间,重新放入线程池的阻塞队列中,等待下次获取执行

public void run (  )   {   //  是否周期性,就是判断 period 是否为  0   boolean  periodic  =  isPeriodic (  )  ;   //  根据是否是周期任务检查当前状态能否执行任务,不能执行就取消任务
    if  (  ! canRunInCurrentRunState ( periodic )  )  cancel (  false  )  ;   //  非周期任务,直接调用 FutureTask#run 执行一次
    else if  (  ! periodic )  ScheduledFutureTask .super  .run  (  )  ;   //  周期任务的执行,返回  true  表示执行成功
    else if  ( ScheduledFutureTask .super  .runAndReset  (  )  )   {   //  设置周期任务的下一次执行时间
        setNextRunTime (  )  ;   //  任务的下一次执行安排,如果当前线程池状态可以执行周期任务,加入队列,并开启新线程
        reExecutePeriodic ( outerTask )  ;   }   } 
FutureTask#runAndReset()执行周期性任务

周期任务正常完成后任务的状态不会变化,依旧是 NEW,不会设置 outcome 属性。

但是如果本次任务执行出现异常,会进入 setException 方法将任务状态置为异常,把异常保存在 outcome 中。

方法返回 false,后续的该任务将不会再周期的执行

protected  boolean  runAndReset (  )  {   //  任务不是新建的状态了,或者被别的线程执行了,直接返回  false  if  ( state  !=  NEW  ||   ! UNSAFE 测试数据pareAndSwapObject  ( this ,  runnerOffset ,   null  ,  Thread .currentThread  (  )  )  )  return  false  ;   boolean  ran  =   false  ;   int  s  =  state ;  try  {  Callable < V >  c  =  callable ;  if  ( c  !=   null   &&  s  ==  NEW )   {  try  {   //  执行方法,没有返回值
                c .call  (  )  ;  ran  =   true  ;   }  catch  ( Throwable ex )   {   //  出现异常,把任务设置为异常状态,唤醒所有的 get 阻塞线程
                setException ( ex )  ;   }   }   }  finally  {   //  执行完成把执行线程引用置为  null  runner  =   null  ;  s  =  state ;   //  如果线程被中断进行中断处理
        if  ( s  >=  INTERRUPTING )  handlePossibleCancellationInterrupt ( s )  ;   }   //  如果正常执行,返回  true ,并且任务状态没有被取消
    return ran  &&  s  ==  NEW ;   } 
ScheduledFutureTask#setNextRunTime()设置下次执行时间

如果属性period大于0,表示fixed-rate模式,直接加上period时间即可。

如果属性period小于等于0, 表示是fixed-delay模式, 调用triggerTime重新计算下次时间。

 //  任务下一次的触发时间
private void setNextRunTime (  )   {   long  p  =  period ;  if  ( p  >   0  )   //  fixed - rate 模式,【时间设置为上一次执行任务的时间  +  p】,两次任务执行的时间差  time   +=  p ;  else  //  fixed - delay 模式,下一次执行时间是【当前这次任务结束的时间(就是现在)  +  delay 值】  time   =  triggerTime (  - p )  ;   } 
ScheduledFutureTask#reExecutePeriodic(),重新放入阻塞任务队列,等待获取,进行下一轮执行
 //  ScheduledThreadPoolExecutor#reExecutePeriodic
void reExecutePeriodic ( RunnableScheduledFuture < ? >  task )   {  if  ( canRunInCurrentRunState (  true  )  )   {   //  【放入任务队列】
        super .getQueue  (  )  .add  ( task )  ;   //  如果提交完任务之后,线程池状态变为了 shutdown 状态,需要再次检查是否可以执行,  //  如果不能执行且任务还在队列中未被取走,则取消任务
        if  (  ! canRunInCurrentRunState (  true  )   &&  remove ( task )  )  task .cancel  (  false  )  ;  else  //  当前线程池状态可以执行周期任务,加入队列,并【根据线程数量是否大于核心线程数确定是否开启新线程】
            ensurePrestart (  )  ;   }   } 

原文地址:https://mp.weixin.qq测试数据/s/SVrP2WY_sorxsKizfff95g

查看更多关于调度线程池ScheduledThreadPoolExecutor源码解析的详细内容...

  阅读:18次