好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

基于Spring Boot的线程池监控问题及解决方案

前言

这篇是推动大家异步编程的思想的线程池的准备篇,要做好监控,让大家使用无后顾之忧,敬畏生产。

为什么需要对线程池进行监控

Java线程池作为最常使用到的并发工具,相信大家都不陌生,但是你真的确定使用对了吗?大名鼎鼎的阿里Java代码规范要求我们不使用 Executors来快速创建线程池,但是抛弃Executors,使用其它方式创建线程池就一定不会出现问题吗?本质上对于我们来说线程池本身的运行过程是一个黑盒,我们没办法了解线程池中的运行状态时,出现问题没有办法及时判断和预警。面对这种黑盒操作必须通过监控方式让其透明化,这样对我们来说才能更好的使用好线程池。因此必须对线程池做监控。

如何做线程池的监控

对于如何做监控,本质就是涉及三点,分别是数据采集、数据存储以及大盘的展示,接下来我们分说下这三点;

数据采集

采集什么数据,对于我们来说需要采集就是黑盒的数据,什么又是线程池的黑盒数据,其实也就是整个线程处理的整个流程,在整个流程中,我们可以通过ThreadPoolExecutor中的七个方法获取数据,通过这七个方法采集到的数据就可以使线程池的执行过程透明化。

getCorePoolSize():获取核心线程数;

getMaximumPoolSize:获取最大线程数;

getQueue():获取线程池中的阻塞队列,并通过阻塞队列中的方法获取队列长度、元素个数等;

getPoolSize():获取线程池中的工作线程数(包括核心线程和非核心线程);

getActiveCount():获取活跃线程数,也就是正在执行任务的线程;

getLargestPoolSize():获取线程池曾经到过的最大工作线程数;

getTaskCount():获取历史已完成以及正在执行的总的任务数量;

除了我们了解的这些流程以外,ThreadPoolExecutor中还提供了三种钩子函数,

beforeExecute():Worker线程执行任务之前会调用的方法;

afterExecute():在Worker线程执行任务之后会调用的方法;

terminated():当线程池从运行状态变更到TERMINATED状态之前调用的方法;

对于beforeExecute和afterExecute可以理解为使用Aop监听线程执行的时间,这样子我们可以对每个线程运行的时间整体做监控,terminated可以理解为线程关闭时候的监控,这样我们就可以整体获取采集到线程池生命周期的所有数据了。

数据存储以及大盘的展示

对于存储我们这个比较适合采用时序性数据库,此外现在很多成熟的监控产品都可以满足我们大屏展示的诉求,这里推荐下美团Cat和Prometheus,这里不展开进行讲解,大家可以根据自己公司的监控产品进行选择,对于不同的方案采取的存储形式会有些差异,甚至自己都可以自定义实现一个功能,反正难度不大。

进一步扩展以及思考

在实际的项目开发中我们会遇到以下场景:

不同的业务采用同一个线程池,这样如果某个服务阻塞,会影响到整体共用线程池的所有服务,会触发线程池的拒绝策略;

流量突然增加,需要动态调整线程池的参数,这个时候又不能重启;

针对这两种场景,我们对线程池再次进行了深入的思考:

如何合理配置线程池参数;

如何动态调整线程池参数;

如何给不同的服务之间做线程池的隔离;

如何合理配置线程池参数

关于这个问题面试的时候也是经常被问到,我只能说这个问题开始就是一个坑,针对与CPU密集型和I/O密集型,线程池的参数是有不同设计的,也不是遵守几个公式就可以搞定,当然可以参考,我认为对于线程池合理的参数的配置是经过多次调整得到的,甚至增加和减少业务都会影响一些参数,我不太建议大家每天背书式的CPU密集型就是N+1,非CPU密集型就是2N,因此我们更希望看到线程池动态配置。

如何动态调整线程池参数

关于如何动态调整线程池,还是回到我们场景问题的解决上,对于流量突增核心就是提升线程池的处理速度,那如何提升线程池的处理速度,有两种方式,一种是加快业务的处理,也就是消费的快,显然这种在运行的业务中我们想改变还是比较困难,这个可以作为复盘的重点;还有一种就是增加消费者,增加消费者的重点就是调整核心线程数以及非核心线程数的数量。

居于这种思考,这个时候我们需要看下ThreadPoolExecutor线程池源码,首先看下开始定义的变量,通过变量的设计我们就会发现大师就是大师,大师通过AtomicInteger修饰的ctl变量,高3位存储了线程池的状态,低29存储线程的个数,通过一个变量完成两件事情,完成状态判断以及限制线程最大个数。使用一个HashSet存储Worker的引用,而Worker继承了AbstractQueuedSynchronizer,实现一个一个不可冲入的独占锁保证线程的安全性。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

//用来标记线程池状态(高3位),线程个数(低29位)     

private   final   AtomicInteger ctl =  new   AtomicInteger(ctlOf(RUNNING,  0 ));

//工作状态存储在高3位中

private   static   final   int   COUNT_BITS = Integer.SIZE -  3 ;

//线程个数所能表达的最大数值

private   static   final   int   CAPACITY   = ( 1   << COUNT_BITS) -  1 ;

//线程池状态

//RUNNING -1 能够接收新任务,也可以处理阻塞队列中的任务

private   static   final   int   RUNNING    = - 1   << COUNT_BITS;

//SHUTDOWN 0 不可以接受新任务,继续处理阻塞队列中的任务

private   static   final   int   SHUTDOWN   =   0   << COUNT_BITS;

//STOP 1 不接收新任务,不处理阻塞队列中的任务,并且会中断正在处理的任务

private   static   final   int   STOP       =   1   << COUNT_BITS;

//TIDYING 2 所有任务已经中止,且工作线程数量为0,最后变迁到这个状态的线程将要执行terminated()钩子方法,只会有一个线程执行这个方法;

private   static   final   int   TIDYING    =   2   << COUNT_BITS;

//TERMINATED 3 中止状态,已经执行完terminated()钩子方法

private   static   final   int   TERMINATED =   3   << COUNT_BITS;

//任务队列,当线程池中的线程达到核心线程数量时,再提交任务 就会直接提交到 workQueue

private   final   BlockingQueue<Runnable> workQueue;

//线程池全局锁,增加worker减少worker时需要持有mainLock,修改线程池运行状态时,也需要

private   final   ReentrantLock mainLock =  new   ReentrantLock();

//线程池中真正存放worker的地方。

private   final   HashSet<Worker> workers =  new   HashSet<Worker>();

private   final   Condition termination = mainLock.newCondition();

//记录线程池生命周期内 线程数最大值

private   int   largestPoolSize;

//记录线程池所完成任务总数

private   long   completedTaskCount;

//创建线程会使用线程工厂

private   volatile   ThreadFactory threadFactory;

//拒绝策略

private   volatile   RejectedExecutionHandler handler;

//存活时间

private   volatile   long   keepAliveTime;

//控制核心线程数量内的线程 是否可以被回收。true 可以,false不可以。

private   volatile   boolean   allowCoreThreadTimeOut;

//核心线程池数量

private   volatile   int   corePoolSize;

//线程池最大数量

private   volatile   int   maximumPoolSize;

我们的重点看的是volatile修饰的corePoolSize、maximumPoolSize以及keepAliveTime,当然threadFactory和handler也可以看下,不过这两个不是我们解决动态调整线程池的关键。对于这些volatile修饰的关键的变量,从并发角度思考的,必然是有并发读写的操作才使用volatile修饰的,在指标采集中我们看到其get 的方法,对于写的操作我们可以猜测肯定提供了set 的方式,这个时候我们可以搜索下setCorePoolSize,果不其然我们真的搜索到了。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

     public   void   setCorePoolSize( int   corePoolSize) {

         if   (corePoolSize <  0 )

             throw   new   IllegalArgumentException();

         int   delta = corePoolSize -  this .corePoolSize;

         this .corePoolSize = corePoolSize;

         //新设置的corePoolSize小于当前核心线程数的时候

         //会调用interruptIdleWorkers方法来中断空闲的工作线程

         if   (workerCountOf(ctl.get()) > corePoolSize)

             interruptIdleWorkers();

         else   if   (delta >  0 ) {

             //当设置的值大于当前值的时候核心线程数的时候

             //按照等待队列中的任务数量来创建新的工作线程

             int   k = Math.min(delta, workQueue.size());

             while   (k-- >  0   && addWorker( null ,  true )) {

                 if   (workQueue.isEmpty())

                     break ;

             }

         }

     }

接下来我们看下interruptIdleWorkers的源码,此处源码使用ReentrantLock可重入锁,因为Worker的是通过一个全局的HashSer存储,这里通过ReentrantLock保证线程安全。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

     private   void   interruptIdleWorkers( boolean   onlyOne) {

         //可重入锁

         final   ReentrantLock mainLock =  this .mainLock;

         mainLock.lock();

         try   {

             for   (Worker w : workers) {

                 Thread t = w.thread;

                 if   (!t.isInterrupted() && w.tryLock()) {

                     try   {

                         //中断当前线程

                         t.interrupt();

                     }  catch   (SecurityException ignore) {

                     }  finally   {

                         w.unlock();

                     }

                 }

                 if   (onlyOne)

                     break ;

             }

         }  finally   {

             mainLock.unlock();

         }

     }

