Skip to content

Commit

Permalink
Adding a migration reindex cancel API (elastic#118291)
Browse files Browse the repository at this point in the history
This introduces the migration reindex cancel API, which cancels a
migration reindex task for a given data stream name that was started
with elastic#118109. For example:

```
POST localhost:9200/_migration/reindex/my-data-stream/_cancel?pretty
```

returns

```
{
  "acknowledged" : true
}
```

This cancels the task, and cancels any ongoing reindexing of backing
indices, but does not do any cleanup.
  • Loading branch information
masseyke authored Dec 12, 2024
1 parent 59690f5 commit 9b095eb
Show file tree
Hide file tree
Showing 13 changed files with 371 additions and 45 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/118291.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118291
summary: Adding a migration reindex cancel API
area: Data streams
type: enhancement
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"migrate.cancel_reindex":{
"documentation":{
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-stream-reindex.html",
"description":"This API returns the status of a migration reindex attempt for a data stream or index"
},
"stability":"experimental",
"visibility":"private",
"headers":{
"accept": [ "application/json"],
"content_type": ["application/json"]
},
"url":{
"paths":[
{
"path":"/_migration/reindex/{index}/_cancel",
"methods":[
"POST"
],
"parts":{
"index":{
"type":"string",
"description":"The index or data stream name"
}
}
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,23 @@ public void testAlreadyUpToDateDataStream() throws Exception {
assertThat(status.totalIndices(), equalTo(backingIndexCount));
assertThat(status.totalIndicesToBeUpgraded(), equalTo(0));
});
AcknowledgedResponse cancelResponse = client().execute(
CancelReindexDataStreamAction.INSTANCE,
new CancelReindexDataStreamAction.Request(dataStreamName)
).actionGet();
assertNotNull(cancelResponse);
assertThrows(
ResourceNotFoundException.class,
() -> client().execute(CancelReindexDataStreamAction.INSTANCE, new CancelReindexDataStreamAction.Request(dataStreamName))
.actionGet()
);
assertThrows(
ResourceNotFoundException.class,
() -> client().execute(
new ActionType<GetMigrationReindexStatusAction.Response>(GetMigrationReindexStatusAction.NAME),
new GetMigrationReindexStatusAction.Request(dataStreamName)
).actionGet()
);
}

private int createDataStream(String dataStreamName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction;
import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamTransportAction;
import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction;
import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusTransportAction;
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction;
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamTransportAction;
import org.elasticsearch.xpack.migrate.rest.RestCancelReindexDataStreamAction;
import org.elasticsearch.xpack.migrate.rest.RestGetMigrationReindexStatusAction;
import org.elasticsearch.xpack.migrate.rest.RestMigrationReindexAction;
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor;
Expand Down Expand Up @@ -69,6 +72,7 @@ public List<RestHandler> getRestHandlers(
if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) {
handlers.add(new RestMigrationReindexAction());
handlers.add(new RestGetMigrationReindexStatusAction());
handlers.add(new RestCancelReindexDataStreamAction());
}
return handlers;
}
Expand All @@ -79,6 +83,7 @@ public List<RestHandler> getRestHandlers(
if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) {
actions.add(new ActionHandler<>(ReindexDataStreamAction.INSTANCE, ReindexDataStreamTransportAction.class));
actions.add(new ActionHandler<>(GetMigrationReindexStatusAction.INSTANCE, GetMigrationReindexStatusTransportAction.class));
actions.add(new ActionHandler<>(CancelReindexDataStreamAction.INSTANCE, CancelReindexDataStreamTransportAction.class));
}
return actions;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.migrate.action;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Objects;

public class CancelReindexDataStreamAction extends ActionType<AcknowledgedResponse> {

public static final CancelReindexDataStreamAction INSTANCE = new CancelReindexDataStreamAction();
public static final String NAME = "indices:admin/data_stream/reindex_cancel";

public CancelReindexDataStreamAction() {
super(NAME);
}

public static class Request extends ActionRequest implements IndicesRequest {
private final String index;

public Request(String index) {
super();
this.index = index;
}

public Request(StreamInput in) throws IOException {
super(in);
this.index = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public boolean getShouldStoreResult() {
return true;
}

public String getIndex() {
return index;
}

@Override
public int hashCode() {
return Objects.hashCode(index);
}

@Override
public boolean equals(Object other) {
return other instanceof Request && index.equals(((Request) other).index);
}

public Request nodeRequest(String thisNodeId, long thisTaskId) {
Request copy = new Request(index);
copy.setParentTask(thisNodeId, thisTaskId);
return copy;
}

@Override
public String[] indices() {
return new String[] { index };
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.migrate.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction.Request;

public class CancelReindexDataStreamTransportAction extends HandledTransportAction<Request, AcknowledgedResponse> {
private final PersistentTasksService persistentTasksService;

@Inject
public CancelReindexDataStreamTransportAction(
TransportService transportService,
ActionFilters actionFilters,
PersistentTasksService persistentTasksService
) {
super(CancelReindexDataStreamAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.persistentTasksService = persistentTasksService;
}

@Override
protected void doExecute(Task task, Request request, ActionListener<AcknowledgedResponse> listener) {
String index = request.getIndex();
String persistentTaskId = ReindexDataStreamAction.TASK_ID_PREFIX + index;
/*
* This removes the persistent task from the cluster state and results in the running task being cancelled (but not removed from
* the task manager). The running task is removed from the task manager in ReindexDataStreamTask::onCancelled, which is called as
* as result of this.
*/
persistentTasksService.sendRemoveRequest(persistentTaskId, TimeValue.MAX_VALUE, new ActionListener<>() {
@Override
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
listener.onResponse(AcknowledgedResponse.TRUE);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.migrate.rest;

import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.POST;

public class RestCancelReindexDataStreamAction extends BaseRestHandler {

@Override
public String getName() {
return "cancel_reindex_data_stream_action";
}

@Override
public List<Route> routes() {
return List.of(new Route(POST, "/_migration/reindex/{index}/_cancel"));
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String index = request.param("index");
CancelReindexDataStreamAction.Request cancelTaskRequest = new CancelReindexDataStreamAction.Request(index);
return channel -> client.execute(CancelReindexDataStreamAction.INSTANCE, cancelTaskRequest, new RestToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,11 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
}

private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) {
persistentTask.allReindexesCompleted();
threadPool.schedule(persistentTask::markAsCompleted, getTimeToLive(persistentTask), threadPool.generic());
persistentTask.allReindexesCompleted(threadPool, getTimeToLive(persistentTask));
}

private void completeFailedPersistentTask(ReindexDataStreamTask persistentTask, Exception e) {
persistentTask.taskFailed(e);
threadPool.schedule(() -> persistentTask.markAsFailed(e), getTimeToLive(persistentTask), threadPool.generic());
persistentTask.taskFailed(threadPool, getTimeToLive(persistentTask), e);
}

private TimeValue getTimeToLive(ReindexDataStreamTask reindexDataStreamTask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@

package org.elasticsearch.xpack.migrate.task;

import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -21,12 +24,14 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
private final long persistentTaskStartTime;
private final int totalIndices;
private final int totalIndicesToBeUpgraded;
private boolean complete = false;
private Exception exception;
private AtomicInteger inProgress = new AtomicInteger(0);
private AtomicInteger pending = new AtomicInteger();
private List<Tuple<String, Exception>> errors = new ArrayList<>();
private volatile boolean complete = false;
private volatile Exception exception;
private final AtomicInteger inProgress = new AtomicInteger(0);
private final AtomicInteger pending = new AtomicInteger();
private final List<Tuple<String, Exception>> errors = new ArrayList<>();
private final RunOnce completeTask;

@SuppressWarnings("this-escape")
public ReindexDataStreamTask(
long persistentTaskStartTime,
int totalIndices,
Expand All @@ -42,6 +47,13 @@ public ReindexDataStreamTask(
this.persistentTaskStartTime = persistentTaskStartTime;
this.totalIndices = totalIndices;
this.totalIndicesToBeUpgraded = totalIndicesToBeUpgraded;
this.completeTask = new RunOnce(() -> {
if (exception == null) {
markAsCompleted();
} else {
markAsFailed(exception);
}
});
}

@Override
Expand All @@ -58,13 +70,18 @@ public ReindexDataStreamStatus getStatus() {
);
}

public void allReindexesCompleted() {
public void allReindexesCompleted(ThreadPool threadPool, TimeValue timeToLive) {
this.complete = true;
if (isCancelled()) {
completeTask.run();
} else {
threadPool.schedule(completeTask, timeToLive, threadPool.generic());
}
}

public void taskFailed(Exception e) {
this.complete = true;
public void taskFailed(ThreadPool threadPool, TimeValue timeToLive, Exception e) {
this.exception = e;
allReindexesCompleted(threadPool, timeToLive);
}

public void reindexSucceeded() {
Expand All @@ -84,4 +101,16 @@ public void incrementInProgressIndicesCount() {
public void setPendingIndicesCount(int size) {
pending.set(size);
}

@Override
public void onCancelled() {
/*
* If the task is complete, but just waiting for its scheduled removal, we go ahead and call markAsCompleted/markAsFailed
* immediately. This results in the running task being removed from the task manager. If the task is not complete, then one of
* allReindexesCompleted or taskFailed will be called in the future, resulting in the same thing.
*/
if (complete) {
completeTask.run();
}
}
}
Loading

0 comments on commit 9b095eb

Please sign in to comment.