Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Short-Circuit NOOP Mapping Updates Earlier #77574

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,22 @@
package org.elasticsearch.action.admin.indices.mapping.put;

import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest;
import org.elasticsearch.common.compress.CompressedXContent;

import java.io.IOException;

/**
* Cluster state update request that allows to put a mapping
*/
public class PutMappingClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<PutMappingClusterStateUpdateRequest> {

private final String source;
private final CompressedXContent source;

public PutMappingClusterStateUpdateRequest(String source) {
this.source = source;
public PutMappingClusterStateUpdateRequest(String source) throws IOException {
this.source = new CompressedXContent(source);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should take the action off the transport thread, seems like an action that do not need to run on transport and now that we compress, we do a bit (though tiny) of real work here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this, but it seemed like the wrong approach. Either, and this is true IMO, the compression cost is so tiny here that it doesn't matter really. Or if it does indeed matter, the better fix seems to be to just serialize the PutMappingRequest with the mapping in compressed form right away (thus making it smaller and easier to read from the wire as well).

}

public String source() {
public CompressedXContent source() {
return source;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -125,15 +126,22 @@ static void performMappingUpdate(Index[] concreteIndices,
PutMappingRequest request,
ActionListener<AcknowledgedResponse> listener,
MetadataMappingService metadataMappingService) {
PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest(request.source())
.indices(concreteIndices)
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout());

metadataMappingService.putMapping(updateRequest, listener.delegateResponse((l, e) -> {
final ActionListener<AcknowledgedResponse> wrappedListener = listener.delegateResponse((l, e) -> {
logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}]",
Arrays.asList(concreteIndices)), e);
l.onFailure(e);
}));
});
final PutMappingClusterStateUpdateRequest updateRequest;
try {
updateRequest = new PutMappingClusterStateUpdateRequest(request.source())
.indices(concreteIndices)
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout());
} catch (IOException e) {
wrappedListener.onFailure(e);
return;
}

metadataMappingService.putMapping(updateRequest, wrappedListener);
}

static String checkForSystemIndexViolations(SystemIndices systemIndices, Index[] concreteIndices, PutMappingRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.indices.IndicesService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -89,17 +88,20 @@ class PutMappingExecutor implements ClusterStateTaskExecutor<PutMappingClusterSt
}

private ClusterState applyRequest(ClusterState currentState, PutMappingClusterStateUpdateRequest request,
Map<Index, MapperService> indexMapperServices) throws IOException {
Map<Index, MapperService> indexMapperServices) {

CompressedXContent mappingUpdateSource = new CompressedXContent(request.source());
final CompressedXContent mappingUpdateSource = request.source();
final Metadata metadata = currentState.metadata();
final List<IndexMetadata> updateList = new ArrayList<>();
for (Index index : request.indices()) {
MapperService mapperService = indexMapperServices.get(index);
// IMPORTANT: always get the metadata from the state since it get's batched
// and if we pull it from the indexService we might miss an update etc.
final IndexMetadata indexMetadata = currentState.getMetadata().getIndexSafe(index);

DocumentMapper existingMapper = mapperService.documentMapper();
if (existingMapper != null && existingMapper.mappingSource().equals(mappingUpdateSource)) {
continue;
}
// this is paranoia... just to be sure we use the exact same metadata tuple on the update that
// we used for the validation, it makes this mechanism little less scary (a little)
updateList.add(indexMetadata);
Expand Down Expand Up @@ -176,6 +178,32 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt
}

public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) {
final Metadata metadata = clusterService.state().metadata();
boolean noop = true;
for (Index index : request.indices()) {
final IndexMetadata indexMetadata = metadata.index(index);
if (indexMetadata == null) {
// local store recovery sends a mapping update request during application of a cluster state on t he data node which
// might we receive here before the CS update that created the index has been applied on all nodes and thus the index
// isn't found in the state yet but will be visible to the CS update below
noop = false;
break;
}
final MappingMetadata mappingMetadata = indexMetadata.mapping();
if (mappingMetadata == null) {
noop = false;
break;
}
if (request.source().equals(mappingMetadata.source()) == false) {
noop = false;
break;
}
}
if (noop) {
listener.onResponse(AcknowledgedResponse.TRUE);
return;
}

clusterService.submitStateUpdateTask("put-mapping " + Strings.arrayToCommaDelimitedString(request.indices()),
request,
ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,16 +271,22 @@ public DocumentMapper merge(String type, CompressedXContent mappingSource, Merge
return mergeAndApplyMappings(type, mappingSource, reason);
}

private synchronized DocumentMapper mergeAndApplyMappings(String mappingType, CompressedXContent mappingSource, MergeReason reason) {
Mapping incomingMapping = parseMapping(mappingType, mappingSource);
Mapping mapping = mergeMappings(this.mapper, incomingMapping, reason);
DocumentMapper newMapper = newDocumentMapper(mapping, reason);
if (reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
private DocumentMapper mergeAndApplyMappings(String mappingType, CompressedXContent mappingSource, MergeReason reason) {
final DocumentMapper currentMapper = this.mapper;
if (currentMapper != null && currentMapper.mappingSource().equals(mappingSource)) {
return currentMapper;
}
synchronized (this) {
Mapping incomingMapping = parseMapping(mappingType, mappingSource);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can probably be outside the synchronized block as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically I think so yes. But the parsing code does reference this.mapper in some spots so I decided to keep it this way for easy to understand correctness. Will try to look into a follow-up to this that cleans up the synchronization here a little :)

Mapping mapping = mergeMappings(this.mapper, incomingMapping, reason);
DocumentMapper newMapper = newDocumentMapper(mapping, reason);
if (reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
return newMapper;
}
this.mapper = newMapper;
assert assertSerialization(newMapper);
return newMapper;
}
this.mapper = newMapper;
assert assertSerialization(newMapper);
return newMapper;
}

private DocumentMapper newDocumentMapper(Mapping mapping, MergeReason reason) {
Expand Down