Skip to content

Commit

Permalink
Fill threadpool queues before testing requests that should be blocked
Browse files Browse the repository at this point in the history
  • Loading branch information
williamrandolph committed Mar 31, 2024
1 parent adf2c9f commit 8f79a40
Showing 1 changed file with 33 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

package org.elasticsearch.indices;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -41,23 +41,6 @@ protected Set<String> threadPoolsToBlock() {
return Set.of(ThreadPool.Names.GET, ThreadPool.Names.WRITE, ThreadPool.Names.SEARCH);
}

private void assertThreadPoolsBlocked() {
TimeValue timeout = TimeValue.timeValueMillis(25);
logger.info("cluster data nodes: " + cluster().numDataNodes() + ", data and master: " + cluster().numDataAndMasterNodes());
var e1 = expectThrows(
ElasticsearchTimeoutException.class,
() -> client().prepareIndex(USER_INDEX).setSource(Map.of("foo", "bar")).get(timeout)
);
assertThat(e1.getMessage(), startsWith("java.util.concurrent.TimeoutException: Timeout waiting for task."));
var e2 = expectThrows(ElasticsearchTimeoutException.class, () -> client().prepareGet(USER_INDEX, "id").get(timeout));
assertThat(e2.getMessage(), startsWith("java.util.concurrent.TimeoutException: Timeout waiting for task."));
var e3 = expectThrows(
ElasticsearchTimeoutException.class,
() -> client().prepareSearch(USER_INDEX).setQuery(QueryBuilders.matchAllQuery()).get(timeout)
);
assertThat(e3.getMessage(), startsWith("java.util.concurrent.TimeoutException: Timeout waiting for task."));
}

protected void runWithBlockedThreadPools(Runnable runnable) {
Phaser phaser = new Phaser();
Runnable waitAction = () -> {
Expand Down Expand Up @@ -91,4 +74,35 @@ public void testUserThreadPoolsAreBlocked() {

assertAcked(client().admin().indices().prepareDelete(USER_INDEX));
}

private void assertThreadPoolsBlocked() {
fillThreadPoolQueues(); // rejections are easier to check than timeouts

var e1 = expectThrows(
EsRejectedExecutionException.class,
() -> client().prepareIndex(USER_INDEX).setSource(Map.of("foo", "bar")).get()
);
assertThat(e1.getMessage(), startsWith("rejected execution of TimedRunnable"));
var e2 = expectThrows(EsRejectedExecutionException.class, () -> client().prepareGet(USER_INDEX, "id").get());
assertThat(e2.getMessage(), startsWith("rejected execution of ActionRunnable"));
var e3 = expectThrows(
SearchPhaseExecutionException.class,
() -> client().prepareSearch(USER_INDEX).setQuery(QueryBuilders.matchAllQuery()).get()
);
assertThat(e3.getMessage(), startsWith("all shards failed"));
}

private void fillThreadPoolQueues() {
for (String nodeName : internalCluster().getNodeNames()) {
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);
for (String threadPoolName : threadPoolsToBlock()) {
ThreadPool.Info info = threadPool.info(threadPoolName);

// fill up the queue
for (int i = 0; i < info.getQueueSize().singles(); i++) {
threadPool.executor(threadPoolName).submit(() -> {});
}
}
}
}
}

0 comments on commit 8f79a40

Please sign in to comment.