From 8f79a40a080a27dc1be4f178821f8c284a24d38e Mon Sep 17 00:00:00 2001 From: William Brafford Date: Sat, 30 Mar 2024 23:51:48 -0400 Subject: [PATCH] Fill threadpool queues before testing requests that should be blocked --- .../indices/SystemIndexThreadPoolTests.java | 52 ++++++++++++------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/indices/SystemIndexThreadPoolTests.java b/test/framework/src/main/java/org/elasticsearch/indices/SystemIndexThreadPoolTests.java index ed83ac6393266..b97c39ce70792 100644 --- a/test/framework/src/main/java/org/elasticsearch/indices/SystemIndexThreadPoolTests.java +++ b/test/framework/src/main/java/org/elasticsearch/indices/SystemIndexThreadPoolTests.java @@ -8,8 +8,8 @@ package org.elasticsearch.indices; -import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.core.TimeValue; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -41,23 +41,6 @@ protected Set threadPoolsToBlock() { return Set.of(ThreadPool.Names.GET, ThreadPool.Names.WRITE, ThreadPool.Names.SEARCH); } - private void assertThreadPoolsBlocked() { - TimeValue timeout = TimeValue.timeValueMillis(25); - logger.info("cluster data nodes: " + cluster().numDataNodes() + ", data and master: " + cluster().numDataAndMasterNodes()); - var e1 = expectThrows( - ElasticsearchTimeoutException.class, - () -> client().prepareIndex(USER_INDEX).setSource(Map.of("foo", "bar")).get(timeout) - ); - assertThat(e1.getMessage(), startsWith("java.util.concurrent.TimeoutException: Timeout waiting for task.")); - var e2 = expectThrows(ElasticsearchTimeoutException.class, () -> client().prepareGet(USER_INDEX, "id").get(timeout)); - assertThat(e2.getMessage(), startsWith("java.util.concurrent.TimeoutException: Timeout waiting for task.")); - var e3 = expectThrows( - ElasticsearchTimeoutException.class, - () -> client().prepareSearch(USER_INDEX).setQuery(QueryBuilders.matchAllQuery()).get(timeout) - ); - assertThat(e3.getMessage(), startsWith("java.util.concurrent.TimeoutException: Timeout waiting for task.")); - } - protected void runWithBlockedThreadPools(Runnable runnable) { Phaser phaser = new Phaser(); Runnable waitAction = () -> { @@ -91,4 +74,35 @@ public void testUserThreadPoolsAreBlocked() { assertAcked(client().admin().indices().prepareDelete(USER_INDEX)); } + + private void assertThreadPoolsBlocked() { + fillThreadPoolQueues(); // rejections are easier to check than timeouts + + var e1 = expectThrows( + EsRejectedExecutionException.class, + () -> client().prepareIndex(USER_INDEX).setSource(Map.of("foo", "bar")).get() + ); + assertThat(e1.getMessage(), startsWith("rejected execution of TimedRunnable")); + var e2 = expectThrows(EsRejectedExecutionException.class, () -> client().prepareGet(USER_INDEX, "id").get()); + assertThat(e2.getMessage(), startsWith("rejected execution of ActionRunnable")); + var e3 = expectThrows( + SearchPhaseExecutionException.class, + () -> client().prepareSearch(USER_INDEX).setQuery(QueryBuilders.matchAllQuery()).get() + ); + assertThat(e3.getMessage(), startsWith("all shards failed")); + } + + private void fillThreadPoolQueues() { + for (String nodeName : internalCluster().getNodeNames()) { + ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName); + for (String threadPoolName : threadPoolsToBlock()) { + ThreadPool.Info info = threadPool.info(threadPoolName); + + // fill up the queue + for (int i = 0; i < info.getQueueSize().singles(); i++) { + threadPool.executor(threadPoolName).submit(() -> {}); + } + } + } + } }