Skip to content

Commit

Permalink
Short-Circuit NOOP Mapping Updates Earlier (#77574)
Browse files Browse the repository at this point in the history
No need to actually run noop mapping updates or create a new document mapper
if nothing has changed.
  • Loading branch information
original-brownbear authored Sep 13, 2021
1 parent 553e8dc commit 8efbe22
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,22 @@
package org.elasticsearch.action.admin.indices.mapping.put;

import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest;
import org.elasticsearch.common.compress.CompressedXContent;

import java.io.IOException;

/**
* Cluster state update request that allows to put a mapping
*/
public class PutMappingClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<PutMappingClusterStateUpdateRequest> {

private final String source;
private final CompressedXContent source;

public PutMappingClusterStateUpdateRequest(String source) {
this.source = source;
public PutMappingClusterStateUpdateRequest(String source) throws IOException {
this.source = new CompressedXContent(source);
}

public String source() {
public CompressedXContent source() {
return source;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -125,15 +126,22 @@ static void performMappingUpdate(Index[] concreteIndices,
PutMappingRequest request,
ActionListener<AcknowledgedResponse> listener,
MetadataMappingService metadataMappingService) {
PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest(request.source())
.indices(concreteIndices)
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout());

metadataMappingService.putMapping(updateRequest, listener.delegateResponse((l, e) -> {
final ActionListener<AcknowledgedResponse> wrappedListener = listener.delegateResponse((l, e) -> {
logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}]",
Arrays.asList(concreteIndices)), e);
l.onFailure(e);
}));
});
final PutMappingClusterStateUpdateRequest updateRequest;
try {
updateRequest = new PutMappingClusterStateUpdateRequest(request.source())
.indices(concreteIndices)
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout());
} catch (IOException e) {
wrappedListener.onFailure(e);
return;
}

metadataMappingService.putMapping(updateRequest, wrappedListener);
}

static String checkForSystemIndexViolations(SystemIndices systemIndices, Index[] concreteIndices, PutMappingRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.indices.IndicesService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -89,17 +88,20 @@ class PutMappingExecutor implements ClusterStateTaskExecutor<PutMappingClusterSt
}

private ClusterState applyRequest(ClusterState currentState, PutMappingClusterStateUpdateRequest request,
Map<Index, MapperService> indexMapperServices) throws IOException {
Map<Index, MapperService> indexMapperServices) {

CompressedXContent mappingUpdateSource = new CompressedXContent(request.source());
final CompressedXContent mappingUpdateSource = request.source();
final Metadata metadata = currentState.metadata();
final List<IndexMetadata> updateList = new ArrayList<>();
for (Index index : request.indices()) {
MapperService mapperService = indexMapperServices.get(index);
// IMPORTANT: always get the metadata from the state since it gets batched
// and if we pull it from the indexService we might miss an update etc.
final IndexMetadata indexMetadata = currentState.getMetadata().getIndexSafe(index);

DocumentMapper existingMapper = mapperService.documentMapper();
if (existingMapper != null && existingMapper.mappingSource().equals(mappingUpdateSource)) {
continue;
}
// this is paranoia... just to be sure we use the exact same metadata tuple on the update that
// we used for the validation, it makes this mechanism a little less scary (a little)
updateList.add(indexMetadata);
Expand Down Expand Up @@ -176,6 +178,32 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt
}

public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) {
final Metadata metadata = clusterService.state().metadata();
boolean noop = true;
for (Index index : request.indices()) {
final IndexMetadata indexMetadata = metadata.index(index);
if (indexMetadata == null) {
// local store recovery sends a mapping update request during application of a cluster state on the data node which
// we might receive here before the CS update that created the index has been applied on all nodes and thus the index
// isn't found in the state yet but will be visible to the CS update below
noop = false;
break;
}
final MappingMetadata mappingMetadata = indexMetadata.mapping();
if (mappingMetadata == null) {
noop = false;
break;
}
if (request.source().equals(mappingMetadata.source()) == false) {
noop = false;
break;
}
}
if (noop) {
listener.onResponse(AcknowledgedResponse.TRUE);
return;
}

clusterService.submitStateUpdateTask("put-mapping " + Strings.arrayToCommaDelimitedString(request.indices()),
request,
ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,16 +271,22 @@ public DocumentMapper merge(String type, CompressedXContent mappingSource, Merge
return mergeAndApplyMappings(type, mappingSource, reason);
}

private synchronized DocumentMapper mergeAndApplyMappings(String mappingType, CompressedXContent mappingSource, MergeReason reason) {
Mapping incomingMapping = parseMapping(mappingType, mappingSource);
Mapping mapping = mergeMappings(this.mapper, incomingMapping, reason);
DocumentMapper newMapper = newDocumentMapper(mapping, reason);
if (reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
private DocumentMapper mergeAndApplyMappings(String mappingType, CompressedXContent mappingSource, MergeReason reason) {
final DocumentMapper currentMapper = this.mapper;
if (currentMapper != null && currentMapper.mappingSource().equals(mappingSource)) {
return currentMapper;
}
synchronized (this) {
Mapping incomingMapping = parseMapping(mappingType, mappingSource);
Mapping mapping = mergeMappings(this.mapper, incomingMapping, reason);
DocumentMapper newMapper = newDocumentMapper(mapping, reason);
if (reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
return newMapper;
}
this.mapper = newMapper;
assert assertSerialization(newMapper);
return newMapper;
}
this.mapper = newMapper;
assert assertSerialization(newMapper);
return newMapper;
}

private DocumentMapper newDocumentMapper(Mapping mapping, MergeReason reason) {
Expand Down

0 comments on commit 8efbe22

Please sign in to comment.