diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java index 13b4dcb955a05..7210aefa98740 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java @@ -57,7 +57,6 @@ public class Job implements ToXContentObject { public static final ParseField DATA_DESCRIPTION = new ParseField("data_description"); public static final ParseField DESCRIPTION = new ParseField("description"); public static final ParseField FINISHED_TIME = new ParseField("finished_time"); - public static final ParseField ESTABLISHED_MODEL_MEMORY = new ParseField("established_model_memory"); public static final ParseField MODEL_PLOT_CONFIG = new ParseField("model_plot_config"); public static final ParseField RENORMALIZATION_WINDOW_DAYS = new ParseField("renormalization_window_days"); public static final ParseField BACKGROUND_PERSIST_INTERVAL = new ParseField("background_persist_interval"); @@ -82,7 +81,6 @@ public class Job implements ToXContentObject { (p) -> TimeUtil.parseTimeField(p, FINISHED_TIME.getPreferredName()), FINISHED_TIME, ValueType.VALUE); - PARSER.declareLong(Builder::setEstablishedModelMemory, ESTABLISHED_MODEL_MEMORY); PARSER.declareObject(Builder::setAnalysisConfig, AnalysisConfig.PARSER, ANALYSIS_CONFIG); PARSER.declareObject(Builder::setAnalysisLimits, AnalysisLimits.PARSER, ANALYSIS_LIMITS); PARSER.declareObject(Builder::setDataDescription, DataDescription.PARSER, DATA_DESCRIPTION); @@ -105,7 +103,6 @@ public class Job implements ToXContentObject { private final String description; private final Date createTime; private final Date finishedTime; - private final Long establishedModelMemory; private final AnalysisConfig analysisConfig; private final AnalysisLimits analysisLimits; private final DataDescription dataDescription; @@ -120,7 +117,7 @@ public class Job implements ToXContentObject { private final Boolean deleting; private Job(String jobId, String jobType, List groups, String description, - Date createTime, Date finishedTime, Long establishedModelMemory, + Date createTime, Date finishedTime, AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription, ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval, Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map customSettings, @@ -132,7 +129,6 @@ private Job(String jobId, String jobType, List groups, String descriptio this.description = description; this.createTime = createTime; this.finishedTime = finishedTime; - this.establishedModelMemory = establishedModelMemory; this.analysisConfig = analysisConfig; this.analysisLimits = analysisLimits; this.dataDescription = dataDescription; @@ -202,16 +198,6 @@ public Date getFinishedTime() { return finishedTime; } - /** - * The established model memory of the job, or null if model - * memory has not reached equilibrium yet. - * - * @return The established model memory of the job - */ - public Long getEstablishedModelMemory() { - return establishedModelMemory; - } - /** * The analysis configuration object * @@ -304,9 +290,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.timeField(FINISHED_TIME.getPreferredName(), FINISHED_TIME.getPreferredName() + humanReadableSuffix, finishedTime.getTime()); } - if (establishedModelMemory != null) { - builder.field(ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory); - } builder.field(ANALYSIS_CONFIG.getPreferredName(), analysisConfig, params); if (analysisLimits != null) { builder.field(ANALYSIS_LIMITS.getPreferredName(), analysisLimits, params); @@ -362,7 +345,6 @@ public boolean equals(Object other) { && Objects.equals(this.description, that.description) && Objects.equals(this.createTime, that.createTime) && Objects.equals(this.finishedTime, that.finishedTime) - && Objects.equals(this.establishedModelMemory, that.establishedModelMemory) && Objects.equals(this.analysisConfig, that.analysisConfig) && Objects.equals(this.analysisLimits, that.analysisLimits) && Objects.equals(this.dataDescription, that.dataDescription) @@ -379,7 +361,7 @@ public boolean equals(Object other) { @Override public int hashCode() { - return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime, establishedModelMemory, + return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, resultsIndexName, deleting); @@ -405,7 +387,6 @@ public static class Builder { private DataDescription dataDescription; private Date createTime; private Date finishedTime; - private Long establishedModelMemory; private ModelPlotConfig modelPlotConfig; private Long renormalizationWindowDays; private TimeValue backgroundPersistInterval; @@ -433,7 +414,6 @@ public Builder(Job job) { this.dataDescription = job.getDataDescription(); this.createTime = job.getCreateTime(); this.finishedTime = job.getFinishedTime(); - this.establishedModelMemory = job.getEstablishedModelMemory(); this.modelPlotConfig = job.getModelPlotConfig(); this.renormalizationWindowDays = job.getRenormalizationWindowDays(); this.backgroundPersistInterval = job.getBackgroundPersistInterval(); @@ -494,11 +474,6 @@ Builder setFinishedTime(Date finishedTime) { return this; } - public Builder setEstablishedModelMemory(Long establishedModelMemory) { - this.establishedModelMemory = establishedModelMemory; - return this; - } - public Builder setDataDescription(DataDescription.Builder description) { dataDescription = Objects.requireNonNull(description, DATA_DESCRIPTION.getPreferredName()).build(); return this; @@ -553,7 +528,7 @@ public Job build() { Objects.requireNonNull(id, "[" + ID.getPreferredName() + "] must not be null"); Objects.requireNonNull(jobType, "[" + JOB_TYPE.getPreferredName() + "] must not be null"); return new Job( - id, jobType, groups, description, createTime, finishedTime, establishedModelMemory, + id, jobType, groups, description, createTime, finishedTime, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, resultsIndexName, deleting); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java index 667932d591231..70e8b4296b0df 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java @@ -125,9 +125,6 @@ public static Job.Builder createRandomizedJobBuilder() { if (randomBoolean()) { builder.setFinishedTime(new Date(randomNonNegativeLong())); } - if (randomBoolean()) { - builder.setEstablishedModelMemory(randomNonNegativeLong()); - } builder.setAnalysisConfig(AnalysisConfigTests.createRandomized()); builder.setAnalysisLimits(AnalysisLimitsTests.createRandomized()); diff --git a/docs/reference/ml/apis/jobresource.asciidoc b/docs/reference/ml/apis/jobresource.asciidoc index e0c314724e762..6ce6e1e39307e 100644 --- a/docs/reference/ml/apis/jobresource.asciidoc +++ b/docs/reference/ml/apis/jobresource.asciidoc @@ -42,11 +42,6 @@ so do not set the `background_persist_interval` value too low. `description`:: (string) An optional description of the job. -`established_model_memory`:: - (long) The approximate amount of memory resources that have been used for - analytical processing. This field is present only when the analytics have used - a stable amount of memory for several consecutive buckets. - `finished_time`:: (string) If the job closed or failed, this is the time the job finished, otherwise it is `null`. This property is informational; you cannot change its diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index afb56dc6cc8c9..e98773a2ce4de 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -57,8 +57,9 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { public static final String TYPE = "ml"; private static final ParseField JOBS_FIELD = new ParseField("jobs"); private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds"); + private static final ParseField LAST_MEMORY_REFRESH_VERSION_FIELD = new ParseField("last_memory_refresh_version"); - public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap()); + public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap(), null); // This parser follows the pattern that metadata is parsed leniently (to allow for enhancements) public static final ObjectParser LENIENT_PARSER = new ObjectParser<>("ml_metadata", true, Builder::new); @@ -66,15 +67,18 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { LENIENT_PARSER.declareObjectArray(Builder::putJobs, (p, c) -> Job.LENIENT_PARSER.apply(p, c).build(), JOBS_FIELD); LENIENT_PARSER.declareObjectArray(Builder::putDatafeeds, (p, c) -> DatafeedConfig.LENIENT_PARSER.apply(p, c).build(), DATAFEEDS_FIELD); + LENIENT_PARSER.declareLong(Builder::setLastMemoryRefreshVersion, LAST_MEMORY_REFRESH_VERSION_FIELD); } private final SortedMap jobs; private final SortedMap datafeeds; + private final Long lastMemoryRefreshVersion; private final GroupOrJobLookup groupOrJobLookup; - private MlMetadata(SortedMap jobs, SortedMap datafeeds) { + private MlMetadata(SortedMap jobs, SortedMap datafeeds, Long lastMemoryRefreshVersion) { this.jobs = Collections.unmodifiableSortedMap(jobs); this.datafeeds = Collections.unmodifiableSortedMap(datafeeds); + this.lastMemoryRefreshVersion = lastMemoryRefreshVersion; this.groupOrJobLookup = new GroupOrJobLookup(jobs.values()); } @@ -112,6 +116,10 @@ public Set expandDatafeedIds(String expression, boolean allowNoDatafeeds .expand(expression, allowNoDatafeeds); } + public Long getLastMemoryRefreshVersion() { + return lastMemoryRefreshVersion; + } + @Override public Version getMinimalSupportedVersion() { return Version.V_6_0_0_alpha1; @@ -145,7 +153,11 @@ public MlMetadata(StreamInput in) throws IOException { datafeeds.put(in.readString(), new DatafeedConfig(in)); } this.datafeeds = datafeeds; - + if (in.getVersion().onOrAfter(Version.V_6_6_0)) { + lastMemoryRefreshVersion = in.readOptionalLong(); + } else { + lastMemoryRefreshVersion = null; + } this.groupOrJobLookup = new GroupOrJobLookup(jobs.values()); } @@ -153,6 +165,9 @@ public MlMetadata(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { writeMap(jobs, out); writeMap(datafeeds, out); + if (out.getVersion().onOrAfter(Version.V_6_6_0)) { + out.writeOptionalLong(lastMemoryRefreshVersion); + } } private static void writeMap(Map map, StreamOutput out) throws IOException { @@ -169,6 +184,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params); mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams); mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams); + if (lastMemoryRefreshVersion != null) { + builder.field(LAST_MEMORY_REFRESH_VERSION_FIELD.getPreferredName(), lastMemoryRefreshVersion); + } return builder; } @@ -185,30 +203,46 @@ public static class MlMetadataDiff implements NamedDiff { final Diff> jobs; final Diff> datafeeds; + final Long lastMemoryRefreshVersion; MlMetadataDiff(MlMetadata before, MlMetadata after) { this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer()); this.datafeeds = DiffableUtils.diff(before.datafeeds, after.datafeeds, DiffableUtils.getStringKeySerializer()); + this.lastMemoryRefreshVersion = after.lastMemoryRefreshVersion; } public MlMetadataDiff(StreamInput in) throws IOException { this.jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job::new, MlMetadataDiff::readJobDiffFrom); this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new, - MlMetadataDiff::readSchedulerDiffFrom); + MlMetadataDiff::readDatafeedDiffFrom); + if (in.getVersion().onOrAfter(Version.V_6_6_0)) { + lastMemoryRefreshVersion = in.readOptionalLong(); + } else { + lastMemoryRefreshVersion = null; + } } + /** + * Merge the diff with the ML metadata. + * @param part The current ML metadata. + * @return The new ML metadata. + */ @Override public MetaData.Custom apply(MetaData.Custom part) { TreeMap newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs)); TreeMap newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds)); - return new MlMetadata(newJobs, newDatafeeds); + // lastMemoryRefreshVersion always comes from the diff - no need to merge with the old value + return new MlMetadata(newJobs, newDatafeeds, lastMemoryRefreshVersion); } @Override public void writeTo(StreamOutput out) throws IOException { jobs.writeTo(out); datafeeds.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_6_6_0)) { + out.writeOptionalLong(lastMemoryRefreshVersion); + } } @Override @@ -220,7 +254,7 @@ static Diff readJobDiffFrom(StreamInput in) throws IOException { return AbstractDiffable.readDiffFrom(Job::new, in); } - static Diff readSchedulerDiffFrom(StreamInput in) throws IOException { + static Diff readDatafeedDiffFrom(StreamInput in) throws IOException { return AbstractDiffable.readDiffFrom(DatafeedConfig::new, in); } } @@ -233,7 +267,8 @@ public boolean equals(Object o) { return false; MlMetadata that = (MlMetadata) o; return Objects.equals(jobs, that.jobs) && - Objects.equals(datafeeds, that.datafeeds); + Objects.equals(datafeeds, that.datafeeds) && + Objects.equals(lastMemoryRefreshVersion, that.lastMemoryRefreshVersion); } @Override @@ -243,13 +278,14 @@ public final String toString() { @Override public int hashCode() { - return Objects.hash(jobs, datafeeds); + return Objects.hash(jobs, datafeeds, lastMemoryRefreshVersion); } public static class Builder { private TreeMap jobs; private TreeMap datafeeds; + private Long lastMemoryRefreshVersion; public Builder() { jobs = new TreeMap<>(); @@ -263,6 +299,7 @@ public Builder(@Nullable MlMetadata previous) { } else { jobs = new TreeMap<>(previous.jobs); datafeeds = new TreeMap<>(previous.datafeeds); + lastMemoryRefreshVersion = previous.lastMemoryRefreshVersion; } } @@ -382,8 +419,13 @@ private Builder putDatafeeds(Collection datafeeds) { return this; } + public Builder setLastMemoryRefreshVersion(Long lastMemoryRefreshVersion) { + this.lastMemoryRefreshVersion = lastMemoryRefreshVersion; + return this; + } + public MlMetadata build() { - return new MlMetadata(jobs, datafeeds); + return new MlMetadata(jobs, datafeeds, lastMemoryRefreshVersion); } public void markJobAsDeleting(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) { @@ -420,8 +462,6 @@ void checkJobHasNoDatafeed(String jobId) { } } - - public static MlMetadata getMlMetadata(ClusterState state) { MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(TYPE); if (mlMetadata == null) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index 5b244ba44d5c3..032eef00649dd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -67,7 +67,6 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO public static final ParseField DATA_DESCRIPTION = new ParseField("data_description"); public static final ParseField DESCRIPTION = new ParseField("description"); public static final ParseField FINISHED_TIME = new ParseField("finished_time"); - public static final ParseField ESTABLISHED_MODEL_MEMORY = new ParseField("established_model_memory"); public static final ParseField MODEL_PLOT_CONFIG = new ParseField("model_plot_config"); public static final ParseField RENORMALIZATION_WINDOW_DAYS = new ParseField("renormalization_window_days"); public static final ParseField BACKGROUND_PERSIST_INTERVAL = new ParseField("background_persist_interval"); @@ -102,7 +101,6 @@ private static ObjectParser createParser(boolean ignoreUnknownFie p -> TimeUtils.parseTimeField(p, CREATE_TIME.getPreferredName()), CREATE_TIME, ValueType.VALUE); parser.declareField(Builder::setFinishedTime, p -> TimeUtils.parseTimeField(p, FINISHED_TIME.getPreferredName()), FINISHED_TIME, ValueType.VALUE); - parser.declareLong(Builder::setEstablishedModelMemory, ESTABLISHED_MODEL_MEMORY); parser.declareObject(Builder::setAnalysisConfig, ignoreUnknownFields ? AnalysisConfig.LENIENT_PARSER : AnalysisConfig.STRICT_PARSER, ANALYSIS_CONFIG); parser.declareObject(Builder::setAnalysisLimits, ignoreUnknownFields ? AnalysisLimits.LENIENT_PARSER : AnalysisLimits.STRICT_PARSER, @@ -140,7 +138,6 @@ private static ObjectParser createParser(boolean ignoreUnknownFie // TODO: Use java.time for the Dates here: x-pack-elasticsearch#829 private final Date createTime; private final Date finishedTime; - private final Long establishedModelMemory; private final AnalysisConfig analysisConfig; private final AnalysisLimits analysisLimits; private final DataDescription dataDescription; @@ -156,7 +153,7 @@ private static ObjectParser createParser(boolean ignoreUnknownFie private final boolean deleting; private Job(String jobId, String jobType, Version jobVersion, List groups, String description, - Date createTime, Date finishedTime, Long establishedModelMemory, + Date createTime, Date finishedTime, AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription, ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval, Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map customSettings, @@ -169,7 +166,6 @@ private Job(String jobId, String jobType, Version jobVersion, List group this.description = description; this.createTime = createTime; this.finishedTime = finishedTime; - this.establishedModelMemory = establishedModelMemory; this.analysisConfig = analysisConfig; this.analysisLimits = analysisLimits; this.dataDescription = dataDescription; @@ -203,10 +199,9 @@ public Job(StreamInput in) throws IOException { in.readVLong(); } } - if (in.getVersion().onOrAfter(Version.V_6_1_0)) { - establishedModelMemory = in.readOptionalLong(); - } else { - establishedModelMemory = null; + // for removed establishedModelMemory field + if (in.getVersion().onOrAfter(Version.V_6_1_0) && in.getVersion().before(Version.V_7_0_0_alpha1)) { + in.readOptionalLong(); } analysisConfig = new AnalysisConfig(in); analysisLimits = in.readOptionalWriteable(AnalysisLimits::new); @@ -314,16 +309,6 @@ public Date getFinishedTime() { return finishedTime; } - /** - * The established model memory of the job, or null if model - * memory has not reached equilibrium yet. - * - * @return The established model memory of the job - */ - public Long getEstablishedModelMemory() { - return establishedModelMemory; - } - /** * The analysis configuration object * @@ -430,21 +415,6 @@ public Collection allInputFields() { return allFields; } - /** - * Make a best estimate of the job's memory footprint using the information available. - * If a job has an established model memory size, then this is the best estimate. - * Otherwise, assume the maximum model memory limit will eventually be required. - * In either case, a fixed overhead is added to account for the memory required by the - * program code and stack. - * @return an estimate of the memory requirement of this job, in bytes - */ - public long estimateMemoryFootprint() { - if (establishedModelMemory != null && establishedModelMemory > 0) { - return establishedModelMemory + PROCESS_MEMORY_OVERHEAD.getBytes(); - } - return ByteSizeUnit.MB.toBytes(analysisLimits.getModelMemoryLimit()) + PROCESS_MEMORY_OVERHEAD.getBytes(); - } - /** * Returns the timestamp before which data is not accepted by the job. * This is the latest record timestamp minus the job latency. @@ -487,8 +457,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().before(Version.V_7_0_0_alpha1)) { out.writeBoolean(false); } - if (out.getVersion().onOrAfter(Version.V_6_1_0)) { - out.writeOptionalLong(establishedModelMemory); + // for removed establishedModelMemory field + if (out.getVersion().onOrAfter(Version.V_6_1_0) && out.getVersion().before(Version.V_7_0_0_alpha1)) { + out.writeOptionalLong(null); } analysisConfig.writeTo(out); out.writeOptionalWriteable(analysisLimits); @@ -539,9 +510,6 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th builder.timeField(FINISHED_TIME.getPreferredName(), FINISHED_TIME.getPreferredName() + humanReadableSuffix, finishedTime.getTime()); } - if (establishedModelMemory != null) { - builder.field(ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory); - } builder.field(ANALYSIS_CONFIG.getPreferredName(), analysisConfig, params); if (analysisLimits != null) { builder.field(ANALYSIS_LIMITS.getPreferredName(), analysisLimits, params); @@ -598,7 +566,6 @@ public boolean equals(Object other) { && Objects.equals(this.description, that.description) && Objects.equals(this.createTime, that.createTime) && Objects.equals(this.finishedTime, that.finishedTime) - && Objects.equals(this.establishedModelMemory, that.establishedModelMemory) && Objects.equals(this.analysisConfig, that.analysisConfig) && Objects.equals(this.analysisLimits, that.analysisLimits) && Objects.equals(this.dataDescription, that.dataDescription) @@ -616,7 +583,7 @@ public boolean equals(Object other) { @Override public int hashCode() { - return Objects.hash(jobId, jobType, jobVersion, groups, description, createTime, finishedTime, establishedModelMemory, + return Objects.hash(jobId, jobType, jobVersion, groups, description, createTime, finishedTime, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting); @@ -657,7 +624,6 @@ public static class Builder implements Writeable, ToXContentObject { private DataDescription dataDescription; private Date createTime; private Date finishedTime; - private Long establishedModelMemory; private ModelPlotConfig modelPlotConfig; private Long renormalizationWindowDays; private TimeValue backgroundPersistInterval; @@ -687,7 +653,6 @@ public Builder(Job job) { this.dataDescription = job.getDataDescription(); this.createTime = job.getCreateTime(); this.finishedTime = job.getFinishedTime(); - this.establishedModelMemory = job.getEstablishedModelMemory(); this.modelPlotConfig = job.getModelPlotConfig(); this.renormalizationWindowDays = job.getRenormalizationWindowDays(); this.backgroundPersistInterval = job.getBackgroundPersistInterval(); @@ -718,8 +683,9 @@ public Builder(StreamInput in) throws IOException { in.readVLong(); } } - if (in.getVersion().onOrAfter(Version.V_6_1_0)) { - establishedModelMemory = in.readOptionalLong(); + // for removed establishedModelMemory field + if (in.getVersion().onOrAfter(Version.V_6_1_0) && in.getVersion().before(Version.V_7_0_0_alpha1)) { + in.readOptionalLong(); } analysisConfig = in.readOptionalWriteable(AnalysisConfig::new); analysisLimits = in.readOptionalWriteable(AnalysisLimits::new); @@ -803,11 +769,6 @@ public Builder setFinishedTime(Date finishedTime) { return this; } - public Builder setEstablishedModelMemory(Long establishedModelMemory) { - this.establishedModelMemory = establishedModelMemory; - return this; - } - public Builder setDataDescription(DataDescription.Builder description) { dataDescription = ExceptionsHelper.requireNonNull(description, DATA_DESCRIPTION.getPreferredName()).build(); return this; @@ -913,8 +874,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().before(Version.V_7_0_0_alpha1)) { out.writeBoolean(false); } - if (out.getVersion().onOrAfter(Version.V_6_1_0)) { - out.writeOptionalLong(establishedModelMemory); + // for removed establishedModelMemory field + if (out.getVersion().onOrAfter(Version.V_6_1_0) && out.getVersion().before(Version.V_7_0_0_alpha1)) { + out.writeOptionalLong(null); } out.writeOptionalWriteable(analysisConfig); out.writeOptionalWriteable(analysisLimits); @@ -957,9 +919,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (finishedTime != null) { builder.field(FINISHED_TIME.getPreferredName(), finishedTime.getTime()); } - if (establishedModelMemory != null) { - builder.field(ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory); - } if (analysisConfig != null) { builder.field(ANALYSIS_CONFIG.getPreferredName(), analysisConfig, params); } @@ -1020,7 +979,6 @@ public boolean equals(Object o) { && Objects.equals(this.dataDescription, that.dataDescription) && Objects.equals(this.createTime, that.createTime) && Objects.equals(this.finishedTime, that.finishedTime) - && Objects.equals(this.establishedModelMemory, that.establishedModelMemory) && Objects.equals(this.modelPlotConfig, that.modelPlotConfig) && Objects.equals(this.renormalizationWindowDays, that.renormalizationWindowDays) && Objects.equals(this.backgroundPersistInterval, that.backgroundPersistInterval) @@ -1036,7 +994,7 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash(id, jobType, jobVersion, groups, description, analysisConfig, analysisLimits, dataDescription, - createTime, finishedTime, establishedModelMemory, modelPlotConfig, renormalizationWindowDays, + createTime, finishedTime, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting); } @@ -1112,11 +1070,6 @@ private void validateGroups() { public Job build(Date createTime) { setCreateTime(createTime); setJobVersion(Version.CURRENT); - // TODO: Maybe we _could_ accept a value for this supplied at create time - it would - // mean cloned jobs that hadn't been edited much would start with an accurate expected size. - // But on the other hand it would mean jobs that were cloned and then completely changed - // would start with a size that was completely wrong. - setEstablishedModelMemory(null); return build(); } @@ -1152,7 +1105,7 @@ public Job build() { } return new Job( - id, jobType, jobVersion, groups, description, createTime, finishedTime, establishedModelMemory, + id, jobType, jobVersion, groups, description, createTime, finishedTime, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java index 44e20846f9aa3..5d0b56dd79575 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java @@ -57,7 +57,6 @@ public class JobUpdate implements Writeable, ToXContentObject { } // These fields should not be set by a REST request INTERNAL_PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID); - INTERNAL_PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY); INTERNAL_PARSER.declareString(Builder::setModelSnapshotMinVersion, Job.MODEL_SNAPSHOT_MIN_VERSION); INTERNAL_PARSER.declareString(Builder::setJobVersion, Job.JOB_VERSION); INTERNAL_PARSER.declareBoolean(Builder::setClearFinishTime, CLEAR_JOB_FINISH_TIME); @@ -77,7 +76,6 @@ public class JobUpdate implements Writeable, ToXContentObject { private final Map customSettings; private final String modelSnapshotId; private final Version modelSnapshotMinVersion; - private final Long establishedModelMemory; private final Version jobVersion; private final Boolean clearJobFinishTime; @@ -87,8 +85,7 @@ private JobUpdate(String jobId, @Nullable List groups, @Nullable String @Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays, @Nullable Long modelSnapshotRetentionDays, @Nullable List categorisationFilters, @Nullable Map customSettings, @Nullable String modelSnapshotId, - @Nullable Version modelSnapshotMinVersion, @Nullable Long establishedModelMemory, - @Nullable Version jobVersion, @Nullable Boolean clearJobFinishTime) { + @Nullable Version modelSnapshotMinVersion, @Nullable Version jobVersion, @Nullable Boolean clearJobFinishTime) { this.jobId = jobId; this.groups = groups; this.description = description; @@ -103,7 +100,6 @@ private JobUpdate(String jobId, @Nullable List groups, @Nullable String this.customSettings = customSettings; this.modelSnapshotId = modelSnapshotId; this.modelSnapshotMinVersion = modelSnapshotMinVersion; - this.establishedModelMemory = establishedModelMemory; this.jobVersion = jobVersion; this.clearJobFinishTime = clearJobFinishTime; } @@ -135,10 +131,9 @@ public JobUpdate(StreamInput in) throws IOException { } customSettings = in.readMap(); modelSnapshotId = in.readOptionalString(); - if (in.getVersion().onOrAfter(Version.V_6_1_0)) { - establishedModelMemory = in.readOptionalLong(); - } else { - establishedModelMemory = null; + // was establishedModelMemory + if (in.getVersion().onOrAfter(Version.V_6_1_0) && in.getVersion().before(Version.V_7_0_0_alpha1)) { + in.readOptionalLong(); } if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.readBoolean()) { jobVersion = Version.readVersion(in); @@ -181,8 +176,9 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeMap(customSettings); out.writeOptionalString(modelSnapshotId); - if (out.getVersion().onOrAfter(Version.V_6_1_0)) { - out.writeOptionalLong(establishedModelMemory); + // was establishedModelMemory + if (out.getVersion().onOrAfter(Version.V_6_1_0) && out.getVersion().before(Version.V_7_0_0_alpha1)) { + out.writeOptionalLong(null); } if (out.getVersion().onOrAfter(Version.V_6_3_0)) { if (jobVersion != null) { @@ -261,10 +257,6 @@ public Version getModelSnapshotMinVersion() { return modelSnapshotMinVersion; } - public Long getEstablishedModelMemory() { - return establishedModelMemory; - } - public Version getJobVersion() { return jobVersion; } @@ -320,9 +312,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (modelSnapshotMinVersion != null) { builder.field(Job.MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), modelSnapshotMinVersion); } - if (establishedModelMemory != null) { - builder.field(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory); - } if (jobVersion != null) { builder.field(Job.JOB_VERSION.getPreferredName(), jobVersion); } @@ -374,9 +363,6 @@ public Set getUpdateFields() { if (modelSnapshotMinVersion != null) { updateFields.add(Job.MODEL_SNAPSHOT_MIN_VERSION.getPreferredName()); } - if (establishedModelMemory != null) { - updateFields.add(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName()); - } if (jobVersion != null) { updateFields.add(Job.JOB_VERSION.getPreferredName()); } @@ -452,14 +438,6 @@ public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) { if (modelSnapshotMinVersion != null) { builder.setModelSnapshotMinVersion(modelSnapshotMinVersion); } - if (establishedModelMemory != null) { - // An established model memory of zero means we don't actually know the established model memory - if (establishedModelMemory > 0) { - builder.setEstablishedModelMemory(establishedModelMemory); - } else { - builder.setEstablishedModelMemory(null); - } - } if (jobVersion != null) { builder.setJobVersion(jobVersion); } @@ -487,7 +465,6 @@ && updatesDetectors(job) == false && (customSettings == null || Objects.equals(customSettings, job.getCustomSettings())) && (modelSnapshotId == null || Objects.equals(modelSnapshotId, job.getModelSnapshotId())) && (modelSnapshotMinVersion == null || Objects.equals(modelSnapshotMinVersion, job.getModelSnapshotMinVersion())) - && (establishedModelMemory == null || Objects.equals(establishedModelMemory, job.getEstablishedModelMemory())) && (jobVersion == null || Objects.equals(jobVersion, job.getJobVersion())) && ((clearJobFinishTime == null || clearJobFinishTime == false) || job.getFinishedTime() == null); } @@ -536,7 +513,6 @@ public boolean equals(Object other) { && Objects.equals(this.customSettings, that.customSettings) && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion) - && Objects.equals(this.establishedModelMemory, that.establishedModelMemory) && Objects.equals(this.jobVersion, that.jobVersion) && Objects.equals(this.clearJobFinishTime, that.clearJobFinishTime); } @@ -545,7 +521,7 @@ public boolean equals(Object other) { public int hashCode() { return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings, - modelSnapshotId, modelSnapshotMinVersion, establishedModelMemory, jobVersion, clearJobFinishTime); + modelSnapshotId, modelSnapshotMinVersion, jobVersion, clearJobFinishTime); } public static class DetectorUpdate implements Writeable, ToXContentObject { @@ -655,7 +631,6 @@ public static class Builder { private Map customSettings; private String modelSnapshotId; private Version modelSnapshotMinVersion; - private Long establishedModelMemory; private Version jobVersion; private Boolean clearJobFinishTime; @@ -738,11 +713,6 @@ public Builder setModelSnapshotMinVersion(String modelSnapshotMinVersion) { return this; } - public Builder setEstablishedModelMemory(Long establishedModelMemory) { - this.establishedModelMemory = establishedModelMemory; - return this; - } - public Builder setJobVersion(Version version) { this.jobVersion = version; return this; @@ -761,7 +731,7 @@ public Builder setClearFinishTime(boolean clearJobFinishTime) { public JobUpdate build() { return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval, renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings, - modelSnapshotId, modelSnapshotMinVersion, establishedModelMemory, jobVersion, clearJobFinishTime); + modelSnapshotId, modelSnapshotMinVersion, jobVersion, clearJobFinishTime); } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java index 491be55049c10..70cbf1c088249 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java @@ -282,9 +282,6 @@ public static void addJobConfigFields(XContentBuilder builder) throws IOExceptio .startObject(Job.FINISHED_TIME.getPreferredName()) .field(TYPE, DATE) .endObject() - .startObject(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName()) - .field(TYPE, LONG) // TODO should be ByteSizeValue - .endObject() .startObject(Job.MODEL_PLOT_CONFIG.getPreferredName()) .startObject(PROPERTIES) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java index 400fcc07f4c53..6926ecb98e892 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java @@ -188,7 +188,6 @@ public final class ReservedFieldNames { Job.DATA_DESCRIPTION.getPreferredName(), Job.DESCRIPTION.getPreferredName(), Job.FINISHED_TIME.getPreferredName(), - Job.ESTABLISHED_MODEL_MEMORY.getPreferredName(), Job.MODEL_PLOT_CONFIG.getPreferredName(), Job.RENORMALIZATION_WINDOW_DAYS.getPreferredName(), Job.BACKGROUND_PERSIST_INTERVAL.getPreferredName(), diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java index 55b84995d58f7..13c433d758165 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java @@ -436,10 +436,9 @@ public void testBuilder_withInvalidIndexNameThrows() { public void testBuilder_buildWithCreateTime() { Job.Builder builder = buildJobBuilder("foo"); Date now = new Date(); - Job job = builder.setEstablishedModelMemory(randomNonNegativeLong()).build(now); + Job job = builder.build(now); assertEquals(now, job.getCreateTime()); assertEquals(Version.CURRENT, job.getJobVersion()); - assertNull(job.getEstablishedModelMemory()); } public void testJobWithoutVersion() throws IOException { @@ -514,39 +513,6 @@ public void testInvalidGroup_matchesJobId() { assertEquals(e.getMessage(), "job and group names must be unique but job [foo] and group [foo] have the same name"); } - public void testEstimateMemoryFootprint_GivenEstablished() { - Job.Builder builder = buildJobBuilder("established"); - long establishedModelMemory = randomIntBetween(10_000, 2_000_000_000); - builder.setEstablishedModelMemory(establishedModelMemory); - if (randomBoolean()) { - builder.setAnalysisLimits(new AnalysisLimits(randomNonNegativeLong(), null)); - } - assertEquals(establishedModelMemory + Job.PROCESS_MEMORY_OVERHEAD.getBytes(), builder.build().estimateMemoryFootprint()); - } - - public void testEstimateMemoryFootprint_GivenLimitAndNotEstablished() { - Job.Builder builder = buildJobBuilder("limit"); - if (rarely()) { - // An "established" model memory of 0 means "not established". Generally this won't be set, so getEstablishedModelMemory() - // will return null, but if it returns 0 we shouldn't estimate the job's memory requirement to be 0. - builder.setEstablishedModelMemory(0L); - } - ByteSizeValue limit = new ByteSizeValue(randomIntBetween(100, 10000), ByteSizeUnit.MB); - builder.setAnalysisLimits(new AnalysisLimits(limit.getMb(), null)); - assertEquals(limit.getBytes() + Job.PROCESS_MEMORY_OVERHEAD.getBytes(), builder.build().estimateMemoryFootprint()); - } - - public void testEstimateMemoryFootprint_GivenNoLimitAndNotEstablished() { - Job.Builder builder = buildJobBuilder("nolimit"); - if (rarely()) { - // An "established" model memory of 0 means "not established". Generally this won't be set, so getEstablishedModelMemory() - // will return null, but if it returns 0 we shouldn't estimate the job's memory requirement to be 0. - builder.setEstablishedModelMemory(0L); - } - assertEquals(ByteSizeUnit.MB.toBytes(AnalysisLimits.PRE_6_1_DEFAULT_MODEL_MEMORY_LIMIT_MB) - + Job.PROCESS_MEMORY_OVERHEAD.getBytes(), builder.build().estimateMemoryFootprint()); - } - public void testEarliestValidTimestamp_GivenEmptyDataCounts() { assertThat(createRandomizedJob().earliestValidTimestamp(new DataCounts("foo")), equalTo(0L)); } @@ -618,9 +584,6 @@ public static Job createRandomizedJob() { if (randomBoolean()) { builder.setFinishedTime(new Date(randomNonNegativeLong())); } - if (randomBoolean()) { - builder.setEstablishedModelMemory(randomNonNegativeLong()); - } builder.setAnalysisConfig(AnalysisConfigTests.createRandomized()); builder.setAnalysisLimits(AnalysisLimits.validateAndSetDefaults(AnalysisLimitsTests.createRandomized(), null, AnalysisLimits.DEFAULT_MODEL_MEMORY_LIMIT_MB)); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java index c6eb42038901b..e249b22a4a896 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java @@ -90,9 +90,6 @@ public JobUpdate createRandom(String jobId, @Nullable Job job) { if (useInternalParser && randomBoolean()) { update.setModelSnapshotMinVersion(Version.CURRENT); } - if (useInternalParser && randomBoolean()) { - update.setEstablishedModelMemory(randomNonNegativeLong()); - } if (useInternalParser && randomBoolean()) { update.setJobVersion(randomFrom(Version.CURRENT, Version.V_6_2_0, Version.V_6_1_0)); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java index cc5a9f4f1b469..d1decd4387f8f 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java @@ -6,13 +6,11 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.junit.After; @@ -29,7 +27,7 @@ public class BasicRenormalizationIT extends MlNativeAutodetectIntegTestCase { @After - public void tearDownData() throws Exception { + public void tearDownData() { cleanUp(); } @@ -52,15 +50,6 @@ public void testDefaultRenormalization() throws Exception { // This is the key assertion: if renormalization never happened then the record_score would // be the same as the initial_record_score on the anomaly record that happened earlier assertThat(earlierRecord.getInitialRecordScore(), greaterThan(earlierRecord.getRecordScore())); - - // Since this job ran for 50 buckets, it's a good place to assert - // that established model memory matches model memory in the job stats - assertBusy(() -> { - GetJobsStatsAction.Response.JobStats jobStats = getJobStats(jobId).get(0); - ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); - Job updatedJob = getJob(jobId).get(0); - assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); - }); } public void testRenormalizationDisabled() throws Exception { @@ -94,7 +83,7 @@ private void createAndRunJob(String jobId, Long renormalizationWindow) throws Ex closeJob(job.getId()); } - private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan, Long renormalizationWindow) throws Exception { + private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan, Long renormalizationWindow) { Detector.Builder detector = new Detector.Builder("count", null); AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build())); analysisConfig.setBucketSpan(bucketSpan); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java index ccef7c3f2e181..be760909e3ea9 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java @@ -15,7 +15,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; -import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.KillProcessAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; @@ -25,7 +24,6 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.junit.After; import java.util.ArrayList; @@ -92,15 +90,6 @@ public void testLookbackOnly() throws Exception { }, 60, TimeUnit.SECONDS); waitUntilJobIsClosed(job.getId()); - - // Since this job ran for 168 buckets, it's a good place to assert - // that established model memory matches model memory in the job stats - assertBusy(() -> { - GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); - ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); - Job updatedJob = getJob(job.getId()).get(0); - assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); - }); } public void testRealtime() throws Exception { diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsDeletedAfterReopeningJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsDeletedAfterReopeningJobIT.java index add0b9e8a93a3..2e39658ab504a 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsDeletedAfterReopeningJobIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsDeletedAfterReopeningJobIT.java @@ -33,7 +33,7 @@ public class InterimResultsDeletedAfterReopeningJobIT extends MlNativeAutodetectIntegTestCase { @After - public void cleanUpTest() throws Exception { + public void cleanUpTest() { cleanUp(); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java index fe344bf835991..ec670773f2f7b 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java @@ -7,14 +7,12 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; -import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetOverallBucketsAction; import org.elasticsearch.xpack.core.ml.action.util.PageParams; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.junit.After; import java.util.ArrayList; @@ -36,7 +34,7 @@ public class OverallBucketsIT extends MlNativeAutodetectIntegTestCase { private static final long BUCKET_SPAN_SECONDS = 3600; @After - public void cleanUpTest() throws Exception { + public void cleanUpTest() { cleanUp(); } @@ -99,15 +97,6 @@ public void test() throws Exception { GetOverallBucketsAction.INSTANCE, filteredOverallBucketsRequest).actionGet(); assertThat(filteredOverallBucketsResponse.getOverallBuckets().count(), equalTo(2L)); } - - // Since this job ran for 3000 buckets, it's a good place to assert - // that established model memory matches model memory in the job stats - assertBusy(() -> { - GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); - ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); - Job updatedJob = getJob(job.getId()).get(0); - assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); - }); } private static Map createRecord(long timestamp) { diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java index d7a2b857bf359..42bfe4dcde301 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java @@ -7,12 +7,10 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.junit.After; @@ -82,15 +80,6 @@ public void test() throws Exception { }); closeJob(job.getId()); - - // Since these jobs ran for 72 buckets, it's a good place to assert - // that established model memory matches model memory in the job stats - assertBusy(() -> { - GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); - ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); - Job updatedJob = getJob(job.getId()).get(0); - assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); - }); } private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan) throws Exception { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index efe932f4b5c6b..d9075c5a46fe5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -180,6 +180,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.NativeControllerHolder; import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction; @@ -275,6 +276,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu private final SetOnce autodetectProcessManager = new SetOnce<>(); private final SetOnce datafeedManager = new SetOnce<>(); + private final SetOnce memoryTracker = new SetOnce<>(); public MachineLearning(Settings settings, Path configPath) { this.settings = settings; @@ -415,6 +417,8 @@ public Collection createComponents(Client client, ClusterService cluster this.datafeedManager.set(datafeedManager); MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager, autodetectProcessManager); + MlMemoryTracker memoryTracker = new MlMemoryTracker(clusterService, threadPool, jobManager, jobResultsProvider); + this.memoryTracker.set(memoryTracker); // This object's constructor attaches to the license state, so there's no need to retain another reference to it new InvalidLicenseEnforcer(getLicenseState(), threadPool, datafeedManager, autodetectProcessManager); @@ -433,7 +437,8 @@ public Collection createComponents(Client client, ClusterService cluster jobDataCountsPersister, datafeedManager, auditor, - new MlAssignmentNotifier(auditor, clusterService) + new MlAssignmentNotifier(auditor, clusterService), + memoryTracker ); } @@ -444,8 +449,9 @@ public List> getPersistentTasksExecutor(ClusterServic } return Arrays.asList( - new TransportOpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get()), - new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(datafeedManager.get()) + new TransportOpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get(), + memoryTracker.get()), + new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor( datafeedManager.get()) ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index edd60cd4756d0..385e33dfe369c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -68,6 +68,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import org.elasticsearch.xpack.ml.utils.MlIndicesUtils; import java.util.ArrayList; @@ -93,6 +94,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction(); } @@ -210,6 +214,9 @@ private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJ ActionListener listener) { String jobId = request.getJobId(); + // We clean up the memory tracker on delete rather than close as close is not a master node action + memoryTracker.removeJob(jobId); + // Step 4. When the job has been removed from the cluster state, return a response // ------- CheckedConsumer apiResponseHandler = jobDeleted -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 98a4a9039b37a..f827d67c9f7c5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -56,6 +56,7 @@ import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; @@ -68,6 +69,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; +import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import java.io.IOException; import java.util.ArrayList; @@ -94,21 +96,23 @@ To ensure that a subsequent close job call will see that same task status (and s */ public class TransportOpenJobAction extends TransportMasterNodeAction { + private static final PersistentTasksCustomMetaData.Assignment AWAITING_LAZY_ASSIGNMENT = + new PersistentTasksCustomMetaData.Assignment(null, "persistent task is awaiting node assignment."); + private final XPackLicenseState licenseState; private final PersistentTasksService persistentTasksService; private final Client client; private final JobResultsProvider jobResultsProvider; private final JobConfigProvider jobConfigProvider; - - private static final PersistentTasksCustomMetaData.Assignment AWAITING_LAZY_ASSIGNMENT = - new PersistentTasksCustomMetaData.Assignment(null, "persistent task is awaiting node assignment."); + private final MlMemoryTracker memoryTracker; @Inject public TransportOpenJobAction(TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState, ClusterService clusterService, PersistentTasksService persistentTasksService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client, - JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider) { + JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider, + MlMemoryTracker memoryTracker) { super(OpenJobAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, OpenJobAction.Request::new); this.licenseState = licenseState; @@ -116,6 +120,7 @@ public TransportOpenJobAction(TransportService transportService, ThreadPool thre this.client = client; this.jobResultsProvider = jobResultsProvider; this.jobConfigProvider = jobConfigProvider; + this.memoryTracker = memoryTracker; } /** @@ -144,6 +149,7 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j int maxConcurrentJobAllocations, int fallbackMaxNumberOfOpenJobs, int maxMachineMemoryPercent, + MlMemoryTracker memoryTracker, Logger logger) { String resultsIndexName = job != null ? job.getResultsIndexName() : null; List unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsIndexName, clusterState); @@ -154,10 +160,38 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j return new PersistentTasksCustomMetaData.Assignment(null, reason); } + // Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe + // because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs + boolean allocateByMemory = true; + + if (memoryTracker.isRecentlyRefreshed() == false) { + + boolean scheduledRefresh = memoryTracker.asyncRefresh(ActionListener.wrap( + acknowledged -> { + if (acknowledged) { + logger.trace("Job memory requirement refresh request completed successfully"); + } else { + logger.warn("Job memory requirement refresh request completed but did not set time in cluster state"); + } + }, + e -> logger.error("Failed to refresh job memory requirements", e) + )); + if (scheduledRefresh) { + String reason = "Not opening job [" + jobId + "] because job memory requirements are stale - refresh requested"; + logger.debug(reason); + return new PersistentTasksCustomMetaData.Assignment(null, reason); + } else { + allocateByMemory = false; + logger.warn("Falling back to allocating job [{}] by job counts because a memory requirement refresh could not be scheduled", + jobId); + } + } + List reasons = new LinkedList<>(); long maxAvailableCount = Long.MIN_VALUE; + long maxAvailableMemory = Long.MIN_VALUE; DiscoveryNode minLoadedNodeByCount = null; - + DiscoveryNode minLoadedNodeByMemory = null; PersistentTasksCustomMetaData persistentTasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); for (DiscoveryNode node : clusterState.getNodes()) { Map nodeAttributes = node.getAttributes(); @@ -198,10 +232,9 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j } } - long numberOfAssignedJobs = 0; int numberOfAllocatingJobs = 0; - + long assignedJobMemory = 0; if (persistentTasks != null) { // find all the job tasks assigned to this node Collection> assignedTasks = persistentTasks.findTasks( @@ -232,6 +265,15 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) { // Don't count CLOSED or FAILED jobs, as they don't consume native memory ++numberOfAssignedJobs; + OpenJobAction.JobParams params = (OpenJobAction.JobParams) assignedTask.getParams(); + Long jobMemoryRequirement = memoryTracker.getJobMemoryRequirement(params.getJobId()); + if (jobMemoryRequirement == null) { + allocateByMemory = false; + logger.debug("Falling back to allocating job [{}] by job counts because " + + "the memory requirement for job [{}] was not available", jobId, params.getJobId()); + } else { + assignedJobMemory += jobMemoryRequirement; + } } } } @@ -272,10 +314,62 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j maxAvailableCount = availableCount; minLoadedNodeByCount = node; } + + String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR); + long machineMemory = -1; + // TODO: remove leniency and reject the node if the attribute is null in 7.0 + if (machineMemoryStr != null) { + try { + machineMemory = Long.parseLong(machineMemoryStr); + } catch (NumberFormatException e) { + String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " + + MachineLearning.MACHINE_MEMORY_NODE_ATTR + " attribute [" + machineMemoryStr + "] is not a long"; + logger.trace(reason); + reasons.add(reason); + continue; + } + } + + if (allocateByMemory) { + if (machineMemory > 0) { + long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100; + Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(jobId); + if (estimatedMemoryFootprint != null) { + long availableMemory = maxMlMemory - assignedJobMemory; + if (estimatedMemoryFootprint > availableMemory) { + String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + + "], because this node has insufficient available memory. Available memory for ML [" + maxMlMemory + + "], memory required by existing jobs [" + assignedJobMemory + + "], estimated memory required for this job [" + estimatedMemoryFootprint + "]"; + logger.trace(reason); + reasons.add(reason); + continue; + } + + if (maxAvailableMemory < availableMemory) { + maxAvailableMemory = availableMemory; + minLoadedNodeByMemory = node; + } + } else { + // If we cannot get the job memory requirement, + // fall back to simply allocating by job count + allocateByMemory = false; + logger.debug("Falling back to allocating job [{}] by job counts because its memory requirement was not available", + jobId); + } + } else { + // If we cannot get the available memory on any machine in + // the cluster, fall back to simply allocating by job count + allocateByMemory = false; + logger.debug("Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]", + jobId, nodeNameAndMlAttributes(node)); + } + } } - if (minLoadedNodeByCount != null) { - logger.debug("selected node [{}] for job [{}]", minLoadedNodeByCount, jobId); - return new PersistentTasksCustomMetaData.Assignment(minLoadedNodeByCount.getId(), ""); + DiscoveryNode minLoadedNode = allocateByMemory ? minLoadedNodeByMemory : minLoadedNodeByCount; + if (minLoadedNode != null) { + logger.debug("selected node [{}] for job [{}]", minLoadedNode, jobId); + return new PersistentTasksCustomMetaData.Assignment(minLoadedNode.getId(), ""); } else { String explanation = String.join("|", reasons); logger.debug("no node selected for job [{}], reasons [{}]", jobId, explanation); @@ -451,41 +545,48 @@ public void onFailure(Exception e) { }; // Start job task - ActionListener establishedMemoryUpdateListener = ActionListener.wrap( - response -> { - persistentTasksService.sendStartRequest(MlTasks.jobTaskId(jobParams.getJobId()), - MlTasks.JOB_TASK_NAME, jobParams, waitForJobToStart); - }, - listener::onFailure + ActionListener memoryRequirementRefreshListener = ActionListener.wrap( + mem -> persistentTasksService.sendStartRequest(MlTasks.jobTaskId(jobParams.getJobId()), MlTasks.JOB_TASK_NAME, jobParams, + waitForJobToStart), + listener::onFailure + ); + + // Tell the job tracker to refresh the memory requirement for this job and all other jobs that have persistent tasks + ActionListener jobUpdateListener = ActionListener.wrap( + response -> memoryTracker.refreshJobMemoryAndAllOthers(jobParams.getJobId(), memoryRequirementRefreshListener), + listener::onFailure ); - // Update established model memory for pre-6.1 jobs that haven't had it set - // and increase the model memory limit for 6.1 - 6.3 jobs + // Increase the model memory limit for 6.1 - 6.3 jobs ActionListener missingMappingsListener = ActionListener.wrap( response -> { Job job = jobParams.getJob(); if (job != null) { Version jobVersion = job.getJobVersion(); - Long jobEstablishedModelMemory = job.getEstablishedModelMemory(); - if ((jobVersion == null || jobVersion.before(Version.V_6_1_0)) - && (jobEstablishedModelMemory == null || jobEstablishedModelMemory == 0)) { - jobResultsProvider.getEstablishedMemoryUsage(job.getId(), null, null, establishedModelMemory -> { - if (establishedModelMemory != null && establishedModelMemory > 0) { - JobUpdate update = new JobUpdate.Builder(job.getId()) - .setEstablishedModelMemory(establishedModelMemory).build(); - UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(job.getId(), update); - - executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, - establishedMemoryUpdateListener); - } else { - establishedMemoryUpdateListener.onResponse(null); - } - }, listener::onFailure); - } else { - establishedMemoryUpdateListener.onResponse(null); + if (jobVersion != null && + (jobVersion.onOrAfter(Version.V_6_1_0) && jobVersion.before(Version.V_6_3_0))) { + // Increase model memory limit if < 512MB + if (job.getAnalysisLimits() != null && job.getAnalysisLimits().getModelMemoryLimit() != null && + job.getAnalysisLimits().getModelMemoryLimit() < 512L) { + + long updatedModelMemoryLimit = (long) (job.getAnalysisLimits().getModelMemoryLimit() * 1.3); + AnalysisLimits limits = new AnalysisLimits(updatedModelMemoryLimit, + job.getAnalysisLimits().getCategorizationExamplesLimit()); + + JobUpdate update = new JobUpdate.Builder(job.getId()).setJobVersion(Version.CURRENT) + .setAnalysisLimits(limits).build(); + UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(job.getId(), update); + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, + jobUpdateListener); + } else { + jobUpdateListener.onResponse(null); + } + } + else { + jobUpdateListener.onResponse(null); } } else { - establishedMemoryUpdateListener.onResponse(null); + jobUpdateListener.onResponse(null); } }, listener::onFailure ); @@ -629,6 +730,7 @@ private void addDocMappingIfMissing(String alias, CheckedSupplier { private final AutodetectProcessManager autodetectProcessManager; + private final MlMemoryTracker memoryTracker; /** * The maximum number of open jobs can be different on each node. However, nodes on older versions @@ -642,9 +744,10 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut private volatile int maxLazyMLNodes; public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService, - AutodetectProcessManager autodetectProcessManager) { + AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker) { super(MlTasks.JOB_TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME); this.autodetectProcessManager = autodetectProcessManager; + this.memoryTracker = memoryTracker; this.fallbackMaxNumberOfOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings); this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings); this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings); @@ -658,11 +761,17 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS @Override public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobParams params, ClusterState clusterState) { - PersistentTasksCustomMetaData.Assignment assignment =selectLeastLoadedMlNode(params.getJobId(), params.getJob(), clusterState, - maxConcurrentJobAllocations, fallbackMaxNumberOfOpenJobs, maxMachineMemoryPercent, logger); + PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(params.getJobId(), + params.getJob(), + clusterState, + maxConcurrentJobAllocations, + fallbackMaxNumberOfOpenJobs, + maxMachineMemoryPercent, + memoryTracker, + logger); if (assignment.getExecutorNode() == null) { int numMlNodes = 0; - for(DiscoveryNode node : clusterState.getNodes()) { + for (DiscoveryNode node : clusterState.getNodes()) { if (Boolean.valueOf(node.getAttributes().get(MachineLearning.ML_ENABLED_NODE_ATTR))) { numMlNodes++; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 0e9d71b93b2da..7d05f2d02a126 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -66,7 +66,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -580,25 +579,17 @@ public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionList // Step 1. update the job // ------- - Consumer updateJobHandler = response -> { - JobUpdate update = new JobUpdate.Builder(request.getJobId()) - .setModelSnapshotId(modelSnapshot.getSnapshotId()) - .setEstablishedModelMemory(response) - .build(); - - jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, ActionListener.wrap( - job -> { - auditor.info(request.getJobId(), - Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); - updateHandler.accept(Boolean.TRUE); - }, - actionListener::onFailure - )); - }; - - // Step 0. Find the appropriate established model memory for the reverted job - // ------- - jobResultsProvider.getEstablishedMemoryUsage(request.getJobId(), modelSizeStats.getTimestamp(), modelSizeStats, updateJobHandler, - actionListener::onFailure); + JobUpdate update = new JobUpdate.Builder(request.getJobId()) + .setModelSnapshotId(modelSnapshot.getSnapshotId()) + .build(); + + jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, ActionListener.wrap( + job -> { + auditor.info(request.getJobId(), + Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); + updateHandler.accept(Boolean.TRUE); + }, + actionListener::onFailure + )); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 24abeb9d45b47..8b0de68fb582b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -521,8 +521,7 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autoDetectExecutorService, onProcessCrash(jobTask)); AutoDetectResultProcessor processor = new AutoDetectResultProcessor( - client, auditor, jobId, renormalizer, jobResultsPersister, jobResultsProvider, autodetectParams.modelSizeStats(), - autodetectParams.modelSnapshot() != null); + client, auditor, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats()); ExecutorService autodetectWorkerExecutor; try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index a60c937209c21..f536b79547736 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -20,9 +20,6 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; @@ -41,7 +38,6 @@ import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; @@ -56,7 +52,6 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -85,20 +80,11 @@ public class AutoDetectResultProcessor { private static final Logger LOGGER = LogManager.getLogger(AutoDetectResultProcessor.class); - /** - * This is how far behind real-time we'll update the job with the latest established model memory. - * If more updates are received during the delay period then they'll take precedence. - * As a result there will be at most one update of established model memory per delay period. - */ - private static final TimeValue ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY = TimeValue.timeValueSeconds(5); - private final Client client; private final Auditor auditor; private final String jobId; private final Renormalizer renormalizer; private final JobResultsPersister persister; - private final JobResultsProvider jobResultsProvider; - private final boolean restoredSnapshot; final CountDownLatch completionLatch = new CountDownLatch(1); final Semaphore updateModelSnapshotSemaphore = new Semaphore(1); @@ -112,30 +98,21 @@ public class AutoDetectResultProcessor { * New model size stats are read as the process is running */ private volatile ModelSizeStats latestModelSizeStats; - private volatile Date latestDateForEstablishedModelMemoryCalc; - private volatile long latestEstablishedModelMemory; - private volatile boolean haveNewLatestModelSizeStats; - private Future scheduledEstablishedModelMemoryUpdate; // only accessed in synchronized methods public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, - JobResultsPersister persister, JobResultsProvider jobResultsProvider, - ModelSizeStats latestModelSizeStats, boolean restoredSnapshot) { - this(client, auditor, jobId, renormalizer, persister, jobResultsProvider, latestModelSizeStats, - restoredSnapshot, new FlushListener()); + JobResultsPersister persister, ModelSizeStats latestModelSizeStats) { + this(client, auditor, jobId, renormalizer, persister, latestModelSizeStats, new FlushListener()); } AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, - JobResultsPersister persister, JobResultsProvider jobResultsProvider, ModelSizeStats latestModelSizeStats, - boolean restoredSnapshot, FlushListener flushListener) { + JobResultsPersister persister, ModelSizeStats latestModelSizeStats, FlushListener flushListener) { this.client = Objects.requireNonNull(client); this.auditor = Objects.requireNonNull(auditor); this.jobId = Objects.requireNonNull(jobId); this.renormalizer = Objects.requireNonNull(renormalizer); this.persister = Objects.requireNonNull(persister); - this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider); this.flushListener = Objects.requireNonNull(flushListener); this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats); - this.restoredSnapshot = restoredSnapshot; } public void process(AutodetectProcess process) { @@ -230,17 +207,7 @@ void processResult(Context context, AutodetectResult result) { // persist after deleting interim results in case the new // results are also interim context.bulkResultsPersister.persistBucket(bucket).executeRequest(); - latestDateForEstablishedModelMemoryCalc = bucket.getTimestamp(); ++bucketCount; - - // if we haven't previously set established model memory, consider trying again after - // a reasonable number of buckets have elapsed since the last model size stats update - long minEstablishedTimespanMs = JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE * bucket.getBucketSpan() * 1000L; - if (haveNewLatestModelSizeStats && latestEstablishedModelMemory == 0 && latestDateForEstablishedModelMemoryCalc.getTime() - > latestModelSizeStats.getTimestamp().getTime() + minEstablishedTimespanMs) { - scheduleEstablishedModelMemoryUpdate(ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY); - haveNewLatestModelSizeStats = false; - } } List records = result.getRecords(); if (records != null && !records.isEmpty()) { @@ -331,15 +298,6 @@ private void processModelSizeStats(Context context, ModelSizeStats modelSizeStat persister.persistModelSizeStats(modelSizeStats); notifyModelMemoryStatusChange(context, modelSizeStats); latestModelSizeStats = modelSizeStats; - latestDateForEstablishedModelMemoryCalc = modelSizeStats.getTimestamp(); - haveNewLatestModelSizeStats = true; - - // This is a crude way to NOT refresh the index and NOT attempt to update established model memory during the first 20 buckets - // because this is when the model size stats are likely to be least stable and lots of updates will be coming through, and - // we'll NEVER consider memory usage to be established during this period - if (restoredSnapshot || bucketCount >= JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) { - scheduleEstablishedModelMemoryUpdate(ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY); - } } private void notifyModelMemoryStatusChange(Context context, ModelSizeStats modelSizeStats) { @@ -366,12 +324,11 @@ protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { return; } - Map update = new HashMap<>(); + Map update = new HashMap<>(); update.put(Job.MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshot.getSnapshotId()); update.put(Job.MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), modelSnapshot.getMinVersion().toString()); - updateJob(jobId, Collections.singletonMap(Job.MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshot.getSnapshotId()), - new ActionListener() { + updateJob(jobId, update, new ActionListener() { @Override public void onResponse(UpdateResponse updateResponse) { updateModelSnapshotSemaphore.release(); @@ -387,67 +344,11 @@ public void onFailure(Exception e) { }); } - /** - * The purpose of this method is to avoid saturating the cluster state update thread - * when a lookback job is churning through buckets very fast and the memory usage of - * the job is changing regularly. The idea is to only update the established model - * memory associated with the job a few seconds after the new value has been received. - * If more updates are received during the delay period then they simply replace the - * value that originally caused the update to be scheduled. This rate limits cluster - * state updates due to established model memory changing to one per job per delay period. - * (In reality updates will only occur this rapidly during lookback. During real-time - * operation the limit of one model size stats document per bucket will mean there is a - * maximum of one cluster state update per job per bucket, and usually the bucket span - * is 5 minutes or more.) - * @param delay The delay before updating established model memory. - */ - synchronized void scheduleEstablishedModelMemoryUpdate(TimeValue delay) { - - if (scheduledEstablishedModelMemoryUpdate == null) { - try { - scheduledEstablishedModelMemoryUpdate = client.threadPool().schedule(delay, MachineLearning.UTILITY_THREAD_POOL_NAME, - () -> runEstablishedModelMemoryUpdate(false)); - LOGGER.trace("[{}] Scheduled established model memory update to run in [{}]", jobId, delay); - } catch (EsRejectedExecutionException e) { - if (e.isExecutorShutdown()) { - LOGGER.debug("failed to schedule established model memory update; shutting down", e); - } else { - throw e; - } - } - } - } - - /** - * This method is called from two places: - * - From the {@link Future} used for delayed updates - * - When shutting down this result processor - * When shutting down the result processor it's only necessary to do anything - * if an update has been scheduled, but we want to do the update immediately. - * Despite cancelling the scheduled update in this case, it's possible that - * it's already started running, in which case this method will get called - * twice in quick succession. But the second call will do nothing, as - * scheduledEstablishedModelMemoryUpdate will have been reset - * to null by the first call. - */ - private synchronized void runEstablishedModelMemoryUpdate(boolean cancelExisting) { - - if (scheduledEstablishedModelMemoryUpdate != null) { - if (cancelExisting) { - LOGGER.debug("[{}] Bringing forward previously scheduled established model memory update", jobId); - FutureUtils.cancel(scheduledEstablishedModelMemoryUpdate); - } - scheduledEstablishedModelMemoryUpdate = null; - updateEstablishedModelMemoryOnJob(); - } - } - private void onAutodetectClose() { onCloseActionsLatch = new CountDownLatch(1); ActionListener updateListener = ActionListener.wrap( updateResponse -> { - runEstablishedModelMemoryUpdate(true); onCloseActionsLatch.countDown(); }, e -> { @@ -462,35 +363,6 @@ private void onAutodetectClose() { ); } - private void updateEstablishedModelMemoryOnJob() { - - // Copy these before committing writes, so the calculation is done based on committed documents - Date latestBucketTimestamp = latestDateForEstablishedModelMemoryCalc; - ModelSizeStats modelSizeStatsForCalc = latestModelSizeStats; - - // We need to make all results written up to and including these stats available for the established memory calculation - persister.commitResultWrites(jobId); - - jobResultsProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStatsForCalc, establishedModelMemory -> { - if (latestEstablishedModelMemory != establishedModelMemory) { - updateJob(jobId, Collections.singletonMap(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory), - new ActionListener() { - @Override - public void onResponse(UpdateResponse response) { - latestEstablishedModelMemory = establishedModelMemory; - LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory); - } - - @Override - public void onFailure(Exception e) { - LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" + - establishedModelMemory + "]", e); - } - }); - } - }, e -> LOGGER.error("[" + jobId + "] Failed to calculate established model memory", e)); - } - private void updateJob(String jobId, Map update, ActionListener listener) { UpdateRequest updateRequest = new UpdateRequest(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java new file mode 100644 index 0000000000000..63f0fac27d8e8 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java @@ -0,0 +1,325 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.process; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.LocalNodeMasterListener; +import org.elasticsearch.cluster.ack.AckedRequest; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.JobManager; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * This class keeps track of the memory requirement of ML jobs. + * It only functions on the master node - for this reason it should only be used by master node actions. + * The memory requirement for ML jobs can be updated in 3 ways: + * 1. For all open ML jobs (via {@link #asyncRefresh}) + * 2. For all open ML jobs, plus one named ML job that is not open (via {@link #refreshJobMemoryAndAllOthers}) + * 3. For one named ML job (via {@link #refreshJobMemory}) + * In all cases a listener informs the caller when the requested updates are complete. + */ +public class MlMemoryTracker implements LocalNodeMasterListener { + + private static final AckedRequest ACKED_REQUEST = new AckedRequest() { + @Override + public TimeValue ackTimeout() { + return AcknowledgedRequest.DEFAULT_ACK_TIMEOUT; + } + + @Override + public TimeValue masterNodeTimeout() { + return AcknowledgedRequest.DEFAULT_ACK_TIMEOUT; + } + }; + + private static final Duration RECENT_UPDATE_THRESHOLD = Duration.ofMinutes(1); + + private final Logger logger = LogManager.getLogger(MlMemoryTracker.class); + private final ConcurrentHashMap memoryRequirementByJob = new ConcurrentHashMap<>(); + private final List> fullRefreshCompletionListeners = new ArrayList<>(); + + private final ThreadPool threadPool; + private final ClusterService clusterService; + private final JobManager jobManager; + private final JobResultsProvider jobResultsProvider; + private volatile boolean isMaster; + private volatile Instant lastUpdateTime; + + public MlMemoryTracker(ClusterService clusterService, ThreadPool threadPool, JobManager jobManager, + JobResultsProvider jobResultsProvider) { + this.threadPool = threadPool; + this.clusterService = clusterService; + this.jobManager = jobManager; + this.jobResultsProvider = jobResultsProvider; + clusterService.addLocalNodeMasterListener(this); + } + + @Override + public void onMaster() { + isMaster = true; + logger.trace("ML memory tracker on master"); + } + + @Override + public void offMaster() { + isMaster = false; + logger.trace("ML memory tracker off master"); + memoryRequirementByJob.clear(); + lastUpdateTime = null; + } + + @Override + public String executorName() { + return MachineLearning.UTILITY_THREAD_POOL_NAME; + } + + /** + * Is the information in this object sufficiently up to date + * for valid allocation decisions to be made using it? + */ + public boolean isRecentlyRefreshed() { + Instant localLastUpdateTime = lastUpdateTime; + return localLastUpdateTime != null && localLastUpdateTime.plus(RECENT_UPDATE_THRESHOLD).isAfter(Instant.now()); + } + + /** + * Get the memory requirement for a job. + * This method only works on the master node. + * @param jobId The job ID. + * @return The memory requirement of the job specified by {@code jobId}, + * or null if it cannot be calculated. + */ + public Long getJobMemoryRequirement(String jobId) { + + if (isMaster == false) { + return null; + } + + Long memoryRequirement = memoryRequirementByJob.get(jobId); + if (memoryRequirement != null) { + return memoryRequirement; + } + + return null; + } + + /** + * Remove any memory requirement that is stored for the specified job. + * It doesn't matter if this method is called for a job that doesn't have + * a stored memory requirement. + */ + public void removeJob(String jobId) { + memoryRequirementByJob.remove(jobId); + } + + /** + * Uses a separate thread to refresh the memory requirement for every ML job that has + * a corresponding persistent task. This method only works on the master node. + * @param listener Will be called when the async refresh completes or fails. The + * boolean value indicates whether the cluster state was updated + * with the refresh completion time. (If it was then this will in + * cause the persistent tasks framework to check if any persistent + * tasks are awaiting allocation.) + * @return true if the async refresh is scheduled, and false + * if this is not possible for some reason. + */ + public boolean asyncRefresh(ActionListener listener) { + + if (isMaster) { + try { + ActionListener mlMetaUpdateListener = ActionListener.wrap( + aVoid -> recordUpdateTimeInClusterState(listener), + listener::onFailure + ); + threadPool.executor(executorName()).execute( + () -> refresh(clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE), mlMetaUpdateListener)); + return true; + } catch (EsRejectedExecutionException e) { + logger.debug("Couldn't schedule ML memory update - node might be shutting down", e); + } + } + + return false; + } + + /** + * This refreshes the memory requirement for every ML job that has a corresponding + * persistent task and, in addition, one job that doesn't have a persistent task. + * This method only works on the master node. + * @param jobId The job ID of the job whose memory requirement is to be refreshed + * despite not having a corresponding persistent task. + * @param listener Receives the memory requirement of the job specified by {@code jobId}, + * or null if it cannot be calculated. + */ + public void refreshJobMemoryAndAllOthers(String jobId, ActionListener listener) { + + if (isMaster == false) { + listener.onResponse(null); + return; + } + + PersistentTasksCustomMetaData persistentTasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + refresh(persistentTasks, ActionListener.wrap(aVoid -> refreshJobMemory(jobId, listener), listener::onFailure)); + } + + /** + * This refreshes the memory requirement for every ML job that has a corresponding persistent task. + * It does NOT remove entries for jobs that no longer have a persistent task, because that would + * lead to a race where a job was opened part way through the refresh. (Instead, entries are removed + * when jobs are deleted.) + */ + void refresh(PersistentTasksCustomMetaData persistentTasks, ActionListener onCompletion) { + + synchronized (fullRefreshCompletionListeners) { + fullRefreshCompletionListeners.add(onCompletion); + if (fullRefreshCompletionListeners.size() > 1) { + // A refresh is already in progress, so don't do another + return; + } + } + + ActionListener refreshComplete = ActionListener.wrap(aVoid -> { + lastUpdateTime = Instant.now(); + synchronized (fullRefreshCompletionListeners) { + assert fullRefreshCompletionListeners.isEmpty() == false; + for (ActionListener listener : fullRefreshCompletionListeners) { + listener.onResponse(null); + } + fullRefreshCompletionListeners.clear(); + } + }, onCompletion::onFailure); + + // persistentTasks will be null if there's never been a persistent task created in this cluster + if (persistentTasks == null) { + refreshComplete.onResponse(null); + } else { + List> mlJobTasks = persistentTasks.tasks().stream() + .filter(task -> MlTasks.JOB_TASK_NAME.equals(task.getTaskName())).collect(Collectors.toList()); + iterateMlJobTasks(mlJobTasks.iterator(), refreshComplete); + } + } + + private void recordUpdateTimeInClusterState(ActionListener listener) { + + clusterService.submitStateUpdateTask("ml-memory-last-update-time", + new AckedClusterStateUpdateTask(ACKED_REQUEST, listener) { + @Override + protected Boolean newResponse(boolean acknowledged) { + return acknowledged; + } + + @Override + public ClusterState execute(ClusterState currentState) { + MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState); + MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata); + builder.setLastMemoryRefreshVersion(currentState.getVersion() + 1); + MlMetadata newMlMetadata = builder.build(); + if (newMlMetadata.equals(currentMlMetadata)) { + // Return same reference if nothing has changed + return currentState; + } else { + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMlMetadata).build()); + return newState.build(); + } + } + }); + } + + private void iterateMlJobTasks(Iterator> iterator, + ActionListener refreshComplete) { + if (iterator.hasNext()) { + OpenJobAction.JobParams jobParams = (OpenJobAction.JobParams) iterator.next().getParams(); + refreshJobMemory(jobParams.getJobId(), + ActionListener.wrap(mem -> iterateMlJobTasks(iterator, refreshComplete), refreshComplete::onFailure)); + } else { + refreshComplete.onResponse(null); + } + } + + /** + * Refresh the memory requirement for a single job. + * This method only works on the master node. + * @param jobId The ID of the job to refresh the memory requirement for. + * @param listener Receives the job's memory requirement, or null + * if it cannot be calculated. + */ + public void refreshJobMemory(String jobId, ActionListener listener) { + if (isMaster == false) { + listener.onResponse(null); + return; + } + + try { + jobResultsProvider.getEstablishedMemoryUsage(jobId, null, null, + establishedModelMemoryBytes -> { + if (establishedModelMemoryBytes <= 0L) { + setJobMemoryToLimit(jobId, listener); + } else { + Long memoryRequirementBytes = establishedModelMemoryBytes + Job.PROCESS_MEMORY_OVERHEAD.getBytes(); + memoryRequirementByJob.put(jobId, memoryRequirementBytes); + listener.onResponse(memoryRequirementBytes); + } + }, + e -> { + logger.error("[" + jobId + "] failed to calculate job established model memory requirement", e); + setJobMemoryToLimit(jobId, listener); + } + ); + } catch (Exception e) { + logger.error("[" + jobId + "] failed to calculate job established model memory requirement", e); + setJobMemoryToLimit(jobId, listener); + } + } + + private void setJobMemoryToLimit(String jobId, ActionListener listener) { + jobManager.getJob(jobId, ActionListener.wrap(job -> { + Long memoryLimitMb = job.getAnalysisLimits().getModelMemoryLimit(); + if (memoryLimitMb != null) { + Long memoryRequirementBytes = ByteSizeUnit.MB.toBytes(memoryLimitMb) + Job.PROCESS_MEMORY_OVERHEAD.getBytes(); + memoryRequirementByJob.put(jobId, memoryRequirementBytes); + listener.onResponse(memoryRequirementBytes); + } else { + memoryRequirementByJob.remove(jobId); + listener.onResponse(null); + } + }, e -> { + if (e instanceof ResourceNotFoundException) { + // TODO: does this also happen if the .ml-config index exists but is unavailable? + logger.trace("[{}] job deleted during ML memory update", jobId); + } else { + logger.error("[" + jobId + "] failed to get job during ML memory update", e); + } + memoryRequirementByJob.remove(jobId); + listener.onResponse(null); + })); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index c7ca2ff805eba..eb58221bf5f35 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -69,6 +69,9 @@ protected MlMetadata createTestInstance() { builder.putJob(job, false); } } + if (randomBoolean()) { + builder.setLastMemoryRefreshVersion(randomNonNegativeLong()); + } return builder.build(); } @@ -438,8 +441,9 @@ protected MlMetadata mutateInstance(MlMetadata instance) { for (Map.Entry entry : datafeeds.entrySet()) { metadataBuilder.putDatafeed(entry.getValue(), Collections.emptyMap()); } + metadataBuilder.setLastMemoryRefreshVersion(instance.getLastMemoryRefreshVersion()); - switch (between(0, 1)) { + switch (between(0, 2)) { case 0: metadataBuilder.putJob(JobTests.createRandomizedJob(), true); break; @@ -459,6 +463,13 @@ protected MlMetadata mutateInstance(MlMetadata instance) { metadataBuilder.putJob(randomJob, false); metadataBuilder.putDatafeed(datafeedConfig, Collections.emptyMap()); break; + case 2: + if (instance.getLastMemoryRefreshVersion() == null) { + metadataBuilder.setLastMemoryRefreshVersion(randomNonNegativeLong()); + } else { + metadataBuilder.setLastMemoryRefreshVersion(null); + } + break; default: throw new AssertionError("Illegal randomisation branch"); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index 32ae117f9fd45..84d2ecaf918f9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -50,7 +50,9 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; +import org.junit.Before; import java.io.IOException; import java.net.InetAddress; @@ -71,6 +73,14 @@ public class TransportOpenJobActionTests extends ESTestCase { + private MlMemoryTracker memoryTracker; + + @Before + public void setup() { + memoryTracker = mock(MlMemoryTracker.class); + when(memoryTracker.isRecentlyRefreshed()).thenReturn(true); + } + public void testValidate_jobMissing() { expectThrows(ResourceNotFoundException.class, () -> TransportOpenJobAction.validate("job_id2", null)); } @@ -125,7 +135,7 @@ public void testSelectLeastLoadedMlNode_byCount() { jobBuilder.setJobVersion(Version.CURRENT); Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id4", jobBuilder.build(), - cs.build(), 2, 10, 30, logger); + cs.build(), 2, 10, 30, memoryTracker, logger); assertEquals("", result.getExplanation()); assertEquals("_node_id3", result.getExecutorNode()); } @@ -161,7 +171,7 @@ public void testSelectLeastLoadedMlNode_maxCapacity() { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id0", new ByteSizeValue(150, ByteSizeUnit.MB)).build(new Date()); Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id0", job, cs.build(), 2, - maxRunningJobsPerNode, 30, logger); + maxRunningJobsPerNode, 30, memoryTracker, logger); assertNull(result.getExecutorNode()); assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode + "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]")); @@ -187,7 +197,7 @@ public void testSelectLeastLoadedMlNode_noMlNodes() { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id2", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 10, 30, memoryTracker, logger); assertTrue(result.getExplanation().contains("because this node isn't a ml node")); assertNull(result.getExecutorNode()); } @@ -221,7 +231,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id6", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); ClusterState cs = csBuilder.build(); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 10, 30, memoryTracker, logger); assertEquals("_node_id3", result.getExecutorNode()); tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); @@ -231,7 +241,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -242,7 +252,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger); assertNull("no node selected, because stale task", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -253,7 +263,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger); assertNull("no node selected, because null state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -291,7 +301,7 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id7", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); // Allocation won't be possible if the stale failed job is treated as opening - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger); assertEquals("_node_id1", result.getExecutorNode()); tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); @@ -301,7 +311,7 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 10, 30, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 10, 30, memoryTracker, logger); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -332,7 +342,8 @@ public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() { cs.nodes(nodes); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 10, 30, + memoryTracker, logger); assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]")); assertNull(result.getExecutorNode()); } @@ -362,7 +373,7 @@ public void testSelectLeastLoadedMlNode_noNodesMatchingModelSnapshotMinVersion() metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_incompatible_model_snapshot", job, cs.build(), - 2, 10, 30, logger); + 2, 10, 30, memoryTracker, logger); assertThat(result.getExplanation(), containsString( "because the job's model snapshot requires a node of version [6.3.0] or higher")); assertNull(result.getExecutorNode()); @@ -389,7 +400,8 @@ public void testSelectLeastLoadedMlNode_jobWithRulesButNoNodeMeetsRequiredVersio cs.metaData(metaData); Job job = jobWithRules("job_with_rules"); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 10, 30, memoryTracker, + logger); assertThat(result.getExplanation(), containsString( "because jobs using custom_rules require a node of version [6.4.0] or higher")); assertNull(result.getExecutorNode()); @@ -416,7 +428,8 @@ public void testSelectLeastLoadedMlNode_jobWithRulesAndNodeMeetsRequiredVersion( cs.metaData(metaData); Job job = jobWithRules("job_with_rules"); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 10, 30, memoryTracker, + logger); assertNotNull(result.getExecutorNode()); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index f1f3ff77c9840..505a2b871da0b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -89,7 +89,7 @@ public void createComponents() throws Exception { renormalizer = mock(Renormalizer.class); capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>(); resultProcessor = new AutoDetectResultProcessor(client(), auditor, JOB_ID, renormalizer, - new JobResultsPersister(client()), jobResultsProvider, new ModelSizeStats.Builder(JOB_ID).build(), false) { + new JobResultsPersister(client()), new ModelSizeStats.Builder(JOB_ID).build()) { @Override protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot); @@ -100,7 +100,7 @@ protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { } @After - public void deleteJob() throws Exception { + public void deleteJob() { DeleteJobAction.Request request = new DeleteJobAction.Request(JOB_ID); AcknowledgedResponse response = client().execute(DeleteJobAction.INSTANCE, request).actionGet(); assertTrue(response.isAcknowledged()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index 2e14289da705e..5e4d8fd06030c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -8,10 +8,13 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentHelper; @@ -31,21 +34,32 @@ import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.elasticsearch.persistent.PersistentTasksClusterService.needsReassignment; public class MlDistributedFailureIT extends BaseMlIntegTestCase { + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)) + .put(MachineLearning.CONCURRENT_JOB_ALLOCATIONS.getKey(), 4) + .build(); + } + public void testFailOver() throws Exception { internalCluster().ensureAtLeastNumDataNodes(3); ensureStableClusterOnAllNodes(3); @@ -58,8 +72,6 @@ public void testFailOver() throws Exception { }); } - @TestLogging("org.elasticsearch.xpack.ml.action:DEBUG,org.elasticsearch.xpack.persistent:TRACE," + - "org.elasticsearch.xpack.ml.datafeed:TRACE") public void testLoseDedicatedMasterNode() throws Exception { internalCluster().ensureAtMostNumDataNodes(0); logger.info("Starting dedicated master node..."); @@ -136,12 +148,12 @@ public void testCloseUnassignedJobAndDatafeed() throws Exception { // Job state is opened but the job is not assigned to a node (because we just killed the only ML node) GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(jobId); GetJobsStatsAction.Response jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet(); - assertEquals(jobStatsResponse.getResponse().results().get(0).getState(), JobState.OPENED); + assertEquals(JobState.OPENED, jobStatsResponse.getResponse().results().get(0).getState()); GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId); GetDatafeedsStatsAction.Response datafeedStatsResponse = client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); - assertEquals(datafeedStatsResponse.getResponse().results().get(0).getDatafeedState(), DatafeedState.STARTED); + assertEquals(DatafeedState.STARTED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); // Can't normal stop an unassigned datafeed StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId); @@ -170,6 +182,73 @@ public void testCloseUnassignedJobAndDatafeed() throws Exception { assertTrue(closeJobResponse.isClosed()); } + @TestLogging("org.elasticsearch.xpack.ml.action:TRACE,org.elasticsearch.xpack.ml.process:TRACE") + public void testJobRelocationIsMemoryAware() throws Exception { + + internalCluster().ensureAtLeastNumDataNodes(1); + ensureStableClusterOnAllNodes(1); + + // Open 4 small jobs. Since there is only 1 node in the cluster they'll have to go on that node. + + setupJobWithoutDatafeed("small1", new ByteSizeValue(2, ByteSizeUnit.MB)); + setupJobWithoutDatafeed("small2", new ByteSizeValue(2, ByteSizeUnit.MB)); + setupJobWithoutDatafeed("small3", new ByteSizeValue(2, ByteSizeUnit.MB)); + setupJobWithoutDatafeed("small4", new ByteSizeValue(2, ByteSizeUnit.MB)); + + // Expand the cluster to 3 nodes. The 4 small jobs will stay on the + // same node because we don't rebalance jobs that are happily running. + + internalCluster().ensureAtLeastNumDataNodes(3); + ensureStableClusterOnAllNodes(3); + + // Open a big job. This should go on a different node to the 4 small ones. + + setupJobWithoutDatafeed("big1", new ByteSizeValue(500, ByteSizeUnit.MB)); + + // Stop the current master node - this should be the one with the 4 small jobs on. + + internalCluster().stopCurrentMasterNode(); + ensureStableClusterOnAllNodes(2); + + // If memory requirements are used to reallocate the 4 small jobs (as we expect) then they should + // all reallocate to the same node, that being the one that doesn't have the big job on. If job counts + // are used to reallocate the small jobs then this implies the fallback allocation mechanism has been + // used in a situation we don't want it to be used in, and at least one of the small jobs will be on + // the same node as the big job. (This all relies on xpack.ml.node_concurrent_job_allocations being set + // to at least 4, which we do in the nodeSettings() method.) + + assertBusy(() -> { + GetJobsStatsAction.Response statsResponse = + client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(MetaData.ALL)).actionGet(); + QueryPage jobStats = statsResponse.getResponse(); + assertNotNull(jobStats); + List smallJobNodes = jobStats.results().stream().filter(s -> s.getJobId().startsWith("small") && s.getNode() != null) + .map(s -> s.getNode().getName()).collect(Collectors.toList()); + List bigJobNodes = jobStats.results().stream().filter(s -> s.getJobId().startsWith("big") && s.getNode() != null) + .map(s -> s.getNode().getName()).collect(Collectors.toList()); + logger.info("small job nodes: " + smallJobNodes + ", big job nodes: " + bigJobNodes); + assertEquals(5, jobStats.count()); + assertEquals(4, smallJobNodes.size()); + assertEquals(1, bigJobNodes.size()); + assertEquals(1L, smallJobNodes.stream().distinct().count()); + assertEquals(1L, bigJobNodes.stream().distinct().count()); + assertNotEquals(smallJobNodes, bigJobNodes); + }); + } + + private void setupJobWithoutDatafeed(String jobId, ByteSizeValue modelMemoryLimit) throws Exception { + Job.Builder job = createFareQuoteJob(jobId, modelMemoryLimit); + PutJobAction.Request putJobRequest = new PutJobAction.Request(job); + client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet(); + + client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).actionGet(); + assertBusy(() -> { + GetJobsStatsAction.Response statsResponse = + client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet(); + assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); + }); + } + private void setupJobAndDatafeed(String jobId, String datafeedId) throws Exception { Job.Builder job = createScheduledJob(jobId); PutJobAction.Request putJobRequest = new PutJobAction.Request(job); @@ -183,7 +262,7 @@ private void setupJobAndDatafeed(String jobId, String datafeedId) throws Excepti assertBusy(() -> { GetJobsStatsAction.Response statsResponse = client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet(); - assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED); + assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); }); StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(config.getId(), 0L); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java index 87aa3c5b926e3..c4150d633a8f0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java @@ -123,12 +123,10 @@ public void testLazyNodeValidation() throws Exception { }); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/34084") public void testSingleNode() throws Exception { verifyMaxNumberOfJobsLimit(1, randomIntBetween(1, 100)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/34084") public void testMultipleNodes() throws Exception { verifyMaxNumberOfJobsLimit(3, randomIntBetween(1, 100)); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 05f83f4ae49cc..761cb56fa8804 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -36,7 +36,6 @@ import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; @@ -56,9 +55,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.Consumer; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; @@ -85,7 +82,6 @@ public class AutoDetectResultProcessorTests extends ESTestCase { private Auditor auditor; private Renormalizer renormalizer; private JobResultsPersister persister; - private JobResultsProvider jobResultsProvider; private FlushListener flushListener; private AutoDetectResultProcessor processorUnderTest; private ScheduledThreadPoolExecutor executor; @@ -95,9 +91,10 @@ public void setUpMocks() { executor = new ScheduledThreadPoolExecutor(1); client = mock(Client.class); doAnswer(invocation -> { - ActionListener listener = (ActionListener) invocation.getArguments()[2]; - listener.onResponse(new UpdateResponse()); - return null; + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(new UpdateResponse()); + return null; }).when(client).execute(same(UpdateAction.INSTANCE), any(), any()); threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); @@ -113,10 +110,9 @@ public void setUpMocks() { persister = mock(JobResultsPersister.class); when(persister.persistModelSnapshot(any(), any())) .thenReturn(new IndexResponse(new ShardId("ml", "uid", 0), "doc", "1", 0L, 0L, 0L, true)); - jobResultsProvider = mock(JobResultsProvider.class); flushListener = mock(FlushListener.class); - processorUnderTest = new AutoDetectResultProcessor(client, auditor, JOB_ID, renormalizer, persister, jobResultsProvider, - new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(), false, flushListener); + processorUnderTest = new AutoDetectResultProcessor(client, auditor, JOB_ID, renormalizer, persister, + new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(), flushListener); } @After @@ -300,8 +296,6 @@ public void testProcessResult_modelSizeStats() { verify(persister, times(1)).persistModelSizeStats(modelSizeStats); verifyNoMoreInteractions(persister); - // No interactions with the jobResultsProvider confirms that the established memory calculation did not run - verifyNoMoreInteractions(jobResultsProvider, auditor); assertEquals(modelSizeStats, processorUnderTest.modelSizeStats()); } @@ -343,85 +337,6 @@ public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() { verifyNoMoreInteractions(auditor); } - public void testProcessResult_modelSizeStatsAfterManyBuckets() throws Exception { - JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); - when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); - when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); - - // To avoid slowing down the test this is using a delay of 1 nanosecond rather than the 5 seconds used in production - setupScheduleDelayTime(TimeValue.timeValueNanos(1)); - - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); - context.deleteInterimRequired = false; - for (int i = 0; i < JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE; ++i) { - AutodetectResult result = mock(AutodetectResult.class); - Bucket bucket = mock(Bucket.class); - when(result.getBucket()).thenReturn(bucket); - processorUnderTest.processResult(context, result); - } - - AutodetectResult result = mock(AutodetectResult.class); - ModelSizeStats modelSizeStats = mock(ModelSizeStats.class); - Date timestamp = new Date(BUCKET_SPAN_MS); - when(modelSizeStats.getTimestamp()).thenReturn(timestamp); - when(result.getModelSizeStats()).thenReturn(modelSizeStats); - processorUnderTest.processResult(context, result); - - // Some calls will be made 1 nanosecond later in a different thread, hence the assertBusy() - assertBusy(() -> { - verify(persister, times(1)).persistModelSizeStats(modelSizeStats); - verify(persister, times(1)).commitResultWrites(JOB_ID); - verifyNoMoreInteractions(persister); - verify(jobResultsProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), eq(timestamp), - eq(modelSizeStats), any(Consumer.class), any(Consumer.class)); - verifyNoMoreInteractions(jobResultsProvider); - assertEquals(modelSizeStats, processorUnderTest.modelSizeStats()); - }); - } - - public void testProcessResult_manyModelSizeStatsInQuickSuccession() throws Exception { - JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); - when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); - when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); - - setupScheduleDelayTime(TimeValue.timeValueSeconds(1)); - - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); - context.deleteInterimRequired = false; - ModelSizeStats modelSizeStats = null; - for (int i = 1; i <= JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE + 5; ++i) { - AutodetectResult result = mock(AutodetectResult.class); - Bucket bucket = mock(Bucket.class); - when(bucket.getTimestamp()).thenReturn(new Date(BUCKET_SPAN_MS * i)); - when(result.getBucket()).thenReturn(bucket); - processorUnderTest.processResult(context, result); - if (i > JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) { - result = mock(AutodetectResult.class); - modelSizeStats = mock(ModelSizeStats.class); - when(modelSizeStats.getTimestamp()).thenReturn(new Date(BUCKET_SPAN_MS * i)); - when(result.getModelSizeStats()).thenReturn(modelSizeStats); - processorUnderTest.processResult(context, result); - } - } - - ModelSizeStats lastModelSizeStats = modelSizeStats; - assertNotNull(lastModelSizeStats); - Date lastTimestamp = lastModelSizeStats.getTimestamp(); - - // Some calls will be made 1 second later in a different thread, hence the assertBusy() - assertBusy(() -> { - // All the model size stats should be persisted to the index... - verify(persister, times(5)).persistModelSizeStats(any(ModelSizeStats.class)); - // ...but only the last should trigger an established model memory update - verify(persister, times(1)).commitResultWrites(JOB_ID); - verifyNoMoreInteractions(persister); - verify(jobResultsProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), eq(lastTimestamp), eq(lastModelSizeStats), - any(Consumer.class), any(Consumer.class)); - verifyNoMoreInteractions(jobResultsProvider); - assertEquals(lastModelSizeStats, processorUnderTest.modelSizeStats()); - }); - } - public void testProcessResult_modelSnapshot() { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); @@ -442,7 +357,7 @@ public void testProcessResult_modelSnapshot() { verifyNoMoreInteractions(persister); UpdateRequest capturedRequest = requestCaptor.getValue(); - assertThat(capturedRequest.doc().sourceAsMap().keySet(), contains(Job.MODEL_SNAPSHOT_ID.getPreferredName())); + assertNotNull(capturedRequest.doc().sourceAsMap().get(Job.MODEL_SNAPSHOT_ID.getPreferredName())); } public void testProcessResult_quantiles_givenRenormalizationIsEnabled() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java new file mode 100644 index 0000000000000..cbba7ffa04972 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java @@ -0,0 +1,195 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.process; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.JobManager; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; +import org.junit.Before; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class MlMemoryTrackerTests extends ESTestCase { + + private ClusterService clusterService; + private ThreadPool threadPool; + private JobManager jobManager; + private JobResultsProvider jobResultsProvider; + private MlMemoryTracker memoryTracker; + + @Before + public void setup() { + + clusterService = mock(ClusterService.class); + threadPool = mock(ThreadPool.class); + ExecutorService executorService = mock(ExecutorService.class); + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + Runnable r = (Runnable) invocation.getArguments()[0]; + r.run(); + return null; + }).when(executorService).execute(any(Runnable.class)); + when(threadPool.executor(anyString())).thenReturn(executorService); + jobManager = mock(JobManager.class); + jobResultsProvider = mock(JobResultsProvider.class); + memoryTracker = new MlMemoryTracker(clusterService, threadPool, jobManager, jobResultsProvider); + } + + public void testRefreshAll() { + + boolean isMaster = randomBoolean(); + if (isMaster) { + memoryTracker.onMaster(); + } else { + memoryTracker.offMaster(); + } + + int numMlJobTasks = randomIntBetween(2, 5); + Map> tasks = new HashMap<>(); + for (int i = 1; i <= numMlJobTasks; ++i) { + String jobId = "job" + i; + PersistentTasksCustomMetaData.PersistentTask task = makeTestTask(jobId); + tasks.put(task.getId(), task); + } + PersistentTasksCustomMetaData persistentTasks = new PersistentTasksCustomMetaData(numMlJobTasks, tasks); + + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + Consumer listener = (Consumer) invocation.getArguments()[3]; + listener.accept(randomLongBetween(1000, 1000000)); + return null; + }).when(jobResultsProvider).getEstablishedMemoryUsage(anyString(), any(), any(), any(Consumer.class), any()); + + memoryTracker.refresh(persistentTasks, ActionListener.wrap(aVoid -> {}, ESTestCase::assertNull)); + + if (isMaster) { + for (int i = 1; i <= numMlJobTasks; ++i) { + String jobId = "job" + i; + verify(jobResultsProvider, times(1)).getEstablishedMemoryUsage(eq(jobId), any(), any(), any(), any()); + } + } else { + verify(jobResultsProvider, never()).getEstablishedMemoryUsage(anyString(), any(), any(), any(), any()); + } + } + + public void testRefreshOne() { + + boolean isMaster = randomBoolean(); + if (isMaster) { + memoryTracker.onMaster(); + } else { + memoryTracker.offMaster(); + } + + String jobId = "job"; + boolean haveEstablishedModelMemory = randomBoolean(); + + long modelBytes = 1024 * 1024; + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + Consumer listener = (Consumer) invocation.getArguments()[3]; + listener.accept(haveEstablishedModelMemory ? modelBytes : 0L); + return null; + }).when(jobResultsProvider).getEstablishedMemoryUsage(eq(jobId), any(), any(), any(Consumer.class), any()); + + long modelMemoryLimitMb = 2; + Job job = mock(Job.class); + when(job.getAnalysisLimits()).thenReturn(new AnalysisLimits(modelMemoryLimitMb, 4L)); + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation.getArguments()[1]; + listener.onResponse(job); + return null; + }).when(jobManager).getJob(eq(jobId), any(ActionListener.class)); + + AtomicReference refreshedMemoryRequirement = new AtomicReference<>(); + memoryTracker.refreshJobMemory(jobId, ActionListener.wrap(refreshedMemoryRequirement::set, ESTestCase::assertNull)); + + if (isMaster) { + if (haveEstablishedModelMemory) { + assertEquals(Long.valueOf(modelBytes + Job.PROCESS_MEMORY_OVERHEAD.getBytes()), + memoryTracker.getJobMemoryRequirement(jobId)); + } else { + assertEquals(Long.valueOf(ByteSizeUnit.MB.toBytes(modelMemoryLimitMb) + Job.PROCESS_MEMORY_OVERHEAD.getBytes()), + memoryTracker.getJobMemoryRequirement(jobId)); + } + } else { + assertNull(memoryTracker.getJobMemoryRequirement(jobId)); + } + + assertEquals(memoryTracker.getJobMemoryRequirement(jobId), refreshedMemoryRequirement.get()); + + memoryTracker.removeJob(jobId); + assertNull(memoryTracker.getJobMemoryRequirement(jobId)); + } + + @SuppressWarnings("unchecked") + public void testRecordUpdateTimeInClusterState() { + + boolean isMaster = randomBoolean(); + if (isMaster) { + memoryTracker.onMaster(); + } else { + memoryTracker.offMaster(); + } + + when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); + + AtomicReference updateVersion = new AtomicReference<>(); + + doAnswer(invocation -> { + AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocation.getArguments()[1]; + ClusterState currentClusterState = ClusterState.EMPTY_STATE; + ClusterState newClusterState = task.execute(currentClusterState); + assertThat(currentClusterState, not(equalTo(newClusterState))); + MlMetadata newMlMetadata = MlMetadata.getMlMetadata(newClusterState); + updateVersion.set(newMlMetadata.getLastMemoryRefreshVersion()); + task.onAllNodesAcked(null); + return null; + }).when(clusterService).submitStateUpdateTask(anyString(), any(AckedClusterStateUpdateTask.class)); + + memoryTracker.asyncRefresh(ActionListener.wrap(ESTestCase::assertTrue, ESTestCase::assertNull)); + + if (isMaster) { + assertNotNull(updateVersion.get()); + } else { + assertNull(updateVersion.get()); + } + } + + private PersistentTasksCustomMetaData.PersistentTask makeTestTask(String jobId) { + return new PersistentTasksCustomMetaData.PersistentTask<>("job-" + jobId, MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(jobId), + 0, PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT); + } +}