From 7697e344a7db613c911947671bc9c104e5cd6bf5 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 17 Oct 2018 13:49:59 +0100 Subject: [PATCH] [ML] Job in index: Datafeed node selector (#34218) --- .../core/ml/action/StartDatafeedAction.java | 46 ++++- .../core/ml/action/DatafeedParamsTests.java | 8 + .../xpack/ml/MachineLearning.java | 3 +- .../xpack/ml/MlAssignmentNotifier.java | 24 +-- .../action/TransportStartDatafeedAction.java | 166 +++++++++++------- .../xpack/ml/datafeed/DatafeedJob.java | 4 + .../xpack/ml/datafeed/DatafeedJobBuilder.java | 115 +++++++++--- .../xpack/ml/datafeed/DatafeedManager.java | 68 ++++--- .../ml/datafeed/DatafeedNodeSelector.java | 38 ++-- .../xpack/ml/MlAssignmentNotifierTests.java | 4 +- .../TransportStartDatafeedActionTests.java | 36 +--- .../ml/datafeed/DatafeedJobBuilderTests.java | 66 +++++-- .../ml/datafeed/DatafeedManagerTests.java | 43 ++--- .../datafeed/DatafeedNodeSelectorTests.java | 127 ++++++-------- 14 files changed, 437 insertions(+), 311 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java index 16e5810b9a465..f30706c5cbfc1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java @@ -28,10 +28,13 @@ import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.function.LongSupplier; @@ -147,6 +150,8 @@ public boolean equals(Object obj) { public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskParams { + public static final ParseField INDICES = new ParseField("indices"); + public static ObjectParser PARSER = new ObjectParser<>(MlTasks.DATAFEED_TASK_NAME, true, DatafeedParams::new); static { PARSER.declareString((params, datafeedId) -> params.datafeedId = datafeedId, DatafeedConfig.ID); @@ -155,6 +160,8 @@ public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskPar PARSER.declareString(DatafeedParams::setEndTime, END_TIME); PARSER.declareString((params, val) -> params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); + PARSER.declareString(DatafeedParams::setJobId, Job.ID); + PARSER.declareStringArray(DatafeedParams::setDatafeedIndices, INDICES); } static long parseDateOrThrow(String date, ParseField paramName, LongSupplier now) { @@ -194,6 +201,10 @@ public DatafeedParams(StreamInput in) throws IOException { startTime = in.readVLong(); endTime = in.readOptionalLong(); timeout = TimeValue.timeValueMillis(in.readVLong()); + if (in.getVersion().onOrAfter(Version.CURRENT)) { + jobId = in.readOptionalString(); + datafeedIndices = in.readList(StreamInput::readString); + } } DatafeedParams() { @@ -203,6 +214,9 @@ public DatafeedParams(StreamInput in) throws IOException { private long startTime; private Long endTime; private TimeValue timeout = TimeValue.timeValueSeconds(20); + private List datafeedIndices = Collections.emptyList(); + private String jobId; + public String getDatafeedId() { return datafeedId; @@ -232,6 +246,22 @@ public void setTimeout(TimeValue timeout) { this.timeout = timeout; } + public String getJobId() { + return jobId; + } + + public void setJobId(String jobId) { + this.jobId = jobId; + } + + public List getDatafeedIndices() { + return datafeedIndices; + } + + public void setDatafeedIndices(List datafeedIndices) { + this.datafeedIndices = datafeedIndices; + } + @Override public String getWriteableName() { return MlTasks.DATAFEED_TASK_NAME; @@ -248,6 +278,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(startTime); out.writeOptionalLong(endTime); out.writeVLong(timeout.millis()); + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalString(jobId); + out.writeStringList(datafeedIndices); + } } @Override @@ -259,13 +293,19 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par builder.field(END_TIME.getPreferredName(), String.valueOf(endTime)); } builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep()); + if (jobId != null) { + builder.field(Job.ID.getPreferredName(), jobId); + } + if (datafeedIndices.isEmpty() == false) { + builder.field(INDICES.getPreferredName(), datafeedIndices); + } builder.endObject(); return builder; } @Override public int hashCode() { - return Objects.hash(datafeedId, startTime, endTime, timeout); + return Objects.hash(datafeedId, startTime, endTime, timeout, jobId, datafeedIndices); } @Override @@ -280,7 +320,9 @@ public boolean equals(Object obj) { return Objects.equals(datafeedId, other.datafeedId) && Objects.equals(startTime, other.startTime) && Objects.equals(endTime, other.endTime) && - Objects.equals(timeout, other.timeout); + Objects.equals(timeout, other.timeout) && + Objects.equals(jobId, other.jobId) && + Objects.equals(datafeedIndices, other.datafeedIndices); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DatafeedParamsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DatafeedParamsTests.java index 24a6dbacfada5..79bfcde76e067 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DatafeedParamsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DatafeedParamsTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.test.AbstractSerializingTestCase; import java.io.IOException; +import java.util.Arrays; public class DatafeedParamsTests extends AbstractSerializingTestCase { @Override @@ -28,6 +29,13 @@ public static StartDatafeedAction.DatafeedParams createDatafeedParams() { if (randomBoolean()) { params.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); } + if (randomBoolean()) { + params.setJobId(randomAlphaOfLength(10)); + } + if (randomBoolean()) { + params.setDatafeedIndices(Arrays.asList(randomAlphaOfLength(10), randomAlphaOfLength(10))); + } + return params; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index f1a0745d58fed..b2b68dfeb9f75 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -411,7 +411,8 @@ public Collection createComponents(Client client, ClusterService cluster jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, xContentRegistry, auditor); this.autodetectProcessManager.set(autodetectProcessManager); - DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis); + DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry, + auditor, System::currentTimeMillis); DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, System::currentTimeMillis, auditor); this.datafeedManager.set(datafeedManager); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index 5df11f02a3610..c7e867bb7de70 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -12,15 +12,13 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.ml.notifications.Auditor; import java.util.Objects; @@ -89,16 +87,20 @@ public void clusterChanged(ClusterChangedEvent event) { auditor.info(jobId, "Opening job on node [" + node.toString() + "]"); } } else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) { - String datafeedId = ((StartDatafeedAction.DatafeedParams) currentTask.getParams()).getDatafeedId(); - DatafeedConfig datafeedConfig = MlMetadata.getMlMetadata(event.state()).getDatafeed(datafeedId); + StartDatafeedAction.DatafeedParams datafeedParams = (StartDatafeedAction.DatafeedParams) currentTask.getParams(); + String jobId = datafeedParams.getJobId(); if (currentAssignment.getExecutorNode() == null) { - String msg = "No node found to start datafeed [" + datafeedId +"]. Reasons [" + + String msg = "No node found to start datafeed [" + datafeedParams.getDatafeedId() +"]. Reasons [" + currentAssignment.getExplanation() + "]"; - logger.warn("[{}] {}", datafeedConfig.getJobId(), msg); - auditor.warning(datafeedConfig.getJobId(), msg); + logger.warn("[{}] {}", jobId, msg); + if (jobId != null) { + auditor.warning(jobId, msg); + } } else { DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode()); - auditor.info(datafeedConfig.getJobId(), "Starting datafeed [" + datafeedId + "] on node [" + node + "]"); + if (jobId != null) { + auditor.info(jobId, "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]"); + } } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 578f4ee5f983b..aa416b6fa1d54 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -35,7 +35,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackField; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -48,10 +47,14 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Predicate; /* This class extends from TransportMasterNodeAction for cluster state observing purposes. @@ -67,34 +70,30 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction listener) { StartDatafeedAction.DatafeedParams params = request.getParams(); - if (licenseState.isMachineLearningAllowed()) { - - ActionListener> waitForTaskListener = - new ActionListener>() { - @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask - persistentTask) { - waitForDatafeedStarted(persistentTask.getId(), params, listener); - } - - @Override - public void onFailure(Exception e) { - if (e instanceof ResourceAlreadyExistsException) { - logger.debug("datafeed already started", e); - e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() + - "] because it has already been started", RestStatus.CONFLICT); - } - listener.onFailure(e); - } - }; - - // Verify data extractor factory can be created, then start persistent task - MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); - PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - validate(params.getDatafeedId(), mlMetadata, tasks); - DatafeedConfig datafeed = mlMetadata.getDatafeed(params.getDatafeedId()); - Job job = mlMetadata.getJobs().get(datafeed.getJobId()); - - if (RemoteClusterLicenseChecker.containsRemoteIndex(datafeed.getIndices())) { - final RemoteClusterLicenseChecker remoteClusterLicenseChecker = - new RemoteClusterLicenseChecker(client, XPackLicenseState::isMachineLearningAllowedForOperationMode); - remoteClusterLicenseChecker.checkRemoteClusterLicenses( - RemoteClusterLicenseChecker.remoteClusterAliases(datafeed.getIndices()), - ActionListener.wrap( - response -> { - if (response.isSuccess() == false) { - listener.onFailure(createUnlicensedError(datafeed.getId(), response)); - } else { - createDataExtractor(job, datafeed, params, waitForTaskListener); - } - }, - e -> listener.onFailure( - createUnknownLicenseError( - datafeed.getId(), RemoteClusterLicenseChecker.remoteIndices(datafeed.getIndices()), e)) - )); - } else { - createDataExtractor(job, datafeed, params, waitForTaskListener); - } - } else { + if (licenseState.isMachineLearningAllowed() == false) { listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING)); + return; } + + AtomicReference datafeedConfigHolder = new AtomicReference<>(); + PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + + ActionListener> waitForTaskListener = + new ActionListener>() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask + persistentTask) { + waitForDatafeedStarted(persistentTask.getId(), params, listener); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof ResourceAlreadyExistsException) { + logger.debug("datafeed already started", e); + e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() + + "] because it has already been started", RestStatus.CONFLICT); + } + listener.onFailure(e); + } + }; + + // Verify data extractor factory can be created, then start persistent task + Consumer createDataExtrator = job -> { + if (RemoteClusterLicenseChecker.containsRemoteIndex(params.getDatafeedIndices())) { + final RemoteClusterLicenseChecker remoteClusterLicenseChecker = + new RemoteClusterLicenseChecker(client, XPackLicenseState::isMachineLearningAllowedForOperationMode); + remoteClusterLicenseChecker.checkRemoteClusterLicenses( + RemoteClusterLicenseChecker.remoteClusterAliases(params.getDatafeedIndices()), + ActionListener.wrap( + response -> { + if (response.isSuccess() == false) { + listener.onFailure(createUnlicensedError(params.getDatafeedId(), response)); + } else { + createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener); + } + }, + e -> listener.onFailure( + createUnknownLicenseError( + params.getDatafeedId(), + RemoteClusterLicenseChecker.remoteIndices(params.getDatafeedIndices()), e)) + ) + ); + } else { + createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener); + } + }; + + ActionListener jobListener = ActionListener.wrap( + jobBuilder -> { + try { + Job job = jobBuilder.build(); + validate(job, datafeedConfigHolder.get(), tasks); + createDataExtrator.accept(job); + } catch (Exception e) { + listener.onFailure(e); + } + }, + listener::onFailure + ); + + ActionListener datafeedListener = ActionListener.wrap( + datafeedBuilder -> { + try { + DatafeedConfig datafeedConfig = datafeedBuilder.build(); + params.setDatafeedIndices(datafeedConfig.getIndices()); + params.setJobId(datafeedConfig.getJobId()); + datafeedConfigHolder.set(datafeedConfig); + jobConfigProvider.getJob(datafeedConfig.getJobId(), jobListener); + } catch (Exception e) { + listener.onFailure(e); + } + }, + listener::onFailure + ); + + datafeedConfigProvider.getDatafeedConfig(params.getDatafeedId(), datafeedListener); } private void createDataExtractor(Job job, DatafeedConfig datafeed, StartDatafeedAction.DatafeedParams params, @@ -280,14 +310,14 @@ public StartDatafeedPersistentTasksExecutor(Settings settings, DatafeedManager d @Override public PersistentTasksCustomMetaData.Assignment getAssignment(StartDatafeedAction.DatafeedParams params, ClusterState clusterState) { - return new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId()).selectNode(); + return new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId(), params.getJobId(), + params.getDatafeedIndices()).selectNode(); } @Override public void validate(StartDatafeedAction.DatafeedParams params, ClusterState clusterState) { - PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - TransportStartDatafeedAction.validate(params.getDatafeedId(), MlMetadata.getMlMetadata(clusterState), tasks); - new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId()).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId(), params.getJobId(), params.getDatafeedIndices()) + .checkDatafeedTaskCanBeCreated(); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 1fa402f4e2485..20900c3f0d8e7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -82,6 +82,10 @@ boolean isIsolated() { return isIsolated; } + public String getJobId() { + return jobId; + } + Long runLookBack(long startTime, Long endTime) throws Exception { lookbackStartTimeMs = skipToStartTime(startTime); Optional endMs = Optional.ofNullable(endTime); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java index efe332346efec..2f3c93be92b3f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java @@ -8,93 +8,154 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; import java.util.Collections; import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; public class DatafeedJobBuilder { private final Client client; - private final JobResultsProvider jobResultsProvider; + private final Settings settings; + private final NamedXContentRegistry xContentRegistry; private final Auditor auditor; private final Supplier currentTimeSupplier; - public DatafeedJobBuilder(Client client, JobResultsProvider jobResultsProvider, Auditor auditor, Supplier currentTimeSupplier) { + public DatafeedJobBuilder(Client client, Settings settings, NamedXContentRegistry xContentRegistry, + Auditor auditor, Supplier currentTimeSupplier) { this.client = client; - this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider); + this.settings = Objects.requireNonNull(settings); + this.xContentRegistry = Objects.requireNonNull(xContentRegistry); this.auditor = Objects.requireNonNull(auditor); this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); } - void build(Job job, DatafeedConfig datafeed, ActionListener listener) { + void build(String datafeedId, ActionListener listener) { + + JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings); + JobConfigProvider jobConfigProvider = new JobConfigProvider(client, settings); + DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client, settings, xContentRegistry); + + build(datafeedId, jobResultsProvider, jobConfigProvider, datafeedConfigProvider, listener); + } + + /** + * For testing only. + * Use {@link #build(String, ActionListener)} instead + */ + void build(String datafeedId, JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider, + DatafeedConfigProvider datafeedConfigProvider, ActionListener listener) { + + AtomicReference jobHolder = new AtomicReference<>(); + AtomicReference datafeedConfigHolder = new AtomicReference<>(); // Step 5. Build datafeed job object Consumer contextHanlder = context -> { - TimeValue frequency = getFrequencyOrDefault(datafeed, job); - TimeValue queryDelay = datafeed.getQueryDelay(); - DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.millis(), queryDelay.millis(), + TimeValue frequency = getFrequencyOrDefault(datafeedConfigHolder.get(), jobHolder.get()); + TimeValue queryDelay = datafeedConfigHolder.get().getQueryDelay(); + DatafeedJob datafeedJob = new DatafeedJob(jobHolder.get().getId(), buildDataDescription(jobHolder.get()), + frequency.millis(), queryDelay.millis(), context.dataExtractorFactory, client, auditor, currentTimeSupplier, context.latestFinalBucketEndMs, context.latestRecordTimeMs); + listener.onResponse(datafeedJob); }; final Context context = new Context(); - // Step 4. Context building complete - invoke final listener + // Context building complete - invoke final listener ActionListener dataExtractorFactoryHandler = ActionListener.wrap( dataExtractorFactory -> { context.dataExtractorFactory = dataExtractorFactory; contextHanlder.accept(context); }, e -> { - auditor.error(job.getId(), e.getMessage()); + auditor.error(jobHolder.get().getId(), e.getMessage()); listener.onFailure(e); } ); - // Step 3. Create data extractor factory + // Create data extractor factory Consumer dataCountsHandler = dataCounts -> { if (dataCounts.getLatestRecordTimeStamp() != null) { context.latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime(); } - DataExtractorFactory.create(client, datafeed, job, dataExtractorFactoryHandler); + DataExtractorFactory.create(client, datafeedConfigHolder.get(), jobHolder.get(), dataExtractorFactoryHandler); }; - // Step 2. Collect data counts + // Collect data counts Consumer> bucketsHandler = buckets -> { if (buckets.results().size() == 1) { - TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan(); + TimeValue bucketSpan = jobHolder.get().getAnalysisConfig().getBucketSpan(); context.latestFinalBucketEndMs = buckets.results().get(0).getTimestamp().getTime() + bucketSpan.millis() - 1; } - jobResultsProvider.dataCounts(job.getId(), dataCountsHandler, listener::onFailure); + jobResultsProvider.dataCounts(jobHolder.get().getId(), dataCountsHandler, listener::onFailure); }; - // Step 1. Collect latest bucket - BucketsQueryBuilder latestBucketQuery = new BucketsQueryBuilder() - .sortField(Result.TIMESTAMP.getPreferredName()) - .sortDescending(true).size(1) - .includeInterim(false); - jobResultsProvider.bucketsViaInternalClient(job.getId(), latestBucketQuery, bucketsHandler, e -> { - if (e instanceof ResourceNotFoundException) { - QueryPage empty = new QueryPage<>(Collections.emptyList(), 0, Bucket.RESULT_TYPE_FIELD); - bucketsHandler.accept(empty); - } else { - listener.onFailure(e); - } - }); + // Collect latest bucket + Consumer jobIdConsumer = jobId -> { + BucketsQueryBuilder latestBucketQuery = new BucketsQueryBuilder() + .sortField(Result.TIMESTAMP.getPreferredName()) + .sortDescending(true).size(1) + .includeInterim(false); + jobResultsProvider.bucketsViaInternalClient(jobId, latestBucketQuery, bucketsHandler, e -> { + if (e instanceof ResourceNotFoundException) { + QueryPage empty = new QueryPage<>(Collections.emptyList(), 0, Bucket.RESULT_TYPE_FIELD); + bucketsHandler.accept(empty); + } else { + listener.onFailure(e); + } + }); + }; + + // Get the job config and re-validate + // Re-validation is required as the config has been re-read since + // the previous validation + ActionListener jobConfigListener = ActionListener.wrap( + jobBuilder -> { + try { + jobHolder.set(jobBuilder.build()); + DatafeedJobValidator.validate(datafeedConfigHolder.get(), jobHolder.get()); + jobIdConsumer.accept(jobHolder.get().getId()); + } catch (Exception e) { + listener.onFailure(e); + } + }, + listener::onFailure + ); + + // Get the datafeed config + ActionListener datafeedConfigListener = ActionListener.wrap( + configBuilder -> { + try { + datafeedConfigHolder.set(configBuilder.build()); + jobConfigProvider.getJob(datafeedConfigHolder.get().getJobId(), jobConfigListener); + } catch (Exception e) { + listener.onFailure(e); + } + }, + listener::onFailure + ); + + datafeedConfigProvider.getDatafeedConfig(datafeedId, datafeedConfigListener); } private static TimeValue getFrequencyOrDefault(DatafeedConfig datafeed, Job job) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 3d4d66eba92a3..c801118fe0fa7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -18,19 +18,16 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; -import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction; import org.elasticsearch.xpack.ml.notifications.Auditor; @@ -48,9 +45,9 @@ import java.util.function.Consumer; import java.util.function.Supplier; +import static org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskListener; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; -import static org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskListener; public class DatafeedManager extends AbstractComponent { @@ -77,17 +74,14 @@ public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clus clusterService.addListener(taskRunner); } - public void run(TransportStartDatafeedAction.DatafeedTask task, Consumer taskHandler) { - String datafeedId = task.getDatafeedId(); - ClusterState state = clusterService.state(); - MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); - DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); - Job job = mlMetadata.getJobs().get(datafeed.getJobId()); + public void run(TransportStartDatafeedAction.DatafeedTask task, Consumer finishHandler) { + String datafeedId = task.getDatafeedId(); ActionListener datafeedJobHandler = ActionListener.wrap( datafeedJob -> { - Holder holder = new Holder(task, datafeed, datafeedJob, new ProblemTracker(auditor, job.getId()), taskHandler); + Holder holder = new Holder(task, datafeedId, datafeedJob, + new ProblemTracker(auditor, datafeedJob.getJobId()), finishHandler); runningDatafeedsOnThisNode.put(task.getAllocationId(), holder); task.updatePersistentTaskState(DatafeedState.STARTED, new ActionListener>() { @Override @@ -97,13 +91,13 @@ public void onResponse(PersistentTask persistentTask) { @Override public void onFailure(Exception e) { - taskHandler.accept(e); + finishHandler.accept(e); } }); - }, taskHandler::accept + }, finishHandler::accept ); - datafeedJobBuilder.build(job, datafeed, datafeedJobHandler); + datafeedJobBuilder.build(datafeedId, datafeedJobHandler); } public void stopDatafeed(TransportStartDatafeedAction.DatafeedTask task, String reason, TimeValue timeout) { @@ -158,7 +152,7 @@ private void innerRun(Holder holder, long startTime, Long endTime) { @Override public void onFailure(Exception e) { - logger.error("Failed lookback import for job [" + holder.datafeed.getJobId() + "]", e); + logger.error("Failed lookback import for job [" + holder.datafeedJob.getJobId() + "]", e); holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20), e); } @@ -188,17 +182,17 @@ protected void doRun() { } else { // Notify that a lookback-only run found no data String lookbackNoDataMsg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_LOOKBACK_NO_DATA); - logger.warn("[{}] {}", holder.datafeed.getJobId(), lookbackNoDataMsg); - auditor.warning(holder.datafeed.getJobId(), lookbackNoDataMsg); + logger.warn("[{}] {}", holder.datafeedJob.getJobId(), lookbackNoDataMsg); + auditor.warning(holder.datafeedJob.getJobId(), lookbackNoDataMsg); } } catch (Exception e) { - logger.error("Failed lookback import for job [" + holder.datafeed.getJobId() + "]", e); + logger.error("Failed lookback import for job [" + holder.datafeedJob.getJobId() + "]", e); holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20), e); return; } if (isolated == false) { if (next != null) { - doDatafeedRealtime(next, holder.datafeed.getJobId(), holder); + doDatafeedRealtime(next, holder.datafeedJob.getJobId(), holder); } else { holder.stop("no_realtime", TimeValue.timeValueSeconds(20), null); holder.problemTracker.finishReport(); @@ -276,29 +270,29 @@ public class Holder { private final TransportStartDatafeedAction.DatafeedTask task; private final long allocationId; - private final DatafeedConfig datafeed; + private final String datafeedId; // To ensure that we wait until loopback / realtime search has completed before we stop the datafeed private final ReentrantLock datafeedJobLock = new ReentrantLock(true); private final DatafeedJob datafeedJob; private final boolean autoCloseJob; private final ProblemTracker problemTracker; - private final Consumer handler; + private final Consumer finishHandler; volatile Future future; private volatile boolean isRelocating; - Holder(TransportStartDatafeedAction.DatafeedTask task, DatafeedConfig datafeed, DatafeedJob datafeedJob, - ProblemTracker problemTracker, Consumer handler) { + Holder(TransportStartDatafeedAction.DatafeedTask task, String datafeedId, DatafeedJob datafeedJob, + ProblemTracker problemTracker, Consumer finishHandler) { this.task = task; this.allocationId = task.getAllocationId(); - this.datafeed = datafeed; + this.datafeedId = datafeedId; this.datafeedJob = datafeedJob; this.autoCloseJob = task.isLookbackOnly(); this.problemTracker = problemTracker; - this.handler = handler; + this.finishHandler = finishHandler; } String getJobId() { - return datafeed.getJobId(); + return datafeedJob.getJobId(); } boolean isRunning() { @@ -314,23 +308,23 @@ public void stop(String source, TimeValue timeout, Exception e) { return; } - logger.info("[{}] attempt to stop datafeed [{}] for job [{}]", source, datafeed.getId(), datafeed.getJobId()); + logger.info("[{}] attempt to stop datafeed [{}] for job [{}]", source, datafeedId, datafeedJob.getJobId()); if (datafeedJob.stop()) { boolean acquired = false; try { - logger.info("[{}] try lock [{}] to stop datafeed [{}] for job [{}]...", source, timeout, datafeed.getId(), - datafeed.getJobId()); + logger.info("[{}] try lock [{}] to stop datafeed [{}] for job [{}]...", source, timeout, datafeedId, + datafeedJob.getJobId()); acquired = datafeedJobLock.tryLock(timeout.millis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e1) { Thread.currentThread().interrupt(); } finally { - logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", source, datafeed.getId(), - datafeed.getJobId(), acquired); + logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", source, datafeedId, + datafeedJob.getJobId(), acquired); runningDatafeedsOnThisNode.remove(allocationId); FutureUtils.cancel(future); - auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); - handler.accept(e); - logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeed.getId(), datafeed.getJobId(), + auditor.info(datafeedJob.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); + finishHandler.accept(e); + logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(), acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired"); if (autoCloseJob) { closeJob(); @@ -340,7 +334,7 @@ public void stop(String source, TimeValue timeout, Exception e) { } } } else { - logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeed.getId(), datafeed.getJobId()); + logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeedId, datafeedJob.getJobId()); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index ce3f611b2227a..a45bfc7822071 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -14,9 +14,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -28,16 +26,20 @@ public class DatafeedNodeSelector { private static final Logger LOGGER = Loggers.getLogger(DatafeedNodeSelector.class); - private final DatafeedConfig datafeed; + private final String datafeedId; + private final String jobId; + private final List datafeedIndices; private final PersistentTasksCustomMetaData.PersistentTask jobTask; private final ClusterState clusterState; private final IndexNameExpressionResolver resolver; - public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, String datafeedId) { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, String datafeedId, + String jobId, List datafeedIndices) { PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - this.datafeed = mlMetadata.getDatafeed(datafeedId); - this.jobTask = MlTasks.getJobTask(datafeed.getJobId(), tasks); + this.datafeedId = datafeedId; + this.jobId = jobId; + this.datafeedIndices = datafeedIndices; + this.jobTask = MlTasks.getJobTask(jobId, tasks); this.clusterState = Objects.requireNonNull(clusterState); this.resolver = Objects.requireNonNull(resolver); } @@ -45,8 +47,8 @@ public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolv public void checkDatafeedTaskCanBeCreated() { AssignmentFailure assignmentFailure = checkAssignment(); if (assignmentFailure != null && assignmentFailure.isCriticalForTaskCreation) { - String msg = "No node found to start datafeed [" + datafeed.getId() + "], allocation explanation [" + assignmentFailure.reason - + "]"; + String msg = "No node found to start datafeed [" + datafeedId + "], " + + "allocation explanation [" + assignmentFailure.reason + "]"; LOGGER.debug(msg); throw ExceptionsHelper.conflictStatusException(msg); } @@ -64,7 +66,7 @@ public PersistentTasksCustomMetaData.Assignment selectNode() { @Nullable private AssignmentFailure checkAssignment() { PriorityFailureCollector priorityFailureCollector = new PriorityFailureCollector(); - priorityFailureCollector.add(verifyIndicesActive(datafeed)); + priorityFailureCollector.add(verifyIndicesActive()); JobTaskState jobTaskState = null; JobState jobState = JobState.CLOSED; @@ -75,13 +77,14 @@ private AssignmentFailure checkAssignment() { if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) { // lets try again later when the job has been opened: - String reason = "cannot start datafeed [" + datafeed.getId() + "], because job's [" + datafeed.getJobId() + - "] state is [" + jobState + "] while state [" + JobState.OPENED + "] is required"; + String reason = "cannot start datafeed [" + datafeedId + "], because the job's [" + jobId + + "] state is [" + jobState + "] while state [" + JobState.OPENED + "] is required"; priorityFailureCollector.add(new AssignmentFailure(reason, true)); } if (jobTaskState != null && jobTaskState.isStatusStale(jobTask)) { - String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() + "] state is stale"; + String reason = "cannot start datafeed [" + datafeedId + "], because the job's [" + jobId + + "] state is stale"; priorityFailureCollector.add(new AssignmentFailure(reason, true)); } @@ -89,9 +92,8 @@ private AssignmentFailure checkAssignment() { } @Nullable - private AssignmentFailure verifyIndicesActive(DatafeedConfig datafeed) { - List indices = datafeed.getIndices(); - for (String index : indices) { + private AssignmentFailure verifyIndicesActive() { + for (String index : datafeedIndices) { if (RemoteClusterLicenseChecker.isRemoteIndex(index)) { // We cannot verify remote indices @@ -99,7 +101,7 @@ private AssignmentFailure verifyIndicesActive(DatafeedConfig datafeed) { } String[] concreteIndices; - String reason = "cannot start datafeed [" + datafeed.getId() + "] because index [" + String reason = "cannot start datafeed [" + datafeedId + "] because index [" + index + "] does not exist, is closed, or is still initializing."; try { @@ -115,7 +117,7 @@ private AssignmentFailure verifyIndicesActive(DatafeedConfig datafeed) { for (String concreteIndex : concreteIndices) { IndexRoutingTable routingTable = clusterState.getRoutingTable().index(concreteIndex); if (routingTable == null || !routingTable.allPrimaryShardsActive()) { - reason = "cannot start datafeed [" + datafeed.getId() + "] because index [" + reason = "cannot start datafeed [" + datafeedId + "] because index [" + concreteIndex + "] does not have all primary shards active yet."; return new AssignmentFailure(reason, false); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java index 3055dc2bb37f9..2defead100774 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java @@ -32,7 +32,7 @@ public class MlAssignmentNotifierTests extends ESTestCase { - public void testClusterChanged_info() throws Exception { + public void testClusterChanged_info() { Auditor auditor = mock(Auditor.class); ClusterService clusterService = mock(ClusterService.class); MlAssignmentNotifier notifier = new MlAssignmentNotifier(Settings.EMPTY, auditor, clusterService); @@ -60,7 +60,7 @@ public void testClusterChanged_info() throws Exception { verifyNoMoreInteractions(auditor); } - public void testClusterChanged_warning() throws Exception { + public void testClusterChanged_warning() { Auditor auditor = mock(Auditor.class); ClusterService clusterService = mock(ClusterService.class); MlAssignmentNotifier notifier = new MlAssignmentNotifier(Settings.EMPTY, auditor, clusterService); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java index 610a5c1b92fb6..7afb048f7aedb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java @@ -7,11 +7,9 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -28,61 +26,33 @@ public class TransportStartDatafeedActionTests extends ESTestCase { - public void testValidate_GivenDatafeedIsMissing() { - Job job = DatafeedManagerTests.createDatafeedJob().build(new Date()); - MlMetadata mlMetadata = new MlMetadata.Builder() - .putJob(job, false) - .build(); - Exception e = expectThrows(ResourceNotFoundException.class, - () -> TransportStartDatafeedAction.validate("some-datafeed", mlMetadata, null)); - assertThat(e.getMessage(), equalTo("No datafeed with id [some-datafeed] exists")); - } - public void testValidate_jobClosed() { Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date()); - MlMetadata mlMetadata1 = new MlMetadata.Builder() - .putJob(job1, false) - .build(); PersistentTasksCustomMetaData tasks = PersistentTasksCustomMetaData.builder().build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); - MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) - .putDatafeed(datafeedConfig1, Collections.emptyMap()) - .build(); Exception e = expectThrows(ElasticsearchStatusException.class, - () -> TransportStartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks)); + () -> TransportStartDatafeedAction.validate(job1, datafeedConfig1, tasks)); assertThat(e.getMessage(), equalTo("cannot start datafeed [foo-datafeed] because job [job_id] is closed")); } public void testValidate_jobOpening() { Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date()); - MlMetadata mlMetadata1 = new MlMetadata.Builder() - .putJob(job1, false) - .build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", INITIAL_ASSIGNMENT.getExecutorNode(), null, tasksBuilder); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); - MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) - .putDatafeed(datafeedConfig1, Collections.emptyMap()) - .build(); - TransportStartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks); + TransportStartDatafeedAction.validate(job1, datafeedConfig1, tasks); } public void testValidate_jobOpened() { Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date()); - MlMetadata mlMetadata1 = new MlMetadata.Builder() - .putJob(job1, false) - .build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", INITIAL_ASSIGNMENT.getExecutorNode(), JobState.OPENED, tasksBuilder); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); - MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) - .putDatafeed(datafeedConfig1, Collections.emptyMap()) - .build(); - TransportStartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks); + TransportStartDatafeedAction.validate(job1, datafeedConfig1, tasks); } public static TransportStartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java index 3d9ee17bac0c9..d13e311531da2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java @@ -19,6 +19,8 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; @@ -32,6 +34,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -41,8 +44,10 @@ public class DatafeedJobBuilderTests extends ESTestCase { private Client client; private Auditor auditor; - private JobResultsProvider jobResultsProvider; private Consumer taskHandler; + private JobResultsProvider jobResultsProvider; + private JobConfigProvider jobConfigProvider; + private DatafeedConfigProvider datafeedConfigProvider; private DatafeedJobBuilder datafeedJobBuilder; @@ -54,10 +59,10 @@ public void init() { when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); when(client.settings()).thenReturn(Settings.EMPTY); auditor = mock(Auditor.class); - jobResultsProvider = mock(JobResultsProvider.class); taskHandler = mock(Consumer.class); - datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis); + datafeedJobBuilder = new DatafeedJobBuilder(client, Settings.EMPTY, xContentRegistry(), auditor, System::currentTimeMillis); + jobResultsProvider = mock(JobResultsProvider.class); Mockito.doAnswer(invocationOnMock -> { String jobId = (String) invocationOnMock.getArguments()[0]; @SuppressWarnings("unchecked") @@ -72,6 +77,9 @@ public void init() { consumer.accept(new ResourceNotFoundException("dummy")); return null; }).when(jobResultsProvider).bucketsViaInternalClient(any(), any(), any(), any()); + + jobConfigProvider = mock(JobConfigProvider.class); + datafeedConfigProvider = mock(DatafeedConfigProvider.class); } public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception { @@ -79,7 +87,8 @@ public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception { dataDescription.setTimeField("time"); Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); jobBuilder.setDataDescription(dataDescription); - DatafeedConfig datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo").build(); + jobBuilder.setCreateTime(new Date()); + DatafeedConfig.Builder datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", jobBuilder.getId()); AtomicBoolean wasHandlerCalled = new AtomicBoolean(false); ActionListener datafeedJobHandler = ActionListener.wrap( @@ -91,7 +100,10 @@ public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception { }, e -> fail() ); - datafeedJobBuilder.build(jobBuilder.build(new Date()), datafeed, datafeedJobHandler); + givenJob(jobBuilder); + givenDatafeed(datafeed); + + datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigProvider, datafeedJobHandler); assertBusy(() -> wasHandlerCalled.get()); } @@ -101,7 +113,8 @@ public void testBuild_GivenScrollDatafeedAndOldJobWithLatestRecordTimestampAfter dataDescription.setTimeField("time"); Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); jobBuilder.setDataDescription(dataDescription); - DatafeedConfig datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo").build(); + jobBuilder.setCreateTime(new Date()); + DatafeedConfig.Builder datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", jobBuilder.getId()); givenLatestTimes(7_200_000L, 3_600_000L); @@ -115,7 +128,10 @@ public void testBuild_GivenScrollDatafeedAndOldJobWithLatestRecordTimestampAfter }, e -> fail() ); - datafeedJobBuilder.build(jobBuilder.build(new Date()), datafeed, datafeedJobHandler); + givenJob(jobBuilder); + givenDatafeed(datafeed); + + datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigProvider, datafeedJobHandler); assertBusy(() -> wasHandlerCalled.get()); } @@ -125,7 +141,8 @@ public void testBuild_GivenScrollDatafeedAndOldJobWithLatestBucketAfterLatestRec dataDescription.setTimeField("time"); Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); jobBuilder.setDataDescription(dataDescription); - DatafeedConfig datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo").build(); + jobBuilder.setCreateTime(new Date()); + DatafeedConfig.Builder datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", jobBuilder.getId()); givenLatestTimes(3_800_000L, 3_600_000L); @@ -139,7 +156,10 @@ public void testBuild_GivenScrollDatafeedAndOldJobWithLatestBucketAfterLatestRec }, e -> fail() ); - datafeedJobBuilder.build(jobBuilder.build(new Date()), datafeed, datafeedJobHandler); + givenJob(jobBuilder); + givenDatafeed(datafeed); + + datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigProvider, datafeedJobHandler); assertBusy(() -> wasHandlerCalled.get()); } @@ -149,7 +169,8 @@ public void testBuild_GivenBucketsRequestFails() { dataDescription.setTimeField("time"); Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); jobBuilder.setDataDescription(dataDescription); - DatafeedConfig datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo").build(); + jobBuilder.setCreateTime(new Date()); + DatafeedConfig.Builder datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", jobBuilder.getId()); Exception error = new RuntimeException("error"); doAnswer(invocationOnMock -> { @@ -159,11 +180,34 @@ public void testBuild_GivenBucketsRequestFails() { return null; }).when(jobResultsProvider).bucketsViaInternalClient(any(), any(), any(), any()); - datafeedJobBuilder.build(jobBuilder.build(new Date()), datafeed, ActionListener.wrap(datafeedJob -> fail(), taskHandler)); + + givenJob(jobBuilder); + givenDatafeed(datafeed); + + datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigProvider, + ActionListener.wrap(datafeedJob -> fail(), taskHandler)); verify(taskHandler).accept(error); } + private void givenJob(Job.Builder job) { + Mockito.doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener handler = (ActionListener) invocationOnMock.getArguments()[1]; + handler.onResponse(job); + return null; + }).when(jobConfigProvider).getJob(eq(job.getId()), any()); + } + + private void givenDatafeed(DatafeedConfig.Builder datafeed) { + Mockito.doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener handler = (ActionListener) invocationOnMock.getArguments()[1]; + handler.onResponse(datafeed); + return null; + }).when(datafeedConfigProvider).getDatafeedConfig(eq(datafeed.getId()), any()); + } + private void givenLatestTimes(long latestRecordTimestamp, long latestBucketTimestamp) { Mockito.doAnswer(invocationOnMock -> { String jobId = (String) invocationOnMock.getArguments()[0]; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index a9dec7c66d4b6..54aa3ade8e1b9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -21,9 +21,10 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -34,11 +35,9 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.notifications.AuditMessage; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.action.TransportStartDatafeedActionTests; import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction.DatafeedTask; +import org.elasticsearch.xpack.ml.action.TransportStartDatafeedActionTests; import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; @@ -79,11 +78,8 @@ public class DatafeedManagerTests extends ESTestCase { @Before @SuppressWarnings("unchecked") public void setUpTests() { - MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); - Job job = createDatafeedJob().build(new Date()); - mlMetadata.putJob(job, false); - DatafeedConfig datafeed = createDatafeedConfig("datafeed_id", job.getId()).build(); - mlMetadata.putDatafeed(datafeed, Collections.emptyMap()); + Job.Builder job = createDatafeedJob().setCreateTime(new Date()); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); @@ -92,8 +88,7 @@ public void setUpTests() { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasks)) + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasks)) .nodes(nodes); clusterService = mock(ClusterService.class); @@ -124,13 +119,14 @@ public void setUpTests() { datafeedJob = mock(DatafeedJob.class); when(datafeedJob.isRunning()).thenReturn(true); when(datafeedJob.stop()).thenReturn(true); + when(datafeedJob.getJobId()).thenReturn(job.getId()); DatafeedJobBuilder datafeedJobBuilder = mock(DatafeedJobBuilder.class); doAnswer(invocationOnMock -> { @SuppressWarnings("rawtypes") - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; listener.onResponse(datafeedJob); return null; - }).when(datafeedJobBuilder).build(any(), any(), any()); + }).when(datafeedJobBuilder).build(any(), any()); datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor); @@ -253,8 +249,7 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, MlMetadata.getMlMetadata(clusterService.state())) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); when(clusterService.state()).thenReturn(cs.build()); Consumer handler = mockConsumer(); @@ -268,8 +263,7 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() { addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, MlMetadata.getMlMetadata(clusterService.state())) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", anotherJobCs.build(), cs.build())); @@ -279,8 +273,7 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() { tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder jobOpenedCs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, MlMetadata.getMlMetadata(clusterService.state())) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); capturedClusterStateListener.getValue().clusterChanged( new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs.build())); @@ -293,8 +286,7 @@ public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, MlMetadata.getMlMetadata(clusterService.state())) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); when(clusterService.state()).thenReturn(cs.build()); Consumer handler = mockConsumer(); @@ -307,8 +299,7 @@ public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() { tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.FAILED, tasksBuilder); ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, MlMetadata.getMlMetadata(clusterService.state())) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", updatedCs.build(), cs.build())); @@ -321,8 +312,7 @@ public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, MlMetadata.getMlMetadata(clusterService.state())) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); when(clusterService.state()).thenReturn(cs.build()); Consumer handler = mockConsumer(); @@ -339,8 +329,7 @@ public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() { tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, MlMetadata.getMlMetadata(clusterService.state())) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", cs.build(), updatedCs.build())); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index 4b8ad1d08aed3..dfaf9f03c0dec 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -26,13 +26,13 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.junit.Before; import java.net.InetAddress; @@ -52,7 +52,6 @@ public class DatafeedNodeSelectorTests extends ESTestCase { private IndexNameExpressionResolver resolver; private DiscoveryNodes nodes; private ClusterState clusterState; - private MlMetadata mlMetadata; private PersistentTasksCustomMetaData tasks; @Before @@ -65,11 +64,8 @@ public void init() { } public void testSelectNode_GivenJobIsOpened() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); @@ -77,17 +73,15 @@ public void testSelectNode_GivenJobIsOpened() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + PersistentTasksCustomMetaData.Assignment result = + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertEquals("node_id", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); } public void testSelectNode_GivenJobIsOpening() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", null, tasksBuilder); @@ -95,41 +89,38 @@ public void testSelectNode_GivenJobIsOpening() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + PersistentTasksCustomMetaData.Assignment result = + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertEquals("node_id", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); } public void testNoJobTask() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); // Using wildcard index name to test for index resolving as well - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")); tasks = PersistentTasksCustomMetaData.builder().build(); givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + PersistentTasksCustomMetaData.Assignment result = + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertNull(result.getExecutorNode()); - assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id], because job's [job_id] state is " + + assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id], because the job's [job_id] state is " + "[closed] while state [opened] is required")); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) + .checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " - + "[cannot start datafeed [datafeed_id], because job's [job_id] state is [closed] while state [opened] is required]")); + + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is [closed] while state [opened] is required]")); } public void testSelectNode_GivenJobFailedOrClosed() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED); @@ -138,26 +129,25 @@ public void testSelectNode_GivenJobFailedOrClosed() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + PersistentTasksCustomMetaData.Assignment result = + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertNull(result.getExecutorNode()); - assertEquals("cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState + + assertEquals("cannot start datafeed [datafeed_id], because the job's [job_id] state is [" + jobState + "] while state [opened] is required", result.getExplanation()); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) + .checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " - + "[cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState + + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is [" + jobState + "] while state [opened] is required]")); } public void testShardUnassigned() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); // Using wildcard index name to test for index resolving as well - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); @@ -168,22 +158,20 @@ public void testShardUnassigned() { givenClusterState("foo", 1, 0, states); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + PersistentTasksCustomMetaData.Assignment result = + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + "does not have all primary shards active yet.")); - new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); } public void testShardNotAllActive() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); // Using wildcard index name to test for index resolving as well - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); @@ -195,21 +183,18 @@ public void testShardNotAllActive() { givenClusterState("foo", 2, 0, states); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + PersistentTasksCustomMetaData.Assignment result = + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + "does not have all primary shards active yet.")); - new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); } public void testIndexDoesntExist() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")), - Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); @@ -217,24 +202,22 @@ public void testIndexDoesntExist() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + PersistentTasksCustomMetaData.Assignment result = + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [not_foo] " + "does not exist, is closed, or is still initializing.")); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) + .checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]")); } public void testRemoteIndex() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("remote:foo")), - Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("remote:foo")); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); @@ -242,16 +225,14 @@ public void testRemoteIndex() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + PersistentTasksCustomMetaData.Assignment result = + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertNotNull(result.getExecutorNode()); } public void testSelectNode_jobTaskStale() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")); String nodeId = randomBoolean() ? "node_id2" : null; PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -262,44 +243,43 @@ public void testSelectNode_jobTaskStale() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + PersistentTasksCustomMetaData.Assignment result = + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertNull(result.getExecutorNode()); - assertEquals("cannot start datafeed [datafeed_id], job [job_id] state is stale", + assertEquals("cannot start datafeed [datafeed_id], because the job's [job_id] state is stale", result.getExplanation()); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) + .checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " - + "[cannot start datafeed [datafeed_id], job [job_id] state is stale]")); + + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is stale]")); tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder); tasks = tasksBuilder.build(); givenClusterState("foo", 1, 0); - result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + result = new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertEquals("node_id1", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); } public void testSelectNode_GivenJobOpeningAndIndexDoesNotExist() { // Here we test that when there are 2 problems, the most critical gets reported first. // In this case job is Opening (non-critical) and the index does not exist (critical) - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")), - Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")); - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENING, tasksBuilder); tasks = tasksBuilder.build(); givenClusterState("foo", 1, 0); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) + .checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]")); } @@ -319,7 +299,6 @@ private void givenClusterState(String index, int numberOfShards, int numberOfRep clusterState = ClusterState.builder(new ClusterName("cluster_name")) .metaData(new MetaData.Builder() - .putCustom(MlMetadata.TYPE, mlMetadata) .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) .put(indexMetaData, false)) .nodes(nodes)