Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Fix wire BWC for JobUpdate #30512

Merged
merged 3 commits into from
May 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public PutJobAction.Response newResponse() {
public static class Request extends AcknowledgedRequest<UpdateJobAction.Request> implements ToXContentObject {

public static UpdateJobAction.Request parseRequest(String jobId, XContentParser parser) {
JobUpdate update = JobUpdate.PARSER.apply(parser, null).setJobId(jobId).build();
JobUpdate update = JobUpdate.EXTERNAL_PARSER.apply(parser, null).setJobId(jobId).build();
return new UpdateJobAction.Request(jobId, update);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,35 @@
public class JobUpdate implements Writeable, ToXContentObject {
public static final ParseField DETECTORS = new ParseField("detectors");

public static final ConstructingObjectParser<Builder, Void> PARSER = new ConstructingObjectParser<>(
// For internal updates
static final ConstructingObjectParser<Builder, Void> INTERNAL_PARSER = new ConstructingObjectParser<>(
"job_update", args -> new Builder((String) args[0]));

// For parsing REST requests
public static final ConstructingObjectParser<Builder, Void> EXTERNAL_PARSER = new ConstructingObjectParser<>(
"job_update", args -> new Builder((String) args[0]));

static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), Job.ID);
PARSER.declareStringArray(Builder::setGroups, Job.GROUPS);
PARSER.declareStringOrNull(Builder::setDescription, Job.DESCRIPTION);
PARSER.declareObjectArray(Builder::setDetectorUpdates, DetectorUpdate.PARSER, DETECTORS);
PARSER.declareObject(Builder::setModelPlotConfig, ModelPlotConfig.CONFIG_PARSER, Job.MODEL_PLOT_CONFIG);
PARSER.declareObject(Builder::setAnalysisLimits, AnalysisLimits.CONFIG_PARSER, Job.ANALYSIS_LIMITS);
PARSER.declareString((builder, val) -> builder.setBackgroundPersistInterval(
TimeValue.parseTimeValue(val, Job.BACKGROUND_PERSIST_INTERVAL.getPreferredName())), Job.BACKGROUND_PERSIST_INTERVAL);
PARSER.declareLong(Builder::setRenormalizationWindowDays, Job.RENORMALIZATION_WINDOW_DAYS);
PARSER.declareLong(Builder::setResultsRetentionDays, Job.RESULTS_RETENTION_DAYS);
PARSER.declareLong(Builder::setModelSnapshotRetentionDays, Job.MODEL_SNAPSHOT_RETENTION_DAYS);
PARSER.declareStringArray(Builder::setCategorizationFilters, AnalysisConfig.CATEGORIZATION_FILTERS);
PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT);
PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID);
PARSER.declareString(Builder::setModelSnapshotMinVersion, Job.MODEL_SNAPSHOT_MIN_VERSION);
PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY);
for (ConstructingObjectParser<Builder, Void> parser : Arrays.asList(INTERNAL_PARSER, EXTERNAL_PARSER)) {
parser.declareString(ConstructingObjectParser.optionalConstructorArg(), Job.ID);
parser.declareStringArray(Builder::setGroups, Job.GROUPS);
parser.declareStringOrNull(Builder::setDescription, Job.DESCRIPTION);
parser.declareObjectArray(Builder::setDetectorUpdates, DetectorUpdate.PARSER, DETECTORS);
parser.declareObject(Builder::setModelPlotConfig, ModelPlotConfig.CONFIG_PARSER, Job.MODEL_PLOT_CONFIG);
parser.declareObject(Builder::setAnalysisLimits, AnalysisLimits.CONFIG_PARSER, Job.ANALYSIS_LIMITS);
parser.declareString((builder, val) -> builder.setBackgroundPersistInterval(
TimeValue.parseTimeValue(val, Job.BACKGROUND_PERSIST_INTERVAL.getPreferredName())), Job.BACKGROUND_PERSIST_INTERVAL);
parser.declareLong(Builder::setRenormalizationWindowDays, Job.RENORMALIZATION_WINDOW_DAYS);
parser.declareLong(Builder::setResultsRetentionDays, Job.RESULTS_RETENTION_DAYS);
parser.declareLong(Builder::setModelSnapshotRetentionDays, Job.MODEL_SNAPSHOT_RETENTION_DAYS);
parser.declareStringArray(Builder::setCategorizationFilters, AnalysisConfig.CATEGORIZATION_FILTERS);
parser.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT);
}
// These fields should not be set by a REST request
INTERNAL_PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID);
INTERNAL_PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY);
INTERNAL_PARSER.declareString(Builder::setModelSnapshotMinVersion, Job.MODEL_SNAPSHOT_MIN_VERSION);
INTERNAL_PARSER.declareString(Builder::setJobVersion, Job.JOB_VERSION);
}

