Skip to content

Commit

Permalink
Simplified enabled/enforced flags with search backpressure mode
Browse files Browse the repository at this point in the history
Signed-off-by: Ketan Verma <ketan9495@gmail.com>
  • Loading branch information
ketanv3 committed Nov 2, 2022
1 parent 0183d8d commit b17b52b
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,7 @@ public void apply(Settings value, Settings current, Settings previous) {
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED,

// Settings related to search backpressure
SearchBackpressureSettings.SETTING_ENABLED,
SearchBackpressureSettings.SETTING_ENFORCED,
SearchBackpressureSettings.SETTING_MODE,
SearchBackpressureSettings.SETTING_CANCELLATION_RATIO,
SearchBackpressureSettings.SETTING_CANCELLATION_RATE,
SearchBackpressureSettings.SETTING_CANCELLATION_BURST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.common.util.TokenBucket;
import org.opensearch.monitor.jvm.JvmStats;
import org.opensearch.monitor.process.ProcessProbe;
import org.opensearch.search.backpressure.settings.SearchBackpressureMode;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.stats.CancelledTaskStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
Expand Down Expand Up @@ -121,7 +122,8 @@ public SearchBackpressureService(
}

void doRun() {
if (getSettings().isEnabled() == false) {
SearchBackpressureMode mode = getSettings().getMode();
if (mode == SearchBackpressureMode.DISABLED) {
return;
}

Expand All @@ -147,7 +149,7 @@ void doRun() {
taskCancellation.getReasonString()
);

if (getSettings().isEnforced() == false) {
if (mode != SearchBackpressureMode.ENFORCED) {
continue;
}

Expand Down Expand Up @@ -254,7 +256,7 @@ SearchBackpressureState getState() {

@Override
public void onTaskCompleted(Task task) {
if (getSettings().isEnabled() == false) {
if (getSettings().getMode() == SearchBackpressureMode.DISABLED) {
return;
}

Expand Down Expand Up @@ -330,6 +332,6 @@ public SearchBackpressureStats nodeStats() {
.collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchShardTasks)))
);

return new SearchBackpressureStats(searchShardTaskStats, getSettings().isEnabled(), getSettings().isEnforced());
return new SearchBackpressureStats(searchShardTaskStats, getSettings().getMode());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.backpressure.settings;

/**
* Defines the search backpressure mode.
*/
public enum SearchBackpressureMode {
/**
* SearchBackpressureService is completely disabled.
*/
DISABLED("disabled"),

/**
* SearchBackpressureService only monitors the resource usage of running tasks.
*/
MONITOR_ONLY("monitor_only"),

/**
* SearchBackpressureService monitors and rejects tasks that exceed resource usage thresholds.
*/
ENFORCED("enforced");

private final String name;

SearchBackpressureMode(String name) {
this.name = name;
}

public String getName() {
return name;
}

public static SearchBackpressureMode fromName(String name) {
switch (name) {
case "disabled":
return DISABLED;
case "monitor_only":
return MONITOR_ONLY;
case "enforced":
return ENFORCED;
}

throw new IllegalArgumentException("invalid TaskResourceUsageTrackerType: " + name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
public class SearchBackpressureSettings {
private static class Defaults {
private static final long INTERVAL_MILLIS = 1000;

private static final boolean ENABLED = true;
private static final boolean ENFORCED = false;
private static final String MODE = "monitor_only";

private static final double CANCELLATION_RATIO = 0.1;
private static final double CANCELLATION_RATE = 0.003;
Expand All @@ -48,23 +46,12 @@ private static class Defaults {
);

/**
* Defines whether search backpressure is enabled or not.
*/
private volatile boolean enabled;
public static final Setting<Boolean> SETTING_ENABLED = Setting.boolSetting(
"search_backpressure.enabled",
Defaults.ENABLED,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* Defines whether in-flight cancellation of tasks is enabled or not.
* Defines the search backpressure mode. It can be either "disabled", "monitor_only" or "enforced".
*/
private volatile boolean enforced;
public static final Setting<Boolean> SETTING_ENFORCED = Setting.boolSetting(
"search_backpressure.enforced",
Defaults.ENFORCED,
private volatile SearchBackpressureMode mode;
public static final Setting<String> SETTING_MODE = Setting.simpleString(
"search_backpressure.mode",
Defaults.MODE,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
Expand Down Expand Up @@ -133,11 +120,8 @@ public SearchBackpressureSettings(Settings settings, ClusterSettings clusterSett

interval = new TimeValue(SETTING_INTERVAL_MILLIS.get(settings));

enabled = SETTING_ENABLED.get(settings);
clusterSettings.addSettingsUpdateConsumer(SETTING_ENABLED, this::setEnabled);

enforced = SETTING_ENFORCED.get(settings);
clusterSettings.addSettingsUpdateConsumer(SETTING_ENFORCED, this::setEnforced);
mode = SearchBackpressureMode.fromName(SETTING_MODE.get(settings));
clusterSettings.addSettingsUpdateConsumer(SETTING_MODE, s -> this.setMode(SearchBackpressureMode.fromName(s)));

cancellationRatio = SETTING_CANCELLATION_RATIO.get(settings);
clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_RATIO, this::setCancellationRatio);
Expand Down Expand Up @@ -173,20 +157,12 @@ public TimeValue getInterval() {
return interval;
}

public boolean isEnabled() {
return enabled;
}

private void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public boolean isEnforced() {
return enforced;
public SearchBackpressureMode getMode() {
return mode;
}

private void setEnforced(boolean enforced) {
this.enforced = enforced;
public void setMode(SearchBackpressureMode mode) {
this.mode = mode;
}

public double getCancellationRatio() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.search.backpressure.settings.SearchBackpressureMode;

import java.io.IOException;
import java.util.Objects;
Expand All @@ -22,45 +23,41 @@
*/
public class SearchBackpressureStats implements ToXContentFragment, Writeable {
private final SearchShardTaskStats searchShardTaskStats;
private final boolean enabled;
private final boolean enforced;
private final SearchBackpressureMode mode;

public SearchBackpressureStats(SearchShardTaskStats searchShardTaskStats, boolean enabled, boolean enforced) {
public SearchBackpressureStats(SearchShardTaskStats searchShardTaskStats, SearchBackpressureMode mode) {
this.searchShardTaskStats = searchShardTaskStats;
this.enabled = enabled;
this.enforced = enforced;
this.mode = mode;
}

public SearchBackpressureStats(StreamInput in) throws IOException {
this(new SearchShardTaskStats(in), in.readBoolean(), in.readBoolean());
this(new SearchShardTaskStats(in), SearchBackpressureMode.fromName(in.readString()));
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject("search_backpressure")
.field("search_shard_task", searchShardTaskStats)
.field("enabled", enabled)
.field("enforced", enforced)
.field("mode", mode.getName())
.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
searchShardTaskStats.writeTo(out);
out.writeBoolean(enabled);
out.writeBoolean(enforced);
out.writeString(mode.getName());
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SearchBackpressureStats that = (SearchBackpressureStats) o;
return enabled == that.enabled && enforced == that.enforced && searchShardTaskStats.equals(that.searchShardTaskStats);
return searchShardTaskStats.equals(that.searchShardTaskStats) && mode == that.mode;
}

@Override
public int hashCode() {
return Objects.hash(searchShardTaskStats, enabled, enforced);
return Objects.hash(searchShardTaskStats, mode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.search.backpressure.settings.SearchBackpressureMode;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
import org.opensearch.search.backpressure.trackers.NodeDuressTracker;
Expand Down Expand Up @@ -159,8 +160,7 @@ public Stats stats(List<? extends Task> activeTasks) {
SearchBackpressureSettings settings = spy(
new SearchBackpressureSettings(
Settings.builder()
.put(SearchBackpressureSettings.SETTING_ENABLED.getKey(), true)
.put(SearchBackpressureSettings.SETTING_ENFORCED.getKey(), true)
.put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced")
.put(SearchBackpressureSettings.SETTING_CANCELLATION_RATIO.getKey(), 0.1)
.put(SearchBackpressureSettings.SETTING_CANCELLATION_RATE.getKey(), 0.003)
.put(SearchBackpressureSettings.SETTING_CANCELLATION_BURST.getKey(), 10.0)
Expand Down Expand Up @@ -233,8 +233,7 @@ public Stats stats(List<? extends Task> activeTasks) {
new CancelledTaskStats(500, 500, 1000000000),
Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(15))
),
true,
true
SearchBackpressureMode.ENFORCED
);
SearchBackpressureStats actualStats = service.nodeStats();
assertEquals(expectedStats, actualStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.search.backpressure.stats;

import org.opensearch.common.io.stream.Writeable;
import org.opensearch.search.backpressure.settings.SearchBackpressureMode;
import org.opensearch.test.AbstractWireSerializingTestCase;

public class SearchBackpressureStatsTests extends AbstractWireSerializingTestCase<SearchBackpressureStats> {
Expand All @@ -23,6 +24,9 @@ protected SearchBackpressureStats createTestInstance() {
}

public static SearchBackpressureStats randomInstance() {
return new SearchBackpressureStats(SearchShardTaskStatsTests.randomInstance(), randomBoolean(), randomBoolean());
return new SearchBackpressureStats(
SearchShardTaskStatsTests.randomInstance(),
randomFrom(SearchBackpressureMode.DISABLED, SearchBackpressureMode.MONITOR_ONLY, SearchBackpressureMode.ENFORCED)
);
}
}

0 comments on commit b17b52b

Please sign in to comment.