
Elasticsearch source code analysis: how the index action is implemented

The role of an action

The previous post analyzed actions from a structural point of view; this one takes the index action as an example and looks in detail at how an action is implemented.

To recap what an action does: for every feature (index, for example) there are two basic classes, *Action (IndexAction) and Transport*Action (TransportIndexAction). The former holds a singleton instance (IndexAction INSTANCE = new IndexAction()) that the client uses to bind the corresponding TransportAction (registerAction(IndexAction.INSTANCE, TransportIndexAction.class)); this binding takes place in ActionModule.

The Action class also defines the action's name (String NAME = "indices:data/write/index"). TransportService uses this name to bind the corresponding handler, which processes the messages received by NettyTransport. The TransportAction is where the actual logic lives: when a request arrives it first checks whether the local node can handle it; if so, it calls the relevant methods, produces a result and returns it, otherwise it forwards the request through NettyTransport to the node that can handle it. All Transport*Action classes are structured this way.
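As a quick reminder of what that pairing looks like in code, here is a condensed sketch of the *Action side, modeled on the 1.x-era IndexAction; the exact superclass and the request-builder plumbing vary by version and are trimmed here:

// Condensed sketch of the *Action half of the pairing (request-builder methods omitted;
// the exact superclass name differs across versions).
public class IndexAction extends Action<IndexRequest, IndexResponse, IndexRequestBuilder> {

    // the singleton that ActionModule binds to TransportIndexAction via registerAction(...)
    public static final IndexAction INSTANCE = new IndexAction();
    // the name TransportService uses to register and look up the request handler
    public static final String NAME = "indices:data/write/index";

    private IndexAction() {
        super(NAME);
    }

    @Override
    public IndexResponse newResponse() {
        return new IndexResponse();
    }
}

The client only ever sees IndexAction.INSTANCE; the ActionModule binding is what routes a client call to the TransportIndexAction that actually does the work.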

The TransportAction class diagram

Start with the class diagram of TransportAction; every Transport*Action inherits from it.

It has two main methods, execute and doExecute. execute comes in two variants, one of which requires the caller to supply an ActionListener; the actual logic always ends up in doExecute, which each feature module implements (a condensed sketch of this relationship follows).
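Roughly, the relationship between the two methods looks like the sketch below (simplified from the 1.x-era TransportAction; validation, logging and threading details differ across versions):

// Condensed sketch of TransportAction: execute() wraps the listener plumbing,
// doExecute() carries the per-action logic.
public abstract class TransportAction<Request extends ActionRequest, Response extends ActionResponse> {

    // variant 1: no listener supplied, so wrap a future and delegate to variant 2
    public ActionFuture<Response> execute(Request request) throws ElasticsearchException {
        PlainActionFuture<Response> future = PlainActionFuture.newFuture();
        execute(request, future);
        return future;
    }

    // variant 2: the caller supplies the ActionListener that receives the result
    public void execute(Request request, ActionListener<Response> listener) {
        ActionRequestValidationException validationException = request.validate();
        if (validationException != null) {
            listener.onFailure(validationException);
            return;
        }
        try {
            doExecute(request, listener);   // the per-action logic lives here
        } catch (Throwable e) {
            listener.onFailure(e);
        }
    }

    // each concrete Transport*Action (TransportIndexAction, TransportDeleteAction, ...) implements this
    protected abstract void doExecute(Request request, ActionListener<Response> listener);
}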

As for TransportIndexAction's inheritance chain: because of how the functionality is divided, it extends TransportShardReplicationOperationAction rather than TransportAction directly. This abstract class is the parent of every action that must operate on shard replicas, so its subclasses also include the delete and bulk actions. It defines a number of inner classes that help carry out the work; the three discussed here are OperationTransportHandler, ReplicaOperationTransportHandler, and AsyncShardOperationAction.

The OperationTransportHandler code

It looks like this:


class OperationTransportHandler extends BaseTransportRequestHandler<Request> {
    // extends BaseTransportRequestHandler
    ………………
    @Override
    public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
        // no need to have a threaded listener since we just send back a response
        request.listenerThreaded(false);
        // if we have a local operation, execute it on a thread since we don't spawn
        request.operationThreaded(true);
        // call the TransportAction's execute method and send the result back over the channel
        execute(request, new ActionListener<Response>() {
            @Override
            public void onResponse(Response result) {
                try {
                    channel.sendResponse(result);
                } catch (Throwable e) {
                    onFailure(e);
                }
            }

            @Override
            public void onFailure(Throwable e) {
                try {
                    channel.sendResponse(e);
                } catch (Throwable e1) {
                    logger.warn("Failed to send response for " + actionName, e1);
                }
            }
        });
    }
}

If you have read the post on NettyTransport request sending and handling, this code will look familiar: it is the typical pattern Elasticsearch uses to process messages between nodes. When a request reaches this node through NettyTransport, the corresponding handler is looked up by the request's action name and used to process it. This handler is the one bound to [indices:data/write/index], and as you can see it processes the request by calling execute. Its registration happens in the TransportShardReplicationOperationAction constructor.
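That wiring looks roughly like the sketch below, condensed from the 1.x-era constructor; only the handler registration is kept, and the [r] suffix it shows is the replica action name discussed next:

// Condensed sketch of the handler registration in TransportShardReplicationOperationAction's
// constructor (all other initialization omitted).
protected TransportShardReplicationOperationAction(Settings settings, String actionName,
                                                   TransportService transportService /* , ... */) {
    // ...
    this.transportReplicaAction = actionName + "[r]";   // e.g. indices:data/write/index[r]

    // bind the action name to the handler that serves requests arriving over the transport
    transportService.registerHandler(actionName, new OperationTransportHandler());
    // bind the replica variant of the action to its own handler
    transportService.registerHandler(transportReplicaAction, new ReplicaOperationTransportHandler());
}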

With OperationTransportHandler understood, ReplicaOperationTransportHandler is easy to follow: it is implemented in exactly the same way, except that the action name it is bound to carries a [r] suffix, and its job is to handle the operations that must run on replicas. The code is shown below:


class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ReplicaOperationRequest> {
    ……………………
    @Override
    public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
        try {
            shardOperationOnReplica(request);
        } catch (Throwable t) {
            failReplicaIfNeeded(request.shardId.getIndex(), request.shardId.id(), t);
            throw t;
        }
        channel.sendResponse(TransportResponse.Empty.INSTANCE);
    }
}

As you can see, the structure is very similar; the difference is that it calls shardOperationOnReplica, the method that applies the operation on a replica. That method is abstract in TransportShardReplicationOperationAction and is implemented in each subclass; the delete action, for example, implements how a delete request is handled on a replica.
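To make that concrete, the sketch below shows roughly what TransportIndexAction's shardOperationOnReplica does; it is reconstructed to mirror the primary-side code shown later in this post, with mapping-update and failure-handling details trimmed:

// Approximate replica-side counterpart of the primary code shown further down:
// the same parse-and-index steps, but with the origin marked as REPLICA and no response to build.
@Override
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
    IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex())
            .shardSafe(shardRequest.shardId.id());
    IndexRequest request = shardRequest.request;
    SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source())
            .type(request.type()).id(request.id())
            .routing(request.routing()).parent(request.parent())
            .timestamp(request.timestamp()).ttl(request.ttl());
    if (request.opType() == IndexRequest.OpType.INDEX) {
        Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(),
                Engine.Operation.Origin.REPLICA, request.canHaveDuplicates());
        indexShard.index(index);
    } else {
        Engine.Create create = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(),
                Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
        indexShard.create(create);
    }
    if (request.refresh()) {
        try {
            indexShard.refresh("refresh_flag_index");
        } catch (Exception e) {
            // ignore, a refresh failure on the replica is not fatal here
        }
    }
}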

Having walked through these two handlers, you should have a rough picture of how an action is processed, but this is only the tip of the iceberg: both handlers receive requests coming from other nodes. What if the request can be served by the local node itself? That logic lives in the AsyncShardOperationAction class. Let's look at how it works.

Because every subclass of TransportShardReplicationOperationAction modifies the index, and an interrupted modification could leave the copies inconsistent, the flow is always the same: operate on the primary shard first, then on the replica shards. The code is shown below:


protected void doStart() throws ElasticsearchException {
    try {
        // check whether the cluster is blocked
        ClusterBlockException blockException = checkGlobalBlock(observer.observedState());
        if (blockException != null) {
            if (blockException.retryable()) {
                logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
                retry(blockException);
                return;
            } else {
                throw blockException;
            }
        }
        // resolve the concrete index name for the request
        if (resolveIndex()) {
            internalRequest.concreteIndex(observer.observedState().metaData().concreteSingleIndex(internalRequest.request().index(), internalRequest.request().indicesOptions()));
        } else {
            internalRequest.concreteIndex(internalRequest.request().index());
        }
        // check if we need to execute, and if not, return
        if (!resolveRequest(observer.observedState(), internalRequest, listener)) {
            return;
        }
        // check for blocks again, this time at the request/index level
        blockException = checkRequestBlock(observer.observedState(), internalRequest);
        if (blockException != null) {
            if (blockException.retryable()) {
                logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
                retry(blockException);
                return;
            } else {
                throw blockException;
            }
        }
        shardIt = shards(observer.observedState(), internalRequest);
    } catch (Throwable e) {
        listener.onFailure(e);
        return;
    }
    // find the primary shard
    boolean foundPrimary = false;
    ShardRouting shardX;
    while ((shardX = shardIt.nextOrNull()) != null) {
        final ShardRouting shard = shardX;
        // we only deal with primary shardIt here...
        if (!shard.primary()) {
            continue;
        }
        if (!shard.active() || !observer.observedState().nodes().nodeExists(shard.currentNodeId())) {
            logger.trace("primary shard [{}] is not yet active or we do not know the node it is assigned to [{}], scheduling a retry.", shard.shardId(), shard.currentNodeId());
            retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
            return;
        }
        if (!primaryOperationStarted.compareAndSet(false, true)) {
            return;
        }
        foundPrimary = true;
        // the primary shard is on the local node: perform the operation directly
        if (shard.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
            try {
                if (internalRequest.request().operationThreaded()) {
                    internalRequest.request().beforeLocalFork();
                    threadPool.executor(executor).execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                performOnPrimary(shard.id(), shard);
                            } catch (Throwable t) {
                                listener.onFailure(t);
                            }
                        }
                    });
                } else {
                    performOnPrimary(shard.id(), shard);
                }
            } catch (Throwable t) {
                listener.onFailure(t);
            }
        } else { // the primary shard is on another node: forward the request to it over the transport layer
            DiscoveryNode node = observer.observedState().nodes().get(shard.currentNodeId());
            transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler<Response>() {
                @Override
                public Response newInstance() {
                    return newResponseInstance();
                }

                @Override
                public String executor() {
                    return ThreadPool.Names.SAME;
                }

                @Override
                public void handleResponse(Response response) {
                    listener.onResponse(response);
                }

                @Override
                public void handleException(TransportException exp) {
                    // if we got disconnected from the node, or the node / shard is not in the right state (being closed)
                    if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
                            retryPrimaryException(exp)) {
                        primaryOperationStarted.set(false);
                        internalRequest.request().setCanHaveDuplicates();
                        // we already marked it as started when we executed it (removed the listener) so pass false
                        // to re-add to the cluster listener
                        logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
                        retry(exp);
                    } else {
                        listener.onFailure(exp);
                    }
                }
            });
        }
        break;
    }
    ………………
}

That is the whole handling path for the request.

The primary operation method


void performOnPrimary(int primaryShardId, final ShardRouting shard) {
    ……
    PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request()));
    performReplicas(response);
    …………
}

The snippet above is part of performOnPrimary. It first calls the outer class's shardOperationOnPrimary, which is implemented in each subclass; the TransportIndexAction implementation looks like this:


@Override
protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
    final IndexRequest request = shardRequest.request;
    // check whether a routing value is required for this mapping
    IndexMetaData indexMetaData = clusterState.metaData().index(shardRequest.shardId.getIndex());
    MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
    if (mappingMd != null && mappingMd.routing().required()) {
        if (request.routing() == null) {
            throw new RoutingMissingException(shardRequest.shardId.getIndex(), request.type(), request.id());
        }
    }
    // look up the IndexService/IndexShard and perform the actual index operation
    IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
    IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
    SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id())
            .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
    long version;
    boolean created;
    try {
        Engine.IndexingOperation op;
        if (request.opType() == IndexRequest.OpType.INDEX) {
            Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
            if (index.parsedDoc().mappingsModified()) {
                mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), index.docMapper(), indexService.indexUUID());
            }
            indexShard.index(index);
            version = index.version();
            op = index;
            created = index.created();
        } else {
            Engine.Create create = indexShard.prepareCreate(sourceToParse,
                    request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
            if (create.parsedDoc().mappingsModified()) {
                mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), create.docMapper(), indexService.indexUUID());
            }
            indexShard.create(create);
            version = create.version();
            op = create;
            created = true;
        }
        if (request.refresh()) {
            try {
                indexShard.refresh("refresh_flag_index");
            } catch (Throwable e) {
                // ignore
            }
        }
        // update the version on the request, so it will be used for the replicas
        request.version(version);
        request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
        assert request.versionType().validateVersionForWrites(request.version());
        IndexResponse response = new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created);
        return new PrimaryResponse<>(shardRequest.request, response, op);
    } catch (WriteFailureException e) {
        if (e.getMappingTypeToUpdate() != null) {
            DocumentMapper docMapper = indexService.mapperService().documentMapper(e.getMappingTypeToUpdate());
            if (docMapper != null) {
                mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), docMapper, indexService.indexUUID());
            }
        }
        throw e.getCause();
    }
}

The code above is how an index request is executed on the primary shard. It touches the low-level indexing machinery, which is not expanded on here; the goal is only to show how it is wired into the action, and a detailed discussion will follow later. Next comes the operation on the replicas. There can be several of them, so performReplicas is called first: it observes the cluster state, then iterates over all the replicas and processes each one, registering a listener when the call is asynchronous or executing synchronously and returning the result. Finally it calls performReplica, which invokes the outer class's abstract shardOperationOnReplica. This part is fairly straightforward, so the original code is not reproduced here; interested readers can consult the source.
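For readers who want the general shape without opening the source, here is a rough sketch of that replica fan-out; the method and field names follow the surrounding code, but the signatures and the bookkeeping (cluster-state observation, per-shard failure reporting, the counter that tracks outstanding replica responses) are simplified away:

// Rough sketch of the replica fan-out: walk the shard copies, skip the primary,
// and either ship a ReplicaOperationRequest to the [r] handler on the owning node
// or run shardOperationOnReplica locally.
void performReplicas(PrimaryResponse<Response, ReplicaRequest> response) {
    ShardIterator shardIt = shards(observer.observedState(), internalRequest);
    ShardRouting shard;
    while ((shard = shardIt.nextOrNull()) != null) {
        if (shard.primary()) {
            continue; // the primary was already handled in performOnPrimary
        }
        performOnReplica(response, shard, shard.currentNodeId(), shardIt.shardId());
    }
    // ...the real code only responds to the caller once every replica has acknowledged
}

void performOnReplica(PrimaryResponse<Response, ReplicaRequest> response, ShardRouting shard,
                      String nodeId, ShardId shardId) {
    final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardId, response.replicaRequest());
    if (!nodeId.equals(observer.observedState().nodes().localNodeId())) {
        // the replica lives on another node: send the request to the [r] handler registered earlier
        DiscoveryNode node = observer.observedState().nodes().get(nodeId);
        transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions,
                EmptyTransportResponseHandler.INSTANCE_SAME); // the real handler also fails the replica on error
    } else {
        // the replica is local: run the abstract shardOperationOnReplica directly (or on a thread pool)
        shardOperationOnReplica(shardRequest);
    }
}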

Summary

Using TransportIndexAction as the example, this post has walked through the class hierarchy of a Transport*Action. Between it and TransportAction sits one more layer, TransportShardReplicationOperationAction, which is one of the classes in the action support package. That package abstracts the common sub-operations into a handful of large base classes, and most of the other feature actions inherit from them. It will be analyzed in detail in a later post.


Original article: https://www.cnblogs.com/zziawanblog/p/6660387.html
