好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

elasticsearch索引index之put mapping的设置分析

mapping的设置过程

mapping机制使得elasticsearch索引数据变的更加灵活,近乎于no schema。mapping可以在建立索引时设置,也可以在后期设置。

后期设置可以是修改mapping(无法对已有的field属性进行修改,一般来说只是增加新的field)或者对没有mapping的索引设置mapping。

put mapping操作必须是master节点来完成,因为它涉及到集群matedata的修改,同时它跟index和type密切相关。修改只是针对特定index的特定type。

在 Action support分析 中我们分析过几种Action的抽象类型,put mapping Action属于TransportMasterNodeOperationAction的子类。

put mapping

它实现了masterOperation方法,每个继承自TransportMasterNodeOperationAction的子类都会根据自己的具体功能来实现这个方法。

这里的实现如下所示:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

protected void masterOperation( final PutMappingRequest request, final ClusterState state, final ActionListener<PutMappingResponse> listener) throws ElasticsearchException {

         final String[] concreteIndices = clusterService.state().metaData().concreteIndices(request.indicesOptions(), request.indices());

       //构造request

         PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest()

                 .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())

                 .indices(concreteIndices).type(request.type())

                 .source(request.source()).ignoreConflicts(request.ignoreConflicts());

       //调用putMapping方法,同时传入一个Listener

         metaDataMappingService.putMapping(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {

             @Override

             public void onResponse(ClusterStateUpdateResponse response) {

                 listener.onResponse( new PutMappingResponse(response.isAcknowledged()));

             }

             @Override

             public void onFailure(Throwable t) {

                 logger.debug( "failed to put mappings on indices [{}], type [{}]" , t, concreteIndices, request.type());

                 listener.onFailure(t);

             }

         });

     }

以上是TransportPutMappingAction对masterOperation方法的实现,这里并没有多少复杂的逻辑和操作。具体操作在matedataMappingService中。

updateTask响应

跟之前的CreateIndex一样,put Mapping也是向master提交一个updateTask。所有逻辑也都在execute方法中。这个task的基本跟CreateIndex一样,也需要在给定的时间内响应。它的代码如下所示:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

