elasticsearch索引创建create index集群matedata更新的方法
本文小編為大家詳細介紹“elasticsearch索引創建createindex集群matedata更新的方法”,內容詳細,步驟清晰,細節處理妥當,希望這篇“elasticsearch索引創建createindex集群matedata更新的方法”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。
更新集群index matedata
創建索引需要創建索引并且更新集群index matedata,這一過程在MetaDataCreateIndexService的createIndex方法中完成。這里會提交一個高優先級,AckedClusterStateUpdateTask類型的task。索引創建需要即時得到反饋,異常這個task需要返回,會超時,而且這個任務的優先級也非常高。
下面具體看一下它的execute方法,這個方法會在master執行任務時調用,這個方法非常長,主要完成以下三個功能:更新合并request,template中的mapping和setting,調用indiceService創建索引,對創建后的索引添加mapping。這一系列功能完成后,合并完成后生成新的matedata,并更新集群狀態,完成了索引的創建。
首先創建index的create方法
代碼如下所示:
@Override
publicClusterStateexecute(ClusterStatecurrentState)throwsException{
booleanindexCreated=false;
StringremovalReason=null;
try{
//檢查request的合法性,1.5版本主要檢查index名字是否合法,如不能含有某些字符,另外就是集群節點版本
validate(request,currentState);
for(Aliasalias:request.aliases()){//檢查別稱是否合法
aliasValidator.validateAlias(alias,request.index(),currentState.metaData());
}
//查找索引模板
List<IndexTemplateMetaData>templates=findTemplates(request,currentState,indexTemplateFilter);
Map<String,Custom>customs=Maps.newHashMap();
//addtherequestmapping
Map<String,Map<String,Object>>mappings=Maps.newHashMap();
Map<String,AliasMetaData>templatesAliases=Maps.newHashMap();
List<String>templateNames=Lists.newArrayList();
//取出request中的mapping配置,雖然mapping可以后面添加,多數情況創建索引的時候還是會附帶著mapping,在request中mapping是一個map
for(Map.Entry<String,String>entry:request.mappings().entrySet()){
mappings.put(entry.getKey(),parseMapping(entry.getValue()));
}
//一些預設如warm等
for(Map.Entry<String,Custom>entry:request.customs().entrySet()){
customs.put(entry.getKey(),entry.getValue());
}
//將找到的template和request中的mapping合并
for(IndexTemplateMetaDatatemplate:templates){
templateNames.add(template.getName());
for(ObjectObjectCursor<String,CompressedString>cursor:template.mappings()){
if(mappings.containsKey(cursor.key)){
XContentHelper.mergeDefaults(mappings.get(cursor.key),parseMapping(cursor.value.string()));
}else{
mappings.put(cursor.key,parseMapping(cursor.value.string()));
}
}
//合并custom
for(ObjectObjectCursor<String,Custom>cursor:template.customs()){
Stringtype=cursor.key;
IndexMetaData.Customcustom=cursor.value;
IndexMetaData.Customexisting=customs.get(type);
if(existing==null){
customs.put(type,custom);
}else{
IndexMetaData.Custommerged=IndexMetaData.lookupFactorySafe(type).merge(existing,custom);
customs.put(type,merged);
}
}
//處理合并別名
for(ObjectObjectCursor<String,AliasMetaData>cursor:template.aliases()){
AliasMetaDataaliasMetaData=cursor.value;
//ifanaliaswithsamenamecamewiththecreateindexrequestitself,
//ignorethisonetakenfromtheindextemplate
if(request.aliases().contains(newAlias(aliasMetaData.alias()))){
continue;
}
//ifanaliaswithsamenamewasalreadyprocessed,ignorethisone
if(templatesAliases.containsKey(cursor.key)){
continue;
}
//AllowtemplatesAliasestobetemplatedbyreplacingatokenwiththenameoftheindexthatweareapplyingitto
if(aliasMetaData.alias().contains("{index}")){
StringtemplatedAlias=aliasMetaData.alias().replace("{index}",request.index());
aliasMetaData=AliasMetaData.newAliasMetaData(aliasMetaData,templatedAlias);
}
aliasValidator.validateAliasMetaData(aliasMetaData,request.index(),currentState.metaData());
templatesAliases.put(aliasMetaData.alias(),aliasMetaData);
}
}
//合并完template和request,現在開始處理配置基本的mapping,合并邏輯跟之前相同,只是mapping來源不同
FilemappingsDir=newFile(environment.configFile(),"mappings");
if(mappingsDir.isDirectory()){
//firstindexlevel
FileindexMappingsDir=newFile(mappingsDir,request.index());
if(indexMappingsDir.isDirectory()){
addMappings(mappings,indexMappingsDir);
}
//secondisthe_defaultmapping
FiledefaultMappingsDir=newFile(mappingsDir,"_default");
if(defaultMappingsDir.isDirectory()){
addMappings(mappings,defaultMappingsDir);
}
}
//處理index的配置(setting)
ImmutableSettings.BuilderindexSettingsBuilder=settingsBuilder();
//加入模板中的setting
for(inti=templates.size()-1;i>=0;i--){
indexSettingsBuilder.put(templates.get(i).settings());
}
//加入request中的mapping,request中設置會覆蓋模板中的設置
indexSettingsBuilder.put(request.settings());
//處理shard,shard數量不能小于1,因此這里需要特殊處理,如果沒有則要使用默認值
if(request.index().equals(ScriptService.SCRIPT_INDEX)){
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS,settings.getAsInt(SETTING_NUMBER_OF_SHARDS,1));
}else{
if(indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS)==null){
if(request.index().equals(riverIndexName)){
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS,settings.getAsInt(SETTING_NUMBER_OF_SHARDS,1));
}else{
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS,settings.getAsInt(SETTING_NUMBER_OF_SHARDS,5));
}
}
}
if(request.index().equals(ScriptService.SCRIPT_INDEX)){
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS,settings.getAsInt(SETTING_NUMBER_OF_REPLICAS,0));
indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS,"0-all");
}
else{
if(indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS)==null){
if(request.index().equals(riverIndexName)){
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS,settings.getAsInt(SETTING_NUMBER_OF_REPLICAS,1));
}else{
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS,settings.getAsInt(SETTING_NUMBER_OF_REPLICAS,1));
}
}
}
//處理副本
if(settings.get(SETTING_AUTO_EXPAND_REPLICAS)!=null&&indexSettingsBuilder.get(SETTING_AUTO_EXPAND_REPLICAS)==null){
indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS,settings.get(SETTING_AUTO_EXPAND_REPLICAS));
}
if(indexSettingsBuilder.get(SETTING_VERSION_CREATED)==null){
DiscoveryNodesnodes=currentState.nodes();
finalVersioncreatedVersion=Version.smallest(version,nodes.smallestNonClientNodeVersion());
indexSettingsBuilder.put(SETTING_VERSION_CREATED,createdVersion);
}
if(indexSettingsBuilder.get(SETTING_CREATION_DATE)==null){
indexSettingsBuilder.put(SETTING_CREATION_DATE,System.currentTimeMillis());
}
indexSettingsBuilder.put(SETTING_UUID,Strings.randomBase64UUID());
//創建setting
SettingsactualIndexSettings=indexSettingsBuilder.build();
//通過indiceservice創建索引
indicesService.createIndex(request.index(),actualIndexSettings,clusterService.localNode().id());
indexCreated=true;
//如果創建成功這里就可以獲取到對應的indexservice,否則會拋出異常
IndexServiceindexService=indicesService.indexServiceSafe(request.index());
//獲取mappingService試圖放置mapping
MapperServicemapperService=indexService.mapperService();
//為索引添加mapping,首先是默認mapping
if(mappings.containsKey(MapperService.DEFAULT_MAPPING)){
try{
mapperService.merge(MapperService.DEFAULT_MAPPING,newCompressedString(XContentFactory.jsonBuilder().map(mappings.get(MapperService.DEFAULT_MAPPING)).string()),false);
}catch(Exceptione){
removalReason="failedonparsingdefaultmappingonindexcreation";
thrownewMapperParsingException("mapping["+MapperService.DEFAULT_MAPPING+"]",e);
}
}
for(Map.Entry<String,Map<String,Object>>entry:mappings.entrySet()){
if(entry.getKey().equals(MapperService.DEFAULT_MAPPING)){
continue;
}
try{
//applythedefaulthere,itsthefirsttimeweparseit
mapperService.merge(entry.getKey(),newCompressedString(XContentFactory.jsonBuilder().map(entry.getValue()).string()),true);
}catch(Exceptione){
removalReason="failedonparsingmappingsonindexcreation";
thrownewMapperParsingException("mapping["+entry.getKey()+"]",e);
}
}
//添加request中的別稱
IndexQueryParserServiceindexQueryParserService=indexService.queryParserService();
for(Aliasalias:request.aliases()){
if(Strings.hasLength(alias.filter())){
aliasValidator.validateAliasFilter(alias.name(),alias.filter(),indexQueryParserService);
}
}
for(AliasMetaDataaliasMetaData:templatesAliases.values()){
if(aliasMetaData.filter()!=null){
aliasValidator.validateAliasFilter(aliasMetaData.alias(),aliasMetaData.filter().uncompressed(),indexQueryParserService);
}
}
//以下更新Index的matedata,
Map<String,MappingMetaData>mappingsMetaData=Maps.newHashMap();
for(DocumentMappermapper:mapperService.docMappers(true)){
MappingMetaDatamappingMd=newMappingMetaData(mapper);
mappingsMetaData.put(mapper.type(),mappingMd);
}
finalIndexMetaData.BuilderindexMetaDataBuilder=IndexMetaData.builder(request.index()).settings(actualIndexSettings);
for(MappingMetaDatamappingMd:mappingsMetaData.values()){
indexMetaDataBuilder.putMapping(mappingMd);
}
for(AliasMetaDataaliasMetaData:templatesAliases.values()){
indexMetaDataBuilder.putAlias(aliasMetaData);
}
for(Aliasalias:request.aliases()){
AliasMetaDataaliasMetaData=AliasMetaData.builder(alias.name()).filter(alias.filter())
.indexRouting(alias.indexRouting()).searchRouting(alias.searchRouting()).build();
indexMetaDataBuilder.putAlias(aliasMetaData);
}
for(Map.Entry<String,Custom>customEntry:customs.entrySet()){
indexMetaDataBuilder.putCustom(customEntry.getKey(),customEntry.getValue());
}
indexMetaDataBuilder.state(request.state());
//matedata更新完畢,build新的matedata
finalIndexMetaDataindexMetaData;
try{
indexMetaData=indexMetaDataBuilder.build();
}catch(Exceptione){
removalReason="failedtobuildindexmetadata";
throwe;
}
indexService.indicesLifecycle().beforeIndexAddedToCluster(newIndex(request.index()),
indexMetaData.settings());
//更新集群的matedata,將新build的indexmatadata加入到metadata中
MetaDatanewMetaData=MetaData.builder(currentState.metaData())
.put(indexMetaData,false)
.build();
logger.info("[{}]creatingindex,cause[{}],templates{},shards[{}]/[{}],mappings{}",request.index(),request.cause(),templateNames,indexMetaData.numberOfShards(),indexMetaData.numberOfReplicas(),mappings.keySet());
//阻塞集群,更新matadata
ClusterBlocks.Builderblocks=ClusterBlocks.builder().blocks(currentState.blocks());
if(!request.blocks().isEmpty()){
for(ClusterBlockblock:request.blocks()){
blocks.addIndexBlock(request.index(),block);
}
}
if(request.state()==State.CLOSE){
blocks.addIndexBlock(request.index(),MetaDataIndexStateService.INDEX_CLOSED_BLOCK);
}
ClusterStateupdatedState=ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build();
if(request.state()==State.OPEN){
RoutingTable.BuilderroutingTableBuilder=RoutingTable.builder(updatedState.routingTable())
.addAsNew(updatedState.metaData().index(request.index()));
RoutingAllocation.ResultroutingResult=allocationService.reroute(ClusterState.builder(updatedState).routingTable(routingTableBuilder).build());
updatedState=ClusterState.builder(updatedState).routingResult(routingResult).build();
}
removalReason="cleaningupaftervalidatingindexonmaster";
returnupdatedState;
}finally{
if(indexCreated){
//Indexwasalreadypartiallycreated-needtocleanup
indicesService.removeIndex(request.index(),removalReason!=null?removalReason:"failedtocreateindex");
}
}
}
});
}
以上就是創建index的create方法,方法中主要進行了兩個動作:合并更新index的matadata和創建index。更新合并matadata的過程都在上面的代碼中體現了。
從indice中獲取對應的IndexService
創建索引是調用indiceSerivice構建一個guice的injector,這個injector包含了Index的所有功能(如分詞,相似度等)。同時會將其存儲到indiceService中,以一個map的格式存儲Map<String, Tuple<IndexService, Injector>> indices。運行中的集群每次對某個索引的操作都首先從indice中獲取對應的IndexService。
這一部分代碼如下所示:
publicsynchronizedIndexServicecreateIndex(StringsIndexName,@IndexSettingsSettingssettings,StringlocalNodeId)throwsElasticsearchException{
if(!lifecycle.started()){
thrownewElasticsearchIllegalStateException("Can'tcreateanindex["+sIndexName+"],nodeisclosed");
}
Indexindex=newIndex(sIndexName);
//檢測index是否已經存在
if(indices.containsKey(index.name())){
thrownewIndexAlreadyExistsException(index);
}
indicesLifecycle.beforeIndexCreated(index,settings);
logger.debug("creatingIndex[{}],shards[{}]/[{}]",sIndexName,settings.get(SETTING_NUMBER_OF_SHARDS),settings.get(SETTING_NUMBER_OF_REPLICAS));
SettingsindexSettings=settingsBuilder()
.put(this.settings)
.put(settings)
.classLoader(settings.getClassLoader())
.build();
//構建index對應的injector
ModulesBuildermodules=newModulesBuilder();
modules.add(newIndexNameModule(index));
modules.add(newLocalNodeIdModule(localNodeId));
modules.add(newIndexSettingsModule(index,indexSettings));
modules.add(newIndexPluginsModule(indexSettings,pluginsService));
modules.add(newIndexStoreModule(indexSettings));
modules.add(newIndexEngineModule(indexSettings));
modules.add(newAnalysisModule(indexSettings,indicesAnalysisService));
modules.add(newSimilarityModule(indexSettings));
modules.add(newIndexCacheModule(indexSettings));
modules.add(newIndexFieldDataModule(indexSettings));
modules.add(newCodecModule(indexSettings));
modules.add(newMapperServiceModule());
modules.add(newIndexQueryParserModule(indexSettings));
modules.add(newIndexAliasesServiceModule());
modules.add(newIndexGatewayModule(indexSettings,injector.getInstance(Gateway.class)));
modules.add(newIndexModule(indexSettings));
InjectorindexInjector;
try{
indexInjector=modules.createChildInjector(injector);
}catch(CreationExceptione){
thrownewIndexCreationException(index,Injectors.getFirstErrorFailure(e));
}catch(Throwablee){
thrownewIndexCreationException(index,e);
}
IndexServiceindexService=indexInjector.getInstance(IndexService.class);
indicesLifecycle.afterIndexCreated(indexService);
//將Indexservice和IndexInjector加入到indicemap中
indices=newMapBuilder(indices).put(index.name(),newTuple<>(indexService,indexInjector)).immutableMap();
returnindexService;
}
以上方法就是具體創建索引的過程,它是在master上操作的,同時它是同步方法。這樣才能保證集群的Index創建一致性,因此這也會導致之前所說的大量創建創建索引時候的速度瓶頸。但是創建大量索引的動作是不常見的,需要盡量避免。創建一個索引對于一個集群來說就是開啟對于該索引的各種操作,因此這里通過guice將索引的各個功能模塊注入,并獲得index操作的接口類Indexservice。如果這個方法執行成功,則可以合并template及request中的mapping,并且向剛創建的索引添加合并后的mapping,最后構建新的matadata,并將集群新的matadata發送給各個節點完成索引創建。
總結
以上是生活随笔為你收集整理的elasticsearch索引创建create index集群matedata更新的方法的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: oracle 052考试,Oracle
- 下一篇: php基础不好,基础不好,问个php类调