好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

netty服务端辅助类ServerBootstrap创建逻辑分析

ServerBootstrap创建

ServerBootstrap 为 netty 建立服务端的辅助类, 以 NIO为例,创建代码如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

public static void main(String[] args) throws Exception {

         ServerBootstrap bs = new ServerBootstrap();

         bs.group( new NioEventLoopGroup( 1 ), new NioEventLoopGroup())

                 .channel(NioServerSocketChannel. class )

                 .childHandler( new ChannelInitializer<Channel>() {

                     @Override

                     protected void initChannel(Channel ch) throws Exception {

                         ch.pipeline()

                         .addLast( new HttpServerCodec())

                         .addLast( new HttpObjectAggregator( 65535 ))

                         .addLast( new Controller());

                     }

                 }).bind( 8080 ).sync().channel().closeFuture().sync();

     }

核心参数

?

1

2

3

4

//配置属性,如 SO_KEEPALIVE 等private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);

     //acceot 的 子channel所绑定的 事件循环组"

     private volatile EventLoopGroup childGroup;

     private volatile ChannelHandler childHandler;

初始化流程

主要为 绑定本地端口 -> 注册自身到 EventLoop , 并注册 accept 和 read 事件 -> EventLoop的主循环中会不断的select注册的channel的事件,并处理。

首先执行绑定

核心逻辑位于 

io.netty.bootstrap.AbstractBootstrap.doBind(SocketAddress) 和  io.netty.bootstrap.AbstractBootstrap.initAndRegister()中

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

private ChannelFuture doBind( final SocketAddress localAddress) {

         final ChannelFuture regFuture = initAndRegister();

         .......... if (regFuture.isDone()) {

             // At this point we know that the registration was complete and successful.

             ChannelPromise promise = channel.newPromise();

             //绑定逻辑

             doBind0(regFuture, channel, localAddress, promise);

             return promise;

         } else {

             // Registration future is almost always fulfilled already, but just in case it's not.

             final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);

             regFuture.addListener( new ChannelFutureListener() {

                 @Override

                 public void operationComplete(ChannelFuture future) throws Exception {

                     Throwable cause = future.cause();

                     if (cause != null ) {

                         // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an

                         // IllegalStateException once we try to access the EventLoop of the Channel.

                         promise.setFailure(cause);

                     } else {

                         // Registration was successful, so set the correct executor to use.

                         // See https://github测试数据/netty/netty/issues/2586

                         promise.registered();

 

                         doBind0(regFuture, channel, localAddress, promise);

                     }

                 }

             });

             return promise;

         }

     }

注册自身到 EventLoop

先来看 initAndRegister , 核心逻辑就是利用channelFactory初始化一个NioServerSocketChannel实例,并为其设置上config中的参数,然后将其注册到EventLoop中,实际上是委托的channel的Unsafe来实现注册的,核心逻辑位于 AbstractUnsafe.register0 中 完成注册

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

final ChannelFuture initAndRegister() {

         Channel channel = null ;

         try {

             //本例子中实际调用的是  NioServerSocketChannel的构造参数, 并为其设置感兴趣的事件类型为  OP_ACCEPT

             channel = channelFactory.newChannel();

             init(channel);

         } catch (Throwable t) {

             if (channel != null ) {

                 // channel can be null if newChannel crashed (eg SocketException("too many open files"))

                 channel.unsafe().closeForcibly();

             }

             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor

             return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);

         }

         ChannelFuture regFuture = config().group().register(channel);

         if (regFuture.cause() != null ) {

             if (channel.isRegistered()) {

                 channel.close();

             } else {

                 channel.unsafe().closeForcibly();

             }

         }

 

          return regFuture;

     }

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

void init(Channel channel) throws Exception {

         //设置属性

          ..........

 

         p.addLast( new ChannelInitializer<Channel>() {

             @Override

             public void initChannel( final Channel ch) throws Exception {

                 final ChannelPipeline pipeline = ch.pipeline();

                 ChannelHandler handler = config.handler();

                 if (handler != null ) {

                     pipeline.addLast(handler);

                 }

                 ch.eventLoop().execute( new Runnable() {

                     @Override

                     public void run() {

                         //为NioServerSocketChannel 设置一个 默认的 channelhandler : ServerBootstrapAcceptor , 当发生 accept事件时,将 accept的channel注册到 childEventLoop中

                         pipeline.addLast( new ServerBootstrapAcceptor(

                                 ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

                     }

                 });

             }

         });

     }

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

private void register0(ChannelPromise promise) {

             try {

                 // check if the channel is still open as it could be closed in the mean time when the register

                 // call was outside of the eventLoop

                 if (!promise.setUncancellable() || !ensureOpen(promise)) {

                     return ;

                 }

                 boolean firstRegistration = neverRegistered;

                 //执行channel到 eventloop的 selector

                 doRegister();

                 neverRegistered = false ;

                 registered = true ;

 

                 // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the

                 // user may already fire events through the pipeline in the ChannelFutureListener.

                 pipeline.invokeHandlerAddedIfNeeded();

                 safeSetSuccess(promise);

?

1

2

//触发 InboundChannelHnader.channelRegistered 事件

                   pipeline.fireChannelRegistered();

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

// Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) {

                         //触发channelActive事件,并会为 channel 绑定上 read 事件

                         pipeline.fireChannelActive();

                     } else if (config().isAutoRead()) {

                         // This channel was registered before and autoRead() is set. This means we need to begin read

                         // again so that we process inbound data.

                         //

                         // See https://github测试数据/netty/netty/issues/4805

                         beginRead();

                     }

                 }

             } catch (Throwable t) {

                 // Close the channel directly to avoid FD leak.

                 closeForcibly();

                 closeFuture.setClosed();

                 safeSetFailure(promise, t);

             }

         }

绑定端口逻辑

initAndRegister注册成功后,开始执行真正的绑定端口逻辑,核心逻辑位于 NioSocketChannel.doBind0(SocketAddress) 中

?

1

2

3

4

5

6

7

private void doBind0(SocketAddress localAddress) throws Exception {

         if (PlatformDependent.javaVersion() >= 7 ) {

             SocketUtils.bind(javaChannel(), localAddress);

         } else {

             SocketUtils.bind(javaChannel().socket(), localAddress);

         }

     }

至此 绑定个成功, 当触发 ACCEPT 事件时, 会触发  NioServerSocketChannel.doReadMessages -> ServerBootstrapAcceptor.channelRead , 并将 子channel 注册到 childEventLoop中

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

public void channelRead(ChannelHandlerContext ctx, Object msg) {

             final Channel child = (Channel) msg;

             child.pipeline().addLast(childHandler);

             setChannelOptions(child, childOptions, logger);

             for (Entry<AttributeKey<?>, Object> e: childAttrs) {

                 child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());

             }

             try {

                 //注册channel

                 childGroup.register(child).addListener( new ChannelFutureListener() {

                     @Override

                     public void operationComplete(ChannelFuture future) throws Exception {

                         if (!future.isSuccess()) {

                             forceClose(child, future.cause());

                         }

                     }

                 });

             } catch (Throwable t) {

                 forceClose(child, t);

             }

         }

以上就是netty服务端辅助类ServerBootstrap创建逻辑分析的详细内容,更多关于netty辅助类ServerBootstrap创建逻辑的资料请关注其它相关文章!

原文链接:https://HdhCmsTestcnblogs测试数据/ironroot/p/8581065.html

查看更多关于netty服务端辅助类ServerBootstrap创建逻辑分析的详细内容...

  阅读:12次