Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for dynamically adding SearchRequestOperationsListener #11526

1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add deleted doc count in _cat/shards ([#11678](https://github.com/opensearch-project/OpenSearch/pull/11678))
- Capture information for additional query types and aggregation types ([#11582](https://github.com/opensearch-project/OpenSearch/pull/11582))
- Use slice_size == shard_size heuristic in terms aggs for concurrent segment search and properly calculate the doc_count_error ([#11732](https://github.com/opensearch-project/OpenSearch/pull/11732))
- Added Support for dynamically adding SearchRequestOperationsListeners with SearchRequestOperationsCompositeListenerFactory ([#11526](https://github.com/opensearch-project/OpenSearch/pull/11526))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.action.search.SearchRequestStats.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
import java.util.Set;
import java.util.function.Function;

import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.action.search.SearchRequestStats.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public final void start() {
0,
0,
buildTookInMillis(),
timeProvider.getPhaseTook(),
searchRequestContext.getPhaseTook(),
ShardSearchFailure.EMPTY_ARRAY,
clusters,
null
Expand Down Expand Up @@ -670,7 +670,7 @@ protected final SearchResponse buildSearchResponse(
successfulOps.get(),
skippedOps.get(),
buildTookInMillis(),
timeProvider.getPhaseTook(),
searchRequestContext.getPhaseTook(),
failures,
clusters,
searchContextId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.lucene.search.TotalHits;
import org.opensearch.common.annotation.InternalApi;

import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

Expand All @@ -31,18 +29,14 @@ class SearchRequestContext {
private TotalHits totalHits;
private final EnumMap<ShardStatsFieldNames, Integer> shardStats;

/**
* This constructor is for testing only
*/
SearchRequestContext() {
this(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()));
}
private final SearchRequest searchRequest;

SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener) {
SearchRequestContext(final SearchRequestOperationsListener searchRequestOperationsListener, final SearchRequest searchRequest) {
this.searchRequestOperationsListener = searchRequestOperationsListener;
this.absoluteStartNanos = System.nanoTime();
this.phaseTookMap = new HashMap<>();
this.shardStats = new EnumMap<>(ShardStatsFieldNames.class);
this.searchRequest = searchRequest;
}

SearchRequestOperationsListener getSearchRequestOperationsListener() {
Expand All @@ -57,6 +51,14 @@ Map<String, Long> phaseTookMap() {
return phaseTookMap;
}

SearchResponse.PhaseTook getPhaseTook() {
if (searchRequest != null && searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook()) {
return new SearchResponse.PhaseTook(phaseTookMap);
} else {
return null;
}
}

/**
* Override absoluteStartNanos set in constructor.
* For testing only
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* SearchRequestOperationsCompositeListenerFactory contains listeners registered to search requests,
* and is responsible for creating the {@link SearchRequestOperationsListener.CompositeListener}
* with the all listeners enabled at cluster-level and request-level.
*
*
* @opensearch.internal
*/
public final class SearchRequestOperationsCompositeListenerFactory {
private final List<SearchRequestOperationsListener> searchRequestListenersList;

/**
* Create the SearchRequestOperationsCompositeListenerFactory and add multiple {@link SearchRequestOperationsListener}
* to the searchRequestListenersList.
* Those enabled listeners will be executed during each search request.
*
* @param listeners Multiple SearchRequestOperationsListener object to add.
* @throws IllegalArgumentException if any input listener is null.
*/
public SearchRequestOperationsCompositeListenerFactory(final SearchRequestOperationsListener... listeners) {
searchRequestListenersList = new ArrayList<>();
for (SearchRequestOperationsListener listener : listeners) {
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
searchRequestListenersList.add(listener);
}
}

/**
* Get searchRequestListenersList,
*
* @return List of SearchRequestOperationsListener
*/
public List<SearchRequestOperationsListener> getListeners() {
return searchRequestListenersList;
}

/**
* Create the {@link SearchRequestOperationsListener.CompositeListener}
* with the all listeners enabled at cluster-level and request-level.
*
* @param searchRequest The SearchRequest object used to decide which request-level listeners to add based on states/flags
* @param logger Logger to be attached to the {@link SearchRequestOperationsListener.CompositeListener}
* @param perRequestListeners the per-request listeners that can be optionally added to the returned CompositeListener list.
* @return SearchRequestOperationsListener.CompositeListener
*/
public SearchRequestOperationsListener.CompositeListener buildCompositeListener(
final SearchRequest searchRequest,
final Logger logger,
final SearchRequestOperationsListener... perRequestListeners
) {
final List<SearchRequestOperationsListener> searchListenersList = Stream.concat(
searchRequestListenersList.stream(),
Arrays.stream(perRequestListeners)
)
.filter((searchRequestOperationsListener -> searchRequestOperationsListener.isEnabled(searchRequest)))
.collect(Collectors.toList());

return new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@
* @opensearch.internal
*/
@InternalApi
abstract class SearchRequestOperationsListener {
public abstract class SearchRequestOperationsListener {
private volatile boolean enabled;

protected SearchRequestOperationsListener() {
this.enabled = true;
}

protected SearchRequestOperationsListener(final boolean enabled) {
this.enabled = enabled;
}

abstract void onPhaseStart(SearchPhaseContext context);

Expand All @@ -32,6 +41,18 @@ void onRequestStart(SearchRequestContext searchRequestContext) {}

void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}
ansjcy marked this conversation as resolved.
Show resolved Hide resolved

boolean isEnabled(SearchRequest searchRequest) {
return isEnabled();
}

boolean isEnabled() {
return enabled;
}

protected void setEnabled(final boolean enabled) {
this.enabled = enabled;
}

/**
* Holder of Composite Listeners
*
Expand Down Expand Up @@ -101,5 +122,9 @@ public void onRequestEnd(SearchPhaseContext context, SearchRequestContext search
}
}
}

public List<SearchRequestOperationsListener> getListeners() {
return listeners;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ public SearchRequestSlowLog(ClusterService clusterService) {
this.logger = logger;
Loggers.setLevel(this.logger, SlowLogLevel.TRACE.name());

this.warnThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING).nanos();
this.infoThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING).nanos();
this.debugThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING).nanos();
this.traceThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING).nanos();
this.level = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL);
this.setWarnThreshold(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING));
this.setInfoThreshold(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING));
this.setDebugThreshold(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING));
this.setTraceThreshold(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING));
this.setLevel(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL));

clusterService.getClusterSettings()
.addSettingsUpdateConsumer(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING, this::setWarnThreshold);
Expand Down Expand Up @@ -233,18 +233,22 @@ private static String escapeJson(String text) {

void setWarnThreshold(TimeValue warnThreshold) {
this.warnThreshold = warnThreshold.nanos();
setEnabledIfThresholdExceed();
}

void setInfoThreshold(TimeValue infoThreshold) {
this.infoThreshold = infoThreshold.nanos();
setEnabledIfThresholdExceed();
}

void setDebugThreshold(TimeValue debugThreshold) {
this.debugThreshold = debugThreshold.nanos();
setEnabledIfThresholdExceed();
}

void setTraceThreshold(TimeValue traceThreshold) {
this.traceThreshold = traceThreshold.nanos();
setEnabledIfThresholdExceed();
}

void setLevel(SlowLogLevel level) {
Expand All @@ -270,4 +274,8 @@ protected long getTraceThreshold() {
SlowLogLevel getLevel() {
return level;
}

private void setEnabledIfThresholdExceed() {
super.setEnabled(this.warnThreshold >= 0 || this.debugThreshold >= 0 || this.infoThreshold >= 0 || this.traceThreshold >= 0);
ansjcy marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.metrics.MeanMetric;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;

import java.util.EnumMap;
import java.util.Map;
Expand All @@ -26,8 +28,18 @@
public final class SearchRequestStats extends SearchRequestOperationsListener {
Map<SearchPhaseName, StatsHolder> phaseStatsMap = new EnumMap<>(SearchPhaseName.class);

public static final String SEARCH_REQUEST_STATS_ENABLED_KEY = "search.request_stats_enabled";
public static final Setting<Boolean> SEARCH_REQUEST_STATS_ENABLED = Setting.boolSetting(
SEARCH_REQUEST_STATS_ENABLED_KEY,
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

@Inject
public SearchRequestStats() {
public SearchRequestStats(ClusterSettings clusterSettings) {
ansjcy marked this conversation as resolved.
Show resolved Hide resolved
this.setEnabled(clusterSettings.get(SEARCH_REQUEST_STATS_ENABLED));
clusterSettings.addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setEnabled);
for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
phaseStatsMap.put(searchPhaseName, new StatsHolder());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ final class SearchResponseMerger {
/**
* Add a search response to the list of responses to be merged together into one.
* Merges currently happen at once when all responses are available and
* {@link #getMergedResponse(SearchResponse.Clusters)} )} is called.
* {@link #getMergedResponse(SearchResponse.Clusters, SearchRequestContext)} )} is called.
* That may change in the future as it's possible to introduce incremental merges as responses come in if necessary.
*/
void add(SearchResponse searchResponse) {
Expand All @@ -126,7 +126,7 @@ int numResponses() {
* Returns the merged response. To be called once all responses have been added through {@link #add(SearchResponse)}
* so that all responses are merged into a single one.
*/
SearchResponse getMergedResponse(SearchResponse.Clusters clusters) {
SearchResponse getMergedResponse(SearchResponse.Clusters clusters, SearchRequestContext searchRequestContext) {
// if the search is only across remote clusters, none of them are available, and all of them have skip_unavailable set to true,
// we end up calling merge without anything to merge, we just return an empty search response
if (searchResponses.size() == 0) {
Expand Down Expand Up @@ -236,7 +236,7 @@ SearchResponse getMergedResponse(SearchResponse.Clusters clusters) {
successfulShards,
skippedShards,
tookInMillis,
searchTimeProvider.getPhaseTook(),
searchRequestContext.getPhaseTook(),
shardFailures,
clusters,
null
Expand Down
Loading
Loading