Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional shards routing info in ShardSearchRequest #29533

Merged
merged 11 commits into from
Apr 26, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.transport.Transport;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -128,17 +129,17 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
onPhaseFailure(currentPhase, "all shards failed", cause);
} else {
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (allowPartialResults == false && shardFailures.get() != null ){
if (logger.isDebugEnabled()) {
final ShardOperationFailedException[] shardSearchFailures = ExceptionsHelper.groupBy(buildShardFailures());
Throwable cause = shardSearchFailures.length == 0 ? null :
ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]",
logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]",
shardSearchFailures.length, getName()), cause);
}
onPhaseFailure(currentPhase, "Partial shards failure", null);
} else {
onPhaseFailure(currentPhase, "Partial shards failure", null);
} else {
if (logger.isTraceEnabled()) {
final String resultsFrom = results.getSuccessfulResults()
.map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
Expand Down Expand Up @@ -271,14 +272,14 @@ public final SearchRequest getRequest() {

@Override
public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {

ShardSearchFailure[] failures = buildShardFailures();
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (allowPartialResults == false && failures.length > 0){
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
}
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
}

return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
skippedOps.get(), buildTookInMillis(), failures, clusters);
}
Expand Down Expand Up @@ -318,8 +319,11 @@ public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIter
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias);
int[] indexShards = getIndexShards(shardIt.shardId().getIndex());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is quite a complex operation since we call if for every shard and then do consume the entire iterator again. I wonder if we can pre-sort the ShardRoutings in SearchShardIterator and then calculate this on the fly and simply call SearchShardIterator#getIndexShardOrdinal() to get it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the logic to compute the needed informations only once in InitialSearchPhase constructor. I need the complete GroupShardsIterator to do so which is why it's not in SearchShardIterator but it's the same idea:
b90716c

