Skip to content

Commit

Permalink
HLRC: ML Flush job (#33187)
Browse files Browse the repository at this point in the history
* HLRC: ML Flush job

* Fixing package, paths, and test

* Addressing comments
  • Loading branch information
benwtrent authored Sep 1, 2018
1 parent 19b14fa commit 6770a45
Show file tree
Hide file tree
Showing 11 changed files with 668 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.client.ml.OpenJobRequest;
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.client.ml.FlushJobRequest;

import java.io.IOException;

Expand Down Expand Up @@ -127,6 +128,19 @@ static Request getBuckets(GetBucketsRequest getBucketsRequest) throws IOExceptio
return request;
}

static Request flushJob(FlushJobRequest flushJobRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
.addPathPartAsIs("ml")
.addPathPartAsIs("anomaly_detectors")
.addPathPart(flushJobRequest.getJobId())
.addPathPartAsIs("_flush")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
request.setEntity(createEntity(flushJobRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request getJobStats(GetJobStatsRequest getJobStatsRequest) {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.elasticsearch.client;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.ml.FlushJobRequest;
import org.elasticsearch.client.ml.FlushJobResponse;
import org.elasticsearch.client.ml.GetJobStatsRequest;
import org.elasticsearch.client.ml.GetJobStatsResponse;
import org.elasticsearch.client.ml.job.stats.JobStats;
Expand Down Expand Up @@ -292,6 +294,60 @@ public void getBucketsAsync(GetBucketsRequest request, RequestOptions options, A
}

/**
* Flushes internally buffered data for the given Machine Learning Job ensuring all data sent to the has been processed.
* This may cause new results to be calculated depending on the contents of the buffer
*
* Both flush and close operations are similar,
* however the flush is more efficient if you are expecting to send more data for analysis.
*
* When flushing, the job remains open and is available to continue analyzing data.
* A close operation additionally prunes and persists the model state to disk and the
* job must be opened again before analyzing further data.
*
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-flush-job.html">Flush ML job documentation</a>
*
* @param request The {@link FlushJobRequest} object enclosing the `jobId` and additional request options
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
*/
public FlushJobResponse flushJob(FlushJobRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
MLRequestConverters::flushJob,
options,
FlushJobResponse::fromXContent,
Collections.emptySet());
}

/**
* Flushes internally buffered data for the given Machine Learning Job asynchronously ensuring all data sent to the has been processed.
* This may cause new results to be calculated depending on the contents of the buffer
*
* Both flush and close operations are similar,
* however the flush is more efficient if you are expecting to send more data for analysis.
*
* When flushing, the job remains open and is available to continue analyzing data.
* A close operation additionally prunes and persists the model state to disk and the
* job must be opened again before analyzing further data.
*
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-flush-job.html">Flush ML job documentation</a>
*
* @param request The {@link FlushJobRequest} object enclosing the `jobId` and additional request options
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
*/
public void flushJobAsync(FlushJobRequest request, RequestOptions options, ActionListener<FlushJobResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
MLRequestConverters::flushJob,
options,
FlushJobResponse::fromXContent,
listener,
Collections.emptySet());
}

/**
* Gets usage statistics for one or more Machine Learning jobs
*
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.client.ml.job.config.Job;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

/**
* Request object to flush a given Machine Learning job.
*/
public class FlushJobRequest extends ActionRequest implements ToXContentObject {

public static final ParseField CALC_INTERIM = new ParseField("calc_interim");
public static final ParseField START = new ParseField("start");
public static final ParseField END = new ParseField("end");
public static final ParseField ADVANCE_TIME = new ParseField("advance_time");
public static final ParseField SKIP_TIME = new ParseField("skip_time");

public static final ConstructingObjectParser<FlushJobRequest, Void> PARSER =
new ConstructingObjectParser<>("flush_job_request", (a) -> new FlushJobRequest((String) a[0]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
PARSER.declareBoolean(FlushJobRequest::setCalcInterim, CALC_INTERIM);
PARSER.declareString(FlushJobRequest::setStart, START);
PARSER.declareString(FlushJobRequest::setEnd, END);
PARSER.declareString(FlushJobRequest::setAdvanceTime, ADVANCE_TIME);
PARSER.declareString(FlushJobRequest::setSkipTime, SKIP_TIME);
}

private final String jobId;
private Boolean calcInterim;
private String start;
private String end;
private String advanceTime;
private String skipTime;

/**
* Create new Flush job request
*
* @param jobId The job ID of the job to flush
*/
public FlushJobRequest(String jobId) {
this.jobId = jobId;
}

public String getJobId() {
return jobId;
}

public boolean getCalcInterim() {
return calcInterim;
}

/**
* When {@code true} calculates the interim results for the most recent bucket or all buckets within the latency period.
*
* @param calcInterim defaults to {@code false}.
*/
public void setCalcInterim(boolean calcInterim) {
this.calcInterim = calcInterim;
}

public String getStart() {
return start;
}

/**
* When used in conjunction with {@link FlushJobRequest#calcInterim},
* specifies the start of the range of buckets on which to calculate interim results.
*
* @param start the beginning of the range of buckets; may be an epoch seconds, epoch millis or an ISO string
*/
public void setStart(String start) {
this.start = start;
}

public String getEnd() {
return end;
}

/**
* When used in conjunction with {@link FlushJobRequest#calcInterim}, specifies the end of the range
* of buckets on which to calculate interim results
*
* @param end the end of the range of buckets; may be an epoch seconds, epoch millis or an ISO string
*/
public void setEnd(String end) {
this.end = end;
}

public String getAdvanceTime() {
return advanceTime;
}

/**
* Specifies to advance to a particular time value.
* Results are generated and the model is updated for data from the specified time interval.
*
* @param advanceTime String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO string
*/
public void setAdvanceTime(String advanceTime) {
this.advanceTime = advanceTime;
}

public String getSkipTime() {
return skipTime;
}

/**
* Specifies to skip to a particular time value.
* Results are not generated and the model is not updated for data from the specified time interval.
*
* @param skipTime String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO string
*/
public void setSkipTime(String skipTime) {
this.skipTime = skipTime;
}

@Override
public int hashCode() {
return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}

if (obj == null || getClass() != obj.getClass()) {
return false;
}

FlushJobRequest other = (FlushJobRequest) obj;
return Objects.equals(jobId, other.jobId) &&
calcInterim == other.calcInterim &&
Objects.equals(start, other.start) &&
Objects.equals(end, other.end) &&
Objects.equals(advanceTime, other.advanceTime) &&
Objects.equals(skipTime, other.skipTime);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
if (calcInterim != null) {
builder.field(CALC_INTERIM.getPreferredName(), calcInterim);
}
if (start != null) {
builder.field(START.getPreferredName(), start);
}
if (end != null) {
builder.field(END.getPreferredName(), end);
}
if (advanceTime != null) {
builder.field(ADVANCE_TIME.getPreferredName(), advanceTime);
}
if (skipTime != null) {
builder.field(SKIP_TIME.getPreferredName(), skipTime);
}
builder.endObject();
return builder;
}

@Override
public ActionRequestValidationException validate() {
return null;
}
}
Loading

0 comments on commit 6770a45

Please sign in to comment.