Skip to content

Commit

Permalink
HLRC: ML Close Job (#32943)
Browse files Browse the repository at this point in the history
* HLRC: Adding ML Close Job API

HLRC: Adding ML Close Job API

* reconciling request converters

* Adding serialization tests and addressing PR comments

* Changing constructor order
  • Loading branch information
benwtrent authored Aug 20, 2018
1 parent 9050c7e commit 3fbaae1
Show file tree
Hide file tree
Showing 12 changed files with 623 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.RequestConverters.EndpointBuilder;
import org.elasticsearch.common.Strings;
import org.elasticsearch.protocol.xpack.ml.CloseJobRequest;
import org.elasticsearch.protocol.xpack.ml.DeleteJobRequest;
import org.elasticsearch.protocol.xpack.ml.OpenJobRequest;
import org.elasticsearch.protocol.xpack.ml.PutJobRequest;
Expand Down Expand Up @@ -61,6 +63,30 @@ static Request openJob(OpenJobRequest openJobRequest) throws IOException {
return request;
}

static Request closeJob(CloseJobRequest closeJobRequest) {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
.addPathPartAsIs("ml")
.addPathPartAsIs("anomaly_detectors")
.addPathPart(Strings.collectionToCommaDelimitedString(closeJobRequest.getJobIds()))
.addPathPartAsIs("_close")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);

RequestConverters.Params params = new RequestConverters.Params(request);
if (closeJobRequest.isForce() != null) {
params.putParam("force", Boolean.toString(closeJobRequest.isForce()));
}
if (closeJobRequest.isAllowNoJobs() != null) {
params.putParam("allow_no_jobs", Boolean.toString(closeJobRequest.isAllowNoJobs()));
}
if (closeJobRequest.getTimeout() != null) {
params.putParam("timeout", closeJobRequest.getTimeout().getStringRep());
}

return request;
}

static Request deleteJob(DeleteJobRequest deleteJobRequest) {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.elasticsearch.client;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.protocol.xpack.ml.CloseJobRequest;
import org.elasticsearch.protocol.xpack.ml.CloseJobResponse;
import org.elasticsearch.protocol.xpack.ml.DeleteJobRequest;
import org.elasticsearch.protocol.xpack.ml.DeleteJobResponse;
import org.elasticsearch.protocol.xpack.ml.OpenJobRequest;
Expand Down Expand Up @@ -166,4 +168,40 @@ public void openJobAsync(OpenJobRequest request, RequestOptions options, ActionL
listener,
Collections.emptySet());
}

