Skip to content

Commit

Permalink
AsyncTwoPhaseIndexerTests race condition fixed (#37830)
Browse files Browse the repository at this point in the history
The unlucky timing can cause this test to fail when the indexing is triggered from `maybeTriggerAsyncJob`. As this is asynchronous, in can finish quicker then the test stepping over to next assertion
The introduced barrier solves the problem
closes #37695
  • Loading branch information
pgomulka authored Jan 25, 2019
1 parent 27c3fb8 commit 85acc11
Showing 1 changed file with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -35,11 +37,14 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {

private class MockIndexer extends AsyncTwoPhaseIndexer<Integer, MockJobStats> {

private final CountDownLatch latch;
// test the execution order
private int step;

protected MockIndexer(Executor executor, AtomicReference<IndexerState> initialState, Integer initialPosition) {
protected MockIndexer(Executor executor, AtomicReference<IndexerState> initialState, Integer initialPosition,
CountDownLatch latch) {
super(executor, initialState, initialPosition, new MockJobStats());
this.latch = latch;
}

@Override
Expand All @@ -49,11 +54,20 @@ protected String getJobId() {

@Override
protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
awaitForLatch();
assertThat(step, equalTo(3));
++step;
return new IterationResult<Integer>(Collections.emptyList(), 3, true);
}

private void awaitForLatch() {
try {
latch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Override
protected SearchRequest buildSearchRequest() {
assertThat(step, equalTo(1));
Expand Down Expand Up @@ -196,12 +210,14 @@ public void testStateMachine() throws InterruptedException {
final ExecutorService executor = Executors.newFixedThreadPool(1);
isFinished.set(false);
try {

MockIndexer indexer = new MockIndexer(executor, state, 2);
CountDownLatch countDownLatch = new CountDownLatch(1);
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
countDownLatch.countDown();

assertThat(indexer.getPosition(), equalTo(2));
ESTestCase.awaitBusy(() -> isFinished.get());
assertThat(indexer.getStep(), equalTo(6));
Expand Down

0 comments on commit 85acc11

Please sign in to comment.