Added search backpressure stats API #4932

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -52,6 +52,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Update previous release bwc version to 2.5.0 ([#5003](https://github.com/opensearch-project/OpenSearch/pull/5003))
- Use getParameterCount instead of getParameterTypes ([#4821](https://github.com/opensearch-project/OpenSearch/pull/4821))
- Remote shard balancer support for searchable snapshots ([#4870](https://github.com/opensearch-project/OpenSearch/pull/4870))
- Added search backpressure stats API ([#4932](https://github.com/opensearch-project/OpenSearch/pull/4932))
- [Test] Add IAE test for deprecated edgeNGram analyzer name ([#5040](https://github.com/opensearch-project/OpenSearch/pull/5040))
### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java
@@ -32,6 +32,7 @@

package org.opensearch.action.admin.cluster.node.stats;

import org.opensearch.Version;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
@@ -54,6 +55,7 @@
import org.opensearch.node.AdaptiveSelectionStats;
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
import org.opensearch.threadpool.ThreadPoolStats;
import org.opensearch.transport.TransportStats;

@@ -117,6 +119,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private ShardIndexingPressureStats shardIndexingPressureStats;

@Nullable
private SearchBackpressureStats searchBackpressureStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
@@ -141,6 +146,12 @@ public NodeStats(StreamInput in) throws IOException {
}
indexingPressureStats = in.readOptionalWriteable(IndexingPressureStats::new);
shardIndexingPressureStats = in.readOptionalWriteable(ShardIndexingPressureStats::new);

if (in.getVersion().onOrAfter(Version.V_2_4_0)) {
searchBackpressureStats = in.readOptionalWriteable(SearchBackpressureStats::new);
} else {
searchBackpressureStats = null;
}
}

public NodeStats(
@@ -161,7 +172,8 @@ public NodeStats(
@Nullable AdaptiveSelectionStats adaptiveSelectionStats,
@Nullable ScriptCacheStats scriptCacheStats,
@Nullable IndexingPressureStats indexingPressureStats,
@Nullable ShardIndexingPressureStats shardIndexingPressureStats
@Nullable ShardIndexingPressureStats shardIndexingPressureStats,
@Nullable SearchBackpressureStats searchBackpressureStats
) {
super(node);
this.timestamp = timestamp;
@@ -181,6 +193,7 @@ public NodeStats(
this.scriptCacheStats = scriptCacheStats;
this.indexingPressureStats = indexingPressureStats;
this.shardIndexingPressureStats = shardIndexingPressureStats;
this.searchBackpressureStats = searchBackpressureStats;
}

public long getTimestamp() {
@@ -290,6 +303,11 @@ public ShardIndexingPressureStats getShardIndexingPressureStats() {
return shardIndexingPressureStats;
}

@Nullable
public SearchBackpressureStats getSearchBackpressureStats() {
return searchBackpressureStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@@ -314,6 +332,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(adaptiveSelectionStats);
out.writeOptionalWriteable(indexingPressureStats);
out.writeOptionalWriteable(shardIndexingPressureStats);

if (out.getVersion().onOrAfter(Version.V_2_4_0)) {
out.writeOptionalWriteable(searchBackpressureStats);
}
}

@Override
@@ -386,6 +408,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getShardIndexingPressureStats() != null) {
getShardIndexingPressureStats().toXContent(builder, params);
}
if (getSearchBackpressureStats() != null) {
getSearchBackpressureStats().toXContent(builder, params);
}
return builder;
}
}
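The new field stays wire-compatible with pre-2.4.0 nodes because both the read and the write path are gated on Version.V_2_4_0: older nodes neither send nor expect the extra optional writeable. Condensed from the hunks above (a sketch, not the full NodeStats class), the symmetric pattern is:

```java
// Read side: only consume the optional writeable when the sender is 2.4.0 or newer,
// otherwise the stream would be mis-aligned against older nodes.
if (in.getVersion().onOrAfter(Version.V_2_4_0)) {
    searchBackpressureStats = in.readOptionalWriteable(SearchBackpressureStats::new);
} else {
    searchBackpressureStats = null;
}

// Write side: mirror the same gate so nothing extra is sent to older nodes.
if (out.getVersion().onOrAfter(Version.V_2_4_0)) {
    out.writeOptionalWriteable(searchBackpressureStats);
}
```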
server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java
@@ -206,7 +206,8 @@ public enum Metric {
ADAPTIVE_SELECTION("adaptive_selection"),
SCRIPT_CACHE("script_cache"),
INDEXING_PRESSURE("indexing_pressure"),
SHARD_INDEXING_PRESSURE("shard_indexing_pressure");
SHARD_INDEXING_PRESSURE("shard_indexing_pressure"),
SEARCH_BACKPRESSURE("search_backpressure");

private String metricName;

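With SEARCH_BACKPRESSURE registered as a nodes-stats metric above, the new section can be requested on its own; over REST this should follow the usual metric routing, i.e. GET _nodes/stats/search_backpressure. A client-side sketch — the Client handle and the printing are illustration only, not part of this PR; clear(), addMetric(String) and Metric#metricName() are the existing nodes-stats request helpers:

```java
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.client.Client;

// Sketch: ask every node for just the search_backpressure section.
static void printSearchBackpressureStats(Client client) {
    NodesStatsRequest request = new NodesStatsRequest()   // no node ids -> all nodes
        .clear()                                          // drop the default metric set
        .addMetric(NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.metricName());
    NodesStatsResponse response = client.admin().cluster().nodesStats(request).actionGet();
    for (NodeStats node : response.getNodes()) {
        // May be null, e.g. for an older node in a mixed-version cluster.
        System.out.println(node.getNode().getName() + " -> " + node.getSearchBackpressureStats());
    }
}
```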
server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java
@@ -118,7 +118,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.ADAPTIVE_SELECTION.containedIn(metrics),
NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics),
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.SHARD_INDEXING_PRESSURE.containedIn(metrics)
NodesStatsRequest.Metric.SHARD_INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.containedIn(metrics)
);
}

server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java
@@ -162,6 +162,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -591,8 +591,7 @@ public void apply(Settings value, Settings current, Settings previous) {
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED,

// Settings related to search backpressure
SearchBackpressureSettings.SETTING_ENABLED,
SearchBackpressureSettings.SETTING_ENFORCED,
SearchBackpressureSettings.SETTING_MODE,
SearchBackpressureSettings.SETTING_CANCELLATION_RATIO,
SearchBackpressureSettings.SETTING_CANCELLATION_RATE,
SearchBackpressureSettings.SETTING_CANCELLATION_BURST,
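The two boolean settings (SETTING_ENABLED, SETTING_ENFORCED) collapse into the single SETTING_MODE registration; the concrete Setting definition lives in SearchBackpressureSettings and is not part of this hunk. For orientation only, a hedged sketch of what a mode setting of this shape could look like — the key, default, and typed-parser form are assumptions, not taken from the PR:

```java
// Assumed sketch only; the real definition is SearchBackpressureSettings.SETTING_MODE.
public static final Setting<SearchBackpressureMode> MODE_SETTING_SKETCH = new Setting<>(
    "search_backpressure.mode",                      // assumed setting key
    SearchBackpressureMode.MONITOR_ONLY.getName(),   // assumed default
    SearchBackpressureMode::fromName,                // parse via the enum added later in this diff
    Setting.Property.Dynamic,
    Setting.Property.NodeScope
);
```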
6 changes: 4 additions & 2 deletions server/src/main/java/org/opensearch/node/NodeService.java
@@ -173,7 +173,8 @@ public NodeStats stats(
boolean adaptiveSelection,
boolean scriptCache,
boolean indexingPressure,
boolean shardIndexingPressure
boolean shardIndexingPressure,
boolean searchBackpressure
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
@@ -195,7 +196,8 @@
adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null,
scriptCache ? scriptService.cacheStats() : null,
indexingPressure ? this.indexingPressureService.nodeStats() : null,
shardIndexingPressure ? this.indexingPressureService.shardStats(indices) : null
shardIndexingPressure ? this.indexingPressureService.shardStats(indices) : null,
searchBackpressure ? this.searchBackpressureService.nodeStats() : null
);
}

server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java
@@ -16,12 +16,16 @@
import org.opensearch.common.util.TokenBucket;
import org.opensearch.monitor.jvm.JvmStats;
import org.opensearch.monitor.process.ProcessProbe;
import org.opensearch.search.backpressure.settings.SearchBackpressureMode;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
import org.opensearch.search.backpressure.stats.SearchShardTaskStats;
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
import org.opensearch.search.backpressure.trackers.NodeDuressTracker;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancellation;
@@ -117,7 +121,8 @@ public SearchBackpressureService(
}

void doRun() {
if (getSettings().isEnabled() == false) {
SearchBackpressureMode mode = getSettings().getMode();
if (mode == SearchBackpressureMode.DISABLED) {
return;
}

@@ -126,7 +131,7 @@ void doRun() {
}

// We are only targeting in-flight cancellation of SearchShardTask for now.
List<CancellableTask> searchShardTasks = getSearchShardTasks();
List<SearchShardTask> searchShardTasks = getSearchShardTasks();

// Force-refresh usage stats of these tasks before making a cancellation decision.
taskResourceTrackingService.refreshResourceStats(searchShardTasks.toArray(new Task[0]));
@@ -138,12 +143,13 @@ void doRun() {

for (TaskCancellation taskCancellation : getTaskCancellations(searchShardTasks)) {
logger.debug(
"cancelling task [{}] due to high resource consumption [{}]",
"[{} mode] cancelling task [{}] due to high resource consumption [{}]",
mode.getName(),
taskCancellation.getTask().getId(),
taskCancellation.getReasonString()
);

if (getSettings().isEnforced() == false) {
if (mode != SearchBackpressureMode.ENFORCED) {
continue;
}

@@ -159,7 +165,6 @@ void doRun() {
}

taskCancellation.cancel();
state.incrementCancellationCount();
}
}

@@ -182,7 +187,7 @@ boolean isNodeInDuress() {
/**
* Returns true if the increase in heap usage is due to search requests.
*/
boolean isHeapUsageDominatedBySearch(List<CancellableTask> searchShardTasks) {
boolean isHeapUsageDominatedBySearch(List<SearchShardTask> searchShardTasks) {
long usage = searchShardTasks.stream().mapToLong(task -> task.getTotalResourceStats().getMemoryInBytes()).sum();
long threshold = getSettings().getSearchShardTaskSettings().getTotalHeapBytesThreshold();
if (usage < threshold) {
@@ -196,12 +201,12 @@ boolean isHeapUsageDominatedBySearch(List<CancellableTask> searchShardTasks) {
/**
* Filters and returns the list of currently running SearchShardTasks.
*/
List<CancellableTask> getSearchShardTasks() {
List<SearchShardTask> getSearchShardTasks() {
return taskResourceTrackingService.getResourceAwareTasks()
.values()
.stream()
.filter(task -> task instanceof SearchShardTask)
.map(task -> (CancellableTask) task)
.map(task -> (SearchShardTask) task)
.collect(Collectors.toUnmodifiableList());
}

@@ -222,13 +227,17 @@ TaskCancellation getTaskCancellation(CancellableTask task) {
}
}

if (task instanceof SearchShardTask) {
callbacks.add(state::incrementCancellationCount);
}

return new TaskCancellation(task, reasons, callbacks);
}

/**
* Returns a list of TaskCancellations sorted by descending order of their cancellation scores.
*/
List<TaskCancellation> getTaskCancellations(List<CancellableTask> tasks) {
List<TaskCancellation> getTaskCancellations(List<? extends CancellableTask> tasks) {
return tasks.stream()
.map(this::getTaskCancellation)
.filter(TaskCancellation::isEligibleForCancellation)
@@ -246,7 +255,7 @@ SearchBackpressureState getState() {

@Override
public void onTaskCompleted(Task task) {
if (getSettings().isEnabled() == false) {
if (getSettings().getMode() == SearchBackpressureMode.DISABLED) {
return;
}

@@ -310,4 +319,17 @@ protected void doStop() {

@Override
protected void doClose() throws IOException {}

public SearchBackpressureStats nodeStats() {
List<SearchShardTask> searchShardTasks = getSearchShardTasks();

SearchShardTaskStats searchShardTaskStats = new SearchShardTaskStats(
state.getCancellationCount(),
state.getLimitReachedCount(),
taskResourceUsageTrackers.stream()
.collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchShardTasks)))
);

return new SearchBackpressureStats(searchShardTaskStats, getSettings().getMode());
}
}
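nodeStats() is what NodeService.stats(...) earlier in this diff plugs into the node stats response: a snapshot of the current mode plus the per-SearchShardTask cancellation and limit-reached counters, and one stats entry per resource usage tracker keyed by tracker type. Note that the cancellation counter now moves only through the per-task callback added in getTaskCancellation(...), so it stays flat in monitor_only mode. A minimal inspection sketch — the exact JSON field layout comes from SearchBackpressureStats.toXContent, which is not shown in this diff:

```java
import org.opensearch.common.Strings;
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;

// Sketch: serialize the per-node snapshot; Strings.toString(...) wraps ToXContent
// fragments such as this one in an enclosing JSON object before rendering.
static String describe(SearchBackpressureService service) {
    SearchBackpressureStats stats = service.nodeStats();
    return Strings.toString(stats);
}
```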
52 changes: 52 additions & 0 deletions server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureMode.java
@@ -0,0 +1,52 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.backpressure.settings;

/**
* Defines the search backpressure mode.
*/
public enum SearchBackpressureMode {
/**
* SearchBackpressureService is completely disabled.
*/
DISABLED("disabled"),

/**
* SearchBackpressureService only monitors the resource usage of running tasks.
*/
MONITOR_ONLY("monitor_only"),

/**
* SearchBackpressureService monitors and rejects tasks that exceed resource usage thresholds.
*/
ENFORCED("enforced");

private final String name;

SearchBackpressureMode(String name) {
this.name = name;
}

public String getName() {
return name;
}

public static SearchBackpressureMode fromName(String name) {
switch (name) {
case "disabled":
return DISABLED;
case "monitor_only":
return MONITOR_ONLY;
case "enforced":
return ENFORCED;
}

throw new IllegalArgumentException("Invalid SearchBackpressureMode: " + name);
}
}
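fromName is the inverse of getName, so a mode round-trips through its lowercase wire/setting representation and unknown values fail fast. A trivial usage sketch:

```java
import org.opensearch.search.backpressure.settings.SearchBackpressureMode;

public class SearchBackpressureModeExample {
    public static void main(String[] args) {
        SearchBackpressureMode mode = SearchBackpressureMode.fromName("monitor_only");
        assert mode == SearchBackpressureMode.MONITOR_ONLY;
        assert "monitor_only".equals(mode.getName());
        // SearchBackpressureMode.fromName("on") would throw IllegalArgumentException.
    }
}
```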