Skip to content

Commit

Permalink
Revert "Check thread pools directly when verifying that they're blocked"
Browse files Browse the repository at this point in the history
This reverts commit da6bca8.
  • Loading branch information
williamrandolph committed Mar 31, 2024
1 parent da6bca8 commit adf2c9f
Showing 1 changed file with 26 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@

package org.elasticsearch.indices;

import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.Phaser;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.startsWith;

/**
Expand All @@ -37,6 +41,23 @@ protected Set<String> threadPoolsToBlock() {
return Set.of(ThreadPool.Names.GET, ThreadPool.Names.WRITE, ThreadPool.Names.SEARCH);
}

private void assertThreadPoolsBlocked() {
TimeValue timeout = TimeValue.timeValueMillis(25);
logger.info("cluster data nodes: " + cluster().numDataNodes() + ", data and master: " + cluster().numDataAndMasterNodes());
var e1 = expectThrows(
ElasticsearchTimeoutException.class,
() -> client().prepareIndex(USER_INDEX).setSource(Map.of("foo", "bar")).get(timeout)
);
assertThat(e1.getMessage(), startsWith("java.util.concurrent.TimeoutException: Timeout waiting for task."));
var e2 = expectThrows(ElasticsearchTimeoutException.class, () -> client().prepareGet(USER_INDEX, "id").get(timeout));
assertThat(e2.getMessage(), startsWith("java.util.concurrent.TimeoutException: Timeout waiting for task."));
var e3 = expectThrows(
ElasticsearchTimeoutException.class,
() -> client().prepareSearch(USER_INDEX).setQuery(QueryBuilders.matchAllQuery()).get(timeout)
);
assertThat(e3.getMessage(), startsWith("java.util.concurrent.TimeoutException: Timeout waiting for task."));
}

protected void runWithBlockedThreadPools(Runnable runnable) {
Phaser phaser = new Phaser();
Runnable waitAction = () -> {
Expand All @@ -63,26 +84,11 @@ protected void runWithBlockedThreadPools(Runnable runnable) {
}
}

public void testThreadPoolBlocking() {
runWithBlockedThreadPools(() -> {
for (String nodeName : internalCluster().getNodeNames()) {
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);
for (String threadPoolName : threadPoolsToBlock()) {
ThreadPool.Info info = threadPool.info(threadPoolName);
public void testUserThreadPoolsAreBlocked() {
assertAcked(client().admin().indices().prepareCreate(USER_INDEX));

// fill up the queue
for (int i = 0; i < info.getQueueSize().singles(); i++) {
threadPool.executor(threadPoolName).submit(() -> {});
}
runWithBlockedThreadPools(this::assertThreadPoolsBlocked);

// next task throws exception
var exception = expectThrows(
EsRejectedExecutionException.class,
() -> threadPool.executor(threadPoolName).submit(() -> {})
);
assertThat(exception.getMessage(), startsWith("rejected execution"));
}
}
});
assertAcked(client().admin().indices().prepareDelete(USER_INDEX));
}
}

0 comments on commit adf2c9f

Please sign in to comment.