好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Netty组件NioEventLoopGroup创建线程执行器源码解析

前言

通过上一章的学习, 我们了解了 Server启动 的大致流程, 有很多组件与模块并没有细讲, 从这个章开始, 我们开始详细剖析netty的各个组件, 并结合启动流程, 将这些组件的使用场景及流程进行一个详细的说明

这一章主要学习NioEventLoop相关的知识,何为NioEventLoop? NioEventLoop是netty的一个线程, 在上一节我们创建两个NioEventLoopGroup:

?

1

2

EventLoopGroup bossGroup = new NioEventLoopGroup( 1 );

EventLoopGroup workerGroup = new NioEventLoopGroup();

这里创建了两个group, 我们提过这是boss线程组和worker线程组, 其实这两个线程组就相当于两个NioEventLoop的集合, 默认每个NioEventLoopGroup创建时, 如果不传入线程数, 会创建cpu核数*2个NioEventLoop线程, 其中boss线程通过轮询处理Server的accept事件, 而完成accept事件之后, 就会创建客户端channel, 通过一定的策略, 分发到worker线程进行处理, 而worker线程, 则主要用于处理客户端的读写事件

除了轮询事件, EventLoop线程还维护了两个队列, 一个是延迟任务队列, 另一个是普通任务队列, 在进行事件轮询的同时, 如果队列中有任务需要执行则会去执行队列中的任务

一个NioEventLoop绑定一个selector用于处理多个客户端channel, 但是一个客户端channel只能被一个NioEventLoop处理, 具体关系如图2-0-1所示:

图中我们看到, 一个NioEventLoopGroup下有多个NioEventLoop线程, 而一个线程可以处理多个channel, 其中有个叫pipeline和handler的东西, 同学们可能比较陌生, 这是netty的事件传输机制, 每个pipeline和channel唯一绑定, 这里只需要稍作了解, 之后章节会带大家详细剖析

了解了这些概念, 我们继续以小节的形式对NioEventLoop进行剖析

第一节:  NioEventLoopGroup之创建线程执行器

首先回到第一章最开始的demo, 我们最初创建了两个线程组:

?

1

2

EventLoopGroup bossGroup = new NioEventLoopGroup( 1 );

EventLoopGroup workerGroup = new NioEventLoopGroup();

创建EventLoopGroup的构造方法

这里, 我们跟随创建EventLoopGroup的构造方法, 来继续学习NioEventLoopGroup的创建过程

以workerGroup为例我们跟进其构造方法:

?

1

2

3

public NioEventLoopGroup() {

     this ( 0 );

}

继续跟进this(0):

?

1

2

3

public NioEventLoopGroup( int nThreads) {

     this (nThreads, (Executor) null );

}

这里的nThreads就是刚传入的0, 继续跟进:

?

1

2

3

public NioEventLoopGroup( int nThreads, Executor executor) {

     this (nThreads, executor, SelectorProvider.provider());

}

这里nThreads仍然为0, executor为null, 这个execute是用于开启NioEventLoop线程所需要的线程执行器, SelectorProvider.provider()是用于创建selector, 这个之后会讲到

我们一直跟到构造方法最后:

?

1

2

3

4

5

6

public NioEventLoopGroup( int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,

                          final SelectorProvider selectorProvider,

                          final SelectStrategyFactory selectStrategyFactory,

                          final RejectedExecutionHandler rejectedExecutionHandler) {

     super (nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);

}

这里调用了父类的构造方法

跟进super, 进入了其父类MultithreadEventExecutorGroup的构造方法中:

?

1

2

3

4

protected MultithreadEventLoopGroup( int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,

                                  Object... args) {

     super (nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);

}

这里我们看到, 如果传入的线程数量参数为0, 则会给一个默认值, 这个默认值就是两倍的CPU核数, chooserFactory是用于创建线程选择器, 之后会讲到, 继续跟代码之后, 我们就看到了创建NioEventLoop的真正逻辑, 在MultithreadEventExecutorGroup类的构造方法中

跟到MultithreadEventExecutorGroup类的构造方法

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

protected MultithreadEventExecutorGroup( int nThreads, Executor executor,

                                         EventExecutorChooserFactory chooserFactory, Object... args) {

     //代码省略

     if (executor == null ) {

         //创建一个新的线程执行器(1)

         executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

     }

     //构造NioEventLoop(2)

     children = new EventExecutor[nThreads];

     for ( int i = 0 ; i < nThreads; i ++) {

         boolean success = false ;

         try {

             children[i] = newChild(executor, args);

             success = true ;

         } catch (Exception e) {

             throw new IllegalStateException( "failed to create a child event loop" , e);

         } finally {

            //代码省略

         }

     }

     //创建线程选择器(3)

     chooser = chooserFactory.newChooser(children);

     //代码省略

}

