Skip to content

Commit

Permalink
[ML] Fix wire BWC for JobUpdate (#30512)
Browse files Browse the repository at this point in the history
Fix wire BWC for the JobUpdate class

Hide JobUpdate internal fields from the REST request parser
  • Loading branch information
davidkyle committed May 11, 2018
1 parent cdcd4a1 commit 0ff8cf4
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public PutJobAction.Response newResponse() {
public static class Request extends AcknowledgedRequest<UpdateJobAction.Request> implements ToXContentObject {

public static UpdateJobAction.Request parseRequest(String jobId, XContentParser parser) {
JobUpdate update = JobUpdate.PARSER.apply(parser, null).setJobId(jobId).build();
JobUpdate update = JobUpdate.EXTERNAL_PARSER.apply(parser, null).setJobId(jobId).build();
return new UpdateJobAction.Request(jobId, update);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,35 @@
public class JobUpdate implements Writeable, ToXContentObject {
public static final ParseField DETECTORS = new ParseField("detectors");

public static final ConstructingObjectParser<Builder, Void> PARSER = new ConstructingObjectParser<>(
// For internal updates
static final ConstructingObjectParser<Builder, Void> INTERNAL_PARSER = new ConstructingObjectParser<>(
"job_update", args -> new Builder((String) args[0]));

// For parsing REST requests
public static final ConstructingObjectParser<Builder, Void> EXTERNAL_PARSER = new ConstructingObjectParser<>(
"job_update", args -> new Builder((String) args[0]));

static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), Job.ID);
PARSER.declareStringArray(Builder::setGroups, Job.GROUPS);
PARSER.declareStringOrNull(Builder::setDescription, Job.DESCRIPTION);
PARSER.declareObjectArray(Builder::setDetectorUpdates, DetectorUpdate.PARSER, DETECTORS);
PARSER.declareObject(Builder::setModelPlotConfig, ModelPlotConfig.CONFIG_PARSER, Job.MODEL_PLOT_CONFIG);
PARSER.declareObject(Builder::setAnalysisLimits, AnalysisLimits.CONFIG_PARSER, Job.ANALYSIS_LIMITS);
PARSER.declareString((builder, val) -> builder.setBackgroundPersistInterval(
TimeValue.parseTimeValue(val, Job.BACKGROUND_PERSIST_INTERVAL.getPreferredName())), Job.BACKGROUND_PERSIST_INTERVAL);
PARSER.declareLong(Builder::setRenormalizationWindowDays, Job.RENORMALIZATION_WINDOW_DAYS);
PARSER.declareLong(Builder::setResultsRetentionDays, Job.RESULTS_RETENTION_DAYS);
PARSER.declareLong(Builder::setModelSnapshotRetentionDays, Job.MODEL_SNAPSHOT_RETENTION_DAYS);
PARSER.declareStringArray(Builder::setCategorizationFilters, AnalysisConfig.CATEGORIZATION_FILTERS);
PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT);
PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID);
PARSER.declareString(Builder::setModelSnapshotMinVersion, Job.MODEL_SNAPSHOT_MIN_VERSION);
PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY);
for (ConstructingObjectParser<Builder, Void> parser : Arrays.asList(INTERNAL_PARSER, EXTERNAL_PARSER)) {
parser.declareString(ConstructingObjectParser.optionalConstructorArg(), Job.ID);
parser.declareStringArray(Builder::setGroups, Job.GROUPS);
parser.declareStringOrNull(Builder::setDescription, Job.DESCRIPTION);
parser.declareObjectArray(Builder::setDetectorUpdates, DetectorUpdate.PARSER, DETECTORS);
parser.declareObject(Builder::setModelPlotConfig, ModelPlotConfig.CONFIG_PARSER, Job.MODEL_PLOT_CONFIG);
parser.declareObject(Builder::setAnalysisLimits, AnalysisLimits.CONFIG_PARSER, Job.ANALYSIS_LIMITS);
parser.declareString((builder, val) -> builder.setBackgroundPersistInterval(
TimeValue.parseTimeValue(val, Job.BACKGROUND_PERSIST_INTERVAL.getPreferredName())), Job.BACKGROUND_PERSIST_INTERVAL);
parser.declareLong(Builder::setRenormalizationWindowDays, Job.RENORMALIZATION_WINDOW_DAYS);
parser.declareLong(Builder::setResultsRetentionDays, Job.RESULTS_RETENTION_DAYS);
parser.declareLong(Builder::setModelSnapshotRetentionDays, Job.MODEL_SNAPSHOT_RETENTION_DAYS);
parser.declareStringArray(Builder::setCategorizationFilters, AnalysisConfig.CATEGORIZATION_FILTERS);
parser.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT);
}
// These fields should not be set by a REST request
INTERNAL_PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID);
INTERNAL_PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY);
INTERNAL_PARSER.declareString(Builder::setModelSnapshotMinVersion, Job.MODEL_SNAPSHOT_MIN_VERSION);
INTERNAL_PARSER.declareString(Builder::setJobVersion, Job.JOB_VERSION);
}