public void putMapping( final PutMappingClusterStateUpdateRequest request, final ActionListener&lt;ClusterStateUpdateResponse&gt; listener) {

     //提交一个高基本的updateTask

         clusterService.submitStateUpdateTask( "put-mapping [" + request.type() + "]" , Priority.HIGH, new AckedClusterStateUpdateTask&lt;ClusterStateUpdateResponse&gt;(request, listener) {

             @Override

             protected ClusterStateUpdateResponse newResponse( boolean acknowledged) {

                 return new ClusterStateUpdateResponse(acknowledged);

             }

             @Override

             public ClusterState execute( final ClusterState currentState) throws Exception {

                 List&lt;String&gt; indicesToClose = Lists.newArrayList();

                 try {

             //必须针对已经在matadata中存在的index,否则抛出异常

                     for (String index : request.indices()) {

                         if (!currentState.metaData().hasIndex(index)) {

                             throw new IndexMissingException( new Index(index));

                         }

                     }

                     //还需要存在于indices中,否则无法进行操作。所以这里要进行预建

                     for (String index : request.indices()) {

                         if (indicesService.hasIndex(index)) {

                             continue ;

                         }

                         final IndexMetaData indexMetaData = currentState.metaData().index(index);

               //不存在就进行创建

                         IndexService indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), clusterService.localNode().id());

                         indicesToClose.add(indexMetaData.index());

                         // make sure to add custom default mapping if exists

                         if (indexMetaData.mappings().containsKey(MapperService.DEFAULT_MAPPING)) {

                             indexService.mapperService().merge(MapperService.DEFAULT_MAPPING, indexMetaData.mappings().get(MapperService.DEFAULT_MAPPING).source(), false );

                         }

                         // only add the current relevant mapping (if exists)

                         if (indexMetaData.mappings().containsKey(request.type())) {

                             indexService.mapperService().merge(request.type(), indexMetaData.mappings().get(request.type()).source(), false );

                         }

                     }

             //合并更新Mapping

                     Map&lt;String, DocumentMapper&gt; newMappers = newHashMap();

                     Map&lt;String, DocumentMapper&gt; existingMappers = newHashMap();

             //针对每个index进行Mapping合并

                     for (String index : request.indices()) {

                         IndexService indexService = indicesService.indexServiceSafe(index);

                         // try and parse it (no need to add it here) so we can bail early in case of parsing exception

                         DocumentMapper newMapper;

                         DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.type());

                         if (MapperService.DEFAULT_MAPPING.equals(request.type())) { //存在defaultmapping则合并default mapping

                             // _default_ types do not go through merging, but we do test the new settings. Also don't apply the old default

                             newMapper = indexService.mapperService().parse(request.type(), new CompressedString(request.source()), false );

                         } else {

                             newMapper = indexService.mapperService().parse(request.type(), new CompressedString(request.source()), existingMapper == null );

                             if (existingMapper != null ) {

                                 // first, simulate

                                 DocumentMapper.MergeResult mergeResult = existingMapper.merge(newMapper, mergeFlags().simulate( true ));

                                 // if we have conflicts, and we are not supposed to ignore them, throw an exception

                                 if (!request.ignoreConflicts() &amp;&amp; mergeResult.hasConflicts()) {

                                     throw new MergeMappingException(mergeResult.conflicts());

                                 }

                             }

                         }

                         newMappers.put(index, newMapper);

                         if (existingMapper != null ) {

                             existingMappers.put(index, existingMapper);

                         }

                     }

                     String mappingType = request.type();

                     if (mappingType == null ) {

                         mappingType = newMappers.values().iterator().next().type();

                     } else if (!mappingType.equals(newMappers.values().iterator().next().type())) {

                         throw new InvalidTypeNameException( "Type name provided does not match type name within mapping definition" );

                     }

                     if (!MapperService.DEFAULT_MAPPING.equals(mappingType) &amp;&amp; !PercolatorService.TYPE_NAME.equals(mappingType) &amp;&amp; mappingType.charAt( 0 ) == '_' ) {

                         throw new InvalidTypeNameException( "Document mapping type name can't start with '_'" );

                     }

                     final Map&lt;String, MappingMetaData&gt; mappings = newHashMap();

                     for (Map.Entry&lt;String, DocumentMapper&gt; entry : newMappers.entrySet()) {

                         String index = entry.getKey();

                         // do the actual merge here on the master, and update the mapping source

                         DocumentMapper newMapper = entry.getValue();

                         IndexService indexService = indicesService.indexService(index);

                         if (indexService == null ) {

                             continue ;

                         }

                         CompressedString existingSource = null ;

                         if (existingMappers.containsKey(entry.getKey())) {

                             existingSource = existingMappers.get(entry.getKey()).mappingSource();

                         }

                         DocumentMapper mergedMapper = indexService.mapperService().merge(newMapper.type(), newMapper.mappingSource(), false );

                         CompressedString updatedSource = mergedMapper.mappingSource();

                         if (existingSource != null ) {

                             if (existingSource.equals(updatedSource)) {

                                 // same source, no changes, ignore it

                             } else {

                                 // use the merged mapping source

                                 mappings.put(index, new MappingMetaData(mergedMapper));

                                 if (logger.isDebugEnabled()) {

                                     logger.debug( "[{}] update_mapping [{}] with source [{}]" , index, mergedMapper.type(), updatedSource);

                                 } else if (logger.isInfoEnabled()) {

                                     logger.info( "[{}] update_mapping [{}]" , index, mergedMapper.type());

                                 }

                             }

                         } else {

                             mappings.put(index, new MappingMetaData(mergedMapper));

                             if (logger.isDebugEnabled()) {

                                 logger.debug( "[{}] create_mapping [{}] with source [{}]" , index, newMapper.type(), updatedSource);

                             } else if (logger.isInfoEnabled()) {

                                 logger.info( "[{}] create_mapping [{}]" , index, newMapper.type());

                             }

                         }

                     }

                     if (mappings.isEmpty()) {

                         // no changes, return

                         return currentState;

                     }

             //根据mapping的更新情况重新生成matadata

                     MetaData.Builder builder = MetaData.builder(currentState.metaData());

                     for (String indexName : request.indices()) {

                         IndexMetaData indexMetaData = currentState.metaData().index(indexName);

                         if (indexMetaData == null ) {

                             throw new IndexMissingException( new Index(indexName));

                         }

                         MappingMetaData mappingMd = mappings.get(indexName);

                         if (mappingMd != null ) {

                             builder.put(IndexMetaData.builder(indexMetaData).putMapping(mappingMd));

                         }

                     }

                     return ClusterState.builder(currentState).metaData(builder).build();

                 } finally {

                     for (String index : indicesToClose) {

                         indicesService.removeIndex(index, "created for mapping processing" );

                     }

                 }

             }

         });

     }

以上就是mapping的设置过程,首先它跟Create index一样,只有master节点才能操作,而且是以task的形式提交给master;其次它的本质是将request中的mapping和index现存的或者是default mapping合并,并最终生成新的matadata更新到集群的各个节点。

总结

集群中的master操作无论是index方面还是集群方面,最终都是集群matadata的更新过程。而这些操作只能在master上进行,并且都是会超时的任务。put mapping当然也不例外。上面的两段代码基本概况了mapping的设置过程。这里就不再重复了。

这里还有一个问题没有涉及到就是mapping的合并。mapping合并会在很多地方用到。在下一节中会它进行详细分析,更多关于elasticsearch索引index put mapping设置的资料请关注其它相关文章!

原文链接:https://www.cnblogs.com/zziawanblog/p/7011367.html

查看更多关于elasticsearch索引index之put mapping的设置分析的详细内容...

  阅读:14次