好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

解决线程池中ThreadGroup的坑

线程池中ThreadGroup的坑

在Java中每一个线程都归属于某个线程组管理的一员,例如在主函数main()主工作流程中产生一个线程,则产生的线程属于main这个线程组管理的一员。简单地说,线程组(ThreadGroup)就是由线程组成的管理线程的类,这个类是java.lang.ThreadGroup类。

定义一个线程组,通过以下代码可以实现。

?

1

2

ThreadGroup group= new ThreadGroup([groupName]);

Thread thread= new Thread(group,]the first thread of group]);

ThreadGroup类中的某些方法,可以对线程组中的线程产生作用。例如,setMaxPriority()方法可以设定线程组中的所有线程拥有最大的优先权。

所有线程都隶属于一个线程组。那可以是一个默认线程组(不指定group),亦可是一个创建线程时明确指定的组。在创建之初,线程被限制到一个组里,而且不能改变到一个不同的组。每个应用都至少有一个线程从属于系统线程组。若创建多个线程而不指定一个组,它们就会自动归属于系统线程组。

线程组也必须从属于其他线程组。必须在构建器里指定新线程组从属于哪个线程组。若在创建一个线程组的时候没有指定它的归属,则同样会自动成为系统线程组的一名属下。因此,一个应用程序中的所有线程组最终都会将系统线程组作为自己的[父]。

那么假如我们需要在线程池中实现一个带自定义ThreadGroup的线程分组,该怎么实现呢?

我们在给线程池(ThreadPoolExecutor)提交任务的时候可以通过execute(Runnable command)来将一个线程任务加入到该线程池,那么我们是否可以通过new一个指定了ThreadGroup的Thread实例来加入线程池来达到前面说到的目的呢?

ThreadGroup是否可行

通过new Thread(threadGroup,runnable)实现线程池中任务分组

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

public static void main(String[] args) {

         ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();

         final ThreadGroup group = new ThreadGroup( "Main_Test_Group" );

         for ( int i = 0 ; i < 5 ; i++) {

             Thread thread = new Thread(group, new Runnable() {

                 @Override

                 public void run() {

                     int sleep = ( int )(Math.random() * 10 );

                     try {

                         Thread.sleep( 1000 * 3 );

                         System.out.println(Thread.currentThread().getName()+ "执行完毕" );

                         System.out.println( "当前线程组中的运行线程数" +group.activeCount());

                     } catch (InterruptedException e) {

                         e.printStackTrace();

                     }

                 }

             }, group.getName()+ " #" +i+ "" );

             pool.execute(thread);

         }

     }

运行结果

pool-1-thread-3执行完毕
pool-1-thread-1执行完毕
当前线程组中的运行线程数0
pool-1-thread-2执行完毕
当前线程组中的运行线程数0
当前线程组中的运行线程数0
pool-1-thread-4执行完毕
pool-1-thread-5执行完毕
当前线程组中的运行线程数0
当前线程组中的运行线程数0

运行结果中可以看到group中的线程并没有因为线程池启动了这个线程任务而运行起来.因此通过线程组来对线程池中的线层任务分组不可行.

从java.util.concurrent.ThreadPoolExecutor源码中可以看到如下构造函数:

?

1

2

3

4

5

6

7

8

public ThreadPoolExecutor( int corePoolSize,

                               int maximumPoolSize,

                               long keepAliveTime,

                               TimeUnit unit,

                               BlockingQueue<Runnable> workQueue) {

         this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

              Executors.defaultThreadFactory(), defaultHandler);

     }

如果我们在实例化ThreadPoolExecutor时不指定ThreadFactory,那么将以默认的ThreadFactory来创建Thread.

Executors内部类DefaultThreadFactory

下面的源码即是默认的Thread工厂

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

static class DefaultThreadFactory implements ThreadFactory {

         private static final AtomicInteger poolNumber = new AtomicInteger( 1 );

         private final ThreadGroup group;

         private final AtomicInteger threadNumber = new AtomicInteger( 1 );

         private final String namePrefix;

         DefaultThreadFactory() {

             SecurityManager s = System.getSecurityManager();

             group = (s != null ) ? s.getThreadGroup() :

                                   Thread.currentThread().getThreadGroup();

             namePrefix = "pool-" +

                           poolNumber.getAndIncrement() +

                          "-thread-" ;

         }

         public Thread newThread(Runnable r) {

             Thread t = new Thread(group, r,

                                   namePrefix + threadNumber.getAndIncrement(),

                                   0 );

             if (t.isDaemon())

                 t.setDaemon( false );

             if (t.getPriority() != Thread.NORM_PRIORITY)

                 t.setPriority(Thread.NORM_PRIORITY);

             return t;

         }

     }

