好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

详解FutureTask如何实现最大等待时间

预备知识

Java 线程挂起的常用方式有以下几种

Thread.sleep(long millis) :这个方法可以让线程挂起一段时间,并释放 CPU 时间片,等待一段时间后自动恢复执行。这种方式可以用来实现简单的定时器功能,但如果不恰当使用会影响系统性能。

Object.wait()  和  Object.notify()  或  Object.notifyAll() :这是一种通过等待某个条件的发生来挂起线程的方式。 wait()  方法会让线程等待,直到其他线程调用了  notify()  或  notifyAll()  方法来通知它。这种方式需要使用 synchronized 或者 ReentrantLock 等同步机制来保证线程之间的协作和通信。

LockSupport.park()  和  LockSupport.unpark(Thread thread) :这两个方法可以让线程挂起和恢复。 park()  方法会使当前线程挂起,直到其他线程调用了  unpark(Thread thread)  方法来唤醒它。这种方式比较灵活,可以根据需要控制线程的挂起和恢复。

先上结论

1.futureTask.get时通过LockSupport.park()挂起线程

2.在Thread.run() 方法中 调用 setException(ex)或set(result),然后调用LockSupport.unpark(t)唤醒线程。

示例-引入主题

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class FutureTaskDemo {
     public static void main(String[] args) {
         FutureTask<String> futureTask = new FutureTask<>( new Callable() {
             @Override
             public Object call() throws Exception {
                 System.out.println( "异步线程执行" );
                 Thread.sleep( 3000 ); //模拟线程执行任务需要3秒
                 return "ok" ;
             }
         });
         Thread t1 = new Thread(futureTask, "线程一" );
         t1.start();
 
         try {
             //关键代码
             String s = futureTask.get( 2 , TimeUnit.SECONDS); //最大等待线程2秒
         } catch (InterruptedException e) {
             e.printStackTrace();
         } catch (ExecutionException e) {
             e.printStackTrace();
         } catch (TimeoutException e) {
             e.printStackTrace();
         }
     }
}

进入futureTask.get(2, TimeUnit.SECONDS)

?
1
2
3
4
5
6
7
8
9
10
public V get( long timeout, TimeUnit unit)
       throws InterruptedException, ExecutionException, TimeoutException {
       if (unit == null )
           throw new NullPointerException();
       int s = state;
       if (s <= COMPLETING &&
           (s = awaitDone( true , unit.toNanos(timeout))) <= COMPLETING) //重点awaitDone,即完成了最大等待,依然没有结果就抛出异常逻辑
           throw new TimeoutException();
       return report(s);
   }

awaitDone返回线程任务执行状态,即小于等于COMPLETING(任务正在运行,等待完成)抛出异常TimeoutException

进入(awaitDone(true, unit.toNanos(timeout)))原理分析

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private int awaitDone( boolean timed, long nanos)
         throws InterruptedException {
         final long deadline = timed ? System.nanoTime() + nanos : 0L;
         WaitNode q = null ;
         boolean queued = false ;
         for (;;) {
             if (Thread.interrupted()) {
                 removeWaiter(q);
                 throw new InterruptedException();
             }
 
             int s = state;
             if (s > COMPLETING) {
                 if (q != null )
                     q.thread = null ;
                 return s;
             }
             else if (s == COMPLETING) // cannot time out yet
                 Thread.yield();
             else if (q == null )
                 q = new WaitNode();
             else if (!queued)
                 queued = UNSAFE.compareAndSwapObject( this , waitersOffset,
                                                      q.next = waiters, q);
             else if (timed) {
                 nanos = deadline - System.nanoTime();
                 if (nanos <= 0L) {
                     removeWaiter(q);
                     return state;
                 }
                 LockSupport.parkNanos( this , nanos);
             }
             else
                 LockSupport.park( this );
         }
     }

总体解读awaitDone

利用自旋(for (;)的方式 ,检查state(任务状态)与waitNode(维护等待的线程),

第一步:首先检查if (Thread.interrupted()) 线程是否被打断(LockSupport.parkNanos挂起的线程被打断不抛出异常),

第二步:判断任务状态与waitNode是否入队+确定最大等待时间

​ 若已完成(if (s > COMPLETING))返回任务状态

​ 若已完成(if (s == COMPLETING))-->表示正在完成,但尚未完成。则让出 CPU,进入就绪状态,等待其他线程的执行

​ 若if (q == null)==>创建等待等待节点

​ 若if (!queued)==>表示上一步创建的节点没有和当前线程绑定,故绑定

​ 最后else if (timed)与else,判断最大等待时间

?
1
2
3
4
5
static final class WaitNode {
         volatile Thread thread;
         volatile WaitNode next;
         WaitNode() { thread = Thread.currentThread(); }
     }
?
1
2
3
4
5
6
7
8
9
10
11
12
private static final int NEW          = 0 ;
private static final int COMPLETING   = 1 ;
private static final int NORMAL       = 2 ;
private static final int EXCEPTIONAL  = 3 ;
private static final int CANCELLED    = 4 ;
private static final int INTERRUPTING = 5 ;
private static final int INTERRUPTED  = 6 ;
state可能转换的过程
     1 .NEW -> COMPLETING -> NORMAL (成功完成)
     2 .NEW -> COMPLETING -> EXCEPTIONAL (异常)
     3 .NEW -> CANCELLED (任务被取消)
     4 .NEW -> INTERRUPTING -> INTERRUPTED(任务被打断)

关键代码

?
1
LockSupport.park( this , nanos) ==内部实现==> UNSAFE.park( false , nanos)();

​ 即让当前线程堵塞直至指定的时间(nanos),该方法同Thread.sleep()一样不会释放持有的对象锁,但不同的是Thread.sleep会被打断(interrupted)并抛出异常,而LockSupport.park被打断不会抛出异常,故在自旋时(for (;

查看更多关于详解FutureTask如何实现最大等待时间的详细内容...

  阅读:13次