From 02ba992f4818bebd838e1c7678bd2e1cc090bfab Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 29 May 2019 12:09:22 +0100 Subject: [PATCH] Add stopped event --- .../core/indexing/AsyncTwoPhaseIndexer.java | 44 ++++++++++++------- .../indexing/AsyncTwoPhaseIndexerTests.java | 36 ++++++++++++++- .../transforms/DataFrameTransformTask.java | 10 ++++- 3 files changed, 70 insertions(+), 20 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 53ad88021b0da..3f7a56f52e4a9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -23,11 +23,11 @@ * An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)}, * it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position). * Only one background job can run simultaneously and {@link #onFinish} is called when the job - * finishes. {@link #onStop()} is called after the current search returns when the job is stopped early via a call - * to {@link #stop()}. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} + * finishes before state is persisted. The indexer can be stopped early by a call to {@link #stop()} which will + * trigger the {@link #onStopping()} and {@link #onStopped()} methods. + * {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} * is called if the indexer is aborted while a job is running. The indexer must be started ({@link #start()} * to allow a background job to run when {@link #maybeTriggerAsyncJob(long)} is called. - * {@link #stop()} can be used to stop the background job without aborting the indexer. * * In a nutshell this is a 2 cycle engine: 1st it sends a query, 2nd it indexes documents based on the response, sends the next query, * indexes, queries, indexes, ... until a condition lets the engine pause until the source provides new input. @@ -87,10 +87,10 @@ public synchronized IndexerState start() { /** * Sets the internal state to {@link IndexerState#STOPPING} if an async job is - * running in the background, {@link #onStop()} will be called when the background job + * running in the background, {@link #onStopped()} will be called when the background job * detects that the indexer is stopped. * If there is no job running when this function is called the returned - * state is {@link IndexerState#STOPPED} and {@link #onStop()} will not be called. + * state is {@link IndexerState#STOPPED} and {@link #onStopped()} will not be called. * * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted). */ @@ -249,20 +249,30 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { /** * Called when a background job finishes before the internal state changes from {@link IndexerState#INDEXING} back to - * {@link IndexerState#STARTED}. + * {@link IndexerState#STARTED} and before {@link #doSaveState(IndexerState, Object, Runnable)} is called * * @param listener listener to call after done */ protected abstract void onFinish(ActionListener listener); + /** - * Called when the indexer is stopped. This is only called when the indexer is stopped - * via {@link #stop()} as opposed to {@link #onFinish(ActionListener)} which is called - * when the indexer's work is done. + * Called when a background job stops after internal state has changed from {@link IndexerState#STOPPING} + * to {@link IndexerState#STOPPED} and before state is persisted via {@link #doSaveState(IndexerState, Object, Runnable)}. + * This is only called when the indexer is stopped due to a call to {@link #stop()} */ - protected void onStop() { + protected void onStopping() { } + /** + * Called when the indexer is stopped after {@link #onStopping()} and {@link #doSaveState(IndexerState, Object, Runnable)} + * have been called. + */ + protected void onStopped() { + } + + + /** * Called when a background job detects that the indexer is aborted causing the * async execution to stop. @@ -280,16 +290,18 @@ private void finishWithIndexingFailure(Exception exc) { } private IndexerState finishAndSetState() { - AtomicBoolean callOnStop = new AtomicBoolean(false); - AtomicBoolean callOnAbort = new AtomicBoolean(false); + AtomicBoolean callOnStopping = new AtomicBoolean(); + AtomicBoolean callOnAbort = new AtomicBoolean(); IndexerState updatedState = state.updateAndGet(prev -> { + callOnAbort.set(false); + callOnStopping.set(false); switch (prev) { case INDEXING: // ready for another job return IndexerState.STARTED; case STOPPING: - callOnStop.set(true); + callOnStopping.set(true); // must be started again return IndexerState.STOPPED; @@ -311,8 +323,8 @@ private IndexerState finishAndSetState() { } }); - if (callOnStop.get()) { - onStop(); + if (callOnStopping.get()) { + onStopping(); } else if (callOnAbort.get()) { onAbort(); } @@ -412,7 +424,7 @@ private boolean checkState(IndexerState currentState) { case STOPPING: logger.info("Indexer job encountered [" + IndexerState.STOPPING + "] state, halting indexer."); - doSaveState(finishAndSetState(), getPosition(), () -> {}); + doSaveState(finishAndSetState(), getPosition(), this::onStopped); return false; case STOPPED: diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index 053e41d9b2a63..f083863814a55 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -36,11 +36,13 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { AtomicBoolean isFinished = new AtomicBoolean(false); AtomicBoolean isStopped = new AtomicBoolean(false); + AtomicBoolean isStopping = new AtomicBoolean(false); @Before public void reset() { isFinished.set(false); isStopped.set(false); + isStopping.set(false); } private class MockIndexer extends AsyncTwoPhaseIndexer { @@ -112,7 +114,7 @@ protected void doNextBulk(BulkRequest request, ActionListener next @Override protected void doSaveState(IndexerState state, Integer position, Runnable next) { - int expectedStep = stoppedBeforeFinished ? 3 : 5; + int expectedStep = stoppedBeforeFinished ? 4 : 5; assertThat(step, equalTo(expectedStep)); ++step; next.run(); @@ -132,7 +134,16 @@ protected void onFinish(ActionListener listener) { } @Override - protected void onStop() { + protected void onStopping() { + assertThat(step, equalTo(3)); + ++step; + assertTrue(isStopping.compareAndSet(false, true)); + } + + @Override + protected void onStopped() { + assertThat(step, equalTo(5)); + ++step; assertTrue(isStopped.compareAndSet(false, true)); } @@ -268,6 +279,26 @@ public void testStateMachineBrokenSearch() throws InterruptedException { } } + public void testStop_AfterIndexerIsFinished() throws InterruptedException { + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + final ExecutorService executor = Executors.newFixedThreadPool(1); + try { + CountDownLatch countDownLatch = new CountDownLatch(1); + MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false); + indexer.start(); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + countDownLatch.countDown(); + assertTrue(awaitBusy(() -> isFinished.get())); + + indexer.stop(); + assertFalse(isStopping.get()); + assertFalse(isStopped.get()); + assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)); + } finally { + executor.shutdownNow(); + } + } + public void testStop_WhileIndexing() throws InterruptedException { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); @@ -282,6 +313,7 @@ public void testStop_WhileIndexing() throws InterruptedException { countDownLatch.countDown(); assertThat(indexer.getPosition(), equalTo(2)); + assertTrue(awaitBusy(() -> isStopping.get())); assertTrue(awaitBusy(() -> isStopped.get())); assertFalse(isFinished.get()); } finally { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index f3f67f4bfd2ab..b4f79d9302e30 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -238,7 +238,8 @@ public synchronized void stop() { IndexerState state = getIndexer().stop(); if (state == IndexerState.STOPPED) { - getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStop()); + getIndexer().onStopping(); + getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStopped()); } } @@ -592,7 +593,12 @@ protected void onFinish(ActionListener listener) { } @Override - protected void onStop() { + protected void onStopping() { + transformTask.setTaskStateStopped(); + } + + @Override + protected void onStopped() { auditor.info(transformConfig.getId(), "Indexer has stopped"); logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId()); transformTask.setTaskStateStopped();