Skip to content

Commit

Permalink
Make search pipelines asynchronous
Browse files Browse the repository at this point in the history
If a search processor needs to make a call out to another
service, we should not risk blocking on the transport thread. We should
support async execution.

Signed-off-by: Michael Froh <froh@amazon.com>
  • Loading branch information
msfroh committed Oct 12, 2023
1 parent 6c02261 commit f44e521
Show file tree
Hide file tree
Showing 8 changed files with 635 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -423,132 +423,140 @@ private void executeRequest(
ActionListener<SearchResponse> listener;
try {
searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest);
listener = ActionListener.wrap(
r -> originalListener.onResponse(searchRequest.transformResponse(r)),
originalListener::onFailure
);
listener = searchRequest.transformResponseListener(originalListener);
} catch (Exception e) {
originalListener.onFailure(e);
return;
}

ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
// only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
// situations when source is rewritten to null due to a bug
searchRequest.source(source);
}
final ClusterState clusterState = clusterService.state();
final SearchContextId searchContext;
final Map<String, OriginalIndices> remoteClusterIndices;
if (searchRequest.pointInTimeBuilder() != null) {
searchContext = SearchContextId.decode(namedWriteableRegistry, searchRequest.pointInTimeBuilder().getId());
remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
ActionListener<SearchRequest> requestTransformListener = ActionListener.wrap(sr -> {
ActionListener<SearchSourceBuilder> rewriteListener = buildRewriteListener(sr, task,timeProvider, searchAsyncActionProvider, listener);
if (sr.source() == null) {
rewriteListener.onResponse(sr.source());
} else {
searchContext = null;
remoteClusterIndices = remoteClusterService.groupIndices(
searchRequest.indicesOptions(),
searchRequest.indices(),
idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState)
Rewriteable.rewriteAndFetch(
sr.source(),
searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener
);
}
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
if (remoteClusterIndices.isEmpty()) {
executeLocalSearch(
task,
timeProvider,
searchRequest,
localIndices,
clusterState,
listener,
searchContext,
searchAsyncActionProvider
);
} else {
if (shouldMinimizeRoundtrips(searchRequest)) {
ccsRemoteReduce(
searchRequest,
localIndices,
remoteClusterIndices,
timeProvider,
searchService.aggReduceContextBuilder(searchRequest.source()),
remoteClusterService,
threadPool,
listener,
(r, l) -> executeLocalSearch(
task,
timeProvider,
r,
localIndices,
clusterState,
l,
searchContext,
searchAsyncActionProvider
)
);
} else {
AtomicInteger skippedClusters = new AtomicInteger(0);
collectSearchShards(
searchRequest.indicesOptions(),
searchRequest.preference(),
searchRequest.routing(),
skippedClusters,
remoteClusterIndices,
remoteClusterService,
threadPool,
ActionListener.wrap(searchShardsResponses -> {
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = getRemoteClusterNodeLookup(
searchShardsResponses
);
final Map<String, AliasFilter> remoteAliasFilters;
final List<SearchShardIterator> remoteShardIterators;
if (searchContext != null) {
remoteAliasFilters = searchContext.aliasFilter();
remoteShardIterators = getRemoteShardsIteratorFromPointInTime(
searchShardsResponses,
searchContext,
searchRequest.pointInTimeBuilder().getKeepAlive(),
remoteClusterIndices
);
} else {
remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses);
remoteShardIterators = getRemoteShardsIterator(
searchShardsResponses,
remoteClusterIndices,
remoteAliasFilters
);
}
int localClusters = localIndices == null ? 0 : 1;
int totalClusters = remoteClusterIndices.size() + localClusters;
int successfulClusters = searchShardsResponses.size() + localClusters;
executeSearch(
(SearchTask) task,
timeProvider,
searchRequest,
localIndices,
remoteShardIterators,
clusterNodeLookup,
clusterState,
remoteAliasFilters,
listener,
new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
searchContext,
searchAsyncActionProvider
);
}, listener::onFailure)
);
}
}
}, listener::onFailure);
if (searchRequest.source() == null) {
rewriteListener.onResponse(searchRequest.source());
} else {
Rewriteable.rewriteAndFetch(
searchRequest.source(),
searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener
);
}
}, listener::onFailure);
searchRequest.transformRequest(requestTransformListener);
}

private ActionListener<SearchSourceBuilder> buildRewriteListener(SearchRequest searchRequest,
Task task,
SearchTimeProvider timeProvider,
SearchAsyncActionProvider searchAsyncActionProvider,
ActionListener<SearchResponse> listener) {
return ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
// only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
// situations when source is rewritten to null due to a bug
searchRequest.source(source);
}
final ClusterState clusterState = clusterService.state();
final SearchContextId searchContext;
final Map<String, OriginalIndices> remoteClusterIndices;
if (searchRequest.pointInTimeBuilder() != null) {
searchContext = SearchContextId.decode(namedWriteableRegistry, searchRequest.pointInTimeBuilder().getId());
remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
} else {
searchContext = null;
remoteClusterIndices = remoteClusterService.groupIndices(
searchRequest.indicesOptions(),
searchRequest.indices(),
idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState)
);
}
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
if (remoteClusterIndices.isEmpty()) {
executeLocalSearch(
task,
timeProvider,
searchRequest,
localIndices,
clusterState,
listener,
searchContext,
searchAsyncActionProvider
);
} else {
if (shouldMinimizeRoundtrips(searchRequest)) {
ccsRemoteReduce(
searchRequest,
localIndices,
remoteClusterIndices,
timeProvider,
searchService.aggReduceContextBuilder(searchRequest.source()),
remoteClusterService,
threadPool,
listener,
(r, l) -> executeLocalSearch(
task,
timeProvider,
r,
localIndices,
clusterState,
l,
searchContext,
searchAsyncActionProvider
)
);
} else {
AtomicInteger skippedClusters = new AtomicInteger(0);
collectSearchShards(
searchRequest.indicesOptions(),
searchRequest.preference(),
searchRequest.routing(),
skippedClusters,
remoteClusterIndices,
remoteClusterService,
threadPool,
ActionListener.wrap(searchShardsResponses -> {
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = getRemoteClusterNodeLookup(
searchShardsResponses
);
final Map<String, AliasFilter> remoteAliasFilters;
final List<SearchShardIterator> remoteShardIterators;
if (searchContext != null) {
remoteAliasFilters = searchContext.aliasFilter();
remoteShardIterators = getRemoteShardsIteratorFromPointInTime(
searchShardsResponses,
searchContext,
searchRequest.pointInTimeBuilder().getKeepAlive(),
remoteClusterIndices
);
} else {
remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses);
remoteShardIterators = getRemoteShardsIterator(
searchShardsResponses,
remoteClusterIndices,
remoteAliasFilters
);
}
int localClusters = localIndices == null ? 0 : 1;
int totalClusters = remoteClusterIndices.size() + localClusters;
int successfulClusters = searchShardsResponses.size() + localClusters;
executeSearch(
(SearchTask) task,
timeProvider,
searchRequest,
localIndices,
remoteShardIterators,
clusterNodeLookup,
clusterState,
remoteAliasFilters,
listener,
new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
searchContext,
searchAsyncActionProvider
);
}, listener::onFailure)
);
}
}
}, listener::onFailure);
}

static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) {
Expand Down
Loading

0 comments on commit f44e521

Please sign in to comment.