From 0c343ca2ce28875a2f25f4d46a0453b684f0d7a0 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 14 Nov 2018 17:39:57 +0000 Subject: [PATCH] [ML] Reimplement established model memory (#35500) This is the 7.0 implementation of a master node service to keep track of the native process memory requirement of each ML job with an associated native process. The new ML memory tracker service works when the whole cluster is upgraded to at least version 6.6. For mixed version clusters the old mechanism of established model memory stored on the job in cluster state was used. This means that the old (and complex) code to keep established model memory up to date on the job object has been removed in 7.0. Forward port of #35263 --- .../client/ml/job/config/Job.java | 31 +- .../client/ml/job/config/JobTests.java | 3 - docs/reference/ml/apis/jobresource.asciidoc | 5 - .../xpack/core/ml/MlMetadata.java | 62 +++- .../xpack/core/ml/job/config/Job.java | 79 +---- .../xpack/core/ml/job/config/JobUpdate.java | 48 +-- .../persistence/ElasticsearchMappings.java | 3 - .../ml/job/results/ReservedFieldNames.java | 1 - .../xpack/core/ml/job/config/JobTests.java | 39 +-- .../core/ml/job/config/JobUpdateTests.java | 3 - .../integration/BasicRenormalizationIT.java | 15 +- .../xpack/ml/integration/DatafeedJobsIT.java | 11 - ...erimResultsDeletedAfterReopeningJobIT.java | 2 +- .../ml/integration/OverallBucketsIT.java | 13 +- .../integration/RestoreModelSnapshotIT.java | 11 - .../xpack/ml/MachineLearning.java | 12 +- .../ml/action/TransportDeleteJobAction.java | 9 +- .../ml/action/TransportOpenJobAction.java | 189 +++++++--- .../xpack/ml/job/JobManager.java | 33 +- .../autodetect/AutodetectProcessManager.java | 3 +- .../output/AutoDetectResultProcessor.java | 138 +------- .../xpack/ml/process/MlMemoryTracker.java | 325 ++++++++++++++++++ .../xpack/ml/MlMetadataTests.java | 13 +- .../action/TransportOpenJobActionTests.java | 39 ++- .../AutodetectResultProcessorIT.java | 4 +- 
.../integration/MlDistributedFailureIT.java | 89 ++++- .../xpack/ml/integration/TooManyJobsIT.java | 2 - .../AutoDetectResultProcessorTests.java | 99 +----- .../ml/process/MlMemoryTrackerTests.java | 195 +++++++++++ 29 files changed, 919 insertions(+), 557 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java index 13b4dcb955a05..7210aefa98740 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java @@ -57,7 +57,6 @@ public class Job implements ToXContentObject { public static final ParseField DATA_DESCRIPTION = new ParseField("data_description"); public static final ParseField DESCRIPTION = new ParseField("description"); public static final ParseField FINISHED_TIME = new ParseField("finished_time"); - public static final ParseField ESTABLISHED_MODEL_MEMORY = new ParseField("established_model_memory"); public static final ParseField MODEL_PLOT_CONFIG = new ParseField("model_plot_config"); public static final ParseField RENORMALIZATION_WINDOW_DAYS = new ParseField("renormalization_window_days"); public static final ParseField BACKGROUND_PERSIST_INTERVAL = new ParseField("background_persist_interval"); @@ -82,7 +81,6 @@ public class Job implements ToXContentObject { (p) -> TimeUtil.parseTimeField(p, FINISHED_TIME.getPreferredName()), FINISHED_TIME, ValueType.VALUE); - PARSER.declareLong(Builder::setEstablishedModelMemory, ESTABLISHED_MODEL_MEMORY); PARSER.declareObject(Builder::setAnalysisConfig, AnalysisConfig.PARSER, ANALYSIS_CONFIG); 
PARSER.declareObject(Builder::setAnalysisLimits, AnalysisLimits.PARSER, ANALYSIS_LIMITS); PARSER.declareObject(Builder::setDataDescription, DataDescription.PARSER, DATA_DESCRIPTION); @@ -105,7 +103,6 @@ public class Job implements ToXContentObject { private final String description; private final Date createTime; private final Date finishedTime; - private final Long establishedModelMemory; private final AnalysisConfig analysisConfig; private final AnalysisLimits analysisLimits; private final DataDescription dataDescription; @@ -120,7 +117,7 @@ public class Job implements ToXContentObject { private final Boolean deleting; private Job(String jobId, String jobType, List groups, String description, - Date createTime, Date finishedTime, Long establishedModelMemory, + Date createTime, Date finishedTime, AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription, ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval, Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map customSettings, @@ -132,7 +129,6 @@ private Job(String jobId, String jobType, List groups, String descriptio this.description = description; this.createTime = createTime; this.finishedTime = finishedTime; - this.establishedModelMemory = establishedModelMemory; this.analysisConfig = analysisConfig; this.analysisLimits = analysisLimits; this.dataDescription = dataDescription; @@ -202,16 +198,6 @@ public Date getFinishedTime() { return finishedTime; } - /** - * The established model memory of the job, or null if model - * memory has not reached equilibrium yet. 
- * - * @return The established model memory of the job - */ - public Long getEstablishedModelMemory() { - return establishedModelMemory; - } - /** * The analysis configuration object * @@ -304,9 +290,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.timeField(FINISHED_TIME.getPreferredName(), FINISHED_TIME.getPreferredName() + humanReadableSuffix, finishedTime.getTime()); } - if (establishedModelMemory != null) { - builder.field(ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory); - } builder.field(ANALYSIS_CONFIG.getPreferredName(), analysisConfig, params); if (analysisLimits != null) { builder.field(ANALYSIS_LIMITS.getPreferredName(), analysisLimits, params); @@ -362,7 +345,6 @@ public boolean equals(Object other) { && Objects.equals(this.description, that.description) && Objects.equals(this.createTime, that.createTime) && Objects.equals(this.finishedTime, that.finishedTime) - && Objects.equals(this.establishedModelMemory, that.establishedModelMemory) && Objects.equals(this.analysisConfig, that.analysisConfig) && Objects.equals(this.analysisLimits, that.analysisLimits) && Objects.equals(this.dataDescription, that.dataDescription) @@ -379,7 +361,7 @@ public boolean equals(Object other) { @Override public int hashCode() { - return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime, establishedModelMemory, + return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, resultsIndexName, deleting); @@ -405,7 +387,6 @@ public static class Builder { private DataDescription dataDescription; private Date createTime; private Date finishedTime; - private Long establishedModelMemory; private ModelPlotConfig modelPlotConfig; private Long renormalizationWindowDays; 
private TimeValue backgroundPersistInterval; @@ -433,7 +414,6 @@ public Builder(Job job) { this.dataDescription = job.getDataDescription(); this.createTime = job.getCreateTime(); this.finishedTime = job.getFinishedTime(); - this.establishedModelMemory = job.getEstablishedModelMemory(); this.modelPlotConfig = job.getModelPlotConfig(); this.renormalizationWindowDays = job.getRenormalizationWindowDays(); this.backgroundPersistInterval = job.getBackgroundPersistInterval(); @@ -494,11 +474,6 @@ Builder setFinishedTime(Date finishedTime) { return this; } - public Builder setEstablishedModelMemory(Long establishedModelMemory) { - this.establishedModelMemory = establishedModelMemory; - return this; - } - public Builder setDataDescription(DataDescription.Builder description) { dataDescription = Objects.requireNonNull(description, DATA_DESCRIPTION.getPreferredName()).build(); return this; @@ -553,7 +528,7 @@ public Job build() { Objects.requireNonNull(id, "[" + ID.getPreferredName() + "] must not be null"); Objects.requireNonNull(jobType, "[" + JOB_TYPE.getPreferredName() + "] must not be null"); return new Job( - id, jobType, groups, description, createTime, finishedTime, establishedModelMemory, + id, jobType, groups, description, createTime, finishedTime, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, resultsIndexName, deleting); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java index 667932d591231..70e8b4296b0df 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java @@ -125,9 +125,6 @@ public static Job.Builder 
createRandomizedJobBuilder() { if (randomBoolean()) { builder.setFinishedTime(new Date(randomNonNegativeLong())); } - if (randomBoolean()) { - builder.setEstablishedModelMemory(randomNonNegativeLong()); - } builder.setAnalysisConfig(AnalysisConfigTests.createRandomized()); builder.setAnalysisLimits(AnalysisLimitsTests.createRandomized()); diff --git a/docs/reference/ml/apis/jobresource.asciidoc b/docs/reference/ml/apis/jobresource.asciidoc index e0c314724e762..6ce6e1e39307e 100644 --- a/docs/reference/ml/apis/jobresource.asciidoc +++ b/docs/reference/ml/apis/jobresource.asciidoc @@ -42,11 +42,6 @@ so do not set the `background_persist_interval` value too low. `description`:: (string) An optional description of the job. -`established_model_memory`:: - (long) The approximate amount of memory resources that have been used for - analytical processing. This field is present only when the analytics have used - a stable amount of memory for several consecutive buckets. - `finished_time`:: (string) If the job closed or failed, this is the time the job finished, otherwise it is `null`. 
This property is informational; you cannot change its diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index afb56dc6cc8c9..e98773a2ce4de 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -57,8 +57,9 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { public static final String TYPE = "ml"; private static final ParseField JOBS_FIELD = new ParseField("jobs"); private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds"); + private static final ParseField LAST_MEMORY_REFRESH_VERSION_FIELD = new ParseField("last_memory_refresh_version"); - public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap()); + public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap(), null); // This parser follows the pattern that metadata is parsed leniently (to allow for enhancements) public static final ObjectParser LENIENT_PARSER = new ObjectParser<>("ml_metadata", true, Builder::new); @@ -66,15 +67,18 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { LENIENT_PARSER.declareObjectArray(Builder::putJobs, (p, c) -> Job.LENIENT_PARSER.apply(p, c).build(), JOBS_FIELD); LENIENT_PARSER.declareObjectArray(Builder::putDatafeeds, (p, c) -> DatafeedConfig.LENIENT_PARSER.apply(p, c).build(), DATAFEEDS_FIELD); + LENIENT_PARSER.declareLong(Builder::setLastMemoryRefreshVersion, LAST_MEMORY_REFRESH_VERSION_FIELD); } private final SortedMap jobs; private final SortedMap datafeeds; + private final Long lastMemoryRefreshVersion; private final GroupOrJobLookup groupOrJobLookup; - private MlMetadata(SortedMap jobs, SortedMap datafeeds) { + private MlMetadata(SortedMap 
jobs, SortedMap datafeeds, Long lastMemoryRefreshVersion) { this.jobs = Collections.unmodifiableSortedMap(jobs); this.datafeeds = Collections.unmodifiableSortedMap(datafeeds); + this.lastMemoryRefreshVersion = lastMemoryRefreshVersion; this.groupOrJobLookup = new GroupOrJobLookup(jobs.values()); } @@ -112,6 +116,10 @@ public Set expandDatafeedIds(String expression, boolean allowNoDatafeeds .expand(expression, allowNoDatafeeds); } + public Long getLastMemoryRefreshVersion() { + return lastMemoryRefreshVersion; + } + @Override public Version getMinimalSupportedVersion() { return Version.V_6_0_0_alpha1; @@ -145,7 +153,11 @@ public MlMetadata(StreamInput in) throws IOException { datafeeds.put(in.readString(), new DatafeedConfig(in)); } this.datafeeds = datafeeds; - + if (in.getVersion().onOrAfter(Version.V_6_6_0)) { + lastMemoryRefreshVersion = in.readOptionalLong(); + } else { + lastMemoryRefreshVersion = null; + } this.groupOrJobLookup = new GroupOrJobLookup(jobs.values()); } @@ -153,6 +165,9 @@ public MlMetadata(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { writeMap(jobs, out); writeMap(datafeeds, out); + if (out.getVersion().onOrAfter(Version.V_6_6_0)) { + out.writeOptionalLong(lastMemoryRefreshVersion); + } } private static void writeMap(Map map, StreamOutput out) throws IOException { @@ -169,6 +184,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params); mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams); mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams); + if (lastMemoryRefreshVersion != null) { + builder.field(LAST_MEMORY_REFRESH_VERSION_FIELD.getPreferredName(), lastMemoryRefreshVersion); + } return builder; } @@ -185,30 +203,46 @@ public static class MlMetadataDiff implements NamedDiff { final Diff> jobs; final Diff> datafeeds; + final Long 
lastMemoryRefreshVersion; MlMetadataDiff(MlMetadata before, MlMetadata after) { this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer()); this.datafeeds = DiffableUtils.diff(before.datafeeds, after.datafeeds, DiffableUtils.getStringKeySerializer()); + this.lastMemoryRefreshVersion = after.lastMemoryRefreshVersion; } public MlMetadataDiff(StreamInput in) throws IOException { this.jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job::new, MlMetadataDiff::readJobDiffFrom); this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new, - MlMetadataDiff::readSchedulerDiffFrom); + MlMetadataDiff::readDatafeedDiffFrom); + if (in.getVersion().onOrAfter(Version.V_6_6_0)) { + lastMemoryRefreshVersion = in.readOptionalLong(); + } else { + lastMemoryRefreshVersion = null; + } } + /** + * Merge the diff with the ML metadata. + * @param part The current ML metadata. + * @return The new ML metadata. 
+ */ @Override public MetaData.Custom apply(MetaData.Custom part) { TreeMap newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs)); TreeMap newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds)); - return new MlMetadata(newJobs, newDatafeeds); + // lastMemoryRefreshVersion always comes from the diff - no need to merge with the old value + return new MlMetadata(newJobs, newDatafeeds, lastMemoryRefreshVersion); } @Override public void writeTo(StreamOutput out) throws IOException { jobs.writeTo(out); datafeeds.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_6_6_0)) { + out.writeOptionalLong(lastMemoryRefreshVersion); + } } @Override @@ -220,7 +254,7 @@ static Diff readJobDiffFrom(StreamInput in) throws IOException { return AbstractDiffable.readDiffFrom(Job::new, in); } - static Diff readSchedulerDiffFrom(StreamInput in) throws IOException { + static Diff readDatafeedDiffFrom(StreamInput in) throws IOException { return AbstractDiffable.readDiffFrom(DatafeedConfig::new, in); } } @@ -233,7 +267,8 @@ public boolean equals(Object o) { return false; MlMetadata that = (MlMetadata) o; return Objects.equals(jobs, that.jobs) && - Objects.equals(datafeeds, that.datafeeds); + Objects.equals(datafeeds, that.datafeeds) && + Objects.equals(lastMemoryRefreshVersion, that.lastMemoryRefreshVersion); } @Override @@ -243,13 +278,14 @@ public final String toString() { @Override public int hashCode() { - return Objects.hash(jobs, datafeeds); + return Objects.hash(jobs, datafeeds, lastMemoryRefreshVersion); } public static class Builder { private TreeMap jobs; private TreeMap datafeeds; + private Long lastMemoryRefreshVersion; public Builder() { jobs = new TreeMap<>(); @@ -263,6 +299,7 @@ public Builder(@Nullable MlMetadata previous) { } else { jobs = new TreeMap<>(previous.jobs); datafeeds = new TreeMap<>(previous.datafeeds); + lastMemoryRefreshVersion = previous.lastMemoryRefreshVersion; } } @@ -382,8 +419,13 @@ private Builder putDatafeeds(Collection 
datafeeds) { return this; } + public Builder setLastMemoryRefreshVersion(Long lastMemoryRefreshVersion) { + this.lastMemoryRefreshVersion = lastMemoryRefreshVersion; + return this; + } + public MlMetadata build() { - return new MlMetadata(jobs, datafeeds); + return new MlMetadata(jobs, datafeeds, lastMemoryRefreshVersion); } public void markJobAsDeleting(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) { @@ -420,8 +462,6 @@ void checkJobHasNoDatafeed(String jobId) { } } - - public static MlMetadata getMlMetadata(ClusterState state) { MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(TYPE); if (mlMetadata == null) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index 5b244ba44d5c3..032eef00649dd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -67,7 +67,6 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO public static final ParseField DATA_DESCRIPTION = new ParseField("data_description"); public static final ParseField DESCRIPTION = new ParseField("description"); public static final ParseField FINISHED_TIME = new ParseField("finished_time"); - public static final ParseField ESTABLISHED_MODEL_MEMORY = new ParseField("established_model_memory"); public static final ParseField MODEL_PLOT_CONFIG = new ParseField("model_plot_config"); public static final ParseField RENORMALIZATION_WINDOW_DAYS = new ParseField("renormalization_window_days"); public static final ParseField BACKGROUND_PERSIST_INTERVAL = new ParseField("background_persist_interval"); @@ -102,7 +101,6 @@ private static ObjectParser createParser(boolean ignoreUnknownFie p -> TimeUtils.parseTimeField(p, CREATE_TIME.getPreferredName()), CREATE_TIME, 
ValueType.VALUE); parser.declareField(Builder::setFinishedTime, p -> TimeUtils.parseTimeField(p, FINISHED_TIME.getPreferredName()), FINISHED_TIME, ValueType.VALUE); - parser.declareLong(Builder::setEstablishedModelMemory, ESTABLISHED_MODEL_MEMORY); parser.declareObject(Builder::setAnalysisConfig, ignoreUnknownFields ? AnalysisConfig.LENIENT_PARSER : AnalysisConfig.STRICT_PARSER, ANALYSIS_CONFIG); parser.declareObject(Builder::setAnalysisLimits, ignoreUnknownFields ? AnalysisLimits.LENIENT_PARSER : AnalysisLimits.STRICT_PARSER, @@ -140,7 +138,6 @@ private static ObjectParser createParser(boolean ignoreUnknownFie // TODO: Use java.time for the Dates here: x-pack-elasticsearch#829 private final Date createTime; private final Date finishedTime; - private final Long establishedModelMemory; private final AnalysisConfig analysisConfig; private final AnalysisLimits analysisLimits; private final DataDescription dataDescription; @@ -156,7 +153,7 @@ private static ObjectParser createParser(boolean ignoreUnknownFie private final boolean deleting; private Job(String jobId, String jobType, Version jobVersion, List groups, String description, - Date createTime, Date finishedTime, Long establishedModelMemory, + Date createTime, Date finishedTime, AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription, ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval, Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map customSettings, @@ -169,7 +166,6 @@ private Job(String jobId, String jobType, Version jobVersion, List group this.description = description; this.createTime = createTime; this.finishedTime = finishedTime; - this.establishedModelMemory = establishedModelMemory; this.analysisConfig = analysisConfig; this.analysisLimits = analysisLimits; this.dataDescription = dataDescription; @@ -203,10 +199,9 @@ public Job(StreamInput in) throws IOException { in.readVLong(); } } - if 
(in.getVersion().onOrAfter(Version.V_6_1_0)) { - establishedModelMemory = in.readOptionalLong(); - } else { - establishedModelMemory = null; + // for removed establishedModelMemory field + if (in.getVersion().onOrAfter(Version.V_6_1_0) && in.getVersion().before(Version.V_7_0_0_alpha1)) { + in.readOptionalLong(); } analysisConfig = new AnalysisConfig(in); analysisLimits = in.readOptionalWriteable(AnalysisLimits::new); @@ -314,16 +309,6 @@ public Date getFinishedTime() { return finishedTime; } - /** - * The established model memory of the job, or null if model - * memory has not reached equilibrium yet. - * - * @return The established model memory of the job - */ - public Long getEstablishedModelMemory() { - return establishedModelMemory; - } - /** * The analysis configuration object * @@ -430,21 +415,6 @@ public Collection allInputFields() { return allFields; } - /** - * Make a best estimate of the job's memory footprint using the information available. - * If a job has an established model memory size, then this is the best estimate. - * Otherwise, assume the maximum model memory limit will eventually be required. - * In either case, a fixed overhead is added to account for the memory required by the - * program code and stack. - * @return an estimate of the memory requirement of this job, in bytes - */ - public long estimateMemoryFootprint() { - if (establishedModelMemory != null && establishedModelMemory > 0) { - return establishedModelMemory + PROCESS_MEMORY_OVERHEAD.getBytes(); - } - return ByteSizeUnit.MB.toBytes(analysisLimits.getModelMemoryLimit()) + PROCESS_MEMORY_OVERHEAD.getBytes(); - } - /** * Returns the timestamp before which data is not accepted by the job. * This is the latest record timestamp minus the job latency. 
@@ -487,8 +457,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().before(Version.V_7_0_0_alpha1)) { out.writeBoolean(false); } - if (out.getVersion().onOrAfter(Version.V_6_1_0)) { - out.writeOptionalLong(establishedModelMemory); + // for removed establishedModelMemory field + if (out.getVersion().onOrAfter(Version.V_6_1_0) && out.getVersion().before(Version.V_7_0_0_alpha1)) { + out.writeOptionalLong(null); } analysisConfig.writeTo(out); out.writeOptionalWriteable(analysisLimits); @@ -539,9 +510,6 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th builder.timeField(FINISHED_TIME.getPreferredName(), FINISHED_TIME.getPreferredName() + humanReadableSuffix, finishedTime.getTime()); } - if (establishedModelMemory != null) { - builder.field(ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory); - } builder.field(ANALYSIS_CONFIG.getPreferredName(), analysisConfig, params); if (analysisLimits != null) { builder.field(ANALYSIS_LIMITS.getPreferredName(), analysisLimits, params); @@ -598,7 +566,6 @@ public boolean equals(Object other) { && Objects.equals(this.description, that.description) && Objects.equals(this.createTime, that.createTime) && Objects.equals(this.finishedTime, that.finishedTime) - && Objects.equals(this.establishedModelMemory, that.establishedModelMemory) && Objects.equals(this.analysisConfig, that.analysisConfig) && Objects.equals(this.analysisLimits, that.analysisLimits) && Objects.equals(this.dataDescription, that.dataDescription) @@ -616,7 +583,7 @@ public boolean equals(Object other) { @Override public int hashCode() { - return Objects.hash(jobId, jobType, jobVersion, groups, description, createTime, finishedTime, establishedModelMemory, + return Objects.hash(jobId, jobType, jobVersion, groups, description, createTime, finishedTime, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, 
modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting); @@ -657,7 +624,6 @@ public static class Builder implements Writeable, ToXContentObject { private DataDescription dataDescription; private Date createTime; private Date finishedTime; - private Long establishedModelMemory; private ModelPlotConfig modelPlotConfig; private Long renormalizationWindowDays; private TimeValue backgroundPersistInterval; @@ -687,7 +653,6 @@ public Builder(Job job) { this.dataDescription = job.getDataDescription(); this.createTime = job.getCreateTime(); this.finishedTime = job.getFinishedTime(); - this.establishedModelMemory = job.getEstablishedModelMemory(); this.modelPlotConfig = job.getModelPlotConfig(); this.renormalizationWindowDays = job.getRenormalizationWindowDays(); this.backgroundPersistInterval = job.getBackgroundPersistInterval(); @@ -718,8 +683,9 @@ public Builder(StreamInput in) throws IOException { in.readVLong(); } } - if (in.getVersion().onOrAfter(Version.V_6_1_0)) { - establishedModelMemory = in.readOptionalLong(); + // for removed establishedModelMemory field + if (in.getVersion().onOrAfter(Version.V_6_1_0) && in.getVersion().before(Version.V_7_0_0_alpha1)) { + in.readOptionalLong(); } analysisConfig = in.readOptionalWriteable(AnalysisConfig::new); analysisLimits = in.readOptionalWriteable(AnalysisLimits::new); @@ -803,11 +769,6 @@ public Builder setFinishedTime(Date finishedTime) { return this; } - public Builder setEstablishedModelMemory(Long establishedModelMemory) { - this.establishedModelMemory = establishedModelMemory; - return this; - } - public Builder setDataDescription(DataDescription.Builder description) { dataDescription = ExceptionsHelper.requireNonNull(description, DATA_DESCRIPTION.getPreferredName()).build(); return this; @@ -913,8 +874,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().before(Version.V_7_0_0_alpha1)) { 
out.writeBoolean(false); } - if (out.getVersion().onOrAfter(Version.V_6_1_0)) { - out.writeOptionalLong(establishedModelMemory); + // for removed establishedModelMemory field + if (out.getVersion().onOrAfter(Version.V_6_1_0) && out.getVersion().before(Version.V_7_0_0_alpha1)) { + out.writeOptionalLong(null); } out.writeOptionalWriteable(analysisConfig); out.writeOptionalWriteable(analysisLimits); @@ -957,9 +919,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (finishedTime != null) { builder.field(FINISHED_TIME.getPreferredName(), finishedTime.getTime()); } - if (establishedModelMemory != null) { - builder.field(ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory); - } if (analysisConfig != null) { builder.field(ANALYSIS_CONFIG.getPreferredName(), analysisConfig, params); } @@ -1020,7 +979,6 @@ public boolean equals(Object o) { && Objects.equals(this.dataDescription, that.dataDescription) && Objects.equals(this.createTime, that.createTime) && Objects.equals(this.finishedTime, that.finishedTime) - && Objects.equals(this.establishedModelMemory, that.establishedModelMemory) && Objects.equals(this.modelPlotConfig, that.modelPlotConfig) && Objects.equals(this.renormalizationWindowDays, that.renormalizationWindowDays) && Objects.equals(this.backgroundPersistInterval, that.backgroundPersistInterval) @@ -1036,7 +994,7 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash(id, jobType, jobVersion, groups, description, analysisConfig, analysisLimits, dataDescription, - createTime, finishedTime, establishedModelMemory, modelPlotConfig, renormalizationWindowDays, + createTime, finishedTime, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting); } @@ -1112,11 +1070,6 @@ private void validateGroups() { public Job build(Date createTime) { 
setCreateTime(createTime); setJobVersion(Version.CURRENT); - // TODO: Maybe we _could_ accept a value for this supplied at create time - it would - // mean cloned jobs that hadn't been edited much would start with an accurate expected size. - // But on the other hand it would mean jobs that were cloned and then completely changed - // would start with a size that was completely wrong. - setEstablishedModelMemory(null); return build(); } @@ -1152,7 +1105,7 @@ public Job build() { } return new Job( - id, jobType, jobVersion, groups, description, createTime, finishedTime, establishedModelMemory, + id, jobType, jobVersion, groups, description, createTime, finishedTime, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java index 44e20846f9aa3..5d0b56dd79575 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java @@ -57,7 +57,6 @@ public class JobUpdate implements Writeable, ToXContentObject { } // These fields should not be set by a REST request INTERNAL_PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID); - INTERNAL_PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY); INTERNAL_PARSER.declareString(Builder::setModelSnapshotMinVersion, Job.MODEL_SNAPSHOT_MIN_VERSION); INTERNAL_PARSER.declareString(Builder::setJobVersion, Job.JOB_VERSION); INTERNAL_PARSER.declareBoolean(Builder::setClearFinishTime, CLEAR_JOB_FINISH_TIME); @@ -77,7 +76,6 @@ public class JobUpdate implements Writeable, 
ToXContentObject { private final Map customSettings; private final String modelSnapshotId; private final Version modelSnapshotMinVersion; - private final Long establishedModelMemory; private final Version jobVersion; private final Boolean clearJobFinishTime; @@ -87,8 +85,7 @@ private JobUpdate(String jobId, @Nullable List groups, @Nullable String @Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays, @Nullable Long modelSnapshotRetentionDays, @Nullable List categorisationFilters, @Nullable Map customSettings, @Nullable String modelSnapshotId, - @Nullable Version modelSnapshotMinVersion, @Nullable Long establishedModelMemory, - @Nullable Version jobVersion, @Nullable Boolean clearJobFinishTime) { + @Nullable Version modelSnapshotMinVersion, @Nullable Version jobVersion, @Nullable Boolean clearJobFinishTime) { this.jobId = jobId; this.groups = groups; this.description = description; @@ -103,7 +100,6 @@ private JobUpdate(String jobId, @Nullable List groups, @Nullable String this.customSettings = customSettings; this.modelSnapshotId = modelSnapshotId; this.modelSnapshotMinVersion = modelSnapshotMinVersion; - this.establishedModelMemory = establishedModelMemory; this.jobVersion = jobVersion; this.clearJobFinishTime = clearJobFinishTime; } @@ -135,10 +131,9 @@ public JobUpdate(StreamInput in) throws IOException { } customSettings = in.readMap(); modelSnapshotId = in.readOptionalString(); - if (in.getVersion().onOrAfter(Version.V_6_1_0)) { - establishedModelMemory = in.readOptionalLong(); - } else { - establishedModelMemory = null; + // was establishedModelMemory + if (in.getVersion().onOrAfter(Version.V_6_1_0) && in.getVersion().before(Version.V_7_0_0_alpha1)) { + in.readOptionalLong(); } if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.readBoolean()) { jobVersion = Version.readVersion(in); @@ -181,8 +176,9 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeMap(customSettings); out.writeOptionalString(modelSnapshotId); - 
if (out.getVersion().onOrAfter(Version.V_6_1_0)) { - out.writeOptionalLong(establishedModelMemory); + // was establishedModelMemory + if (out.getVersion().onOrAfter(Version.V_6_1_0) && out.getVersion().before(Version.V_7_0_0_alpha1)) { + out.writeOptionalLong(null); } if (out.getVersion().onOrAfter(Version.V_6_3_0)) { if (jobVersion != null) { @@ -261,10 +257,6 @@ public Version getModelSnapshotMinVersion() { return modelSnapshotMinVersion; } - public Long getEstablishedModelMemory() { - return establishedModelMemory; - } - public Version getJobVersion() { return jobVersion; } @@ -320,9 +312,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (modelSnapshotMinVersion != null) { builder.field(Job.MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), modelSnapshotMinVersion); } - if (establishedModelMemory != null) { - builder.field(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory); - } if (jobVersion != null) { builder.field(Job.JOB_VERSION.getPreferredName(), jobVersion); } @@ -374,9 +363,6 @@ public Set getUpdateFields() { if (modelSnapshotMinVersion != null) { updateFields.add(Job.MODEL_SNAPSHOT_MIN_VERSION.getPreferredName()); } - if (establishedModelMemory != null) { - updateFields.add(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName()); - } if (jobVersion != null) { updateFields.add(Job.JOB_VERSION.getPreferredName()); } @@ -452,14 +438,6 @@ public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) { if (modelSnapshotMinVersion != null) { builder.setModelSnapshotMinVersion(modelSnapshotMinVersion); } - if (establishedModelMemory != null) { - // An established model memory of zero means we don't actually know the established model memory - if (establishedModelMemory > 0) { - builder.setEstablishedModelMemory(establishedModelMemory); - } else { - builder.setEstablishedModelMemory(null); - } - } if (jobVersion != null) { builder.setJobVersion(jobVersion); } @@ -487,7 +465,6 @@ && updatesDetectors(job) 
== false && (customSettings == null || Objects.equals(customSettings, job.getCustomSettings())) && (modelSnapshotId == null || Objects.equals(modelSnapshotId, job.getModelSnapshotId())) && (modelSnapshotMinVersion == null || Objects.equals(modelSnapshotMinVersion, job.getModelSnapshotMinVersion())) - && (establishedModelMemory == null || Objects.equals(establishedModelMemory, job.getEstablishedModelMemory())) && (jobVersion == null || Objects.equals(jobVersion, job.getJobVersion())) && ((clearJobFinishTime == null || clearJobFinishTime == false) || job.getFinishedTime() == null); } @@ -536,7 +513,6 @@ public boolean equals(Object other) { && Objects.equals(this.customSettings, that.customSettings) && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion) - && Objects.equals(this.establishedModelMemory, that.establishedModelMemory) && Objects.equals(this.jobVersion, that.jobVersion) && Objects.equals(this.clearJobFinishTime, that.clearJobFinishTime); } @@ -545,7 +521,7 @@ public boolean equals(Object other) { public int hashCode() { return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings, - modelSnapshotId, modelSnapshotMinVersion, establishedModelMemory, jobVersion, clearJobFinishTime); + modelSnapshotId, modelSnapshotMinVersion, jobVersion, clearJobFinishTime); } public static class DetectorUpdate implements Writeable, ToXContentObject { @@ -655,7 +631,6 @@ public static class Builder { private Map customSettings; private String modelSnapshotId; private Version modelSnapshotMinVersion; - private Long establishedModelMemory; private Version jobVersion; private Boolean clearJobFinishTime; @@ -738,11 +713,6 @@ public Builder setModelSnapshotMinVersion(String modelSnapshotMinVersion) { return this; } - 
public Builder setEstablishedModelMemory(Long establishedModelMemory) { - this.establishedModelMemory = establishedModelMemory; - return this; - } - public Builder setJobVersion(Version version) { this.jobVersion = version; return this; @@ -761,7 +731,7 @@ public Builder setClearFinishTime(boolean clearJobFinishTime) { public JobUpdate build() { return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval, renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings, - modelSnapshotId, modelSnapshotMinVersion, establishedModelMemory, jobVersion, clearJobFinishTime); + modelSnapshotId, modelSnapshotMinVersion, jobVersion, clearJobFinishTime); } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java index 491be55049c10..70cbf1c088249 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java @@ -282,9 +282,6 @@ public static void addJobConfigFields(XContentBuilder builder) throws IOExceptio .startObject(Job.FINISHED_TIME.getPreferredName()) .field(TYPE, DATE) .endObject() - .startObject(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName()) - .field(TYPE, LONG) // TODO should be ByteSizeValue - .endObject() .startObject(Job.MODEL_PLOT_CONFIG.getPreferredName()) .startObject(PROPERTIES) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java index 400fcc07f4c53..6926ecb98e892 100644 --- 
a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java @@ -188,7 +188,6 @@ public final class ReservedFieldNames { Job.DATA_DESCRIPTION.getPreferredName(), Job.DESCRIPTION.getPreferredName(), Job.FINISHED_TIME.getPreferredName(), - Job.ESTABLISHED_MODEL_MEMORY.getPreferredName(), Job.MODEL_PLOT_CONFIG.getPreferredName(), Job.RENORMALIZATION_WINDOW_DAYS.getPreferredName(), Job.BACKGROUND_PERSIST_INTERVAL.getPreferredName(), diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java index 55b84995d58f7..13c433d758165 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java @@ -436,10 +436,9 @@ public void testBuilder_withInvalidIndexNameThrows() { public void testBuilder_buildWithCreateTime() { Job.Builder builder = buildJobBuilder("foo"); Date now = new Date(); - Job job = builder.setEstablishedModelMemory(randomNonNegativeLong()).build(now); + Job job = builder.build(now); assertEquals(now, job.getCreateTime()); assertEquals(Version.CURRENT, job.getJobVersion()); - assertNull(job.getEstablishedModelMemory()); } public void testJobWithoutVersion() throws IOException { @@ -514,39 +513,6 @@ public void testInvalidGroup_matchesJobId() { assertEquals(e.getMessage(), "job and group names must be unique but job [foo] and group [foo] have the same name"); } - public void testEstimateMemoryFootprint_GivenEstablished() { - Job.Builder builder = buildJobBuilder("established"); - long establishedModelMemory = randomIntBetween(10_000, 2_000_000_000); - builder.setEstablishedModelMemory(establishedModelMemory); - if (randomBoolean()) { - 
builder.setAnalysisLimits(new AnalysisLimits(randomNonNegativeLong(), null)); - } - assertEquals(establishedModelMemory + Job.PROCESS_MEMORY_OVERHEAD.getBytes(), builder.build().estimateMemoryFootprint()); - } - - public void testEstimateMemoryFootprint_GivenLimitAndNotEstablished() { - Job.Builder builder = buildJobBuilder("limit"); - if (rarely()) { - // An "established" model memory of 0 means "not established". Generally this won't be set, so getEstablishedModelMemory() - // will return null, but if it returns 0 we shouldn't estimate the job's memory requirement to be 0. - builder.setEstablishedModelMemory(0L); - } - ByteSizeValue limit = new ByteSizeValue(randomIntBetween(100, 10000), ByteSizeUnit.MB); - builder.setAnalysisLimits(new AnalysisLimits(limit.getMb(), null)); - assertEquals(limit.getBytes() + Job.PROCESS_MEMORY_OVERHEAD.getBytes(), builder.build().estimateMemoryFootprint()); - } - - public void testEstimateMemoryFootprint_GivenNoLimitAndNotEstablished() { - Job.Builder builder = buildJobBuilder("nolimit"); - if (rarely()) { - // An "established" model memory of 0 means "not established". Generally this won't be set, so getEstablishedModelMemory() - // will return null, but if it returns 0 we shouldn't estimate the job's memory requirement to be 0. 
- builder.setEstablishedModelMemory(0L); - } - assertEquals(ByteSizeUnit.MB.toBytes(AnalysisLimits.PRE_6_1_DEFAULT_MODEL_MEMORY_LIMIT_MB) - + Job.PROCESS_MEMORY_OVERHEAD.getBytes(), builder.build().estimateMemoryFootprint()); - } - public void testEarliestValidTimestamp_GivenEmptyDataCounts() { assertThat(createRandomizedJob().earliestValidTimestamp(new DataCounts("foo")), equalTo(0L)); } @@ -618,9 +584,6 @@ public static Job createRandomizedJob() { if (randomBoolean()) { builder.setFinishedTime(new Date(randomNonNegativeLong())); } - if (randomBoolean()) { - builder.setEstablishedModelMemory(randomNonNegativeLong()); - } builder.setAnalysisConfig(AnalysisConfigTests.createRandomized()); builder.setAnalysisLimits(AnalysisLimits.validateAndSetDefaults(AnalysisLimitsTests.createRandomized(), null, AnalysisLimits.DEFAULT_MODEL_MEMORY_LIMIT_MB)); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java index c6eb42038901b..e249b22a4a896 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java @@ -90,9 +90,6 @@ public JobUpdate createRandom(String jobId, @Nullable Job job) { if (useInternalParser && randomBoolean()) { update.setModelSnapshotMinVersion(Version.CURRENT); } - if (useInternalParser && randomBoolean()) { - update.setEstablishedModelMemory(randomNonNegativeLong()); - } if (useInternalParser && randomBoolean()) { update.setJobVersion(randomFrom(Version.CURRENT, Version.V_6_2_0, Version.V_6_1_0)); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java 
index cc5a9f4f1b469..d1decd4387f8f 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java @@ -6,13 +6,11 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.junit.After; @@ -29,7 +27,7 @@ public class BasicRenormalizationIT extends MlNativeAutodetectIntegTestCase { @After - public void tearDownData() throws Exception { + public void tearDownData() { cleanUp(); } @@ -52,15 +50,6 @@ public void testDefaultRenormalization() throws Exception { // This is the key assertion: if renormalization never happened then the record_score would // be the same as the initial_record_score on the anomaly record that happened earlier assertThat(earlierRecord.getInitialRecordScore(), greaterThan(earlierRecord.getRecordScore())); - - // Since this job ran for 50 buckets, it's a good place to assert - // that established model memory matches model memory in the job stats - assertBusy(() -> { - GetJobsStatsAction.Response.JobStats jobStats = getJobStats(jobId).get(0); - ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); - Job updatedJob = getJob(jobId).get(0); - assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); - }); } public void 
testRenormalizationDisabled() throws Exception { @@ -94,7 +83,7 @@ private void createAndRunJob(String jobId, Long renormalizationWindow) throws Ex closeJob(job.getId()); } - private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan, Long renormalizationWindow) throws Exception { + private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan, Long renormalizationWindow) { Detector.Builder detector = new Detector.Builder("count", null); AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build())); analysisConfig.setBucketSpan(bucketSpan); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java index ccef7c3f2e181..be760909e3ea9 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java @@ -15,7 +15,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; -import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.KillProcessAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; @@ -25,7 +24,6 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.junit.After; import java.util.ArrayList; @@ -92,15 +90,6 
@@ public void testLookbackOnly() throws Exception { }, 60, TimeUnit.SECONDS); waitUntilJobIsClosed(job.getId()); - - // Since this job ran for 168 buckets, it's a good place to assert - // that established model memory matches model memory in the job stats - assertBusy(() -> { - GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); - ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); - Job updatedJob = getJob(job.getId()).get(0); - assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); - }); } public void testRealtime() throws Exception { diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsDeletedAfterReopeningJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsDeletedAfterReopeningJobIT.java index add0b9e8a93a3..2e39658ab504a 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsDeletedAfterReopeningJobIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsDeletedAfterReopeningJobIT.java @@ -33,7 +33,7 @@ public class InterimResultsDeletedAfterReopeningJobIT extends MlNativeAutodetectIntegTestCase { @After - public void cleanUpTest() throws Exception { + public void cleanUpTest() { cleanUp(); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java index fe344bf835991..ec670773f2f7b 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java +++ 
b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java @@ -7,14 +7,12 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; -import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetOverallBucketsAction; import org.elasticsearch.xpack.core.ml.action.util.PageParams; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.junit.After; import java.util.ArrayList; @@ -36,7 +34,7 @@ public class OverallBucketsIT extends MlNativeAutodetectIntegTestCase { private static final long BUCKET_SPAN_SECONDS = 3600; @After - public void cleanUpTest() throws Exception { + public void cleanUpTest() { cleanUp(); } @@ -99,15 +97,6 @@ public void test() throws Exception { GetOverallBucketsAction.INSTANCE, filteredOverallBucketsRequest).actionGet(); assertThat(filteredOverallBucketsResponse.getOverallBuckets().count(), equalTo(2L)); } - - // Since this job ran for 3000 buckets, it's a good place to assert - // that established model memory matches model memory in the job stats - assertBusy(() -> { - GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); - ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); - Job updatedJob = getJob(job.getId()).get(0); - assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); - }); } private static Map createRecord(long timestamp) { diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java 
b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java index d7a2b857bf359..42bfe4dcde301 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java @@ -7,12 +7,10 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.junit.After; @@ -82,15 +80,6 @@ public void test() throws Exception { }); closeJob(job.getId()); - - // Since these jobs ran for 72 buckets, it's a good place to assert - // that established model memory matches model memory in the job stats - assertBusy(() -> { - GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); - ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); - Job updatedJob = getJob(job.getId()).get(0); - assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); - }); } private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan) throws Exception { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index efe932f4b5c6b..d9075c5a46fe5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -180,6 +180,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.NativeControllerHolder; import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction; @@ -275,6 +276,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu private final SetOnce autodetectProcessManager = new SetOnce<>(); private final SetOnce datafeedManager = new SetOnce<>(); + private final SetOnce memoryTracker = new SetOnce<>(); public MachineLearning(Settings settings, Path configPath) { this.settings = settings; @@ -415,6 +417,8 @@ public Collection createComponents(Client client, ClusterService cluster this.datafeedManager.set(datafeedManager); MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager, autodetectProcessManager); + MlMemoryTracker memoryTracker = new MlMemoryTracker(clusterService, threadPool, jobManager, jobResultsProvider); + this.memoryTracker.set(memoryTracker); // This object's constructor attaches to the license state, so there's no need to retain another reference to it new InvalidLicenseEnforcer(getLicenseState(), threadPool, datafeedManager, autodetectProcessManager); @@ -433,7 +437,8 @@ public Collection createComponents(Client client, ClusterService cluster jobDataCountsPersister, datafeedManager, auditor, - new MlAssignmentNotifier(auditor, clusterService) + new MlAssignmentNotifier(auditor, clusterService), + memoryTracker ); } @@ -444,8 +449,9 @@ public List> getPersistentTasksExecutor(ClusterServic } return Arrays.asList( - new 
TransportOpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get()), - new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(datafeedManager.get()) + new TransportOpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get(), + memoryTracker.get()), + new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor( datafeedManager.get()) ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index edd60cd4756d0..385e33dfe369c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -68,6 +68,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import org.elasticsearch.xpack.ml.utils.MlIndicesUtils; import java.util.ArrayList; @@ -93,6 +94,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction(); } @@ -210,6 +214,9 @@ private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJ ActionListener listener) { String jobId = request.getJobId(); + // We clean up the memory tracker on delete rather than close as close is not a master node action + memoryTracker.removeJob(jobId); + // Step 4. 
When the job has been removed from the cluster state, return a response // ------- CheckedConsumer apiResponseHandler = jobDeleted -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 98a4a9039b37a..f827d67c9f7c5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -56,6 +56,7 @@ import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; @@ -68,6 +69,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; +import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import java.io.IOException; import java.util.ArrayList; @@ -94,21 +96,23 @@ To ensure that a subsequent close job call will see that same task status (and s */ public class TransportOpenJobAction extends TransportMasterNodeAction { + private static final PersistentTasksCustomMetaData.Assignment AWAITING_LAZY_ASSIGNMENT = + new PersistentTasksCustomMetaData.Assignment(null, "persistent task is awaiting node assignment."); + private final XPackLicenseState licenseState; private final PersistentTasksService persistentTasksService; private final Client client; private final JobResultsProvider jobResultsProvider; private final JobConfigProvider jobConfigProvider; - - 
private static final PersistentTasksCustomMetaData.Assignment AWAITING_LAZY_ASSIGNMENT = - new PersistentTasksCustomMetaData.Assignment(null, "persistent task is awaiting node assignment."); + private final MlMemoryTracker memoryTracker; @Inject public TransportOpenJobAction(TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState, ClusterService clusterService, PersistentTasksService persistentTasksService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client, - JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider) { + JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider, + MlMemoryTracker memoryTracker) { super(OpenJobAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, OpenJobAction.Request::new); this.licenseState = licenseState; @@ -116,6 +120,7 @@ public TransportOpenJobAction(TransportService transportService, ThreadPool thre this.client = client; this.jobResultsProvider = jobResultsProvider; this.jobConfigProvider = jobConfigProvider; + this.memoryTracker = memoryTracker; } /** @@ -144,6 +149,7 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j int maxConcurrentJobAllocations, int fallbackMaxNumberOfOpenJobs, int maxMachineMemoryPercent, + MlMemoryTracker memoryTracker, Logger logger) { String resultsIndexName = job != null ? 
job.getResultsIndexName() : null; List unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsIndexName, clusterState); @@ -154,10 +160,38 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j return new PersistentTasksCustomMetaData.Assignment(null, reason); } + // Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe + // because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs + boolean allocateByMemory = true; + + if (memoryTracker.isRecentlyRefreshed() == false) { + + boolean scheduledRefresh = memoryTracker.asyncRefresh(ActionListener.wrap( + acknowledged -> { + if (acknowledged) { + logger.trace("Job memory requirement refresh request completed successfully"); + } else { + logger.warn("Job memory requirement refresh request completed but did not set time in cluster state"); + } + }, + e -> logger.error("Failed to refresh job memory requirements", e) + )); + if (scheduledRefresh) { + String reason = "Not opening job [" + jobId + "] because job memory requirements are stale - refresh requested"; + logger.debug(reason); + return new PersistentTasksCustomMetaData.Assignment(null, reason); + } else { + allocateByMemory = false; + logger.warn("Falling back to allocating job [{}] by job counts because a memory requirement refresh could not be scheduled", + jobId); + } + } + List reasons = new LinkedList<>(); long maxAvailableCount = Long.MIN_VALUE; + long maxAvailableMemory = Long.MIN_VALUE; DiscoveryNode minLoadedNodeByCount = null; - + DiscoveryNode minLoadedNodeByMemory = null; PersistentTasksCustomMetaData persistentTasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); for (DiscoveryNode node : clusterState.getNodes()) { Map nodeAttributes = node.getAttributes(); @@ -198,10 +232,9 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j } } - long 
numberOfAssignedJobs = 0; int numberOfAllocatingJobs = 0; - + long assignedJobMemory = 0; if (persistentTasks != null) { // find all the job tasks assigned to this node Collection> assignedTasks = persistentTasks.findTasks( @@ -232,6 +265,15 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) { // Don't count CLOSED or FAILED jobs, as they don't consume native memory ++numberOfAssignedJobs; + OpenJobAction.JobParams params = (OpenJobAction.JobParams) assignedTask.getParams(); + Long jobMemoryRequirement = memoryTracker.getJobMemoryRequirement(params.getJobId()); + if (jobMemoryRequirement == null) { + allocateByMemory = false; + logger.debug("Falling back to allocating job [{}] by job counts because " + + "the memory requirement for job [{}] was not available", jobId, params.getJobId()); + } else { + assignedJobMemory += jobMemoryRequirement; + } } } } @@ -272,10 +314,62 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j maxAvailableCount = availableCount; minLoadedNodeByCount = node; } + + String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR); + long machineMemory = -1; + // TODO: remove leniency and reject the node if the attribute is null in 7.0 + if (machineMemoryStr != null) { + try { + machineMemory = Long.parseLong(machineMemoryStr); + } catch (NumberFormatException e) { + String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " + + MachineLearning.MACHINE_MEMORY_NODE_ATTR + " attribute [" + machineMemoryStr + "] is not a long"; + logger.trace(reason); + reasons.add(reason); + continue; + } + } + + if (allocateByMemory) { + if (machineMemory > 0) { + long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100; + Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(jobId); + if (estimatedMemoryFootprint != null) { + long 
availableMemory = maxMlMemory - assignedJobMemory; + if (estimatedMemoryFootprint > availableMemory) { + String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + + "], because this node has insufficient available memory. Available memory for ML [" + maxMlMemory + + "], memory required by existing jobs [" + assignedJobMemory + + "], estimated memory required for this job [" + estimatedMemoryFootprint + "]"; + logger.trace(reason); + reasons.add(reason); + continue; + } + + if (maxAvailableMemory < availableMemory) { + maxAvailableMemory = availableMemory; + minLoadedNodeByMemory = node; + } + } else { + // If we cannot get the job memory requirement, + // fall back to simply allocating by job count + allocateByMemory = false; + logger.debug("Falling back to allocating job [{}] by job counts because its memory requirement was not available", + jobId); + } + } else { + // If we cannot get the available memory on any machine in + // the cluster, fall back to simply allocating by job count + allocateByMemory = false; + logger.debug("Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]", + jobId, nodeNameAndMlAttributes(node)); + } + } } - if (minLoadedNodeByCount != null) { - logger.debug("selected node [{}] for job [{}]", minLoadedNodeByCount, jobId); - return new PersistentTasksCustomMetaData.Assignment(minLoadedNodeByCount.getId(), ""); + DiscoveryNode minLoadedNode = allocateByMemory ? 
minLoadedNodeByMemory : minLoadedNodeByCount; + if (minLoadedNode != null) { + logger.debug("selected node [{}] for job [{}]", minLoadedNode, jobId); + return new PersistentTasksCustomMetaData.Assignment(minLoadedNode.getId(), ""); } else { String explanation = String.join("|", reasons); logger.debug("no node selected for job [{}], reasons [{}]", jobId, explanation); @@ -451,41 +545,48 @@ public void onFailure(Exception e) { }; // Start job task - ActionListener establishedMemoryUpdateListener = ActionListener.wrap( - response -> { - persistentTasksService.sendStartRequest(MlTasks.jobTaskId(jobParams.getJobId()), - MlTasks.JOB_TASK_NAME, jobParams, waitForJobToStart); - }, - listener::onFailure + ActionListener memoryRequirementRefreshListener = ActionListener.wrap( + mem -> persistentTasksService.sendStartRequest(MlTasks.jobTaskId(jobParams.getJobId()), MlTasks.JOB_TASK_NAME, jobParams, + waitForJobToStart), + listener::onFailure + ); + + // Tell the job tracker to refresh the memory requirement for this job and all other jobs that have persistent tasks + ActionListener jobUpdateListener = ActionListener.wrap( + response -> memoryTracker.refreshJobMemoryAndAllOthers(jobParams.getJobId(), memoryRequirementRefreshListener), + listener::onFailure ); - // Update established model memory for pre-6.1 jobs that haven't had it set - // and increase the model memory limit for 6.1 - 6.3 jobs + // Increase the model memory limit for 6.1 - 6.3 jobs ActionListener missingMappingsListener = ActionListener.wrap( response -> { Job job = jobParams.getJob(); if (job != null) { Version jobVersion = job.getJobVersion(); - Long jobEstablishedModelMemory = job.getEstablishedModelMemory(); - if ((jobVersion == null || jobVersion.before(Version.V_6_1_0)) - && (jobEstablishedModelMemory == null || jobEstablishedModelMemory == 0)) { - jobResultsProvider.getEstablishedMemoryUsage(job.getId(), null, null, establishedModelMemory -> { - if (establishedModelMemory != null && 
establishedModelMemory > 0) { - JobUpdate update = new JobUpdate.Builder(job.getId()) - .setEstablishedModelMemory(establishedModelMemory).build(); - UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(job.getId(), update); - - executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, - establishedMemoryUpdateListener); - } else { - establishedMemoryUpdateListener.onResponse(null); - } - }, listener::onFailure); - } else { - establishedMemoryUpdateListener.onResponse(null); + if (jobVersion != null && + (jobVersion.onOrAfter(Version.V_6_1_0) && jobVersion.before(Version.V_6_3_0))) { + // Increase model memory limit if < 512MB + if (job.getAnalysisLimits() != null && job.getAnalysisLimits().getModelMemoryLimit() != null && + job.getAnalysisLimits().getModelMemoryLimit() < 512L) { + + long updatedModelMemoryLimit = (long) (job.getAnalysisLimits().getModelMemoryLimit() * 1.3); + AnalysisLimits limits = new AnalysisLimits(updatedModelMemoryLimit, + job.getAnalysisLimits().getCategorizationExamplesLimit()); + + JobUpdate update = new JobUpdate.Builder(job.getId()).setJobVersion(Version.CURRENT) + .setAnalysisLimits(limits).build(); + UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(job.getId(), update); + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, + jobUpdateListener); + } else { + jobUpdateListener.onResponse(null); + } + } + else { + jobUpdateListener.onResponse(null); } } else { - establishedMemoryUpdateListener.onResponse(null); + jobUpdateListener.onResponse(null); } }, listener::onFailure ); @@ -629,6 +730,7 @@ private void addDocMappingIfMissing(String alias, CheckedSupplier { private final AutodetectProcessManager autodetectProcessManager; + private final MlMemoryTracker memoryTracker; /** * The maximum number of open jobs can be different on each node. 
However, nodes on older versions @@ -642,9 +744,10 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut private volatile int maxLazyMLNodes; public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService, - AutodetectProcessManager autodetectProcessManager) { + AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker) { super(MlTasks.JOB_TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME); this.autodetectProcessManager = autodetectProcessManager; + this.memoryTracker = memoryTracker; this.fallbackMaxNumberOfOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings); this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings); this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings); @@ -658,11 +761,17 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS @Override public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobParams params, ClusterState clusterState) { - PersistentTasksCustomMetaData.Assignment assignment =selectLeastLoadedMlNode(params.getJobId(), params.getJob(), clusterState, - maxConcurrentJobAllocations, fallbackMaxNumberOfOpenJobs, maxMachineMemoryPercent, logger); + PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(params.getJobId(), + params.getJob(), + clusterState, + maxConcurrentJobAllocations, + fallbackMaxNumberOfOpenJobs, + maxMachineMemoryPercent, + memoryTracker, + logger); if (assignment.getExecutorNode() == null) { int numMlNodes = 0; - for(DiscoveryNode node : clusterState.getNodes()) { + for (DiscoveryNode node : clusterState.getNodes()) { if (Boolean.valueOf(node.getAttributes().get(MachineLearning.ML_ENABLED_NODE_ATTR))) { numMlNodes++; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 0e9d71b93b2da..7d05f2d02a126 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -66,7 +66,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -580,25 +579,17 @@ public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionList // Step 1. update the job // ------- - Consumer updateJobHandler = response -> { - JobUpdate update = new JobUpdate.Builder(request.getJobId()) - .setModelSnapshotId(modelSnapshot.getSnapshotId()) - .setEstablishedModelMemory(response) - .build(); - - jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, ActionListener.wrap( - job -> { - auditor.info(request.getJobId(), - Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); - updateHandler.accept(Boolean.TRUE); - }, - actionListener::onFailure - )); - }; - - // Step 0. 
Find the appropriate established model memory for the reverted job - // ------- - jobResultsProvider.getEstablishedMemoryUsage(request.getJobId(), modelSizeStats.getTimestamp(), modelSizeStats, updateJobHandler, - actionListener::onFailure); + JobUpdate update = new JobUpdate.Builder(request.getJobId()) + .setModelSnapshotId(modelSnapshot.getSnapshotId()) + .build(); + + jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, ActionListener.wrap( + job -> { + auditor.info(request.getJobId(), + Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); + updateHandler.accept(Boolean.TRUE); + }, + actionListener::onFailure + )); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 24abeb9d45b47..8b0de68fb582b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -521,8 +521,7 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autoDetectExecutorService, onProcessCrash(jobTask)); AutoDetectResultProcessor processor = new AutoDetectResultProcessor( - client, auditor, jobId, renormalizer, jobResultsPersister, jobResultsProvider, autodetectParams.modelSizeStats(), - autodetectParams.modelSnapshot() != null); + client, auditor, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats()); ExecutorService autodetectWorkerExecutor; try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService); 
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index a60c937209c21..f536b79547736 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -20,9 +20,6 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; @@ -41,7 +38,6 @@ import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; @@ -56,7 +52,6 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -85,20 +80,11 @@ public class AutoDetectResultProcessor { private static final Logger LOGGER = LogManager.getLogger(AutoDetectResultProcessor.class); - /** - * This 
is how far behind real-time we'll update the job with the latest established model memory. - * If more updates are received during the delay period then they'll take precedence. - * As a result there will be at most one update of established model memory per delay period. - */ - private static final TimeValue ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY = TimeValue.timeValueSeconds(5); - private final Client client; private final Auditor auditor; private final String jobId; private final Renormalizer renormalizer; private final JobResultsPersister persister; - private final JobResultsProvider jobResultsProvider; - private final boolean restoredSnapshot; final CountDownLatch completionLatch = new CountDownLatch(1); final Semaphore updateModelSnapshotSemaphore = new Semaphore(1); @@ -112,30 +98,21 @@ public class AutoDetectResultProcessor { * New model size stats are read as the process is running */ private volatile ModelSizeStats latestModelSizeStats; - private volatile Date latestDateForEstablishedModelMemoryCalc; - private volatile long latestEstablishedModelMemory; - private volatile boolean haveNewLatestModelSizeStats; - private Future scheduledEstablishedModelMemoryUpdate; // only accessed in synchronized methods public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, - JobResultsPersister persister, JobResultsProvider jobResultsProvider, - ModelSizeStats latestModelSizeStats, boolean restoredSnapshot) { - this(client, auditor, jobId, renormalizer, persister, jobResultsProvider, latestModelSizeStats, - restoredSnapshot, new FlushListener()); + JobResultsPersister persister, ModelSizeStats latestModelSizeStats) { + this(client, auditor, jobId, renormalizer, persister, latestModelSizeStats, new FlushListener()); } AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, - JobResultsPersister persister, JobResultsProvider jobResultsProvider, ModelSizeStats latestModelSizeStats, - 
boolean restoredSnapshot, FlushListener flushListener) { + JobResultsPersister persister, ModelSizeStats latestModelSizeStats, FlushListener flushListener) { this.client = Objects.requireNonNull(client); this.auditor = Objects.requireNonNull(auditor); this.jobId = Objects.requireNonNull(jobId); this.renormalizer = Objects.requireNonNull(renormalizer); this.persister = Objects.requireNonNull(persister); - this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider); this.flushListener = Objects.requireNonNull(flushListener); this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats); - this.restoredSnapshot = restoredSnapshot; } public void process(AutodetectProcess process) { @@ -230,17 +207,7 @@ void processResult(Context context, AutodetectResult result) { // persist after deleting interim results in case the new // results are also interim context.bulkResultsPersister.persistBucket(bucket).executeRequest(); - latestDateForEstablishedModelMemoryCalc = bucket.getTimestamp(); ++bucketCount; - - // if we haven't previously set established model memory, consider trying again after - // a reasonable number of buckets have elapsed since the last model size stats update - long minEstablishedTimespanMs = JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE * bucket.getBucketSpan() * 1000L; - if (haveNewLatestModelSizeStats && latestEstablishedModelMemory == 0 && latestDateForEstablishedModelMemoryCalc.getTime() - > latestModelSizeStats.getTimestamp().getTime() + minEstablishedTimespanMs) { - scheduleEstablishedModelMemoryUpdate(ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY); - haveNewLatestModelSizeStats = false; - } } List records = result.getRecords(); if (records != null && !records.isEmpty()) { @@ -331,15 +298,6 @@ private void processModelSizeStats(Context context, ModelSizeStats modelSizeStat persister.persistModelSizeStats(modelSizeStats); notifyModelMemoryStatusChange(context, modelSizeStats); latestModelSizeStats = modelSizeStats; - 
latestDateForEstablishedModelMemoryCalc = modelSizeStats.getTimestamp(); - haveNewLatestModelSizeStats = true; - - // This is a crude way to NOT refresh the index and NOT attempt to update established model memory during the first 20 buckets - // because this is when the model size stats are likely to be least stable and lots of updates will be coming through, and - // we'll NEVER consider memory usage to be established during this period - if (restoredSnapshot || bucketCount >= JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) { - scheduleEstablishedModelMemoryUpdate(ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY); - } } private void notifyModelMemoryStatusChange(Context context, ModelSizeStats modelSizeStats) { @@ -366,12 +324,11 @@ protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { return; } - Map update = new HashMap<>(); + Map update = new HashMap<>(); update.put(Job.MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshot.getSnapshotId()); update.put(Job.MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), modelSnapshot.getMinVersion().toString()); - updateJob(jobId, Collections.singletonMap(Job.MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshot.getSnapshotId()), - new ActionListener() { + updateJob(jobId, update, new ActionListener() { @Override public void onResponse(UpdateResponse updateResponse) { updateModelSnapshotSemaphore.release(); @@ -387,67 +344,11 @@ public void onFailure(Exception e) { }); } - /** - * The purpose of this method is to avoid saturating the cluster state update thread - * when a lookback job is churning through buckets very fast and the memory usage of - * the job is changing regularly. The idea is to only update the established model - * memory associated with the job a few seconds after the new value has been received. - * If more updates are received during the delay period then they simply replace the - * value that originally caused the update to be scheduled. 
This rate limits cluster - * state updates due to established model memory changing to one per job per delay period. - * (In reality updates will only occur this rapidly during lookback. During real-time - * operation the limit of one model size stats document per bucket will mean there is a - * maximum of one cluster state update per job per bucket, and usually the bucket span - * is 5 minutes or more.) - * @param delay The delay before updating established model memory. - */ - synchronized void scheduleEstablishedModelMemoryUpdate(TimeValue delay) { - - if (scheduledEstablishedModelMemoryUpdate == null) { - try { - scheduledEstablishedModelMemoryUpdate = client.threadPool().schedule(delay, MachineLearning.UTILITY_THREAD_POOL_NAME, - () -> runEstablishedModelMemoryUpdate(false)); - LOGGER.trace("[{}] Scheduled established model memory update to run in [{}]", jobId, delay); - } catch (EsRejectedExecutionException e) { - if (e.isExecutorShutdown()) { - LOGGER.debug("failed to schedule established model memory update; shutting down", e); - } else { - throw e; - } - } - } - } - - /** - * This method is called from two places: - * - From the {@link Future} used for delayed updates - * - When shutting down this result processor - * When shutting down the result processor it's only necessary to do anything - * if an update has been scheduled, but we want to do the update immediately. - * Despite cancelling the scheduled update in this case, it's possible that - * it's already started running, in which case this method will get called - * twice in quick succession. But the second call will do nothing, as - * scheduledEstablishedModelMemoryUpdate will have been reset - * to null by the first call. 
- */ - private synchronized void runEstablishedModelMemoryUpdate(boolean cancelExisting) { - - if (scheduledEstablishedModelMemoryUpdate != null) { - if (cancelExisting) { - LOGGER.debug("[{}] Bringing forward previously scheduled established model memory update", jobId); - FutureUtils.cancel(scheduledEstablishedModelMemoryUpdate); - } - scheduledEstablishedModelMemoryUpdate = null; - updateEstablishedModelMemoryOnJob(); - } - } - private void onAutodetectClose() { onCloseActionsLatch = new CountDownLatch(1); ActionListener updateListener = ActionListener.wrap( updateResponse -> { - runEstablishedModelMemoryUpdate(true); onCloseActionsLatch.countDown(); }, e -> { @@ -462,35 +363,6 @@ private void onAutodetectClose() { ); } - private void updateEstablishedModelMemoryOnJob() { - - // Copy these before committing writes, so the calculation is done based on committed documents - Date latestBucketTimestamp = latestDateForEstablishedModelMemoryCalc; - ModelSizeStats modelSizeStatsForCalc = latestModelSizeStats; - - // We need to make all results written up to and including these stats available for the established memory calculation - persister.commitResultWrites(jobId); - - jobResultsProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStatsForCalc, establishedModelMemory -> { - if (latestEstablishedModelMemory != establishedModelMemory) { - updateJob(jobId, Collections.singletonMap(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory), - new ActionListener() { - @Override - public void onResponse(UpdateResponse response) { - latestEstablishedModelMemory = establishedModelMemory; - LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory); - } - - @Override - public void onFailure(Exception e) { - LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" + - establishedModelMemory + "]", e); - } - }); - } - }, e -> LOGGER.error("[" + jobId + "] Failed to 
calculate established model memory", e)); - } - private void updateJob(String jobId, Map update, ActionListener listener) { UpdateRequest updateRequest = new UpdateRequest(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java new file mode 100644 index 0000000000000..63f0fac27d8e8 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java @@ -0,0 +1,325 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.process; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.LocalNodeMasterListener; +import org.elasticsearch.cluster.ack.AckedRequest; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import 
org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.JobManager; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * This class keeps track of the memory requirement of ML jobs. + * It only functions on the master node - for this reason it should only be used by master node actions. + * The memory requirement for ML jobs can be updated in 3 ways: + * 1. For all open ML jobs (via {@link #asyncRefresh}) + * 2. For all open ML jobs, plus one named ML job that is not open (via {@link #refreshJobMemoryAndAllOthers}) + * 3. For one named ML job (via {@link #refreshJobMemory}) + * In all cases a listener informs the caller when the requested updates are complete. 
+ */ +public class MlMemoryTracker implements LocalNodeMasterListener { + + private static final AckedRequest ACKED_REQUEST = new AckedRequest() { + @Override + public TimeValue ackTimeout() { + return AcknowledgedRequest.DEFAULT_ACK_TIMEOUT; + } + + @Override + public TimeValue masterNodeTimeout() { + return AcknowledgedRequest.DEFAULT_ACK_TIMEOUT; + } + }; + + private static final Duration RECENT_UPDATE_THRESHOLD = Duration.ofMinutes(1); + + private final Logger logger = LogManager.getLogger(MlMemoryTracker.class); + private final ConcurrentHashMap memoryRequirementByJob = new ConcurrentHashMap<>(); + private final List> fullRefreshCompletionListeners = new ArrayList<>(); + + private final ThreadPool threadPool; + private final ClusterService clusterService; + private final JobManager jobManager; + private final JobResultsProvider jobResultsProvider; + private volatile boolean isMaster; + private volatile Instant lastUpdateTime; + + public MlMemoryTracker(ClusterService clusterService, ThreadPool threadPool, JobManager jobManager, + JobResultsProvider jobResultsProvider) { + this.threadPool = threadPool; + this.clusterService = clusterService; + this.jobManager = jobManager; + this.jobResultsProvider = jobResultsProvider; + clusterService.addLocalNodeMasterListener(this); + } + + @Override + public void onMaster() { + isMaster = true; + logger.trace("ML memory tracker on master"); + } + + @Override + public void offMaster() { + isMaster = false; + logger.trace("ML memory tracker off master"); + memoryRequirementByJob.clear(); + lastUpdateTime = null; + } + + @Override + public String executorName() { + return MachineLearning.UTILITY_THREAD_POOL_NAME; + } + + /** + * Is the information in this object sufficiently up to date + * for valid allocation decisions to be made using it? 
+ */ + public boolean isRecentlyRefreshed() { + Instant localLastUpdateTime = lastUpdateTime; + return localLastUpdateTime != null && localLastUpdateTime.plus(RECENT_UPDATE_THRESHOLD).isAfter(Instant.now()); + } + + /** + * Get the memory requirement for a job. + * This method only works on the master node. + * @param jobId The job ID. + * @return The memory requirement of the job specified by {@code jobId}, + * or null if it cannot be calculated. + */ + public Long getJobMemoryRequirement(String jobId) { + + if (isMaster == false) { + return null; + } + + Long memoryRequirement = memoryRequirementByJob.get(jobId); + if (memoryRequirement != null) { + return memoryRequirement; + } + + return null; + } + + /** + * Remove any memory requirement that is stored for the specified job. + * It doesn't matter if this method is called for a job that doesn't have + * a stored memory requirement. + */ + public void removeJob(String jobId) { + memoryRequirementByJob.remove(jobId); + } + + /** + * Uses a separate thread to refresh the memory requirement for every ML job that has + * a corresponding persistent task. This method only works on the master node. + * @param listener Will be called when the async refresh completes or fails. The + * boolean value indicates whether the cluster state was updated + * with the refresh completion time. (If it was then this will in + * cause the persistent tasks framework to check if any persistent + * tasks are awaiting allocation.) + * @return true if the async refresh is scheduled, and false + * if this is not possible for some reason. 
+ */ + public boolean asyncRefresh(ActionListener listener) { + + if (isMaster) { + try { + ActionListener mlMetaUpdateListener = ActionListener.wrap( + aVoid -> recordUpdateTimeInClusterState(listener), + listener::onFailure + ); + threadPool.executor(executorName()).execute( + () -> refresh(clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE), mlMetaUpdateListener)); + return true; + } catch (EsRejectedExecutionException e) { + logger.debug("Couldn't schedule ML memory update - node might be shutting down", e); + } + } + + return false; + } + + /** + * This refreshes the memory requirement for every ML job that has a corresponding + * persistent task and, in addition, one job that doesn't have a persistent task. + * This method only works on the master node. + * @param jobId The job ID of the job whose memory requirement is to be refreshed + * despite not having a corresponding persistent task. + * @param listener Receives the memory requirement of the job specified by {@code jobId}, + * or null if it cannot be calculated. + */ + public void refreshJobMemoryAndAllOthers(String jobId, ActionListener listener) { + + if (isMaster == false) { + listener.onResponse(null); + return; + } + + PersistentTasksCustomMetaData persistentTasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + refresh(persistentTasks, ActionListener.wrap(aVoid -> refreshJobMemory(jobId, listener), listener::onFailure)); + } + + /** + * This refreshes the memory requirement for every ML job that has a corresponding persistent task. + * It does NOT remove entries for jobs that no longer have a persistent task, because that would + * lead to a race where a job was opened part way through the refresh. (Instead, entries are removed + * when jobs are deleted.) 
+ */ + void refresh(PersistentTasksCustomMetaData persistentTasks, ActionListener onCompletion) { + + synchronized (fullRefreshCompletionListeners) { + fullRefreshCompletionListeners.add(onCompletion); + if (fullRefreshCompletionListeners.size() > 1) { + // A refresh is already in progress, so don't do another + return; + } + } + + ActionListener refreshComplete = ActionListener.wrap(aVoid -> { + lastUpdateTime = Instant.now(); + synchronized (fullRefreshCompletionListeners) { + assert fullRefreshCompletionListeners.isEmpty() == false; + for (ActionListener listener : fullRefreshCompletionListeners) { + listener.onResponse(null); + } + fullRefreshCompletionListeners.clear(); + } + }, onCompletion::onFailure); + + // persistentTasks will be null if there's never been a persistent task created in this cluster + if (persistentTasks == null) { + refreshComplete.onResponse(null); + } else { + List> mlJobTasks = persistentTasks.tasks().stream() + .filter(task -> MlTasks.JOB_TASK_NAME.equals(task.getTaskName())).collect(Collectors.toList()); + iterateMlJobTasks(mlJobTasks.iterator(), refreshComplete); + } + } + + private void recordUpdateTimeInClusterState(ActionListener listener) { + + clusterService.submitStateUpdateTask("ml-memory-last-update-time", + new AckedClusterStateUpdateTask(ACKED_REQUEST, listener) { + @Override + protected Boolean newResponse(boolean acknowledged) { + return acknowledged; + } + + @Override + public ClusterState execute(ClusterState currentState) { + MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState); + MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata); + builder.setLastMemoryRefreshVersion(currentState.getVersion() + 1); + MlMetadata newMlMetadata = builder.build(); + if (newMlMetadata.equals(currentMlMetadata)) { + // Return same reference if nothing has changed + return currentState; + } else { + ClusterState.Builder newState = ClusterState.builder(currentState); + 
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMlMetadata).build()); + return newState.build(); + } + } + }); + } + + private void iterateMlJobTasks(Iterator> iterator, + ActionListener refreshComplete) { + if (iterator.hasNext()) { + OpenJobAction.JobParams jobParams = (OpenJobAction.JobParams) iterator.next().getParams(); + refreshJobMemory(jobParams.getJobId(), + ActionListener.wrap(mem -> iterateMlJobTasks(iterator, refreshComplete), refreshComplete::onFailure)); + } else { + refreshComplete.onResponse(null); + } + } + + /** + * Refresh the memory requirement for a single job. + * This method only works on the master node. + * @param jobId The ID of the job to refresh the memory requirement for. + * @param listener Receives the job's memory requirement, or null + * if it cannot be calculated. + */ + public void refreshJobMemory(String jobId, ActionListener listener) { + if (isMaster == false) { + listener.onResponse(null); + return; + } + + try { + jobResultsProvider.getEstablishedMemoryUsage(jobId, null, null, + establishedModelMemoryBytes -> { + if (establishedModelMemoryBytes <= 0L) { + setJobMemoryToLimit(jobId, listener); + } else { + Long memoryRequirementBytes = establishedModelMemoryBytes + Job.PROCESS_MEMORY_OVERHEAD.getBytes(); + memoryRequirementByJob.put(jobId, memoryRequirementBytes); + listener.onResponse(memoryRequirementBytes); + } + }, + e -> { + logger.error("[" + jobId + "] failed to calculate job established model memory requirement", e); + setJobMemoryToLimit(jobId, listener); + } + ); + } catch (Exception e) { + logger.error("[" + jobId + "] failed to calculate job established model memory requirement", e); + setJobMemoryToLimit(jobId, listener); + } + } + + private void setJobMemoryToLimit(String jobId, ActionListener listener) { + jobManager.getJob(jobId, ActionListener.wrap(job -> { + Long memoryLimitMb = job.getAnalysisLimits().getModelMemoryLimit(); + if (memoryLimitMb != null) { + Long 
memoryRequirementBytes = ByteSizeUnit.MB.toBytes(memoryLimitMb) + Job.PROCESS_MEMORY_OVERHEAD.getBytes(); + memoryRequirementByJob.put(jobId, memoryRequirementBytes); + listener.onResponse(memoryRequirementBytes); + } else { + memoryRequirementByJob.remove(jobId); + listener.onResponse(null); + } + }, e -> { + if (e instanceof ResourceNotFoundException) { + // TODO: does this also happen if the .ml-config index exists but is unavailable? + logger.trace("[{}] job deleted during ML memory update", jobId); + } else { + logger.error("[" + jobId + "] failed to get job during ML memory update", e); + } + memoryRequirementByJob.remove(jobId); + listener.onResponse(null); + })); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index c7ca2ff805eba..eb58221bf5f35 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -69,6 +69,9 @@ protected MlMetadata createTestInstance() { builder.putJob(job, false); } } + if (randomBoolean()) { + builder.setLastMemoryRefreshVersion(randomNonNegativeLong()); + } return builder.build(); } @@ -438,8 +441,9 @@ protected MlMetadata mutateInstance(MlMetadata instance) { for (Map.Entry entry : datafeeds.entrySet()) { metadataBuilder.putDatafeed(entry.getValue(), Collections.emptyMap()); } + metadataBuilder.setLastMemoryRefreshVersion(instance.getLastMemoryRefreshVersion()); - switch (between(0, 1)) { + switch (between(0, 2)) { case 0: metadataBuilder.putJob(JobTests.createRandomizedJob(), true); break; @@ -459,6 +463,13 @@ protected MlMetadata mutateInstance(MlMetadata instance) { metadataBuilder.putJob(randomJob, false); metadataBuilder.putDatafeed(datafeedConfig, Collections.emptyMap()); break; + case 2: + if (instance.getLastMemoryRefreshVersion() == null) { + 
metadataBuilder.setLastMemoryRefreshVersion(randomNonNegativeLong()); + } else { + metadataBuilder.setLastMemoryRefreshVersion(null); + } + break; default: throw new AssertionError("Illegal randomisation branch"); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index 32ae117f9fd45..84d2ecaf918f9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -50,7 +50,9 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; +import org.junit.Before; import java.io.IOException; import java.net.InetAddress; @@ -71,6 +73,14 @@ public class TransportOpenJobActionTests extends ESTestCase { + private MlMemoryTracker memoryTracker; + + @Before + public void setup() { + memoryTracker = mock(MlMemoryTracker.class); + when(memoryTracker.isRecentlyRefreshed()).thenReturn(true); + } + public void testValidate_jobMissing() { expectThrows(ResourceNotFoundException.class, () -> TransportOpenJobAction.validate("job_id2", null)); } @@ -125,7 +135,7 @@ public void testSelectLeastLoadedMlNode_byCount() { jobBuilder.setJobVersion(Version.CURRENT); Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id4", jobBuilder.build(), - cs.build(), 2, 10, 30, logger); + cs.build(), 2, 10, 30, memoryTracker, logger); assertEquals("", result.getExplanation()); assertEquals("_node_id3", result.getExecutorNode()); } @@ -161,7 +171,7 @@ public void testSelectLeastLoadedMlNode_maxCapacity() { Job job = 
BaseMlIntegTestCase.createFareQuoteJob("job_id0", new ByteSizeValue(150, ByteSizeUnit.MB)).build(new Date()); Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id0", job, cs.build(), 2, - maxRunningJobsPerNode, 30, logger); + maxRunningJobsPerNode, 30, memoryTracker, logger); assertNull(result.getExecutorNode()); assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode + "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]")); @@ -187,7 +197,7 @@ public void testSelectLeastLoadedMlNode_noMlNodes() { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id2", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 10, 30, memoryTracker, logger); assertTrue(result.getExplanation().contains("because this node isn't a ml node")); assertNull(result.getExecutorNode()); } @@ -221,7 +231,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id6", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); ClusterState cs = csBuilder.build(); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 10, 30, memoryTracker, logger); assertEquals("_node_id3", result.getExecutorNode()); tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); @@ -231,7 +241,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = 
TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -242,7 +252,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger); assertNull("no node selected, because stale task", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -253,7 +263,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger); assertNull("no node selected, because null state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -291,7 +301,7 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id7", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); // Allocation won't be possible if the 
stale failed job is treated as opening - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger); assertEquals("_node_id1", result.getExecutorNode()); tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); @@ -301,7 +311,7 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 10, 30, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 10, 30, memoryTracker, logger); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -332,7 +342,8 @@ public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() { cs.nodes(nodes); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 10, 30, + memoryTracker, logger); assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]")); assertNull(result.getExecutorNode()); } @@ -362,7 +373,7 @@ public void testSelectLeastLoadedMlNode_noNodesMatchingModelSnapshotMinVersion() metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_incompatible_model_snapshot", 
job, cs.build(), - 2, 10, 30, logger); + 2, 10, 30, memoryTracker, logger); assertThat(result.getExplanation(), containsString( "because the job's model snapshot requires a node of version [6.3.0] or higher")); assertNull(result.getExecutorNode()); @@ -389,7 +400,8 @@ public void testSelectLeastLoadedMlNode_jobWithRulesButNoNodeMeetsRequiredVersio cs.metaData(metaData); Job job = jobWithRules("job_with_rules"); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 10, 30, memoryTracker, + logger); assertThat(result.getExplanation(), containsString( "because jobs using custom_rules require a node of version [6.4.0] or higher")); assertNull(result.getExecutorNode()); @@ -416,7 +428,8 @@ public void testSelectLeastLoadedMlNode_jobWithRulesAndNodeMeetsRequiredVersion( cs.metaData(metaData); Job job = jobWithRules("job_with_rules"); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 10, 30, memoryTracker, + logger); assertNotNull(result.getExecutorNode()); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index f1f3ff77c9840..505a2b871da0b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -89,7 +89,7 @@ public void createComponents() throws Exception { renormalizer = mock(Renormalizer.class); capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>(); resultProcessor = new 
AutoDetectResultProcessor(client(), auditor, JOB_ID, renormalizer, - new JobResultsPersister(client()), jobResultsProvider, new ModelSizeStats.Builder(JOB_ID).build(), false) { + new JobResultsPersister(client()), new ModelSizeStats.Builder(JOB_ID).build()) { @Override protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot); @@ -100,7 +100,7 @@ protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { } @After - public void deleteJob() throws Exception { + public void deleteJob() { DeleteJobAction.Request request = new DeleteJobAction.Request(JOB_ID); AcknowledgedResponse response = client().execute(DeleteJobAction.INSTANCE, request).actionGet(); assertTrue(response.isAcknowledged()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index 2e14289da705e..5e4d8fd06030c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -8,10 +8,13 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentHelper; @@ -31,21 +34,32 @@ import 
org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.elasticsearch.persistent.PersistentTasksClusterService.needsReassignment; public class MlDistributedFailureIT extends BaseMlIntegTestCase { + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)) + .put(MachineLearning.CONCURRENT_JOB_ALLOCATIONS.getKey(), 4) + .build(); + } + public void testFailOver() throws Exception { internalCluster().ensureAtLeastNumDataNodes(3); ensureStableClusterOnAllNodes(3); @@ -58,8 +72,6 @@ public void testFailOver() throws Exception { }); } - @TestLogging("org.elasticsearch.xpack.ml.action:DEBUG,org.elasticsearch.xpack.persistent:TRACE," + - "org.elasticsearch.xpack.ml.datafeed:TRACE") public void testLoseDedicatedMasterNode() throws Exception { internalCluster().ensureAtMostNumDataNodes(0); logger.info("Starting dedicated master node..."); @@ -136,12 +148,12 @@ public void testCloseUnassignedJobAndDatafeed() throws Exception { // Job state is opened but the job is not assigned to a node (because we just killed the only ML node) GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(jobId); 
GetJobsStatsAction.Response jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet(); - assertEquals(jobStatsResponse.getResponse().results().get(0).getState(), JobState.OPENED); + assertEquals(JobState.OPENED, jobStatsResponse.getResponse().results().get(0).getState()); GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId); GetDatafeedsStatsAction.Response datafeedStatsResponse = client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); - assertEquals(datafeedStatsResponse.getResponse().results().get(0).getDatafeedState(), DatafeedState.STARTED); + assertEquals(DatafeedState.STARTED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); // Can't normal stop an unassigned datafeed StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId); @@ -170,6 +182,73 @@ public void testCloseUnassignedJobAndDatafeed() throws Exception { assertTrue(closeJobResponse.isClosed()); } + @TestLogging("org.elasticsearch.xpack.ml.action:TRACE,org.elasticsearch.xpack.ml.process:TRACE") + public void testJobRelocationIsMemoryAware() throws Exception { + + internalCluster().ensureAtLeastNumDataNodes(1); + ensureStableClusterOnAllNodes(1); + + // Open 4 small jobs. Since there is only 1 node in the cluster they'll have to go on that node. + + setupJobWithoutDatafeed("small1", new ByteSizeValue(2, ByteSizeUnit.MB)); + setupJobWithoutDatafeed("small2", new ByteSizeValue(2, ByteSizeUnit.MB)); + setupJobWithoutDatafeed("small3", new ByteSizeValue(2, ByteSizeUnit.MB)); + setupJobWithoutDatafeed("small4", new ByteSizeValue(2, ByteSizeUnit.MB)); + + // Expand the cluster to 3 nodes. The 4 small jobs will stay on the + // same node because we don't rebalance jobs that are happily running. + + internalCluster().ensureAtLeastNumDataNodes(3); + ensureStableClusterOnAllNodes(3); + + // Open a big job. 
This should go on a different node to the 4 small ones. + + setupJobWithoutDatafeed("big1", new ByteSizeValue(500, ByteSizeUnit.MB)); + + // Stop the current master node - this should be the one with the 4 small jobs on. + + internalCluster().stopCurrentMasterNode(); + ensureStableClusterOnAllNodes(2); + + // If memory requirements are used to reallocate the 4 small jobs (as we expect) then they should + // all reallocate to the same node, that being the one that doesn't have the big job on. If job counts + // are used to reallocate the small jobs then this implies the fallback allocation mechanism has been + // used in a situation we don't want it to be used in, and at least one of the small jobs will be on + // the same node as the big job. (This all relies on xpack.ml.node_concurrent_job_allocations being set + // to at least 4, which we do in the nodeSettings() method.) + + assertBusy(() -> { + GetJobsStatsAction.Response statsResponse = + client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(MetaData.ALL)).actionGet(); + QueryPage jobStats = statsResponse.getResponse(); + assertNotNull(jobStats); + List smallJobNodes = jobStats.results().stream().filter(s -> s.getJobId().startsWith("small") && s.getNode() != null) + .map(s -> s.getNode().getName()).collect(Collectors.toList()); + List bigJobNodes = jobStats.results().stream().filter(s -> s.getJobId().startsWith("big") && s.getNode() != null) + .map(s -> s.getNode().getName()).collect(Collectors.toList()); + logger.info("small job nodes: " + smallJobNodes + ", big job nodes: " + bigJobNodes); + assertEquals(5, jobStats.count()); + assertEquals(4, smallJobNodes.size()); + assertEquals(1, bigJobNodes.size()); + assertEquals(1L, smallJobNodes.stream().distinct().count()); + assertEquals(1L, bigJobNodes.stream().distinct().count()); + assertNotEquals(smallJobNodes, bigJobNodes); + }); + } + + private void setupJobWithoutDatafeed(String jobId, ByteSizeValue modelMemoryLimit) throws Exception { 
+ Job.Builder job = createFareQuoteJob(jobId, modelMemoryLimit); + PutJobAction.Request putJobRequest = new PutJobAction.Request(job); + client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet(); + + client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).actionGet(); + assertBusy(() -> { + GetJobsStatsAction.Response statsResponse = + client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet(); + assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); + }); + } + private void setupJobAndDatafeed(String jobId, String datafeedId) throws Exception { Job.Builder job = createScheduledJob(jobId); PutJobAction.Request putJobRequest = new PutJobAction.Request(job); @@ -183,7 +262,7 @@ private void setupJobAndDatafeed(String jobId, String datafeedId) throws Excepti assertBusy(() -> { GetJobsStatsAction.Response statsResponse = client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet(); - assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED); + assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); }); StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(config.getId(), 0L); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java index 87aa3c5b926e3..c4150d633a8f0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java @@ -123,12 +123,10 @@ public void testLazyNodeValidation() throws Exception { }); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/34084") public void testSingleNode() throws Exception { verifyMaxNumberOfJobsLimit(1, 
randomIntBetween(1, 100)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/34084") public void testMultipleNodes() throws Exception { verifyMaxNumberOfJobsLimit(3, randomIntBetween(1, 100)); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 05f83f4ae49cc..761cb56fa8804 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -36,7 +36,6 @@ import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; @@ -56,9 +55,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.Consumer; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; @@ -85,7 +82,6 @@ public class AutoDetectResultProcessorTests extends ESTestCase { private Auditor auditor; private Renormalizer renormalizer; private JobResultsPersister persister; - private JobResultsProvider jobResultsProvider; private FlushListener flushListener; private AutoDetectResultProcessor processorUnderTest; private ScheduledThreadPoolExecutor 
executor; @@ -95,9 +91,10 @@ public void setUpMocks() { executor = new ScheduledThreadPoolExecutor(1); client = mock(Client.class); doAnswer(invocation -> { - ActionListener listener = (ActionListener) invocation.getArguments()[2]; - listener.onResponse(new UpdateResponse()); - return null; + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(new UpdateResponse()); + return null; }).when(client).execute(same(UpdateAction.INSTANCE), any(), any()); threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); @@ -113,10 +110,9 @@ public void setUpMocks() { persister = mock(JobResultsPersister.class); when(persister.persistModelSnapshot(any(), any())) .thenReturn(new IndexResponse(new ShardId("ml", "uid", 0), "doc", "1", 0L, 0L, 0L, true)); - jobResultsProvider = mock(JobResultsProvider.class); flushListener = mock(FlushListener.class); - processorUnderTest = new AutoDetectResultProcessor(client, auditor, JOB_ID, renormalizer, persister, jobResultsProvider, - new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(), false, flushListener); + processorUnderTest = new AutoDetectResultProcessor(client, auditor, JOB_ID, renormalizer, persister, + new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(), flushListener); } @After @@ -300,8 +296,6 @@ public void testProcessResult_modelSizeStats() { verify(persister, times(1)).persistModelSizeStats(modelSizeStats); verifyNoMoreInteractions(persister); - // No interactions with the jobResultsProvider confirms that the established memory calculation did not run - verifyNoMoreInteractions(jobResultsProvider, auditor); assertEquals(modelSizeStats, processorUnderTest.modelSizeStats()); } @@ -343,85 +337,6 @@ public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() { verifyNoMoreInteractions(auditor); } - public void testProcessResult_modelSizeStatsAfterManyBuckets() 
throws Exception { - JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); - when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); - when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); - - // To avoid slowing down the test this is using a delay of 1 nanosecond rather than the 5 seconds used in production - setupScheduleDelayTime(TimeValue.timeValueNanos(1)); - - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); - context.deleteInterimRequired = false; - for (int i = 0; i < JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE; ++i) { - AutodetectResult result = mock(AutodetectResult.class); - Bucket bucket = mock(Bucket.class); - when(result.getBucket()).thenReturn(bucket); - processorUnderTest.processResult(context, result); - } - - AutodetectResult result = mock(AutodetectResult.class); - ModelSizeStats modelSizeStats = mock(ModelSizeStats.class); - Date timestamp = new Date(BUCKET_SPAN_MS); - when(modelSizeStats.getTimestamp()).thenReturn(timestamp); - when(result.getModelSizeStats()).thenReturn(modelSizeStats); - processorUnderTest.processResult(context, result); - - // Some calls will be made 1 nanosecond later in a different thread, hence the assertBusy() - assertBusy(() -> { - verify(persister, times(1)).persistModelSizeStats(modelSizeStats); - verify(persister, times(1)).commitResultWrites(JOB_ID); - verifyNoMoreInteractions(persister); - verify(jobResultsProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), eq(timestamp), - eq(modelSizeStats), any(Consumer.class), any(Consumer.class)); - verifyNoMoreInteractions(jobResultsProvider); - assertEquals(modelSizeStats, processorUnderTest.modelSizeStats()); - }); - } - - public void testProcessResult_manyModelSizeStatsInQuickSuccession() throws Exception { - JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); - 
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); - when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); - - setupScheduleDelayTime(TimeValue.timeValueSeconds(1)); - - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); - context.deleteInterimRequired = false; - ModelSizeStats modelSizeStats = null; - for (int i = 1; i <= JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE + 5; ++i) { - AutodetectResult result = mock(AutodetectResult.class); - Bucket bucket = mock(Bucket.class); - when(bucket.getTimestamp()).thenReturn(new Date(BUCKET_SPAN_MS * i)); - when(result.getBucket()).thenReturn(bucket); - processorUnderTest.processResult(context, result); - if (i > JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) { - result = mock(AutodetectResult.class); - modelSizeStats = mock(ModelSizeStats.class); - when(modelSizeStats.getTimestamp()).thenReturn(new Date(BUCKET_SPAN_MS * i)); - when(result.getModelSizeStats()).thenReturn(modelSizeStats); - processorUnderTest.processResult(context, result); - } - } - - ModelSizeStats lastModelSizeStats = modelSizeStats; - assertNotNull(lastModelSizeStats); - Date lastTimestamp = lastModelSizeStats.getTimestamp(); - - // Some calls will be made 1 second later in a different thread, hence the assertBusy() - assertBusy(() -> { - // All the model size stats should be persisted to the index... 
- verify(persister, times(5)).persistModelSizeStats(any(ModelSizeStats.class)); - // ...but only the last should trigger an established model memory update - verify(persister, times(1)).commitResultWrites(JOB_ID); - verifyNoMoreInteractions(persister); - verify(jobResultsProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), eq(lastTimestamp), eq(lastModelSizeStats), - any(Consumer.class), any(Consumer.class)); - verifyNoMoreInteractions(jobResultsProvider); - assertEquals(lastModelSizeStats, processorUnderTest.modelSizeStats()); - }); - } - public void testProcessResult_modelSnapshot() { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); @@ -442,7 +357,7 @@ public void testProcessResult_modelSnapshot() { verifyNoMoreInteractions(persister); UpdateRequest capturedRequest = requestCaptor.getValue(); - assertThat(capturedRequest.doc().sourceAsMap().keySet(), contains(Job.MODEL_SNAPSHOT_ID.getPreferredName())); + assertNotNull(capturedRequest.doc().sourceAsMap().get(Job.MODEL_SNAPSHOT_ID.getPreferredName())); } public void testProcessResult_quantiles_givenRenormalizationIsEnabled() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java new file mode 100644 index 0000000000000..cbba7ffa04972 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java @@ -0,0 +1,195 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.process; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.JobManager; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; +import org.junit.Before; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class MlMemoryTrackerTests extends ESTestCase { + + private ClusterService clusterService; + private ThreadPool threadPool; + private JobManager jobManager; + private JobResultsProvider jobResultsProvider; + private MlMemoryTracker memoryTracker; + + @Before + public void setup() { + + clusterService = mock(ClusterService.class); + threadPool = mock(ThreadPool.class); + ExecutorService executorService = 
mock(ExecutorService.class);
+        doAnswer(invocation -> {
+            @SuppressWarnings("unchecked")
+            Runnable r = (Runnable) invocation.getArguments()[0];
+            r.run();
+            return null;
+        }).when(executorService).execute(any(Runnable.class));
+        when(threadPool.executor(anyString())).thenReturn(executorService);
+        jobManager = mock(JobManager.class);
+        jobResultsProvider = mock(JobResultsProvider.class);
+        memoryTracker = new MlMemoryTracker(clusterService, threadPool, jobManager, jobResultsProvider);
+    }
+
+    public void testRefreshAll() {
+
+        boolean isMaster = randomBoolean();
+        if (isMaster) {
+            memoryTracker.onMaster();
+        } else {
+            memoryTracker.offMaster();
+        }
+
+        int numMlJobTasks = randomIntBetween(2, 5);
+        Map<String, PersistentTasksCustomMetaData.PersistentTask<?>> tasks = new HashMap<>();
+        for (int i = 1; i <= numMlJobTasks; ++i) {
+            String jobId = "job" + i;
+            PersistentTasksCustomMetaData.PersistentTask<?> task = makeTestTask(jobId);
+            tasks.put(task.getId(), task);
+        }
+        PersistentTasksCustomMetaData persistentTasks = new PersistentTasksCustomMetaData(numMlJobTasks, tasks);
+
+        doAnswer(invocation -> {
+            @SuppressWarnings("unchecked")
+            Consumer<Long> listener = (Consumer<Long>) invocation.getArguments()[3];
+            listener.accept(randomLongBetween(1000, 1000000));
+            return null;
+        }).when(jobResultsProvider).getEstablishedMemoryUsage(anyString(), any(), any(), any(Consumer.class), any());
+
+        memoryTracker.refresh(persistentTasks, ActionListener.wrap(aVoid -> {}, ESTestCase::assertNull));
+
+        if (isMaster) {
+            for (int i = 1; i <= numMlJobTasks; ++i) {
+                String jobId = "job" + i;
+                verify(jobResultsProvider, times(1)).getEstablishedMemoryUsage(eq(jobId), any(), any(), any(), any());
+            }
+        } else {
+            verify(jobResultsProvider, never()).getEstablishedMemoryUsage(anyString(), any(), any(), any(), any());
+        }
+    }
+
+    public void testRefreshOne() {
+
+        boolean isMaster = randomBoolean();
+        if (isMaster) {
+            memoryTracker.onMaster();
+        } else {
+            memoryTracker.offMaster();
+        }
+
+        String jobId = "job";
+        boolean haveEstablishedModelMemory =
randomBoolean();
+
+        long modelBytes = 1024 * 1024;
+        doAnswer(invocation -> {
+            @SuppressWarnings("unchecked")
+            Consumer<Long> listener = (Consumer<Long>) invocation.getArguments()[3];
+            listener.accept(haveEstablishedModelMemory ? modelBytes : 0L);
+            return null;
+        }).when(jobResultsProvider).getEstablishedMemoryUsage(eq(jobId), any(), any(), any(Consumer.class), any());
+
+        long modelMemoryLimitMb = 2;
+        Job job = mock(Job.class);
+        when(job.getAnalysisLimits()).thenReturn(new AnalysisLimits(modelMemoryLimitMb, 4L));
+        doAnswer(invocation -> {
+            @SuppressWarnings("unchecked")
+            ActionListener<Job> listener = (ActionListener<Job>) invocation.getArguments()[1];
+            listener.onResponse(job);
+            return null;
+        }).when(jobManager).getJob(eq(jobId), any(ActionListener.class));
+
+        AtomicReference<Long> refreshedMemoryRequirement = new AtomicReference<>();
+        memoryTracker.refreshJobMemory(jobId, ActionListener.wrap(refreshedMemoryRequirement::set, ESTestCase::assertNull));
+
+        if (isMaster) {
+            if (haveEstablishedModelMemory) {
+                assertEquals(Long.valueOf(modelBytes + Job.PROCESS_MEMORY_OVERHEAD.getBytes()),
+                    memoryTracker.getJobMemoryRequirement(jobId));
+            } else {
+                assertEquals(Long.valueOf(ByteSizeUnit.MB.toBytes(modelMemoryLimitMb) + Job.PROCESS_MEMORY_OVERHEAD.getBytes()),
+                    memoryTracker.getJobMemoryRequirement(jobId));
+            }
+        } else {
+            assertNull(memoryTracker.getJobMemoryRequirement(jobId));
+        }
+
+        assertEquals(memoryTracker.getJobMemoryRequirement(jobId), refreshedMemoryRequirement.get());
+
+        memoryTracker.removeJob(jobId);
+        assertNull(memoryTracker.getJobMemoryRequirement(jobId));
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testRecordUpdateTimeInClusterState() {
+
+        boolean isMaster = randomBoolean();
+        if (isMaster) {
+            memoryTracker.onMaster();
+        } else {
+            memoryTracker.offMaster();
+        }
+
+        when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
+
+        AtomicReference<Long> updateVersion = new AtomicReference<>();
+
+        doAnswer(invocation -> {
+
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocation.getArguments()[1];
+            ClusterState currentClusterState = ClusterState.EMPTY_STATE;
+            ClusterState newClusterState = task.execute(currentClusterState);
+            assertThat(currentClusterState, not(equalTo(newClusterState)));
+            MlMetadata newMlMetadata = MlMetadata.getMlMetadata(newClusterState);
+            updateVersion.set(newMlMetadata.getLastMemoryRefreshVersion());
+            task.onAllNodesAcked(null);
+            return null;
+        }).when(clusterService).submitStateUpdateTask(anyString(), any(AckedClusterStateUpdateTask.class));
+
+        memoryTracker.asyncRefresh(ActionListener.wrap(ESTestCase::assertTrue, ESTestCase::assertNull));
+
+        if (isMaster) {
+            assertNotNull(updateVersion.get());
+        } else {
+            assertNull(updateVersion.get());
+        }
+    }
+
+    private PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> makeTestTask(String jobId) {
+        return new PersistentTasksCustomMetaData.PersistentTask<>("job-" + jobId, MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(jobId),
+            0, PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT);
+    }
+}