From 5114f0b1f2f29fc8cabc0e7f963ce407420f2e2a Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 15 Nov 2019 17:17:03 +0000 Subject: [PATCH] [ML] Fixes for stop datafeed edge cases The following edge cases were fixed: 1. A request to force-stop a stopping datafeed is no longer ignored. Force-stop is an important recovery mechanism if normal stop doesn't work for some reason, and needs to operate on a datafeed in any state other than stopped. 2. If the node that a datafeed is running on is removed from the cluster during a normal stop then the stop request is retried (and will likely succeed on this retry by simply cancelling the persistent task for the affected datafeed). 3. If there are multiple simultaneous force-stop requests for the same datafeed we no longer fail the one that is processed second. The previous behaviour was wrong as stopping a stopped datafeed is not an error, so stopping a datafeed twice simultaneously should not be either. Fixes #43670 Fixes #48931 --- .../xpack/ml/MachineLearning.java | 2 +- .../action/TransportStopDatafeedAction.java | 53 ++++-- .../BlackHoleAutodetectProcess.java | 18 +- .../integration/MlDistributedFailureIT.java | 169 +++++++++++++++++- .../BlackHoleAutodetectProcessTests.java | 32 +++- .../xpack/ml/support/BaseMlIntegTestCase.java | 12 +- 6 files changed, 256 insertions(+), 30 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index de52093a9cb7..d4277769bf19 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -540,7 +540,7 @@ public Collection createComponents(Client client, ClusterService cluster } else { mlController = new DummyController(); autodetectProcessFactory = (job, autodetectParams, executorService, onProcessCrash) -> - new BlackHoleAutodetectProcess(job.getId()); + new BlackHoleAutodetectProcess(job.getId(), onProcessCrash); // factor of 1.0 makes renormalization a no-op normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0); analyticsProcessFactory = (jobId, analyticsProcessConfig, state, executorService, onProcessCrash) -> null; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index 2d8c62223f24..ed3c69f7a4d5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.persistent.PersistentTasksClusterService; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.tasks.Task; @@ -85,6 +86,7 @@ private static void addDatafeedTaskIdAccordingToState(String datafeedId, List startedDatafeedIds, List stoppingDatafeedIds) { switch (datafeedState) { + case STARTING: case STARTED: startedDatafeedIds.add(datafeedId); break; @@ -94,6 +96,7 @@ private static void addDatafeedTaskIdAccordingToState(String datafeedId, stoppingDatafeedIds.add(datafeedId); break; default: + assert false : "Unexpected datafeed state " + datafeedState; break; } } @@ -126,9 +129,9 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi request.setResolvedStartedDatafeedIds(startedDatafeeds.toArray(new String[startedDatafeeds.size()])); if (request.isForce()) { - forceStopDatafeed(request, listener, tasks, startedDatafeeds); + forceStopDatafeed(request, listener, tasks, startedDatafeeds, stoppingDatafeeds); } else { - normalStopDatafeed(task, request, listener, tasks, startedDatafeeds, stoppingDatafeeds); + normalStopDatafeed(task, request, listener, tasks, nodes, startedDatafeeds, stoppingDatafeeds); } }, listener::onFailure @@ -137,9 +140,9 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi } private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, ActionListener listener, - PersistentTasksCustomMetaData tasks, + PersistentTasksCustomMetaData tasks, DiscoveryNodes nodes, List startedDatafeeds, List stoppingDatafeeds) { - Set executorNodes = new HashSet<>(); + final Set executorNodes = new HashSet<>(); for (String datafeedId : startedDatafeeds) { PersistentTasksCustomMetaData.PersistentTask datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks); if (datafeedTask == null) { @@ -147,10 +150,10 @@ private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, A String msg = "Requested datafeed [" + datafeedId + "] be stopped, but datafeed's task could not be found."; assert datafeedTask != null : msg; logger.error(msg); - } else if (datafeedTask.isAssigned()) { + } else if (PersistentTasksClusterService.needsReassignment(datafeedTask.getAssignment(), nodes) == false) { executorNodes.add(datafeedTask.getExecutorNode()); } else { - // This is the easy case - the datafeed is not currently assigned to a node, + // This is the easy case - the datafeed is not currently assigned to a valid node, // so can be gracefully stopped simply by removing its persistent task. (Usually // a graceful stop cannot be achieved by simply removing the persistent task, but // if the datafeed has no running code then graceful/forceful are the same.) @@ -171,24 +174,37 @@ private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, A ActionListener finalListener = ActionListener.wrap( r -> waitForDatafeedStopped(allDataFeedsToWaitFor, request, r, listener), - listener::onFailure); + e -> { + if (ExceptionsHelper.unwrapCause(e) instanceof FailedNodeException) { + // A node has dropped out of the cluster since we started executing the requests. + // Since stopping an already stopped datafeed is not an error we can try again. + // The datafeeds that were running on the node that dropped out of the cluster + // will just have their persistent tasks cancelled. Datafeeds that were stopped + // by the previous attempt will be noops in the subsequent attempt. + doExecute(task, request, listener); + } else { + listener.onFailure(e); + } + }); super.doExecute(task, request, finalListener); } private void forceStopDatafeed(final StopDatafeedAction.Request request, final ActionListener listener, - PersistentTasksCustomMetaData tasks, final List startedDatafeeds) { + PersistentTasksCustomMetaData tasks, final List startedDatafeeds, + List stoppingDatafeeds) { + final List allDatafeeds = Stream.concat(startedDatafeeds.stream(), stoppingDatafeeds.stream()).collect(Collectors.toList()); final AtomicInteger counter = new AtomicInteger(); - final AtomicArray failures = new AtomicArray<>(startedDatafeeds.size()); + final AtomicArray failures = new AtomicArray<>(allDatafeeds.size()); - for (String datafeedId : startedDatafeeds) { + for (String datafeedId : allDatafeeds) { PersistentTasksCustomMetaData.PersistentTask datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks); if (datafeedTask != null) { persistentTasksService.sendRemoveRequest(datafeedTask.getId(), new ActionListener>() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { - if (counter.incrementAndGet() == startedDatafeeds.size()) { + if (counter.incrementAndGet() == allDatafeeds.size()) { sendResponseOrFailure(request.getDatafeedId(), listener, failures); } } @@ -196,23 +212,26 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask persisten @Override public void onFailure(Exception e) { final int slot = counter.incrementAndGet(); - if ((ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException && - Strings.isAllOrWildcard(new String[]{request.getDatafeedId()})) == false) { + // We validated that the datafeed names supplied in the request existed when we started processing the action. + // If the related tasks don't exist at this point then they must have been stopped by a simultaneous stop request. + // This is not an error. + if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) { failures.set(slot - 1, e); } - if (slot == startedDatafeeds.size()) { + if (slot == allDatafeeds.size()) { sendResponseOrFailure(request.getDatafeedId(), listener, failures); } } }); } else { - // This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method + // This should not happen, because startedDatafeeds and stoppingDatafeeds + // were derived from the same tasks that were passed to this method String msg = "Requested datafeed [" + datafeedId + "] be force-stopped, but datafeed's task could not be found."; assert datafeedTask != null : msg; logger.error(msg); final int slot = counter.incrementAndGet(); failures.set(slot - 1, new RuntimeException(msg)); - if (slot == startedDatafeeds.size()) { + if (slot == allDatafeeds.size()) { sendResponseOrFailure(request.getDatafeedId(), listener, failures); } } @@ -313,7 +332,7 @@ protected StopDatafeedAction.Response newResponse(StopDatafeedAction.Request req .convertToElastic(failedNodeExceptions.get(0)); } else { // This can happen we the actual task in the node no longer exists, - // which means the datafeed(s) have already been closed. + // which means the datafeed(s) have already been stopped. return new StopDatafeedAction.Response(true); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java index 9421637831ee..ff717bb0bb2c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -21,12 +21,15 @@ import java.io.IOException; import java.time.ZonedDateTime; +import java.util.Arrays; import java.util.Date; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * A placeholder class simulating the actions of the native Autodetect process. @@ -37,16 +40,21 @@ */ public class BlackHoleAutodetectProcess implements AutodetectProcess { + public static final String MAGIC_FAILURE_VALUE = "253402300799"; + public static final String MAGIC_FAILURE_VALUE_AS_DATE = "9999-12-31 23:59:59"; + private static final String FLUSH_ID = "flush-1"; private final String jobId; private final ZonedDateTime startTime; private final BlockingQueue results = new LinkedBlockingDeque<>(); + private final Consumer onProcessCrash; private volatile boolean open = true; - public BlackHoleAutodetectProcess(String jobId) { + public BlackHoleAutodetectProcess(String jobId, Consumer onProcessCrash) { this.jobId = jobId; startTime = ZonedDateTime.now(); + this.onProcessCrash = Objects.requireNonNull(onProcessCrash); } @Override @@ -59,7 +67,13 @@ public boolean isReady() { } @Override - public void writeRecord(String[] record) throws IOException { + public void writeRecord(String[] record) { + if (Arrays.asList(record).contains(MAGIC_FAILURE_VALUE)) { + open = false; + onProcessCrash.accept("simulated failure"); + AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null); + results.add(result); + } } @Override diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index 3509d54999db..4cb66b44b5fd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -5,11 +5,13 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -21,16 +23,20 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.persistent.PersistentTaskResponse; import org.elasticsearch.persistent.PersistentTasksClusterService; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction; import org.elasticsearch.xpack.core.action.util.QueryPage; +import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction.Response.DatafeedStats; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction.Response.JobStats; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.action.PostDataAction; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; @@ -41,6 +47,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.process.autodetect.BlackHoleAutodetectProcess; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import java.io.IOException; @@ -116,7 +123,6 @@ public void testFullClusterRestart() throws Exception { }); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/43670") public void testCloseUnassignedJobAndDatafeed() throws Exception { internalCluster().ensureAtMostNumDataNodes(0); logger.info("Starting dedicated master node..."); @@ -143,7 +149,7 @@ public void testCloseUnassignedJobAndDatafeed() throws Exception { String jobId = "test-lose-ml-node"; String datafeedId = jobId + "-datafeed"; - setupJobAndDatafeed(jobId, datafeedId); + setupJobAndDatafeed(jobId, datafeedId, TimeValue.timeValueHours(1)); waitForDatafeed(jobId, numDocs1); // stop the only ML node @@ -174,6 +180,158 @@ public void testCloseUnassignedJobAndDatafeed() throws Exception { assertTrue(closeJobResponse.isClosed()); } + public void testCloseUnassignedFailedJobAndStopUnassignedStoppingDatafeed() throws Exception { + internalCluster().ensureAtMostNumDataNodes(0); + logger.info("Starting master/data nodes..."); + for (int count = 0; count < 3; ++count) { + internalCluster().startNode(Settings.builder() + .put("node.master", true) + .put("node.data", true) + .put("node.ml", false) + .build()); + } + logger.info("Starting dedicated ml node..."); + internalCluster().startNode(Settings.builder() + .put("node.master", false) + .put("node.data", false) + .put("node.ml", true) + .build()); + ensureStableClusterOnAllNodes(4); + + // index some datafeed data + client().admin().indices().prepareCreate("data") + .addMapping("type", "time", "type=date") + .get(); + long numDocs1 = randomIntBetween(32, 2048); + long now = System.currentTimeMillis(); + long weekAgo = now - 604800000; + long twoWeeksAgo = weekAgo - 604800000; + indexDocs(logger, "data", numDocs1, twoWeeksAgo, weekAgo); + + String jobId = "test-stop-unassigned-datafeed-for-failed-job"; + String datafeedId = jobId + "-datafeed"; + setupJobAndDatafeed(jobId, datafeedId, TimeValue.timeValueHours(1)); + waitForDatafeed(jobId, numDocs1); + + // Job state should be opened here + GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(jobId); + GetJobsStatsAction.Response jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet(); + assertEquals(JobState.OPENED, jobStatsResponse.getResponse().results().get(0).getState()); + DiscoveryNode jobNode = jobStatsResponse.getResponse().results().get(0).getNode(); + + // Post the job a record that will result in the job receiving a timestamp in epoch + // seconds equal to the maximum integer - this makes the blackhole autodetect fail. + // It's better to do this than the approach of directly updating the job state using + // the approach used below for datafeeds, because when the job fails at the "process" + // level it sets off a more realistic chain reaction in the layers that wrap the "process" + // (remember it's not a real native process in these internal cluster tests). + PostDataAction.Request postDataRequest = new PostDataAction.Request(jobId); + postDataRequest.setContent( + new BytesArray("{ \"time\" : \"" + BlackHoleAutodetectProcess.MAGIC_FAILURE_VALUE_AS_DATE + "\" }"), XContentType.JSON); + PostDataAction.Response postDataResponse = client().execute(PostDataAction.INSTANCE, postDataRequest).actionGet(); + assertEquals(1L, postDataResponse.getDataCounts().getInputRecordCount()); + + // Confirm the job state is now failed + jobStatsRequest = new GetJobsStatsAction.Request(jobId); + jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet(); + assertEquals(JobState.FAILED, jobStatsResponse.getResponse().results().get(0).getState()); + + // It's impossible to reliably get the datafeed into a stopping state at the point when the ML node is removed from the cluster + // using externally accessible actions. The only way this situation could occur in reality is through extremely unfortunate + // timing. Therefore, to simulate this unfortunate timing we cheat and access internal classes to set the datafeed state to + // stopping. + PersistentTasksCustomMetaData tasks = clusterService().state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData.PersistentTask task = MlTasks.getDatafeedTask(datafeedId, tasks); + UpdatePersistentTaskStatusAction.Request updatePersistentTaskStatusRequest = + new UpdatePersistentTaskStatusAction.Request(task.getId(), task.getAllocationId(), DatafeedState.STOPPING); + PersistentTaskResponse updatePersistentTaskStatusResponse = + client().execute(UpdatePersistentTaskStatusAction.INSTANCE, updatePersistentTaskStatusRequest).actionGet(); + assertNotNull(updatePersistentTaskStatusResponse.getTask()); + + // Confirm the datafeed state is now stopping + GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId); + GetDatafeedsStatsAction.Response datafeedStatsResponse = + client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); + assertEquals(DatafeedState.STOPPING, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); + + // Stop the node running the failed job/stopping datafeed + ensureGreen(); // replicas must be assigned, otherwise we could lose a whole index + internalCluster().stopRandomNode(settings -> jobNode.getName().equals(settings.get("node.name"))); + ensureStableCluster(3); + + // We should be allowed to force stop the unassigned datafeed even though it is stopping and its job has failed + StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId); + stopDatafeedRequest.setForce(true); + StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet(); + assertTrue(stopDatafeedResponse.isStopped()); + + // Confirm the datafeed state is now stopped + datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId); + datafeedStatsResponse = client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); + assertEquals(DatafeedState.STOPPED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); + + // We should be allowed to force stop the unassigned failed job + CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId); + closeJobRequest.setForce(true); + CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet(); + assertTrue(closeJobResponse.isClosed()); + } + + public void testStopAndForceStopDatafeed() throws Exception { + internalCluster().ensureAtMostNumDataNodes(0); + logger.info("Starting dedicated master node..."); + internalCluster().startNode(Settings.builder() + .put("node.master", true) + .put("node.data", true) + .put("node.ml", false) + .build()); + logger.info("Starting ml and data node..."); + internalCluster().startNode(Settings.builder() + .put("node.master", false) + .build()); + ensureStableClusterOnAllNodes(2); + + // index some datafeed data + client().admin().indices().prepareCreate("data") + .addMapping("type", "time", "type=date") + .get(); + long numDocs1 = randomIntBetween(32, 2048); + long now = System.currentTimeMillis(); + long weekAgo = now - 604800000; + long twoWeeksAgo = weekAgo - 604800000; + indexDocs(logger, "data", numDocs1, twoWeeksAgo, weekAgo); + + String jobId = "test-stop-and-force-stop"; + String datafeedId = jobId + "-datafeed"; + setupJobAndDatafeed(jobId, datafeedId, TimeValue.timeValueHours(1)); + waitForDatafeed(jobId, numDocs1); + + GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId); + GetDatafeedsStatsAction.Response datafeedStatsResponse = + client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); + assertEquals(DatafeedState.STARTED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); + + // Stop the datafeed normally + StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId); + ActionFuture normalStopActionFuture + = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest); + + // Force stop the datafeed without waiting for the normal stop to return first + stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId); + stopDatafeedRequest.setForce(true); + StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet(); + assertTrue(stopDatafeedResponse.isStopped()); + + // Confirm that the normal stop also reports success - whichever way the datafeed + // ends up getting stopped it's not an error to stop a stopped datafeed + stopDatafeedResponse = normalStopActionFuture.actionGet(); + assertTrue(stopDatafeedResponse.isStopped()); + + CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId); + CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet(); + assertTrue(closeJobResponse.isClosed()); + } + public void testJobRelocationIsMemoryAware() throws Exception { internalCluster().ensureAtLeastNumDataNodes(1); @@ -244,12 +402,12 @@ private void setupJobWithoutDatafeed(String jobId, ByteSizeValue modelMemoryLimi }); } - private void setupJobAndDatafeed(String jobId, String datafeedId) throws Exception { + private void setupJobAndDatafeed(String jobId, String datafeedId, TimeValue datafeedFrequency) throws Exception { Job.Builder job = createScheduledJob(jobId); PutJobAction.Request putJobRequest = new PutJobAction.Request(job); client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet(); - DatafeedConfig config = createDatafeed(datafeedId, job.getId(), Collections.singletonList("data")); + DatafeedConfig config = createDatafeed(datafeedId, job.getId(), Collections.singletonList("data"), datafeedFrequency); PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config); client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet(); @@ -274,7 +432,7 @@ private void run(String jobId, CheckedRunnable disrupt) throws Except long twoWeeksAgo = weekAgo - 604800000; indexDocs(logger, "data", numDocs1, twoWeeksAgo, weekAgo); - setupJobAndDatafeed(jobId, "data_feed_id"); + setupJobAndDatafeed(jobId, "data_feed_id", TimeValue.timeValueSeconds(1)); waitForDatafeed(jobId, numDocs1); client().admin().indices().prepareSyncedFlush().get(); @@ -353,5 +511,4 @@ private void ensureStableClusterOnAllNodes(int nodeCount) { ensureStableCluster(nodeCount, nodeName); } } - } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java index 4b11fd813ce5..48439d6f6851 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java @@ -11,17 +11,45 @@ import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import java.util.Iterator; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; public class BlackHoleAutodetectProcessTests extends ESTestCase { public void testFlushJob_writesAck() throws Exception { - try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess("foo")) { + try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess("foo", failureReason -> {})) { String flushId = process.flushJob(FlushJobParams.builder().build()); Iterator iterator = process.readAutodetectResults(); - iterator.hasNext(); + assertTrue(iterator.hasNext()); AutodetectResult result = iterator.next(); FlushAcknowledgement ack = result.getFlushAcknowledgement(); assertEquals(flushId, ack.getId()); } } + + public void testSimulatedFailure() throws Exception { + AtomicReference failureReason = new AtomicReference<>(); + try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess("foo", failureReason::set)) { + Iterator iterator = process.readAutodetectResults(); + process.writeRecord(new String[] { BlackHoleAutodetectProcess.MAGIC_FAILURE_VALUE}); + assertFalse(process.isProcessAlive()); + assertTrue(iterator.hasNext()); + AutodetectResult result = iterator.next(); + assertThat(result.getModelSizeStats(), nullValue()); + assertThat(result.getBucket(), nullValue()); + assertThat(result.getFlushAcknowledgement(), nullValue()); + assertThat(result.getCategoryDefinition(), nullValue()); + assertThat(result.getModelSnapshot(), nullValue()); + assertThat(result.getQuantiles(), nullValue()); + assertThat(result.getInfluencers(), nullValue()); + assertThat(result.getModelPlot(), nullValue()); + assertThat(result.getRecords(), nullValue()); + assertThat(result.getForecast(), nullValue()); + assertThat(result.getForecastRequestStats(), nullValue()); + assertFalse(iterator.hasNext()); + } + assertThat(failureReason.get(), equalTo("simulated failure")); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index e934f7aa1b69..60b524ae0a02 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -178,13 +178,21 @@ public static Job.Builder createScheduledJob(String jobId) { } public static DatafeedConfig createDatafeed(String datafeedId, String jobId, List indices) { - return createDatafeedBuilder(datafeedId, jobId, indices).build(); + return createDatafeed(datafeedId, jobId, indices, TimeValue.timeValueSeconds(1)); + } + + public static DatafeedConfig createDatafeed(String datafeedId, String jobId, List indices, TimeValue frequency) { + return createDatafeedBuilder(datafeedId, jobId, indices, frequency).build(); } public static DatafeedConfig.Builder createDatafeedBuilder(String datafeedId, String jobId, List indices) { + return createDatafeedBuilder(datafeedId, jobId, indices, TimeValue.timeValueSeconds(1)); + } + + public static DatafeedConfig.Builder createDatafeedBuilder(String datafeedId, String jobId, List indices, TimeValue frequency) { DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedId, jobId); builder.setQueryDelay(TimeValue.timeValueSeconds(1)); - builder.setFrequency(TimeValue.timeValueSeconds(1)); + builder.setFrequency(frequency); builder.setIndices(indices); return builder; }