从唯一的构造函数可以看到DefaultThreadFactory以SecurityManager 实例中的ThreadGroup来指定线程的group,如果SecurityManager 获取到的ThreadGroup为null才默认以当前线程的group来指定.public Thread newThread(Runnable r) 则以group来new 一个Thead.这样我们可以在实例化ThreadPoolExecutor对象的时候在其构造函数内传入自定义的ThreadFactory实例即可达到目的.

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

public class MyTheadFactory implements ThreadFactory {

     private static final AtomicInteger poolNumber = new AtomicInteger( 1 );

     private final AtomicInteger threadNumber = new AtomicInteger( 1 );

     private final String namePrefix;

     private ThreadGroup defaultGroup;

     public MyTheadFactory() {

         SecurityManager s = System.getSecurityManager();

         defaultGroup = (s != null ) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();

         namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-" ;

     }

     public MyTheadFactory(ThreadGroup group) {

        this .defaultGroup = group;

         namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-" ;

     }

     public Thread newThread(Runnable r) {

         Thread t = new Thread(defaultGroup, null , namePrefix + threadNumber.getAndIncrement(), 0 );

         if (t.isDaemon())

             t.setDaemon( false );

         if (t.getPriority() != Thread.NORM_PRIORITY)

             t.setPriority(Thread.NORM_PRIORITY);

         return t;

     }

}

ThreadGroup的使用及手写线程池

监听线程异常关闭

以下代码在window下不方便测试,需在linux 上 测试

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

// 以下线程如果强制关闭的话,是无法打印`线程被杀掉了`

// 模拟关闭 kill PID

public static void main(String[] args)  {

         Runtime.getRuntime().addShutdownHook( new Thread( () -> {

             System.out.println( "线程被杀掉了" );

         }));

         while ( true ){

             System.out.println( "i am working ..." );

             try {

                 Thread.sleep( 1000 );

             } catch (InterruptedException e) {

                 e.printStackTrace();

             }

         }

     }

如何拿到Thread线程中异常

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

public static void main(String[] args) {

         Thread thread = new Thread(() -> {

             try {

                 Thread.sleep( 1000 );

                 int i = 10 / 0 ;

             } catch (InterruptedException e) {

                 e.printStackTrace();

             }

         });

         thread.setUncaughtExceptionHandler((t,e)->{

             System.out.println( "线程的名字" + t.getName());

             System.out.println(e);

         });  // 通过注入接口的方式

         thread.start();

     }

ThreadGroup

注意: threadGroup 设置为isDaemon 后,会随最后一个线程结束而销毁,如果没有设置isDaemon ,则需要手动调用 destory()

线程池使用

自己搭建的简单线程池实现

其中ThreadGroup 的应用没有写,但是我们可以观察线程关闭后,检查ThreadGroup 中是否还有活跃的线程等,具体参考ThreadGroup API

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

import java.util.ArrayList;

import java.util.Iterator;

import java.util.LinkedList;

import java.util.List;

import java.util.stream.IntStream;

/**

  * @Author: shengjm

  * @Date: 2020/2/10 9:52

  * @Description:

  */

public class SimpleThreadPool extends Thread{

     /**

      * 线程数量

      */

     private int size;

     private final int queueSize;

     /**

      * 默认线程队列数量

      */

     private final static int DEFAULR_TASK_QUEUE_SIZE = 2000 ;

     private static volatile int seq = 0 ;

     private final static String THREAD_PREFIX = "SIMPLE_THREAD_POLL_" ;

     private final static ThreadGroup GROUP = new ThreadGroup( "Pool_Group" );

     private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();

     private final static List<WorkerTask> THREAD_QUEUE = new ArrayList<>();

     private final DiscardPolicy discardPolicy;

     private volatile boolean destory = false ;

     private int min;

     private int max;

     private int active;

     /**

      * 定义异常策略的实现

      */

     private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> {

         throw new DiscardException( "线程池已经被撑爆了,后继多余的人将丢失" );

     };

     /**

      *

      */

     public SimpleThreadPool(){

         this ( 4 , 8 , 12 ,DEFAULR_TASK_QUEUE_SIZE,DEFAULT_DISCARD_POLICY);

     }

     /**

      *

      */

     public SimpleThreadPool( int min , int active , int max , int queueSize,DiscardPolicy discardPolicy) {

         this .min = min;

         this .active = active;

         this .max = max;

         this .queueSize = queueSize;

         this .discardPolicy = discardPolicy;

         init();

     }

  /**

   * 初始化

   */

     private void init() {

         for ( int i = 0 ; i < min; i++){

             createWorkTask();

         }

         this .size = min;

         this .start();

     }

     private void createWorkTask(){

         WorkerTask task = new WorkerTask(GROUP,THREAD_PREFIX+(seq++));

         task.start();

         THREAD_QUEUE.add(task);

     }

  /**

   * 线程池自动扩充

   */

     @Override

