Skip to content

Commit

Permalink
Introduce utility for concurrent execution of arbitrary Runnable in t…
Browse files Browse the repository at this point in the history
…ests (elastic#110552)

We have the same pattern in a bunch of places, I dried up a few here. We
want to run N tasks so we create N threads in a loop, start them and
join them right away. The node starting logic refactored here is
essentially the same since the threads have idle lifetime 0. This can be
dried up a little and made more efficient. Might as well always use
`N-1` tasks and run one of them on the calling thread. This saves quite
a few threads when running tests and speeds things up a little,
especially when running many concurrent Gradle workers and CPU is at
100% already (mostly coming from the speedup on starting nodes this
brings and the reduction in test thread sleeps).

No functional changes to the tests otherwise, except for some replacing
of `CountDownLatch` with `CyclicalBarrier` to make things work with the
new API.
  • Loading branch information
original-brownbear authored Jul 8, 2024
1 parent f87c81d commit 9b8cd3d
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 278 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -832,30 +832,22 @@ public void testRolloverConcurrently() throws Exception {
assertAcked(client().execute(TransportPutComposableIndexTemplateAction.TYPE, putTemplateRequest).actionGet());

final CyclicBarrier barrier = new CyclicBarrier(numOfThreads);
final Thread[] threads = new Thread[numOfThreads];
for (int i = 0; i < numOfThreads; i++) {
runInParallel(numOfThreads, i -> {
var aliasName = "test-" + i;
threads[i] = new Thread(() -> {
assertAcked(prepareCreate(aliasName + "-000001").addAlias(new Alias(aliasName).writeIndex(true)).get());
for (int j = 1; j <= numberOfRolloversPerThread; j++) {
try {
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
var response = indicesAdmin().prepareRolloverIndex(aliasName).waitForActiveShards(ActiveShardCount.NONE).get();
assertThat(response.getOldIndex(), equalTo(aliasName + Strings.format("-%06d", j)));
assertThat(response.getNewIndex(), equalTo(aliasName + Strings.format("-%06d", j + 1)));
assertThat(response.isDryRun(), equalTo(false));
assertThat(response.isRolledOver(), equalTo(true));
assertAcked(prepareCreate(aliasName + "-000001").addAlias(new Alias(aliasName).writeIndex(true)).get());
for (int j = 1; j <= numberOfRolloversPerThread; j++) {
try {
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
threads[i].start();
}

for (Thread thread : threads) {
thread.join();
}
var response = indicesAdmin().prepareRolloverIndex(aliasName).waitForActiveShards(ActiveShardCount.NONE).get();
assertThat(response.getOldIndex(), equalTo(aliasName + Strings.format("-%06d", j)));
assertThat(response.getNewIndex(), equalTo(aliasName + Strings.format("-%06d", j + 1)));
assertThat(response.isDryRun(), equalTo(false));
assertThat(response.isRolledOver(), equalTo(true));
}
});

for (int i = 0; i < numOfThreads; i++) {
var aliasName = "test-" + i;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,33 +519,22 @@ public void testFailingVersionedUpdatedOnBulk() throws Exception {
indexDoc("test", "1", "field", "1");
final BulkResponse[] responses = new BulkResponse[30];
final CyclicBarrier cyclicBarrier = new CyclicBarrier(responses.length);
Thread[] threads = new Thread[responses.length];

for (int i = 0; i < responses.length; i++) {
final int threadID = i;
threads[threadID] = new Thread(() -> {
try {
cyclicBarrier.await();
} catch (Exception e) {
return;
}
BulkRequestBuilder requestBuilder = client().prepareBulk();
requestBuilder.add(
client().prepareUpdate("test", "1")
.setIfSeqNo(0L)
.setIfPrimaryTerm(1)
.setDoc(Requests.INDEX_CONTENT_TYPE, "field", threadID)
);
responses[threadID] = requestBuilder.get();

});
threads[threadID].start();

}

for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
runInParallel(responses.length, threadID -> {
try {
cyclicBarrier.await();
} catch (Exception e) {
return;
}
BulkRequestBuilder requestBuilder = client().prepareBulk();
requestBuilder.add(
client().prepareUpdate("test", "1")
.setIfSeqNo(0L)
.setIfPrimaryTerm(1)
.setDoc(Requests.INDEX_CONTENT_TYPE, "field", threadID)
);
responses[threadID] = requestBuilder.get();
});

int successes = 0;
for (BulkResponse response : responses) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -310,7 +312,7 @@ public void testAddBlockToUnassignedIndex() throws Exception {
}
}

public void testConcurrentAddBlock() throws InterruptedException {
public void testConcurrentAddBlock() throws InterruptedException, ExecutionException {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName);

Expand All @@ -322,31 +324,21 @@ public void testConcurrentAddBlock() throws InterruptedException {
IntStream.range(0, nbDocs).mapToObj(i -> prepareIndex(indexName).setId(String.valueOf(i)).setSource("num", i)).collect(toList())
);
ensureYellowAndNoInitializingShards(indexName);

final CountDownLatch startClosing = new CountDownLatch(1);
final Thread[] threads = new Thread[randomIntBetween(2, 5)];

final APIBlock block = randomAddableBlock();

final int threadCount = randomIntBetween(2, 5);
final CyclicBarrier barrier = new CyclicBarrier(threadCount);
try {
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
safeAwait(startClosing);
try {
indicesAdmin().prepareAddBlock(block, indexName).get();
assertIndexHasBlock(block, indexName);
} catch (final ClusterBlockException e) {
assertThat(e.blocks(), hasSize(1));
assertTrue(e.blocks().stream().allMatch(b -> b.id() == block.getBlock().id()));
}
});
threads[i].start();
}

startClosing.countDown();
for (Thread thread : threads) {
thread.join();
}
runInParallel(threadCount, i -> {
safeAwait(barrier);
try {
indicesAdmin().prepareAddBlock(block, indexName).get();
assertIndexHasBlock(block, indexName);
} catch (final ClusterBlockException e) {
assertThat(e.blocks(), hasSize(1));
assertTrue(e.blocks().stream().allMatch(b -> b.id() == block.getBlock().id()));
}
});
assertIndexHasBlock(block, indexName);
} finally {
disableIndexBlock(indexName, block);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,27 +155,20 @@ static IndexingResult indexDocs(int numRequests, int numThreads) throws Exceptio
final AtomicInteger completedRequests = new AtomicInteger();
final AtomicInteger numSuccess = new AtomicInteger();
final AtomicInteger numFailure = new AtomicInteger();
Thread[] indexers = new Thread[numThreads];
Phaser phaser = new Phaser(indexers.length);
for (int i = 0; i < indexers.length; i++) {
indexers[i] = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
while (completedRequests.incrementAndGet() <= numRequests) {
try {
final DocWriteResponse resp = prepareIndex("test").setSource("{}", XContentType.JSON).get();
numSuccess.incrementAndGet();
assertThat(resp.status(), equalTo(RestStatus.CREATED));
} catch (IllegalArgumentException e) {
numFailure.incrementAndGet();
assertThat(e.getMessage(), containsString("Number of documents in the index can't exceed [" + maxDocs.get() + "]"));
}
Phaser phaser = new Phaser(numThreads);
runInParallel(numThreads, i -> {
phaser.arriveAndAwaitAdvance();
while (completedRequests.incrementAndGet() <= numRequests) {
try {
final DocWriteResponse resp = prepareIndex("test").setSource("{}", XContentType.JSON).get();
numSuccess.incrementAndGet();
assertThat(resp.status(), equalTo(RestStatus.CREATED));
} catch (IllegalArgumentException e) {
numFailure.incrementAndGet();
assertThat(e.getMessage(), containsString("Number of documents in the index can't exceed [" + maxDocs.get() + "]"));
}
});
indexers[i].start();
}
for (Thread indexer : indexers) {
indexer.join();
}
}
});
internalCluster().assertNoInFlightDocsInEngine();
return new IndexingResult(numSuccess.get(), numFailure.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

Expand Down Expand Up @@ -161,31 +162,20 @@ public void testConcurrentDynamicIgnoreBeyondLimitUpdates() throws Throwable {
private Map<String, Object> indexConcurrently(int numberOfFieldsToCreate, Settings.Builder settings) throws Throwable {
indicesAdmin().prepareCreate("index").setSettings(settings).get();
ensureGreen("index");
final Thread[] indexThreads = new Thread[numberOfFieldsToCreate];
final CountDownLatch startLatch = new CountDownLatch(1);
final CyclicBarrier barrier = new CyclicBarrier(numberOfFieldsToCreate);
final AtomicReference<Throwable> error = new AtomicReference<>();
for (int i = 0; i < indexThreads.length; ++i) {
runInParallel(numberOfFieldsToCreate, i -> {
final String id = Integer.toString(i);
indexThreads[i] = new Thread(new Runnable() {
@Override
public void run() {
try {
startLatch.await();
assertEquals(
DocWriteResponse.Result.CREATED,
prepareIndex("index").setId(id).setSource("field" + id, "bar").get().getResult()
);
} catch (Exception e) {
error.compareAndSet(null, e);
}
}
});
indexThreads[i].start();
}
startLatch.countDown();
for (Thread thread : indexThreads) {
thread.join();
}
try {
barrier.await();
assertEquals(
DocWriteResponse.Result.CREATED,
prepareIndex("index").setId(id).setSource("field" + id, "bar").get().getResult()
);
} catch (Exception e) {
error.compareAndSet(null, e);
}
});
if (error.get() != null) {
throw error.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xcontent.XContentType;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -143,37 +141,20 @@ private void runGlobalCheckpointSyncTest(
final int numberOfDocuments = randomIntBetween(0, 256);

final int numberOfThreads = randomIntBetween(1, 4);
final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
final CyclicBarrier barrier = new CyclicBarrier(numberOfThreads);

// start concurrent indexing threads
final List<Thread> threads = new ArrayList<>(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
final int index = i;
final Thread thread = new Thread(() -> {
try {
barrier.await();
} catch (BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
for (int j = 0; j < numberOfDocuments; j++) {
final String id = Integer.toString(index * numberOfDocuments + j);
prepareIndex("test").setId(id).setSource("{\"foo\": " + id + "}", XContentType.JSON).get();
}
try {
barrier.await();
} catch (BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
});
threads.add(thread);
thread.start();
}

// synchronize the start of the threads
barrier.await();

// wait for the threads to finish
barrier.await();
runInParallel(numberOfThreads, index -> {
try {
barrier.await();
} catch (BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
for (int j = 0; j < numberOfDocuments; j++) {
final String id = Integer.toString(index * numberOfDocuments + j);
prepareIndex("test").setId(id).setSource("{\"foo\": " + id + "}", XContentType.JSON).get();
}
});

afterIndexing.accept(client());

Expand Down Expand Up @@ -203,9 +184,6 @@ private void runGlobalCheckpointSyncTest(
}
}, 60, TimeUnit.SECONDS);
ensureGreen("test");
for (final Thread thread : threads) {
thread.join();
}
}

public void testPersistGlobalCheckpoint() throws Exception {
Expand Down
Loading

0 comments on commit 9b8cd3d

Please sign in to comment.