From 2b24b9a4c1e85cc0b570d4958b8e76608d860e2c Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Thu, 24 Jan 2019 17:14:56 +0100 Subject: [PATCH 1/4] reproduced and fixed --- .../indexing/AsyncTwoPhaseIndexerTests.java | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index a3826bcf7cd7b..397cbfbbace14 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -35,11 +36,13 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { private class MockIndexer extends AsyncTwoPhaseIndexer { + private final CountDownLatch latch; // test the execution order private int step; - protected MockIndexer(Executor executor, AtomicReference initialState, Integer initialPosition) { + protected MockIndexer(Executor executor, AtomicReference initialState, Integer initialPosition, CountDownLatch latch) { super(executor, initialState, initialPosition, new MockJobStats()); + this.latch = latch; } @Override @@ -49,11 +52,20 @@ protected String getJobId() { @Override protected IterationResult doProcess(SearchResponse searchResponse) { + awaitForLatch(); assertThat(step, equalTo(3)); ++step; return new IterationResult(Collections.emptyList(), 3, true); } + private void awaitForLatch() { + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + @Override protected SearchRequest buildSearchRequest() { assertThat(step, equalTo(1)); @@ -196,12 +208,15 @@ public void testStateMachine() throws InterruptedException { final ExecutorService executor = Executors.newFixedThreadPool(1); isFinished.set(false); try { - - MockIndexer indexer = new MockIndexer(executor, state, 2); + CountDownLatch countDownLatch = new CountDownLatch(1); + MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + Thread.sleep(1000); assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + countDownLatch.countDown(); + assertThat(indexer.getPosition(), equalTo(2)); ESTestCase.awaitBusy(() -> isFinished.get()); assertThat(indexer.getStep(), equalTo(6)); From 905efa369bf03e8d756e326fdab9c03d06f3de50 Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Thu, 24 Jan 2019 18:08:25 +0100 Subject: [PATCH 2/4] checkstype --- .../xpack/core/indexing/AsyncTwoPhaseIndexerTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index 397cbfbbace14..dfc5d49c853e4 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -40,7 +40,8 @@ private class MockIndexer extends AsyncTwoPhaseIndexer { // test the execution order private int step; - protected MockIndexer(Executor executor, AtomicReference initialState, Integer initialPosition, CountDownLatch latch) { + protected MockIndexer(Executor executor, AtomicReference initialState, Integer initialPosition, + CountDownLatch latch) { super(executor, initialState, initialPosition, new MockJobStats()); this.latch = latch; } @@ -213,7 +214,6 @@ public void testStateMachine() throws InterruptedException { indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); - Thread.sleep(1000); assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); countDownLatch.countDown(); From 38545e4427734c5a36656972eae3604dfd5f0181 Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Fri, 25 Jan 2019 08:43:02 +0100 Subject: [PATCH 3/4] handle exception properly --- .../xpack/core/indexing/AsyncTwoPhaseIndexerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index dfc5d49c853e4..29a6e654a2089 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -63,7 +63,7 @@ private void awaitForLatch() { try { latch.await(); } catch (InterruptedException e) { - e.printStackTrace(); + throw new RuntimeException(e); } } From 00289e013f2b914a69e7c747365e657e3c797c58 Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Fri, 25 Jan 2019 13:01:17 +0100 Subject: [PATCH 4/4] add timeout to latch --- .../xpack/core/indexing/AsyncTwoPhaseIndexerTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index 29a6e654a2089..cfbac18dc9787 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -25,6 +25,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -61,7 +62,7 @@ protected IterationResult doProcess(SearchResponse searchResponse) { private void awaitForLatch() { try { - latch.await(); + latch.await(10, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); }