diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index de52093a9cb76..d4277769bf19d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -540,7 +540,7 @@ public Collection createComponents(Client client, ClusterService cluster } else { mlController = new DummyController(); autodetectProcessFactory = (job, autodetectParams, executorService, onProcessCrash) -> - new BlackHoleAutodetectProcess(job.getId()); + new BlackHoleAutodetectProcess(job.getId(), onProcessCrash); // factor of 1.0 makes renormalization a no-op normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0); analyticsProcessFactory = (jobId, analyticsProcessConfig, state, executorService, onProcessCrash) -> null; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index 2d8c62223f24d..ed3c69f7a4d5d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.persistent.PersistentTasksClusterService; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.tasks.Task; @@ -85,6 +86,7 @@ private static void addDatafeedTaskIdAccordingToState(String datafeedId, List startedDatafeedIds, List stoppingDatafeedIds) { switch (datafeedState) { + case STARTING: case STARTED: startedDatafeedIds.add(datafeedId); break; @@ -94,6 +96,7 @@ private static void addDatafeedTaskIdAccordingToState(String datafeedId, stoppingDatafeedIds.add(datafeedId); break; default: + assert false : "Unexpected datafeed state " + datafeedState; break; } } @@ -126,9 +129,9 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi request.setResolvedStartedDatafeedIds(startedDatafeeds.toArray(new String[startedDatafeeds.size()])); if (request.isForce()) { - forceStopDatafeed(request, listener, tasks, startedDatafeeds); + forceStopDatafeed(request, listener, tasks, startedDatafeeds, stoppingDatafeeds); } else { - normalStopDatafeed(task, request, listener, tasks, startedDatafeeds, stoppingDatafeeds); + normalStopDatafeed(task, request, listener, tasks, nodes, startedDatafeeds, stoppingDatafeeds); } }, listener::onFailure @@ -137,9 +140,9 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi } private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, ActionListener listener, - PersistentTasksCustomMetaData tasks, + PersistentTasksCustomMetaData tasks, DiscoveryNodes nodes, List startedDatafeeds, List stoppingDatafeeds) { - Set executorNodes = new HashSet<>(); + final Set executorNodes = new HashSet<>(); for (String datafeedId : startedDatafeeds) { PersistentTasksCustomMetaData.PersistentTask datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks); if (datafeedTask == null) { @@ -147,10 +150,10 @@ private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, A String msg = "Requested datafeed [" + datafeedId + "] be stopped, but datafeed's task could not be found."; assert datafeedTask != null : msg; logger.error(msg); - } else if (datafeedTask.isAssigned()) { + } else if (PersistentTasksClusterService.needsReassignment(datafeedTask.getAssignment(), nodes) == false) { executorNodes.add(datafeedTask.getExecutorNode()); } else { - // This is the easy case - the datafeed is not currently assigned to a node, + // This is the easy case - the datafeed is not currently assigned to a valid node, // so can be gracefully stopped simply by removing its persistent task. (Usually // a graceful stop cannot be achieved by simply removing the persistent task, but // if the datafeed has no running code then graceful/forceful are the same.) @@ -171,24 +174,37 @@ private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, A ActionListener finalListener = ActionListener.wrap( r -> waitForDatafeedStopped(allDataFeedsToWaitFor, request, r, listener), - listener::onFailure); + e -> { + if (ExceptionsHelper.unwrapCause(e) instanceof FailedNodeException) { + // A node has dropped out of the cluster since we started executing the requests. + // Since stopping an already stopped datafeed is not an error we can try again. + // The datafeeds that were running on the node that dropped out of the cluster + // will just have their persistent tasks cancelled. Datafeeds that were stopped + // by the previous attempt will be noops in the subsequent attempt. + doExecute(task, request, listener); + } else { + listener.onFailure(e); + } + }); super.doExecute(task, request, finalListener); } private void forceStopDatafeed(final StopDatafeedAction.Request request, final ActionListener listener, - PersistentTasksCustomMetaData tasks, final List startedDatafeeds) { + PersistentTasksCustomMetaData tasks, final List startedDatafeeds, + List stoppingDatafeeds) { + final List allDatafeeds = Stream.concat(startedDatafeeds.stream(), stoppingDatafeeds.stream()).collect(Collectors.toList()); final AtomicInteger counter = new AtomicInteger(); - final AtomicArray failures = new AtomicArray<>(startedDatafeeds.size()); + final AtomicArray failures = new AtomicArray<>(allDatafeeds.size()); - for (String datafeedId : startedDatafeeds) { + for (String datafeedId : allDatafeeds) { PersistentTasksCustomMetaData.PersistentTask datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks); if (datafeedTask != null) { persistentTasksService.sendRemoveRequest(datafeedTask.getId(), new ActionListener>() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { - if (counter.incrementAndGet() == startedDatafeeds.size()) { + if (counter.incrementAndGet() == allDatafeeds.size()) { sendResponseOrFailure(request.getDatafeedId(), listener, failures); } } @@ -196,23 +212,26 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask persisten @Override public void onFailure(Exception e) { final int slot = counter.incrementAndGet(); - if ((ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException && - Strings.isAllOrWildcard(new String[]{request.getDatafeedId()})) == false) { + // We validated that the datafeed names supplied in the request existed when we started processing the action. + // If the related tasks don't exist at this point then they must have been stopped by a simultaneous stop request. + // This is not an error. + if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) { failures.set(slot - 1, e); } - if (slot == startedDatafeeds.size()) { + if (slot == allDatafeeds.size()) { sendResponseOrFailure(request.getDatafeedId(), listener, failures); } } }); } else { - // This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method + // This should not happen, because startedDatafeeds and stoppingDatafeeds + // were derived from the same tasks that were passed to this method String msg = "Requested datafeed [" + datafeedId + "] be force-stopped, but datafeed's task could not be found."; assert datafeedTask != null : msg; logger.error(msg); final int slot = counter.incrementAndGet(); failures.set(slot - 1, new RuntimeException(msg)); - if (slot == startedDatafeeds.size()) { + if (slot == allDatafeeds.size()) { sendResponseOrFailure(request.getDatafeedId(), listener, failures); } } @@ -313,7 +332,7 @@ protected StopDatafeedAction.Response newResponse(StopDatafeedAction.Request req .convertToElastic(failedNodeExceptions.get(0)); } else { // This can happen we the actual task in the node no longer exists, - // which means the datafeed(s) have already been closed. + // which means the datafeed(s) have already been stopped. return new StopDatafeedAction.Response(true); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java index 9421637831ee0..ff717bb0bb2ca 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -21,12 +21,15 @@ import java.io.IOException; import java.time.ZonedDateTime; +import java.util.Arrays; import java.util.Date; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * A placeholder class simulating the actions of the native Autodetect process. @@ -37,16 +40,21 @@ */ public class BlackHoleAutodetectProcess implements AutodetectProcess { + public static final String MAGIC_FAILURE_VALUE = "253402300799"; + public static final String MAGIC_FAILURE_VALUE_AS_DATE = "9999-12-31 23:59:59"; + private static final String FLUSH_ID = "flush-1"; private final String jobId; private final ZonedDateTime startTime; private final BlockingQueue results = new LinkedBlockingDeque<>(); + private final Consumer onProcessCrash; private volatile boolean open = true; - public BlackHoleAutodetectProcess(String jobId) { + public BlackHoleAutodetectProcess(String jobId, Consumer onProcessCrash) { this.jobId = jobId; startTime = ZonedDateTime.now(); + this.onProcessCrash = Objects.requireNonNull(onProcessCrash); } @Override @@ -59,7 +67,13 @@ public boolean isReady() { } @Override - public void writeRecord(String[] record) throws IOException { + public void writeRecord(String[] record) { + if (Arrays.asList(record).contains(MAGIC_FAILURE_VALUE)) { + open = false; + onProcessCrash.accept("simulated failure"); + AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null); + results.add(result); + } } @Override diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index 3509d54999db9..4cb66b44b5fd5 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -5,11 +5,13 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -21,16 +23,20 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.persistent.PersistentTaskResponse; import org.elasticsearch.persistent.PersistentTasksClusterService; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction; import org.elasticsearch.xpack.core.action.util.QueryPage; +import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction.Response.DatafeedStats; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction.Response.JobStats; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.action.PostDataAction; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; @@ -41,6 +47,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.process.autodetect.BlackHoleAutodetectProcess; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import java.io.IOException; @@ -116,7 +123,6 @@ public void testFullClusterRestart() throws Exception { }); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/43670") public void testCloseUnassignedJobAndDatafeed() throws Exception { internalCluster().ensureAtMostNumDataNodes(0); logger.info("Starting dedicated master node..."); @@ -143,7 +149,7 @@ public void testCloseUnassignedJobAndDatafeed() throws Exception { String jobId = "test-lose-ml-node"; String datafeedId = jobId + "-datafeed"; - setupJobAndDatafeed(jobId, datafeedId); + setupJobAndDatafeed(jobId, datafeedId, TimeValue.timeValueHours(1)); waitForDatafeed(jobId, numDocs1); // stop the only ML node @@ -174,6 +180,158 @@ public void testCloseUnassignedJobAndDatafeed() throws Exception { assertTrue(closeJobResponse.isClosed()); } + public void testCloseUnassignedFailedJobAndStopUnassignedStoppingDatafeed() throws Exception { + internalCluster().ensureAtMostNumDataNodes(0); + logger.info("Starting master/data nodes..."); + for (int count = 0; count < 3; ++count) { + internalCluster().startNode(Settings.builder() + .put("node.master", true) + .put("node.data", true) + .put("node.ml", false) + .build()); + } + logger.info("Starting dedicated ml node..."); + internalCluster().startNode(Settings.builder() + .put("node.master", false) + .put("node.data", false) + .put("node.ml", true) + .build()); + ensureStableClusterOnAllNodes(4); + + // index some datafeed data + client().admin().indices().prepareCreate("data") + .addMapping("type", "time", "type=date") + .get(); + long numDocs1 = randomIntBetween(32, 2048); + long now = System.currentTimeMillis(); + long weekAgo = now - 604800000; + long twoWeeksAgo = weekAgo - 604800000; + indexDocs(logger, "data", numDocs1, twoWeeksAgo, weekAgo); + + String jobId = "test-stop-unassigned-datafeed-for-failed-job"; + String datafeedId = jobId + "-datafeed"; + setupJobAndDatafeed(jobId, datafeedId, TimeValue.timeValueHours(1)); + waitForDatafeed(jobId, numDocs1); + + // Job state should be opened here + GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(jobId); + GetJobsStatsAction.Response jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet(); + assertEquals(JobState.OPENED, jobStatsResponse.getResponse().results().get(0).getState()); + DiscoveryNode jobNode = jobStatsResponse.getResponse().results().get(0).getNode(); + + // Post the job a record that will result in the job receiving a timestamp in epoch + // seconds equal to the maximum integer - this makes the blackhole autodetect fail. + // It's better to do this than the approach of directly updating the job state using + // the approach used below for datafeeds, because when the job fails at the "process" + // level it sets off a more realistic chain reaction in the layers that wrap the "process" + // (remember it's not a real native process in these internal cluster tests). + PostDataAction.Request postDataRequest = new PostDataAction.Request(jobId); + postDataRequest.setContent( + new BytesArray("{ \"time\" : \"" + BlackHoleAutodetectProcess.MAGIC_FAILURE_VALUE_AS_DATE + "\" }"), XContentType.JSON); + PostDataAction.Response postDataResponse = client().execute(PostDataAction.INSTANCE, postDataRequest).actionGet(); + assertEquals(1L, postDataResponse.getDataCounts().getInputRecordCount()); + + // Confirm the job state is now failed + jobStatsRequest = new GetJobsStatsAction.Request(jobId); + jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet(); + assertEquals(JobState.FAILED, jobStatsResponse.getResponse().results().get(0).getState()); + + // It's impossible to reliably get the datafeed into a stopping state at the point when the ML node is removed from the cluster + // using externally accessible actions. The only way this situation could occur in reality is through extremely unfortunate + // timing. Therefore, to simulate this unfortunate timing we cheat and access internal classes to set the datafeed state to + // stopping. + PersistentTasksCustomMetaData tasks = clusterService().state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData.PersistentTask task = MlTasks.getDatafeedTask(datafeedId, tasks); + UpdatePersistentTaskStatusAction.Request updatePersistentTaskStatusRequest = + new UpdatePersistentTaskStatusAction.Request(task.getId(), task.getAllocationId(), DatafeedState.STOPPING); + PersistentTaskResponse updatePersistentTaskStatusResponse = + client().execute(UpdatePersistentTaskStatusAction.INSTANCE, updatePersistentTaskStatusRequest).actionGet(); + assertNotNull(updatePersistentTaskStatusResponse.getTask()); + + // Confirm the datafeed state is now stopping + GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId); + GetDatafeedsStatsAction.Response datafeedStatsResponse = + client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); + assertEquals(DatafeedState.STOPPING, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); + + // Stop the node running the failed job/stopping datafeed + ensureGreen(); // replicas must be assigned, otherwise we could lose a whole index + internalCluster().stopRandomNode(settings -> jobNode.getName().equals(settings.get("node.name"))); + ensureStableCluster(3); + + // We should be allowed to force stop the unassigned datafeed even though it is stopping and its job has failed + StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId); + stopDatafeedRequest.setForce(true); + StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet(); + assertTrue(stopDatafeedResponse.isStopped()); + + // Confirm the datafeed state is now stopped + datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId); + datafeedStatsResponse = client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); + assertEquals(DatafeedState.STOPPED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); + + // We should be allowed to force stop the unassigned failed job + CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId); + closeJobRequest.setForce(true); + CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet(); + assertTrue(closeJobResponse.isClosed()); + } + + public void testStopAndForceStopDatafeed() throws Exception { + internalCluster().ensureAtMostNumDataNodes(0); + logger.info("Starting dedicated master node..."); + internalCluster().startNode(Settings.builder() + .put("node.master", true) + .put("node.data", true) + .put("node.ml", false) + .build()); + logger.info("Starting ml and data node..."); + internalCluster().startNode(Settings.builder() + .put("node.master", false) + .build()); + ensureStableClusterOnAllNodes(2); + + // index some datafeed data + client().admin().indices().prepareCreate("data") + .addMapping("type", "time", "type=date") + .get(); + long numDocs1 = randomIntBetween(32, 2048); + long now = System.currentTimeMillis(); + long weekAgo = now - 604800000; + long twoWeeksAgo = weekAgo - 604800000; + indexDocs(logger, "data", numDocs1, twoWeeksAgo, weekAgo); + + String jobId = "test-stop-and-force-stop"; + String datafeedId = jobId + "-datafeed"; + setupJobAndDatafeed(jobId, datafeedId, TimeValue.timeValueHours(1)); + waitForDatafeed(jobId, numDocs1); + + GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId); + GetDatafeedsStatsAction.Response datafeedStatsResponse = + client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); + assertEquals(DatafeedState.STARTED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); + + // Stop the datafeed normally + StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId); + ActionFuture normalStopActionFuture + = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest); + + // Force stop the datafeed without waiting for the normal stop to return first + stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId); + stopDatafeedRequest.setForce(true); + StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet(); + assertTrue(stopDatafeedResponse.isStopped()); + + // Confirm that the normal stop also reports success - whichever way the datafeed + // ends up getting stopped it's not an error to stop a stopped datafeed + stopDatafeedResponse = normalStopActionFuture.actionGet(); + assertTrue(stopDatafeedResponse.isStopped()); + + CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId); + CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet(); + assertTrue(closeJobResponse.isClosed()); + } + public void testJobRelocationIsMemoryAware() throws Exception { internalCluster().ensureAtLeastNumDataNodes(1); @@ -244,12 +402,12 @@ private void setupJobWithoutDatafeed(String jobId, ByteSizeValue modelMemoryLimi }); } - private void setupJobAndDatafeed(String jobId, String datafeedId) throws Exception { + private void setupJobAndDatafeed(String jobId, String datafeedId, TimeValue datafeedFrequency) throws Exception { Job.Builder job = createScheduledJob(jobId); PutJobAction.Request putJobRequest = new PutJobAction.Request(job); client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet(); - DatafeedConfig config = createDatafeed(datafeedId, job.getId(), Collections.singletonList("data")); + DatafeedConfig config = createDatafeed(datafeedId, job.getId(), Collections.singletonList("data"), datafeedFrequency); PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config); client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet(); @@ -274,7 +432,7 @@ private void run(String jobId, CheckedRunnable disrupt) throws Except long twoWeeksAgo = weekAgo - 604800000; indexDocs(logger, "data", numDocs1, twoWeeksAgo, weekAgo); - setupJobAndDatafeed(jobId, "data_feed_id"); + setupJobAndDatafeed(jobId, "data_feed_id", TimeValue.timeValueSeconds(1)); waitForDatafeed(jobId, numDocs1); client().admin().indices().prepareSyncedFlush().get(); @@ -353,5 +511,4 @@ private void ensureStableClusterOnAllNodes(int nodeCount) { ensureStableCluster(nodeCount, nodeName); } } - } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java index 4b11fd813ce56..48439d6f68517 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java @@ -11,17 +11,45 @@ import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import java.util.Iterator; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; public class BlackHoleAutodetectProcessTests extends ESTestCase { public void testFlushJob_writesAck() throws Exception { - try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess("foo")) { + try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess("foo", failureReason -> {})) { String flushId = process.flushJob(FlushJobParams.builder().build()); Iterator iterator = process.readAutodetectResults(); - iterator.hasNext(); + assertTrue(iterator.hasNext()); AutodetectResult result = iterator.next(); FlushAcknowledgement ack = result.getFlushAcknowledgement(); assertEquals(flushId, ack.getId()); } } + + public void testSimulatedFailure() throws Exception { + AtomicReference failureReason = new AtomicReference<>(); + try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess("foo", failureReason::set)) { + Iterator iterator = process.readAutodetectResults(); + process.writeRecord(new String[] { BlackHoleAutodetectProcess.MAGIC_FAILURE_VALUE}); + assertFalse(process.isProcessAlive()); + assertTrue(iterator.hasNext()); + AutodetectResult result = iterator.next(); + assertThat(result.getModelSizeStats(), nullValue()); + assertThat(result.getBucket(), nullValue()); + assertThat(result.getFlushAcknowledgement(), nullValue()); + assertThat(result.getCategoryDefinition(), nullValue()); + assertThat(result.getModelSnapshot(), nullValue()); + assertThat(result.getQuantiles(), nullValue()); + assertThat(result.getInfluencers(), nullValue()); + assertThat(result.getModelPlot(), nullValue()); + assertThat(result.getRecords(), nullValue()); + assertThat(result.getForecast(), nullValue()); + assertThat(result.getForecastRequestStats(), nullValue()); + assertFalse(iterator.hasNext()); + } + assertThat(failureReason.get(), equalTo("simulated failure")); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index e934f7aa1b696..60b524ae0a025 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -178,13 +178,21 @@ public static Job.Builder createScheduledJob(String jobId) { } public static DatafeedConfig createDatafeed(String datafeedId, String jobId, List indices) { - return createDatafeedBuilder(datafeedId, jobId, indices).build(); + return createDatafeed(datafeedId, jobId, indices, TimeValue.timeValueSeconds(1)); + } + + public static DatafeedConfig createDatafeed(String datafeedId, String jobId, List indices, TimeValue frequency) { + return createDatafeedBuilder(datafeedId, jobId, indices, frequency).build(); } public static DatafeedConfig.Builder createDatafeedBuilder(String datafeedId, String jobId, List indices) { + return createDatafeedBuilder(datafeedId, jobId, indices, TimeValue.timeValueSeconds(1)); + } + + public static DatafeedConfig.Builder createDatafeedBuilder(String datafeedId, String jobId, List indices, TimeValue frequency) { DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedId, jobId); builder.setQueryDelay(TimeValue.timeValueSeconds(1)); - builder.setFrequency(TimeValue.timeValueSeconds(1)); + builder.setFrequency(frequency); builder.setIndices(indices); return builder; }