好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Java8 自定义CompletableFuture的原理解析

Java8 自定义CompletableFuture原理

Future 接口 的局限性有很多,其中一个就是需要主动的去询问是否完成,如果等子线程的任务完成以后,通知我,那岂不是更好?

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

public class FutureInAction3 {

     public static void main(String[] args) {

         Future<String> future = invoke(() -> {

             try {

                 Thread.sleep(10000L);

                 return "I am Finished." ;

             } catch (InterruptedException e) {

                 return "I am Error" ;

             }

         });

         future.setCompletable( new Completable<String>() {

             @Override

             public void complete(String s) {

                 System.out.println( "complete called ---- " + s);

             }

             @Override

             public void exception(Throwable cause) {

                 System.out.println( "error" );

                 cause.printStackTrace();

             }

         });

         System.out.println( "....do something else ....." );

         System.out.println( "try to get result ->" + future.get());

     }

     private static <T> Future<T> invoke(Callable<T> callable) {

         AtomicReference<T> result = new AtomicReference<>();

         AtomicBoolean finished = new AtomicBoolean( false );

         Future<T> future = new Future<T>() {

             private Completable<T> completable;

             @Override

             public T get() {

                 return result.get();

             }

             @Override

             public boolean isDone() {

                 return finished.get();

             }

             // 设置完成

             @Override

             public void setCompletable(Completable<T> completable) {

                 this .completable = completable;

             }

             // 获取

             @Override

             public Completable<T> getCompletable() {

                 return completable;

             }

         };

         Thread t = new Thread(() -> {

             try {

                 T value = callable.action();

                 result.set(value);

                 finished.set( true );

                 if (future.getCompletable() != null )

                     future.getCompletable().complete(value);

             } catch (Throwable cause) {

                 if (future.getCompletable() != null )

                     future.getCompletable().exception(cause);

             }

         });

         t.start();

         return future;

     }

     private interface Future<T> {

         T get();

         boolean isDone();

         //  1

         void setCompletable(Completable<T> completable);

         //  2

         Completable<T> getCompletable();

     }

     private interface Callable<T> {

         T action();

     }

     // 回调接口

     private interface Completable<T> {

         void complete(T t);

         void exception(Throwable cause);

     }

}

CompleteFuture简单使用

Java8 中的 completeFuture 是对 Future 的扩展实现, 主要是为了弥补 Future 没有相应的回调机制的缺陷.

我们先看看 Java8 之前的 Future 的使用

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

package demos;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

/**

  * @author djh on  2019/4/22 10:23

  * @E-Mail 1544579459@qq.com

  */

public class Demo {

     public static void main(String[] args) throws ExecutionException, InterruptedException {

         ExecutorService cachePool = Executors.newCachedThreadPool();

         Future<String> future = cachePool.submit(() -> {

             Thread.sleep( 3000 );

             return "异步任务计算结果!" ;

         });

         // 提交完异步任务后, 主线程可以继续干一些其他的事情.

         doSomeThingElse();

         // 为了获取异步计算结果, 我们可以通过 future.get 和 轮询机制来获取.

         String result;

         // Get 方式会导致当前线程阻塞, 这显然违背了异步计算的初衷.

         // result = future.get();

         // 轮询方式虽然不会导致当前线程阻塞, 但是会导致高额的 CPU 负载.

         long start = System.currentTimeMillis();

         while ( true ) {

             if (future.isDone()) {

                 break ;

             }

         }

         System.out.println( "轮询耗时:" + (System.currentTimeMillis() - start));       

         result = future.get();

         System.out.println( "获取到异步计算结果啦: " + result);

         cachePool.shutdown();

     }

     private static void doSomeThingElse() {

         try {

             Thread.sleep( 1000 );

         } catch (InterruptedException e) {

             e.printStackTrace();

         }

         System.out.println( "我的最重要的事情干完了, 我要获取异步计算结果来执行剩下的事情." );

     }

}

输出:

我的最重要的事情干完了, 我要获取异步计算结果来执行剩下的事情.
轮询耗时:2000
获取到异步计算结果啦: 异步任务计算结果!

Process finished with exit code 0

从上面的 Demo 中我们可以看出, future 在执行异步任务时, 对于结果的获取显的不那么优雅, 很多第三方库就针对 Future 提供了回调式的接口以用来获取异步计算结果, 如Google的: ListenableFuture, 而 Java8 所提供的 CompleteFuture 便是官方为了弥补这方面的不足而提供的 API.

下面简单介绍用法

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

package demos;

import java.util.concurrent.CompletableFuture;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

/**

  * @author djh on  2019/5/1 20:26

  * @E-Mail 1544579459@qq.com

  */

public class CompleteFutureDemo {

     public static void main(String[] args) throws ExecutionException, InterruptedException {

         CompletableFuture<String> completableFutureOne = new CompletableFuture<>();

         ExecutorService cachePool = Executors.newCachedThreadPool();

         cachePool.execute(() -> {

             try {

                 Thread.sleep( 3000 );

                 completableFutureOne.complete( "异步任务执行结果" );

                 System.out.println(Thread.currentThread().getName());

             } catch (InterruptedException e) {

                 e.printStackTrace();

             }

         });

         // WhenComplete 方法返回的 CompletableFuture 仍然是原来的 CompletableFuture 计算结果.

         CompletableFuture<String> completableFutureTwo = completableFutureOne.whenComplete((s, throwable) -> {

             System.out.println( "当异步任务执行完毕时打印异步任务的执行结果: " + s);

         });

         // ThenApply 方法返回的是一个新的 completeFuture.

         CompletableFuture<Integer> completableFutureThree = completableFutureTwo.thenApply(s -> {

             System.out.println( "当异步任务执行结束时, 根据上一次的异步任务结果, 继续开始一个新的异步任务!" );

             try {

                 Thread.sleep( 1000 );

             } catch (InterruptedException e) {

                 e.printStackTrace();

             }

             return s.length();

         });

         System.out.println( "阻塞方式获取执行结果:" + completableFutureThree.get());

         cachePool.shutdown();

     }

}

从上面的 Demo 中我们主要需要注意 thenApply 和 whenComplete 这两个方法, 这两个方法便是 CompleteFuture 中最具有意义的方法, 他们都会在 completeFuture 调用 complete 方法传入异步计算结果时回调, 从而获取到异步任务的结果.

相比之下 future 的阻塞和轮询方式获取异步任务的计算结果, CompleteFuture 获取结果的方式就显的优雅的多。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持。

原文链接:https://artisan.blog.csdn.net/article/details/115450097

查看更多关于Java8 自定义CompletableFuture的原理解析的详细内容...

  阅读:32次