From 80653032cb945482ba5d1c9d4c6527ffaedaf449 Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Fri, 25 Jan 2019 16:26:16 +0100 Subject: [PATCH] AsyncTwoPhaseIndexerTests race condition fixed (#37830) The unlucky timing can cause this test to fail when the indexing is triggered from `maybeTriggerAsyncJob`. As this is asynchronous, in can finish quicker then the test stepping over to next assertion The introduced barrier solves the problem closes #37695 --- .../indexing/AsyncTwoPhaseIndexerTests.java | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index 46d09e30eae5a..0a4c4990880d8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -20,9 +20,11 @@ import java.io.IOException; import java.util.Collections; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -34,11 +36,14 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { private class MockIndexer extends AsyncTwoPhaseIndexer { + private final CountDownLatch latch; // test the execution order private int step; - protected MockIndexer(Executor executor, AtomicReference initialState, Integer initialPosition) { + protected MockIndexer(Executor executor, AtomicReference initialState, Integer initialPosition, + CountDownLatch latch) { super(executor, initialState, initialPosition, new MockJobStats()); + this.latch = latch; } @Override @@ -48,11 +53,20 @@ protected String getJobId() { @Override protected IterationResult doProcess(SearchResponse searchResponse) { + awaitForLatch(); assertThat(step, equalTo(3)); ++step; return new IterationResult(Collections.emptyList(), 3, true); } + private void awaitForLatch() { + try { + latch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + @Override protected SearchRequest buildSearchRequest() { assertThat(step, equalTo(1)); @@ -195,12 +209,14 @@ public void testStateMachine() throws InterruptedException { final ExecutorService executor = Executors.newFixedThreadPool(1); isFinished.set(false); try { - - MockIndexer indexer = new MockIndexer(executor, state, 2); + CountDownLatch countDownLatch = new CountDownLatch(1); + MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + countDownLatch.countDown(); + assertThat(indexer.getPosition(), equalTo(2)); ESTestCase.awaitBusy(() -> isFinished.get()); assertThat(indexer.getStep(), equalTo(6));