接下来我们在验证一下是否存在其他相关的参数设置,如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

     public   void   setMaximumPoolSize( int   maximumPoolSize) {

         if   (maximumPoolSize <=  0   || maximumPoolSize < corePoolSize)

             throw   new   IllegalArgumentException();

         this .maximumPoolSize = maximumPoolSize;

         if   (workerCountOf(ctl.get()) > maximumPoolSize)

             interruptIdleWorkers();

     }

     public   void   setKeepAliveTime( long   time, TimeUnit unit) {

         if   (time <  0 )

             throw   new   IllegalArgumentException();

         if   (time ==  0   && allowsCoreThreadTimeOut())

             throw   new   IllegalArgumentException( "Core threads must have nonzero keep alive times" );

         long   keepAliveTime = unit.toNanos(time);

         long   delta = keepAliveTime -  this .keepAliveTime;

         this .keepAliveTime = keepAliveTime;

         if   (delta <  0 )

             interruptIdleWorkers();

     }

     public   void   setRejectedExecutionHandler(RejectedExecutionHandler handler) {

         if   (handler ==  null )

             throw   new   NullPointerException();

         this .handler = handler;

     }

这里我们会发现一个问题BlockingQueue的队列容量不能修改,看到美团的文章提供的一个可修改的队列ResizableCapacityLinkedBlockingQueue,于是乎去看了一下LinkedBlockingQueue的源码,发现了关于capacity是一个final修饰的,这个时候我就思考一番,这个地方采用volatile修饰,对外暴露可修改,这样就实现了动态修改阻塞队列的大小。

如何给不同的服务之间做线程池的隔离

关于如何给不同服务之间做线程池的隔离,这里我们可以采用Hystrix的舱壁模式,也就是说针对不同服务类型的服务单独创建线程池,这样就可以实现服务之间不相互影响,不会因为某个服务导致整体的服务影响都阻塞。

实现方案

聊了这么多前置的知识储备,接下来我们来聊聊实现方案,整体的实现方案我们建立在Spring Boot的基础实现,采用Spring Cloud刷新动态配置,采用该方式比较合适单体应用,对于有Appllo和Nacos可以通过监听配置方式的来动态刷新。

Maven依赖如下;

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

     <dependencies>

         <dependency>

             <groupId>org.springframework.boot</groupId>

             <artifactId>spring-boot-starter</artifactId>

         </dependency>

         <dependency>

             <groupId>org.springframework.boot</groupId>

             <artifactId>spring-boot-starter-web</artifactId>

         </dependency>

         <dependency>

             <groupId>org.springframework.cloud</groupId>

             <artifactId>spring-cloud-context</artifactId>

         </dependency>

         <dependency>

             <groupId>org.springframework.boot</groupId>

             <artifactId>spring-boot-starter-test</artifactId>

             <scope>test</scope>

         </dependency>

         <dependency>

             <groupId>org.projectlombok</groupId>

             <artifactId>lombok</artifactId>

             <version> 1.18 . 12 </version>

         </dependency>

         <dependency>

             <groupId>org.slf4j</groupId>

             <artifactId>slf4j-api</artifactId>

             <version> 1.7 . 5 </version>

         </dependency>

         <dependency>

             <groupId>ch.qos.logback</groupId>

             <artifactId>logback-core</artifactId>

             <version> 1.2 . 3 </version>

         </dependency>

         <dependency>

             <groupId>ch.qos.logback</groupId>

             <artifactId>logback-classic</artifactId>

             <version> 1.2 . 3 </version>

         </dependency>

 

     </dependencies>

 

     <dependencyManagement>

         <dependencies>

             <dependency>

                 <groupId>org.springframework.cloud</groupId>

                 <artifactId>spring-cloud-dependencies</artifactId>

                 <version>Hoxton.SR7</version>

                 <type>pom</type>

                 <scope> import </scope>

             </dependency>

         </dependencies>

     </dependencyManagement>

配置信息如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

monitor.threadpool.executors[ 0 ].thread-pool-name=first-monitor-thread-pool

monitor.threadpool.executors[ 0 ].core-pool-size= 4

monitor.threadpool.executors[ 0 ].max-pool-size= 8

monitor.threadpool.executors[ 0 ].queue-capacity= 100

 

monitor.threadpool.executors[ 1 ].thread-pool-name=second-monitor-thread-pool

monitor.threadpool.executors[ 1 ].core-pool-size= 2

monitor.threadpool.executors[ 1 ].max-pool-size= 4

monitor.threadpool.executors[ 1 ].queue-capacity= 40

    

/**

  * 线程池配置

  *

  * @author wangtongzhou 

  * @since 2022-03-11 21:41

  */

@Data

public   class   ThreadPoolProperties {

     /**

      * 线程池名称

      */

     private   String threadPoolName;

      * 核心线程数

     private   Integer corePoolSize = Runtime.getRuntime().availableProcessors();

      * 最大线程数

     private   Integer maxPoolSize;

      * 队列最大数量

     private   Integer queueCapacity;

      * 拒绝策略

     private   String rejectedExecutionType =  "AbortPolicy" ;

      * 空闲线程存活时间

     private   Long keepAliveTime = 1L;

      * 空闲线程存活时间单位

     private   TimeUnit unit = TimeUnit.MILLISECONDS;

}

  * 动态刷新线程池配置

  *  @since   2022 - 03 - 13   14 : 09

@ConfigurationProperties (prefix =  "monitor.threadpool" )

@Component

public   class   DynamicThreadPoolProperties {

     private   List<ThreadPoolProperties> executors;

自定可修改阻塞队列大小的方式如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

420

421

422

423

424

425

426

427

428

429

430

431

432

433

434

435

436

437

438

439

440

441

442

443

444

445

446

447

448

449

450

451

452

453

454

455

456

457

458

459

460

461

462

463

464

465

466

467

468

469

470

471

472

473

474

475

476

477

478

479

480

481

482

483

484

485

486

487

488

489

490

491

492

493

494

495

496

497

498

499

500

501

502

503

504

505

506

507

508

509

510

511

512

513

514

515

516

517

518

519

520

521

522

523

524

525

526

527

528

529

530

531

532

533

534

535

536

537

538

539

540

541

542

543

544

545

546

547

548

549

550

551

552

553

554

555

556

557

558

559

560

561

562

563

564

565

566

567

568

569

570

571

572

573

574

575

576

577

578

579

580

581

582

583

584

585

586

587

588

589

590

591

592

593

594

595

596

597

598

599

600

601

602

603

604

605

606

607

608

609

610

611

612

613

614

615

616

617

618

619

620

621

622

623

624

625

626

627

628

629

630

631

632

633

634

635

636

637

638

639

640

641

642

643

644

645

646

647

648

649

650

651

652

653

654

655

656

657

658

659

660

661

662

663

664

665

666

667

668

669

670

671

672

673

674

675

676

677

678

679

680

681

682

683

684

685

686

687

688

689

690

691

692

693

694

695

696

697

698

699

700

701

702

703

704

705

706

707

708

709

710

711

712

713

714

715

716

717

718

719

720

721

722

723

724

725

726

727

728

729

730

731

732

733

734

735

736

737

738

739

740

741

742

743

744

745

746

747

748

749

750

751

752

753

754

755

756

757

758

759

760

761

762

763

764

765

766

767

768

769

770

771

772

773

774

775

776

777

778

779

780

781

782

783

784

785

786

787

788

789

790

791

792

793

794

795

796

797

798

799

800

801

802

803

804

805

806

807

808

809

810

811

812

813

814

815

816

817

818

819

820

821

822

823

824

825

826

827

828

829

830

831

832

833

834

835

836

837

838

839

840

841

842

843

844

845

846

847

848

849

850

851

852

853

854

855

856

857

858

859

860

861

862

863

864

865

866

867

868

869

870

871

872

873

874

875

876

877

878

879

880

881

882

883

884

885

886

887

888

889

890

891

892

893

894

895

896

897

898

899

900

901

902

903

904

905

906

907

908

909

910

911

912

913

914

915

916

917

918

919

920

921

922

923

924

925

926

927

928

929

930

931

932

933

934

935

936

937

938

939

940

941

942

943

944

945

946

947

948

949

950

951

952

953

954

955

956

957

958

959

960

961

962

963

964

965

966

967

968

969

970

971

972

973

974

975

976

977

978

979

980

981

982

983

984

985

986

987

988

989

990

991

992

993

994

995

996

997

998

999

1000

1001

1002

1003

1004

1005

1006

1007

1008

1009

1010

1011

1012

1013

1014

1015

1016

1017

1018

1019

1020

1021

1022

1023

1024

1025

1026

1027

1028

1029

1030

1031

1032

1033

1034

1035

1036

1037

1038

1039

1040

1041

1042

1043

1044

1045

1046

1047

1048

1049

1050

1051

1052

1053

1054

1055

1056

1057

1058

1059

1060

1061

1062

1063

1064

1065

1066

1067

1068

1069

1070

1071

1072

1073

1074

1075

1076

1077

1078

1079

1080

1081

1082

1083

1084

1085

1086

1087

1088

1089

1090

1091

1092

1093

1094

1095

1096

1097

1098

1099

1100

1101

1102

1103

1104

1105

1106

1107

1108

1109

1110

1111

1112

1113

1114

1115

1116

1117

1118

1119

1120

1121

1122

1123

1124

1125

1126

1127

1128

1129

1130

1131

1132

1133

1134

1135

1136

1137

1138

1139

1140

1141

1142

1143

1144

1145

1146

1147

1148

1149

1150

1151

1152

1153

1154

1155

1156

1157

1158

1159

1160

1161

1162

1163

1164

1165

1166

1167

1168

1169

1170

1171

1172

1173

1174

1175

1176

1177

1178

1179

1180

1181

1182

1183

1184

1185

1186

1187

1188

1189

1190

1191

1192

1193

1194

1195

1196

1197

1198

1199

1200

1201

1202

1203

1204

1205

1206

1207

1208

1209

1210

1211

1212

1213

1214

1215

1216

1217

1218

1219

1220

1221

1222

1223

1224

1225

1226

1227

1228

1229

1230

1231

1232

1233

1234

1235

1236

1237

1238

1239

1240

1241

1242

1243

1244

1245

1246

1247

1248

1249

1250

1251

1252

1253

1254

1255

1256

1257

1258

1259

1260

1261

1262

1263

1264

1265

1266

1267

1268

1269

1270

1271

1272

1273

1274

1275

1276

1277

1278

1279

1280

1281

1282

1283

1284

1285

1286

1287

1288

1289

1290

1291

1292

1293

1294

1295

1296

1297

1298

1299

1300

1301

1302

1303

1304

1305

1306

1307

1308

1309

1310

1311

1312

1313

1314

1315

1316

1317

1318

1319

1320

1321

1322

1323

1324

1325

1326

1327

1328

1329

1330

1331

1332

1333

1334

1335

1336

1337

1338

1339

1340

1341

1342

1343

1344

1345

1346

1347

1348

1349

1350

1351

1352

1353

1354

1355

1356

1357

1358

1359

1360

1361

1362

1363

1364

1365

1366

1367

1368

1369

1370

1371

1372

1373

1374

1375

1376

1377

1378

1379

1380

1381

1382

1383

1384

1385

1386

1387

1388

1389

1390

1391

1392

1393

1394

1395

1396

1397

1398

1399

1400

1401

1402

1403

1404

1405

1406

1407

1408

1409

1410

1411

1412

1413

1414

1415

1416

1417

1418

1419

1420

1421

1422

1423

1424

1425

1426

1427

1428

/**

  * 可重新设定队列大小的阻塞队列

  *

  * @author wangtongzhou 

  * @since 2022-03-13 11:54

  */

public   class   ResizableCapacityLinkedBlockingQueue<E>  extends   AbstractQueue<E>

