Skip to content

Commit

Permalink
Simplify result handling in FetchSearchPhase (elastic#115723)
Browse files Browse the repository at this point in the history
The results instance does not need to be a field.
It's state handling is fairly straight forward, it needs to be released
once all fetches have been procesed. No need to even create it on any other
path so I split up the method slightly to clearly isolate when and how
we need the results instance.

Part of elastic#115722
  • Loading branch information
original-brownbear committed Nov 22, 2024
1 parent f03d347 commit 177187f
Showing 1 changed file with 48 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
* Then it reaches out to all relevant shards to fetch the topN hits.
*/
final class FetchSearchPhase extends SearchPhase {
private final ArraySearchPhaseResults<FetchSearchResult> fetchResults;
private final AtomicArray<SearchPhaseResult> searchPhaseShardResults;
private final BiFunction<SearchResponseSections, AtomicArray<SearchPhaseResult>, SearchPhase> nextPhaseFactory;
private final SearchPhaseContext context;
Expand Down Expand Up @@ -79,8 +78,6 @@ final class FetchSearchPhase extends SearchPhase {
+ resultConsumer.getNumShards()
);
}
this.fetchResults = new ArraySearchPhaseResults<>(resultConsumer.getNumShards());
context.addReleasable(fetchResults);
this.searchPhaseShardResults = resultConsumer.getAtomicArray();
this.aggregatedDfs = aggregatedDfs;
this.nextPhaseFactory = nextPhaseFactory;
Expand Down Expand Up @@ -129,48 +126,56 @@ private void innerRun() throws Exception {
// we have to release contexts here to free up resources
searchPhaseShardResults.asList()
.forEach(searchPhaseShardResult -> releaseIrrelevantSearchContext(searchPhaseShardResult, context));
moveToNextPhase(fetchResults.getAtomicArray(), reducedQueryPhase);
moveToNextPhase(new AtomicArray<>(numShards), reducedQueryPhase);
} else {
final boolean shouldExplainRank = shouldExplainRankScores(context.getRequest());
final List<Map<Integer, RankDoc>> rankDocsPerShard = false == shouldExplainRank
? null
: splitRankDocsPerShard(scoreDocs, numShards);
final ScoreDoc[] lastEmittedDocPerShard = context.getRequest().scroll() != null
? SearchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards)
: null;
final List<Integer>[] docIdsToLoad = SearchPhaseController.fillDocIdsToLoad(numShards, scoreDocs);
final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(
fetchResults,
docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
() -> moveToNextPhase(fetchResults.getAtomicArray(), reducedQueryPhase),
context
);
for (int i = 0; i < docIdsToLoad.length; i++) {
List<Integer> entry = docIdsToLoad[i];
RankDocShardInfo rankDocs = rankDocsPerShard == null || rankDocsPerShard.get(i).isEmpty()
? null
: new RankDocShardInfo(rankDocsPerShard.get(i));
SearchPhaseResult shardPhaseResult = searchPhaseShardResults.get(i);
if (entry == null) { // no results for this shard ID
if (shardPhaseResult != null) {
// if we got some hits from this shard we have to release the context there
// we do this as we go since it will free up resources and passing on the request on the
// transport layer is cheap.
releaseIrrelevantSearchContext(shardPhaseResult, context);
progressListener.notifyFetchResult(i);
}
// in any case we count down this result since we don't talk to this shard anymore
counter.countDown();
} else {
executeFetch(
shardPhaseResult,
counter,
entry,
rankDocs,
(lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[i] : null
);
}
innerRunFetch(scoreDocs, numShards, reducedQueryPhase);
}
}
}

private void innerRunFetch(ScoreDoc[] scoreDocs, int numShards, SearchPhaseController.ReducedQueryPhase reducedQueryPhase) {
ArraySearchPhaseResults<FetchSearchResult> fetchResults = new ArraySearchPhaseResults<>(numShards);
final List<Map<Integer, RankDoc>> rankDocsPerShard = false == shouldExplainRankScores(context.getRequest())
? null
: splitRankDocsPerShard(scoreDocs, numShards);
final ScoreDoc[] lastEmittedDocPerShard = context.getRequest().scroll() != null
? SearchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards)
: null;
final List<Integer>[] docIdsToLoad = SearchPhaseController.fillDocIdsToLoad(numShards, scoreDocs);
final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(
fetchResults,
docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
() -> {
try (fetchResults) {
moveToNextPhase(fetchResults.getAtomicArray(), reducedQueryPhase);
}
},
context
);
for (int i = 0; i < docIdsToLoad.length; i++) {
List<Integer> entry = docIdsToLoad[i];
RankDocShardInfo rankDocs = rankDocsPerShard == null || rankDocsPerShard.get(i).isEmpty()
? null
: new RankDocShardInfo(rankDocsPerShard.get(i));
SearchPhaseResult shardPhaseResult = searchPhaseShardResults.get(i);
if (entry == null) { // no results for this shard ID
if (shardPhaseResult != null) {
// if we got some hits from this shard we have to release the context there
// we do this as we go since it will free up resources and passing on the request on the
// transport layer is cheap.
releaseIrrelevantSearchContext(shardPhaseResult, context);
progressListener.notifyFetchResult(i);
}
// in any case we count down this result since we don't talk to this shard anymore
counter.countDown();
} else {
executeFetch(
shardPhaseResult,
counter,
entry,
rankDocs,
(lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[i] : null
);
}
}
}
Expand Down Expand Up @@ -257,7 +262,6 @@ private void moveToNextPhase(
) {
var resp = SearchPhaseController.merge(context.getRequest().scroll() != null, reducedQueryPhase, fetchResultsArr);
context.addReleasable(resp::decRef);
fetchResults.close();
context.executeNextPhase(this, nextPhaseFactory.apply(resp, searchPhaseShardResults));
}

Expand Down

0 comments on commit 177187f

Please sign in to comment.