

Elasticsearch cluster discovery: an analysis of the pluggable discovery module

Preface

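The Elasticsearch cluster implements its own discovery mechanism, called Zen. Discovery mainly covers master election, master fault detection, fault detection of the other nodes in the cluster, and unicast and multicast ping. This article first gives an overview of the Discovery module and then covers node fault detection. The remaining topics will be covered in later articles.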

Overview of the Discovery module

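Discovery is a pluggable module. Three discovery mechanisms are officially supported: Azure discovery (for Microsoft Azure), Google Compute Engine discovery, and EC2 discovery (for Amazon EC2). Following the plugin conventions, you can also implement other mechanisms yourself. The module is registered and started through DiscoveryModule, which is a Guice module, and Zen discovery is used by default. The public interface of the discovery module is DiscoveryService.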

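DiscoveryService is essentially a proxy for the discovery: all of its functionality is ultimately implemented by the bound Discovery. When a node starts, it obtains DiscoveryService through DiscoveryModule and starts it; DiscoveryService in turn starts the bound Discovery, which completes loading and starting the module. Every Elasticsearch module is built this way: the module handles binding and lookup, and a service interface exposes the module's functionality. We will run into this pattern often in the analysis that follows.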
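The delegation described above can be sketched in plain Java. This is a minimal, hypothetical illustration rather than Elasticsearch's code: `Discovery`, `ZenDiscoveryStub`, and `DiscoveryServiceSketch` are invented names, and the binding that Guice would perform is done by hand through the constructor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a pluggable discovery implementation.
interface Discovery {
    void start();

    String nodeDescription();
}

// Hypothetical "zen" implementation that records lifecycle events.
class ZenDiscoveryStub implements Discovery {
    final List<String> events = new ArrayList<>();

    @Override
    public void start() {
        events.add("zen started");
    }

    @Override
    public String nodeDescription() {
        return "zen/node-1";
    }
}

// The service holds the bound Discovery and forwards every call to it.
class DiscoveryServiceSketch {
    private final Discovery discovery;

    // Guice would inject the bound Discovery; here it is passed explicitly.
    DiscoveryServiceSketch(Discovery discovery) {
        this.discovery = discovery;
    }

    void start() {
        discovery.start(); // starting the service starts the bound discovery
    }

    String nodeDescription() {
        return discovery.nodeDescription(); // pure delegation
    }
}

class DiscoveryServiceDemo {
    public static void main(String[] args) {
        ZenDiscoveryStub zen = new ZenDiscoveryStub();
        DiscoveryServiceSketch service = new DiscoveryServiceSketch(zen);
        service.start();
        System.out.println(service.nodeDescription()); // zen/node-1
        System.out.println(zen.events); // [zen started]
    }
}
```

Swapping in another implementation only changes what is passed to the constructor; callers of the service are unaffected, which is what makes the module pluggable.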

Cluster node fault detection

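Next we analyze an important cluster function: node fault detection. A cluster cannot run without a master, so every node periodically probes the master, and once the master can no longer be reached, a master election is held. The master, for its part, must keep track of node changes, so it periodically probes every node in the cluster in order to remove nodes that have gone down in a timely manner. This mutual heartbeat checking is the cluster's fault detection. FaultDetection has the following inheritance hierarchy: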

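It has two implementations: NodesFaultDetection, with which the master probes the other nodes in the cluster, and MasterFaultDetection, with which the other nodes probe the master.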

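FaultDetection has only one abstract method, handleTransportDisconnect, which is called from the inner class FDConnectionListener. Elasticsearch uses asynchronous listeners extensively: callbacks avoid blocking threads while waiting on the network, which can greatly improve performance. The listener's code is as follows: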


private class FDConnectionListener implements TransportConnectionListener {
    @Override
    public void onNodeConnected(DiscoveryNode node) {
    }

    @Override
    public void onNodeDisconnected(DiscoveryNode node) {
        handleTransportDisconnect(node);
    }
}

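When fault detection starts, it registers its FDConnectionListener. When a node is detected as lost, the onNodeDisconnected callback invokes the corresponding handleTransportDisconnect implementation to handle it.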
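A minimal sketch of this listener wiring, assuming a simplified transport: `FakeTransport`, `ConnectionListener`, `FaultDetectionSketch`, and `MasterFDSketch` are hypothetical names modeled on the classes above, and nodes are plain strings instead of `DiscoveryNode`. The base class registers an inner listener and leaves the reaction to each subclass through the abstract `handleTransportDisconnect`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical listener interface, modeled on TransportConnectionListener.
interface ConnectionListener {
    void onNodeConnected(String node);

    void onNodeDisconnected(String node);
}

// Hypothetical transport that reports connection events to its listeners.
class FakeTransport {
    private final List<ConnectionListener> listeners = new CopyOnWriteArrayList<>();

    void addConnectionListener(ConnectionListener listener) {
        listeners.add(listener);
    }

    void simulateDisconnect(String node) {
        for (ConnectionListener listener : listeners) {
            listener.onNodeDisconnected(node);
        }
    }
}

// Base class: owns the inner listener, subclasses decide how to react.
abstract class FaultDetectionSketch {
    private final FakeTransport transport;

    FaultDetectionSketch(FakeTransport transport) {
        this.transport = transport;
    }

    // Register on start, so `this` is fully constructed before callbacks can arrive.
    void start() {
        transport.addConnectionListener(new FDConnectionListener());
    }

    protected abstract void handleTransportDisconnect(String node);

    private class FDConnectionListener implements ConnectionListener {
        @Override
        public void onNodeConnected(String node) {
            // nothing to do when a node connects
        }

        @Override
        public void onNodeDisconnected(String node) {
            handleTransportDisconnect(node); // dispatch to the subclass
        }
    }
}

// Non-master side: only a disconnect from the current master matters.
class MasterFDSketch extends FaultDetectionSketch {
    final List<String> failures = new ArrayList<>();
    private final String master;

    MasterFDSketch(FakeTransport transport, String master) {
        super(transport);
        this.master = master;
    }

    @Override
    protected void handleTransportDisconnect(String node) {
        if (node.equals(master)) {
            failures.add("master failure: " + node);
        }
    }
}

class ListenerDemo {
    public static void main(String[] args) {
        FakeTransport transport = new FakeTransport();
        MasterFDSketch fd = new MasterFDSketch(transport, "node-m");
        fd.start();
        transport.simulateDisconnect("node-2"); // not the master: ignored
        transport.simulateDisconnect("node-m"); // the master: recorded
        System.out.println(fd.failures); // [master failure: node-m]
    }
}
```

Registering in `start()` rather than in the constructor avoids handing out `this` before the subclass fields are initialized.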

MasterFaultDetection startup code


private void innerStart(final DiscoveryNode masterNode) {
    this.masterNode = masterNode;
    this.retryCount = 0;
    this.notifiedMasterFailure.set(false);

    // try to connect to the master node
    try {
        transportService.connectToNode(masterNode);
    } catch (final Exception e) {
        // the connection failed: report a master failure
        notifyMasterFailure(masterNode, "failed to perform initial connect [" + e.getMessage() + "]");
        return;
    }

    // stop the previous MasterPinger and start a new one
    if (masterPinger != null) {
        masterPinger.stop();
    }
    this.masterPinger = new MasterPinger();
    // run MasterPinger after pingInterval; this is only a one-shot delay, not a periodic schedule
    threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);
}

The code is commented in detail, so I won't explain it further.

Logic for a failed master connection

The code is as follows:


private void notifyMasterFailure(final DiscoveryNode masterNode, final String reason) {
    if (notifiedMasterFailure.compareAndSet(false, true)) {
        threadPool.generic().execute(new Runnable() {
            @Override
            public void run() {
                // notify every listener that the master has been lost
                for (Listener listener : listeners) {
                    listener.onMasterFailure(masterNode, reason);
                }
            }
        });
        stop("master failure, " + reason);
    }
}

ZenDiscovery implements the listener's onMasterFailure callback, which handles the loss of the master; it will be analyzed later.
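The `compareAndSet(false, true)` guard in notifyMasterFailure ensures the listeners are notified only once, even when, say, a ping timeout and a disconnect callback report the same failure concurrently. A hypothetical, simplified sketch: `MasterFailureNotifier` is an invented name, the `Executor` stands in for `threadPool.generic()`, and `reset()` mirrors `notifiedMasterFailure.set(false)` in innerStart.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

// Hypothetical, simplified version of the notify-once guard.
class MasterFailureNotifier {
    private final AtomicBoolean notifiedMasterFailure = new AtomicBoolean(false);
    private final List<BiConsumer<String, String>> listeners = new CopyOnWriteArrayList<>();
    private final Executor generic; // stands in for threadPool.generic()

    MasterFailureNotifier(Executor generic) {
        this.generic = generic;
    }

    void addListener(BiConsumer<String, String> listener) {
        listeners.add(listener);
    }

    // Returns true only for the caller that won the race and triggered the notification.
    boolean notifyMasterFailure(String masterNode, String reason) {
        if (!notifiedMasterFailure.compareAndSet(false, true)) {
            return false; // the failure was already reported
        }
        // Listeners run on the executor, not on the detecting thread.
        generic.execute(() -> {
            for (BiConsumer<String, String> listener : listeners) {
                listener.accept(masterNode, reason);
            }
        });
        return true;
    }

    // Mirrors notifiedMasterFailure.set(false) in innerStart.
    void reset() {
        notifiedMasterFailure.set(false);
    }
}

class NotifyOnceDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService generic = Executors.newSingleThreadExecutor();
        MasterFailureNotifier notifier = new MasterFailureNotifier(generic);
        AtomicInteger calls = new AtomicInteger();
        notifier.addListener((node, reason) -> calls.incrementAndGet());

        // Eight detectors race to report the same failure.
        ExecutorService detectors = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 8; i++) {
            detectors.execute(() -> notifier.notifyMasterFailure("node-m", "ping timeout"));
        }
        detectors.shutdown();
        detectors.awaitTermination(5, TimeUnit.SECONDS);
        generic.shutdown();
        generic.awaitTermination(5, TimeUnit.SECONDS);

        System.out.println(calls.get()); // 1
    }
}
```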

Key code of MasterPinger


private class MasterPinger implements Runnable {

    private volatile boolean running = true;

    public void stop() {
        this.running = false;
    }

    @Override
    public void run() {
        if (!running) {
            // return and don't spawn...
            return;
        }
        final DiscoveryNode masterToPing = masterNode;
        final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName);
        final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
        transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() {

            @Override
            public MasterPingResponseResponse newInstance() {
                return new MasterPingResponseResponse();
            }

            @Override
            public void handleResponse(MasterPingResponseResponse response) {
                if (!running) {
                    return;
                }
                // reset the counter, we got a good result
                MasterFaultDetection.this.retryCount = 0;
                // check if the master node did not get switched on us..., if it did, we simply return with no reschedule
                if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                    // schedule the next ping round
                    threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
                }
            }

            @Override
            public void handleException(TransportException exp) {
                if (!running) {
                    return;
                }
                synchronized (masterNodeMutex) {
                    // check if the master node did not get switched on us...
                    if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                        if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
                            handleTransportDisconnect(masterToPing);
                            return;
                        } else if (exp.getCause() instanceof NoLongerMasterException) {
                            logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
                            notifyMasterFailure(masterToPing, "no longer master");
                            return;
                        } else if (exp.getCause() instanceof NotMasterException) {
                            logger.debug("[master] pinging a master {} that is not the master", masterNode);
                            notifyMasterFailure(masterToPing, "not master");
                            return;
                        } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
                            logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure", masterNode);
                            notifyMasterFailure(masterToPing, "do not exists on master, act as master failure");
                            return;
                        }

                        int retryCount = ++MasterFaultDetection.this.retryCount;
                        logger.trace("[master] failed to ping [{}], retry [{}] out of [{}]", exp, masterNode, retryCount, pingRetryCount);
                        if (retryCount >= pingRetryCount) {
                            logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", masterNode, pingRetryCount, pingRetryTimeout);
                            // not good, failure
                            notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with  maximum [" + pingRetryTimeout + "] timeout");
                        } else {
                            // resend the request, not reschedule, rely on send timeout
                            transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
                        }
                    }
                }
            }
        });
    }
}

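MasterPinger is a Runnable task executed on the thread pool. innerStart does not schedule it periodically, yet the master must be pinged periodically. The trick lies in the response handling: after a successful ping, handleResponse schedules the same MasterPinger again after pingInterval. This guarantees that only one ping chain is active at a time, and that pings happen in order with a fixed interval between them.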
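A minimal sketch of this self-rescheduling pattern using `ScheduledExecutorService`, assuming a hypothetical `BooleanSupplier` that reports whether the master answered. The class name `SelfReschedulingPinger` is invented; the retry and failure handling of the real MasterPinger, and its check that the master has not changed, are left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: a one-shot task that re-arms itself only after a successful ping.
class SelfReschedulingPinger implements Runnable {
    private volatile boolean running = true;
    private final ScheduledExecutorService scheduler;
    private final long intervalMillis;
    private final BooleanSupplier ping; // hypothetical: true if the master answered

    SelfReschedulingPinger(ScheduledExecutorService scheduler, long intervalMillis, BooleanSupplier ping) {
        this.scheduler = scheduler;
        this.intervalMillis = intervalMillis;
        this.ping = ping;
    }

    // Like innerStart: a single delayed run, not a periodic schedule.
    void start() {
        scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        running = false;
    }

    @Override
    public void run() {
        if (!running) {
            return; // stopped: do not ping and do not reschedule
        }
        if (ping.getAsBoolean() && running) {
            // success: arm the next round now, so only one ping chain exists
            scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
        }
        // on failure the chain simply ends here; retries and notification are omitted
    }
}

class PingerDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger pings = new AtomicInteger();
        CountDownLatch failed = new CountDownLatch(1);
        SelfReschedulingPinger pinger = new SelfReschedulingPinger(scheduler, 10, () -> {
            if (pings.incrementAndGet() == 4) {
                failed.countDown();
                return false; // the fourth ping fails
            }
            return true;
        });
        pinger.start();
        failed.await(5, TimeUnit.SECONDS);
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(pings.get()); // 4
    }
}
```

Because the real ping is asynchronous (`sendRequest` returns before the response arrives), a fixed-rate schedule could fire a new ping while the previous one is still pending; re-arming a one-shot schedule from the response path avoids that.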

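As before, the ping is sent over the transport layer, here as a MasterPingRequest. If the receiving node is no longer the master, it responds with an exception such as NotMasterException; other problems on the master side, such as the pinging node not being present in the master's cluster state, produce other exceptions (NodeDoesNotExistOnMasterException). Otherwise the master responds normally. On the pinging side, these exceptions are passed to notifyMasterFailure, which behaves just as in the startup logic. Any other failure, such as a timeout, causes the request to be resent, and once pingRetryCount consecutive attempts have failed, notifyMasterFailure is called as well.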
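The branching in handleException can be isolated into a small, hypothetical policy class. The enum values stand in for the exception types in the code above (`CONNECT_ERROR` for `ConnectTransportException`, `NODE_NOT_ON_MASTER` for `NodeDoesNotExistOnMasterException`, and so on); all names are invented for this sketch, and the retry rule mirrors `retryCount >= pingRetryCount`.

```java
// Hypothetical stand-ins for the exception types MasterPinger distinguishes.
enum PingFailure { CONNECT_ERROR, NO_LONGER_MASTER, NOT_MASTER, NODE_NOT_ON_MASTER, OTHER }

enum PingAction { HANDLE_TRANSPORT_DISCONNECT, NOTIFY_MASTER_FAILURE, RESEND }

// Hypothetical policy class mirroring the branches of handleException.
class PingFailurePolicy {
    private final int pingRetryCount;
    private int retryCount = 0;

    PingFailurePolicy(int pingRetryCount) {
        this.pingRetryCount = pingRetryCount;
    }

    // A good response resets the counter, as in handleResponse.
    void onSuccess() {
        retryCount = 0;
    }

    PingAction onFailure(PingFailure failure) {
        switch (failure) {
            case CONNECT_ERROR:
                return PingAction.HANDLE_TRANSPORT_DISCONNECT; // network problem: try to reconnect
            case NO_LONGER_MASTER:
            case NOT_MASTER:
            case NODE_NOT_ON_MASTER:
                return PingAction.NOTIFY_MASTER_FAILURE; // the master itself rejected the ping
            default:
                retryCount++; // e.g. a timeout: count it and maybe resend
                return retryCount >= pingRetryCount ? PingAction.NOTIFY_MASTER_FAILURE : PingAction.RESEND;
        }
    }
}

class PolicyDemo {
    public static void main(String[] args) {
        PingFailurePolicy policy = new PingFailurePolicy(3);
        for (int i = 1; i <= 3; i++) {
            System.out.println(i + ": " + policy.onFailure(PingFailure.OTHER));
        }
        // prints 1: RESEND, 2: RESEND, 3: NOTIFY_MASTER_FAILURE
    }
}
```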

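When there is no response because of a network problem (a ConnectTransportException), handleTransportDisconnect(masterToPing) is called. MasterFaultDetection implements this method as follows: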


protected void handleTransportDisconnect(DiscoveryNode node) {
    // must hold the mutex: masterNode may be switched concurrently
    synchronized (masterNodeMutex) {
        // the master has already changed to another node, so there is no need to reconnect
        if (!node.equals(this.masterNode)) {
            return;
        }
        if (connectOnNetworkDisconnect) {
            try {
                // try to reconnect
                transportService.connectToNode(node);
                // if all is well, make sure we restart the pinger
                if (masterPinger != null) {
                    masterPinger.stop();
                }
                // the reconnect succeeded: start a new MasterPinger
                this.masterPinger = new MasterPinger();
                // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
                threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger);
            } catch (Exception e) {
                // reconnecting failed: report a master failure
                logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
                notifyMasterFailure(masterNode, "transport disconnected (with verified connect)");
            }
        } else {
            // reconnecting is not enabled: report a master failure
            logger.trace("[master] [{}] transport disconnected", node);
            notifyMasterFailure(node, "transport disconnected");
        }
    }
}
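The decision tree of handleTransportDisconnect can be exercised in isolation with a self-contained sketch. The transport is replaced by a hypothetical `NodeConnector`, and the pinger restart and failure notification are recorded as strings; `DisconnectHandlerSketch` and `NodeConnector` are invented names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical connector: throws if the node cannot be reached.
interface NodeConnector {
    void connectToNode(String node) throws Exception;
}

// Hypothetical sketch of MasterFaultDetection.handleTransportDisconnect.
class DisconnectHandlerSketch {
    private final Object masterNodeMutex = new Object();
    private final String masterNode;
    private final boolean connectOnNetworkDisconnect;
    private final NodeConnector connector;
    final List<String> events = new ArrayList<>(); // what the handler decided

    DisconnectHandlerSketch(String masterNode, boolean connectOnNetworkDisconnect, NodeConnector connector) {
        this.masterNode = masterNode;
        this.connectOnNetworkDisconnect = connectOnNetworkDisconnect;
        this.connector = connector;
    }

    void handleTransportDisconnect(String node) {
        synchronized (masterNodeMutex) {
            if (!node.equals(masterNode)) {
                return; // not the current master: nothing to do
            }
            if (connectOnNetworkDisconnect) {
                try {
                    connector.connectToNode(node); // verify the connection once more
                    events.add("restart pinger"); // stands in for stop() + schedule(0 ms)
                } catch (Exception e) {
                    events.add("master failure (with verified connect)");
                }
            } else {
                events.add("master failure");
            }
        }
    }
}

class DisconnectDemo {
    public static void main(String[] args) {
        DisconnectHandlerSketch handler = new DisconnectHandlerSketch("node-m", true,
                node -> { throw new Exception("connection refused"); });
        handler.handleTransportDisconnect("node-m");
        System.out.println(handler.events); // [master failure (with verified connect)]
    }
}
```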

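That is the whole MasterFaultDetection flow. At startup, if the initial connection to the master fails, a master failure is reported; otherwise MasterPinger is started after pingInterval (set by discovery.zen.fd.ping_interval, 1s by default). MasterPinger pings the master, and if the master has become unreachable over the network, it tries to reconnect. When the master receives the MasterPingRequest, it first checks whether it is still the master; if not, it responds with an exception, otherwise it responds normally. If the pinging node receives one of these exceptions, or exhausts its retries, it reports a master failure; otherwise the ping round ends, and after pingInterval the same MasterPinger is scheduled again.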

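NodesFaultDetection is similar to MasterFaultDetection in both logic and implementation; the main difference lies in how ping failures are handled. When a node fails or stops responding, the node-failure mechanism is triggered; only the handling after the notification differs. I won't analyze it in detail here; interested readers can refer to the source code.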
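A simplified, hypothetical sketch of the master-side check: one failure counter per node, with a node dropped after `pingRetryCount` consecutive failures. Unlike the real code, which resends a failed ping right away, this sketch counts one failure per ping round; `NodesFaultDetectionSketch` and its methods are invented names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch of the master-side check, one failure counter per node.
class NodesFaultDetectionSketch {
    private final Map<String, Integer> retryCounts = new LinkedHashMap<>();
    private final int pingRetryCount;
    final List<String> failedNodes = new ArrayList<>();

    NodesFaultDetectionSketch(int pingRetryCount, List<String> nodes) {
        this.pingRetryCount = pingRetryCount;
        for (String node : nodes) {
            retryCounts.put(node, 0);
        }
    }

    // One ping round over all monitored nodes; `ping` returns true if the node answered.
    void pingRound(Predicate<String> ping) {
        for (String node : new ArrayList<>(retryCounts.keySet())) { // copy: we may remove
            if (ping.test(node)) {
                retryCounts.put(node, 0); // healthy: reset the counter
            } else {
                int retries = retryCounts.merge(node, 1, Integer::sum);
                if (retries >= pingRetryCount) {
                    retryCounts.remove(node); // stop pinging it
                    failedNodes.add(node); // a node-failure listener would be notified here
                }
            }
        }
    }

    boolean isMonitored(String node) {
        return retryCounts.containsKey(node);
    }
}

class NodesFDDemo {
    public static void main(String[] args) {
        NodesFaultDetectionSketch fd = new NodesFaultDetectionSketch(3, Arrays.asList("node-1", "node-2"));
        for (int round = 0; round < 3; round++) {
            fd.pingRound(node -> !node.equals("node-2")); // node-2 never answers
        }
        System.out.println(fd.failedNodes); // [node-2]
    }
}
```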

Original article: https://www.cnblogs.com/zziawanblog/p/6533731.html