         implements   BlockingDeque<E>, java.io.Serializable {

     /*

      * Implemented as a simple doubly-linked list protected by a

      * single lock and using conditions to manage blocking.

      *

      * To implement weakly consistent iterators, it appears we need to

      * keep all Nodes GC-reachable from a predecessor dequeued Node.

      * That would cause two problems:

      * - allow a rogue Iterator to cause unbounded memory retention

      * - cause cross-generational linking of old Nodes to new Nodes if

      *   a Node was tenured while live, which generational GCs have a

      *   hard time dealing with, causing repeated major collections.

      * However, only non-deleted Nodes need to be reachable from

      * dequeued Nodes, and reachability does not necessarily have to

      * be of the kind understood by the GC.  We use the trick of

      * linking a Node that has just been dequeued to itself.  Such a

      * self-link implicitly means to jump to "first" (for next links)

      * or "last" (for prev links).

      */

 

     /*

      * We have "diamond" multiple interface/abstract class inheritance

      * here, and that introduces ambiguities. Often we want the

      * BlockingDeque javadoc combined with the AbstractQueue

      * implementation, so a lot of method specs are duplicated here.

      */

 

     private   static   final   long   serialVersionUID = -387911632671998426L;

 

     /**

      * Doubly-linked list node class

      */

     static   final   class   Node<E> {

         /**

          * The item, or null if this node has been removed.

          */

         E item;

 

         /**

          * One of:

          * - the real predecessor Node

          * - this Node, meaning the predecessor is tail

          * - null, meaning there is no predecessor

          */

         Node<E> prev;

 

         /**

          * One of:

          * - the real successor Node

          * - this Node, meaning the successor is head

          * - null, meaning there is no successor

          */

         Node<E> next;

 

         Node(E x) {

             item = x;

         }

     }

 

     /**

      * Pointer to first node.

      * Invariant: (first == null && last == null) ||

      * (first.prev == null && first.item != null)

      */

     transient   Node<E> first;

 

     /**

      * Pointer to last node.

      * Invariant: (first == null && last == null) ||

      * (last.next == null && last.item != null)

      */

     transient   Node<E> last;

 

     /**

      * Number of items in the deque

      */

     private   transient   int   count;

 

     /**

      * Maximum number of items in the deque

      */

     private   volatile   int   capacity;

 

     public   int   getCapacity() {

         return   capacity;

     }

 

     public   void   setCapacity( int   capacity) {

         this .capacity = capacity;

     }

 

     /**

      * Main lock guarding all access

      */

     final   ReentrantLock lock =  new   ReentrantLock();

 

     /**

      * Condition for waiting takes

      */

     private   final   Condition notEmpty = lock.newCondition();

 

     /**

      * Condition for waiting puts

      */

     private   final   Condition notFull = lock.newCondition();

 

     /**

      * Creates a {@code ResizableCapacityLinkedBlockIngQueue} with a capacity of

      * {@link Integer#MAX_VALUE}.

      */

     public   ResizableCapacityLinkedBlockingQueue() {

         this (Integer.MAX_VALUE);

     }

 

     /**

      * Creates a {@code ResizableCapacityLinkedBlockIngQueue} with the given (fixed) capacity.

      *

      * @param capacity the capacity of this deque

      * @throws IllegalArgumentException if {@code capacity} is less than 1

      */

     public   ResizableCapacityLinkedBlockingQueue( int   capacity) {

         if   (capacity <=  0 ) {

             throw   new   IllegalArgumentException();

         }

         this .capacity = capacity;

     }

 

     /**

      * Creates a {@code ResizableCapacityLinkedBlockIngQueue} with a capacity of

      * {@link Integer#MAX_VALUE}, initially containing the elements of

      * the given collection, added in traversal order of the

      * collection's iterator.

      *

      * @param c the collection of elements to initially contain

      * @throws NullPointerException if the specified collection or any

      *                              of its elements are null

      */

     public   ResizableCapacityLinkedBlockingQueue(Collection<?  extends   E> c) {

         this (Integer.MAX_VALUE);

         final   ReentrantLock lock =  this .lock;

         lock.lock();  // Never contended, but necessary for visibility

         try   {

             for   (E e : c) {

                 if   (e ==  null ) {

                     throw   new   NullPointerException();

                 }

                 if   (!linkLast( new   Node<E>(e))) {

                     throw   new   IllegalStateException( "Deque full" );

                 }

             }

         }  finally   {

             lock.unlock();

         }

     }

 

 

     // Basic linking and unlinking operations, called only while holding lock

 

     /**

      * Links node as first element, or returns false if full.

      */

     private   boolean   linkFirst(Node<E> node) {

         // assert lock.isHeldByCurrentThread();

         if   (count >= capacity) {

             return   false ;

         }

         Node<E> f = first;

         node.next = f;

         first = node;

         if   (last ==  null ) {

             last = node;

         }  else   {

             f.prev = node;

         }

         ++count;

         notEmpty.signal();

         return   true ;

     }

 

     /**

      * Links node as last element, or returns false if full.

      */

     private   boolean   linkLast(Node<E> node) {

         // assert lock.isHeldByCurrentThread();

         if   (count >= capacity) {

             return   false ;

         }

         Node<E> l = last;

         node.prev = l;

         last = node;

         if   (first ==  null ) {

             first = node;

         }  else   {

             l.next = node;

         }

         ++count;

         notEmpty.signal();

         return   true ;

     }

 

     /**

      * Removes and returns first element, or null if empty.

      */

     private   E unlinkFirst() {

         // assert lock.isHeldByCurrentThread();

         Node<E> f = first;

         if   (f ==  null ) {

             return   null ;

         }

         Node<E> n = f.next;

         E item = f.item;

         f.item =  null ;

         f.next = f;  // help GC

         first = n;

         if   (n ==  null ) {

             last =  null ;

         }  else   {

             n.prev =  null ;

         }

         --count;

         notFull.signal();

         return   item;

     }

 

     /**

      * Removes and returns last element, or null if empty.

      */

     private   E unlinkLast() {

         // assert lock.isHeldByCurrentThread();

         Node<E> l = last;

         if   (l ==  null ) {

             return   null ;

         }

         Node<E> p = l.prev;

         E item = l.item;

         l.item =  null ;

         l.prev = l;  // help GC

         last = p;

         if   (p ==  null ) {

             first =  null ;

         }  else   {

             p.next =  null ;

         }

         --count;

         notFull.signal();

         return   item;

     }

 

     /**

      * Unlinks x.

      */

     void   unlink(Node<E> x) {

         // assert lock.isHeldByCurrentThread();

         Node<E> p = x.prev;

         Node<E> n = x.next;

         if   (p ==  null ) {

             unlinkFirst();

         }  else   if   (n ==  null ) {

             unlinkLast();

         }  else   {

             p.next = n;

             n.prev = p;

             x.item =  null ;

             // Don't mess with x's links.  They may still be in use by

             // an iterator.

             --count;

             notFull.signal();

         }

     }

 

     // BlockingDeque methods

 

     /**

      * @throws IllegalStateException if this deque is full

      * @throws NullPointerException  {@inheritDoc}

      */

     @Override

     public   void   addFirst(E e) {

         if   (!offerFirst(e)) {

             throw   new   IllegalStateException( "Deque full" );

         }

     }

 

     /**

      * @throws IllegalStateException if this deque is full

      * @throws NullPointerException  {@inheritDoc}

      */

     @Override

     public   void   addLast(E e) {

         if   (!offerLast(e)) {

             throw   new   IllegalStateException( "Deque full" );

         }

     }

 

     /**

      * @throws NullPointerException {@inheritDoc}

      */

     @Override

     public   boolean   offerFirst(E e) {

         if   (e ==  null ) {

             throw   new   NullPointerException();

         }

         Node<E> node =  new   Node<E>(e);

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             return   linkFirst(node);

         }  finally   {

             lock.unlock();

         }

     }

 

