From 7c3465764434ddee5f1f85262ab49c576bc45bf2 Mon Sep 17 00:00:00 2001 From: PritLadani Date: Sat, 19 Nov 2022 01:04:37 +0530 Subject: [PATCH 1/6] Adding integration tests for search backpressure Signed-off-by: PritLadani --- .../backpressure/SearchBackpressureIT.java | 284 ++++++++++++++++++ 1 file changed, 284 insertions(+) create mode 100644 server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java new file mode 100644 index 0000000000000..5527f266066c5 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java @@ -0,0 +1,284 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.ActionResponse; +import org.opensearch.action.ActionType; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.search.backpressure.settings.NodeDuressSettings; +import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; +import org.opensearch.search.backpressure.trackers.CpuUsageTracker; +import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; +import org.opensearch.tasks.ResourceUsageMetric; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskCancelledException; +import org.opensearch.tasks.TaskId; +import org.opensearch.tasks.ResourceStats; +import org.opensearch.tasks.ResourceStatsType; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE) +public class SearchBackpressureIT extends OpenSearchIntegTestCase { + + private static final TimeValue TIMEOUT = new TimeValue(30, TimeUnit.SECONDS); + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced") + .put(SearchBackpressureSettings.SETTING_CANCELLATION_RATIO.getKey(), 1) + .put(SearchBackpressureSettings.SETTING_CANCELLATION_RATE.getKey(), 10) + .put(SearchBackpressureSettings.SETTING_CANCELLATION_BURST.getKey(), 10) + .put(NodeDuressSettings.SETTING_CPU_THRESHOLD.getKey(), 0.3) + .put(NodeDuressSettings.SETTING_HEAP_THRESHOLD.getKey(), 0.0) + .put(NodeDuressSettings.SETTING_NUM_SUCCESSIVE_BREACHES.getKey(), 1) + .put(SearchShardTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD.getKey(), 0.0) + .build(); + } + + @Override + protected Collection> nodePlugins() { + final List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(TestPlugin.class); + return plugins; + } + + public void testSearchShardTaskCancellationWithHighElapsedTime() throws InterruptedException { + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings( + Settings.builder().put(ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey(), 5000) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + CountDownLatch latch = new CountDownLatch(1); + + client().execute(TestTransportAction.ACTION, generateRequestWithHighElapsedTime(8000), new ActionListener<>() { + @Override + public void onResponse(TestResponse testResponse) { + fail("SearchShardTask should have been cancelled"); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + logger.info("testSearchShardTaskCancellationWithHighElapsedTime onFailure e = " + e.getMessage()); + assertEquals(TaskCancelledException.class, e.getClass()); + assertTrue(e.getMessage().contains("elapsed time exceeded")); + latch.countDown(); + } + }); + + latch.await(); + resetSettingValue(ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey()); + } + + public void testSearchShardTaskCancellationWithHighCpu() throws InterruptedException { + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 1000)); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + CountDownLatch latch = new CountDownLatch(1); + + client().execute(TestTransportAction.ACTION, generateRequestWithHigCpuUsage(0.8, 10000), new ActionListener<>() { + @Override + public void onResponse(TestResponse testResponse) { + fail("SearchShardTask should have been cancelled"); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + logger.info("testSearchShardTaskCancellationWithHighCpu onFailure e = " + e.getMessage()); + assertEquals(TaskCancelledException.class, e.getClass()); + assertTrue(e.getMessage().contains("cpu usage exceeded")); + latch.countDown(); + } + }); + + latch.await(); + resetSettingValue(CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey()); + } + + public void testSearchCancellationWithBackpressureDisabled() throws InterruptedException { + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(SearchBackpressureSettings.SETTING_MODE.getKey(), "monitor_only")); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + CountDownLatch latch = new CountDownLatch(1); + + client().execute(TestTransportAction.ACTION, generateRequestWithHigCpuUsage(0.8, 10000), new ActionListener<>() { + @Override + public void onResponse(TestResponse testResponse) { + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail("SearchShardTask shouldn't have cancelled for monitor_only mode"); + latch.countDown(); + } + }); + + latch.await(); + resetSettingValue(SearchBackpressureSettings.SETTING_MODE.getKey()); + } + + /** + * Returns a {@link TestRequest} which will continue running for given milliseconds + * @param duration time in milliseconds + */ + private TestRequest generateRequestWithHighElapsedTime(long duration) { + return new TestRequest(() -> { + try { + Thread.sleep(duration); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + } + + /** + * Returns a {@link TestRequest} which is CPU heavy + * @param load percentage load to the CPU + * @param duration time in milliseconds + */ + private TestRequest generateRequestWithHigCpuUsage(double load, long duration) { + return new TestRequest(() -> { + long startTime = System.currentTimeMillis(); + try { + while (System.currentTimeMillis() - startTime < duration) { + // sleeping for the percentage of unladen time every 100ms + if (System.currentTimeMillis() % 100 == 0) { + Thread.sleep((long) Math.floor((1 - load) * 100)); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + } + + private void resetSettingValue(String key) { + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder().putNull(key)).get(); + } + + public static class TestRequest extends ActionRequest { + Runnable runnable; + + public TestRequest(StreamInput in) {} + + public TestRequest(Runnable runnable) { + this.runnable = runnable; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new SearchShardTask(id, type, action, "", parentTaskId, headers); + } + + public void execute() { + runnable.run(); + } + } + + public static class TestResponse extends ActionResponse { + public TestResponse() {} + + public TestResponse(StreamInput in) {} + + @Override + public void writeTo(StreamOutput out) throws IOException {} + } + + public static class TestTransportAction extends HandledTransportAction { + public static final ActionType ACTION = new ActionType<>("internal::test_action", TestResponse::new); + + @Inject + public TestTransportAction(TransportService transportService, NodeClient client, ActionFilters actionFilters) { + super(ACTION.name(), transportService, actionFilters, TestRequest::new, ThreadPool.Names.GENERIC); + } + + @Override + protected void doExecute(Task task, TestRequest request, ActionListener listener) { + try { + SearchShardTask searchShardTask = (SearchShardTask) task; + // starting resource tracking for the current thread + ResourceUsageMetric[] initialTaskResourceMetrics = new ResourceUsageMetric[] { + new ResourceUsageMetric(ResourceStats.MEMORY, 0), + new ResourceUsageMetric(ResourceStats.CPU, 0) }; + task.startThreadResourceTracking( + Thread.currentThread().getId(), + ResourceStatsType.WORKER_STATS, + initialTaskResourceMetrics + ); + long startTime = System.nanoTime(); + + // Doing a busy-wait until task cancellation or timeout. + do { + request.execute(); + Thread.sleep(500); + } while (searchShardTask.isCancelled() == false && (System.nanoTime() - startTime) < TIMEOUT.getNanos()); + + if (searchShardTask.isCancelled()) { + throw new TaskCancelledException(searchShardTask.getReasonCancelled()); + } else { + listener.onResponse(new TestResponse()); + } + } catch (Exception e) { + listener.onFailure(e); + } + } + } + + public static class TestPlugin extends Plugin implements ActionPlugin { + @Override + public List> getActions() { + return Collections.singletonList(new ActionHandler<>(TestTransportAction.ACTION, TestTransportAction.class)); + } + + @Override + public List> getClientActions() { + return Collections.singletonList(TestTransportAction.ACTION); + } + } +} From ec08b5834bd42bc0d5d62d4046b9f04423c814e3 Mon Sep 17 00:00:00 2001 From: PritLadani Date: Mon, 21 Nov 2022 17:37:59 +0530 Subject: [PATCH 2/6] Addressing PR comments Signed-off-by: PritLadani --- .../backpressure/SearchBackpressureIT.java | 200 ++++++++---------- 1 file changed, 92 insertions(+), 108 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java index 5527f266066c5..de6865f1b3e14 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java @@ -8,16 +8,16 @@ package org.opensearch.search.backpressure; +import org.junit.After; +import org.junit.Before; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.ActionResponse; import org.opensearch.action.ActionType; -import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.search.SearchShardTask; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; -import org.opensearch.client.node.NodeClient; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -30,12 +30,9 @@ import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; import org.opensearch.search.backpressure.trackers.CpuUsageTracker; import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; -import org.opensearch.tasks.ResourceUsageMetric; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancelledException; import org.opensearch.tasks.TaskId; -import org.opensearch.tasks.ResourceStats; -import org.opensearch.tasks.ResourceStatsType; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -58,17 +55,7 @@ public class SearchBackpressureIT extends OpenSearchIntegTestCase { @Override protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced") - .put(SearchBackpressureSettings.SETTING_CANCELLATION_RATIO.getKey(), 1) - .put(SearchBackpressureSettings.SETTING_CANCELLATION_RATE.getKey(), 10) - .put(SearchBackpressureSettings.SETTING_CANCELLATION_BURST.getKey(), 10) - .put(NodeDuressSettings.SETTING_CPU_THRESHOLD.getKey(), 0.3) - .put(NodeDuressSettings.SETTING_HEAP_THRESHOLD.getKey(), 0.0) - .put(NodeDuressSettings.SETTING_NUM_SUCCESSIVE_BREACHES.getKey(), 1) - .put(SearchShardTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD.getKey(), 0.0) - .build(); + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build(); } @Override @@ -78,16 +65,38 @@ protected Collection> nodePlugins() { return plugins; } - public void testSearchShardTaskCancellationWithHighElapsedTime() throws InterruptedException { - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.persistentSettings( - Settings.builder().put(ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey(), 5000) + @Before + public final void setupNodeSettings() { + Settings request = Settings.builder() + .put(NodeDuressSettings.SETTING_CPU_THRESHOLD.getKey(), 0.0) + .put(NodeDuressSettings.SETTING_HEAP_THRESHOLD.getKey(), 0.0) + .put(NodeDuressSettings.SETTING_NUM_SUCCESSIVE_BREACHES.getKey(), 1) + .put(SearchShardTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD.getKey(), 0.0) + .build(); + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); + } + + @After + public final void cleanupNodeSettings() { + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().putNull("*")) + .setTransientSettings(Settings.builder().putNull("*")) ); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + } + + public void testSearchShardTaskCancellationWithHighElapsedTime() throws InterruptedException { + Settings request = Settings.builder() + .put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced") + .put(ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey(), 5000) + .build(); + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); CountDownLatch latch = new CountDownLatch(1); - client().execute(TestTransportAction.ACTION, generateRequestWithHighElapsedTime(8000), new ActionListener<>() { + client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_ELAPSED_TIME), new ActionListener<>() { @Override public void onResponse(TestResponse testResponse) { fail("SearchShardTask should have been cancelled"); @@ -96,25 +105,25 @@ public void onResponse(TestResponse testResponse) { @Override public void onFailure(Exception e) { - logger.info("testSearchShardTaskCancellationWithHighElapsedTime onFailure e = " + e.getMessage()); assertEquals(TaskCancelledException.class, e.getClass()); assertTrue(e.getMessage().contains("elapsed time exceeded")); latch.countDown(); } }); - latch.await(); - resetSettingValue(ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey()); + latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS); } public void testSearchShardTaskCancellationWithHighCpu() throws InterruptedException { - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.persistentSettings(Settings.builder().put(CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 1000)); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + Settings request = Settings.builder() + .put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced") + .put(CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 15000) + .build(); + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); CountDownLatch latch = new CountDownLatch(1); - client().execute(TestTransportAction.ACTION, generateRequestWithHigCpuUsage(0.8, 10000), new ActionListener<>() { + client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_CPU), new ActionListener<>() { @Override public void onResponse(TestResponse testResponse) { fail("SearchShardTask should have been cancelled"); @@ -123,25 +132,22 @@ public void onResponse(TestResponse testResponse) { @Override public void onFailure(Exception e) { - logger.info("testSearchShardTaskCancellationWithHighCpu onFailure e = " + e.getMessage()); assertEquals(TaskCancelledException.class, e.getClass()); assertTrue(e.getMessage().contains("cpu usage exceeded")); latch.countDown(); } }); - latch.await(); - resetSettingValue(CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey()); + latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS); } public void testSearchCancellationWithBackpressureDisabled() throws InterruptedException { - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.persistentSettings(Settings.builder().put(SearchBackpressureSettings.SETTING_MODE.getKey(), "monitor_only")); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + Settings request = Settings.builder().put(SearchBackpressureSettings.SETTING_MODE.getKey(), "monitor_only").build(); + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); CountDownLatch latch = new CountDownLatch(1); - client().execute(TestTransportAction.ACTION, generateRequestWithHigCpuUsage(0.8, 10000), new ActionListener<>() { + client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_ELAPSED_TIME), new ActionListener<>() { @Override public void onResponse(TestResponse testResponse) { latch.countDown(); @@ -154,56 +160,24 @@ public void onFailure(Exception e) { } }); - latch.await(); - resetSettingValue(SearchBackpressureSettings.SETTING_MODE.getKey()); - } - - /** - * Returns a {@link TestRequest} which will continue running for given milliseconds - * @param duration time in milliseconds - */ - private TestRequest generateRequestWithHighElapsedTime(long duration) { - return new TestRequest(() -> { - try { - Thread.sleep(duration); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }); - } - - /** - * Returns a {@link TestRequest} which is CPU heavy - * @param load percentage load to the CPU - * @param duration time in milliseconds - */ - private TestRequest generateRequestWithHigCpuUsage(double load, long duration) { - return new TestRequest(() -> { - long startTime = System.currentTimeMillis(); - try { - while (System.currentTimeMillis() - startTime < duration) { - // sleeping for the percentage of unladen time every 100ms - if (System.currentTimeMillis() % 100 == 0) { - Thread.sleep((long) Math.floor((1 - load) * 100)); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }); + latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS); } - private void resetSettingValue(String key) { - client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder().putNull(key)).get(); + enum RequestType { + HIGH_CPU, + HIGH_ELAPSED_TIME; } public static class TestRequest extends ActionRequest { - Runnable runnable; + private final RequestType type; - public TestRequest(StreamInput in) {} + public TestRequest(RequestType type) { + this.type = type; + } - public TestRequest(Runnable runnable) { - this.runnable = runnable; + public TestRequest(StreamInput in) throws IOException { + super(in); + this.type = in.readEnum(RequestType.class); } @Override @@ -216,8 +190,8 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, return new SearchShardTask(id, type, action, "", parentTaskId, headers); } - public void execute() { - runnable.run(); + public RequestType getType() { + return this.type; } } @@ -232,40 +206,50 @@ public void writeTo(StreamOutput out) throws IOException {} public static class TestTransportAction extends HandledTransportAction { public static final ActionType ACTION = new ActionType<>("internal::test_action", TestResponse::new); + private final ThreadPool threadPool; @Inject - public TestTransportAction(TransportService transportService, NodeClient client, ActionFilters actionFilters) { - super(ACTION.name(), transportService, actionFilters, TestRequest::new, ThreadPool.Names.GENERIC); + public TestTransportAction(TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters) { + super(ACTION.name(), transportService, actionFilters, TestRequest::new); + this.threadPool = threadPool; } @Override protected void doExecute(Task task, TestRequest request, ActionListener listener) { - try { - SearchShardTask searchShardTask = (SearchShardTask) task; - // starting resource tracking for the current thread - ResourceUsageMetric[] initialTaskResourceMetrics = new ResourceUsageMetric[] { - new ResourceUsageMetric(ResourceStats.MEMORY, 0), - new ResourceUsageMetric(ResourceStats.CPU, 0) }; - task.startThreadResourceTracking( - Thread.currentThread().getId(), - ResourceStatsType.WORKER_STATS, - initialTaskResourceMetrics - ); - long startTime = System.nanoTime(); - - // Doing a busy-wait until task cancellation or timeout. - do { - request.execute(); - Thread.sleep(500); - } while (searchShardTask.isCancelled() == false && (System.nanoTime() - startTime) < TIMEOUT.getNanos()); - - if (searchShardTask.isCancelled()) { - throw new TaskCancelledException(searchShardTask.getReasonCancelled()); - } else { - listener.onResponse(new TestResponse()); + threadPool.executor(ThreadPool.Names.SEARCH).execute(() -> { + try { + SearchShardTask searchShardTask = (SearchShardTask) task; + long startTime = System.nanoTime(); + + // Doing a busy-wait until task cancellation or timeout. + do { + doWork(request); + } while (searchShardTask.isCancelled() == false && (System.nanoTime() - startTime) < TIMEOUT.getNanos()); + + if (searchShardTask.isCancelled()) { + throw new TaskCancelledException(searchShardTask.getReasonCancelled()); + } else { + listener.onResponse(new TestResponse()); + } + } catch (Exception e) { + listener.onFailure(e); } - } catch (Exception e) { - listener.onFailure(e); + }); + } + + private void doWork(TestRequest request) throws InterruptedException { + switch (request.getType()) { + case HIGH_CPU: + long i = 0, j = 1, k = 1, iterations = 1000; + do { + j += i; + k *= j; + i++; + } while (i < iterations); + break; + case HIGH_ELAPSED_TIME: + Thread.sleep(100); + break; } } } From 09496ef12c88ed1a92bb5c0da34ffb1d3163ad87 Mon Sep 17 00:00:00 2001 From: PritLadani Date: Mon, 21 Nov 2022 20:17:22 +0530 Subject: [PATCH 3/6] Addressing PR comments Signed-off-by: PritLadani --- .../search/backpressure/SearchBackpressureIT.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java index de6865f1b3e14..31c6ef9d0ab54 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java @@ -53,11 +53,6 @@ public class SearchBackpressureIT extends OpenSearchIntegTestCase { private static final TimeValue TIMEOUT = new TimeValue(30, TimeUnit.SECONDS); - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build(); - } - @Override protected Collection> nodePlugins() { final List> plugins = new ArrayList<>(super.nodePlugins()); @@ -190,6 +185,12 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, return new SearchShardTask(id, type, action, "", parentTaskId, headers); } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeEnum(type); + } + public RequestType getType() { return this.type; } From cbeb1e6c403357a1925aa9086ea6fda0abd8d27f Mon Sep 17 00:00:00 2001 From: PritLadani Date: Tue, 22 Nov 2022 16:19:21 +0530 Subject: [PATCH 4/6] Adding tests for high heap usage Signed-off-by: PritLadani --- .../backpressure/SearchBackpressureIT.java | 131 ++++++++++++------ 1 file changed, 87 insertions(+), 44 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java index 31c6ef9d0ab54..490c4cc948db6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java @@ -30,6 +30,7 @@ import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; import org.opensearch.search.backpressure.trackers.CpuUsageTracker; import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; +import org.opensearch.search.backpressure.trackers.HeapUsageTracker; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancelledException; import org.opensearch.tasks.TaskId; @@ -85,55 +86,68 @@ public final void cleanupNodeSettings() { public void testSearchShardTaskCancellationWithHighElapsedTime() throws InterruptedException { Settings request = Settings.builder() .put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced") - .put(ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey(), 5000) + .put(ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey(), 1000) .build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); CountDownLatch latch = new CountDownLatch(1); - - client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_ELAPSED_TIME), new ActionListener<>() { - @Override - public void onResponse(TestResponse testResponse) { - fail("SearchShardTask should have been cancelled"); - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - assertEquals(TaskCancelledException.class, e.getClass()); - assertTrue(e.getMessage().contains("elapsed time exceeded")); - latch.countDown(); - } - }); - + ExceptionCatchingListener listener = new ExceptionCatchingListener(latch); + client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_ELAPSED_TIME), listener); latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS); + + Exception caughtException = listener.getException(); + assertNotNull("SearchShardTask should have been cancelled with TaskCancelledException", caughtException); + assertEquals(TaskCancelledException.class, caughtException.getClass()); + assertTrue(caughtException.getMessage().contains("elapsed time exceeded")); } public void testSearchShardTaskCancellationWithHighCpu() throws InterruptedException { Settings request = Settings.builder() .put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced") - .put(CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 15000) + .put(CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 1000) .build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); CountDownLatch latch = new CountDownLatch(1); + ExceptionCatchingListener listener = new ExceptionCatchingListener(latch); + client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_CPU), listener); + latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS); - client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_CPU), new ActionListener<>() { - @Override - public void onResponse(TestResponse testResponse) { - fail("SearchShardTask should have been cancelled"); - latch.countDown(); - } + Exception caughtException = listener.getException(); + assertNotNull("SearchShardTask should have been cancelled with TaskCancelledException", caughtException); + assertEquals(TaskCancelledException.class, caughtException.getClass()); + assertTrue(caughtException.getMessage().contains("cpu usage exceeded")); + } - @Override - public void onFailure(Exception e) { - assertEquals(TaskCancelledException.class, e.getClass()); - assertTrue(e.getMessage().contains("cpu usage exceeded")); - latch.countDown(); - } - }); + public void testSearchShardTaskCancellationWithHighHeapUsage() throws InterruptedException { + // Before SearchBackpressureService cancels a task based on its heap usage, we need to build up the heap moving average + // To build up the heap moving average, we need to hit the same node with multiple requests and then hit the same node with a + // request having higher heap usage + String node = internalCluster().startDataOnlyNode(); + final int MOVING_AVERAGE_WINDOW_SIZE = 10; + Settings request = Settings.builder() + .put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced") + .put(HeapUsageTracker.SETTING_HEAP_PERCENT_THRESHOLD.getKey(), 0.00001) + .put(HeapUsageTracker.SETTING_HEAP_VARIANCE_THRESHOLD.getKey(), 1.0) + .put(HeapUsageTracker.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.getKey(), MOVING_AVERAGE_WINDOW_SIZE) + .build(); + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); + CountDownLatch latch = new CountDownLatch(1); + ExceptionCatchingListener listener = new ExceptionCatchingListener(latch); + for (int i = 0; i < MOVING_AVERAGE_WINDOW_SIZE; i++) { + client(node).execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_HEAP), listener); + } + + latch = new CountDownLatch(1); + listener = new ExceptionCatchingListener(latch); + client(node).execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGHER_HEAP), listener); latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS); + + Exception caughtException = listener.getException(); + assertNotNull("SearchShardTask should have been cancelled with TaskCancelledException", caughtException); + assertEquals(TaskCancelledException.class, caughtException.getClass()); + assertTrue(caughtException.getMessage().contains("heap usage exceeded")); } public void testSearchCancellationWithBackpressureDisabled() throws InterruptedException { @@ -141,25 +155,43 @@ public void testSearchCancellationWithBackpressureDisabled() throws InterruptedE assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); CountDownLatch latch = new CountDownLatch(1); + ExceptionCatchingListener listener = new ExceptionCatchingListener(latch); + client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_ELAPSED_TIME), listener); + latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS); - client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_ELAPSED_TIME), new ActionListener<>() { - @Override - public void onResponse(TestResponse testResponse) { - latch.countDown(); - } + Exception caughtException = listener.getException(); + assertNull("SearchShardTask shouldn't have cancelled for monitor_only mode", caughtException); - @Override - public void onFailure(Exception e) { - fail("SearchShardTask shouldn't have cancelled for monitor_only mode"); - latch.countDown(); - } - }); + } - latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS); + private static class ExceptionCatchingListener implements ActionListener { + private final CountDownLatch latch; + private Exception exception = null; + + public ExceptionCatchingListener(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void onResponse(TestResponse r) { + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + this.exception = e; + latch.countDown(); + } + + private Exception getException() { + return exception; + } } enum RequestType { HIGH_CPU, + HIGH_HEAP, + HIGHER_HEAP, HIGH_ELAPSED_TIME; } @@ -223,9 +255,12 @@ protected void doExecute(Task task, TestRequest request, ActionListener Date: Tue, 22 Nov 2022 22:09:43 +0530 Subject: [PATCH 5/6] Addressing PR comments Signed-off-by: PritLadani --- .../opensearch/search/backpressure/SearchBackpressureIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java index 490c4cc948db6..1e4702409df02 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java @@ -123,11 +123,11 @@ public void testSearchShardTaskCancellationWithHighHeapUsage() throws Interrupte // Before SearchBackpressureService cancels a task based on its heap usage, we need to build up the heap moving average // To build up the heap moving average, we need to hit the same node with multiple requests and then hit the same node with a // request having higher heap usage - String node = internalCluster().startDataOnlyNode(); + String node = randomFrom(internalCluster().getNodeNames()); final int MOVING_AVERAGE_WINDOW_SIZE = 10; Settings request = Settings.builder() .put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced") - .put(HeapUsageTracker.SETTING_HEAP_PERCENT_THRESHOLD.getKey(), 0.00001) + .put(HeapUsageTracker.SETTING_HEAP_PERCENT_THRESHOLD.getKey(), 0.0) .put(HeapUsageTracker.SETTING_HEAP_VARIANCE_THRESHOLD.getKey(), 1.0) .put(HeapUsageTracker.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.getKey(), MOVING_AVERAGE_WINDOW_SIZE) .build(); From e336a048ac9248f0ba45173370a52beda5fe9c17 Mon Sep 17 00:00:00 2001 From: PritLadani Date: Wed, 23 Nov 2022 00:32:21 +0530 Subject: [PATCH 6/6] Adding assertion on latch.await() Signed-off-by: PritLadani --- .../backpressure/SearchBackpressureIT.java | 49 ++++++++++--------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java index 1e4702409df02..f8629e2c88b07 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java @@ -8,6 +8,7 @@ package org.opensearch.search.backpressure; +import org.hamcrest.MatcherAssert; import org.junit.After; import org.junit.Before; import org.opensearch.action.ActionListener; @@ -47,12 +48,14 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE) public class SearchBackpressureIT extends OpenSearchIntegTestCase { - private static final TimeValue TIMEOUT = new TimeValue(30, TimeUnit.SECONDS); + private static final TimeValue TIMEOUT = new TimeValue(10, TimeUnit.SECONDS); @Override protected Collection> nodePlugins() { @@ -90,15 +93,14 @@ public void testSearchShardTaskCancellationWithHighElapsedTime() throws Interrup .build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); - CountDownLatch latch = new CountDownLatch(1); - ExceptionCatchingListener listener = new ExceptionCatchingListener(latch); + ExceptionCatchingListener listener = new ExceptionCatchingListener(); client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_ELAPSED_TIME), listener); - latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS); + assertTrue(listener.latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); Exception caughtException = listener.getException(); assertNotNull("SearchShardTask should have been cancelled with TaskCancelledException", caughtException); - assertEquals(TaskCancelledException.class, caughtException.getClass()); - assertTrue(caughtException.getMessage().contains("elapsed time exceeded")); + MatcherAssert.assertThat(caughtException, instanceOf(TaskCancelledException.class)); + MatcherAssert.assertThat(caughtException.getMessage(), containsString("elapsed time exceeded")); } public void testSearchShardTaskCancellationWithHighCpu() throws InterruptedException { @@ -108,15 +110,14 @@ public void testSearchShardTaskCancellationWithHighCpu() throws InterruptedExcep .build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); - CountDownLatch latch = new CountDownLatch(1); - ExceptionCatchingListener listener = new ExceptionCatchingListener(latch); + ExceptionCatchingListener listener = new ExceptionCatchingListener(); client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_CPU), listener); - latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS); + assertTrue(listener.latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); Exception caughtException = listener.getException(); assertNotNull("SearchShardTask should have been cancelled with TaskCancelledException", caughtException); - assertEquals(TaskCancelledException.class, caughtException.getClass()); - assertTrue(caughtException.getMessage().contains("cpu usage exceeded")); + MatcherAssert.assertThat(caughtException, instanceOf(TaskCancelledException.class)); + MatcherAssert.assertThat(caughtException.getMessage(), containsString("cpu usage exceeded")); } public void testSearchShardTaskCancellationWithHighHeapUsage() throws InterruptedException { @@ -133,43 +134,43 @@ public void testSearchShardTaskCancellationWithHighHeapUsage() throws Interrupte .build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); - CountDownLatch latch = new CountDownLatch(1); - ExceptionCatchingListener listener = new ExceptionCatchingListener(latch); + ExceptionCatchingListener listener = new ExceptionCatchingListener(); for (int i = 0; i < MOVING_AVERAGE_WINDOW_SIZE; i++) { client(node).execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_HEAP), listener); } - latch = new CountDownLatch(1); - listener = new ExceptionCatchingListener(latch); + listener = new ExceptionCatchingListener(); client(node).execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGHER_HEAP), listener); - latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS); + assertTrue(listener.latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); Exception caughtException = listener.getException(); assertNotNull("SearchShardTask should have been cancelled with TaskCancelledException", caughtException); - assertEquals(TaskCancelledException.class, caughtException.getClass()); - assertTrue(caughtException.getMessage().contains("heap usage exceeded")); + MatcherAssert.assertThat(caughtException, instanceOf(TaskCancelledException.class)); + MatcherAssert.assertThat(caughtException.getMessage(), containsString("heap usage exceeded")); } public void testSearchCancellationWithBackpressureDisabled() throws InterruptedException { Settings request = Settings.builder().put(SearchBackpressureSettings.SETTING_MODE.getKey(), "monitor_only").build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); - CountDownLatch latch = new CountDownLatch(1); - ExceptionCatchingListener listener = new ExceptionCatchingListener(latch); + ExceptionCatchingListener listener = new ExceptionCatchingListener(); client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_ELAPSED_TIME), listener); - latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS); + // waiting for the TIMEOUT * 3 time for the request to complete and the latch to countdown. + assertTrue( + "SearchShardTask should have been completed by now and countdown the latch", + listener.latch.await(TIMEOUT.getSeconds() * 3, TimeUnit.SECONDS) + ); Exception caughtException = listener.getException(); assertNull("SearchShardTask shouldn't have cancelled for monitor_only mode", caughtException); - } private static class ExceptionCatchingListener implements ActionListener { private final CountDownLatch latch; private Exception exception = null; - public ExceptionCatchingListener(CountDownLatch latch) { - this.latch = latch; + public ExceptionCatchingListener() { + this.latch = new CountDownLatch(1); } @Override