Skip to content

Commit

Permalink
Use safeAwait() in more places (elastic#100292)
Browse files Browse the repository at this point in the history
Various tests that use `CyclicBarrier` for synchronization pre-date the
introduction of `safeAwait()` and do all the necessary
exception-handling themselves instead. This commit update these usages
to use `safeAwait()`.
  • Loading branch information
DaveCTurner authored Oct 4, 2023
1 parent 265c167 commit 061c3e7
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,10 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -626,18 +624,7 @@ public void testNotifyIfCancelled() throws Exception {

TaskCancelHelper.cancel(task, "simulated");

final Runnable await = new Runnable() {
final CyclicBarrier barrier = new CyclicBarrier(2);

@Override
public void run() {
try {
barrier.await(5, TimeUnit.SECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
throw new AssertionError("unexpected", e);
}
}
};
final CyclicBarrier barrier = new CyclicBarrier(2);

final Thread concurrentNotify = new Thread(() -> task.notifyIfCancelled(new ActionListener<Void>() {
@Override
Expand All @@ -647,18 +634,18 @@ public void onResponse(Void unused) {

@Override
public void onFailure(Exception e) {
await.run();
safeAwait(barrier);
// main thread calls notifyIfCancelled again between these two blocks
await.run();
safeAwait(barrier);
}
}), "concurrent notify");
concurrentNotify.start();

await.run();
safeAwait(barrier);
task.notifyIfCancelled(future);
assertTrue(future.isDone());
assertThat(expectThrows(TaskCancelledException.class, future::actionGet).getMessage(), equalTo("task cancelled [simulated]"));
await.run();
safeAwait(barrier);
concurrentNotify.join();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void testListenersNotifiedOnCorrectThreads() throws InterruptedException
if (i < adderThreads) {
final String threadName = ADDER_THREAD_NAME_PREFIX + i;
threads[i] = new Thread(() -> {
awaitSafe(barrier);
safeAwait(barrier);

final AtomicBoolean isComplete = new AtomicBoolean();
if (completerThreads == 1 && postComplete.get()) {
Expand Down Expand Up @@ -143,7 +143,7 @@ public void onFailure(Exception e) {
} else {
final String threadName = COMPLETER_THREAD_NAME_PREFIX + i;
threads[i] = new Thread(() -> {
awaitSafe(barrier);
safeAwait(barrier);

preComplete.set(true);
future.onResponse(null);
Expand All @@ -155,21 +155,13 @@ public void onFailure(Exception e) {
threads[i].start();
}

awaitSafe(barrier);
safeAwait(barrier);
for (final Thread thread : threads) {
thread.join();
}

}

private static void awaitSafe(CyclicBarrier barrier) {
try {
barrier.await(10, TimeUnit.SECONDS);
} catch (Exception e) {
throw new AssertionError("unexpected", e);
}
}

public void testAddedListenersReleasedOnCompletion() {
final ListenableActionFuture<Void> future = new ListenableActionFuture<>();
final ReachabilityChecker reachabilityChecker = new ReachabilityChecker();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,11 @@
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -1680,33 +1678,21 @@ public void testStarvationLogging() throws Exception {
MockLogAppender mockAppender = new MockLogAppender();
try (MasterService masterService = createMasterService(true); var ignored = mockAppender.capturing(MasterService.class)) {
final AtomicBoolean keepRunning = new AtomicBoolean(true);

final Runnable await = new Runnable() {
private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

@Override
public void run() {
try {
cyclicBarrier.await(10, TimeUnit.SECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
throw new AssertionError("unexpected", e);
}
}
};
final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
final Runnable awaitNextTask = () -> {
await.run();
await.run();
safeAwait(cyclicBarrier);
safeAwait(cyclicBarrier);
};

final ClusterStateUpdateTask starvationCausingTask = new ClusterStateUpdateTask(Priority.HIGH) {
@Override
public ClusterState execute(ClusterState currentState) {
await.run();
safeAwait(cyclicBarrier);
relativeTimeInMillis += taskDurationMillis;
if (keepRunning.get()) {
masterService.submitUnbatchedStateUpdateTask("starvation-causing task", this);
}
await.run();
safeAwait(cyclicBarrier);
return currentState;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
Expand Down Expand Up @@ -375,19 +373,7 @@ protected boolean isFresh(String currentKey, String newKey) {
}

public void testForegroundRefreshCanBeCancelled() throws InterruptedException {

final Runnable awaitBarrier = new Runnable() {
final CyclicBarrier barrier = new CyclicBarrier(2);

@Override
public void run() {
try {
barrier.await(10, TimeUnit.SECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
throw new AssertionError("unexpected", e);
}
}
};
final CyclicBarrier barrier = new CyclicBarrier(2);

final CancellableSingleObjectCache<String, String, Integer> testCache = new CancellableSingleObjectCache<>(testThreadContext) {
@Override
Expand All @@ -398,8 +384,8 @@ protected void refresh(
ActionListener<Integer> listener
) {
ActionListener.completeWith(listener, () -> {
awaitBarrier.run(); // main-thread barrier 2; cancelled-thread barrier 1
awaitBarrier.run(); // main-thread barrier 3; cancelled-thread barrier 2
safeAwait(barrier); // main-thread barrier 2; cancelled-thread barrier 1
safeAwait(barrier); // main-thread barrier 3; cancelled-thread barrier 2
ensureNotCancelled.run();
if (s.equals("cancelled")) {
throw new AssertionError("should have been cancelled");
Expand All @@ -421,11 +407,11 @@ protected String getKey(String s) {
final AtomicBoolean isCancelled = new AtomicBoolean();
final Thread cancelledThread = new Thread(() -> {
testCache.get("cancelled", isCancelled::get, cancelledFuture);
awaitBarrier.run(); // cancelled-thread barrier 3
safeAwait(barrier); // cancelled-thread barrier 3
}, "cancelled-thread");

cancelledThread.start();
awaitBarrier.run(); // main-thread barrier 1
safeAwait(barrier); // main-thread barrier 1
isCancelled.set(true);
testCache.get("successful", () -> false, successfulFuture);
cancelledThread.join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -276,12 +273,7 @@ public void testLimitsNumberOfClosedClients() throws InterruptedException {

for (int i = 0; i < clientThreads.length; i++) {
clientThreads[i] = new Thread(() -> {
try {
startBarrier.await(10, TimeUnit.SECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
throw new AssertionError("unexpected", e);
}

safeAwait(startBarrier);
HttpChannel httpChannel = randomHttpChannel();
httpClientStatsTracker.addClientStats(httpChannel);
while (operationPermits.tryAcquire()) {
Expand All @@ -301,12 +293,7 @@ public void testLimitsNumberOfClosedClients() throws InterruptedException {

final AtomicBoolean keepGoing = new AtomicBoolean(true);
final Thread statsThread = new Thread(() -> {
try {
startBarrier.await(10, TimeUnit.SECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
throw new AssertionError("unexpected", e);
}

safeAwait(startBarrier);
while (keepGoing.get()) {
closeLock.writeLock().lock();
final List<HttpStats.ClientStats> clientStats = httpClientStatsTracker.getClientStats();
Expand Down Expand Up @@ -345,12 +332,7 @@ public void testClearsStatsIfDisabledConcurrently() throws InterruptedException
);
for (int i = 0; i < clientThreads.length; i++) {
clientThreads[i] = new Thread(() -> {
try {
startBarrier.await(10, TimeUnit.SECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
throw new AssertionError("unexpected", e);
}

safeAwait(startBarrier);
HttpChannel httpChannel = randomHttpChannel();
httpClientStatsTracker.addClientStats(httpChannel);
while (operationPermits.tryAcquire()) {
Expand All @@ -367,12 +349,7 @@ public void testClearsStatsIfDisabledConcurrently() throws InterruptedException
}, "client-thread-" + i);
clientThreads[i].start();
}

try {
startBarrier.await(10, TimeUnit.SECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
throw new AssertionError("unexpected", e);
}
safeAwait(startBarrier);
clusterSettings.applySettings(Settings.builder().put(SETTING_HTTP_CLIENT_STATS_ENABLED.getKey(), false).build());

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

Expand All @@ -37,11 +34,10 @@ private static class TestTask {
private final boolean concurrentRemove = randomBoolean();
private final long requestId = randomIntBetween(-1, 10);

TestTask(Task task, String item, CancellableTasksTracker<String> tracker, Runnable awaitStart) {
TestTask(Task task, String item, CancellableTasksTracker<String> tracker, CyclicBarrier startBarrier) {
if (concurrentRemove) {
concurrentRemoveThread = new Thread(() -> {
awaitStart.run();

safeAwait(startBarrier);
for (int i = 0; i < 10; i++) {
if (3 <= state.get()) {
final String removed = tracker.remove(task);
Expand All @@ -52,11 +48,11 @@ private static class TestTask {
}
});
} else {
concurrentRemoveThread = new Thread(awaitStart);
concurrentRemoveThread = new Thread(() -> safeAwait(startBarrier));
}

actionThread = new Thread(() -> {
awaitStart.run();
safeAwait(startBarrier);

state.incrementAndGet();
tracker.put(task, requestId, item);
Expand All @@ -75,7 +71,7 @@ private static class TestTask {
}, "action-thread-" + item);

watchThread = new Thread(() -> {
awaitStart.run();
safeAwait(startBarrier);

for (int i = 0; i < 10; i++) {
final int stateBefore = state.get();
Expand Down Expand Up @@ -148,19 +144,7 @@ public void testCancellableTasksTracker() throws InterruptedException {

final CancellableTasksTracker<String> tracker = new CancellableTasksTracker<>();
final TestTask[] tasks = new TestTask[between(1, 100)];

final Runnable awaitStart = new Runnable() {
private final CyclicBarrier startBarrier = new CyclicBarrier(tasks.length * 3);

@Override
public void run() {
try {
startBarrier.await(10, TimeUnit.SECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
throw new AssertionError("unexpected", e);
}
}
};
final CyclicBarrier startBarrier = new CyclicBarrier(tasks.length * 3);

for (int i = 0; i < tasks.length; i++) {
tasks[i] = new TestTask(
Expand All @@ -174,7 +158,7 @@ public void run() {
),
"item-" + i,
tracker,
awaitStart
startBarrier
);
}

Expand Down

0 comments on commit 061c3e7

Please sign in to comment.