好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

elasticsearch数据信息索引操作action support示例分析

抽象类分析

Action这一部分主要是数据(索引)的操作和部分集群信息操作。 所有的请求通过client转发到对应的action上然后再由对应的TransportAction来执行相关请求。如果请求能在本机上执行则在本机上执行,否则使用Transport进行转发到对应的节点。action support部分是对action的抽象,所有的具体action都继承了support action中的某个类。这里将对这些抽象类进行分析。

这一部分总共分为broadcast(广播),master,nodes,replication及single几个部分。broadcast主要针对一些无具体目标主机的操作,如查询index是否存在,所有继承这个类的action都具有这种类似的性质;nodes主要是对节点的操作,如热点线程查询(hotThread)查询节点上的繁忙线程;replication的子类主要是需要或可以在副本上进行的操作,如索引操作,数据不仅要发送到主shard还要发送到各个副本。single则主要是目标明确的单shard操作,如get操作,根据doc的id取doc,doc 的id能够确定它在哪个shard上,因此操作也在此shard上执行。

doExecute方法

这些support action的实现可以分为两类,第一类就是实现一个内部类作为异步操作器,子类执行doExecute时,初始化该操作器并启动。另外一种就是直接实现一个方法,子类doExecute方法调用该方法进行。TransportBroadcastOperationAction就属于前者,它实现了内部操作器AsyncBroadcastAction。TransportCountAction继承于它,它doExecute方法如下所示:

?

1

2

3

4

5

@Override

     protected void doExecute(CountRequest request, ActionListener<CountResponse> listener) {

         request.nowInMillis = System.currentTimeMillis();

         super .doExecute(request, listener);

     }

调用父类的doExecute方法,也就是TransportBroadcastOperationAction的方法,它的实现如下所示:

?

1

2

3

4

@Override

     protected void doExecute(Request request, ActionListener&lt;Response&gt; listener) {

         new AsyncBroadcastAction(request, listener).start();

     }

可以看到它初始化了AsyncBroadcastAction并启动。AsyncBroadcastAction只是确定了操作的流程,及操作完成如何返回response,并未涉及到具体的操作逻辑。因为这些逻辑都在每个子action中实现,不同的action需要进行不同的操作。如count需要count每个shard并且返回最后的总数值,而IndexExistAction则需要对比所有索引查看查询的索引是否存在。start方法的代码如下所示:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

public void start() {

       //没有shards

             if (shardsIts.size() == 0 ) {

                 // no shards

                 try {

                     listener.onResponse(newResponse(request, new AtomicReferenceArray( 0 ), clusterState));

                 } catch (Throwable e) {

                     listener.onFailure(e);

                 }

                 return ;

             }

             request.beforeStart();

             // count the local operations, and perform the non local ones

             int shardIndex = - 1 ;

        //遍历对每个shards进行操作

             for ( final ShardIterator shardIt : shardsIts) {

                 shardIndex++;

                 final ShardRouting shard = shardIt.nextOrNull();

                 if (shard != null ) {

                     performOperation(shardIt, shard, shardIndex);

                 } else {

                     // really, no shards active in this group

                     onOperation( null , shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));

                 }

             }

         }

start方法就是遍历所有shards,如果shard存在则执行performOperation方法,在这个方法中会区分该请求能否在本机上进行,能执行则调用shardOperation方法得到结果。这个方法在这是抽象的,每个子类都有实现。否则发送到对应的主机上。,如果shard为null则进行onOperation操作,遍历该shard的其它副本看能否找到可以操作的shard。

performOperation代码

如下所示:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

