Skip to content

Commit

Permalink
Added more revisions
Browse files Browse the repository at this point in the history
Signed-off-by: sahil buddharaju <sahilbud@amazon.com>
  • Loading branch information
sahil buddharaju committed Jul 18, 2023
1 parent 7d6f655 commit 3533ad3
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .idea/runConfigurations/Debug_OpenSearch.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@
public final class SearchCoordinatorStats implements SearchRequestOperationsListener {
public StatsHolder totalStats = new StatsHolder();

// private final CounterMetric openContexts = new CounterMetric();

// private volatile Map<String, StatsHolder> groupStats = emptyMap();

@Inject
public SearchCoordinatorStats() {}

Expand Down Expand Up @@ -89,18 +85,18 @@ public long getExpandSearchTotal() {
return totalStats.expandSearchTotal.count();
}

private void computeStats(SearchPhaseContext searchPhaseContext, Consumer<StatsHolder> consumer) {
private void computeStats(Consumer<StatsHolder> consumer) {
consumer.accept(totalStats);
}

@Override
public void onDFSPreQueryPhaseStart(SearchPhaseContext context) {
computeStats(context, statsHolder -> { statsHolder.dfsPreQueryCurrent.inc(); });
computeStats(statsHolder -> { statsHolder.dfsPreQueryCurrent.inc(); });
}

@Override
public void onDFSPreQueryPhaseEnd(SearchPhaseContext context, long tookTime) {
computeStats(context, statsHolder -> {
computeStats(statsHolder -> {
totalStats.dfsPreQueryCurrent.dec();
totalStats.dfsPreQueryTotal.inc();
totalStats.dfsPreQueryMetric.inc(tookTime);
Expand All @@ -109,17 +105,17 @@ public void onDFSPreQueryPhaseEnd(SearchPhaseContext context, long tookTime) {

@Override
public void onDFSPreQueryPhaseFailure(SearchPhaseContext context) {
return;
computeStats(statsHolder -> { statsHolder.dfsPreQueryCurrent.dec(); });
}

@Override
public void onCanMatchPhaseStart(SearchPhaseContext context) {
computeStats(context, statsHolder -> { statsHolder.canMatchCurrent.inc(); });
computeStats(statsHolder -> { statsHolder.canMatchCurrent.inc(); });
}

@Override
public void onCanMatchPhaseEnd(SearchPhaseContext context, long tookTime) {
computeStats(context, statsHolder -> {
computeStats(statsHolder -> {
totalStats.canMatchCurrent.dec();
totalStats.canMatchTotal.inc();
totalStats.canMatchMetric.inc(tookTime);
Expand All @@ -128,17 +124,17 @@ public void onCanMatchPhaseEnd(SearchPhaseContext context, long tookTime) {

@Override
public void onCanMatchPhaseFailure(SearchPhaseContext context) {
return;
computeStats(statsHolder -> { statsHolder.canMatchCurrent.dec(); });
}

@Override
public void onQueryPhaseStart(SearchPhaseContext context) {
computeStats(context, statsHolder -> { statsHolder.queryCurrent.inc(); });
computeStats(statsHolder -> { statsHolder.queryCurrent.inc(); });
}

@Override
public void onQueryPhaseEnd(SearchPhaseContext context, long tookTime) {
computeStats(context, statsHolder -> {
computeStats(statsHolder -> {
totalStats.queryCurrent.dec();
totalStats.queryTotal.inc();
totalStats.queryMetric.inc(tookTime);
Expand All @@ -147,17 +143,17 @@ public void onQueryPhaseEnd(SearchPhaseContext context, long tookTime) {

@Override
public void onQueryPhaseFailure(SearchPhaseContext context) {
return;
computeStats(statsHolder -> { statsHolder.queryCurrent.dec(); });
}

@Override
public void onFetchPhaseStart(SearchPhaseContext context) {
computeStats(context, statsHolder -> { totalStats.fetchCurrent.inc(); });
computeStats(statsHolder -> { totalStats.fetchCurrent.inc(); });
}

@Override
public void onFetchPhaseEnd(SearchPhaseContext context, long tookTime) {
computeStats(context, statsHolder -> {
computeStats(statsHolder -> {
totalStats.fetchCurrent.dec();
totalStats.fetchTotal.inc();
totalStats.fetchMetric.inc(tookTime);
Expand All @@ -166,17 +162,17 @@ public void onFetchPhaseEnd(SearchPhaseContext context, long tookTime) {

@Override
public void onFetchPhaseFailure(SearchPhaseContext context) {
return;
computeStats(statsHolder -> { totalStats.fetchCurrent.dec(); });
}

@Override
public void onExpandSearchPhaseStart(SearchPhaseContext context) {
computeStats(context, statsHolder -> { totalStats.expandSearchCurrent.inc(); });
computeStats(statsHolder -> { totalStats.expandSearchCurrent.inc(); });
}

@Override
public void onExpandSearchPhaseEnd(SearchPhaseContext context, long tookTime) {
computeStats(context, statsHolder -> {
computeStats(statsHolder -> {
totalStats.expandSearchCurrent.dec();
totalStats.expandSearchTotal.inc();
totalStats.expandSearchMetric.inc(tookTime);
Expand All @@ -185,7 +181,7 @@ public void onExpandSearchPhaseEnd(SearchPhaseContext context, long tookTime) {

@Override
public void onExpandSearchPhaseFailure(SearchPhaseContext context) {
return;
computeStats(statsHolder -> { totalStats.expandSearchCurrent.dec(); });
}

public static final class StatsHolder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,6 @@
*/
public interface SearchRequestOperationsListener {

/**
* Executed when the request is started
* @param context the current searchPhase context
*/
// void onRequestStart(SearchPhaseContext context);

/**
* Executed when the request is ended
* @param context the current searchPhase context
*/
// void onRequestEnd(SearchPhaseContext context);

/**
* Executed when the query phase is started
*/
void onDFSPreQueryPhaseStart(SearchPhaseContext context);

void onDFSPreQueryPhaseFailure(SearchPhaseContext context);
Expand Down Expand Up @@ -68,7 +53,6 @@ public interface SearchRequestOperationsListener {
final class CompositeListener implements SearchRequestOperationsListener {
private final List<SearchRequestOperationsListener> listeners;
private final Logger logger;
private long canMatchPhaseStart;

public CompositeListener(List<SearchRequestOperationsListener> listeners, Logger logger) {
this.listeners = listeners;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,9 @@ public void run() {
}, "message", null);
assertEquals(1, searchCoordinatorMockStats.queryPhaseFailure.get());

SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(requestOperationListeners);
SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(
requestOperationListeners
);
searchDfsQueryThenFetchAsyncAction.start();
searchDfsQueryThenFetchAsyncAction.onPhaseFailure(new SearchPhase("test") {
@Override
Expand All @@ -358,7 +360,7 @@ public void run() {
}
}, "message", null);
assertEquals(1, searchCoordinatorMockStats.dfsPreQueryPhaseFailure.get());

FetchSearchPhase fetchPhase = createFetchSearchPhase();
ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt());
SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE);
Expand Down Expand Up @@ -596,11 +598,13 @@ public void testSearchRequestListeners() throws InterruptedException {

SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners);

SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(requestOperationListeners);
SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(
requestOperationListeners
);

CanMatchPreFilterSearchPhase canMatchPreFilterSearchPhaseAction = createCanMatchPreFilterSearchPhase(requestOperationListeners);

long delay = (int)Math.floor(Math.random() * (5 - 1 + 1) + 1);
long delay = (int) Math.floor(Math.random() * (5 - 1 + 1) + 1);
action.start();
assertEquals(1, testListener.totalStats.queryCurrent.count());
TimeUnit.SECONDS.sleep(delay);
Expand Down Expand Up @@ -641,7 +645,9 @@ public void testSearchRequestListeners() throws InterruptedException {
assertEquals(testListener.totalStats.dfsPreQueryTotal.count(), 1);
}

private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAction(List<SearchRequestOperationsListener> searchRequestOperationsListeners) {
private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAction(
List<SearchRequestOperationsListener> searchRequestOperationsListeners
) {
SearchPhaseController controller = new SearchPhaseController(
writableRegistry(),
r -> InternalAggregationTestCase.emptyReduceContextBuilder()
Expand Down Expand Up @@ -685,7 +691,9 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct
);
}

private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction(List<SearchRequestOperationsListener> searchRequestOperationsListeners) {
private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction(
List<SearchRequestOperationsListener> searchRequestOperationsListeners
) {
SearchPhaseController controller = new SearchPhaseController(
writableRegistry(),
r -> InternalAggregationTestCase.emptyReduceContextBuilder()
Expand Down Expand Up @@ -738,7 +746,9 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
};
}

private CanMatchPreFilterSearchPhase createCanMatchPreFilterSearchPhase(List<SearchRequestOperationsListener> searchRequestOperationsListeners) {
private CanMatchPreFilterSearchPhase createCanMatchPreFilterSearchPhase(
List<SearchRequestOperationsListener> searchRequestOperationsListeners
) {
final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(
0,
System.nanoTime(),
Expand Down

0 comments on commit 3533ad3

Please sign in to comment.