Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add finalReduce flag to SearchRequest #38104

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/reference/modules/cross-cluster-search.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ GET /cluster_one:twitter/_search
{
"took": 150,
"timed_out": false,
"num_reduce_phases": 2,
"_shards": {
"total": 1,
"successful": 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
terms:
field: f1.keyword

- match: { num_reduce_phases: 3 }
- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
Expand Down Expand Up @@ -63,6 +64,7 @@
terms:
field: f1.keyword

- match: { num_reduce_phases: 3 }
- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
Expand All @@ -83,6 +85,7 @@
terms:
field: f1.keyword

- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
Expand All @@ -103,6 +106,7 @@
terms:
field: f1.keyword

- is_false: num_reduce_phases
- is_false: _clusters
- match: { _shards.total: 2 }
- match: { hits.total: 5}
Expand Down Expand Up @@ -133,6 +137,7 @@
rest_total_hits_as_int: true
index: test_remote_cluster:test_index

- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
Expand Down Expand Up @@ -162,6 +167,7 @@
rest_total_hits_as_int: true
index: "*:test_index"

- match: { num_reduce_phases: 3 }
- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
Expand All @@ -176,6 +182,7 @@
rest_total_hits_as_int: true
index: my_remote_cluster:aliased_test_index

- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
Expand All @@ -192,6 +199,7 @@
rest_total_hits_as_int: true
index: my_remote_cluster:aliased_test_index,my_remote_cluster:field_caps_index_1

- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
Expand All @@ -208,6 +216,7 @@
rest_total_hits_as_int: true
index: "my_remote_cluster:single_doc_index"

- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
query:
match_all: {}

- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
Expand All @@ -28,6 +29,7 @@
rest_total_hits_as_int: true
body: { "scroll_id": "$scroll_id", "scroll": "1m"}

- is_false: num_reduce_phases
- is_false: _clusters
- match: {hits.total: 6 }
- length: {hits.hits: 2 }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -714,20 +714,18 @@ InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResu
final boolean hasAggs = source != null && source.aggregations() != null;
final boolean hasTopDocs = source == null || source.size() != 0;
final int trackTotalHitsUpTo = resolveTrackTotalHits(request);
final boolean finalReduce = request.getLocalClusterAlias() == null;

if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
// no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
if (request.getBatchedReduceSize() < numShards) {
// only use this if there are aggs and if there are more shards than we should reduce at once
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs,
trackTotalHitsUpTo, finalReduce);
trackTotalHitsUpTo, request.isFinalReduce());
}
}
return new InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
@Override
ReducedQueryPhase reduce() {
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHitsUpTo, finalReduce);
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce());
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest

private final String localClusterAlias;
private final long absoluteStartMillis;
private final boolean finalReduce;

private SearchType searchType = SearchType.DEFAULT;

Expand Down Expand Up @@ -102,13 +103,15 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
public SearchRequest() {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
this.finalReduce = true;
}

/**
* Constructs a new search request from the provided search request
*/
public SearchRequest(SearchRequest searchRequest) {
this(searchRequest, searchRequest.indices, searchRequest.localClusterAlias, searchRequest.absoluteStartMillis);
this(searchRequest, searchRequest.indices, searchRequest.localClusterAlias,
searchRequest.absoluteStartMillis, searchRequest.finalReduce);
}

/**
Expand All @@ -132,25 +135,30 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
}

/**
* Creates a new search request by providing the search request to copy all fields from, the indices to search against,
* the alias of the cluster where it will be executed, as well as the start time in milliseconds from the epoch time.
* Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search request performing local reduction
* on each cluster. The coordinating CCS node provides the original search request, the indices to search against as well as the
* alias to prefix index names with in the returned search results, and the absolute start time to be used on the remote clusters
* to ensure that the same value is used.
* Creates a new search request by providing the search request to copy all fields from, the indices to search against, the alias of
* the cluster where it will be executed, as well as the start time in milliseconds from the epoch time and whether the reduction
* should be final or not. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search request
* performing reduction on each cluster in order to minimize network round-trips between the coordinating node and the remote clusters.
*
* @param originalSearchRequest the original search request
* @param indices the indices to search against
* @param localClusterAlias the alias to prefix index names with in the returned search results
* @param absoluteStartMillis the absolute start time to be used on the remote clusters to ensure that the same value is used
* @param finalReduce whether the reduction should be final or not
*/
static SearchRequest withLocalReduction(SearchRequest originalSearchRequest, String[] indices,
String localClusterAlias, long absoluteStartMillis) {
String localClusterAlias, long absoluteStartMillis, boolean finalReduce) {
Objects.requireNonNull(originalSearchRequest, "search request must not be null");
validateIndices(indices);
Objects.requireNonNull(localClusterAlias, "cluster alias must not be null");
if (absoluteStartMillis < 0) {
throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]");
}
return new SearchRequest(originalSearchRequest, indices, localClusterAlias, absoluteStartMillis);
return new SearchRequest(originalSearchRequest, indices, localClusterAlias, absoluteStartMillis, finalReduce);
}