protected void performOperation( final ShardIterator shardIt, final ShardRouting shard, final int shardIndex) {

             if (shard == null ) { //shard 为null抛出异常

                 // no more active shards... (we should not really get here, just safety)

                 onOperation( null , shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));

             } else {

                 try {

                     final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request);

                     if (shard.currentNodeId().equals(nodes.localNodeId())) { //shard在本地执行shardOperation方法,并通过onOperation方法封装结果

                         threadPool.executor(executor).execute( new Runnable() {

                             @Override

                             public void run() {

                                 try {

                                     onOperation(shard, shardIndex, shardOperation(shardRequest));

                                 } catch (Throwable e) {

                                     onOperation(shard, shardIt, shardIndex, e);

                                 }

                             }

                         });

                     } else { //不是本地shard,发送到对应节点。

                         DiscoveryNode node = nodes.get(shard.currentNodeId());

                         if (node == null ) {

                             // no node connected, act as failure

                             onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));

                         } else {

                             transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler&lt;ShardResponse&gt;() {

                                 @Override

                                 public ShardResponse newInstance() {

                                     return newShardResponse();

                                 }

                                 @Override

                                 public String executor() {

                                     return ThreadPool.Names.SAME;

                                 }

                                 @Override

                                 public void handleResponse(ShardResponse response) {

                                     onOperation(shard, shardIndex, response);

                                 }

                                 @Override

                                 public void handleException(TransportException e) {

                                     onOperation(shard, shardIt, shardIndex, e);

                                 }

                             });

                         }

                     }

                 } catch (Throwable e) {

                     onOperation(shard, shardIt, shardIndex, e);

                 }

             }

         }

方法shardOperation在countTransportAction的实现如下所示:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

@Override

     protected ShardCountResponse shardOperation(ShardCountRequest request) throws ElasticsearchException {

         IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); //

         IndexShard indexShard = indexService.shardSafe(request.shardId().id());

     //构造查询context

         SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.shardId().getIndex(), request.shardId().id());

         SearchContext context = new DefaultSearchContext( 0 ,

                 new ShardSearchLocalRequest(request.types(), request.nowInMillis(), request.filteringAliases()),

                 shardTarget, indexShard.acquireSearcher( "count" ), indexService, indexShard,

                 scriptService, cacheRecycler, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter());

         SearchContext.setCurrent(context);

         try {

             // TODO: min score should move to be "null" as a value that is not initialized...

             if (request.minScore() != - 1 ) {

                 context.minimumScore(request.minScore());

             }

             BytesReference source = request.querySource();

             if (source != null &amp;&amp; source.length() &gt; 0 ) {

                 try {

                     QueryParseContext.setTypes(request.types());

                     context.parsedQuery(indexService.queryParserService().parseQuery(source));

                 } finally {

                     QueryParseContext.removeTypes();

                 }

             }

             final boolean hasTerminateAfterCount = request.terminateAfter() != DEFAULT_TERMINATE_AFTER;

             boolean terminatedEarly = false ;

             context.preProcess();

             try {

                 long count;

                 if (hasTerminateAfterCount) { //调用lucene的封装接口执行查询并返回结果

                     final Lucene.EarlyTerminatingCollector countCollector =

                             Lucene.createCountBasedEarlyTerminatingCollector(request.terminateAfter());

                     terminatedEarly = Lucene.countWithEarlyTermination(context.searcher(), context.query(), countCollector);

                     count = countCollector.count();

                 } else {

                     count = Lucene.count(context.searcher(), context.query());

                 }

                 return new ShardCountResponse(request.shardId(), count, terminatedEarly);

             } catch (Exception e) {

                 throw new QueryPhaseExecutionException(context, "failed to execute count" , e);

             }

         } finally {

             // this will also release the index searcher

             context.close();

             SearchContext.removeCurrent();

         }

     }

可以看到这里是每个action真正的逻辑实现。因为这里涉及到index部分的内容,这里就不详细分析。后面关于index的分析会有涉及。这就是support action中的第一种实现。

master的相关操作

第二种就master的相关操作,因此没有实现对应的操作类,而只是实现了一个方法。该方法的作用跟操作器作用相同,唯一的不同是它没有操作器这么多的变量, 而且它不是异步的。master的操作需要实时进行,执行过程中需要阻塞某些操作,保证集群状态一致性。这里就不再说明,请参考TransportMasterNodeOperationAction原码。

总结

本篇概括说了support action,并以countTransportAction为例说明了support Action中的异步操作器实现,最后简单的分析了master的同步操作。因为这里涉及到很多action不可能一一分析,有兴趣可以参考对应的代码。而且这里有以下index部分的内容,所以没有更深入的分析。在后面分析完index的相关功能后,会挑出几个重要的action做详细分析。

以上就是elasticsearch数据信息索引操作action support示例分析的详细内容,更多关于elasticsearch数据信息索引操作action support的资料请关注其它相关文章!

原文链接:https://HdhCmsTestcnblogs测试数据/zziawanblog/p/6671286.html

查看更多关于elasticsearch数据信息索引操作action support示例分析的详细内容...

  阅读:14次