private final String jobId;
Expand All @@ -67,14 +76,16 @@ public class JobUpdate implements Writeable, ToXContentObject {
private final String modelSnapshotId;
private final Version modelSnapshotMinVersion;
private final Long establishedModelMemory;
private final Version jobVersion;

private JobUpdate(String jobId, @Nullable List<String> groups, @Nullable String description,
@Nullable List<DetectorUpdate> detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig,
@Nullable AnalysisLimits analysisLimits, @Nullable TimeValue backgroundPersistInterval,
@Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays,
@Nullable Long modelSnapshotRetentionDays, @Nullable List<String> categorisationFilters,
@Nullable Map<String, Object> customSettings, @Nullable String modelSnapshotId,
@Nullable Version modelSnapshotMinVersion, @Nullable Long establishedModelMemory) {
@Nullable Version modelSnapshotMinVersion, @Nullable Long establishedModelMemory,
@Nullable Version jobVersion) {
this.jobId = jobId;
this.groups = groups;
this.description = description;
Expand All @@ -90,6 +101,7 @@ private JobUpdate(String jobId, @Nullable List<String> groups, @Nullable String
this.modelSnapshotId = modelSnapshotId;
this.modelSnapshotMinVersion = modelSnapshotMinVersion;
this.establishedModelMemory = establishedModelMemory;
this.jobVersion = jobVersion;
}

public JobUpdate(StreamInput in) throws IOException {
Expand Down Expand Up @@ -119,16 +131,21 @@ public JobUpdate(StreamInput in) throws IOException {
}
customSettings = in.readMap();
modelSnapshotId = in.readOptionalString();
if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.readBoolean()) {
modelSnapshotMinVersion = Version.readVersion(in);
} else {
modelSnapshotMinVersion = null;
}
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
establishedModelMemory = in.readOptionalLong();
} else {
establishedModelMemory = null;
}
if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.readBoolean()) {
jobVersion = Version.readVersion(in);
} else {
jobVersion = null;
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1) && in.readBoolean()) {
modelSnapshotMinVersion = Version.readVersion(in);
} else {
modelSnapshotMinVersion = null;
}
}

@Override
Expand All @@ -155,17 +172,25 @@ public void writeTo(StreamOutput out) throws IOException {
}
out.writeMap(customSettings);
out.writeOptionalString(modelSnapshotId);
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeOptionalLong(establishedModelMemory);
}
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
if (jobVersion != null) {
out.writeBoolean(true);
Version.writeVersion(jobVersion, out);
} else {
out.writeBoolean(false);
}
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (modelSnapshotMinVersion != null) {
out.writeBoolean(true);
Version.writeVersion(modelSnapshotMinVersion, out);
} else {
out.writeBoolean(false);
}
}
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeOptionalLong(establishedModelMemory);
}
}

public String getJobId() {
Expand Down Expand Up @@ -228,6 +253,10 @@ public Long getEstablishedModelMemory() {
return establishedModelMemory;
}

public Version getJobVersion() {
return jobVersion;
}

public boolean isAutodetectProcessUpdate() {
return modelPlotConfig != null || detectorUpdates != null;
}
Expand Down Expand Up @@ -278,6 +307,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (establishedModelMemory != null) {
builder.field(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory);
}
if (jobVersion != null) {
builder.field(Job.JOB_VERSION.getPreferredName(), jobVersion);
}
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -326,13 +358,16 @@ public Set<String> getUpdateFields() {
if (establishedModelMemory != null) {
updateFields.add(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName());
}
if (jobVersion != null) {
updateFields.add(Job.JOB_VERSION.getPreferredName());
}
return updateFields;
}

/**
* Updates {@code source} with the new values in this object returning a new {@link Job}.
*
* @param source Source job to be updated
* @param source Source job to be updated
* @param maxModelMemoryLimit The maximum model memory allowed
* @return A new job equivalent to {@code source} updated.
*/
Expand Down Expand Up @@ -408,6 +443,9 @@ public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) {
builder.setEstablishedModelMemory(null);
}
}
if (jobVersion != null) {
builder.setJobVersion(jobVersion);
}
return builder.build();
}

Expand Down Expand Up @@ -437,14 +475,15 @@ public boolean equals(Object other) {
&& Objects.equals(this.customSettings, that.customSettings)
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion)
&& Objects.equals(this.establishedModelMemory, that.establishedModelMemory);
&& Objects.equals(this.establishedModelMemory, that.establishedModelMemory)
&& Objects.equals(this.jobVersion, that.jobVersion);
}

@Override
public int hashCode() {
return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings,
modelSnapshotId, modelSnapshotMinVersion, establishedModelMemory);
modelSnapshotId, modelSnapshotMinVersion, establishedModelMemory, jobVersion);
}