这边将代码主要分为三个步骤:

1.创建线程执行器

2.创建EventLoop

3.创建线程选择器

这一小节我们主要剖析第1步, 创建线程执行器

这里有个new DefaultThreadFactory()创建一个DefaultThreadFactory对象, 这个对象作为参数传入ThreadPerTaskExecutor的构造函数,  DefaultThreadFactory顾名思义, 是一个线程工厂, 用于创建线程的, 简单看下这个类的继承关系:

?

1

2

public class DefaultThreadFactory implements ThreadFactory{ //类体

}

我们继续跟进该类的构造方法:

?

1

2

3

protected ThreadFactory newDefaultThreadFactory() {

     return new DefaultThreadFactory(getClass());

}

其中getClass()就是当前类的class对象, 而当前类是NioEventLoopGroup

继续跟进到DefaultThreadFactory的构造方法中:

?

1

2

3

public DefaultThreadFactory(Class<?> poolType) {

     this (poolType, false , Thread.NORM_PRIORITY);

}

poolType是NioEventLoop的class对象, Thread.NORM_PRIORITY是设置默认的优先级为5

继续跟构造方法:

?

1

2

3

public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {

     this (toPoolName(poolType), daemon, priority);

}

这里的toPoolName(poolType)是将线程组命名, 这里返回后结果是"nioEventLoopGroup"(开n头小写), daemon为false, priority为5

继续跟构造方法:

?

1

2

3

4

public DefaultThreadFactory(String poolName, boolean daemon, int priority) {

     this (poolName, daemon, priority, System.getSecurityManager() == null ?

             Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());

}

 System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup() 

这段代码是通过三目运算创建jdk底层的线程组

继续跟this():

?

1

2

3

4

5

6

7

8

9

10

public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {

     //省略验证代码

     //线程名字前缀

     prefix = poolName + '-' + poolId.incrementAndGet() + '-' ;

     this .daemon = daemon;

     //优先级

     this .priority = priority;

     //初始化线程组

     this .threadGroup = threadGroup;

}

这里初始化了DefaultThreadFactory的属性, prefix为poolName(也就是nioEventLoopGroup)+'-'+线程组id(原子自增)+'-'

以及初始化了优先级和jdk底层的线程组等属性

回到最初MultithreadEventExecutorGroup类的构造方法中, 我们看继续看第一步:

?

1

2

//创建一个新的线程执行器(1)

executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

我们继续跟进ThreadPerTaskExecutor的类中

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

public final class ThreadPerTaskExecutor implements Executor {

     private final ThreadFactory threadFactory;

     public ThreadPerTaskExecutor(ThreadFactory threadFactory) {

         if (threadFactory == null ) {

             throw new NullPointerException( "threadFactory" );

         }

         this .threadFactory = threadFactory;

     }

     @Override

     public void execute(Runnable command) {

         //起一个线程

         threadFactory.newThread(command).start();

     }

}

我们发现这个类非常简单, 继承了jdk的Executor类, 从继承关系中我就能猜想到, 而这个类就是用于开启线程的线程执行器

构造方法传入ThreadFactory类型的参数, 这个ThreadFactory就是我们刚才剖析的DefaultThreadFactory, 这个类继承了ThreadFactory, 所以在构造方法中初始化了ThreadFactory类型的属性

我们再看重写的 execute(Runnable command) 方法, 传入一个任务, 然后由threadFactory对象创建一个线程执行该任务

这个execute(Runnable command)方法, 其实就是用开开启NioEventLoop线程用的, 那么NioEventLoop什么时候开启的, 后面章节会进行剖析

这样, 通过 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()) 这种方式就返回了一个线程执行器Executor, 用于开启NioEventLoop线程

以上就是Netty组件NioEventLoopGroup创建线程执行器源码解析的详细内容,更多关于Netty NioEventLoopGroup线程执行器的资料请关注其它相关文章!

原文链接:https://HdhCmsTestcnblogs测试数据/xiangnan6122/p/10202901.html

查看更多关于Netty组件NioEventLoopGroup创建线程执行器源码解析的详细内容...

  阅读:30次