     /**

      * @throws NullPointerException {@inheritDoc}

      */

     @Override

     public   boolean   offerLast(E e) {

         if   (e ==  null )  throw   new   NullPointerException();

         Node<E> node =  new   Node<E>(e);

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             return   linkLast(node);

         }  finally   {

             lock.unlock();

         }

     }

 

     /**

      * @throws NullPointerException {@inheritDoc}

      * @throws InterruptedException {@inheritDoc}

      */

     @Override

     public   void   putFirst(E e)  throws   InterruptedException {

         if   (e ==  null ) {

             throw   new   NullPointerException();

         }

         Node<E> node =  new   Node<E>(e);

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             while   (!linkFirst(node)) {

                 notFull.await();

             }

         }  finally   {

             lock.unlock();

         }

     }

 

     /**

      * @throws NullPointerException {@inheritDoc}

      * @throws InterruptedException {@inheritDoc}

      */

     @Override

     public   void   putLast(E e)  throws   InterruptedException {

         if   (e ==  null ) {

             throw   new   NullPointerException();

         }

         Node<E> node =  new   Node<E>(e);

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             while   (!linkLast(node)) {

                 notFull.await();

             }

         }  finally   {

             lock.unlock();

         }

     }

 

     /**

      * @throws NullPointerException {@inheritDoc}

      * @throws InterruptedException {@inheritDoc}

      */

     @Override

     public   boolean   offerFirst(E e,  long   timeout, TimeUnit unit)

             throws   InterruptedException {

         if   (e ==  null ) {

             throw   new   NullPointerException();

         }

         Node<E> node =  new   Node<E>(e);

         long   nanos = unit.toNanos(timeout);

         final   ReentrantLock lock =  this .lock;

         lock.lockInterruptibly();

         try   {

             while   (!linkFirst(node)) {

                 if   (nanos <=  0 ) {

                     return   false ;

                 }

                 nanos = notFull.awaitNanos(nanos);

             }

             return   true ;

         }  finally   {

             lock.unlock();

         }

     }

 

     /**

      * @throws NullPointerException {@inheritDoc}

      * @throws InterruptedException {@inheritDoc}

      */

     @Override

     public   boolean   offerLast(E e,  long   timeout, TimeUnit unit)

             throws   InterruptedException {

         if   (e ==  null )  throw   new   NullPointerException();

         Node<E> node =  new   Node<E>(e);

         long   nanos = unit.toNanos(timeout);

         final   ReentrantLock lock =  this .lock;

         lock.lockInterruptibly();

         try   {

             while   (!linkLast(node)) {

                 if   (nanos <=  0 ) {

                     return   false ;

                 }

                 nanos = notFull.awaitNanos(nanos);

             }

             return   true ;

         }  finally   {

             lock.unlock();

         }

     }

 

     /**

      * @throws NoSuchElementException {@inheritDoc}

      */

     @Override

     public   E removeFirst() {

         E x = pollFirst();

         if   (x ==  null ) {

             throw   new   NoSuchElementException();

         }

         return   x;

     }

 

     /**

      * @throws NoSuchElementException {@inheritDoc}

      */

     @Override

     public   E removeLast() {

         E x = pollLast();

         if   (x ==  null ) {

             throw   new   NoSuchElementException();

         }

         return   x;

     }

 

     @Override

     public   E pollFirst() {

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             return   unlinkFirst();

         }  finally   {

             lock.unlock();

         }

     }

 

     @Override

     public   E pollLast() {

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             return   unlinkLast();

         }  finally   {

             lock.unlock();

         }

     }

 

     @Override

     public   E takeFirst()  throws   InterruptedException {

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             E x;

             while   ((x = unlinkFirst()) ==  null ) {

                 notEmpty.await();

             }

             return   x;

         }  finally   {

             lock.unlock();

         }

     }

 

     @Override

     public   E takeLast()  throws   InterruptedException {

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             E x;

             while   ((x = unlinkLast()) ==  null ) {

                 notEmpty.await();

             }

             return   x;

         }  finally   {

             lock.unlock();

         }

     }

 

     @Override

     public   E pollFirst( long   timeout, TimeUnit unit)

             throws   InterruptedException {

         long   nanos = unit.toNanos(timeout);

         final   ReentrantLock lock =  this .lock;

         lock.lockInterruptibly();

         try   {

             E x;

             while   ((x = unlinkFirst()) ==  null ) {

                 if   (nanos <=  0 ) {

                     return   null ;

                 }

                 nanos = notEmpty.awaitNanos(nanos);

             }

             return   x;

         }  finally   {

             lock.unlock();

         }

     }

 

     @Override

     public   E pollLast( long   timeout, TimeUnit unit)

             throws   InterruptedException {

         long   nanos = unit.toNanos(timeout);

         final   ReentrantLock lock =  this .lock;

         lock.lockInterruptibly();

         try   {

             E x;

             while   ((x = unlinkLast()) ==  null ) {

                 if   (nanos <=  0 ) {

                     return   null ;

                 }

                 nanos = notEmpty.awaitNanos(nanos);

             }

             return   x;

         }  finally   {

             lock.unlock();

         }

     }

 

     /**

      * @throws NoSuchElementException {@inheritDoc}

      */

     @Override

     public   E getFirst() {

         E x = peekFirst();

         if   (x ==  null ) {

             throw   new   NoSuchElementException();

         }

         return   x;

     }

 

     /**

      * @throws NoSuchElementException {@inheritDoc}

      */

     @Override

     public   E getLast() {

         E x = peekLast();

         if   (x ==  null ) {

             throw   new   NoSuchElementException();

         }

         return   x;

     }

 

     @Override

     public   E peekFirst() {

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             return   (first ==  null ) ?  null   : first.item;

         }  finally   {

             lock.unlock();

         }

     }

 

     @Override

     public   E peekLast() {

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             return   (last ==  null ) ?  null   : last.item;

         }  finally   {

             lock.unlock();

         }

     }

 

     @Override

     public   boolean   removeFirstOccurrence(Object o) {

         if   (o ==  null ) {

             return   false ;

         }

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             for   (Node<E> p = first; p !=  null ; p = p.next) {

                 if   (o.equals(p.item)) {

                     unlink(p);

                     return   true ;

                 }

             }

             return   false ;

         }  finally   {

             lock.unlock();

         }

     }

 

     @Override

     public   boolean   removeLastOccurrence(Object o) {

         if   (o ==  null ) {

             return   false ;

         }

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             for   (Node<E> p = last; p !=  null ; p = p.prev) {

                 if   (o.equals(p.item)) {

                     unlink(p);

                     return   true ;

                 }

             }

             return   false ;

         }  finally   {

             lock.unlock();

         }

     }

 

     // BlockingQueue methods

 

     /**

      * Inserts the specified element at the end of this deque unless it would

      * violate capacity restrictions.  When using a capacity-restricted deque,

      * it is generally preferable to use method {@link #offer(Object) offer}.

      *

      * <p>This method is equivalent to {@link #addLast}.

      *

      * @throws IllegalStateException if this deque is full

      * @throws NullPointerException  if the specified element is null

      */

     @Override

     public   boolean   add(E e) {

         addLast(e);

         return   true ;

     }

 

     /**

      * @throws NullPointerException if the specified element is null

      */

     @Override

     public   boolean   offer(E e) {

         return   offerLast(e);

     }

 

     /**

      * @throws NullPointerException {@inheritDoc}

      * @throws InterruptedException {@inheritDoc}

      */

     @Override

     public   void   put(E e)  throws   InterruptedException {

         putLast(e);

     }

 

     /**

      * @throws NullPointerException {@inheritDoc}

      * @throws InterruptedException {@inheritDoc}

      */

     @Override

     public   boolean   offer(E e,  long   timeout, TimeUnit unit)

             throws   InterruptedException {

         return   offerLast(e, timeout, unit);

     }

 

     /**

      * Retrieves and removes the head of the queue represented by this deque.

      * This method differs from {@link #poll poll} only in that it throws an

      * exception if this deque is empty.

      *

      * <p>This method is equivalent to {@link #removeFirst() removeFirst}.

      *

      * @return the head of the queue represented by this deque

      * @throws NoSuchElementException if this deque is empty

      */

     @Override

     public   E remove() {

         return   removeFirst();

     }

 

     @Override

     public   E poll() {

         return   pollFirst();

     }

 

     @Override

     public   E take()  throws   InterruptedException {

         return   takeFirst();

     }

 

     @Override

     public   E poll( long   timeout, TimeUnit unit)  throws   InterruptedException {

         return   pollFirst(timeout, unit);

     }

 

     /**

      * Retrieves, but does not remove, the head of the queue represented by

      * this deque.  This method differs from {@link #peek peek} only in that

      * it throws an exception if this deque is empty.

      *

      * <p>This method is equivalent to {@link #getFirst() getFirst}.

      *

      * @return the head of the queue represented by this deque

      * @throws NoSuchElementException if this deque is empty

      */

     @Override

     public   E element() {

         return   getFirst();

     }

 

     @Override

     public   E peek() {

         return   peekFirst();

     }

 

     /**

      * Returns the number of additional elements that this deque can ideally

      * (in the absence of memory or resource constraints) accept without

      * blocking. This is always equal to the initial capacity of this deque

      * less the current {@code size} of this deque.

      *

      * <p>Note that you <em>cannot</em> always tell if an attempt to insert

      * an element will succeed by inspecting {@code remainingCapacity}

      * because it may be the case that another thread is about to

      * insert or remove an element.

      */

     @Override

     public   int   remainingCapacity() {

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             return   capacity - count;

         }  finally   {

             lock.unlock();

         }

     }

 

     /**

      * @throws UnsupportedOperationException {@inheritDoc}

      * @throws ClassCastException            {@inheritDoc}

      * @throws NullPointerException          {@inheritDoc}

      * @throws IllegalArgumentException      {@inheritDoc}

      */

     @Override

     public   int   drainTo(Collection<?  super   E> c) {

         return   drainTo(c, Integer.MAX_VALUE);

     }

 

     /**

      * @throws UnsupportedOperationException {@inheritDoc}

      * @throws ClassCastException            {@inheritDoc}

      * @throws NullPointerException          {@inheritDoc}

      * @throws IllegalArgumentException      {@inheritDoc}

      */

     @Override

     public   int   drainTo(Collection<?  super   E> c,  int   maxElements) {

         if   (c ==  null ) {

             throw   new   NullPointerException();

         }

         if   (c ==  this ) {

             throw   new   IllegalArgumentException();

         }

         if   (maxElements <=  0 ) {

             return   0 ;

         }

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             int   n = Math.min(maxElements, count);

             for   ( int   i =  0 ; i < n; i++) {

                 c.add(first.item);    // In this order, in case add() throws.

                 unlinkFirst();

             }

             return   n;

         }  finally   {

             lock.unlock();

         }

     }

 

     // Stack methods

 

     /**

      * @throws IllegalStateException if this deque is full

      * @throws NullPointerException  {@inheritDoc}

      */

     @Override

     public   void   push(E e) {

         addFirst(e);

     }

 

     /**

      * @throws NoSuchElementException {@inheritDoc}

      */

     @Override

     public   E pop() {

         return   removeFirst();

     }

 

     // Collection methods

 

     /**

      * Removes the first occurrence of the specified element from this deque.

      * If the deque does not contain the element, it is unchanged.

      * More formally, removes the first element {@code e} such that

      * {@code o.equals(e)} (if such an element exists).

      * Returns {@code true} if this deque contained the specified element

      * (or equivalently, if this deque changed as a result of the call).

      *

      * <p>This method is equivalent to

      * {@link #removeFirstOccurrence(Object) removeFirstOccurrence}.

      *

      * @param o element to be removed from this deque, if present

      * @return {@code true} if this deque changed as a result of the call

      */

     @Override

     public   boolean   remove(Object o) {

         return   removeFirstOccurrence(o);

     }

 

     /**

      * Returns the number of elements in this deque.

      *

      * @return the number of elements in this deque

      */

     @Override

     public   int   size() {

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             return   count;

         }  finally   {

             lock.unlock();

         }

     }

 

     /**

      * Returns {@code true} if this deque contains the specified element.

      * More formally, returns {@code true} if and only if this deque contains

      * at least one element {@code e} such that {@code o.equals(e)}.

      *

      * @param o object to be checked for containment in this deque

      * @return {@code true} if this deque contains the specified element

      */

     @Override

     public   boolean   contains(Object o) {

         if   (o ==  null ) {

             return   false ;

         }

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             for   (Node<E> p = first; p !=  null ; p = p.next) {

                 if   (o.equals(p.item)) {

                     return   true ;

                 }

             }

             return   false ;

         }  finally   {

             lock.unlock();

         }

     }

 

     /*

      * TODO: Add support for more efficient bulk operations.

      *

      * We don't want to acquire the lock for every iteration, but we

      * also want other threads a chance to interact with the

      * collection, especially when count is close to capacity.

      */

 