private SearchRequest(SearchRequest searchRequest, String[] indices, String localClusterAlias, long absoluteStartMillis) {
private SearchRequest(SearchRequest searchRequest, String[] indices, String localClusterAlias, long absoluteStartMillis,
boolean finalReduce) {
this.allowPartialSearchResults = searchRequest.allowPartialSearchResults;
this.batchedReduceSize = searchRequest.batchedReduceSize;
this.ccsMinimizeRoundtrips = searchRequest.ccsMinimizeRoundtrips;
Expand All @@ -167,6 +175,7 @@ private SearchRequest(SearchRequest searchRequest, String[] indices, String loca
this.types = searchRequest.types;
this.localClusterAlias = localClusterAlias;
this.absoluteStartMillis = absoluteStartMillis;
this.finalReduce = finalReduce;
}

/**
Expand Down Expand Up @@ -203,6 +212,12 @@ public SearchRequest(StreamInput in) throws IOException {
localClusterAlias = null;
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
}
//TODO move to the 6_7_0 branch once backported to 6.x
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
finalReduce = in.readBoolean();
} else {
finalReduce = true;
}
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
ccsMinimizeRoundtrips = in.readBoolean();
}
Expand Down Expand Up @@ -232,6 +247,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(absoluteStartMillis);
}
}
//TODO move to the 6_7_0 branch once backported to 6.x
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeBoolean(finalReduce);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeBoolean(ccsMinimizeRoundtrips);
}
Expand Down Expand Up @@ -277,11 +296,18 @@ String getLocalClusterAlias() {
return localClusterAlias;
}

/**
* Returns whether the reduction phase that will be performed needs to be final or not.
*/
boolean isFinalReduce() {
return finalReduce;
}

/**
* Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to
* ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search
* request. When created through {@link #withLocalReduction(SearchRequest, String[], String, long)}, this method returns the provided
* current time, otherwise it will return {@link System#currentTimeMillis()}.
* request. When created through {@link #withLocalReduction(SearchRequest, String[], String, long, boolean)}, this method returns
* the provided current time, otherwise it will return {@link System#currentTimeMillis()}.
*
*/
long getOrCreateAbsoluteStartMillis() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.search;

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
Expand All @@ -35,8 +36,10 @@
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults;
Expand All @@ -47,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
Expand Down Expand Up @@ -497,4 +501,12 @@ public String toString() {
return "Clusters{total=" + total + ", successful=" + successful + ", skipped=" + skipped + '}';
}
}

static SearchResponse empty(Supplier<Long> tookInMillisSupplier, Clusters clusters) {
SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits,
InternalAggregations.EMPTY, null, null, false, null, 0);
return new SearchResponse(internalSearchResponse, null, 0, 0, 0, tookInMillisSupplier.get(),
ShardSearchFailure.EMPTY_ARRAY, clusters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,7 @@ SearchResponse getMergedResponse(Clusters clusters) {
//if the search is only across remote clusters, none of them are available, and all of them have skip_unavailable set to true,
//we end up calling merge without anything to merge, we just return an empty search response
if (searchResponses.size() == 0) {
SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits,
InternalAggregations.EMPTY, null, null, false, null, 0);
return new SearchResponse(internalSearchResponse, null, 0, 0, 0, searchTimeProvider.buildTookInMillis(),
ShardSearchFailure.EMPTY_ARRAY, clusters);
return SearchResponse.empty(searchTimeProvider::buildTookInMillis, clusters);
}
int totalShards = 0;
int skippedShards = 0;
Expand Down
Loading