diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index 4004d43562043..fc4fd3434b794 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -333,6 +333,60 @@ public void testSendSearchResponseDisallowPartialFailures() { assertEquals(requestIds, releasedContexts); } + public void testOnPhaseListenersFailure() { + SearchCoordinatorStatsTesting searchCoordinatorStatsTesting = new SearchCoordinatorStatsTesting(); + SearchRequestOperationsListener testListener = createSearchRequestOperationsListener(searchCoordinatorStatsTesting); + final List requestOperationListeners = new ArrayList<>(Arrays.asList(testListener, testListener)); + + SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(); + action.setSearchListenerList(requestOperationListeners); + action.setCurrentPhase(action); + action.onPhaseFailure(new SearchPhase("test") { + @Override + public void run() { + + } + }, "message", null); + assertEquals(2, searchCoordinatorStatsTesting.queryPhaseFailure.get()); + + SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(); + searchDfsQueryThenFetchAsyncAction.setSearchListenerList(requestOperationListeners); + searchDfsQueryThenFetchAsyncAction.setCurrentPhase(searchDfsQueryThenFetchAsyncAction); + searchDfsQueryThenFetchAsyncAction.onPhaseFailure(new SearchPhase("test") { + @Override + public void run() { + + } + }, "message", null); + assertEquals(2, searchCoordinatorStatsTesting.dfsPreQueryPhaseFailure.get()); + + CanMatchPreFilterSearchPhase canMatchPreFilterSearchPhaseAction = createCanMatchPreFilterSearchPhase(); + canMatchPreFilterSearchPhaseAction.setSearchListenerList(requestOperationListeners); + canMatchPreFilterSearchPhaseAction.setCurrentPhase(canMatchPreFilterSearchPhaseAction); + + canMatchPreFilterSearchPhaseAction.onPhaseFailure(new SearchPhase("test") { + @Override + public void run() { + + } + }, "message", null); + assertEquals(2, searchCoordinatorStatsTesting.canMatchPhaseFailure.get()); + + FetchSearchPhase fetchPhase = createFetchSearchPhase(); + ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); + SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE); + searchShardIterator.resetAndSkip(); + action.skipShard(searchShardIterator); + action.executeNextPhase(action, fetchPhase); + action.onPhaseFailure(new SearchPhase("test") { + @Override + public void run() { + + } + }, "message", null); + assertEquals(2, searchCoordinatorStatsTesting.fetchPhaseFailure.get()); + } + public void testOnPhaseFailure() { SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(false); AtomicReference exception = new AtomicReference<>(); @@ -341,6 +395,7 @@ public void testOnPhaseFailure() { List> nodeLookups = new ArrayList<>(); ArraySearchPhaseResults phaseResults = phaseResults(requestIds, nodeLookups, 0); AbstractSearchAsyncAction action = createAction(searchRequest, phaseResults, listener, false, new AtomicLong()); + action.onPhaseFailure(new SearchPhase("test") { @Override public void run() { @@ -612,6 +667,9 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct shardsIter.size(), exc -> {} ); + AtomicReference exception = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set); + return new SearchDfsQueryThenFetchAsyncAction( logger, null, @@ -623,7 +681,7 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct executor, resultConsumer, searchRequest, - null, + listener, shardsIter, null, null, @@ -652,6 +710,8 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction() shardsIter.size(), exc -> {} ); + AtomicReference exception = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set); return new SearchQueryThenFetchAsyncAction( logger, null, @@ -663,7 +723,7 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction() executor, resultConsumer, searchRequest, - null, + listener, shardsIter, null, null, @@ -709,6 +769,8 @@ private CanMatchPreFilterSearchPhase createCanMatchPreFilterSearchPhase() { primaryNode, replicaNode ); + AtomicReference exception = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set); return new CanMatchPreFilterSearchPhase( logger, searchTransportService, @@ -718,7 +780,7 @@ private CanMatchPreFilterSearchPhase createCanMatchPreFilterSearchPhase() { Collections.emptyMap(), OpenSearchExecutors.newDirectExecutorService(), searchRequest, - null, + listener, shardsIter, timeProvider, ClusterState.EMPTY_STATE,