Skip to content

Commit

Permalink
[ML] Add lazy assignment job config option (#47726)
Browse files Browse the repository at this point in the history
This change adds:

- A new option, allow_lazy_open, to anomaly detection jobs
- A new option, allow_lazy_start, to data frame analytics jobs

Both work in the same way: they allow a job to be
opened/started even if no ML node exists that can
accommodate the job immediately. In this situation
the job waits in the opening/starting state until ML
node capacity is available. (The starting state for data
frame analytics jobs is new in this change.)

Additionally, the ML nightly maintenance task now
creates audit warnings for ML jobs that are unassigned.
This means that jobs that cannot be assigned to an ML
node for a very long time will show a yellow warning
triangle in the UI.

A final change is that it is now possible to close a job
that is not assigned to a node without using force.
This is because previously jobs that were open but
not assigned to a node were an aberration, whereas
after this change they'll be relatively common.
  • Loading branch information
droberts195 authored Oct 14, 2019
1 parent 29ac95a commit fd83c18
Show file tree
Hide file tree
Showing 46 changed files with 723 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public static Builder builder() {
private static final ParseField MODEL_MEMORY_LIMIT = new ParseField("model_memory_limit");
private static final ParseField CREATE_TIME = new ParseField("create_time");
private static final ParseField VERSION = new ParseField("version");
private static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start");

private static ObjectParser<Builder, Void> PARSER = new ObjectParser<>("data_frame_analytics_config", true, Builder::new);

Expand Down Expand Up @@ -86,6 +87,7 @@ public static Builder builder() {
},
VERSION,
ValueType.STRING);
PARSER.declareBoolean(Builder::setAllowLazyStart, ALLOW_LAZY_START);
}