private final String jobId;
Expand All @@ -67,14 +76,16 @@ public class JobUpdate implements Writeable, ToXContentObject {
private final String modelSnapshotId;
private final Version modelSnapshotMinVersion;
private final Long establishedModelMemory;
private final Version jobVersion;

private JobUpdate(String jobId, @Nullable List<String> groups, @Nullable String description,
@Nullable List<DetectorUpdate> detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig,
@Nullable AnalysisLimits analysisLimits, @Nullable TimeValue backgroundPersistInterval,
@Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays,
@Nullable Long modelSnapshotRetentionDays, @Nullable List<String> categorisationFilters,
@Nullable Map<String, Object> customSettings, @Nullable String modelSnapshotId,
@Nullable Version modelSnapshotMinVersion, @Nullable Long establishedModelMemory) {
@Nullable Version modelSnapshotMinVersion, @Nullable Long establishedModelMemory,
@Nullable Version jobVersion) {
this.jobId = jobId;
this.groups = groups;
this.description = description;
Expand All @@ -90,6 +101,7 @@ private JobUpdate(String jobId, @Nullable List<String> groups, @Nullable String
this.modelSnapshotId = modelSnapshotId;
this.modelSnapshotMinVersion = modelSnapshotMinVersion;
this.establishedModelMemory = establishedModelMemory;
this.jobVersion = jobVersion;
}

public JobUpdate(StreamInput in) throws IOException {
Expand Down Expand Up @@ -119,16 +131,21 @@ public JobUpdate(StreamInput in) throws IOException {
}
customSettings = in.readMap();
modelSnapshotId = in.readOptionalString();
if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.readBoolean()) {
modelSnapshotMinVersion = Version.readVersion(in);
} else {
modelSnapshotMinVersion = null;
}
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
establishedModelMemory = in.readOptionalLong();
} else {
establishedModelMemory = null;
}
if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.readBoolean()) {
jobVersion = Version.readVersion(in);
} else {
jobVersion = null;
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1) && in.readBoolean()) {
modelSnapshotMinVersion = Version.readVersion(in);
} else {
modelSnapshotMinVersion = null;
}
}

@Override
Expand All @@ -155,17 +172,25 @@ public void writeTo(StreamOutput out) throws IOException {
}
out.writeMap(customSettings);
out.writeOptionalString(modelSnapshotId);
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeOptionalLong(establishedModelMemory);
}
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
if (jobVersion != null) {
out.writeBoolean(true);
Version.writeVersion(jobVersion, out);
} else {
out.writeBoolean(false);
}
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (modelSnapshotMinVersion != null) {
out.writeBoolean(true);
Version.writeVersion(modelSnapshotMinVersion, out);
} else {
out.writeBoolean(false);
}
}
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeOptionalLong(establishedModelMemory);
}
}

public String getJobId() {
Expand Down Expand Up @@ -228,6 +253,10 @@ public Long getEstablishedModelMemory() {
return establishedModelMemory;
}

public Version getJobVersion() {
return jobVersion;
}

public boolean isAutodetectProcessUpdate() {
return modelPlotConfig != null || detectorUpdates != null;
}
Expand Down Expand Up @@ -278,6 +307,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (establishedModelMemory != null) {
builder.field(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory);
}
if (jobVersion != null) {
builder.field(Job.JOB_VERSION.getPreferredName(), jobVersion);
}
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -326,13 +358,16 @@ public Set<String> getUpdateFields() {
if (establishedModelMemory != null) {
updateFields.add(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName());
}
if (jobVersion != null) {
updateFields.add(Job.JOB_VERSION.getPreferredName());
}
return updateFields;
}

/**
* Updates {@code source} with the new values in this object returning a new {@link Job}.
*
* @param source Source job to be updated
* @param source Source job to be updated
* @param maxModelMemoryLimit The maximum model memory allowed
* @return A new job equivalent to {@code source} updated.
*/
Expand Down Expand Up @@ -408,6 +443,9 @@ public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) {
builder.setEstablishedModelMemory(null);
}
}
if (jobVersion != null) {
builder.setJobVersion(jobVersion);
}
return builder.build();
}

Expand Down Expand Up @@ -437,14 +475,15 @@ public boolean equals(Object other) {
&& Objects.equals(this.customSettings, that.customSettings)
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion)
&& Objects.equals(this.establishedModelMemory, that.establishedModelMemory);
&& Objects.equals(this.establishedModelMemory, that.establishedModelMemory)
&& Objects.equals(this.jobVersion, that.jobVersion);
}

@Override
public int hashCode() {
return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings,
modelSnapshotId, modelSnapshotMinVersion, establishedModelMemory);
modelSnapshotId, modelSnapshotMinVersion, establishedModelMemory, jobVersion);
}

