diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index a3826bcf7cd7b..cfbac18dc9787 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -21,9 +21,11 @@ import java.io.IOException; import java.util.Collections; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -35,11 +37,14 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { private class MockIndexer extends AsyncTwoPhaseIndexer { + private final CountDownLatch latch; // test the execution order private int step; - protected MockIndexer(Executor executor, AtomicReference initialState, Integer initialPosition) { + protected MockIndexer(Executor executor, AtomicReference initialState, Integer initialPosition, + CountDownLatch latch) { super(executor, initialState, initialPosition, new MockJobStats()); + this.latch = latch; } @Override @@ -49,11 +54,20 @@ protected String getJobId() { @Override protected IterationResult doProcess(SearchResponse searchResponse) { + awaitForLatch(); assertThat(step, equalTo(3)); ++step; return new IterationResult(Collections.emptyList(), 3, true); } + private void awaitForLatch() { + try { + latch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + @Override protected SearchRequest buildSearchRequest() { assertThat(step, equalTo(1)); @@ -196,12 +210,14 @@ public void testStateMachine() throws InterruptedException { final ExecutorService executor = Executors.newFixedThreadPool(1); isFinished.set(false); try { - - MockIndexer indexer = new MockIndexer(executor, state, 2); + CountDownLatch countDownLatch = new CountDownLatch(1); + MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + countDownLatch.countDown(); + assertThat(indexer.getPosition(), equalTo(2)); ESTestCase.awaitBusy(() -> isFinished.get()); assertThat(indexer.getStep(), equalTo(6));