private static DataFrameAnalysis parseAnalysis(XContentParser parser) throws IOException {
Expand All @@ -105,11 +107,12 @@ private static DataFrameAnalysis parseAnalysis(XContentParser parser) throws IOE
private final ByteSizeValue modelMemoryLimit;
private final Instant createTime;
private final Version version;
private final Boolean allowLazyStart;

private DataFrameAnalyticsConfig(@Nullable String id, @Nullable String description, @Nullable DataFrameAnalyticsSource source,
@Nullable DataFrameAnalyticsDest dest, @Nullable DataFrameAnalysis analysis,
@Nullable FetchSourceContext analyzedFields, @Nullable ByteSizeValue modelMemoryLimit,
@Nullable Instant createTime, @Nullable Version version) {
@Nullable Instant createTime, @Nullable Version version, @Nullable Boolean allowLazyStart) {
this.id = id;
this.description = description;
this.source = source;
Expand All @@ -119,6 +122,7 @@ private DataFrameAnalyticsConfig(@Nullable String id, @Nullable String descripti
this.modelMemoryLimit = modelMemoryLimit;
this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());;
this.version = version;
this.allowLazyStart = allowLazyStart;
}

public String getId() {
Expand Down Expand Up @@ -157,6 +161,10 @@ public Version getVersion() {
return version;
}

/**
 * Returns the configured {@code allow_lazy_start} value.
 *
 * @return the flag, or {@code null} if none was supplied; a {@code null} value is
 *         left out of the XContent output
 */
public Boolean getAllowLazyStart() {
return this.allowLazyStart;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down Expand Up @@ -190,6 +198,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (version != null) {
builder.field(VERSION.getPreferredName(), version);
}
if (allowLazyStart != null) {
builder.field(ALLOW_LAZY_START.getPreferredName(), allowLazyStart);
}
builder.endObject();
return builder;
}
Expand All @@ -208,12 +219,13 @@ public boolean equals(Object o) {
&& Objects.equals(analyzedFields, other.analyzedFields)
&& Objects.equals(modelMemoryLimit, other.modelMemoryLimit)
&& Objects.equals(createTime, other.createTime)
&& Objects.equals(version, other.version);
&& Objects.equals(version, other.version)
&& Objects.equals(allowLazyStart, other.allowLazyStart);
}

@Override
public int hashCode() {
return Objects.hash(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, version);
return Objects.hash(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, version, allowLazyStart);
}

@Override
Expand All @@ -232,6 +244,7 @@ public static class Builder {
private ByteSizeValue modelMemoryLimit;
private Instant createTime;
private Version version;
private Boolean allowLazyStart;

private Builder() {}

Expand Down Expand Up @@ -280,9 +293,14 @@ public Builder setVersion(Version version) {
return this;
}

/**
 * Sets the {@code allow_lazy_start} option on the config being built.
 *
 * @param allowLazyStart the value to store; passed through unchanged to the built config
 * @return this builder, so calls can be chained
 */
public Builder setAllowLazyStart(Boolean allowLazyStart) {
this.allowLazyStart = allowLazyStart;
return this;
}

public DataFrameAnalyticsConfig build() {
return new DataFrameAnalyticsConfig(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime,
version);
version, allowLazyStart);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.Locale;

public enum DataFrameAnalyticsState {
STARTED, REINDEXING, ANALYZING, STOPPING, STOPPED;
STARTED, REINDEXING, ANALYZING, STOPPING, STOPPED, STARTING;

public static DataFrameAnalyticsState fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class Job implements ToXContentObject {
public static final ParseField MODEL_SNAPSHOT_ID = new ParseField("model_snapshot_id");
public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name");
public static final ParseField DELETING = new ParseField("deleting");
public static final ParseField ALLOW_LAZY_OPEN = new ParseField("allow_lazy_open");

public static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("job_details", true, Builder::new);

Expand Down Expand Up @@ -96,6 +97,7 @@ public class Job implements ToXContentObject {
PARSER.declareStringOrNull(Builder::setModelSnapshotId, MODEL_SNAPSHOT_ID);
PARSER.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME);
PARSER.declareBoolean(Builder::setDeleting, DELETING);
PARSER.declareBoolean(Builder::setAllowLazyOpen, ALLOW_LAZY_OPEN);
}

private final String jobId;
Expand All @@ -117,13 +119,14 @@ public class Job implements ToXContentObject {
private final String modelSnapshotId;
private final String resultsIndexName;
private final Boolean deleting;
private final Boolean allowLazyOpen;

private Job(String jobId, String jobType, List<String> groups, String description,
Date createTime, Date finishedTime,
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval,
Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map<String, Object> customSettings,
String modelSnapshotId, String resultsIndexName, Boolean deleting) {
String modelSnapshotId, String resultsIndexName, Boolean deleting, Boolean allowLazyOpen) {

this.jobId = jobId;
this.jobType = jobType;
Expand All @@ -143,6 +146,7 @@ private Job(String jobId, String jobType, List<String> groups, String descriptio
this.modelSnapshotId = modelSnapshotId;
this.resultsIndexName = resultsIndexName;
this.deleting = deleting;
this.allowLazyOpen = allowLazyOpen;
}

/**
Expand Down Expand Up @@ -271,6 +275,10 @@ public Boolean getDeleting() {
return deleting;
}

/**
 * Returns the job's {@code allow_lazy_open} value.
 *
 * @return the flag, or {@code null} if it was not set; a {@code null} value is
 *         left out of the XContent output
 */
public Boolean getAllowLazyOpen() {
return this.allowLazyOpen;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down Expand Up @@ -326,6 +334,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (deleting != null) {
builder.field(DELETING.getPreferredName(), deleting);
}
if (allowLazyOpen != null) {
builder.field(ALLOW_LAZY_OPEN.getPreferredName(), allowLazyOpen);
}
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -358,15 +369,16 @@ public boolean equals(Object other) {
&& Objects.equals(this.customSettings, that.customSettings)
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.resultsIndexName, that.resultsIndexName)
&& Objects.equals(this.deleting, that.deleting);
&& Objects.equals(this.deleting, that.deleting)
&& Objects.equals(this.allowLazyOpen, that.allowLazyOpen);
}

@Override
public int hashCode() {
return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, resultsIndexName, deleting);
modelSnapshotId, resultsIndexName, deleting, allowLazyOpen);
}

@Override
Expand Down Expand Up @@ -398,6 +410,7 @@ public static class Builder {
private String modelSnapshotId;
private String resultsIndexName;
private Boolean deleting;
private Boolean allowLazyOpen;

private Builder() {
}
Expand Down Expand Up @@ -425,6 +438,7 @@ public Builder(Job job) {
this.modelSnapshotId = job.getModelSnapshotId();
this.resultsIndexName = job.getResultsIndexNameNoPrefix();
this.deleting = job.getDeleting();
this.allowLazyOpen = job.getAllowLazyOpen();
}

public Builder setId(String id) {
Expand Down Expand Up @@ -521,6 +535,11 @@ Builder setDeleting(Boolean deleting) {
return this;
}

/**
 * Sets the {@code allow_lazy_open} option on the job being built.
 *
 * @param allowLazyOpen the value to store; passed through unchanged to the built job
 * @return this builder, so calls can be chained
 */
Builder setAllowLazyOpen(Boolean allowLazyOpen) {
this.allowLazyOpen = allowLazyOpen;
return this;
}

/**
* Builds a job.
*
Expand All @@ -533,7 +552,7 @@ public Job build() {
id, jobType, groups, description, createTime, finishedTime,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, resultsIndexName, deleting);
modelSnapshotId, resultsIndexName, deleting, allowLazyOpen);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class JobUpdate implements ToXContentObject {
PARSER.declareLong(Builder::setModelSnapshotRetentionDays, Job.MODEL_SNAPSHOT_RETENTION_DAYS);
PARSER.declareStringArray(Builder::setCategorizationFilters, AnalysisConfig.CATEGORIZATION_FILTERS);
PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT);
PARSER.declareBoolean(Builder::setAllowLazyOpen, Job.ALLOW_LAZY_OPEN);
}

private final String jobId;
Expand All @@ -68,13 +69,14 @@ public class JobUpdate implements ToXContentObject {
private final Long resultsRetentionDays;
private final List<String> categorizationFilters;
private final Map<String, Object> customSettings;
private final Boolean allowLazyOpen;

private JobUpdate(String jobId, @Nullable List<String> groups, @Nullable String description,
@Nullable List<DetectorUpdate> detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig,
@Nullable AnalysisLimits analysisLimits, @Nullable TimeValue backgroundPersistInterval,
@Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays,
@Nullable Long modelSnapshotRetentionDays, @Nullable List<String> categorisationFilters,
@Nullable Map<String, Object> customSettings) {
@Nullable Map<String, Object> customSettings, @Nullable Boolean allowLazyOpen) {
this.jobId = jobId;
this.groups = groups;
this.description = description;
Expand All @@ -87,6 +89,7 @@ private JobUpdate(String jobId, @Nullable List<String> groups, @Nullable String
this.resultsRetentionDays = resultsRetentionDays;
this.categorizationFilters = categorisationFilters;
this.customSettings = customSettings;
this.allowLazyOpen = allowLazyOpen;
}

public String getJobId() {
Expand Down Expand Up @@ -137,6 +140,10 @@ public Map<String, Object> getCustomSettings() {
return customSettings;
}

/**
 * Returns the {@code allow_lazy_open} value carried by this update.
 *
 * @return the flag, or {@code null} if the update does not carry one; a {@code null}
 *         value is left out of the XContent output
 */
public Boolean getAllowLazyOpen() {
return this.allowLazyOpen;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down Expand Up @@ -174,6 +181,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (customSettings != null) {
builder.field(Job.CUSTOM_SETTINGS.getPreferredName(), customSettings);
}
if (allowLazyOpen != null) {
builder.field(Job.ALLOW_LAZY_OPEN.getPreferredName(), allowLazyOpen);
}
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -201,13 +211,15 @@ public boolean equals(Object other) {
&& Objects.equals(this.modelSnapshotRetentionDays, that.modelSnapshotRetentionDays)
&& Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays)
&& Objects.equals(this.categorizationFilters, that.categorizationFilters)
&& Objects.equals(this.customSettings, that.customSettings);
&& Objects.equals(this.customSettings, that.customSettings)
&& Objects.equals(this.allowLazyOpen, that.allowLazyOpen);
}

@Override
public int hashCode() {
return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings);
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings,
allowLazyOpen);
}

public static class DetectorUpdate implements ToXContentObject {
Expand Down Expand Up @@ -303,6 +315,7 @@ public static class Builder {
private Long resultsRetentionDays;
private List<String> categorizationFilters;
private Map<String, Object> customSettings;
private Boolean allowLazyOpen;

/**
* New {@link JobUpdate.Builder} object for the existing job
Expand Down Expand Up @@ -446,9 +459,15 @@ public Builder setCustomSettings(Map<String, Object> customSettings) {
return this;
}

/**
 * Sets the {@code allow_lazy_open} value to apply with this update.
 * <p>
 * The parameter is a primitive {@code boolean} that is boxed into the builder's
 * {@code Boolean} field, so this method cannot reset the field to {@code null}.
 *
 * @param allowLazyOpen the value to store
 * @return this builder, so calls can be chained
 */
public Builder setAllowLazyOpen(boolean allowLazyOpen) {
this.allowLazyOpen = allowLazyOpen;
return this;
}

public JobUpdate build() {
return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval,
renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings);
renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings,
allowLazyOpen);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public static DataFrameAnalyticsConfig randomDataFrameAnalyticsConfig() {
if (randomBoolean()) {
builder.setVersion(Version.CURRENT);
}
if (randomBoolean()) {
builder.setAllowLazyStart(randomBoolean());
}
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ public static Job.Builder createRandomizedJobBuilder() {
if (randomBoolean()) {
builder.setDeleting(randomBoolean());
}
if (randomBoolean()) {
builder.setAllowLazyOpen(randomBoolean());
}
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ public static JobUpdate createRandom(String jobId) {
if (randomBoolean()) {
update.setCustomSettings(Collections.singletonMap(randomAlphaOfLength(10), randomAlphaOfLength(10)));
}
if (randomBoolean()) {
update.setAllowLazyOpen(randomBoolean());
}

return update.build();
}
Expand Down
3 changes: 2 additions & 1 deletion docs/reference/ml/anomaly-detection/apis/get-job.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ The API returns the following results:
"time_format": "epoch_ms"
},
"model_snapshot_retention_days": 1,
"results_index_name": "shared"
"results_index_name": "shared",
"allow_lazy_open": false
}
]
}
Expand Down
13 changes: 13 additions & 0 deletions docs/reference/ml/anomaly-detection/apis/jobresource.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,19 @@ so do not set the `background_persist_interval` value too low.
deleted from Elasticsearch. The default value is null, which means results
are retained.

`allow_lazy_open`::
(boolean) Advanced configuration option.
Whether this job should be allowed to open when there is insufficient
{ml} node capacity for it to be immediately assigned to a node.
The default is `false`, which means that the <<ml-open-job>> API
will return an error if a {ml} node with capacity to run the
job cannot immediately be found. (However, this is also subject to
the cluster-wide `xpack.ml.max_lazy_ml_nodes` setting; see
<<advanced-ml-settings>>.) If this option is set to `true`, then
the <<ml-open-job>> API will not return an error, and the job will
wait in the `opening` state until sufficient {ml} node capacity
is available.

[[ml-analysisconfig]]
==== Analysis Configuration Objects

Expand Down
3 changes: 2 additions & 1 deletion docs/reference/ml/anomaly-detection/apis/put-job.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ When the job is created, you receive the following results:
"time_format" : "epoch_ms"
},
"model_snapshot_retention_days" : 1,
"results_index_name" : "shared"
"results_index_name" : "shared",
"allow_lazy_open" : false
}
----
// TESTRESPONSE[s/"job_version" : "8.0.0"/"job_version" : $body.job_version/]
Expand Down
Loading

0 comments on commit fd83c18

Please sign in to comment.