public static class DetectorUpdate implements Writeable, ToXContentObject {
Expand Down Expand Up @@ -555,6 +594,7 @@ public static class Builder {
private String modelSnapshotId;
private Version modelSnapshotMinVersion;
private Long establishedModelMemory;
private Version jobVersion;

public Builder(String jobId) {
this.jobId = jobId;
Expand Down Expand Up @@ -640,10 +680,20 @@ public Builder setEstablishedModelMemory(Long establishedModelMemory) {
return this;
}

public Builder setJobVersion(Version version) {
this.jobVersion = version;
return this;
}

public Builder setJobVersion(String version) {
this.jobVersion = Version.fromString(version);
return this;
}

public JobUpdate build() {
return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval,
renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings,
modelSnapshotId, modelSnapshotMinVersion, establishedModelMemory);
modelSnapshotId, modelSnapshotMinVersion, establishedModelMemory, jobVersion);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {

private boolean useInternalParser = randomBoolean();

@Override
protected JobUpdate createTestInstance() {
JobUpdate.Builder update = new JobUpdate.Builder(randomAlphaOfLength(4));
Expand Down Expand Up @@ -84,15 +86,18 @@ protected JobUpdate createTestInstance() {
if (randomBoolean()) {
update.setCustomSettings(Collections.singletonMap(randomAlphaOfLength(10), randomAlphaOfLength(10)));
}
if (randomBoolean()) {
if (useInternalParser && randomBoolean()) {
update.setModelSnapshotId(randomAlphaOfLength(10));
}
if (randomBoolean()) {
if (useInternalParser && randomBoolean()) {
update.setModelSnapshotMinVersion(Version.CURRENT);
}
if (randomBoolean()) {
if (useInternalParser && randomBoolean()) {
update.setEstablishedModelMemory(randomNonNegativeLong());
}
if (useInternalParser && randomBoolean()) {
update.setJobVersion(randomFrom(Version.CURRENT, Version.V_6_2_0, Version.V_6_1_0));
}

return update.build();
}
Expand All @@ -104,7 +109,11 @@ protected Writeable.Reader<JobUpdate> instanceReader() {

@Override
protected JobUpdate doParseInstance(XContentParser parser) {
return JobUpdate.PARSER.apply(parser, null).build();
if (useInternalParser) {
return JobUpdate.INTERNAL_PARSER.apply(parser, null).build();
} else {
return JobUpdate.EXTERNAL_PARSER.apply(parser, null).build();
}
}

public void testMergeWithJob() {
Expand Down Expand Up @@ -137,6 +146,7 @@ public void testMergeWithJob() {
updateBuilder.setCategorizationFilters(categorizationFilters);
updateBuilder.setCustomSettings(customSettings);
updateBuilder.setModelSnapshotId(randomAlphaOfLength(10));
updateBuilder.setJobVersion(Version.V_6_1_0);
JobUpdate update = updateBuilder.build();

Job.Builder jobBuilder = new Job.Builder("foo");
Expand Down Expand Up @@ -164,6 +174,7 @@ public void testMergeWithJob() {
assertEquals(update.getCategorizationFilters(), updatedJob.getAnalysisConfig().getCategorizationFilters());
assertEquals(update.getCustomSettings(), updatedJob.getCustomSettings());
assertEquals(update.getModelSnapshotId(), updatedJob.getModelSnapshotId());
assertEquals(update.getJobVersion(), updatedJob.getJobVersion());
for (JobUpdate.DetectorUpdate detectorUpdate : update.getDetectorUpdates()) {
assertNotNull(updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getDetectorIndex()).getDetectorDescription());
assertEquals(detectorUpdate.getDescription(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,24 @@ setup:
"description": "second",
"latest_record_time_stamp": "2016-06-01T00:00:00Z",
"latest_result_time_stamp": "2016-06-01T00:00:00Z",
"snapshot_doc_count": 3
"snapshot_doc_count": 3,
"model_size_stats": {
"job_id" : "delete-model-snapshot",
"result_type" : "model_size_stats",
"model_bytes" : 0,
"total_by_field_count" : 101,
"total_over_field_count" : 0,
"total_partition_field_count" : 0,
"bucket_allocation_failures_count" : 0,
"memory_status" : "ok",
"log_time" : 1495808248662,
"timestamp" : 1495808248662
},
"quantiles": {
"job_id": "delete-model-snapshot",
"timestamp": 1495808248662,
"quantile_state": "quantiles-1"
}
}
- do:
Expand All @@ -106,12 +123,10 @@ setup:
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
xpack.ml.update_job:
xpack.ml.revert_model_snapshot:
job_id: delete-model-snapshot
body: >
{
"model_snapshot_id": "active-snapshot"
}
snapshot_id: "active-snapshot"


---
"Test delete snapshot missing snapshotId":
Expand Down

0 comments on commit 0ff8cf4

Please sign in to comment.