好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Elasticsearch索引创建(create index)与集群metadata更新

创建索引并更新集群index metadata

创建索引时既要创建索引本身,也要更新集群的index metadata,这一过程在MetaDataCreateIndexService的createIndex方法中完成。这里会提交一个高优先级、AckedClusterStateUpdateTask类型的task。索引创建需要即时得到反馈,因此这个task需要返回结果,并且会超时,而且这个任务的优先级也非常高。

下面具体看一下它的execute方法,这个方法会在master执行任务时调用。这个方法非常长,主要完成以下三个功能:合并request和template中的mapping和setting;调用indicesService创建索引;对创建后的索引添加mapping。这一系列功能完成后,生成新的metadata并更新集群状态,就完成了索引的创建。具体的调用方法参考上一篇。

首先看创建index的create方法

代码如下所示:

@Override

             public ClusterState execute(ClusterState currentState) throws Exception {

                 boolean indexCreated = false ;

                 String removalReason = null ;

                 try {

             //检查request的合法性,1.5版本主要检查index名字是否合法,如不能含有某些字符,另外就是集群节点版本

                     validate(request, currentState);

                     for (Alias alias : request.aliases()) { //检查别称是否合法

                         aliasValidator.validateAlias(alias, request.index(), currentState.metaData());

                     }

                     // 查找索引模板

                     List&lt;IndexTemplateMetaData&gt; templates = findTemplates(request, currentState, indexTemplateFilter);

                     Map&lt;String, Custom&gt; customs = Maps.newHashMap();

                     // add the request mapping

                     Map&lt;String, Map&lt;String, Object&gt;&gt; mappings = Maps.newHashMap();

                     Map&lt;String, AliasMetaData&gt; templatesAliases = Maps.newHashMap();

                     List&lt;String&gt; templateNames = Lists.newArrayList();

             //取出request中的mapping配置,虽然mapping可以后面添加,多数情况创建索引的时候还是会附带着mapping,在request中mapping是一个map

                     for (Map.Entry&lt;String, String&gt; entry : request.mappings().entrySet()) {

                         mappings.put(entry.getKey(), parseMapping(entry.getValue()));

                     }

             //一些预设如warm等

                     for (Map.Entry&lt;String, Custom&gt; entry : request.customs().entrySet()) {

                         customs.put(entry.getKey(), entry.getValue());

                     }

                     // 将找到的template和request中的mapping合并

                     for (IndexTemplateMetaData template : templates) {

                         templateNames.add(template.getName());

                         for (ObjectObjectCursor&lt;String, CompressedString&gt; cursor : template.mappings()) {

                             if (mappings.containsKey(cursor.key)) {

                                 XContentHelper.mergeDefaults(mappings.get(cursor.key), parseMapping(cursor.value.string()));

                             } else {

                                 mappings.put(cursor.key, parseMapping(cursor.value.string()));

                             }

                         }

                         // 合并custom

                         for (ObjectObjectCursor&lt;String, Custom&gt; cursor : template.customs()) {

                             String type = cursor.key;

                             IndexMetaData.Custom custom = cursor.value;

                             IndexMetaData.Custom existing = customs.get(type);

                             if (existing == null ) {

                                 customs.put(type, custom);

                             } else {

                                 IndexMetaData.Custom merged = IndexMetaData.lookupFactorySafe(type).merge(existing, custom);

                                 customs.put(type, merged);

                             }

                         }

                         //处理合并别名

                         for (ObjectObjectCursor&lt;String, AliasMetaData&gt; cursor : template.aliases()) {

                             AliasMetaData aliasMetaData = cursor.value;

                             //if an alias with same name came with the create index request itself,

                             // ignore this one taken from the index template

                             if (request.aliases().contains( new Alias(aliasMetaData.alias()))) {

                                 continue ;

                             }

                             //if an alias with same name was already processed, ignore this one

                             if (templatesAliases.containsKey(cursor.key)) {

                                 continue ;

                             }

                             //Allow templatesAliases to be templated by replacing a token with the name of the index that we are applying it to

                             if (aliasMetaData.alias().contains( "{index}" )) {

                                 String templatedAlias = aliasMetaData.alias().replace( "{index}" , request.index());

                                 aliasMetaData = AliasMetaData.newAliasMetaData(aliasMetaData, templatedAlias);

                             }

                             aliasValidator.validateAliasMetaData(aliasMetaData, request.index(), currentState.metaData());

                             templatesAliases.put(aliasMetaData.alias(), aliasMetaData);

                         }

                     }

                     // 合并完template和request,现在开始处理配置基本的mapping,合并逻辑跟之前相同,只是mapping来源不同

                     File mappingsDir = new File(environment.configFile(), "mappings" );

                     if (mappingsDir.isDirectory()) {

                         // first index level

                         File indexMappingsDir = new File(mappingsDir, request.index());

                         if (indexMappingsDir.isDirectory()) {

                             addMappings(mappings, indexMappingsDir);

                         }

                         // second is the _default mapping

                         File defaultMappingsDir = new File(mappingsDir, "_default" );

                         if (defaultMappingsDir.isDirectory()) {

                             addMappings(mappings, defaultMappingsDir);

                         }

                     }

             //处理index的配置(setting)

                     ImmutableSettings.Builder indexSettingsBuilder = settingsBuilder();

                     //加入模板中的setting

                     for ( int i = templates.size() - 1 ; i &gt;= 0 ; i--) {

                         indexSettingsBuilder.put(templates.get(i).settings());

                     }

                     // 加入request中的mapping,request中设置会覆盖模板中的设置

                     indexSettingsBuilder.put(request.settings());

             //处理shard,shard数量不能小于1,因此这里需要特殊处理,如果没有则要使用默认值

                     if (request.index().equals(ScriptService.SCRIPT_INDEX)) {

                         indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1 ));

                     } else {

                         if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null ) {

                             if (request.index().equals(riverIndexName)) {

                                 indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1 ));

                             } else {

                                 indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5 ));

                             }

                         }

                     }

                     if (request.index().equals(ScriptService.SCRIPT_INDEX)) {

                         indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 0 ));

                         indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, "0-all" );

                     }

                     else {

                         if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null ) {

                             if (request.index().equals(riverIndexName)) {

                                 indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1 ));

                             } else {

                                 indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1 ));

                             }

                         }

                     }

             //处理副本

                     if (settings.get(SETTING_AUTO_EXPAND_REPLICAS) != null &amp;&amp; indexSettingsBuilder.get(SETTING_AUTO_EXPAND_REPLICAS) == null ) {

                         indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, settings.get(SETTING_AUTO_EXPAND_REPLICAS));

                     }

                     if (indexSettingsBuilder.get(SETTING_VERSION_CREATED) == null ) {

                         DiscoveryNodes nodes = currentState.nodes();

                         final Version createdVersion = Version.smallest(version, nodes.smallestNonClientNodeVersion());

                         indexSettingsBuilder.put(SETTING_VERSION_CREATED, createdVersion);

                     }

                     if (indexSettingsBuilder.get(SETTING_CREATION_DATE) == null ) {

                         indexSettingsBuilder.put(SETTING_CREATION_DATE, System.currentTimeMillis());

                     }

                     indexSettingsBuilder.put(SETTING_UUID, Strings.randomBase64UUID());

             //创建setting

                     Settings actualIndexSettings = indexSettingsBuilder.build();

             // 通过indiceservice创建索引

                     indicesService.createIndex(request.index(), actualIndexSettings, clusterService.localNode().id());

                     indexCreated = true ;

                     //如果创建成功这里就可以获取到对应的indexservice,否则会抛出异常

                     IndexService indexService = indicesService.indexServiceSafe(request.index());

             //获取mappingService试图放置mapping

                     MapperService mapperService = indexService.mapperService();

                     // 为索引添加mapping,首先是默认mapping

                     if (mappings.containsKey(MapperService.DEFAULT_MAPPING)) {

                         try {

                             mapperService.merge(MapperService.DEFAULT_MAPPING, new CompressedString(XContentFactory.jsonBuilder().map(mappings.get(MapperService.DEFAULT_MAPPING)).string()), false );

                         } catch (Exception e) {

                             removalReason = "failed on parsing default mapping on index creation" ;

                             throw new MapperParsingException( "mapping [" + MapperService.DEFAULT_MAPPING + "]" , e);

                         }

                     }

                     for (Map.Entry&lt;String, Map&lt;String, Object&gt;&gt; entry : mappings.entrySet()) {

                         if (entry.getKey().equals(MapperService.DEFAULT_MAPPING)) {

                             continue ;

                         }

                         try {

                             // apply the default here, its the first time we parse it

                             mapperService.merge(entry.getKey(), new CompressedString(XContentFactory.jsonBuilder().map(entry.getValue()).string()), true );

                         } catch (Exception e) {

                             removalReason = "failed on parsing mappings on index creation" ;

                             throw new MapperParsingException( "mapping [" + entry.getKey() + "]" , e);

                         }

                     }

             //添加request中的别称

                     IndexQueryParserService indexQueryParserService = indexService.queryParserService();

                     for (Alias alias : request.aliases()) {

                         if (Strings.hasLength(alias.filter())) {

                             aliasValidator.validateAliasFilter(alias.name(), alias.filter(), indexQueryParserService);

                         }

                     }

                     for (AliasMetaData aliasMetaData : templatesAliases.values()) {

                         if (aliasMetaData.filter() != null ) {

                             aliasValidator.validateAliasFilter(aliasMetaData.alias(), aliasMetaData.filter().uncompressed(), indexQueryParserService);

                         }

                     }

                     // 以下更新Index的matedata,

                     Map&lt;String, MappingMetaData&gt; mappingsMetaData = Maps.newHashMap();

                     for (DocumentMapper mapper : mapperService.docMappers( true )) {

                         MappingMetaData mappingMd = new MappingMetaData(mapper);

                         mappingsMetaData.put(mapper.type(), mappingMd);

                     }

                     final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index()).settings(actualIndexSettings);

                     for (MappingMetaData mappingMd : mappingsMetaData.values()) {

                         indexMetaDataBuilder.putMapping(mappingMd);

                     }

                     for (AliasMetaData aliasMetaData : templatesAliases.values()) {

                         indexMetaDataBuilder.putAlias(aliasMetaData);

                     }

                     for (Alias alias : request.aliases()) {

                         AliasMetaData aliasMetaData = AliasMetaData.builder(alias.name()).filter(alias.filter())

                                 .indexRouting(alias.indexRouting()).searchRouting(alias.searchRouting()).build();

                         indexMetaDataBuilder.putAlias(aliasMetaData);

                     }

                     for (Map.Entry&lt;String, Custom&gt; customEntry : customs.entrySet()) {

                         indexMetaDataBuilder.putCustom(customEntry.getKey(), customEntry.getValue());

                     }

                     indexMetaDataBuilder.state(request.state());

             //matedata更新完毕,build新的matedata

                     final IndexMetaData indexMetaData;

                     try {

                         indexMetaData = indexMetaDataBuilder.build();

                     } catch (Exception e) {

                         removalReason = "failed to build index metadata" ;

                         throw e;

                     }

                     indexService.indicesLifecycle().beforeIndexAddedToCluster( new Index(request.index()),

                             indexMetaData.settings());

             //更新集群的matedata,将新build的indexmatadata加入到metadata中

                     MetaData newMetaData = MetaData.builder(currentState.metaData())

                             .put(indexMetaData, false )

                             .build();

                     logger.info( "[{}] creating index, cause [{}], templates {}, shards [{}]/[{}], mappings {}" , request.index(), request.cause(), templateNames, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet());

             //阻塞集群,更新matadata

                     ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());

                     if (!request.blocks().isEmpty()) {

                         for (ClusterBlock block : request.blocks()) {

                             blocks.addIndexBlock(request.index(), block);

                         }

                     }

                     if (request.state() == State.CLOSE) {

                         blocks.addIndexBlock(request.index(), MetaDataIndexStateService.INDEX_CLOSED_BLOCK);

                     }

                     ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build();

                     if (request.state() == State.OPEN) {

                         RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())

                                 .addAsNew(updatedState.metaData().index(request.index()));

                         RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(routingTableBuilder).build());

                         updatedState = ClusterState.builder(updatedState).routingResult(routingResult).build();

                     }

                     removalReason = "cleaning up after validating index on master" ;

                     return updatedState;

                 } finally {

                     if (indexCreated) {

                         // Index was already partially created - need to clean up

                         indicesService.removeIndex(request.index(), removalReason != null ? removalReason : "failed to create index" );

                     }

                 }

             }

         });

     }

