好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

elasticsearch索引的创建过程index create逻辑分析

索引的创建过程

从本篇开始,就进入了Index的核心代码部分。这里首先分析一下索引的创建过程。elasticsearch中的索引是多个分片的集合,它只是逻辑上的索引,并不具备实际的索引功能,所有对数据的操作最终还是由每个分片完成。

创建索引的过程,从elasticsearch集群上来说就是写入索引元数据的过程,这一操作只能在master节点上完成。这是一个阻塞式动作,在加上分配在集群上均衡的过程也非常耗时,因此在一次创建大量索引的过程master节点会出现单点性能瓶颈,能够看到响应过程很慢。

在开始具体源码分析之前,首先回顾一下Action部分的内容(参考 index action分析 ),elasticsearch的每一个功能都对应两个Action,*action和Transport*action。*action中定义了每个功能对应的路径,同时Action的instance绑定对应的Transport*Action。所有功能请求都需要在集群上转发,这大概也是每个功能都有Transport*Action的原因吧。对于create当然也不例外,它的开始点也是TransportCreateAction。另外,在 action support分析 中分析过,不同的action需要经过和需要操作的节点也不同。create index只能由master节点进行,而且也只在master节点上进行,保证集群数据的一致性。

materOperation方法实现

因此TransportCreateAction继承了TransportMasterNodeOperationAction,并实现了materOperation方法。它的方法如下所示:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

protected void masterOperation( final CreateIndexRequest request, final ClusterState state, final ActionListener&lt;CreateIndexResponse&gt; listener) throws ElasticsearchException {

         String cause = request.cause();

         if (cause.length() == 0 ) {

             cause = "api" ;

         }

         final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, request.index())

                 .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())

                 .settings(request.settings()).mappings(request.mappings())

                 .aliases(request.aliases()).customs(request.customs());

         createIndexService.createIndex(updateRequest, new ActionListener&lt;ClusterStateUpdateResponse&gt;() {

             @Override

             public void onResponse(ClusterStateUpdateResponse response) {

                 listener.onResponse( new CreateIndexResponse(response.isAcknowledged()));

             }

             @Override

             public void onFailure(Throwable t) {

                 if (t instanceof IndexAlreadyExistsException) {

                     logger.trace( "[{}] failed to create" , t, request.index());

                 } else {

                     logger.debug( "[{}] failed to create" , t, request.index());

                 }

                 listener.onFailure(t);

             }

         });

     }

这里看上很简单,只是调用了createIndexService(它其实是MetaDataCreateIndexService)的方法,就是修改集群matedata过程。

clusterservice处理

修改前首先获取到index名称对应的lock,这样保证操作数据一致性,然后生成updatetask,交给clusterservice处理。代码如下所示:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

public void createIndex( final CreateIndexClusterStateUpdateRequest request, final ActionListener&lt;ClusterStateUpdateResponse&gt; listener) {

         // 获取锁,只对该索引的操作加锁,而不是整个cluster

         final Semaphore mdLock = metaDataService.indexMetaDataLock(request.index());

         // 如果能够获取锁离开创建索引,否则在下面启动新的线程进行

         if (mdLock.tryAcquire()) {

             createIndex(request, listener, mdLock);

             return ;

         }

         threadPool.executor(ThreadPool.Names.MANAGEMENT).execute( new ActionRunnable(listener) {

             @Override

             public void doRun() throws InterruptedException {

                 if (!mdLock.tryAcquire(request.masterNodeTimeout().nanos(), TimeUnit.NANOSECONDS)) {

                     listener.onFailure( new ProcessClusterEventTimeoutException(request.masterNodeTimeout(), "acquire index lock" ));

                     return ;

                 }

                 createIndex(request, listener, mdLock);

             }

         });

     }

createIndex方法,会封装create请求,然后向cluster发送一个updatetask。代码如下所示:

?

1

2

3

4

5

private void createIndex( final CreateIndexClusterStateUpdateRequest request, final ActionListener&lt;ClusterStateUpdateResponse&gt; listener, final Semaphore mdLock) {

         ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();

         updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX);

         request.settings(updatedSettingsBuilder.build());

         clusterService.submitStateUpdateTask( "create-index [" + request.index() + "], cause [" + request.cause() + "]" , Priority.URGENT, new AckedClusterStateUpdateTask&lt;ClusterStateUpdateResponse&gt;(request, listener)

建立索引 修改配置

增加或者修改mapping都是对集群状态修改,它们的过程都很相似,都是通过clusterService提交一个更新操作,同时附带有优先级。clusterservice会根据优先级和更新状态task的类型来进行对应的操作。如下所示:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

public void submitStateUpdateTask( final String source, Priority priority, final ClusterStateUpdateTask updateTask) {

         if (!lifecycle.started()) {

             return ;

         }

         try {

             final UpdateTask task = new UpdateTask(source, priority, updateTask); //根据优先级新建不同的task

             if (updateTask instanceof TimeoutClusterStateUpdateTask) { //超时任务,这类任务需要即时返回,因此立刻执行。

                 final TimeoutClusterStateUpdateTask timeoutUpdateTask = (TimeoutClusterStateUpdateTask) updateTask;

                 updateTasksExecutor.execute(task, threadPool.scheduler(), timeoutUpdateTask.timeout(), new Runnable() {

                     @Override

                     public void run() {

                         threadPool.generic().execute( new Runnable() {

                             @Override

                             public void run() {

                                 timeoutUpdateTask.onFailure(task.source(), new ProcessClusterEventTimeoutException(timeoutUpdateTask.timeout(), task.source()));

                             }

                         });

                     }

                 });

             } else { //其它类型,可以延迟执行,则交给线程池来执行。

                 updateTasksExecutor.execute(task);

             }

         } catch (EsRejectedExecutionException e) {

             // ignore cases where we are shutting down..., there is really nothing interesting

             // to be done here...

             if (!lifecycle.stoppedOrClosed()) {

                 throw e;

             }

         }

     }

说完它们的执行过程,再来看一下create index的具体逻辑。这个逻辑在matedataservice所提交的AckedClusterStateUpdateTask中的execute方法中。总体来说,这一过程就是将request中关于索引的配置mapping等取出来加入到当前的clustermatedata中,构造一个新的matedata的过程。这一过程还是比较复杂,限于篇幅将在下次中进行分析。

总结

创建索引的过程就是master节点更新集群matedata的过程,为了保证数据一致性,需要获取锁。

因此存在单点瓶颈。对于外部调用来说,跟其它功能一样,外部接口调用CreateIndexAction的相关方法,然后通过TransPortCreateIndexAction讲请求发送到集群上,进行索引创建。

以上就是elasticsearch索引创建过程index create的详细内容,更多关于elasticsearch索引创建过程index create的资料请关注其它相关文章!

原文链接:https://www.cnblogs.com/zziawanblog/p/6921866.html

查看更多关于elasticsearch索引的创建过程index create逻辑分析的详细内容...

  阅读:26次