好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

带你快速搞定java并发库

一、总览

计算机程序 = 数据 + 算法。

并发编程的一切根本原因是为了保证数据的正确性,线程的效率性。

Java并发库共分为四个大的部分,如下图

Executor 和 future 是为了保证线程的效率性

Lock 和数据结构 是为了维持数据的一致性。

Java并发编程的时候,思考顺序为,

对自己的数据要么加锁。要么使用提供的数据结构,保证数据的安全性

调度线程的时候使用Executor更好的调度。

二、Executor总览

Executor 提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。

相当于manager,老板让manager去执行一件任务,具体的是谁执行,什么时候执行,就不管了。

看上图的继承关系,介绍几个

内置的线程池基本上都在这里

newScheduledThreadPool 定时执行的线程池

newCachedThreadPool 缓存使用过的线程

newFixedThreadPool 固定数量的线程池

newWorkStealingPool 将大任务分解为小任务的线程池

三、继承结构

构造函数

包含一个定时的service

?

1

2

3

4

5

6

7

8

9

10

11

12

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {

     return new DelegatedScheduledExecutorService

         ( new ScheduledThreadPoolExecutor( 1 ));

}

static class DelegatedScheduledExecutorService

         extends DelegatedExecutorService

         implements ScheduledExecutorService {

     private final ScheduledExecutorService e;

     DelegatedScheduledExecutorService(ScheduledExecutorService executor) {

         super (executor);

         e = executor;

     }

四、怎么保证只有一个线程

定时执行的时候调用这个方法,调用过程如下,注意看其中的注释,由上往下的调用顺序

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,

                                                  long initialDelay,

                                                  long delay,

                                                  TimeUnit unit) {

     if (command == null || unit == null )

         throw new NullPointerException();

     if (delay <= 0 )

         throw new IllegalArgumentException();

     ScheduledFutureTask<Void> sft =

         new ScheduledFutureTask<Void>(command,

                                       null ,

                                       triggerTime(initialDelay, unit),

                                       unit.toNanos(-delay));

     RunnableScheduledFuture<Void> t = decorateTask(command, sft);

     sft.outerTask = t;

     //  延迟执行

     delayedExecute(t);

     return t;

}

private void delayedExecute(RunnableScheduledFuture<?> task) {

     if (isShutdown())

         reject(task);

     else {

         // 加入任务队列

         super .getQueue().add(task);

         if (isShutdown() &&

             !canRunInCurrentRunState(task.isPeriodic()) &&

             remove(task))

             task.cancel( false );

         else

             // 确保执行

             ensurePrestart();

     }

}

// 如果worker数量小于corePoolSize,创建新的线程,其他情况不处理

void ensurePrestart() {

     int wc = workerCountOf(ctl.get());

     if (wc < corePoolSize)

         addWorker( null , true );

     else if (wc == 0 )

         addWorker( null , false );

}

五、怎么保证时间可以定时执行

?

1

2

3

4

5

6

7

8

9

10

11

public ScheduledFuture<?> schedule(Runnable command,

                                    long delay,

                                    TimeUnit unit) {

     if (command == null || unit == null )

         throw new NullPointerException();

     RunnableScheduledFuture<?> t = decorateTask(command,

         new ScheduledFutureTask<Void>(command, null ,

                                       triggerTime(delay, unit)));

     delayedExecute(t);

     return t;

}

在每次执行的时候会把下一次执行的时间放进任务中

?

1

2

3

4

5

6

7

8

9

10

private long triggerTime( long delay, TimeUnit unit) {

     return triggerTime(unit.toNanos((delay < 0 ) ? 0 : delay));

}

/**

  * Returns the trigger time of a delayed action.

  */

long triggerTime( long delay) {

     return now() +

         ((delay < (Long.MAX_VALUE >> 1 )) ? delay : overflowFree(delay));

}

FutureTask 定时是通过LockSupport.parkNanos(this, nanos);LockSupport.park(this);

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

private int awaitDone( boolean timed, long nanos)

     throws InterruptedException {

     final long deadline = timed ? System.nanoTime() + nanos : 0L;

     WaitNode q = null ;

     boolean queued = false ;

     for (;;) {

         if (Thread.interrupted()) {

             removeWaiter(q);

             throw new InterruptedException();

         }

         int s = state;

         if (s > COMPLETING) {

             if (q != null )

                 q.thread = null ;

             return s;

         }

         else if (s == COMPLETING) // cannot time out yet

             Thread.yield();

         else if (q == null )

             q = new WaitNode();

         else if (!queued)

             queued = UNSAFE测试数据pareAndSwapObject( this , waitersOffset,

                                                  q.next = waiters, q);

         else if (timed) {

             nanos = deadline - System.nanoTime();

             if (nanos <= 0L) {

                 removeWaiter(q);

                 return state;

             }

             //注意这里

             LockSupport.parkNanos( this , nanos);

         }

         else //注意这里

             LockSupport.park( this );

     }

}

总结:Executor是通过将任务放在队列中,生成的futureTask。然后将生成的任务在队列中排序,将时间最近的需要出发的任务做检查。如果时间不到,就阻塞线程到下次出发时间。

注意:newSingleThreadScheduledExecutor只会有一个线程,不管你提交多少任务,这些任务会顺序执行,如果发生异常会取消下面的任务,线程池也不会关闭,注意捕捉异常

六、使用

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

ScheduledExecutorService single = Executors.newSingleThreadScheduledExecutor();

Runnable runnable1 = () -> {

     try {

         Thread.sleep( 4000 );

         System.out.println( "11111111111111" );

     } catch (InterruptedException e) {

         e.printStackTrace();

     }

};

Runnable runnable2 = () -> {

     try {

         Thread.sleep( 4000 );

         System.out.println( "222" );

     } catch (InterruptedException e) {

         e.printStackTrace();

     }

};

single.scheduleWithFixedDelay(runnable1, 0 , 1 , TimeUnit.SECONDS);

single.scheduleWithFixedDelay(runnable2, 0 , 2 , TimeUnit.SECONDS);

11111111111111 222 11111111111111 222 11111111111111

在项目中要注意关闭线程池

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

actionService = Executors.newSingleThreadScheduledExecutor();

         actionService.scheduleWithFixedDelay(() -> {

             try {

                 Thread.currentThread().setName( "robotActionService" );

                 Integer robotId = robotQueue.poll();

                 if (robotId == null ) {

                     //    关闭线程池

                     actionService.shutdown();

                 } else {

                     int aiLv = robots.get(robotId);

                     if (actionQueueMap.containsKey(aiLv)) {

                         ActionQueue actionQueue = actionQueueMap.get(aiLv);

                         actionQueue.doAction(robotId);

                     }

                 }

             } catch (Exception e) {

                 //    捕捉异常

                 LOG.error( "" ,e);

             }

         }, 1 , 1 , TimeUnit.SECONDS);

总结

本篇文章就到这里了,希望能给你带来帮助,也希望您能够多多关注的更多内容!

原文链接:https://gamwatcher.blog.csdn.net/article/details/88406100

查看更多关于带你快速搞定java并发库的详细内容...

  阅读:19次