以上就是创建index的create方法,方法中主要进行了两个动作:合并更新index的metadata和创建index。更新合并metadata的过程都在上面的代码中体现了。

从indices中获取对应的IndexService

创建索引时,indicesService会构建一个guice的injector,这个injector包含了Index的所有功能(如分词、相似度等)。同时会将其存储到indicesService中,以一个map的格式存储:Map<String, Tuple<IndexService, Injector>> indices。运行中的集群每次对某个索引的操作都首先从indices中获取对应的IndexService。

这一部分代码如下所示:

public synchronized IndexService createIndex(String sIndexName, @IndexSettings Settings settings, String localNodeId) throws ElasticsearchException {

         if (!lifecycle.started()) {

             throw new ElasticsearchIllegalStateException( "Can't create an index [" + sIndexName + "], node is closed" );

         }

         Index index = new Index(sIndexName);

     //检测index是否已经存在

         if (indices.containsKey(index.name())) {

             throw new IndexAlreadyExistsException(index);

         }

         indicesLifecycle.beforeIndexCreated(index, settings);

         logger.debug( "creating Index [{}], shards [{}]/[{}]" , sIndexName, settings.get(SETTING_NUMBER_OF_SHARDS), settings.get(SETTING_NUMBER_OF_REPLICAS));

         Settings indexSettings = settingsBuilder()

                 .put( this .settings)

                 .put(settings)

                 .classLoader(settings.getClassLoader())

                 .build();

     //构建index对应的injector

         ModulesBuilder modules = new ModulesBuilder();

         modules.add( new IndexNameModule(index));

         modules.add( new LocalNodeIdModule(localNodeId));

         modules.add( new IndexSettingsModule(index, indexSettings));

         modules.add( new IndexPluginsModule(indexSettings, pluginsService));

         modules.add( new IndexStoreModule(indexSettings));

         modules.add( new IndexEngineModule(indexSettings));

         modules.add( new AnalysisModule(indexSettings, indicesAnalysisService));

         modules.add( new SimilarityModule(indexSettings));

         modules.add( new IndexCacheModule(indexSettings));

         modules.add( new IndexFieldDataModule(indexSettings));

         modules.add( new CodecModule(indexSettings));

         modules.add( new MapperServiceModule());

         modules.add( new IndexQueryParserModule(indexSettings));

         modules.add( new IndexAliasesServiceModule());

         modules.add( new IndexGatewayModule(indexSettings, injector.getInstance(Gateway. class )));

         modules.add( new IndexModule(indexSettings));

         Injector indexInjector;

         try {

             indexInjector = modules.createChildInjector(injector);

         } catch (CreationException e) {

             throw new IndexCreationException(index, Injectors.getFirstErrorFailure(e));

         } catch (Throwable e) {

             throw new IndexCreationException(index, e);

         }

         IndexService indexService = indexInjector.getInstance(IndexService. class );

         indicesLifecycle.afterIndexCreated(indexService);

      //将Indexservice和IndexInjector加入到indice map中

         indices = newMapBuilder(indices).put(index.name(), new Tuple&lt;&gt;(indexService, indexInjector)).immutableMap();

         return indexService;

     }

以上方法就是具体创建索引的过程,它是在master上操作的,同时它是同步方法。这样才能保证集群的Index创建一致性,但这也会导致之前所说的大量创建索引时的速度瓶颈。不过创建大量索引的动作并不常见,也需要尽量避免。创建一个索引对于一个集群来说就是开启对于该索引的各种操作,因此这里通过guice将索引的各个功能模块注入,并获得index操作的接口类IndexService。如果这个方法执行成功,则可以合并template及request中的mapping,并且向刚创建的索引添加合并后的mapping,最后构建新的metadata,并将集群新的metadata发送给各个节点完成索引创建。

总结

索引创建的过程包括三步:更新集群metadata,调用indicesService创建索引,向新创建的索引中放置mapping(合并到Index对应的MapperService中)。这三步都在以上的两个方法中。完成这三步,集群中就保存了新索引的信息,同时索引配置和mapping放置也完成,索引就可以正常使用。

以上就是elasticsearch索引创建create index集群metadata更新的详细内容,更多关于elasticsearch索引create index metadata更新的资料请关注其它相关文章!

原文链接:https://www.cnblogs.com/zziawanblog/p/6971379.html

查看更多关于elasticsearch索引创建create index集群matedata更新的详细内容...

  阅读:17次