Skip to content

Commit

Permalink
[Rollup] Job deletion should be invoked on the allocated task (elasti…
Browse files Browse the repository at this point in the history
…c#34574)

We should delete a job by directly talking to the allocated 
task and telling it to shutdown. Today we shut down a job 
via the persistent task framework. This is not ideal because, 
while the job has been removed from the persistent task 
CS, the allocated task continues to live until it gets the 
shutdown message.

This means a user can delete a job, immediately delete 
the rollup index, and then see new documents appear in
 the just-deleted index. This happens because the indexer
 in the allocated task is still running and indexes a few 
more documents before getting the shutdown command.

In this PR, the transport action is changed to a TransportTasksAction, 
and we invoke onCancelled() directly on the matching job. 
The race condition still exists after this PR (albeit less likely), 
but this was a precursor to fixing the issue and a self-contained
chunk of code. A second PR will followup to fix the race itself.
  • Loading branch information
polyfractal authored Oct 23, 2018
1 parent 46b49b0 commit 4dbf498
Show file tree
Hide file tree
Showing 10 changed files with 249 additions and 186 deletions.
8 changes: 4 additions & 4 deletions docs/reference/rollup/apis/delete-job.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

experimental[]

This API deletes an existing rollup job. The job can be started or stopped, in both cases it will be deleted. Attempting
to delete a non-existing job will throw an exception
This API deletes an existing rollup job. A job must be *stopped* first before it can be deleted. Attempting to delete
a started job will result in an error. Similarly, attempting to delete a nonexistent job will throw an exception.

.Deleting the job does not delete rolled up data
**********************************
Expand Down Expand Up @@ -99,12 +99,12 @@ A 404 `resource_not_found` exception will be thrown:
"root_cause" : [
{
"type" : "resource_not_found_exception",
"reason" : "the task with id does_not_exist doesn't exist",
"reason" : "the task with id [does_not_exist] doesn't exist",
"stack_trace": ...
}
],
"type" : "resource_not_found_exception",
"reason" : "the task with id does_not_exist doesn't exist",
"reason" : "the task with id [does_not_exist] doesn't exist",
"stack_trace": ...
},
"status": 404
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@
*/
public class ListTasksResponse extends BaseTasksResponse implements ToXContentObject {
private static final String TASKS = "tasks";
private static final String TASK_FAILURES = "task_failures";
private static final String NODE_FAILURES = "node_failures";

private List<TaskInfo> tasks;

Expand Down Expand Up @@ -246,28 +244,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

private void toXContentCommon(XContentBuilder builder, Params params) throws IOException {
if (getTaskFailures() != null && getTaskFailures().size() > 0) {
builder.startArray(TASK_FAILURES);
for (TaskOperationFailure ex : getTaskFailures()){
builder.startObject();
builder.value(ex);
builder.endObject();
}
builder.endArray();
}

if (getNodeFailures() != null && getNodeFailures().size() > 0) {
builder.startArray(NODE_FAILURES);
for (ElasticsearchException ex : getNodeFailures()) {
builder.startObject();
ex.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
}
}

public static ListTasksResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;
Expand All @@ -41,6 +44,9 @@
* Base class for responses of task-related operations
*/
public class BaseTasksResponse extends ActionResponse {
protected static final String TASK_FAILURES = "task_failures";
protected static final String NODE_FAILURES = "node_failures";

private List<TaskOperationFailure> taskFailures;
private List<ElasticsearchException> nodeFailures;

Expand Down Expand Up @@ -103,4 +109,44 @@ public void writeTo(StreamOutput out) throws IOException {
exp.writeTo(out);
}
}

protected void toXContentCommon(XContentBuilder builder, ToXContent.Params params) throws IOException {
if (getTaskFailures() != null && getTaskFailures().size() > 0) {
builder.startArray(TASK_FAILURES);
for (TaskOperationFailure ex : getTaskFailures()){
builder.startObject();
builder.value(ex);
builder.endObject();
}
builder.endArray();
}

if (getNodeFailures() != null && getNodeFailures().size() > 0) {
builder.startArray(NODE_FAILURES);
for (ElasticsearchException ex : getNodeFailures()) {
builder.startObject();
ex.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BaseTasksResponse response = (BaseTasksResponse) o;
return taskFailures.equals(response.taskFailures)
&& nodeFailures.equals(response.nodeFailures);
}

@Override
public int hashCode() {
return Objects.hash(taskFailures, nodeFailures);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -449,7 +450,7 @@ private void wipeClusterSettings() throws IOException {
}
}

private void wipeRollupJobs() throws IOException {
private void wipeRollupJobs() throws IOException, InterruptedException {
Response response = adminClient().performRequest(new Request("GET", "/_xpack/rollup/job/_all"));
Map<String, Object> jobs = entityAsMap(response);
@SuppressWarnings("unchecked")
Expand All @@ -460,6 +461,29 @@ private void wipeRollupJobs() throws IOException {
return;
}

for (Map<String, Object> jobConfig : jobConfigs) {
@SuppressWarnings("unchecked")
String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");
Request request = new Request("POST", "/_xpack/rollup/job/" + jobId + "/_stop");
request.addParameter("ignore", "404");
logger.debug("stopping rollup job [{}]", jobId);
adminClient().performRequest(request);
}

// TODO this is temporary until StopJob API gains the ability to block until stopped
awaitBusy(() -> {
Request request = new Request("GET", "/_xpack/rollup/job/_all");
try {
Response jobsResponse = adminClient().performRequest(request);
String body = EntityUtils.toString(jobsResponse.getEntity());
logger.error(body);
// If the body contains any of the non-stopped states, at least one job is not finished yet
return Arrays.stream(new String[]{"started", "aborting", "stopping", "indexing"}).noneMatch(body::contains);
} catch (IOException e) {
return false;
}
}, 10, TimeUnit.SECONDS);

for (Map<String, Object> jobConfig : jobConfigs) {
@SuppressWarnings("unchecked")
String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,29 @@


import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.rollup.RollupField;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class DeleteRollupJobAction extends Action<AcknowledgedResponse> {
public class DeleteRollupJobAction extends Action<DeleteRollupJobAction.Response> {

public static final DeleteRollupJobAction INSTANCE = new DeleteRollupJobAction();
public static final String NAME = "cluster:admin/xpack/rollup/delete";
Expand All @@ -32,11 +39,11 @@ private DeleteRollupJobAction() {
}

@Override
public AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
public Response newResponse() {
return new Response();
}

public static class Request extends AcknowledgedRequest<Request> implements ToXContent {
public static class Request extends BaseTasksRequest<Request> implements ToXContentFragment {
private String id;

public Request(String id) {
Expand All @@ -45,6 +52,11 @@ public Request(String id) {

public Request() {}

@Override
public boolean match(Task task) {
return task.getDescription().equals(RollupField.NAME + "_" + id);
}

public String getId() {
return id;
}
Expand Down Expand Up @@ -90,10 +102,74 @@ public boolean equals(Object obj) {
}
}

public static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, AcknowledgedResponse, RequestBuilder> {

public static class RequestBuilder extends ActionRequestBuilder<DeleteRollupJobAction.Request, DeleteRollupJobAction.Response> {
protected RequestBuilder(ElasticsearchClient client, DeleteRollupJobAction action) {
super(client, action, new Request());
super(client, action, new DeleteRollupJobAction.Request());
}
}

public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {

private boolean acknowledged;

public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
readFrom(in);
}

public Response(boolean acknowledged, List<TaskOperationFailure> taskFailures, List<FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
this.acknowledged = acknowledged;
}

public Response(boolean acknowledged) {
super(Collections.emptyList(), Collections.emptyList());
this.acknowledged = acknowledged;
}

public Response() {
super(Collections.emptyList(), Collections.emptyList());
this.acknowledged = false;
}

public boolean isDeleted() {
return acknowledged;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
toXContentCommon(builder, params);
builder.field("acknowledged", acknowledged);
}
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DeleteRollupJobAction.Response response = (DeleteRollupJobAction.Response) o;
return super.equals(o) && acknowledged == response.acknowledged;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), acknowledged);
}
}
}
Loading

0 comments on commit 4dbf498

Please sign in to comment.