//     /**

//      * Adds all of the elements in the specified collection to this

//      * queue.  Attempts to addAll of a queue to itself result in

//      * {@code IllegalArgumentException}. Further, the behavior of

//      * this operation is undefined if the specified collection is

//      * modified while the operation is in progress.

//      *

//      * @param c collection containing elements to be added to this queue

//      * @return {@code true} if this queue changed as a result of the call

//      * @throws ClassCastException            {@inheritDoc}

//      * @throws NullPointerException          {@inheritDoc}

//      * @throws IllegalArgumentException      {@inheritDoc}

//      * @throws IllegalStateException if this deque is full

//      * @see #add(Object)

//      */

//     public boolean addAll(Collection<? extends E> c) {

//         if (c == null)

//             throw new NullPointerException();

//         if (c == this)

//             throw new IllegalArgumentException();

//         final ReentrantLock lock = this.lock;

//         lock.lock();

//         try {

//             boolean modified = false;

//             for (E e : c)

//                 if (linkLast(e))

//                     modified = true;

//             return modified;

//         } finally {

//             lock.unlock();

//         }

//     }

 

     /**

      * Returns an array containing all of the elements in this deque, in

      * proper sequence (from first to last element).

      *

      * <p>The returned array will be "safe" in that no references to it are

      * maintained by this deque.  (In other words, this method must allocate

      * a new array).  The caller is thus free to modify the returned array.

      *

      * <p>This method acts as bridge between array-based and collection-based

      * APIs.

      *

      * @return an array containing all of the elements in this deque

      */

     @Override

     @SuppressWarnings ( "unchecked" )

     public   Object[] toArray() {

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             Object[] a =  new   Object[count];

             int   k =  0 ;

             for   (Node<E> p = first; p !=  null ; p = p.next) {

                 a[k++] = p.item;

             }

             return   a;

         }  finally   {

             lock.unlock();

         }

     }

 

     /**

      * Returns an array containing all of the elements in this deque, in

      * proper sequence; the runtime type of the returned array is that of

      * the specified array.  If the deque fits in the specified array, it

      * is returned therein.  Otherwise, a new array is allocated with the

      * runtime type of the specified array and the size of this deque.

      *

      * <p>If this deque fits in the specified array with room to spare

      * (i.e., the array has more elements than this deque), the element in

      * the array immediately following the end of the deque is set to

      * {@code null}.

      *

      * <p>Like the {@link #toArray()} method, this method acts as bridge between

      * array-based and collection-based APIs.  Further, this method allows

      * precise control over the runtime type of the output array, and may,

      * under certain circumstances, be used to save allocation costs.

      *

      * <p>Suppose {@code x} is a deque known to contain only strings.

      * The following code can be used to dump the deque into a newly

      * allocated array of {@code String}:

      *

      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>

      * <p>

      * Note that {@code toArray(new Object[0])} is identical in function to

      * {@code toArray()}.

      *

      * @param a the array into which the elements of the deque are to

      *          be stored, if it is big enough; otherwise, a new array of the

      *          same runtime type is allocated for this purpose

      * @return an array containing all of the elements in this deque

      * @throws ArrayStoreException  if the runtime type of the specified array

      *                              is not a supertype of the runtime type of every element in

      *                              this deque

      * @throws NullPointerException if the specified array is null

      */

     @Override

     @SuppressWarnings ( "unchecked" )

     public   <T> T[] toArray(T[] a) {

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             if   (a.length < count) {

                 a = (T[]) java.lang.reflect.Array.newInstance

                         (a.getClass().getComponentType(), count);

             }

             int   k =  0 ;

             for   (Node<E> p = first; p !=  null ; p = p.next) {

                 a[k++] = (T) p.item;

             }

             if   (a.length > k) {

                 a[k] =  null ;

             }

             return   a;

         }  finally   {

             lock.unlock();

         }

     }

 

     @Override

     public   String toString() {

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             Node<E> p = first;

             if   (p ==  null ) {

                 return   "[]" ;

             }

             StringBuilder sb =  new   StringBuilder();

             sb.append( '[' );

             for   (; ; ) {

                 E e = p.item;

                 sb.append(e ==  this   ?  "(this Collection)"   : e);

                 p = p.next;

                 if   (p ==  null ) {

                     return   sb.append( ']' ).toString();

                 }

                 sb.append( ',' ).append( ' ' );

             }

         }  finally   {

             lock.unlock();

         }

     }

 

     /**

      * Atomically removes all of the elements from this deque.

      * The deque will be empty after this call returns.

      */

     @Override

     public   void   clear() {

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             for   (Node<E> f = first; f !=  null ; ) {

                 f.item =  null ;

                 Node<E> n = f.next;

                 f.prev =  null ;

                 f.next =  null ;

                 f = n;

             }

             first = last =  null ;

             count =  0 ;

             notFull.signalAll();

         }  finally   {

             lock.unlock();

         }

     }

 

     /**

      * Returns an iterator over the elements in this deque in proper sequence.

      * The elements will be returned in order from first (head) to last (tail).

      *

      * <p>The returned iterator is

      * <a href="package-summary.html#Weakly" rel="external nofollow"  rel="external nofollow"  rel="external nofollow" ><i>weakly consistent</i></a>.

      *

      * @return an iterator over the elements in this deque in proper sequence

      */

     @Override

     public   Iterator<E> iterator() {

         return   new   Itr();

     }

 

     /**

      * Returns an iterator over the elements in this deque in reverse

      * sequential order.  The elements will be returned in order from

      * last (tail) to first (head).

      *

      * <p>The returned iterator is

      * <a href="package-summary.html#Weakly" rel="external nofollow"  rel="external nofollow"  rel="external nofollow" ><i>weakly consistent</i></a>.

      *

      * @return an iterator over the elements in this deque in reverse order

      */

     @Override

     public   Iterator<E> descendingIterator() {

         return   new   DescendingItr();

     }

 

     /**

      * Base class for Iterators for ResizableCapacityLinkedBlockIngQueue

      */

     private   abstract   class   AbstractItr  implements   Iterator<E> {

         /**

          * The next node to return in next()

          */

         Node<E> next;

 

         /**

          * nextItem holds on to item fields because once we claim that

          * an element exists in hasNext(), we must return item read

          * under lock (in advance()) even if it was in the process of

          * being removed when hasNext() was called.

          */

         E nextItem;

 

         /**

          * Node returned by most recent call to next. Needed by remove.

          * Reset to null if this element is deleted by a call to remove.

          */

         private   Node<E> lastRet;

 

         abstract   Node<E> firstNode();

 

         abstract   Node<E> nextNode(Node<E> n);

 

         AbstractItr() {

             // set to initial position

             final   ReentrantLock lock = ResizableCapacityLinkedBlockingQueue. this .lock;

             lock.lock();

             try   {

                 next = firstNode();

                 nextItem = (next ==  null ) ?  null   : next.item;

             }  finally   {

                 lock.unlock();

             }

         }

 

         /**

          * Returns the successor node of the given non-null, but

          * possibly previously deleted, node.

          */

         private   Node<E> succ(Node<E> n) {

             // Chains of deleted nodes ending in null or self-links

             // are possible if multiple interior nodes are removed.

             for   (; ; ) {

                 Node<E> s = nextNode(n);

                 if   (s ==  null ) {

                     return   null ;

                 }  else   if   (s.item !=  null ) {

                     return   s;

                 }  else   if   (s == n) {

                     return   firstNode();

                 }  else   {

                     n = s;

                 }

             }

         }

 

         /**

          * Advances next.

          */

         void   advance() {

             final   ReentrantLock lock = ResizableCapacityLinkedBlockingQueue. this .lock;

             lock.lock();

             try   {

                 // assert next != null;

                 next = succ(next);

                 nextItem = (next ==  null ) ?  null   : next.item;

             }  finally   {

                 lock.unlock();

             }

         }

 

         @Override

         public   boolean   hasNext() {

             return   next !=  null ;

         }

 

         @Override

         public   E next() {

             if   (next ==  null ) {

                 throw   new   NoSuchElementException();

             }

             lastRet = next;

             E x = nextItem;

             advance();

             return   x;

         }

 

         @Override

         public   void   remove() {

             Node<E> n = lastRet;

             if   (n ==  null ) {

                 throw   new   IllegalStateException();

             }

             lastRet =  null ;

             final   ReentrantLock lock = ResizableCapacityLinkedBlockingQueue. this .lock;

             lock.lock();

             try   {

                 if   (n.item !=  null ) {

                     unlink(n);

                 }

             }  finally   {

                 lock.unlock();

             }

         }

     }

 

     /**

      * Forward iterator

      */

     private   class   Itr  extends   AbstractItr {

         @Override

         Node<E> firstNode() {

             return   first;

         }

 

         @Override

         Node<E> nextNode(Node<E> n) {

             return   n.next;

         }

     }

 

     /**

      * Descending iterator

      */

     private   class   DescendingItr  extends   AbstractItr {

         @Override

         Node<E> firstNode() {

             return   last;

         }

 

         @Override

         Node<E> nextNode(Node<E> n) {

             return   n.prev;

         }

     }

 

     /**

      * A customized variant of Spliterators.IteratorSpliterator

      */

     static   final   class   LBDSpliterator<E>  implements   Spliterator<E> {

         static   final   int   MAX_BATCH =  1   <<  25 ;   // max batch array size;

         final   ResizableCapacityLinkedBlockingQueue<E> queue;

         Node<E> current;     // current node; null until initialized

         int   batch;           // batch size for splits

         boolean   exhausted;   // true when no more nodes

         long   est;            // size estimate

 

         LBDSpliterator(ResizableCapacityLinkedBlockingQueue<E> queue) {

             this .queue = queue;

             this .est = queue.size();

         }

 

         @Override

         public   long   estimateSize() {

             return   est;

         }

 

         @Override

         public   Spliterator<E> trySplit() {

             Node<E> h;

             final   ResizableCapacityLinkedBlockingQueue<E> q =  this .queue;

             int   b = batch;

             int   n = (b <=  0 ) ?  1   : (b >= MAX_BATCH) ? MAX_BATCH : b +  1 ;

             if   (!exhausted &&

                     ((h = current) !=  null   || (h = q.first) !=  null ) &&

                     h.next !=  null ) {

                 Object[] a =  new   Object[n];

                 final   ReentrantLock lock = q.lock;

                 int   i =  0 ;

                 Node<E> p = current;

                 lock.lock();

                 try   {

                     if   (p !=  null   || (p = q.first) !=  null ) {

                         do   {

                             if   ((a[i] = p.item) !=  null ) {

                                 ++i;

                             }

                         }  while   ((p = p.next) !=  null   && i < n);

                     }

                 }  finally   {

                     lock.unlock();

                 }

                 if   ((current = p) ==  null ) {

                     est = 0L;

                     exhausted =  true ;

                 }  else   if   ((est -= i) < 0L) {

                     est = 0L;

                 }

                 if   (i >  0 ) {

                     batch = i;

                     return   Spliterators.spliterator

                             (a,  0 , i, Spliterator.ORDERED | Spliterator.NONNULL |

                                     Spliterator.CONCURRENT);

                 }

             }

             return   null ;

         }

 

         @Override

         public   void   forEachRemaining(Consumer<?  super   E> action) {

             if   (action ==  null ) {

                 throw   new   NullPointerException();

             }

             final   ResizableCapacityLinkedBlockingQueue<E> q =  this .queue;

             final   ReentrantLock lock = q.lock;

             if   (!exhausted) {

                 exhausted =  true ;

                 Node<E> p = current;

                 do   {

                     E e =  null ;

                     lock.lock();

                     try   {

                         if   (p ==  null ) {

                             p = q.first;

                         }

                         while   (p !=  null ) {

                             e = p.item;

                             p = p.next;

                             if   (e !=  null ) {

                                 break ;

                             }

                         }

                     }  finally   {

                         lock.unlock();

                     }

                     if   (e !=  null ) {

                         action.accept(e);

                     }

                 }  while   (p !=  null );

             }

         }

 

         @Override

         public   boolean   tryAdvance(Consumer<?  super   E> action) {

             if   (action ==  null ) {

                 throw   new   NullPointerException();

             }

             final   ResizableCapacityLinkedBlockingQueue<E> q =  this .queue;

             final   ReentrantLock lock = q.lock;

             if   (!exhausted) {

                 E e =  null ;

                 lock.lock();

                 try   {

                     if   (current ==  null ) {

                         current = q.first;

                     }

                     while   (current !=  null ) {

                         e = current.item;

                         current = current.next;

                         if   (e !=  null ) {

                             break ;

                         }

                     }

                 }  finally   {

                     lock.unlock();

                 }

                 if   (current ==  null ) {

                     exhausted =  true ;

                 }

                 if   (e !=  null ) {

                     action.accept(e);

                     return   true ;

                 }

             }

             return   false ;

         }

 

         @Override

         public   int   characteristics() {

             return   Spliterator.ORDERED | Spliterator.NONNULL |

                     Spliterator.CONCURRENT;

         }

     }

 

     /**

      * Returns a {@link Spliterator} over the elements in this deque.

      *

      * <p>The returned spliterator is

      * <a href="package-summary.html#Weakly" rel="external nofollow"  rel="external nofollow"  rel="external nofollow" ><i>weakly consistent</i></a>.

      *

      * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},

      * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.

      *

      * @return a {@code Spliterator} over the elements in this deque

      * @implNote The {@code Spliterator} implements {@code trySplit} to permit limited

      * parallelism.

      * @since 1.8

      */

     @Override

     public   Spliterator<E> spliterator() {

         return   new   LBDSpliterator<E>( this );

     }

 

     /**

      * Saves this deque to a stream (that is, serializes it).

      *

      * @param s the stream

      * @throws java.io.IOException if an I/O error occurs

      * @serialData The capacity (int), followed by elements (each an

      * {@code Object}) in the proper order, followed by a null

      */

     private   void   writeObject(java.io.ObjectOutputStream s)

             throws   java.io.IOException {

         final   ReentrantLock lock =  this .lock;

         lock.lock();

         try   {

             // Write out capacity and any hidden stuff

             s.defaultWriteObject();

             // Write out all elements in the proper order.

             for   (Node<E> p = first; p !=  null ; p = p.next) {

                 s.writeObject(p.item);

             }

             // Use trailing null as sentinel

             s.writeObject( null );

         }  finally   {

             lock.unlock();

         }

     }

 

     /**

      * Reconstitutes this deque from a stream (that is, deserializes it).

      *

      * @param s the stream

      * @throws ClassNotFoundException if the class of a serialized object

      *                                could not be found

      * @throws java.io.IOException    if an I/O error occurs

      */

     private   void   readObject(java.io.ObjectInputStream s)

             throws   java.io.IOException, ClassNotFoundException {

         s.defaultReadObject();

         count =  0 ;

         first =  null ;

         last =  null ;

         // Read in all elements and place in queue

         for   (; ; ) {

             @SuppressWarnings ( "unchecked" )

             E item = (E) s.readObject();

             if   (item ==  null ) {

                 break ;

             }

             add(item);

         }

     }

}