     public void run() {

         while (!destory){

             System.out.println( this .min + " --- " + this .active+ " --- " + this .max + " --- " + this .size + " --- " +  TASK_QUEUE.size());

             try {

                 Thread.sleep( 1000 );

                 if (TASK_QUEUE.size() > active && size < active){

                     for ( int i = size; i < active;i++){

                         createWorkTask();

                     }

                     size = active;

                 } else if (TASK_QUEUE.size() > max && size < max){

                     for ( int i = size; i < max;i++){

                         createWorkTask();

                     }

                     size = max;

                 }

                 synchronized (THREAD_QUEUE){

                     if (TASK_QUEUE.isEmpty() && size > active){

                         int release = size - active;

                         for (Iterator<WorkerTask> it = THREAD_QUEUE.iterator();it.hasNext();){

                             if (release <= 0 ){

                                 break ;

                             }

                             WorkerTask task = it.next();

                             task.close();

                             task.interrupt();

                             it.remove();

                             release--;

                         }

                         size = active;

                     }

                 }

             } catch (InterruptedException e) {

                 break ;

             }

         }

     }

     public void submit(Runnable runnable){

         synchronized (TASK_QUEUE){

             if (destory){

                 throw new DiscardException( "线程池已经被摧毁了..." );

             }

             if (TASK_QUEUE.size() > queueSize){

                 discardPolicy.discard();

             }

             TASK_QUEUE.addLast(runnable);

             TASK_QUEUE.notifyAll();

         }

     }

  /**

   * 关闭

   */

     public void shutdown(){

         while (!TASK_QUEUE.isEmpty()){

             try {

                 Thread.sleep( 10 );

             } catch (InterruptedException e) {

                 e.printStackTrace();

             }

         }

         synchronized (THREAD_QUEUE) {

             int initVal = THREAD_QUEUE.size();

             while (initVal > 0 ) {

                 for (WorkerTask workerTask : THREAD_QUEUE) {

                     if (workerTask.getTaskState() == TaskState.BLOCKED) {

                         workerTask.interrupt();

                         workerTask.close();

                         initVal--;

                     } else {

                         try {

                             Thread.sleep( 10 );

                         } catch (InterruptedException e) {

                             e.printStackTrace();

                         }

                     }

                 }

             }

             this .destory = true ;

         }

     }

     public int getSize() {

         return size;

     }

     public int getMin() {

         return min;

     }

     public int getMax() {

         return max;

     }

     public int getActive() {

         return active;

     }

     /**

      * 线程状态

      */

     private enum TaskState{

         FREE , RUNNING , BLOCKED , DEAD

     }

     /**

      * 自定义异常类

      */

     public static class DiscardException extends RuntimeException{

         public DiscardException(String message){

             super (message);

         }

     }

     /**

      * 定义异常策略

      */

     @FunctionalInterface

     public interface DiscardPolicy{

         void discard() throws DiscardException;

     }

     private static class WorkerTask extends Thread{

         private volatile TaskState taskState = TaskState.FREE;

         public TaskState getTaskState(){

             return this .taskState;

         }

         public WorkerTask(ThreadGroup group , String name){

             super (group , name);

         }

         @Override

         public void run(){

             OUTER:

             while ( this .taskState != TaskState.DEAD){

                 Runnable runnable;

                 synchronized (TASK_QUEUE){

                     while (TASK_QUEUE.isEmpty()){

                         try {

                             taskState = TaskState.BLOCKED;

                             TASK_QUEUE.wait();

                         } catch (InterruptedException e) {

                             break OUTER;

                         }

                     }

                     runnable = TASK_QUEUE.removeFirst();

                 }

                 if (runnable != null ){

                     taskState = TaskState.RUNNING;

                     runnable.run();

                     taskState = TaskState.FREE;

                 }

             }

         }

         public void close(){

             this .taskState = TaskState.DEAD;

         }

     }

     /**

      * 测试

      * @param args

      */

     public static void main(String[] args) {

         SimpleThreadPool simpleThreadPool = new SimpleThreadPool();

//        SimpleThreadPool simpleThreadPool = new SimpleThreadPool(6,15,SimpleThreadPool.DEFAULT_DISCARD_POLICY);

         IntStream.rangeClosed( 0 , 40 ).forEach(i -> {

             simpleThreadPool.submit(() -> {

                 try {

                     Thread.sleep( 1000 );

                 } catch (InterruptedException e) {

                     e.printStackTrace();

                 }

                 System.out.println( "the runnable " + i + "be servered by " + Thread.currentThread());

             });

         });

//        try {

//            Thread.sleep(15000);

//        } catch (InterruptedException e) {

//            e.printStackTrace();

//        }

         simpleThreadPool.shutdown();

     }

}

以上为个人经验,希望能给大家一个参考,也希望大家多多支持。

原文链接:https://blog.csdn.net/tyBaoErGe/article/details/50196379

查看更多关于解决线程池中ThreadGroup的坑的详细内容...

  阅读:18次