Skip to content

Commit

Permalink
[ML] Delete forecast API (#31134) (#33218)
Browse files Browse the repository at this point in the history
* Delete forecast API (#31134)
  • Loading branch information
benwtrent authored Sep 4, 2018
1 parent 09bf4e5 commit 767d8e0
Show file tree
Hide file tree
Showing 11 changed files with 656 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction;
import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
Expand Down Expand Up @@ -254,6 +255,7 @@ public List<Action<? extends ActionResponse>> getClientActions() {
UpdateProcessAction.INSTANCE,
DeleteExpiredDataAction.INSTANCE,
ForecastJobAction.INSTANCE,
DeleteForecastAction.INSTANCE,
GetCalendarsAction.INSTANCE,
PutCalendarAction.INSTANCE,
DeleteCalendarAction.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

import java.io.IOException;

public class DeleteForecastAction extends Action<AcknowledgedResponse> {

public static final DeleteForecastAction INSTANCE = new DeleteForecastAction();
public static final String NAME = "cluster:admin/xpack/ml/job/forecast/delete";

private DeleteForecastAction() {
super(NAME);
}

@Override
public AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}

public static class Request extends AcknowledgedRequest<Request> {

private String jobId;
private String forecastId;
private boolean allowNoForecasts = true;

public Request() {
}

public Request(String jobId, String forecastId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
this.forecastId = ExceptionsHelper.requireNonNull(forecastId, ForecastRequestStats.FORECAST_ID.getPreferredName());
}

public String getJobId() {
return jobId;
}

public String getForecastId() {
return forecastId;
}

public boolean isAllowNoForecasts() {
return allowNoForecasts;
}

public void setAllowNoForecasts(boolean allowNoForecasts) {
this.allowNoForecasts = allowNoForecasts;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
forecastId = in.readString();
allowNoForecasts = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
out.writeString(forecastId);
out.writeBoolean(allowNoForecasts);
}
}

public static class RequestBuilder extends ActionRequestBuilder<Request, AcknowledgedResponse> {

public RequestBuilder(ElasticsearchClient client, DeleteForecastAction action) {
super(client, action, new Request());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ public final class Messages {
public static final String REST_JOB_NOT_CLOSED_REVERT = "Can only revert to a model snapshot when the job is closed.";
public static final String REST_NO_SUCH_MODEL_SNAPSHOT = "No model snapshot with id [{0}] exists for job [{1}]";
public static final String REST_START_AFTER_END = "Invalid time range: end time ''{0}'' is earlier than start time ''{1}''.";

public static final String REST_NO_SUCH_FORECAST = "No forecast(s) [{0}] exists for job [{1}]";
public static final String REST_CANNOT_DELETE_FORECAST_IN_CURRENT_STATE =
"Forecast(s) [{0}] for job [{1}] needs to be either FAILED or FINISHED to be deleted";
public static final String FIELD_CANNOT_BE_NULL = "Field [{0}] cannot be null";

private Messages() {
Expand Down
4 changes: 3 additions & 1 deletion x-pack/plugin/ml/qa/ml-with-security/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ integTestRunner {
'ml/validate/Test invalid job config',
'ml/validate/Test job config is invalid because model snapshot id set',
'ml/validate/Test job config that is invalid only because of the job ID',
'ml/validate_detector/Test invalid detector'
'ml/validate_detector/Test invalid detector',
'ml/delete_forecast/Test delete on _all forecasts not allow no forecasts',
'ml/delete_forecast/Test delete forecast on missing forecast'
].join(',')
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
Expand Down Expand Up @@ -276,6 +279,104 @@ public void testOverflowToDisk() throws Exception {

}

public void testDelete() throws Exception {
Detector.Builder detector = new Detector.Builder("mean", "value");

TimeValue bucketSpan = TimeValue.timeValueHours(1);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
analysisConfig.setBucketSpan(bucketSpan);
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeFormat("epoch");

Job.Builder job = new Job.Builder("forecast-it-test-delete");
job.setAnalysisConfig(analysisConfig);
job.setDataDescription(dataDescription);

registerJob(job);
putJob(job);
openJob(job.getId());

long now = Instant.now().getEpochSecond();
long timestamp = now - 50 * bucketSpan.seconds();
List<String> data = new ArrayList<>();
while (timestamp < now) {
data.add(createJsonRecord(createRecord(timestamp, 10.0)));
data.add(createJsonRecord(createRecord(timestamp, 30.0)));
timestamp += bucketSpan.seconds();
}

postData(job.getId(), data.stream().collect(Collectors.joining()));
flushJob(job.getId(), false);
String forecastIdDefaultDurationDefaultExpiry = forecast(job.getId(), null, null);
String forecastIdDuration1HourNoExpiry = forecast(job.getId(), TimeValue.timeValueHours(1), TimeValue.ZERO);
waitForecastToFinish(job.getId(), forecastIdDefaultDurationDefaultExpiry);
waitForecastToFinish(job.getId(), forecastIdDuration1HourNoExpiry);
closeJob(job.getId());

{
ForecastRequestStats forecastStats = getForecastStats(job.getId(), forecastIdDefaultDurationDefaultExpiry);
assertNotNull(forecastStats);
ForecastRequestStats otherStats = getForecastStats(job.getId(), forecastIdDuration1HourNoExpiry);
assertNotNull(otherStats);
}

{
DeleteForecastAction.Request request = new DeleteForecastAction.Request(job.getId(),
forecastIdDefaultDurationDefaultExpiry + "," + forecastIdDuration1HourNoExpiry);
AcknowledgedResponse response = client().execute(DeleteForecastAction.INSTANCE, request).actionGet();
assertTrue(response.isAcknowledged());
}

{
ForecastRequestStats forecastStats = getForecastStats(job.getId(), forecastIdDefaultDurationDefaultExpiry);
assertNull(forecastStats);
ForecastRequestStats otherStats = getForecastStats(job.getId(), forecastIdDuration1HourNoExpiry);
assertNull(otherStats);
}

{
DeleteForecastAction.Request request = new DeleteForecastAction.Request(job.getId(), "forecast-does-not-exist");
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> client().execute(DeleteForecastAction.INSTANCE, request).actionGet());
assertThat(e.getMessage(),
equalTo("No forecast(s) [forecast-does-not-exist] exists for job [forecast-it-test-delete]"));
}

{
DeleteForecastAction.Request request = new DeleteForecastAction.Request(job.getId(), MetaData.ALL);
AcknowledgedResponse response = client().execute(DeleteForecastAction.INSTANCE, request).actionGet();
assertTrue(response.isAcknowledged());
}

{
Job.Builder otherJob = new Job.Builder("forecasts-delete-with-all-and-allow-no-forecasts");
otherJob.setAnalysisConfig(analysisConfig);
otherJob.setDataDescription(dataDescription);

registerJob(otherJob);
putJob(otherJob);
DeleteForecastAction.Request request = new DeleteForecastAction.Request(otherJob.getId(), MetaData.ALL);
AcknowledgedResponse response = client().execute(DeleteForecastAction.INSTANCE, request).actionGet();
assertTrue(response.isAcknowledged());
}

{
Job.Builder otherJob = new Job.Builder("forecasts-delete-with-all-and-not-allow-no-forecasts");
otherJob.setAnalysisConfig(analysisConfig);
otherJob.setDataDescription(dataDescription);

registerJob(otherJob);
putJob(otherJob);

DeleteForecastAction.Request request = new DeleteForecastAction.Request(otherJob.getId(), MetaData.ALL);
request.setAllowNoForecasts(false);
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> client().execute(DeleteForecastAction.INSTANCE, request).actionGet());
assertThat(e.getMessage(),
equalTo("No forecast(s) [_all] exists for job [forecasts-delete-with-all-and-not-allow-no-forecasts]"));
}
}

private void createDataWithLotsOfClientIps(TimeValue bucketSpan, Job.Builder job) throws IOException {
long now = Instant.now().getEpochSecond();
long timestamp = now - 15 * bucketSpan.seconds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction;
import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
Expand Down Expand Up @@ -114,6 +115,7 @@
import org.elasticsearch.xpack.ml.action.TransportDeleteDatafeedAction;
import org.elasticsearch.xpack.ml.action.TransportDeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.action.TransportDeleteFilterAction;
import org.elasticsearch.xpack.ml.action.TransportDeleteForecastAction;
import org.elasticsearch.xpack.ml.action.TransportDeleteJobAction;
import org.elasticsearch.xpack.ml.action.TransportDeleteModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.TransportFinalizeJobExecutionAction;
Expand Down Expand Up @@ -200,6 +202,7 @@
import org.elasticsearch.xpack.ml.rest.filter.RestPutFilterAction;
import org.elasticsearch.xpack.ml.rest.filter.RestUpdateFilterAction;
import org.elasticsearch.xpack.ml.rest.job.RestCloseJobAction;
import org.elasticsearch.xpack.ml.rest.job.RestDeleteForecastAction;
import org.elasticsearch.xpack.ml.rest.job.RestDeleteJobAction;
import org.elasticsearch.xpack.ml.rest.job.RestFlushJobAction;
import org.elasticsearch.xpack.ml.rest.job.RestForecastJobAction;
Expand Down Expand Up @@ -489,6 +492,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new RestDeleteModelSnapshotAction(settings, restController),
new RestDeleteExpiredDataAction(settings, restController),
new RestForecastJobAction(settings, restController),
new RestDeleteForecastAction(settings, restController),
new RestGetCalendarsAction(settings, restController),
new RestPutCalendarAction(settings, restController),
new RestDeleteCalendarAction(settings, restController),
Expand Down Expand Up @@ -545,6 +549,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new ActionHandler<>(UpdateProcessAction.INSTANCE, TransportUpdateProcessAction.class),
new ActionHandler<>(DeleteExpiredDataAction.INSTANCE, TransportDeleteExpiredDataAction.class),
new ActionHandler<>(ForecastJobAction.INSTANCE, TransportForecastJobAction.class),
new ActionHandler<>(DeleteForecastAction.INSTANCE, TransportDeleteForecastAction.class),
new ActionHandler<>(GetCalendarsAction.INSTANCE, TransportGetCalendarsAction.class),
new ActionHandler<>(PutCalendarAction.INSTANCE, TransportPutCalendarAction.class),
new ActionHandler<>(DeleteCalendarAction.INSTANCE, TransportDeleteCalendarAction.class),
Expand Down
Loading

0 comments on commit 767d8e0

Please sign in to comment.