自定义线程池,增加每个线程处理的耗时,以及平均耗时、最大耗时、最小耗时,以及输出监控日志信息等等;

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

/**

  * 线程池监控类

  *

  * @author wangtongzhou 

  * @since 2022-02-23 07:27

  */

public   class   ThreadPoolMonitor  extends   ThreadPoolExecutor {

 

     private   static   final   Logger LOGGER = LoggerFactory.getLogger(ThreadPoolMonitor. class );

 

     /**

      * 默认拒绝策略

      */

     private   static   final   RejectedExecutionHandler defaultHandler =  new   AbortPolicy();

 

     /**

      * 线程池名称,一般以业务名称命名,方便区分

      */

     private   String poolName;

 

     /**

      * 最短执行时间

      */

     private   Long minCostTime;

 

     /**

      * 最长执行时间

      */

     private   Long maxCostTime;

     /**

      * 总的耗时

      */

     private   AtomicLong totalCostTime =  new   AtomicLong();

 

     private   ThreadLocal<Long> startTimeThreadLocal =  new   ThreadLocal<>();

 

     /**

      * 调用父类的构造方法,并初始化HashMap和线程池名称

      *

      * @param corePoolSize    线程池核心线程数

      * @param maximumPoolSize 线程池最大线程数

      * @param keepAliveTime   线程的最大空闲时间

      * @param unit            空闲时间的单位

      * @param workQueue       保存被提交任务的队列

      * @param poolName        线程池名称

      */