int remapShardId = Arrays.binarySearch(indexShards, shardIt.shardId().getId());
assert remapShardId >= 0;
return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), remapShardId,
indexShards.length, getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -131,7 +134,7 @@ public final void run() throws IOException {
if (shardsIts.size() > 0) {
int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size());
final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests);
assert success;
assert success;
assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults";
if (request.allowPartialSearchResults() == false) {
final StringBuilder missingShards = new StringBuilder();
Expand All @@ -140,7 +143,7 @@ public final void run() throws IOException {
final SearchShardIterator shardRoutings = shardsIts.get(index);
if (shardRoutings.size() == 0) {
if(missingShards.length() >0 ){
missingShards.append(", ");
missingShards.append(", ");
}
missingShards.append(shardRoutings.shardId());
}
Expand Down Expand Up @@ -377,4 +380,18 @@ protected void skipShard(SearchShardIterator iterator) {
successfulShardExecution(iterator);
}

/**
* Returns the list of shard ids in the request that match the provided {@link Index}.
*/
protected int[] getIndexShards(Index index) {
List<Integer> shards = new ArrayList<>();
for (ShardIterator it : shardsIts) {
if (index.equals(it.shardId().getIndex())) {
shards.add(it.shardId().getId());
}
}
Collections.sort(shards);
return shards.stream().mapToInt((i) -> i).toArray();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.Counter;
import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -120,6 +121,7 @@ final class DefaultSearchContext extends SearchContext {
// filter for sliced scroll
private SliceBuilder sliceBuilder;
private SearchTask task;
private final Version minNodeVersion;


/**
Expand Down Expand Up @@ -154,7 +156,7 @@ final class DefaultSearchContext extends SearchContext {

DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget, Engine.Searcher engineSearcher,
IndexService indexService, IndexShard indexShard, BigArrays bigArrays, Counter timeEstimateCounter,
TimeValue timeout, FetchPhase fetchPhase, String clusterAlias) {
TimeValue timeout, FetchPhase fetchPhase, String clusterAlias, Version minNodeVersion) {
this.id = id;
this.request = request;
this.fetchPhase = fetchPhase;
Expand All @@ -171,6 +173,7 @@ final class DefaultSearchContext extends SearchContext {
this.searcher = new ContextIndexSearcher(engineSearcher, indexService.cache().query(), indexShard.getQueryCachingPolicy());
this.timeEstimateCounter = timeEstimateCounter;
this.timeout = timeout;
this.minNodeVersion = minNodeVersion;
queryShardContext = indexService.newQueryShardContext(request.shardId().id(), searcher.getIndexReader(), request::nowInMillis,
clusterAlias);
queryShardContext.setTypes(request.types());
Expand Down Expand Up @@ -278,8 +281,7 @@ && new NestedHelper(mapperService()).mightMatchNestedDocs(query)
}

if (sliceBuilder != null) {
filters.add(sliceBuilder.toFilter(queryShardContext, shardTarget().getShardId().getId(),
queryShardContext.getIndexSettings().getNumberOfShards()));
filters.add(sliceBuilder.toFilter(queryShardContext, remapShardId(), numberOfIndexShards(), minNodeVersion));
}

if (filters.isEmpty()) {
Expand Down Expand Up @@ -335,6 +337,14 @@ public int numberOfShards() {
return request.numberOfShards();
}

public int numberOfIndexShards() {
return request.numberOfIndexShards();
}

public int remapShardId() {
return request.remapShardId();
}

@Override
public float queryBoost() {
return queryBoost;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ private DefaultSearchContext createSearchContext(ShardSearchRequest request, Tim

final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,
engineSearcher, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout, fetchPhase,
request.getClusterAlias());
request.getClusterAlias(), clusterService.state().nodes().getMinNodeVersion());
boolean success = false;
try {
// we clone the query shard context here just for rewriting otherwise we
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {

private String clusterAlias;
private ShardId shardId;
private int remapShardId;
private int numberOfIndexShards;
private int numberOfShards;
private SearchType searchType;
private Scroll scroll;
Expand All @@ -80,10 +82,10 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
ShardSearchLocalRequest() {
}

ShardSearchLocalRequest(SearchRequest searchRequest, ShardId shardId, int numberOfShards,
ShardSearchLocalRequest(SearchRequest searchRequest, ShardId shardId, int remapShardId, int numberOfIndexShards, int numberOfShards,
AliasFilter aliasFilter, float indexBoost, long nowInMillis, String clusterAlias) {
this(shardId, numberOfShards, searchRequest.searchType(),
searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost,
this(shardId, remapShardId, numberOfIndexShards, numberOfShards, searchRequest.searchType(),
searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost,
searchRequest.allowPartialSearchResults());
// If allowPartialSearchResults is unset (ie null), the cluster-level default should have been substituted
// at this stage. Any NPEs in the above are therefore an error in request preparation logic.
Expand All @@ -101,9 +103,12 @@ public ShardSearchLocalRequest(ShardId shardId, String[] types, long nowInMillis
indexBoost = 1.0f;
}

public ShardSearchLocalRequest(ShardId shardId, int numberOfShards, SearchType searchType, SearchSourceBuilder source, String[] types,
Boolean requestCache, AliasFilter aliasFilter, float indexBoost, boolean allowPartialSearchResults) {
public ShardSearchLocalRequest(ShardId shardId, int remapShardId, int numberOfIndexShards, int numberOfShards,
SearchType searchType, SearchSourceBuilder source, String[] types,
Boolean requestCache, AliasFilter aliasFilter, float indexBoost, boolean allowPartialSearchResults) {
this.shardId = shardId;
this.remapShardId = remapShardId;
this.numberOfIndexShards = numberOfIndexShards;
this.numberOfShards = numberOfShards;
this.searchType = searchType;
this.source = source;
Expand All @@ -114,7 +119,6 @@ public ShardSearchLocalRequest(ShardId shardId, int numberOfShards, SearchType s
this.allowPartialSearchResults = allowPartialSearchResults;
}


@Override
public ShardId shardId() {
return shardId;
Expand Down Expand Up @@ -150,6 +154,16 @@ public int numberOfShards() {
return numberOfShards;
}

@Override
public int numberOfIndexShards() {
return numberOfIndexShards;
}

@Override
public int remapShardId() {
return remapShardId;
}

@Override
public SearchType searchType() {
return searchType;
Expand All @@ -169,12 +183,12 @@ public long nowInMillis() {
public Boolean requestCache() {
return requestCache;
}

@Override
public Boolean allowPartialSearchResults() {
return allowPartialSearchResults;
}


@Override
public Scroll scroll() {
Expand All @@ -199,6 +213,14 @@ protected void innerReadFrom(StreamInput in) throws IOException {
shardId = ShardId.readShardId(in);
searchType = SearchType.fromId(in.readByte());
numberOfShards = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
remapShardId = in.readVInt();
numberOfIndexShards = in.readVInt();
assert remapShardId != -1 && numberOfIndexShards != -1;
} else {
remapShardId = -1;
numberOfIndexShards = -1;
}
scroll = in.readOptionalWriteable(Scroll::new);
source = in.readOptionalWriteable(SearchSourceBuilder::new);
types = in.readStringArray();
Expand Down Expand Up @@ -232,6 +254,10 @@ protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException
out.writeByte(searchType.id());
if (!asKey) {
out.writeVInt(numberOfShards);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeVInt(remapShardId);
out.writeVInt(numberOfIndexShards);
}
}
out.writeOptionalWriteable(scroll);
out.writeOptionalWriteable(source);
Expand All @@ -250,7 +276,7 @@ protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
out.writeOptionalBoolean(allowPartialSearchResults);
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ public interface ShardSearchRequest {

ShardId shardId();

/**
* Returns the remapped shard id of the requested shard for this request
* or -1 if this information is not available.
* The remapped shard id is the id of the requested shard among all shards
* of this index that are part of the request. Note that the remapped shard id
* is equal to the original shard id if all shards of this index are part of the request.
*/
int remapShardId();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should call it shardRequestOrdinal shard ID is trappy at least for me since I alwasy think of ShardID.java when I read it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. I pushed b90716c


String[] types();

SearchSourceBuilder source();
Expand All @@ -59,6 +68,15 @@ public interface ShardSearchRequest {

void source(SearchSourceBuilder source);

/**
* Returns the number of shards of this index ({@link ShardId#getIndex()}) that participates in the request
* or -1 if this information is not available.
*/
int numberOfIndexShards();

/**
* Returns the number of shards that participates in the request.
*/
int numberOfShards();

SearchType searchType();
Expand All @@ -68,7 +86,7 @@ public interface ShardSearchRequest {
long nowInMillis();

Boolean requestCache();

Boolean allowPartialSearchResults();

Scroll scroll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
public ShardSearchTransportRequest(){
}

public ShardSearchTransportRequest(OriginalIndices originalIndices, SearchRequest searchRequest, ShardId shardId, int numberOfShards,
AliasFilter aliasFilter, float indexBoost, long nowInMillis, String clusterAlias) {
this.shardSearchLocalRequest = new ShardSearchLocalRequest(searchRequest, shardId, numberOfShards, aliasFilter, indexBoost,
public ShardSearchTransportRequest(OriginalIndices originalIndices, SearchRequest searchRequest, ShardId shardId, int remapShardId,
int numberOfIndexShards, int numberOfShards, AliasFilter aliasFilter, float indexBoost,
long nowInMillis, String clusterAlias) {
this.shardSearchLocalRequest = new ShardSearchLocalRequest(searchRequest, shardId, remapShardId,
numberOfIndexShards, numberOfShards, aliasFilter, indexBoost,
nowInMillis, clusterAlias);
this.originalIndices = originalIndices;
}
Expand Down Expand Up @@ -102,6 +104,11 @@ public ShardId shardId() {
return shardSearchLocalRequest.shardId();
}

@Override
public int remapShardId() {
return shardSearchLocalRequest.remapShardId();
}

@Override
public String[] types() {
return shardSearchLocalRequest.types();
Expand Down Expand Up @@ -132,6 +139,11 @@ public int numberOfShards() {
return shardSearchLocalRequest.numberOfShards();
}

@Override
public int numberOfIndexShards() {
return shardSearchLocalRequest.numberOfIndexShards();
}

@Override
public SearchType searchType() {
return shardSearchLocalRequest.searchType();
Expand All @@ -151,11 +163,11 @@ public long nowInMillis() {
public Boolean requestCache() {
return shardSearchLocalRequest.requestCache();
}

@Override
public Boolean allowPartialSearchResults() {
return shardSearchLocalRequest.allowPartialSearchResults();
}
}

@Override
public Scroll scroll() {
Expand Down
Loading