Skip to content

Commit

Permalink
Changed coordinator to request, refactored logic for onPhase end, sta…
Browse files Browse the repository at this point in the history
…rt, and failure

Signed-off-by: sahil buddharaju <sahilbud@amazon.com>
  • Loading branch information
sahil buddharaju committed Jul 24, 2023
1 parent 3533ad3 commit aaafde1
Show file tree
Hide file tree
Showing 18 changed files with 402 additions and 411 deletions.
32 changes: 16 additions & 16 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,11 @@ public void testSearchWithWRRShardRouting() throws IOException {

for (NodeStats stat : nodeStats.getNodes()) {
SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal();
if (searchStats.getCoordinatorStatsLongHolder().queryMetric > 0) {
assertThat(searchStats.getCoordinatorStatsLongHolder().queryTotal, greaterThan(0L));
assertThat(searchStats.getCoordinatorStatsLongHolder().fetchMetric, greaterThan(0L));
assertThat(searchStats.getCoordinatorStatsLongHolder().fetchTotal, greaterThan(0L));
assertThat(searchStats.getCoordinatorStatsLongHolder().expandSearchTotal, greaterThan(0L));
if (searchStats.getRequestStatsLongHolder().queryMetric > 0) {
assertThat(searchStats.getRequestStatsLongHolder().queryTotal, greaterThan(0L));
assertThat(searchStats.getRequestStatsLongHolder().fetchMetric, greaterThan(0L));
assertThat(searchStats.getRequestStatsLongHolder().fetchTotal, greaterThan(0L));
assertThat(searchStats.getRequestStatsLongHolder().expandSearchTotal, greaterThan(0L));
coordNumber += 1;
}
Assert.assertTrue(searchStats.getQueryCount() > 0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,11 @@ public void testSimpleStats() throws Exception {

for (NodeStats stat : nodeStats.getNodes()) {
Stats total = stat.getIndices().getSearch().getTotal();
if (total.getCoordinatorStatsLongHolder().queryMetric > 0) {
assertThat(total.getCoordinatorStatsLongHolder().queryTotal, greaterThan(0L));
assertThat(total.getCoordinatorStatsLongHolder().fetchMetric, greaterThan(0L));
assertThat(total.getCoordinatorStatsLongHolder().fetchTotal, greaterThan(0L));
assertThat(total.getCoordinatorStatsLongHolder().expandSearchTotal, greaterThan(0L));
if (total.getRequestStatsLongHolder().queryMetric > 0) {
assertThat(total.getRequestStatsLongHolder().queryTotal, greaterThan(0L));
assertThat(total.getRequestStatsLongHolder().fetchMetric, greaterThan(0L));
assertThat(total.getRequestStatsLongHolder().fetchTotal, greaterThan(0L));
assertThat(total.getRequestStatsLongHolder().expandSearchTotal, greaterThan(0L));
coordNumber += 1;
}
if (nodeIdsWithIndex.contains(stat.getNode().getId())) {
Expand Down
30 changes: 0 additions & 30 deletions server/src/main/java/org/opensearch/action/CoordinatorStats.java

This file was deleted.

30 changes: 30 additions & 0 deletions server/src/main/java/org/opensearch/action/RequestStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action;

import org.opensearch.action.search.SearchRequestStats;

/**
 * Holder for request-level statistics. At present it wraps a single
 * {@link SearchRequestStats} instance.
 *
 * @opensearch.internal
 */
public final class RequestStats {

    /** Search request stats, initialised when this holder is constructed. */
    public SearchRequestStats searchRequestStats = new SearchRequestStats();

    /** Creates a holder backed by a newly constructed {@link SearchRequestStats}. */
    public RequestStats() {}

    /**
     * Returns the search request stats held by this instance.
     *
     * @return the current value of {@link #searchRequestStats}
     */
    public SearchRequestStats getSearchRequestStats() {
        return this.searchRequestStats;
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
package org.opensearch.action.admin.indices.stats;

import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.action.CoordinatorStats;
import org.opensearch.action.RequestStats;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -286,9 +286,9 @@ public void writeTo(StreamOutput out) throws IOException {
}

// Add all Request Stats to the Search Stats from here
public void addCoordinatorStats(CoordinatorStats coordinatorStats) {
if (coordinatorStats.getSearchCoordinatorStats() != null && this.search != null) {
search.setSearchCoordinatorStats(coordinatorStats.getSearchCoordinatorStats());
public void addRequestStats(RequestStats requestStats) {
if (requestStats.getSearchRequestStats() != null && this.search != null) {
search.setSearchRequestStats(requestStats.getSearchRequestStats());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -122,6 +123,9 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten

private SearchRequestOperationsListener searchRequestOperationsListener;
private List<SearchRequestOperationsListener> searchListenersList;
Map<String, Runnable> instanceStartMap = new HashMap<String, Runnable>();
Map<String, Runnable> instanceEndMap = new HashMap<String, Runnable>();
Map<String, Runnable> instanceFailMap = new HashMap<String, Runnable>();

AbstractSearchAsyncAction(
String name,
Expand Down Expand Up @@ -180,6 +184,28 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
if (searchListenersList != null) {
this.searchListenersList = searchListenersList;
this.searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(this.searchListenersList, logger);

instanceStartMap.put("dfs", () -> searchRequestOperationsListener.onDFSPreQueryPhaseStart(this));
instanceStartMap.put("can_match", () -> searchRequestOperationsListener.onCanMatchPhaseStart(this));
instanceStartMap.put("dfs_query", () -> searchRequestOperationsListener.onQueryPhaseStart(this));
instanceStartMap.put("query", () -> searchRequestOperationsListener.onQueryPhaseStart(this));
instanceStartMap.put("fetch", () -> searchRequestOperationsListener.onFetchPhaseStart(this));
instanceStartMap.put("expand", () -> searchRequestOperationsListener.onExpandSearchPhaseStart(this));

instanceEndMap.put("dfs", () -> searchRequestOperationsListener.onDFSPreQueryPhaseEnd(this, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.getCurrentPhase().getStartTime())));
instanceEndMap.put("can_match", () -> searchRequestOperationsListener.onCanMatchPhaseEnd(this, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.getCurrentPhase().getStartTime())));
instanceEndMap.put("dfs_query", () -> searchRequestOperationsListener.onQueryPhaseEnd(this, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.getCurrentPhase().getStartTime())));
instanceEndMap.put("query", () -> searchRequestOperationsListener.onQueryPhaseEnd(this, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.getCurrentPhase().getStartTime())));
instanceEndMap.put("fetch", () -> searchRequestOperationsListener.onFetchPhaseEnd(this, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.getCurrentPhase().getStartTime())));
instanceEndMap.put("expand", () -> searchRequestOperationsListener.onExpandSearchPhaseEnd(this, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.getCurrentPhase().getStartTime())));

instanceFailMap.put("dfs", () -> searchRequestOperationsListener.onDFSPreQueryPhaseFailure(this));
instanceFailMap.put("can_match", () -> searchRequestOperationsListener.onCanMatchPhaseFailure(this));
instanceFailMap.put("dfs_query", () -> searchRequestOperationsListener.onQueryPhaseFailure(this));
instanceFailMap.put("query", () -> searchRequestOperationsListener.onQueryPhaseFailure(this));
instanceFailMap.put("fetch", () -> searchRequestOperationsListener.onFetchPhaseFailure(this));
instanceFailMap.put("expand", () -> searchRequestOperationsListener.onExpandSearchPhaseFailure(this));

}
}

Expand Down Expand Up @@ -439,42 +465,15 @@ private void onPhaseEnd(SearchPhaseContext searchPhaseContext) {
if (searchRequestOperationsListener == null) {
return;
}
long tookTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.getCurrentPhase().getStartTime());

if (searchPhaseContext.getCurrentPhase() instanceof SearchDfsQueryThenFetchAsyncAction) {
searchRequestOperationsListener.onDFSPreQueryPhaseEnd(searchPhaseContext, tookTimeInMillis);
} else if (searchPhaseContext.getCurrentPhase() instanceof CanMatchPreFilterSearchPhase) {
searchRequestOperationsListener.onCanMatchPhaseEnd(searchPhaseContext, tookTimeInMillis);
} else if (searchPhaseContext.getCurrentPhase() instanceof DfsQueryPhase) {
searchRequestOperationsListener.onQueryPhaseEnd(searchPhaseContext, tookTimeInMillis);
} else if (searchPhaseContext.getCurrentPhase() instanceof SearchQueryThenFetchAsyncAction) {
searchRequestOperationsListener.onQueryPhaseEnd(searchPhaseContext, tookTimeInMillis);
} else if (searchPhaseContext.getCurrentPhase() instanceof FetchSearchPhase) {
searchRequestOperationsListener.onFetchPhaseEnd(searchPhaseContext, tookTimeInMillis);
} else if (searchPhaseContext.getCurrentPhase() instanceof ExpandSearchPhase) {
searchRequestOperationsListener.onExpandSearchPhaseEnd(searchPhaseContext, tookTimeInMillis);
}
instanceEndMap.get(searchPhaseContext.getCurrentPhase().getName()).run();
}

private void onPhaseStart(SearchPhase phase, SearchPhaseContext searchPhaseContext) {
setCurrentPhase(phase);
phase.setStartTimeInNanos(System.nanoTime());
if (searchRequestOperationsListener == null) {
return;
}
if (searchPhaseContext.getCurrentPhase() instanceof SearchDfsQueryThenFetchAsyncAction) {
searchRequestOperationsListener.onDFSPreQueryPhaseStart(searchPhaseContext);
} else if (searchPhaseContext.getCurrentPhase() instanceof CanMatchPreFilterSearchPhase) {
searchRequestOperationsListener.onCanMatchPhaseStart(searchPhaseContext);
} else if (searchPhaseContext.getCurrentPhase() instanceof DfsQueryPhase) {
searchRequestOperationsListener.onQueryPhaseStart(searchPhaseContext);
} else if (searchPhaseContext.getCurrentPhase() instanceof SearchQueryThenFetchAsyncAction) {
searchRequestOperationsListener.onQueryPhaseStart(searchPhaseContext);
} else if (searchPhaseContext.getCurrentPhase() instanceof FetchSearchPhase) {
searchRequestOperationsListener.onFetchPhaseStart(searchPhaseContext);
} else if (searchPhaseContext.getCurrentPhase() instanceof ExpandSearchPhase) {
searchRequestOperationsListener.onExpandSearchPhaseStart(searchPhaseContext);
}
instanceStartMap.get(searchPhaseContext.getCurrentPhase().getName()).run();
}

private void executePhase(SearchPhase phase) {
Expand Down Expand Up @@ -728,19 +727,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
if (searchRequestOperationsListener != null) {
if (this.currentPhase instanceof SearchDfsQueryThenFetchAsyncAction) {
searchRequestOperationsListener.onDFSPreQueryPhaseFailure(this);
} else if (this.currentPhase instanceof CanMatchPreFilterSearchPhase) {
searchRequestOperationsListener.onCanMatchPhaseFailure(this);
} else if (this.currentPhase instanceof DfsQueryPhase) {
searchRequestOperationsListener.onQueryPhaseFailure(this);
} else if (this.currentPhase instanceof SearchQueryThenFetchAsyncAction) {
searchRequestOperationsListener.onQueryPhaseFailure(this);
} else if (this.currentPhase instanceof FetchSearchPhase) {
searchRequestOperationsListener.onFetchPhaseFailure(this);
} else if (this.currentPhase instanceof ExpandSearchPhase) {
searchRequestOperationsListener.onExpandSearchPhaseFailure(this);
}
instanceFailMap.get(this.getCurrentPhase().getName()).run();
}
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
import java.util.function.Consumer;

/**
* Coordinator level search stats
* Request level search stats
*
* @opensearch.internal
*/
public final class SearchCoordinatorStats implements SearchRequestOperationsListener {
public final class SearchRequestStats implements SearchRequestOperationsListener {
public StatsHolder totalStats = new StatsHolder();

@Inject
public SearchCoordinatorStats() {}
public SearchRequestStats() {}

public long getDFSPreQueryMetric() {
return totalStats.dfsPreQueryMetric.sum();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public TransportSearchAction(
IndexNameExpressionResolver indexNameExpressionResolver,
NamedWriteableRegistry namedWriteableRegistry,
SearchPipelineService searchPipelineService,
SearchCoordinatorStats searchCoordinatorStats
SearchRequestStats searchCoordinatorStats
) {
super(SearchAction.NAME, transportService, actionFilters, (Writeable.Reader<SearchRequest>) SearchRequest::new);
this.client = client;
Expand Down Expand Up @@ -922,9 +922,7 @@ private void executeSearch(
@Nullable SearchContextId searchContext,
SearchAsyncActionProvider searchAsyncActionProvider
) {

clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);

// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
// of just for the _search api
Expand Down Expand Up @@ -978,7 +976,6 @@ private void executeSearch(
failIfOverShardCountLimit(clusterService, shardIterators.size());

Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);

// optimize search type for cases where there is only one shard group to search on
if (shardIterators.size() == 1) {
// if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
Expand Down Expand Up @@ -1012,7 +1009,8 @@ private void executeSearch(
concreteLocalIndices,
localShardIterators.size() + remoteShardIterators.size()
);
searchAsyncActionProvider.asyncSearchAction(
long test1 = timeProvider.buildTookInMillis();
AbstractSearchAsyncAction<? extends SearchPhaseResult> action = searchAsyncActionProvider.asyncSearchAction(
task,
searchRequest,
asyncSearchExecutor,
Expand All @@ -1027,7 +1025,9 @@ private void executeSearch(
preFilterSearchShards,
threadPool,
clusters
).start();
);
long test10 = timeProvider.buildTookInMillis();
action.start();
}

Executor asyncSearchExecutor(final String[] indices, final ClusterState clusterState) {
Expand Down Expand Up @@ -1172,6 +1172,7 @@ public void run() {
searchListenersList
);
} else {
long test4 = timeProvider.buildTookInMillis();
final QueryPhaseResultConsumer queryResultConsumer = searchPhaseController.newSearchPhaseResults(
executor,
circuitBreaker,
Expand All @@ -1180,6 +1181,7 @@ public void run() {
shardIterators.size(),
exc -> cancelTask(task, exc)
);
long test5 = timeProvider.buildTookInMillis();
AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction;
switch (searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
Expand All @@ -1204,6 +1206,7 @@ public void run() {
);
break;
case QUERY_THEN_FETCH:
long test6 = timeProvider.buildTookInMillis();
searchAsyncAction = new SearchQueryThenFetchAsyncAction(
logger,
searchTransportService,
Expand All @@ -1223,6 +1226,7 @@ public void run() {
clusters,
searchListenersList
);
long test7 = timeProvider.buildTookInMillis();
break;
default:
throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
Expand Down
Loading

0 comments on commit aaafde1

Please sign in to comment.