     public   ThreadPoolMonitor( int   corePoolSize,  int   maximumPoolSize,  long   keepAliveTime,

                              TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {

         this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

                 Executors.defaultThreadFactory(), poolName);

     }

 

 

     /**

      * 调用父类的构造方法,并初始化HashMap和线程池名称

      *

      * @param corePoolSize    线程池核心线程数

      * @param maximumPoolSize 线程池最大线程数

      * @param keepAliveTime   线程的最大空闲时间

      * @param unit            空闲时间的单位

      * @param workQueue       保存被提交任务的队列

      * @param

      * @param poolName        线程池名称

      */

     public   ThreadPoolMonitor( int   corePoolSize,  int   maximumPoolSize,  long   keepAliveTime,

                              TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, String poolName) {

         this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

                 Executors.defaultThreadFactory(), handler, poolName);

     }

 

 

     /**

      * 调用父类的构造方法,并初始化HashMap和线程池名称

      *

      * @param corePoolSize    线程池核心线程数

      * @param maximumPoolSize 线程池最大线程数

      * @param keepAliveTime   线程的最大空闲时间

      * @param unit            空闲时间的单位

      * @param workQueue       保存被提交任务的队列

      * @param threadFactory   线程工厂

      * @param poolName        线程池名称

      */

     public   ThreadPoolMonitor( int   corePoolSize,  int   maximumPoolSize,  long   keepAliveTime,

                              TimeUnit unit, BlockingQueue<Runnable> workQueue,

                              ThreadFactory threadFactory, String poolName) {

         super (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);

         this .poolName = poolName;

     }

 

 

     /**

      * 调用父类的构造方法,并初始化HashMap和线程池名称

      *

      * @param corePoolSize    线程池核心线程数

      * @param maximumPoolSize 线程池最大线程数

      * @param keepAliveTime   线程的最大空闲时间

      * @param unit            空闲时间的单位

      * @param workQueue       保存被提交任务的队列

      * @param threadFactory   线程工厂

      * @param handler         拒绝策略

      * @param poolName        线程池名称

      */

     public   ThreadPoolMonitor( int   corePoolSize,  int   maximumPoolSize,  long   keepAliveTime,

                              TimeUnit unit, BlockingQueue<Runnable> workQueue,

                              ThreadFactory threadFactory, RejectedExecutionHandler handler, String poolName) {

         super (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);

         this .poolName = poolName;

     }

 

 

     /**

      * 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况

      */

     @Override

     public   void   shutdown() {

         // 统计已执行任务、正在执行任务、未执行任务数量

         LOGGER.info( "{} 关闭线程池, 已执行任务: {}, 正在执行任务: {}, 未执行任务数量: {}" ,

                 this .poolName,  this .getCompletedTaskCount(),  this .getActiveCount(),  this .getQueue().size());

         super .shutdown();

     }

 

     /**

      * 线程池立即关闭时,统计线程池情况

      */

     @Override

     public   List<Runnable> shutdownNow() {

         // 统计已执行任务、正在执行任务、未执行任务数量

         LOGGER.info( "{} 立即关闭线程池,已执行任务: {}, 正在执行任务: {}, 未执行任务数量: {}" ,

                 this .poolName,  this .getCompletedTaskCount(),  this .getActiveCount(),  this .getQueue().size());

         return   super .shutdownNow();

     }

 

     /**

      * 任务执行之前,记录任务开始时间

      */

     @Override

     protected   void   beforeExecute(Thread t, Runnable r) {

         startTimeThreadLocal.set(System.currentTimeMillis());

     }

 

     /**

      * 任务执行之后,计算任务结束时间

      */

     @Override

     protected   void   afterExecute(Runnable r, Throwable t) {

         long   costTime = System.currentTimeMillis() - startTimeThreadLocal.get();

         startTimeThreadLocal.remove();

         maxCostTime = maxCostTime > costTime ? maxCostTime : costTime;

         if   (getCompletedTaskCount() ==  0 ) {

             minCostTime = costTime;

         }

         minCostTime = minCostTime < costTime ? minCostTime : costTime;

         totalCostTime.addAndGet(costTime);

         LOGGER.info( "{}-pool-monitor: "   +

                         "任务耗时: {} ms, 初始线程数: {}, 核心线程数: {}, 执行的任务数量: {}, "   +

                         "已完成任务数量: {}, 任务总数: {}, 队列里缓存的任务数量: {}, 池中存在的最大线程数: {}, "   +

                         "最大允许的线程数: {},  线程空闲时间: {}, 线程池是否关闭: {}, 线程池是否终止: {}" ,

                 this .poolName,

                 costTime,  this .getPoolSize(),  this .getCorePoolSize(),  this .getActiveCount(),

                 this .getCompletedTaskCount(),  this .getTaskCount(),  this .getQueue().size(),  this .getLargestPoolSize(),

                 this .getMaximumPoolSize(),  this .getKeepAliveTime(TimeUnit.MILLISECONDS),  this .isShutdown(),  this .isTerminated());

     }

 

 

     public   Long getMinCostTime() {

         return   minCostTime;

     }

 

     public   Long getMaxCostTime() {

         return   maxCostTime;

     }

 

     public   long   getAverageCostTime(){

         if (getCompletedTaskCount()== 0 ||totalCostTime.get()== 0 ){

             return   0 ;

         }

         return   totalCostTime.get()/getCompletedTaskCount();

     }

 

     /**

      * 生成线程池所用的线程,改写了线程池默认的线程工厂,传入线程池名称,便于问题追踪

      */

     static   class   MonitorThreadFactory  implements   ThreadFactory {

         private   static   final   AtomicInteger poolNumber =  new   AtomicInteger( 1 );

         private   final   ThreadGroup group;

         private   final   AtomicInteger threadNumber =  new   AtomicInteger( 1 );

         private   final   String namePrefix;

 

         /**

          * 初始化线程工厂

          *

          * @param poolName 线程池名称

          */

         MonitorThreadFactory(String poolName) {

             SecurityManager s = System.getSecurityManager();

             group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();

             namePrefix = poolName +  "-pool-"   + poolNumber.getAndIncrement() +  "-thread-" ;

         }

 

         @Override

         public   Thread newThread(Runnable r) {

             Thread t =  new   Thread(group, r, namePrefix + threadNumber.getAndIncrement(),  0 );

             if   (t.isDaemon()) {

                 t.setDaemon( false );

             }

             if   (t.getPriority() != Thread.NORM_PRIORITY) {

                 t.setPriority(Thread.NORM_PRIORITY);

             }

             return   t;

         }

     }

}

