Minor search controller changes (#36479)
This commit contains a few minor changes to our search code:

- adjust the visibility of a couple of methods in our search code from public or protected to package private
- make some of the `SearchPhaseController` methods static where possible
- rename the `SearchPhaseController#reducedQueryPhase` overload used only for scroll requests to `reducedScrollQueryPhase`, dropping its `isScrollRequest` argument, which was always set to `true`
- replace leniency in `SearchPhaseController#setShardIndex` with an assert, to make sure that we never set the shard index twice (see the sketch after this list)
- remove two null checks where the checked field can never be null
- resolve an unchecked warning
- replace a `List#toArray` invocation that pre-sizes the array with the list's true size with one that passes an array of length 0 (also shown in the sketch below)
- correct a couple of typos in comments
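
For the `setShardIndex` and `List#toArray` items, here is a minimal, standalone sketch of the two idioms involved. It is plain Java with no Elasticsearch or Lucene dependencies: `Doc` is a hypothetical stand-in for Lucene's `ScoreDoc`, and both method bodies only approximate the real ones in `SearchPhaseController`.

```java
import java.util.Arrays;
import java.util.List;

public class CommitIdiomsSketch {

    /** Hypothetical stand-in for Lucene's ScoreDoc: shardIndex stays -1 until the coordinating node sets it. */
    static class Doc {
        int shardIndex = -1;
    }

    /**
     * Lenient variant (before): bails out quietly when a doc already has a shard index,
     * so a caller that sets the index twice is never noticed.
     */
    static void setShardIndexLenient(Doc[] docs, int shardIndex) {
        for (Doc doc : docs) {
            if (doc.shardIndex != -1) {
                return; // once one doc is initialized, all of them are assumed to be
            }
            doc.shardIndex = shardIndex;
        }
    }

    /**
     * Strict variant (after): an assert documents and enforces the set-exactly-once invariant.
     * Checking only the first doc suffices because, per the comment removed by this commit,
     * shard indices are initialized all together.
     */
    static void setShardIndexStrict(Doc[] docs, int shardIndex) {
        assert docs.length == 0 || docs[0].shardIndex == -1 : "shardIndex is already set";
        for (Doc doc : docs) {
            doc.shardIndex = shardIndex;
        }
    }

    public static void main(String[] args) {
        List<String> hits = Arrays.asList("a", "b", "c");

        // Before: pre-size the destination array by hand.
        String[] presized = hits.toArray(new String[hits.size()]);

        // After: pass a zero-length array; the runtime allocates the right-sized array itself.
        String[] zeroLength = hits.toArray(new String[0]);

        System.out.println(presized.length + " " + zeroLength.length); // prints: 3 3
    }
}
```

With assertions enabled (`-ea`, as in Elasticsearch's test runs), the strict variant fails fast on a double set instead of silently masking a caller bug, which is the point of trading leniency for an assert. The zero-length-array form of `toArray` is simpler and, on modern JVMs, generally at least as fast as pre-sizing.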
javanna authored Dec 11, 2018
1 parent fb18b35 commit dafea3c
Showing 8 changed files with 27 additions and 42 deletions.
AbstractSearchAsyncAction.java
@@ -72,7 +72,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final TransportSearchAction.SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;

- protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
+ AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
Map<String, Set<String>> indexRoutings,
InitialSearchPhase.java
@@ -44,7 +44,7 @@
* and collect the results. If a shard request returns a failure this class handles the advance to the next replica of the shard until
* the shards replica iterator is exhausted. Each shard is referenced by position in the {@link GroupShardsIterator} which is later
* referred to as the {@code shardIndex}.
- * The fan out and collect algorithm is traditionally used as the initial phase which can either be a query execution or collection
+ * The fan out and collect algorithm is traditionally used as the initial phase which can either be a query execution or collection of
* distributed frequencies
*/
abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends SearchPhase {
@@ -327,7 +327,6 @@ private void successfulShardExecution(SearchShardIterator shardsIt) {
}
}

-
/**
* Executed once all shard results have been received and processed
* @see #onShardFailure(int, SearchShardTarget, Exception)
@@ -367,7 +366,7 @@ protected abstract void executePhaseOnShard(SearchShardIterator shardIt, ShardRo
abstract static class SearchPhaseResults<Result extends SearchPhaseResult> {
private final int numShards;

- protected SearchPhaseResults(int numShards) {
+ SearchPhaseResults(int numShards) {
this.numShards = numShards;
}
/**
SearchPhaseController.java
@@ -21,7 +21,6 @@

import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.ObjectObjectHashMap;
-
import org.apache.lucene.index.Term;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.FieldDoc;
@@ -154,7 +153,7 @@ public AggregatedDfs aggregateDfs(Collection<DfsSearchResult> results) {
* @param from the offset into the search results top docs
* @param size the number of hits to return from the merged top docs
*/
- public SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results,
+ static SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results,
final Collection<TopDocs> bufferedTopDocs, final TopDocsStats topDocsStats, int from, int size) {
if (results.isEmpty()) {
return SortedTopDocs.EMPTY;
@@ -214,7 +213,7 @@ public SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPha
}
final boolean isSortedByField;
final SortField[] sortFields;
- if (mergedTopDocs != null && mergedTopDocs instanceof TopFieldDocs) {
+ if (mergedTopDocs instanceof TopFieldDocs) {
TopFieldDocs fieldDocs = (TopFieldDocs) mergedTopDocs;
isSortedByField = (fieldDocs instanceof CollapseTopFieldDocs &&
fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) == false;
@@ -230,11 +229,10 @@ public SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPha
}
}

- TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
+ static TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
if (results.isEmpty()) {
return null;
}
- assert results.isEmpty() == false;
final boolean setShardIndex = false;
final TopDocs topDocs = results.stream().findFirst().get();
final TopDocs mergedTopDocs;
@@ -259,12 +257,8 @@ TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
}

private static void setShardIndex(TopDocs topDocs, int shardIndex) {
+ assert topDocs.scoreDocs.length == 0 || topDocs.scoreDocs[0].shardIndex == -1 : "shardIndex is already set";
for (ScoreDoc doc : topDocs.scoreDocs) {
- if (doc.shardIndex != -1) {
- // once there is a single shard index initialized all others will be initialized too
- // there are many asserts down in lucene land that this is actually true. we can shortcut it here.
- return;
- }
doc.shardIndex = shardIndex;
}
}
@@ -283,7 +277,6 @@ public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase,
}
}
return lastEmittedDocPerShard;
-
}

/**
@@ -402,15 +395,15 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr
hits.add(searchHit);
}
}
- return new SearchHits(hits.toArray(new SearchHit[hits.size()]), reducedQueryPhase.totalHits, reducedQueryPhase.maxScore);
+ return new SearchHits(hits.toArray(new SearchHit[0]), reducedQueryPhase.totalHits, reducedQueryPhase.maxScore);
}

/**
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
*/
- public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults, boolean isScrollRequest) {
- return reducedQueryPhase(queryResults, isScrollRequest, true);
+ public ReducedQueryPhase reducedScrollQueryPhase(Collection<? extends SearchPhaseResult> queryResults) {
+ return reducedQueryPhase(queryResults, true, true);
}

/**
@@ -422,7 +415,6 @@ public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResul
return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(trackTotalHits), 0, isScrollRequest);
}

-
/**
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
@@ -507,15 +499,13 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
firstResult.pipelineAggregators(), reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
- final SortedTopDocs scoreDocs = this.sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
+ final SortedTopDocs scoreDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
final TotalHits totalHits = topDocsStats.getTotalHits();
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
timedOut, terminatedEarly, suggest, aggregations, shardResults, scoreDocs.scoreDocs, scoreDocs.sortFields,
- firstResult != null ? firstResult.sortValueFormats() : null,
- numReducePhases, scoreDocs.isSortedByField, size, from, firstResult == null);
+ firstResult.sortValueFormats(), numReducePhases, scoreDocs.isSortedByField, size, from, false);
}

-
/**
* Performs an intermediate reduce phase on the aggregations. For instance with this reduce phase never prune information
* that relevant for the final reduce step. For final reduce see {@link #reduceAggs(List, List, ReduceContext)}
@@ -526,7 +516,7 @@ private InternalAggregations reduceAggsIncrementally(List<InternalAggregations>
null, reduceContext);
}

- private InternalAggregations reduceAggs(List<InternalAggregations> aggregationsList,
+ private static InternalAggregations reduceAggs(List<InternalAggregations> aggregationsList,
List<SiblingPipelineAggregator> pipelineAggregators, ReduceContext reduceContext) {
InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
if (pipelineAggregators != null) {
@@ -657,7 +647,6 @@ private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedR
this.hasTopDocs = hasTopDocs;
this.hasAggs = hasAggs;
this.bufferSize = bufferSize;
-
}

@Override
@@ -675,10 +664,9 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
aggsBuffer[0] = reducedAggs;
}
if (hasTopDocs) {
- TopDocs reducedTopDocs = controller.mergeTopDocs(Arrays.asList(topDocsBuffer),
+ TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer),
// we have to merge here in the same way we collect on a shard
- querySearchResult.from() + querySearchResult.size()
- , 0);
+ querySearchResult.from() + querySearchResult.size(), 0);
Arrays.fill(topDocsBuffer, null);
topDocsBuffer[0] = reducedTopDocs;
}
@@ -692,7 +680,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
if (hasTopDocs) {
final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
topDocsStats.add(topDocs);
- SearchPhaseController.setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
+ setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
topDocsBuffer[i] = topDocs.topDocs;
}
}
@@ -705,7 +693,6 @@ private synchronized List<TopDocs> getRemainingTopDocs() {
return hasTopDocs ? Arrays.asList(topDocsBuffer).subList(0, index) : null;
}

-
@Override
public ReducedQueryPhase reduce() {
return controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats,
@@ -739,7 +726,7 @@ InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResu
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs);
}
}
- return new InitialSearchPhase.ArraySearchPhaseResults(numShards) {
+ return new InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
@Override
public ReducedQueryPhase reduce() {
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits);
SearchScrollQueryAndFetchAsyncAction.java
@@ -52,7 +52,7 @@ protected void executeInitialPhase(Transport.Connection connection, InternalScro

@Override
protected SearchPhase moveToNextPhase(BiFunction<String, String, DiscoveryNode> clusterNodeLookup) {
- return sendResponsePhase(searchPhaseController.reducedQueryPhase(queryFetchResults.asList(), true), queryFetchResults);
+ return sendResponsePhase(searchPhaseController.reducedScrollQueryPhase(queryFetchResults.asList()), queryFetchResults);
}

@Override
SearchScrollQueryThenFetchAsyncAction.java
@@ -69,8 +69,8 @@ protected SearchPhase moveToNextPhase(BiFunction<String, String, DiscoveryNode>
return new SearchPhase("fetch") {
@Override
public void run() throws IOException {
- final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(
- queryResults.asList(), true);
+ final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedScrollQueryPhase(
+ queryResults.asList());
if (reducedQueryPhase.scoreDocs.length == 0) {
sendResponse(reducedQueryPhase, fetchResults);
return;
SearchPhaseResult.java
@@ -25,7 +25,7 @@
import org.elasticsearch.transport.TransportResponse;

/**
- * This class is a base class for all search releated results. It contains the shard target it
+ * This class is a base class for all search related results. It contains the shard target it
* was executed against, a shard index used to reference the result on the coordinating node
* and a request ID that is used to reference the request context on the executing node. The
* request ID is particularly important since it is used to reference and maintain a context
InternalTopHits.java
@@ -65,7 +65,6 @@ public InternalTopHits(StreamInput in) throws IOException {
from = in.readVInt();
size = in.readVInt();
topDocs = Lucene.readTopDocs(in);
- assert topDocs != null;
searchHits = SearchHits.readSearchHits(in);
}

SearchPhaseControllerTests.java
@@ -30,6 +30,8 @@
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.search.DocValueFormat;
+ import org.elasticsearch.search.SearchHit;
+ import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.AggregationBuilders;
@@ -38,8 +40,6 @@
import org.elasticsearch.search.aggregations.metrics.InternalMax;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.FetchSearchResult;
- import org.elasticsearch.search.SearchHit;
- import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.suggest.Suggest;
@@ -73,7 +73,7 @@ public void setup() {
(b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
}

- public void testSort() throws Exception {
+ public void testSort() {
List<CompletionSuggestion> suggestions = new ArrayList<>();
for (int i = 0; i < randomIntBetween(1, 5); i++) {
suggestions.add(new CompletionSuggestion(randomAlphaOfLength(randomIntBetween(1, 5)), randomIntBetween(1, 20), false));
@@ -88,7 +88,7 @@ public void testSort() throws Exception {
size = first.get().queryResult().size();
}
int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results));
- ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList(), null, new SearchPhaseController.TopDocsStats(),
+ ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(true, results.asList(), null, new SearchPhaseController.TopDocsStats(),
from, size)
.scoreDocs;
for (Suggest.Suggestion<?> suggestion : reducedSuggest(results)) {
@@ -113,12 +113,12 @@ public void testSortIsIdempotent() throws Exception {
size = first.get().queryResult().size();
}
SearchPhaseController.TopDocsStats topDocsStats = new SearchPhaseController.TopDocsStats();
- ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs;
+ ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs;

results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize,
useConstantScore);
SearchPhaseController.TopDocsStats topDocsStats2 = new SearchPhaseController.TopDocsStats();
- ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs;
+ ScoreDoc[] sortedDocs2 = SearchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs;
assertEquals(sortedDocs.length, sortedDocs2.length);
for (int i = 0; i < sortedDocs.length; i++) {
assertEquals(sortedDocs[i].doc, sortedDocs2[i].doc);