public static class DetectorUpdate implements Writeable, ToXContentObject {
Expand Down Expand Up @@ -555,6 +594,7 @@ public static class Builder {
private String modelSnapshotId;
private Version modelSnapshotMinVersion;
private Long establishedModelMemory;
private Version jobVersion;

public Builder(String jobId) {
this.jobId = jobId;
Expand Down Expand Up @@ -640,10 +680,20 @@ public Builder setEstablishedModelMemory(Long establishedModelMemory) {
return this;
}

public Builder setJobVersion(Version version) {
this.jobVersion = version;
return this;
}

public Builder setJobVersion(String version) {
this.jobVersion = Version.fromString(version);
return this;
}

public JobUpdate build() {
return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval,
renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings,
modelSnapshotId, modelSnapshotMinVersion, establishedModelMemory);
modelSnapshotId, modelSnapshotMinVersion, establishedModelMemory, jobVersion);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {

private boolean useInternalParser = randomBoolean();

@Override
protected JobUpdate createTestInstance() {
JobUpdate.Builder update = new JobUpdate.Builder(randomAlphaOfLength(4));
Expand Down Expand Up @@ -84,15 +86,18 @@ protected JobUpdate createTestInstance() {
if (randomBoolean()) {
update.setCustomSettings(Collections.singletonMap(randomAlphaOfLength(10), randomAlphaOfLength(10)));
}
if (randomBoolean()) {
if (useInternalParser && randomBoolean()) {
update.setModelSnapshotId(randomAlphaOfLength(10));
}
if (randomBoolean()) {
if (useInternalParser && randomBoolean()) {
update.setModelSnapshotMinVersion(Version.CURRENT);
}
if (randomBoolean()) {
if (useInternalParser && randomBoolean()) {
update.setEstablishedModelMemory(randomNonNegativeLong());
}
if (useInternalParser && randomBoolean()) {
update.setJobVersion(randomFrom(Version.CURRENT, Version.V_6_2_0, Version.V_6_1_0));
}

return update.build();
}
Expand All @@ -104,7 +109,11 @@ protected Writeable.Reader<JobUpdate> instanceReader() {

@Override
protected JobUpdate doParseInstance(XContentParser parser) {
return JobUpdate.PARSER.apply(parser, null).build();
if (useInternalParser) {
return JobUpdate.INTERNAL_PARSER.apply(parser, null).build();
} else {
return JobUpdate.EXTERNAL_PARSER.apply(parser, null).build();
}
}

public void testMergeWithJob() {
Expand Down Expand Up @@ -137,6 +146,7 @@ public void testMergeWithJob() {
updateBuilder.setCategorizationFilters(categorizationFilters);
updateBuilder.setCustomSettings(customSettings);
updateBuilder.setModelSnapshotId(randomAlphaOfLength(10));
updateBuilder.setJobVersion(Version.V_6_1_0);
JobUpdate update = updateBuilder.build();

Job.Builder jobBuilder = new Job.Builder("foo");
Expand Down Expand Up @@ -164,6 +174,7 @@ public void testMergeWithJob() {
assertEquals(update.getCategorizationFilters(), updatedJob.getAnalysisConfig().getCategorizationFilters());
assertEquals(update.getCustomSettings(), updatedJob.getCustomSettings());
assertEquals(update.getModelSnapshotId(), updatedJob.getModelSnapshotId());
assertEquals(update.getJobVersion(), updatedJob.getJobVersion());
for (JobUpdate.DetectorUpdate detectorUpdate : update.getDetectorUpdates()) {
assertNotNull(updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getDetectorIndex()).getDetectorDescription());
assertEquals(detectorUpdate.getDescription(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,24 @@ setup:
"description": "second",
"latest_record_time_stamp": "2016-06-01T00:00:00Z",
"latest_result_time_stamp": "2016-06-01T00:00:00Z",
"snapshot_doc_count": 3
"snapshot_doc_count": 3,
"model_size_stats": {
"job_id" : "delete-model-snapshot",
"result_type" : "model_size_stats",
"model_bytes" : 0,
"total_by_field_count" : 101,
"total_over_field_count" : 0,
"total_partition_field_count" : 0,
"bucket_allocation_failures_count" : 0,
"memory_status" : "ok",
"log_time" : 1495808248662,
"timestamp" : 1495808248662
},
"quantiles": {
"job_id": "delete-model-snapshot",
"timestamp": 1495808248662,
"quantile_state": "quantiles-1"
}
}

- do:
Expand All @@ -106,12 +123,10 @@ setup:
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
xpack.ml.update_job:
xpack.ml.revert_model_snapshot:
job_id: delete-model-snapshot
body: >
{
"model_snapshot_id": "active-snapshot"
}
snapshot_id: "active-snapshot"


---
"Test delete snapshot missing snapshotId":
Expand Down