好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Java并行执行任务的几种方案小结

背景

最近在排查生产环境问题,发现商品详情接口时不时会报RPC调用超时,检查代码发现接口里面查询活动耗时比较长,都是串行执行的,仔细查看发现完全可以改成并行去执行,缩短接口查询耗时。

比如我们的商品详情接口,需要展示立减、阶梯满减、团购等活动标签。需要查询三次不同的活动信息,再组装活动标签信息。

如果每次查询耗时1s,按照串行的方式去调用,整个接口下来至少需要3s,整个耗时,对于我们来讲是无法接受的。其实在jdk中,给我们提供了几种非常便捷的并行执行任务的方法。

CountDownLatch ExecutorService.invokeAll() Fork/Join 分而治之 有点类似MapReduce的影子,这个有兴趣的可以自行去了解

改进方案

代码例子:

?

1

2

3

4

5

6

7

private void assemblyActivityTag(CartItemDTO itemDTO){

     //1.查询立减活动信息,耗时1s        

     //2.查询阶梯满减活动信息,耗时1s       

     //3.查询团购活动信息,耗时1s       

     //4.组装活动标签信息,耗时1s

     // 串行执行下来整个耗时4s

}

CountDownLatch

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

private void assemblyActivityTag(CartItemDTO itemDTO){

     ExecutorService executorService = Executors.newCachedThreadPool();

     CountDownLatch latch = new CountDownLatch( 3 );

     executorService.execute( new Runnable() {

         @Override

         public void run() {

         //1.查询立减活动信息

             latch.countDown();

         }

     });

     executorService.execute( new Runnable() {

         @Override

         public void run() {

             //2.查询阶梯满减活动信息

             latch.countDown();

         }

     });

     executorService.execute( new Runnable() {

         @Override

         public void run() {

             //3.查询团购活动信息

             latch.countDown();

         }

     });

     try {

         // 一定记得加上timeout时间,防止阻塞主线程

         latch.await( 3000 ,TimeUnit.MILLISECONDS);

     } catch (InterruptedException e) {

         e.printStackTrace();

     }

     //4.等待所有子任务完成,组装活动标签信息

     

     //5.关闭线程池

     executorService.shutdown();

}

ExecutorService.invokeAll()

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

private void assemblyActivityTag(CartItemDTO itemDTO) {

         ExecutorService executorService = Executors.newCachedThreadPool();

         List<Callable<String>> tasks = Lists.newArrayList();

         tasks.add( new Callable<String>() {

             @Override

             public String call() throws Exception {

                 //1.查询立减活动信息

                 return null ;

             }

         });

         tasks.add( new Callable<String>() {

             @Override

             public String call() throws Exception {

                 //2.查询阶梯满减活动信息

                 return null ;

             }

         });

         tasks.add( new Callable<String>() {

             @Override

             public String call() throws Exception {

                 //3.查询团购活动信息

                 return null ;

             }

         });

         try {

             List<Future<String>> futureList = executorService.invokeAll(tasks, 3000 , TimeUnit.MILLISECONDS);

             for (Future<String> future : futureList) {

                 // 获取线程执行结果

                 try {

                     String activityTag = future.get();

                 } catch (ExecutionException e) {

                     e.printStackTrace();

                 }

             }

         } catch (InterruptedException e) {

             e.printStackTrace();

         }

         //4.组装活动标签信息

         //5.关闭线程池

         executorService.shutdown();

     }

注意点和区别

在使用CountDownLatch,尽可能使用线程安全的容器去处理子线程的返回值,避免多线程情况下,出现脏数据。

如果想知道每个子线程的对应的返回值,ExecutorService.invokeAll()方式,是没法区分的,只能依赖返回值的顺序去匹配。

使用上面2种方式时,切记设置超时时间,防止子任务执行时间过长,阻塞主线程任务

线程池用完结束,记得shutdown()

java并行执行任务demo

在一个方法中同时调用多个方法或者服务,并等待所有结果返回

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

package com.test.demo;

import org.junit.Test;

import java.util.concurrent.CompletableFuture;

public class TestFuture {

     @Test

     public void testA(){

         CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> c());

         CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> a());

         CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> b());

         try {

             //获取并行执行任务结果

             System.out.println(future3.get());

             System.out.println(future1.get());

             System.out.println(future2.get());

         } catch (Exception e){

         }

     }

     public String a(){

         try {

             Thread.sleep( 1000 );

         } catch (Exception e){

         }

         return "a" ;

     }

     private String b(){

         try {

             //模拟业务执行时间

             Thread.sleep( 2000 );

         } catch (Exception e){

         }

         return "b" ;

     }

     private String c(){

         try {

             //模拟业务执行时间

             Thread.sleep( 5000 );

         } catch (Exception e){

         }

         return "c" ;

     }

}

测试结果:

从执行结果中可以看到一共耗时5s,如果同步进行执行,耗时应该在8s

以上为个人经验,希望能给大家一个参考,也希望大家多多支持。

原文链接:https://juejin.cn/post/6844903844355260429

查看更多关于Java并行执行任务的几种方案小结的详细内容...

  阅读:12次