创建索引并更新集群index metadata
创建索引时既要创建索引本身,又要更新集群的index metadata,这一过程在MetaDataCreateIndexService的createIndex方法中完成。这里会提交一个高优先级、AckedClusterStateUpdateTask类型的task。索引创建需要即时得到反馈,因此这个task需要返回确认结果,并且带有超时,而且这个任务的优先级也非常高。
下面具体看一下它的execute方法,这个方法会在master执行任务时调用。这个方法非常长,主要完成以下三个功能:合并request和template中的mapping与setting;调用indicesService创建索引;对创建后的索引添加mapping。这一系列功能完成后,生成新的metadata并更新集群状态,就完成了索引的创建。具体的调用方法参考 上一篇 。
首先创建index的create方法
代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
/**
 * Cluster-state update step for a create-index request.
 *
 * <p>In order, the method: validates the request and its aliases; merges mappings,
 * customs and aliases from the request with those of the matching templates; adds
 * mappings found under the {@code mappings} config directory; assembles the index
 * settings; creates the index through {@code indicesService}; merges the mappings into
 * the new index's {@code MapperService}; builds the {@code IndexMetaData}; and finally
 * produces a new {@code ClusterState} carrying the new metadata, blocks and (for an
 * OPEN index) routing.
 *
 * <p>Whenever {@code indicesService.createIndex} has succeeded, the {@code finally}
 * block removes the index again via {@code indicesService.removeIndex}, both on
 * failure and on the normal return path (reason
 * "cleaning up after validating index on master").
 *
 * @param currentState the cluster state the update is applied to
 * @return the cluster state including the newly created index
 * @throws Exception if validation, mapping parsing or metadata building fails
 */
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
    boolean indexCreated = false;
    String removalReason = null;
    try {
        // Validate the request. (Original author's note: in 1.5 this mainly checks that the
        // index name is legal, e.g. contains no forbidden characters, plus the cluster node versions.)
        validate(request, currentState);
        for (Alias alias : request.aliases()) {
            // Check that each alias from the request is valid.
            aliasValidator.validateAlias(alias, request.index(), currentState.metaData());
        }
        // Look up the index templates for this request.
        List<IndexTemplateMetaData> templates = findTemplates(request, currentState, indexTemplateFilter);
        Map<String, Custom> customs = Maps.newHashMap();
        // add the request mapping
        Map<String, Map<String, Object>> mappings = Maps.newHashMap();
        Map<String, AliasMetaData> templatesAliases = Maps.newHashMap();
        List<String> templateNames = Lists.newArrayList();
        // Take the mappings supplied with the request. Mappings can be added later, but in most
        // cases they accompany index creation; in the request they are held as a map.
        for (Map.Entry<String, String> entry : request.mappings().entrySet()) {
            mappings.put(entry.getKey(), parseMapping(entry.getValue()));
        }
        // Custom metadata from the request (original author's note: presets such as warmers).
        for (Map.Entry<String, Custom> entry : request.customs().entrySet()) {
            customs.put(entry.getKey(), entry.getValue());
        }
        // Merge what the templates provide with what the request provided.
        for (IndexTemplateMetaData template : templates) {
            templateNames.add(template.getName());
            for (ObjectObjectCursor<String, CompressedString> cursor : template.mappings()) {
                if (mappings.containsKey(cursor.key)) {
                    // A mapping for this type already exists: the template mapping is merged in as defaults.
                    XContentHelper.mergeDefaults(mappings.get(cursor.key), parseMapping(cursor.value.string()));
                } else {
                    mappings.put(cursor.key, parseMapping(cursor.value.string()));
                }
            }
            // Merge the customs.
            for (ObjectObjectCursor<String, Custom> cursor : template.customs()) {
                String type = cursor.key;
                IndexMetaData.Custom custom = cursor.value;
                IndexMetaData.Custom existing = customs.get(type);
                if (existing == null) {
                    customs.put(type, custom);
                } else {
                    IndexMetaData.Custom merged = IndexMetaData.lookupFactorySafe(type).merge(existing, custom);
                    customs.put(type, merged);
                }
            }
            // Merge the aliases.
            for (ObjectObjectCursor<String, AliasMetaData> cursor : template.aliases()) {
                AliasMetaData aliasMetaData = cursor.value;
                // if an alias with same name came with the create index request itself,
                // ignore this one taken from the index template
                if (request.aliases().contains(new Alias(aliasMetaData.alias()))) {
                    continue;
                }
                // if an alias with same name was already processed, ignore this one
                if (templatesAliases.containsKey(cursor.key)) {
                    continue;
                }
                // Allow templatesAliases to be templated by replacing a token with the name of the index that we are applying it to
                if (aliasMetaData.alias().contains("{index}")) {
                    String templatedAlias = aliasMetaData.alias().replace("{index}", request.index());
                    aliasMetaData = AliasMetaData.newAliasMetaData(aliasMetaData, templatedAlias);
                }
                aliasValidator.validateAliasMetaData(aliasMetaData, request.index(), currentState.metaData());
                templatesAliases.put(aliasMetaData.alias(), aliasMetaData);
            }
        }
        // Templates and request are merged; now handle the mappings stored under the config
        // directory. (Original author's note: same merge logic as above, only the source differs.)
        File mappingsDir = new File(environment.configFile(), "mappings");
        if (mappingsDir.isDirectory()) {
            // first index level
            File indexMappingsDir = new File(mappingsDir, request.index());
            if (indexMappingsDir.isDirectory()) {
                addMappings(mappings, indexMappingsDir);
            }
            // second is the _default mapping
            File defaultMappingsDir = new File(mappingsDir, "_default");
            if (defaultMappingsDir.isDirectory()) {
                addMappings(mappings, defaultMappingsDir);
            }
        }
        // Assemble the index settings.
        ImmutableSettings.Builder indexSettingsBuilder = settingsBuilder();
        // Add the template settings, iterating from the last template in the list to the first.
        for (int i = templates.size() - 1; i >= 0; i--) {
            indexSettingsBuilder.put(templates.get(i).settings());
        }
        // Add the settings from the request; they are put after the template settings
        // (original author's note: request values override template values).
        indexSettingsBuilder.put(request.settings());
        // Number of shards. (Original author's note: it cannot be less than 1, so a default is
        // used when none is set.) The script index always takes the node-level value (default 1).
        if (request.index().equals(ScriptService.SCRIPT_INDEX)) {
            indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1));
        } else {
            if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) {
                if (request.index().equals(riverIndexName)) {
                    indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1));
                } else {
                    indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));
                }
            }
        }
        // Number of replicas: the script index is set to the node-level value (default 0) with
        // auto-expand "0-all"; other indices fall back to the node-level value (default 1) when unset.
        if (request.index().equals(ScriptService.SCRIPT_INDEX)) {
            indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 0));
            indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, "0-all");
        } else {
            if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) {
                // NOTE(review): both branches below put the same default (1).
                if (request.index().equals(riverIndexName)) {
                    indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
                } else {
                    indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
                }
            }
        }
        // Auto-expand replicas: inherit the node-level setting only when the index has none.
        if (settings.get(SETTING_AUTO_EXPAND_REPLICAS) != null && indexSettingsBuilder.get(SETTING_AUTO_EXPAND_REPLICAS) == null) {
            indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, settings.get(SETTING_AUTO_EXPAND_REPLICAS));
        }
        // Record the created-version unless it was supplied: the smaller of this node's version
        // and the smallest non-client node version in the cluster.
        if (indexSettingsBuilder.get(SETTING_VERSION_CREATED) == null) {
            DiscoveryNodes nodes = currentState.nodes();
            final Version createdVersion = Version.smallest(version, nodes.smallestNonClientNodeVersion());
            indexSettingsBuilder.put(SETTING_VERSION_CREATED, createdVersion);
        }
        // Record the creation date unless it was supplied.
        if (indexSettingsBuilder.get(SETTING_CREATION_DATE) == null) {
            indexSettingsBuilder.put(SETTING_CREATION_DATE, System.currentTimeMillis());
        }
        indexSettingsBuilder.put(SETTING_UUID, Strings.randomBase64UUID());
        // Build the final settings.
        Settings actualIndexSettings = indexSettingsBuilder.build();
        // Create the index through indicesService.
        indicesService.createIndex(request.index(), actualIndexSettings, clusterService.localNode().id());
        // From here on the finally block must remove the index again.
        indexCreated = true;
        // Original author's note: if creation succeeded the IndexService can be fetched here,
        // otherwise an exception is thrown.
        IndexService indexService = indicesService.indexServiceSafe(request.index());
        // Get the MapperService that the mappings are merged into.
        MapperService mapperService = indexService.mapperService();
        // Add the mappings to the index, starting with the default mapping.
        if (mappings.containsKey(MapperService.DEFAULT_MAPPING)) {
            try {
                mapperService.merge(MapperService.DEFAULT_MAPPING, new CompressedString(XContentFactory.jsonBuilder().map(mappings.get(MapperService.DEFAULT_MAPPING)).string()), false);
            } catch (Exception e) {
                removalReason = "failed on parsing default mapping on index creation";
                throw new MapperParsingException("mapping [" + MapperService.DEFAULT_MAPPING + "]", e);
            }
        }
        for (Map.Entry<String, Map<String, Object>> entry : mappings.entrySet()) {
            // The default mapping was already merged above.
            if (entry.getKey().equals(MapperService.DEFAULT_MAPPING)) {
                continue;
            }
            try {
                // apply the default here, its the first time we parse it
                mapperService.merge(entry.getKey(), new CompressedString(XContentFactory.jsonBuilder().map(entry.getValue()).string()), true);
            } catch (Exception e) {
                removalReason = "failed on parsing mappings on index creation";
                throw new MapperParsingException("mapping [" + entry.getKey() + "]", e);
            }
        }
        // Validate the filters of the aliases coming from the request and from the templates.
        IndexQueryParserService indexQueryParserService = indexService.queryParserService();
        for (Alias alias : request.aliases()) {
            if (Strings.hasLength(alias.filter())) {
                aliasValidator.validateAliasFilter(alias.name(), alias.filter(), indexQueryParserService);
            }
        }
        for (AliasMetaData aliasMetaData : templatesAliases.values()) {
            if (aliasMetaData.filter() != null) {
                aliasValidator.validateAliasFilter(aliasMetaData.alias(), aliasMetaData.filter().uncompressed(), indexQueryParserService);
            }
        }
        // From here on: build the metadata of the new index.
        Map<String, MappingMetaData> mappingsMetaData = Maps.newHashMap();
        for (DocumentMapper mapper : mapperService.docMappers(true)) {
            MappingMetaData mappingMd = new MappingMetaData(mapper);
            mappingsMetaData.put(mapper.type(), mappingMd);
        }
        final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index()).settings(actualIndexSettings);
        for (MappingMetaData mappingMd : mappingsMetaData.values()) {
            indexMetaDataBuilder.putMapping(mappingMd);
        }
        // Template aliases are put first, then the aliases from the request.
        for (AliasMetaData aliasMetaData : templatesAliases.values()) {
            indexMetaDataBuilder.putAlias(aliasMetaData);
        }
        for (Alias alias : request.aliases()) {
            AliasMetaData aliasMetaData = AliasMetaData.builder(alias.name()).filter(alias.filter())
                    .indexRouting(alias.indexRouting()).searchRouting(alias.searchRouting()).build();
            indexMetaDataBuilder.putAlias(aliasMetaData);
        }
        for (Map.Entry<String, Custom> customEntry : customs.entrySet()) {
            indexMetaDataBuilder.putCustom(customEntry.getKey(), customEntry.getValue());
        }
        indexMetaDataBuilder.state(request.state());
        // Metadata is fully populated; build the new IndexMetaData.
        final IndexMetaData indexMetaData;
        try {
            indexMetaData = indexMetaDataBuilder.build();
        } catch (Exception e) {
            removalReason = "failed to build index metadata";
            throw e;
        }
        indexService.indicesLifecycle().beforeIndexAddedToCluster(new Index(request.index()), indexMetaData.settings());
        // Update the cluster metadata: add the newly built IndexMetaData to a copy of the current MetaData.
        MetaData newMetaData = MetaData.builder(currentState.metaData())
                .put(indexMetaData, false)
                .build();
        logger.info("[{}] creating index, cause [{}], templates {}, shards [{}]/[{}], mappings {}", request.index(), request.cause(), templateNames, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet());
        // Build the cluster blocks: start from the current blocks and add the index-level
        // blocks carried by the request.
        ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
        if (!request.blocks().isEmpty()) {
            for (ClusterBlock block : request.blocks()) {
                blocks.addIndexBlock(request.index(), block);
            }
        }
        // An index created in CLOSE state additionally gets the index-closed block.
        if (request.state() == State.CLOSE) {
            blocks.addIndexBlock(request.index(), MetaDataIndexStateService.INDEX_CLOSED_BLOCK);
        }
        ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build();
        // Only an OPEN index is added to the routing table and passed through reroute.
        if (request.state() == State.OPEN) {
            RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
                    .addAsNew(updatedState.metaData().index(request.index()));
            RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(routingTableBuilder).build());
            updatedState = ClusterState.builder(updatedState).routingResult(routingResult).build();
        }
        // Success path: the finally block below still removes the index, using this reason.
        removalReason = "cleaning up after validating index on master";
        return updatedState;
    } finally {
        if (indexCreated) {
            // Index was already partially created - need to clean up
            indicesService.removeIndex(request.index(), removalReason != null ? removalReason : "failed to create index");
        }
    }
}
// The closers below end enclosing constructs whose beginnings are outside this excerpt.
});
} |
以上就是创建index的create方法(即task的execute方法),方法中主要进行了两个动作:合并更新index的metadata和创建index。更新合并metadata的过程都在上面的代码中体现了。
从indice中获取对应的IndexService
创建索引时会调用indicesService构建一个guice的injector,这个injector包含了Index的所有功能(如分词、相似度等)。同时会将其存储到indicesService中,以一个map的格式存储:Map<String, Tuple<IndexService, Injector>> indices。运行中的集群每次对某个索引进行操作时,都首先从indices中获取对应的IndexService。
这一部分代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
public synchronized IndexService createIndex(String sIndexName, @IndexSettings Settings settings, String localNodeId) throws ElasticsearchException { if (!lifecycle.started()) { throw new ElasticsearchIllegalStateException( "Can't create an index [" + sIndexName + "], node is closed" ); } Index index = new Index(sIndexName); //检测index是否已经存在 if (indices.containsKey(index.name())) { throw new IndexAlreadyExistsException(index); } indicesLifecycle.beforeIndexCreated(index, settings); logger.debug( "creating Index [{}], shards [{}]/[{}]" , sIndexName, settings.get(SETTING_NUMBER_OF_SHARDS), settings.get(SETTING_NUMBER_OF_REPLICAS)); Settings indexSettings = settingsBuilder() .put( this .settings) .put(settings) .classLoader(settings.getClassLoader()) .build(); //构建index对应的injector ModulesBuilder modules = new ModulesBuilder(); modules.add( new IndexNameModule(index)); modules.add( new LocalNodeIdModule(localNodeId)); modules.add( new IndexSettingsModule(index, indexSettings)); modules.add( new IndexPluginsModule(indexSettings, pluginsService)); modules.add( new IndexStoreModule(indexSettings)); modules.add( new IndexEngineModule(indexSettings)); modules.add( new AnalysisModule(indexSettings, indicesAnalysisService)); modules.add( new SimilarityModule(indexSettings)); modules.add( new IndexCacheModule(indexSettings)); modules.add( new IndexFieldDataModule(indexSettings)); modules.add( new CodecModule(indexSettings)); modules.add( new MapperServiceModule()); modules.add( new IndexQueryParserModule(indexSettings)); modules.add( new IndexAliasesServiceModule()); modules.add( new IndexGatewayModule(indexSettings, injector.getInstance(Gateway. 
class ))); modules.add( new IndexModule(indexSettings)); Injector indexInjector; try { indexInjector = modules.createChildInjector(injector); } catch (CreationException e) { throw new IndexCreationException(index, Injectors.getFirstErrorFailure(e)); } catch (Throwable e) { throw new IndexCreationException(index, e); } IndexService indexService = indexInjector.getInstance(IndexService. class ); indicesLifecycle.afterIndexCreated(indexService); //将Indexservice和IndexInjector加入到indice map中 indices = newMapBuilder(indices).put(index.name(), new Tuple<>(indexService, indexInjector)).immutableMap(); return indexService; } |
以上方法就是具体创建索引的过程,它是在master上操作的,同时它是同步方法。这样才能保证集群的Index创建一致性,因此这也会导致之前所说的大量创建索引时的速度瓶颈。但是创建大量索引的动作是不常见的,需要尽量避免。创建一个索引对于一个集群来说就是开启对于该索引的各种操作,因此这里通过guice将索引的各个功能模块注入,并获得index操作的接口类IndexService。如果这个方法执行成功,则可以合并template及request中的mapping,并且向刚创建的索引添加合并后的mapping,最后构建新的metadata,并将集群新的metadata发送给各个节点完成索引创建。
总结
索引创建的过程包括三步:更新集群metadata,调用indicesService创建索引,向新创建的索引中放置mapping(合并到Index对应的MapperService中)。这三步都在以上的两个方法中。完成这三步,集群中就保存了新索引的信息,同时索引配置和mapping放置也完成,索引就可以正常使用。
以上就是elasticsearch索引创建create index集群metadata更新的详细内容,更多关于elasticsearch索引create index metadata更新的资料请关注其它相关文章!
原文链接:https://www.cnblogs.com/zziawanblog/p/6971379.html
查看更多关于elasticsearch索引创建create index集群matedata更新的详细内容...