diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingClusterStateUpdateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingClusterStateUpdateRequest.java index b5c15d9432b30..e59653f4d8d29 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingClusterStateUpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingClusterStateUpdateRequest.java @@ -9,19 +9,22 @@ package org.elasticsearch.action.admin.indices.mapping.put; import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest; +import org.elasticsearch.common.compress.CompressedXContent; + +import java.io.IOException; /** * Cluster state update request that allows to put a mapping */ public class PutMappingClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest { - private final String source; + private final CompressedXContent source; - public PutMappingClusterStateUpdateRequest(String source) { - this.source = source; + public PutMappingClusterStateUpdateRequest(String source) throws IOException { + this.source = new CompressedXContent(source); } - public String source() { + public CompressedXContent source() { return source; } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java index d43e78ffe7bf6..cc93cc7169ce2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -125,15 +126,22 @@ static void performMappingUpdate(Index[] concreteIndices, PutMappingRequest request, ActionListener listener, MetadataMappingService metadataMappingService) { - PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest(request.source()) - .indices(concreteIndices) - .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()); - - metadataMappingService.putMapping(updateRequest, listener.delegateResponse((l, e) -> { + final ActionListener wrappedListener = listener.delegateResponse((l, e) -> { logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}]", Arrays.asList(concreteIndices)), e); l.onFailure(e); - })); + }); + final PutMappingClusterStateUpdateRequest updateRequest; + try { + updateRequest = new PutMappingClusterStateUpdateRequest(request.source()) + .indices(concreteIndices) + .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()); + } catch (IOException e) { + wrappedListener.onFailure(e); + return; + } + + metadataMappingService.putMapping(updateRequest, wrappedListener); } static String checkForSystemIndexViolations(SystemIndices systemIndices, Index[] concreteIndices, PutMappingRequest request) { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java index bed52d2cb0c90..e75391b242544 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java @@ -33,7 +33,6 @@ import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.indices.IndicesService; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -89,9 +88,9 @@ class PutMappingExecutor implements ClusterStateTaskExecutor indexMapperServices) throws IOException { + Map indexMapperServices) { - CompressedXContent mappingUpdateSource = new CompressedXContent(request.source()); + final CompressedXContent mappingUpdateSource = request.source(); final Metadata metadata = currentState.metadata(); final List updateList = new ArrayList<>(); for (Index index : request.indices()) { @@ -99,7 +98,10 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt // IMPORTANT: always get the metadata from the state since it get's batched // and if we pull it from the indexService we might miss an update etc. final IndexMetadata indexMetadata = currentState.getMetadata().getIndexSafe(index); - + DocumentMapper existingMapper = mapperService.documentMapper(); + if (existingMapper != null && existingMapper.mappingSource().equals(mappingUpdateSource)) { + continue; + } // this is paranoia... just to be sure we use the exact same metadata tuple on the update that // we used for the validation, it makes this mechanism little less scary (a little) updateList.add(indexMetadata); @@ -176,6 +178,32 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt } public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener listener) { + final Metadata metadata = clusterService.state().metadata(); + boolean noop = true; + for (Index index : request.indices()) { + final IndexMetadata indexMetadata = metadata.index(index); + if (indexMetadata == null) { + // local store recovery sends a mapping update request during application of a cluster state on t he data node which + // might we receive here before the CS update that created the index has been applied on all nodes and thus the index + // isn't found in the state yet but will be visible to the CS update below + noop = false; + break; + } + final MappingMetadata mappingMetadata = indexMetadata.mapping(); + if (mappingMetadata == null) { + noop = false; + break; + } + if (request.source().equals(mappingMetadata.source()) == false) { + noop = false; + break; + } + } + if (noop) { + listener.onResponse(AcknowledgedResponse.TRUE); + return; + } + clusterService.submitStateUpdateTask("put-mapping " + Strings.arrayToCommaDelimitedString(request.indices()), request, ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()), diff --git a/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java b/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java index d18588c09a38e..b16e545674806 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java @@ -271,16 +271,22 @@ public DocumentMapper merge(String type, CompressedXContent mappingSource, Merge return mergeAndApplyMappings(type, mappingSource, reason); } - private synchronized DocumentMapper mergeAndApplyMappings(String mappingType, CompressedXContent mappingSource, MergeReason reason) { - Mapping incomingMapping = parseMapping(mappingType, mappingSource); - Mapping mapping = mergeMappings(this.mapper, incomingMapping, reason); - DocumentMapper newMapper = newDocumentMapper(mapping, reason); - if (reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) { + private DocumentMapper mergeAndApplyMappings(String mappingType, CompressedXContent mappingSource, MergeReason reason) { + final DocumentMapper currentMapper = this.mapper; + if (currentMapper != null && currentMapper.mappingSource().equals(mappingSource)) { + return currentMapper; + } + synchronized (this) { + Mapping incomingMapping = parseMapping(mappingType, mappingSource); + Mapping mapping = mergeMappings(this.mapper, incomingMapping, reason); + DocumentMapper newMapper = newDocumentMapper(mapping, reason); + if (reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) { + return newMapper; + } + this.mapper = newMapper; + assert assertSerialization(newMapper); return newMapper; } - this.mapper = newMapper; - assert assertSerialization(newMapper); - return newMapper; } private DocumentMapper newDocumentMapper(Mapping mapping, MergeReason reason) {