Skip to content

Commit

Permalink
Add slice level operation listener methods
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Deng <jayd0104@gmail.com>
  • Loading branch information
jed326 authored and Jay Deng committed Aug 7, 2024
1 parent 97c1bf0 commit 1770e61
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Concurrent Segment Search] Support composite aggregations with scripting ([#15072](https://github.com/opensearch-project/OpenSearch/pull/15072))
- Add `rangeQuery` and `regexpQuery` for `constant_keyword` field type ([#14711](https://github.com/opensearch-project/OpenSearch/pull/14711))
- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897))
- Add slice execution listeners to SearchOperationListener interface ([#--](https://github.com/opensearch-project/OpenSearch/pull/--))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ default void onFailedQueryPhase(SearchContext searchContext) {}
*/
default void onQueryPhase(SearchContext searchContext, long tookInNanos) {}

default void onPreSliceExecution(SearchContext searchContext) {}

default void onFailedSliceExecution(SearchContext searchContext) {}

default void onSliceExecution(SearchContext searchContext) {}

/**
* Executed before the fetch phase is executed
* @param searchContext the current search context
Expand Down Expand Up @@ -195,6 +201,39 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
}
}

@Override
public void onPreSliceExecution(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onPreSliceExecution(searchContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onPreSliceExecution listener [{}] failed", listener), e);
}
}
}

@Override
public void onFailedSliceExecution(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFailedSliceExecution(searchContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onFailedSliceExecution listener [{}] failed", listener), e);
}
}
}

@Override
public void onSliceExecution(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onSliceExecution(searchContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onSliceExecution listener [{}] failed", listener), e);
}
}
}

@Override
public void onPreFetchPhase(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,20 +270,27 @@ public void search(

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
if (searchContext.shouldUseTimeSeriesDescSortOptimization()) {
for (int i = leaves.size() - 1; i >= 0; i--) {
searchLeaf(leaves.get(i), weight, collector);
}
} else {
for (int i = 0; i < leaves.size(); i++) {
searchLeaf(leaves.get(i), weight, collector);
searchContext.indexShard().getSearchOperationListener().onPreSliceExecution(searchContext);
try {
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
if (searchContext.shouldUseTimeSeriesDescSortOptimization()) {
for (int i = leaves.size() - 1; i >= 0; i--) {
searchLeaf(leaves.get(i), weight, collector);
}
} else {
for (int i = 0; i < leaves.size(); i++) {
searchLeaf(leaves.get(i), weight, collector);
}
}
searchContext.bucketCollectorProcessor().processPostCollection(collector);
} catch (Throwable t) {
searchContext.indexShard().getSearchOperationListener().onFailedSliceExecution(searchContext);
throw t;
}
searchContext.bucketCollectorProcessor().processPostCollection(collector);
searchContext.indexShard().getSearchOperationListener().onSliceExecution(searchContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public void testListenersAreExecuted() {
AtomicInteger preQuery = new AtomicInteger();
AtomicInteger failedQuery = new AtomicInteger();
AtomicInteger onQuery = new AtomicInteger();
AtomicInteger preSlice = new AtomicInteger();
AtomicInteger failedSlice = new AtomicInteger();
AtomicInteger onSlice = new AtomicInteger();
AtomicInteger onFetch = new AtomicInteger();
AtomicInteger preFetch = new AtomicInteger();
AtomicInteger failedFetch = new AtomicInteger();
Expand Down Expand Up @@ -86,6 +89,24 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
onQuery.incrementAndGet();
}

@Override
public void onPreSliceExecution(SearchContext searchContext) {
assertNotNull(searchContext);
preSlice.incrementAndGet();
}

@Override
public void onFailedSliceExecution(SearchContext searchContext) {
assertNotNull(searchContext);
failedSlice.incrementAndGet();
}

@Override
public void onSliceExecution(SearchContext searchContext) {
assertNotNull(searchContext);
onSlice.incrementAndGet();
}

@Override
public void onPreFetchPhase(SearchContext searchContext) {
assertNotNull(searchContext);
Expand Down Expand Up @@ -167,10 +188,30 @@ public void onSearchIdleReactivation() {
compositeListener.onQueryPhase(ctx, timeInNanos.get());
assertEquals(0, preFetch.get());
assertEquals(0, preQuery.get());
assertEquals(0, preSlice.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(0, onFetch.get());
assertEquals(0, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleReactivateCount.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onSliceExecution(ctx);
assertEquals(0, preFetch.get());
assertEquals(0, preQuery.get());
assertEquals(0, preSlice.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(0, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -181,10 +222,13 @@ public void onSearchIdleReactivation() {
compositeListener.onFetchPhase(ctx, timeInNanos.get());
assertEquals(0, preFetch.get());
assertEquals(0, preQuery.get());
assertEquals(0, preSlice.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -195,10 +239,30 @@ public void onSearchIdleReactivation() {
compositeListener.onPreQueryPhase(ctx);
assertEquals(0, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(0, preSlice.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleReactivateCount.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onPreSliceExecution(ctx);
assertEquals(0, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -209,10 +273,13 @@ public void onSearchIdleReactivation() {
compositeListener.onPreFetchPhase(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -223,10 +290,13 @@ public void onSearchIdleReactivation() {
compositeListener.onFailedFetchPhase(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -237,10 +307,30 @@ public void onSearchIdleReactivation() {
compositeListener.onFailedQueryPhase(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleReactivateCount.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onFailedSliceExecution(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -251,10 +341,13 @@ public void onSearchIdleReactivation() {
compositeListener.onNewReaderContext(mock(ReaderContext.class));
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(2, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -265,10 +358,13 @@ public void onSearchIdleReactivation() {
compositeListener.onNewScrollContext(mock(ReaderContext.class));
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(2, newContext.get());
assertEquals(2, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -279,10 +375,13 @@ public void onSearchIdleReactivation() {
compositeListener.onFreeReaderContext(mock(ReaderContext.class));
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(2, newContext.get());
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
Expand All @@ -293,10 +392,13 @@ public void onSearchIdleReactivation() {
compositeListener.onFreeScrollContext(mock(ReaderContext.class));
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(2, newContext.get());
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
Expand All @@ -307,10 +409,13 @@ public void onSearchIdleReactivation() {
compositeListener.onSearchIdleReactivation();
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(2, newContext.get());
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
Expand Down

0 comments on commit 1770e61

Please sign in to comment.