好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Netty事件循环主逻辑NioEventLoop的run方法分析

Netty事件循环主逻辑

Netty 事件循环主逻辑在 NioEventLoop.run 中的 processSelectedKeys函数中

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

protected void run() {

       //主循环不断读取IO事件和task,因为 EventLoop 也是 juc 的 ScheduledExecutorService 实现

         for (;;) {

             try {

                 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {

                     case SelectStrategy.CONTINUE:

                         continue ;

                     case SelectStrategy.SELECT:

                         select(wakenUp.getAndSet( false ));

 

                        

 

                         if (wakenUp.get()) {

                             selector.wakeup();

                         }

                         // fall through

                     default :

                 }

 

                 cancelledKeys = 0 ;

                 needsToSelectAgain = false ;

            // IO事件占总执行时间的百分比 */

                 final int ioRatio = this .ioRatio;

                 if (ioRatio == 100 ) {

                     try {

                         processSelectedKeys();

                     } finally {

                         // Ensure we always run tasks.

                         runAllTasks();

                     }

                 } else {

                     final long ioStartTime = System.nanoTime();

                     try {

                         processSelectedKeys();

                     } finally {

                         // Ensure we always run tasks.

                         final long ioTime = System.nanoTime() - ioStartTime;

                         runAllTasks(ioTime * ( 100 - ioRatio) / ioRatio);

                     }

                 }

             } catch (Throwable t) {

                 handleLoopException(t);

             }

             // Always handle shutdown even if the loop processing threw an exception.

             try {

                 if (isShuttingDown()) {

                     closeAll();

                     if (confirmShutdown()) {

                         return ;

                     }

                 }

             } catch (Throwable t) {

                 handleLoopException(t);

             }

         }

     }

processSelectedKeys 函数 执行时会判断是否执行优化的版本,即判断 SelectedSelectionKeySet 是否为空。

是否开启优化取决于是否设置了环境变量  io.netty.noKeySetOptimization ,默认是 false 代表开启

?

1

2

private static final boolean DISABLE_KEYSET_OPTIMIZATION =

             SystemPropertyUtil.getBoolean( "io.netty.noKeySetOptimization" , false );

原理是通过反射的方式设置 eventLoop绑定的selector中的 selectKeys属性 为 SelectedSelectionKeySet ,好处是不用 迭代  selector.selectedKeys()

初始化 EventLoop

注入时机为初始化 EventLoop 的时候

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

private SelectorTuple openSelector() {

         12       //注入逻辑40

         Object maybeException = AccessController.doPrivileged( new PrivilegedAction<Object>() {

             @Override

             public Object run() {

                 try {

                     Field selectedKeysField = selectorImplClass.getDeclaredField( "selectedKeys" );

                     Field publicSelectedKeysField = selectorImplClass.getDeclaredField( "publicSelectedKeys" );

 

                     Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField);

                     if (cause != null ) {

                         return cause;

                     }

                     cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField);

                     if (cause != null ) {

                         return cause;

                     }

 

                     selectedKeysField.set(unwrappedSelector, selectedKeySet);

                     publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);

                     return null ;

                 } catch (NoSuchFieldException e) {

                     return e;

                 } catch (IllegalAccessException e) {

                     return e;

                 }

             }

         });

 

         ........ 78      }

处理读事件

处理读事件主要在processSelectedKey 中 ,分别对 读、写、连接事件进行了处理。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

private void processSelectedKeysOptimized() {

         for ( int i = 0 ; i < selectedKeys.size; ++i) {

             final SelectionKey k = selectedKeys.keys[i];

             // null out entry in the array to allow to have it GC'ed once the Channel close

             // See https://github测试数据/netty/netty/issues/2363

             selectedKeys.keys[i] = null ;

             final Object a = k.attachment();

             if (a instanceof AbstractNioChannel) {

                 //分别处理每个channel的事件

                 processSelectedKey(k, (AbstractNioChannel) a);

             } else {

                 @SuppressWarnings ( "unchecked" )

                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;

                 processSelectedKey(k, task);

             }

             if (needsToSelectAgain) {

                 // null out entries in the array to allow to have it GC'ed once the Channel close

                 // See https://github测试数据/netty/netty/issues/2363

                 selectedKeys.reset(i + 1 );

                 selectAgain();

                 i = - 1 ;

             }

         }

     }

     private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {

         final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();

         if (!k.isValid()) {

             final EventLoop eventLoop;

             try {

                 eventLoop = ch.eventLoop();

             } catch (Throwable ignored) {

                 // If the channel implementation throws an exception because there is no event loop, we ignore this

                 // because we are only trying to determine if ch is registered to this event loop and thus has authority

                 // to close ch.

                 return ;

             }

             // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop

             // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is

             // still healthy and should not be closed.

             // See https://github测试数据/netty/netty/issues/5125

             if (eventLoop != this || eventLoop == null ) {

                 return ;

             }

             // close the channel if the key is not valid anymore

             unsafe.close(unsafe.voidPromise());

             return ;

         }

         try {

             int readyOps = k.readyOps();

             // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise

             // the NIO JDK channel implementation may throw a NotYetConnectedException.

             if ((readyOps & SelectionKey.OP_CONNECT) != 0 ) {

                 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking

                 // See https://github测试数据/netty/netty/issues/924

                 int ops = k.interestOps();

                 ops &= ~SelectionKey.OP_CONNECT;

                 k.interestOps(ops);

           //处理了连接事件

                 unsafe.finishConnect();

             }

             // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.

             if ((readyOps & SelectionKey.OP_WRITE) != 0 ) {

                 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

//将要写入的buffer flush掉

         ch.unsafe().forceFlush();

            }

 

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead

            // to a spin loop

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0 ) {

          //回调 pipeline 上所有的 ChannelInboundHandler 的 fireChannelRead  和 channelReadComplete 函数

                unsafe.read();

            }

        } catch (CancelledKeyException ignored) {

            unsafe.close(unsafe.voidPromise());

        }

    }

注意 

NioServerSocketChannel 和 NioSocketChannel 都是 同样的 处理逻辑, 不同的是  前者 只关注 OP_ACCEPT 和 OP_READ事件, 后者 关注  OP_READ、OP_WRITE、OP_CONNECT事件

当NioServerSocketChannel 发生 OP_ACCEPT事件时 会 触发

 AbstractNioChannel.NioUnsafe.read ->  NioSctpServerChannel.doReadMessages(List<Object>)  -> ServerBootstrapAcceptor.channelRead ,

将受到的 NioSocketChannel 注册到 childEventLoop 。

以上就是Netty事件循环主逻辑NioEventLoop的run方法分析的详细内容,更多关于Netty循环逻辑NioEventLoop run方法的资料请关注其它相关文章!

原文链接:https://HdhCmsTestcnblogs测试数据/ironroot/p/8575134.html

查看更多关于Netty事件循环主逻辑NioEventLoop的run方法分析的详细内容...

  阅读:10次