Skip to content

Commit

Permalink
Ran Spotless
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Froh <froh@amazon.com>
  • Loading branch information
msfroh committed Oct 12, 2023
1 parent f5f70aa commit 07aa11e
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 472 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,13 @@ private void executeRequest(
}

ActionListener<SearchRequest> requestTransformListener = ActionListener.wrap(sr -> {
ActionListener<SearchSourceBuilder> rewriteListener = buildRewriteListener(sr, task,timeProvider, searchAsyncActionProvider, listener);
ActionListener<SearchSourceBuilder> rewriteListener = buildRewriteListener(
sr,
task,
timeProvider,
searchAsyncActionProvider,
listener
);
if (sr.source() == null) {
rewriteListener.onResponse(sr.source());
} else {
Expand All @@ -440,123 +446,125 @@ private void executeRequest(
rewriteListener
);
}
}, listener::onFailure);
}, listener::onFailure);
searchRequest.transformRequest(requestTransformListener);
}

private ActionListener<SearchSourceBuilder> buildRewriteListener(SearchRequest searchRequest,
Task task,
SearchTimeProvider timeProvider,
SearchAsyncActionProvider searchAsyncActionProvider,
ActionListener<SearchResponse> listener) {
return ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
// only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
// situations when source is rewritten to null due to a bug
searchRequest.source(source);
}
final ClusterState clusterState = clusterService.state();
final SearchContextId searchContext;
final Map<String, OriginalIndices> remoteClusterIndices;
if (searchRequest.pointInTimeBuilder() != null) {
searchContext = SearchContextId.decode(namedWriteableRegistry, searchRequest.pointInTimeBuilder().getId());
remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
} else {
searchContext = null;
remoteClusterIndices = remoteClusterService.groupIndices(
searchRequest.indicesOptions(),
searchRequest.indices(),
idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState)
);
}
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
if (remoteClusterIndices.isEmpty()) {
executeLocalSearch(
task,
timeProvider,
searchRequest,
localIndices,
clusterState,
listener,
searchContext,
searchAsyncActionProvider
);
} else {
if (shouldMinimizeRoundtrips(searchRequest)) {
ccsRemoteReduce(
searchRequest,
localIndices,
remoteClusterIndices,
timeProvider,
searchService.aggReduceContextBuilder(searchRequest.source()),
remoteClusterService,
threadPool,
listener,
(r, l) -> executeLocalSearch(
task,
timeProvider,
r,
localIndices,
clusterState,
l,
searchContext,
searchAsyncActionProvider
)
);
} else {
AtomicInteger skippedClusters = new AtomicInteger(0);
collectSearchShards(
searchRequest.indicesOptions(),
searchRequest.preference(),
searchRequest.routing(),
skippedClusters,
remoteClusterIndices,
remoteClusterService,
threadPool,
ActionListener.wrap(searchShardsResponses -> {
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = getRemoteClusterNodeLookup(
searchShardsResponses
);
final Map<String, AliasFilter> remoteAliasFilters;
final List<SearchShardIterator> remoteShardIterators;
if (searchContext != null) {
remoteAliasFilters = searchContext.aliasFilter();
remoteShardIterators = getRemoteShardsIteratorFromPointInTime(
searchShardsResponses,
searchContext,
searchRequest.pointInTimeBuilder().getKeepAlive(),
remoteClusterIndices
);
} else {
remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses);
remoteShardIterators = getRemoteShardsIterator(
searchShardsResponses,
remoteClusterIndices,
remoteAliasFilters
);
}
int localClusters = localIndices == null ? 0 : 1;
int totalClusters = remoteClusterIndices.size() + localClusters;
int successfulClusters = searchShardsResponses.size() + localClusters;
executeSearch(
(SearchTask) task,
timeProvider,
searchRequest,
localIndices,
remoteShardIterators,
clusterNodeLookup,
clusterState,
remoteAliasFilters,
listener,
new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
searchContext,
searchAsyncActionProvider
);
}, listener::onFailure)
);
}
}
}, listener::onFailure);
private ActionListener<SearchSourceBuilder> buildRewriteListener(
SearchRequest searchRequest,
Task task,
SearchTimeProvider timeProvider,
SearchAsyncActionProvider searchAsyncActionProvider,
ActionListener<SearchResponse> listener
) {
return ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
// only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
// situations when source is rewritten to null due to a bug
searchRequest.source(source);
}
final ClusterState clusterState = clusterService.state();
final SearchContextId searchContext;
final Map<String, OriginalIndices> remoteClusterIndices;
if (searchRequest.pointInTimeBuilder() != null) {
searchContext = SearchContextId.decode(namedWriteableRegistry, searchRequest.pointInTimeBuilder().getId());
remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
} else {
searchContext = null;
remoteClusterIndices = remoteClusterService.groupIndices(
searchRequest.indicesOptions(),
searchRequest.indices(),
idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState)
);
}
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
if (remoteClusterIndices.isEmpty()) {
executeLocalSearch(
task,
timeProvider,
searchRequest,
localIndices,
clusterState,
listener,
searchContext,
searchAsyncActionProvider
);
} else {
if (shouldMinimizeRoundtrips(searchRequest)) {
ccsRemoteReduce(
searchRequest,
localIndices,
remoteClusterIndices,
timeProvider,
searchService.aggReduceContextBuilder(searchRequest.source()),
remoteClusterService,
threadPool,
listener,
(r, l) -> executeLocalSearch(
task,
timeProvider,
r,
localIndices,
clusterState,
l,
searchContext,
searchAsyncActionProvider
)
);
} else {
AtomicInteger skippedClusters = new AtomicInteger(0);
collectSearchShards(
searchRequest.indicesOptions(),
searchRequest.preference(),
searchRequest.routing(),
skippedClusters,
remoteClusterIndices,
remoteClusterService,
threadPool,
ActionListener.wrap(searchShardsResponses -> {
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = getRemoteClusterNodeLookup(
searchShardsResponses
);
final Map<String, AliasFilter> remoteAliasFilters;
final List<SearchShardIterator> remoteShardIterators;
if (searchContext != null) {
remoteAliasFilters = searchContext.aliasFilter();
remoteShardIterators = getRemoteShardsIteratorFromPointInTime(
searchShardsResponses,
searchContext,
searchRequest.pointInTimeBuilder().getKeepAlive(),
remoteClusterIndices
);
} else {
remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses);
remoteShardIterators = getRemoteShardsIterator(
searchShardsResponses,
remoteClusterIndices,
remoteAliasFilters
);
}
int localClusters = localIndices == null ? 0 : 1;
int totalClusters = remoteClusterIndices.size() + localClusters;
int successfulClusters = searchShardsResponses.size() + localClusters;
executeSearch(
(SearchTask) task,
timeProvider,
searchRequest,
localIndices,
remoteShardIterators,
clusterNodeLookup,
clusterState,
remoteAliasFilters,
listener,
new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
searchContext,
searchAsyncActionProvider
);
}, listener::onFailure)
);
}
}
}, listener::onFailure);
}

static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) {
Expand Down
Loading

0 comments on commit 07aa11e

Please sign in to comment.