Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.17] Provide factory for pluggable deciders #15734

Merged
merged 1 commit into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@
import org.opensearch.search.aggregations.support.AggregationUsageService;
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.search.query.QueryPhase;
Expand Down Expand Up @@ -1337,7 +1337,7 @@ protected Node(
circuitBreakerService,
searchModule.getIndexSearcherExecutor(threadPool),
taskResourceTrackingService,
searchModule.getConcurrentSearchDeciders()
searchModule.getConcurrentSearchRequestDeciderFactories()
);

final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
Expand Down Expand Up @@ -1987,7 +1987,7 @@ protected SearchService newSearchService(
CircuitBreakerService circuitBreakerService,
Executor indexSearcherExecutor,
TaskResourceTrackingService taskResourceTrackingService,
Collection<ConcurrentSearchDecider> concurrentSearchDecidersList
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories
) {
return new SearchService(
clusterService,
Expand All @@ -2001,7 +2001,7 @@ protected SearchService newSearchService(
circuitBreakerService,
indexSearcherExecutor,
taskResourceTrackingService,
concurrentSearchDecidersList
concurrentSearchDeciderFactories
);
}

Expand Down
10 changes: 5 additions & 5 deletions server/src/main/java/org/opensearch/plugins/SearchPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
import org.opensearch.search.query.QueryPhaseSearcher;
Expand Down Expand Up @@ -141,12 +141,12 @@ default Map<String, Highlighter> getHighlighters() {
}

/**
* Allows plugins to register custom decider for concurrent search
* @return A {@link ConcurrentSearchDecider}
* Allows plugins to register a factory to create custom decider for concurrent search
* @return A {@link ConcurrentSearchRequestDecider.Factory}
*/
@ExperimentalApi
default ConcurrentSearchDecider getConcurrentSearchDecider() {
return null;
default Optional<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactory() {
return Optional.empty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@
import org.opensearch.search.aggregations.SearchContextAggregations;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.collapse.CollapseContext;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.deciders.ConcurrentSearchDecision;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.deciders.ConcurrentSearchVisitor;
import org.opensearch.search.dfs.DfsSearchResult;
import org.opensearch.search.fetch.FetchPhase;
Expand Down Expand Up @@ -106,13 +106,14 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
Expand All @@ -136,7 +137,7 @@ final class DefaultSearchContext extends SearchContext {
private final ShardSearchRequest request;
private final SearchShardTarget shardTarget;
private final LongSupplier relativeTimeSupplier;
private final Collection<ConcurrentSearchDecider> concurrentSearchDeciders;
private final Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories;
private SearchType searchType;
private final BigArrays bigArrays;
private final IndexShard indexShard;
Expand Down Expand Up @@ -221,7 +222,7 @@ final class DefaultSearchContext extends SearchContext {
boolean validate,
Executor executor,
Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder,
Collection<ConcurrentSearchDecider> concurrentSearchDeciders
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories
) throws IOException {
this.readerContext = readerContext;
this.request = request;
Expand Down Expand Up @@ -264,7 +265,7 @@ final class DefaultSearchContext extends SearchContext {

this.maxAggRewriteFilters = evaluateFilterRewriteSetting();
this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold();
this.concurrentSearchDeciders = concurrentSearchDeciders;
this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories;
}

@Override
Expand Down Expand Up @@ -928,14 +929,21 @@ public boolean shouldUseConcurrentSearch() {

private boolean evaluateAutoMode() {

// filter out deciders that want to opt-out of decision-making
final Set<ConcurrentSearchDecider> filteredDeciders = concurrentSearchDeciders.stream()
.filter(concurrentSearchDecider -> concurrentSearchDecider.canEvaluateForIndex(indexService.getIndexSettings()))
.collect(Collectors.toSet());
final Set<ConcurrentSearchRequestDecider> concurrentSearchRequestDeciders = new HashSet<>();

// create the ConcurrentSearchRequestDeciders using registered factories
for (ConcurrentSearchRequestDecider.Factory deciderFactory : concurrentSearchDeciderFactories) {
final Optional<ConcurrentSearchRequestDecider> concurrentSearchRequestDecider = deciderFactory.create(
indexService.getIndexSettings()
);
concurrentSearchRequestDecider.ifPresent(concurrentSearchRequestDeciders::add);

}

// evaluate based on concurrent search query visitor
if (filteredDeciders.size() > 0) {
if (concurrentSearchRequestDeciders.size() > 0) {
ConcurrentSearchVisitor concurrentSearchVisitor = new ConcurrentSearchVisitor(
filteredDeciders,
concurrentSearchRequestDeciders,
indexService.getIndexSettings()
);
if (request().source() != null && request().source().query() != null) {
Expand All @@ -945,7 +953,7 @@ private boolean evaluateAutoMode() {
}

final List<ConcurrentSearchDecision> decisions = new ArrayList<>();
for (ConcurrentSearchDecider decider : filteredDeciders) {
for (ConcurrentSearchRequestDecider decider : concurrentSearchRequestDeciders) {
ConcurrentSearchDecision decision = decider.getConcurrentSearchDecision();
if (decision != null) {
if (logger.isDebugEnabled()) {
Expand Down
24 changes: 11 additions & 13 deletions server/src/main/java/org/opensearch/search/SearchModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@
import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregator;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.subphase.ExplainPhase;
Expand Down Expand Up @@ -334,7 +334,7 @@ public class SearchModule {
private final QueryPhaseSearcher queryPhaseSearcher;
private final SearchPlugin.ExecutorServiceProvider indexSearcherExecutorProvider;

private final Collection<ConcurrentSearchDecider> concurrentSearchDeciders;
private final Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories;

/**
* Constructs a new SearchModule object
Expand Down Expand Up @@ -364,25 +364,23 @@ public SearchModule(Settings settings, List<SearchPlugin> plugins) {
queryPhaseSearcher = registerQueryPhaseSearcher(plugins);
indexSearcherExecutorProvider = registerIndexSearcherExecutorProvider(plugins);
namedWriteables.addAll(SortValue.namedWriteables());
concurrentSearchDeciders = registerConcurrentSearchDeciders(plugins);
concurrentSearchDeciderFactories = registerConcurrentSearchDeciderFactories(plugins);
}

private Collection<ConcurrentSearchDecider> registerConcurrentSearchDeciders(List<SearchPlugin> plugins) {
List<ConcurrentSearchDecider> concurrentSearchDeciders = new ArrayList<>();
private Collection<ConcurrentSearchRequestDecider.Factory> registerConcurrentSearchDeciderFactories(List<SearchPlugin> plugins) {
List<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories = new ArrayList<>();
for (SearchPlugin plugin : plugins) {
ConcurrentSearchDecider decider = plugin.getConcurrentSearchDecider();
if (decider != null) {
concurrentSearchDeciders.add(decider);
}
final Optional<ConcurrentSearchRequestDecider.Factory> deciderFactory = plugin.getConcurrentSearchRequestDeciderFactory();
deciderFactory.ifPresent(concurrentSearchDeciderFactories::add);
}
return concurrentSearchDeciders;
return concurrentSearchDeciderFactories;
}

/**
* Returns the concurrent search deciders that the plugins have registered
* Returns the concurrent search decider factories that the plugins have registered
*/
public Collection<ConcurrentSearchDecider> getConcurrentSearchDeciders() {
return concurrentSearchDeciders;
public Collection<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactories() {
return concurrentSearchDeciderFactories;
}

public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
Expand Down
10 changes: 5 additions & 5 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.collapse.CollapseContext;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.dfs.DfsPhase;
import org.opensearch.search.dfs.DfsSearchResult;
import org.opensearch.search.fetch.FetchPhase;
Expand Down Expand Up @@ -358,7 +358,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private final QueryPhase queryPhase;

private final FetchPhase fetchPhase;
private final Collection<ConcurrentSearchDecider> concurrentSearchDeciders;
private final Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories;

private volatile long defaultKeepAlive;

Expand Down Expand Up @@ -404,7 +404,7 @@ public SearchService(
CircuitBreakerService circuitBreakerService,
Executor indexSearcherExecutor,
TaskResourceTrackingService taskResourceTrackingService,
Collection<ConcurrentSearchDecider> concurrentSearchDeciders
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories
) {
Settings settings = clusterService.getSettings();
this.threadPool = threadPool;
Expand Down Expand Up @@ -460,7 +460,7 @@ public SearchService(
allowDerivedField = CLUSTER_ALLOW_DERIVED_FIELD_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_ALLOW_DERIVED_FIELD_SETTING, this::setAllowDerivedField);

this.concurrentSearchDeciders = concurrentSearchDeciders;
this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories;
}

private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
Expand Down Expand Up @@ -1161,7 +1161,7 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear
validate,
indexSearcherExecutor,
this::aggReduceContextBuilder,
concurrentSearchDeciders
concurrentSearchDeciderFactories
);
// we clone the query shard context here just for rewriting otherwise we
// might end up with incorrect state since we are using now() or script services
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import java.util.Collection;

/**
* This Class defines the decisions that a {@link ConcurrentSearchDecider#getConcurrentSearchDecision} can return.
* This Class defines the decisions that a {@link ConcurrentSearchRequestDecider#getConcurrentSearchDecision} can return.
*
*/
@ExperimentalApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,21 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.query.QueryBuilder;

import java.util.Optional;

/**
* {@link ConcurrentSearchDecider} allows pluggable way to evaluate if a query in the search request
* {@link ConcurrentSearchRequestDecider} allows pluggable way to evaluate if a query in the search request
* can use concurrent segment search using the passed in queryBuilders from query tree and index settings
* on a per shard request basis.
* Implementations can also opt out of the evaluation process for certain indices based on the index settings.
* For all the deciders which can evaluate query tree for an index, its evaluateForQuery method
* will be called for each node in the query tree. After traversing of the query tree is completed, the final
* decision from the deciders will be obtained using {@link ConcurrentSearchDecider#getConcurrentSearchDecision}
* Implementations will need to implement the Factory interface that can be used to create the ConcurrentSearchRequestDecider
* This factory will be called on each shard search request to create the ConcurrentSearchRequestDecider and get the
* concurrent search decision from the created decider on a per-request basis.
* For all the deciders the evaluateForQuery method will be called for each node in the query tree.
* After traversing of the query tree is completed, the final decision from the deciders will be
* obtained using {@link ConcurrentSearchRequestDecider#getConcurrentSearchDecision}
*/
@ExperimentalApi
public abstract class ConcurrentSearchDecider {
public abstract class ConcurrentSearchRequestDecider {

/**
* Evaluate for the passed in queryBuilder node in the query tree of the search request
Expand All @@ -31,14 +35,6 @@ public abstract class ConcurrentSearchDecider {
*/
public abstract void evaluateForQuery(QueryBuilder queryBuilder, IndexSettings indexSettings);

/**
* Provides a way for deciders to opt out of decision-making process for certain requests based on
* index settings.
* Return true if interested in decision making for index,
* false, otherwise
*/
public abstract boolean canEvaluateForIndex(IndexSettings indexSettings);

/**
* Provide the final decision for concurrent search based on all evaluations
* Plugins may need to maintain internal state of evaluations to provide a final decision
Expand All @@ -47,4 +43,16 @@ public abstract class ConcurrentSearchDecider {
*/
public abstract ConcurrentSearchDecision getConcurrentSearchDecision();

/**
* Factory interface that can be implemented to create the ConcurrentSearchRequestDecider object.
* Implementations can use the passed in indexSettings to decide whether to create the decider object or
* return {@link Optional#empty()}.
*/
@ExperimentalApi
public interface Factory {
default Optional<ConcurrentSearchRequestDecider> create(IndexSettings indexSettings) {
return Optional.empty();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@

/**
* Class to traverse the QueryBuilder tree and invoke the
* {@link ConcurrentSearchDecider#evaluateForQuery} at each node of the query tree
* {@link ConcurrentSearchRequestDecider#evaluateForQuery} at each node of the query tree
*/
@ExperimentalApi
public class ConcurrentSearchVisitor implements QueryBuilderVisitor {

private final Set<ConcurrentSearchDecider> deciders;
private final Set<ConcurrentSearchRequestDecider> deciders;
private final IndexSettings indexSettings;

public ConcurrentSearchVisitor(Set<ConcurrentSearchDecider> concurrentSearchVisitorDeciders, IndexSettings idxSettings) {
public ConcurrentSearchVisitor(Set<ConcurrentSearchRequestDecider> concurrentSearchVisitorDeciders, IndexSettings idxSettings) {
Objects.requireNonNull(concurrentSearchVisitorDeciders, "Concurrent search deciders cannot be null");
deciders = concurrentSearchVisitorDeciders;
indexSettings = idxSettings;
Expand Down
Loading
Loading