动态修改线程池的类,通过Spring的监听器监控配置刷新方法,实现动态更新线程池的参数;

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

/**

  * 动态刷新线程池

  *

  * @author wangtongzhou

  * @since 2022-03-13 14:13

  */

@Component

@Slf4j

public   class   DynamicThreadPoolManager {

 

 

     @Autowired

     private   DynamicThreadPoolProperties dynamicThreadPoolProperties;

 

     /**

      * 存储线程池对象

      */

     public   Map<String, ThreadPoolMonitor> threadPoolExecutorMap =  new   HashMap<>();

 

 

     public   Map<String, ThreadPoolMonitor> getThreadPoolExecutorMap() {

         return   threadPoolExecutorMap;

     }

 

 

     /**

      * 初始化线程池

      */

     @PostConstruct

     public   void   init() {

         createThreadPools(dynamicThreadPoolProperties);

     }

 

     /**

      * 初始化线程池的创建

      *

      * @param dynamicThreadPoolProperties

      */

     private   void   createThreadPools(DynamicThreadPoolProperties dynamicThreadPoolProperties) {

         dynamicThreadPoolProperties.getExecutors().forEach(config -> {

             if   (!threadPoolExecutorMap.containsKey(config.getThreadPoolName())) {

                 ThreadPoolMonitor threadPoolMonitor =  new   ThreadPoolMonitor(

                         config.getCorePoolSize(),

                         config.getMaxPoolSize(),

                         config.getKeepAliveTime(),

                         config.getUnit(),

                         new   ResizableCapacityLinkedBlockingQueue<>(config.getQueueCapacity()),

                         RejectedExecutionHandlerEnum.getRejectedExecutionHandler(config.getRejectedExecutionType()),

                         config.getThreadPoolName()

                 );

                 threadPoolExecutorMap.put(config.getThreadPoolName(),

                         threadPoolMonitor);

             }

 

         });

     }

 

     /**

      * 调整线程池

      *

      * @param dynamicThreadPoolProperties

      */

     private   void   changeThreadPools(DynamicThreadPoolProperties dynamicThreadPoolProperties) {

         dynamicThreadPoolProperties.getExecutors().forEach(config -> {

             ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(config.getThreadPoolName());

             if   (Objects.nonNull(threadPoolExecutor)) {

                 threadPoolExecutor.setCorePoolSize(config.getCorePoolSize());

                 threadPoolExecutor.setMaximumPoolSize(config.getMaxPoolSize());

                 threadPoolExecutor.setKeepAliveTime(config.getKeepAliveTime(), config.getUnit());

                 threadPoolExecutor.setRejectedExecutionHandler(RejectedExecutionHandlerEnum.getRejectedExecutionHandler(config.getRejectedExecutionType()));

                 BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();

                 if   (queue  instanceof   ResizableCapacityLinkedBlockingQueue) {

                     ((ResizableCapacityLinkedBlockingQueue<Runnable>) queue).setCapacity(config.getQueueCapacity());

                 }

             }

         });

     }

 

 

     @EventListener

     public   void   envListener(EnvironmentChangeEvent event) {

         log.info( "配置发生变更"   + event);

         changeThreadPools(dynamicThreadPoolProperties);

     }

 

}

DynamicThreadPoolPropertiesController对外暴露两个方法,第一个通过ContextRefresher提供对外刷新配置的接口,实现及时更新配置信息,第二提供一个查询接口的方法,

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

/**

  * 动态修改线程池参数

  *

  * @author wangtongzhou

  * @since 2022-03-13 17:27

  */

@RestController

public   class   DynamicThreadPoolPropertiesController {

 

     @Autowired

     private   ContextRefresher contextRefresher;

     private   DynamicThreadPoolProperties dynamicThreadPoolProperties;

     private   DynamicThreadPoolManager dynamicThreadPoolManager;

     @PostMapping ( "/threadPool/properties" )

     public   void   update() {

         ThreadPoolProperties threadPoolProperties =

                 dynamicThreadPoolProperties.getExecutors().get( 0 );

         threadPoolProperties.setCorePoolSize( 20 );

         threadPoolProperties.setMaxPoolSize( 50 );

         threadPoolProperties.setQueueCapacity( 200 );

         threadPoolProperties.setRejectedExecutionType( "CallerRunsPolicy" );

         contextRefresher.refresh();

     }

     @GetMapping ( "/threadPool/properties" )

     public   Map<String, Object> queryThreadPoolProperties() {

         Map<String, Object> metricMap =  new   HashMap<>();

         List<Map> threadPools =  new   ArrayList<>();

         dynamicThreadPoolManager.getThreadPoolExecutorMap().forEach((k, v) -> {

             ThreadPoolMonitor threadPoolMonitor = (ThreadPoolMonitor) v;

             Map<String, Object> poolInfo =  new   HashMap<>();

             poolInfo.put( "thread.pool.name" , k);

             poolInfo.put( "thread.pool.core.size" , threadPoolMonitor.getCorePoolSize());

             poolInfo.put( "thread.pool.largest.size" , threadPoolMonitor.getLargestPoolSize());

             poolInfo.put( "thread.pool.max.size" , threadPoolMonitor.getMaximumPoolSize());

             poolInfo.put( "thread.pool.thread.count" , threadPoolMonitor.getPoolSize());

             poolInfo.put( "thread.pool.max.costTime" , threadPoolMonitor.getMaxCostTime());

             poolInfo.put( "thread.pool.average.costTime" , threadPoolMonitor.getAverageCostTime());

             poolInfo.put( "thread.pool.min.costTime" , threadPoolMonitor.getMinCostTime());

             poolInfo.put( "thread.pool.active.count" , threadPoolMonitor.getActiveCount());

             poolInfo.put( "thread.pool测试数据pleted.taskCount" , threadPoolMonitor.getCompletedTaskCount());

             poolInfo.put( "thread.pool.queue.name" , threadPoolMonitor.getQueue().getClass().getName());

             poolInfo.put( "thread.pool.rejected.name" , threadPoolMonitor.getRejectedExecutionHandler().getClass().getName());

             poolInfo.put( "thread.pool.task.count" , threadPoolMonitor.getTaskCount());

             threadPools.add(poolInfo);

         });

         metricMap.put( "threadPools" , threadPools);

         return   metricMap;

}

整体上的流程到这里就完成了,算是一个Demo版,对于该组件更深入的思考我认为还可以做以下三件事情:

应该以starter的形式嵌入到应用,通过判断启动类加载的Appllo、Nacos还是默认实现;

对外可以Push、也可以是日志,还可以支持各种库,提供丰富的输出形式,这个

到此这篇关于基于Spring Boot的线程池监控方案的文章就介绍到这了,更多相关Spring Boot的线程池监控内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://HdhCmsTestcnblogs测试数据/wtzbk/p/16001949.html

查看更多关于基于Spring Boot的线程池监控问题及解决方案的详细内容...

  阅读:20次