diff --git a/CHANGELOG.md b/CHANGELOG.md index f44949bf38511..5142dc339aca0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add `rangeQuery` and `regexpQuery` for `constant_keyword` field type ([#14711](https://github.com/opensearch-project/OpenSearch/pull/14711)) - Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054)) - [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897)) +- Add slice execution listeners to SearchOperationListener interface ([#--](https://github.com/opensearch-project/OpenSearch/pull/--)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java b/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java index 94079db468f9c..2172a6da4aa31 100644 --- a/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java +++ b/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java @@ -71,6 +71,12 @@ default void onFailedQueryPhase(SearchContext searchContext) {} */ default void onQueryPhase(SearchContext searchContext, long tookInNanos) {} + default void onPreSliceExecution(SearchContext searchContext) {} + + default void onFailedSliceExecution(SearchContext searchContext) {} + + default void onSliceExecution(SearchContext searchContext) {} + /** * Executed before the fetch phase is executed * @param searchContext the current search context @@ -195,6 +201,39 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) { } } + @Override + public void onPreSliceExecution(SearchContext searchContext) { + for (SearchOperationListener listener : listeners) { + try { + listener.onPreSliceExecution(searchContext); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onPreSliceExecution listener [{}] failed", listener), e); + } + } + } + + @Override + public void onFailedSliceExecution(SearchContext searchContext) { + for (SearchOperationListener listener : listeners) { + try { + listener.onFailedSliceExecution(searchContext); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onFailedSliceExecution listener [{}] failed", listener), e); + } + } + } + + @Override + public void onSliceExecution(SearchContext searchContext) { + for (SearchOperationListener listener : listeners) { + try { + listener.onSliceExecution(searchContext); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onSliceExecution listener [{}] failed", listener), e); + } + } + } + @Override public void onPreFetchPhase(SearchContext searchContext) { for (SearchOperationListener listener : listeners) { diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index ec3ed2332d0b8..fa00ace378df1 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -270,20 +270,27 @@ public void search( @Override protected void search(List leaves, Weight weight, Collector collector) throws IOException { - // Time series based workload by default traverses segments in desc order i.e. latest to the oldest order. - // This is actually beneficial for search queries to start search on latest segments first for time series workload. - // That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf - // reader order here. - if (searchContext.shouldUseTimeSeriesDescSortOptimization()) { - for (int i = leaves.size() - 1; i >= 0; i--) { - searchLeaf(leaves.get(i), weight, collector); - } - } else { - for (int i = 0; i < leaves.size(); i++) { - searchLeaf(leaves.get(i), weight, collector); + searchContext.indexShard().getSearchOperationListener().onPreSliceExecution(searchContext); + try { + // Time series based workload by default traverses segments in desc order i.e. latest to the oldest order. + // This is actually beneficial for search queries to start search on latest segments first for time series workload. + // That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf + // reader order here. + if (searchContext.shouldUseTimeSeriesDescSortOptimization()) { + for (int i = leaves.size() - 1; i >= 0; i--) { + searchLeaf(leaves.get(i), weight, collector); + } + } else { + for (int i = 0; i < leaves.size(); i++) { + searchLeaf(leaves.get(i), weight, collector); + } } + searchContext.bucketCollectorProcessor().processPostCollection(collector); + } catch (Throwable t) { + searchContext.indexShard().getSearchOperationListener().onFailedSliceExecution(searchContext); + throw t; } - searchContext.bucketCollectorProcessor().processPostCollection(collector); + searchContext.indexShard().getSearchOperationListener().onSliceExecution(searchContext); } /** diff --git a/server/src/test/java/org/opensearch/index/shard/SearchOperationListenerTests.java b/server/src/test/java/org/opensearch/index/shard/SearchOperationListenerTests.java index c61c13eecf2c3..b00307920e875 100644 --- a/server/src/test/java/org/opensearch/index/shard/SearchOperationListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SearchOperationListenerTests.java @@ -56,6 +56,9 @@ public void testListenersAreExecuted() { AtomicInteger preQuery = new AtomicInteger(); AtomicInteger failedQuery = new AtomicInteger(); AtomicInteger onQuery = new AtomicInteger(); + AtomicInteger preSlice = new AtomicInteger(); + AtomicInteger failedSlice = new AtomicInteger(); + AtomicInteger onSlice = new AtomicInteger(); AtomicInteger onFetch = new AtomicInteger(); AtomicInteger preFetch = new AtomicInteger(); AtomicInteger failedFetch = new AtomicInteger(); @@ -86,6 +89,24 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) { onQuery.incrementAndGet(); } + @Override + public void onPreSliceExecution(SearchContext searchContext) { + assertNotNull(searchContext); + preSlice.incrementAndGet(); + } + + @Override + public void onFailedSliceExecution(SearchContext searchContext) { + assertNotNull(searchContext); + failedSlice.incrementAndGet(); + } + + @Override + public void onSliceExecution(SearchContext searchContext) { + assertNotNull(searchContext); + onSlice.incrementAndGet(); + } + @Override public void onPreFetchPhase(SearchContext searchContext) { assertNotNull(searchContext); @@ -167,10 +188,30 @@ public void onSearchIdleReactivation() { compositeListener.onQueryPhase(ctx, timeInNanos.get()); assertEquals(0, preFetch.get()); assertEquals(0, preQuery.get()); + assertEquals(0, preSlice.get()); + assertEquals(0, failedFetch.get()); + assertEquals(0, failedQuery.get()); + assertEquals(0, failedSlice.get()); + assertEquals(2, onQuery.get()); + assertEquals(0, onFetch.get()); + assertEquals(0, onSlice.get()); + assertEquals(0, newContext.get()); + assertEquals(0, newScrollContext.get()); + assertEquals(0, freeContext.get()); + assertEquals(0, freeScrollContext.get()); + assertEquals(0, searchIdleReactivateCount.get()); + assertEquals(0, validateSearchContext.get()); + + compositeListener.onSliceExecution(ctx); + assertEquals(0, preFetch.get()); + assertEquals(0, preQuery.get()); + assertEquals(0, preSlice.get()); assertEquals(0, failedFetch.get()); assertEquals(0, failedQuery.get()); + assertEquals(0, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(0, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(0, newContext.get()); assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); @@ -181,10 +222,13 @@ public void onSearchIdleReactivation() { compositeListener.onFetchPhase(ctx, timeInNanos.get()); assertEquals(0, preFetch.get()); assertEquals(0, preQuery.get()); + assertEquals(0, preSlice.get()); assertEquals(0, failedFetch.get()); assertEquals(0, failedQuery.get()); + assertEquals(0, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(0, newContext.get()); assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); @@ -195,10 +239,30 @@ public void onSearchIdleReactivation() { compositeListener.onPreQueryPhase(ctx); assertEquals(0, preFetch.get()); assertEquals(2, preQuery.get()); + assertEquals(0, preSlice.get()); assertEquals(0, failedFetch.get()); assertEquals(0, failedQuery.get()); + assertEquals(0, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); + assertEquals(0, newContext.get()); + assertEquals(0, newScrollContext.get()); + assertEquals(0, freeContext.get()); + assertEquals(0, freeScrollContext.get()); + assertEquals(0, searchIdleReactivateCount.get()); + assertEquals(0, validateSearchContext.get()); + + compositeListener.onPreSliceExecution(ctx); + assertEquals(0, preFetch.get()); + assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); + assertEquals(0, failedFetch.get()); + assertEquals(0, failedQuery.get()); + assertEquals(0, failedSlice.get()); + assertEquals(2, onQuery.get()); + assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(0, newContext.get()); assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); @@ -209,10 +273,13 @@ public void onSearchIdleReactivation() { compositeListener.onPreFetchPhase(ctx); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); assertEquals(0, failedFetch.get()); assertEquals(0, failedQuery.get()); + assertEquals(0, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(0, newContext.get()); assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); @@ -223,10 +290,13 @@ public void onSearchIdleReactivation() { compositeListener.onFailedFetchPhase(ctx); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); assertEquals(2, failedFetch.get()); assertEquals(0, failedQuery.get()); + assertEquals(0, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(0, newContext.get()); assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); @@ -237,10 +307,30 @@ public void onSearchIdleReactivation() { compositeListener.onFailedQueryPhase(ctx); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); + assertEquals(2, failedFetch.get()); + assertEquals(2, failedQuery.get()); + assertEquals(0, failedSlice.get()); + assertEquals(2, onQuery.get()); + assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); + assertEquals(0, newContext.get()); + assertEquals(0, newScrollContext.get()); + assertEquals(0, freeContext.get()); + assertEquals(0, freeScrollContext.get()); + assertEquals(0, searchIdleReactivateCount.get()); + assertEquals(0, validateSearchContext.get()); + + compositeListener.onFailedSliceExecution(ctx); + assertEquals(2, preFetch.get()); + assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); assertEquals(2, failedFetch.get()); assertEquals(2, failedQuery.get()); + assertEquals(2, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(0, newContext.get()); assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); @@ -251,10 +341,13 @@ public void onSearchIdleReactivation() { compositeListener.onNewReaderContext(mock(ReaderContext.class)); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); assertEquals(2, failedFetch.get()); assertEquals(2, failedQuery.get()); + assertEquals(2, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(2, newContext.get()); assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); @@ -265,10 +358,13 @@ public void onSearchIdleReactivation() { compositeListener.onNewScrollContext(mock(ReaderContext.class)); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); assertEquals(2, failedFetch.get()); assertEquals(2, failedQuery.get()); + assertEquals(2, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(2, newContext.get()); assertEquals(2, newScrollContext.get()); assertEquals(0, freeContext.get()); @@ -279,10 +375,13 @@ public void onSearchIdleReactivation() { compositeListener.onFreeReaderContext(mock(ReaderContext.class)); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); assertEquals(2, failedFetch.get()); assertEquals(2, failedQuery.get()); + assertEquals(2, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(2, newContext.get()); assertEquals(2, newScrollContext.get()); assertEquals(2, freeContext.get()); @@ -293,10 +392,13 @@ public void onSearchIdleReactivation() { compositeListener.onFreeScrollContext(mock(ReaderContext.class)); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); assertEquals(2, failedFetch.get()); assertEquals(2, failedQuery.get()); + assertEquals(2, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(2, newContext.get()); assertEquals(2, newScrollContext.get()); assertEquals(2, freeContext.get()); @@ -307,10 +409,13 @@ public void onSearchIdleReactivation() { compositeListener.onSearchIdleReactivation(); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); assertEquals(2, failedFetch.get()); assertEquals(2, failedQuery.get()); + assertEquals(2, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(2, newContext.get()); assertEquals(2, newScrollContext.get()); assertEquals(2, freeContext.get());