/**
* Closes one or more Machine Learning Jobs. A job can be opened and closed multiple times throughout its lifecycle.
*
* A closed job cannot receive data or perform analysis operations, but you can still explore and navigate results.
*
* @param request request containing job_ids and additional options. See {@link CloseJobRequest}
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return response containing if the job was successfully closed or not.
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public CloseJobResponse closeJob(CloseJobRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
MLRequestConverters::closeJob,
options,
CloseJobResponse::fromXContent,
Collections.emptySet());
}

/**
* Closes one or more Machine Learning Jobs asynchronously, notifies listener on completion
*
* A closed job cannot receive data or perform analysis operations, but you can still explore and navigate results.
*
* @param request request containing job_ids and additional options. See {@link CloseJobRequest}
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
*/
public void closeJobAsync(CloseJobRequest request, RequestOptions options, ActionListener<CloseJobResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
MLRequestConverters::closeJob,
options,
CloseJobResponse::fromXContent,
listener,
Collections.emptySet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.protocol.xpack.ml.CloseJobRequest;
import org.elasticsearch.protocol.xpack.ml.DeleteJobRequest;
import org.elasticsearch.protocol.xpack.ml.OpenJobRequest;
import org.elasticsearch.protocol.xpack.ml.PutJobRequest;
Expand Down Expand Up @@ -66,6 +67,29 @@ public void testOpenJob() throws Exception {
assertEquals(bos.toString("UTF-8"), "{\"job_id\":\""+ jobId +"\",\"timeout\":\"10m\"}");
}

public void testCloseJob() {
String jobId = "somejobid";
CloseJobRequest closeJobRequest = new CloseJobRequest(jobId);

Request request = MLRequestConverters.closeJob(closeJobRequest);
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals("/_xpack/ml/anomaly_detectors/" + jobId + "/_close", request.getEndpoint());
assertFalse(request.getParameters().containsKey("force"));
assertFalse(request.getParameters().containsKey("allow_no_jobs"));
assertFalse(request.getParameters().containsKey("timeout"));

closeJobRequest = new CloseJobRequest(jobId, "otherjobs*");
closeJobRequest.setForce(true);
closeJobRequest.setAllowNoJobs(false);
closeJobRequest.setTimeout(TimeValue.timeValueMinutes(10));
request = MLRequestConverters.closeJob(closeJobRequest);

assertEquals("/_xpack/ml/anomaly_detectors/" + jobId + ",otherjobs*/_close", request.getEndpoint());
assertEquals(Boolean.toString(true), request.getParameters().get("force"));
assertEquals(Boolean.toString(false), request.getParameters().get("allow_no_jobs"));
assertEquals("10m", request.getParameters().get("timeout"));
}

public void testDeleteJob() {
String jobId = randomAlphaOfLength(10);
DeleteJobRequest deleteJobRequest = new DeleteJobRequest(jobId);
Expand All @@ -87,4 +111,4 @@ private static Job createValidJob(String jobId) {
jobBuilder.setAnalysisConfig(analysisConfig);
return jobBuilder.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.protocol.xpack.ml.CloseJobRequest;
import org.elasticsearch.protocol.xpack.ml.CloseJobResponse;
import org.elasticsearch.protocol.xpack.ml.DeleteJobRequest;
import org.elasticsearch.protocol.xpack.ml.DeleteJobResponse;
import org.elasticsearch.protocol.xpack.ml.OpenJobRequest;
Expand Down Expand Up @@ -77,6 +79,19 @@ public void testOpenJob() throws Exception {
assertTrue(response.isOpened());
}

public void testCloseJob() throws Exception {
String jobId = randomValidJobId();
Job job = buildJob(jobId);
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
machineLearningClient.openJob(new OpenJobRequest(jobId), RequestOptions.DEFAULT);

CloseJobResponse response = execute(new CloseJobRequest(jobId),
machineLearningClient::closeJob,
machineLearningClient::closeJobAsync);
assertTrue(response.isClosed());
}

public static String randomValidJobId() {
CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz0123456789".toCharArray());
return generator.ofCodePointsLength(random(), 10, 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.protocol.xpack.ml.CloseJobRequest;
import org.elasticsearch.protocol.xpack.ml.CloseJobResponse;
import org.elasticsearch.protocol.xpack.ml.DeleteJobRequest;
import org.elasticsearch.protocol.xpack.ml.DeleteJobResponse;
import org.elasticsearch.protocol.xpack.ml.OpenJobRequest;
Expand Down Expand Up @@ -221,4 +223,56 @@ public void onFailure(Exception e) {
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}

public void testCloseJob() throws Exception {
RestHighLevelClient client = highLevelClient();

{
Job job = MachineLearningIT.buildJob("closing-my-first-machine-learning-job");
client.machineLearning().putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
client.machineLearning().openJob(new OpenJobRequest(job.getId()), RequestOptions.DEFAULT);

//tag::x-pack-ml-close-job-request
CloseJobRequest closeJobRequest = new CloseJobRequest("closing-my-first-machine-learning-job", "otherjobs*"); //<1>
closeJobRequest.setForce(false); //<2>
closeJobRequest.setAllowNoJobs(true); //<3>
closeJobRequest.setTimeout(TimeValue.timeValueMinutes(10)); //<4>
//end::x-pack-ml-close-job-request

//tag::x-pack-ml-close-job-execute
CloseJobResponse closeJobResponse = client.machineLearning().closeJob(closeJobRequest, RequestOptions.DEFAULT);
boolean isClosed = closeJobResponse.isClosed(); //<1>
//end::x-pack-ml-close-job-execute

}
{
Job job = MachineLearningIT.buildJob("closing-my-second-machine-learning-job");
client.machineLearning().putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
client.machineLearning().openJob(new OpenJobRequest(job.getId()), RequestOptions.DEFAULT);

//tag::x-pack-ml-close-job-listener
ActionListener<CloseJobResponse> listener = new ActionListener<CloseJobResponse>() {
@Override
public void onResponse(CloseJobResponse closeJobResponse) {
//<1>
}

@Override
public void onFailure(Exception e) {
// <2>
}
};
//end::x-pack-ml-close-job-listener
CloseJobRequest closeJobRequest = new CloseJobRequest("closing-my-second-machine-learning-job");
// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);

// tag::x-pack-ml-close-job-execute-async
client.machineLearning().closeJobAsync(closeJobRequest, RequestOptions.DEFAULT, listener); //<1>
// end::x-pack-ml-close-job-execute-async

assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
}
59 changes: 59 additions & 0 deletions docs/java-rest/high-level/ml/close-job.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
[[java-rest-high-x-pack-ml-close-job]]
=== Close Job API

The Close Job API provides the ability to close {ml} jobs in the cluster.
It accepts a `CloseJobRequest` object and responds
with a `CloseJobResponse` object.

[[java-rest-high-x-pack-ml-close-job-request]]
==== Close Job Request

A `CloseJobRequest` object gets created with an existing non-null `jobId`.

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-close-job-request]
--------------------------------------------------
<1> Constructing a new request referencing existing job IDs
<2> Optionally used to close a failed job, or to forcefully close a job
which has not responded to its initial close request.
<3> Optionally set to ignore if a wildcard expression matches no jobs.
(This includes `_all` string or when no jobs have been specified)
<4> Optionally setting the `timeout` value for how long the
execution should wait for the job to be closed.

[[java-rest-high-x-pack-ml-close-job-execution]]
==== Execution

The request can be executed through the `MachineLearningClient` contained
in the `RestHighLevelClient` object, accessed via the `machineLearningClient()` method.

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-close-job-execute]
--------------------------------------------------
<1> `isClosed()` from the `CloseJobResponse` indicates if the job was successfully
closed or not.

[[java-rest-high-x-pack-ml-close-job-execution-async]]
==== Asynchronous Execution

The request can also be executed asynchronously:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-close-job-execute-async]
--------------------------------------------------
<1> The `CloseJobRequest` to execute and the `ActionListener` to use when
the execution completes

The method does not block and returns immediately. The passed `ActionListener` is used
to notify the caller of completion. A typical `ActionListener` for `CloseJobResponse` may
look like

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-close-job-listener]
--------------------------------------------------
<1> `onResponse` is called back when the action is completed successfully
<2> `onFailure` is called back when some unexpected error occurs
2 changes: 1 addition & 1 deletion docs/java-rest/high-level/ml/open-job.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-open-job-exec
the execution completes

The method does not block and returns immediately. The passed `ActionListener` is used
to notify the caller of completion. A typical `ActionListner` for `OpenJobResponse` may
to notify the caller of completion. A typical `ActionListener` for `OpenJobResponse` may
look like

["source","java",subs="attributes,callouts,macros"]
Expand Down
2 changes: 2 additions & 0 deletions docs/java-rest/high-level/supported-apis.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,12 @@ The Java High Level REST Client supports the following Machine Learning APIs:
* <<java-rest-high-x-pack-ml-put-job>>
* <<java-rest-high-x-pack-ml-delete-job>>
* <<java-rest-high-x-pack-ml-open-job>>
* <<java-rest-high-x-pack-ml-close-job>>

include::ml/put-job.asciidoc[]
include::ml/delete-job.asciidoc[]
include::ml/open-job.asciidoc[]
include::ml/close-job.asciidoc[]

== Migration APIs

Expand Down
Loading

0 comments on commit 3fbaae1

Please sign in to comment.