Skip to content

Commit

Permalink
Prototype of reindex fully wired up
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Nov 22, 2024
1 parent 18f3c1d commit 55ea673
Show file tree
Hide file tree
Showing 20 changed files with 1,350 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.datastreams.CancelReindexDataStreamAction;
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.datastreams.DataStreamsStatsAction;
import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.datastreams.GetReindexDataStreamStatusAction;
import org.elasticsearch.action.datastreams.MigrateToDataStreamAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.datastreams.PromoteDataStreamAction;
import org.elasticsearch.action.datastreams.ReindexDataStreamAction;
import org.elasticsearch.action.datastreams.ReindexDataStreamIndexAction;
import org.elasticsearch.action.datastreams.SwapDataStreamIndexAction;
import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction;
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction;
Expand All @@ -39,13 +42,16 @@
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.action.CancelReindexDataStreamTransportAction;
import org.elasticsearch.datastreams.action.CreateDataStreamTransportAction;
import org.elasticsearch.datastreams.action.DataStreamsStatsTransportAction;
import org.elasticsearch.datastreams.action.DeleteDataStreamTransportAction;
import org.elasticsearch.datastreams.action.GetReindexDataStreamStatusTransportAction;
import org.elasticsearch.datastreams.action.MigrateToDataStreamTransportAction;
import org.elasticsearch.datastreams.action.ModifyDataStreamsTransportAction;
import org.elasticsearch.datastreams.action.PromoteDataStreamTransportAction;
import org.elasticsearch.datastreams.action.ReindexDataStreamTransportAction;
import org.elasticsearch.datastreams.action.SwapDataStreamIndexTransportAction;
import org.elasticsearch.datastreams.action.TransportGetDataStreamsAction;
import org.elasticsearch.datastreams.action.TransportReindexDataStreamIndexAction;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
Expand Down Expand Up @@ -73,13 +79,16 @@
import org.elasticsearch.datastreams.options.rest.RestDeleteDataStreamOptionsAction;
import org.elasticsearch.datastreams.options.rest.RestGetDataStreamOptionsAction;
import org.elasticsearch.datastreams.options.rest.RestPutDataStreamOptionsAction;
import org.elasticsearch.datastreams.rest.RestCancelReindexDataStreamAction;
import org.elasticsearch.datastreams.rest.RestCreateDataStreamAction;
import org.elasticsearch.datastreams.rest.RestDataStreamsStatsAction;
import org.elasticsearch.datastreams.rest.RestDeleteDataStreamAction;
import org.elasticsearch.datastreams.rest.RestGetDataStreamsAction;
import org.elasticsearch.datastreams.rest.RestGetReindexDataStreamStatusAction;
import org.elasticsearch.datastreams.rest.RestMigrateToDataStreamAction;
import org.elasticsearch.datastreams.rest.RestModifyDataStreamsAction;
import org.elasticsearch.datastreams.rest.RestPromoteDataStreamAction;
import org.elasticsearch.datastreams.rest.RestReindexDataStreamAction;
import org.elasticsearch.datastreams.task.ReindexDataStreamPersistentTaskExecutor;
import org.elasticsearch.datastreams.task.ReindexDataStreamPersistentTaskState;
import org.elasticsearch.datastreams.task.ReindexDataStreamStatus;
Expand Down Expand Up @@ -111,6 +120,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;

import static org.elasticsearch.action.datastreams.ReindexDataStreamAction.REINDEX_DATA_STREAM_ORIGIN;
import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.DATA_STREAM_LIFECYCLE_ORIGIN;

public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlugin, PersistentTaskPlugin {
Expand Down Expand Up @@ -264,8 +274,11 @@ public Collection<?> createComponents(PluginServices services) {
actions.add(new ActionHandler<>(PutDataStreamOptionsAction.INSTANCE, TransportPutDataStreamOptionsAction.class));
actions.add(new ActionHandler<>(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class));
}
actions.add(new ActionHandler<>(ReindexDataStreamIndexAction.INSTANCE, TransportReindexDataStreamIndexAction.class));
actions.add(new ActionHandler<>(ReindexDataStreamAction.INSTANCE, ReindexDataStreamTransportAction.class));
actions.add(new ActionHandler<>(GetReindexDataStreamStatusAction.INSTANCE, GetReindexDataStreamStatusTransportAction.class));
actions.add(new ActionHandler<>(CancelReindexDataStreamAction.INSTANCE, CancelReindexDataStreamTransportAction.class));
actions.add(new ActionHandler<>(ReindexDataStreamIndexAction.INSTANCE, TransportReindexDataStreamIndexAction.class));
actions.add(new ActionHandler<>(SwapDataStreamIndexAction.INSTANCE, SwapDataStreamIndexTransportAction.class));
return actions;
}

Expand Down Expand Up @@ -303,6 +316,9 @@ public List<RestHandler> getRestHandlers(
handlers.add(new RestPutDataStreamOptionsAction());
handlers.add(new RestDeleteDataStreamOptionsAction());
}
handlers.add(new RestReindexDataStreamAction());
handlers.add(new RestGetReindexDataStreamStatusAction());
handlers.add(new RestCancelReindexDataStreamAction());
return handlers;
}

Expand Down Expand Up @@ -366,6 +382,13 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
SettingsModule settingsModule,
IndexNameExpressionResolver expressionResolver
) {
return List.of(new ReindexDataStreamPersistentTaskExecutor(client, clusterService, ReindexDataStreamTask.TASK_NAME, threadPool));
return List.of(
new ReindexDataStreamPersistentTaskExecutor(
new OriginSettingClient(client, REINDEX_DATA_STREAM_ORIGIN),
clusterService,
ReindexDataStreamTask.TASK_NAME,
threadPool
)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.datastreams.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.datastreams.CancelReindexDataStreamAction;
import org.elasticsearch.action.datastreams.CancelReindexDataStreamAction.Request;
import org.elasticsearch.action.datastreams.CancelReindexDataStreamAction.Response;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Strings;
import org.elasticsearch.datastreams.task.ReindexDataStreamTask;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskResult;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

import java.util.Map;
import java.util.Optional;

public class CancelReindexDataStreamTransportAction extends HandledTransportAction<Request, Response> {
private final ClusterService clusterService;
private final TransportService transportService;

@Inject
public CancelReindexDataStreamTransportAction(
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters
) {
super(CancelReindexDataStreamAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.clusterService = clusterService;
this.transportService = transportService;
}

@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
String persistentTaskId = request.getPersistentTaskId();
PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state()
.getMetadata()
.custom(PersistentTasksCustomMetadata.TYPE);
PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = persistentTasksCustomMetadata.getTask(persistentTaskId);
if (persistentTask == null) {
listener.onFailure(new ElasticsearchException("No persistent task found with id [{}]", persistentTaskId));
} else if (persistentTask.isAssigned()) {
String nodeId = persistentTask.getExecutorNode();
if (clusterService.localNode().getId().equals(nodeId)) {
getRunningTaskFromNode(persistentTaskId, listener);
} else {
runOnNodeWithTaskIfPossible(task, request, nodeId, listener);
}
} else {
listener.onFailure(new ElasticsearchException("Persistent task with id [{}] is not assigned to a node", persistentTaskId));
}
}

private ReindexDataStreamTask getRunningPersistentTaskFromTaskManager(String persistentTaskId) {
Optional<Map.Entry<Long, CancellableTask>> optionalTask = taskManager.getCancellableTasks()
.entrySet()
.stream()
.filter(entry -> entry.getValue().getType().equals("persistent"))
.filter(
entry -> entry.getValue() instanceof ReindexDataStreamTask
&& persistentTaskId.equals((((AllocatedPersistentTask) entry.getValue()).getPersistentTaskId()))
)
.findAny();
return optionalTask.map(entry -> (ReindexDataStreamTask) entry.getValue()).orElse(null);
}

void getRunningTaskFromNode(String persistentTaskId, ActionListener<Response> listener) {
ReindexDataStreamTask runningTask = getRunningPersistentTaskFromTaskManager(persistentTaskId);
if (runningTask == null) {
listener.onFailure(
new ResourceNotFoundException(
Strings.format(
"Persistent task [{}] is supposed to be running on node [{}], " + "but the task is not found on that node",
persistentTaskId,
clusterService.localNode().getId()
)
)
);
} else {
TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true);
runningTask.markAsCompleted();
listener.onResponse(new Response(new TaskResult(true, info)));
}
}

private void runOnNodeWithTaskIfPossible(Task thisTask, Request request, String nodeId, ActionListener<Response> listener) {
DiscoveryNode node = clusterService.state().nodes().get(nodeId);
if (node == null) {
listener.onFailure(
new ResourceNotFoundException(
Strings.format(
"Persistent task [{}] is supposed to be running on node [{}], but that node is not part of the cluster",
request.getPersistentTaskId(),
nodeId
)
)
);
} else {
Request nodeRequest = request.nodeRequest(clusterService.localNode().getId(), thisTask.getId());
transportService.sendRequest(
node,
CancelReindexDataStreamAction.NAME,
nodeRequest,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(listener, Response::new, EsExecutors.DIRECT_EXECUTOR_SERVICE)
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.datastreams.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.datastreams.GetReindexDataStreamStatusAction;
import org.elasticsearch.action.datastreams.GetReindexDataStreamStatusAction.Request;
import org.elasticsearch.action.datastreams.GetReindexDataStreamStatusAction.Response;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Strings;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskResult;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

import java.util.Map;
import java.util.Optional;

public class GetReindexDataStreamStatusTransportAction extends HandledTransportAction<Request, Response> {
private final ClusterService clusterService;
private final TransportService transportService;

@Inject
public GetReindexDataStreamStatusTransportAction(
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters
) {
super(GetReindexDataStreamStatusAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.clusterService = clusterService;
this.transportService = transportService;
}

@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
String persistentTaskId = request.getPersistentTaskId();
PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state()
.getMetadata()
.custom(PersistentTasksCustomMetadata.TYPE);
PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = persistentTasksCustomMetadata.getTask(persistentTaskId);
if (persistentTask == null) {
listener.onFailure(new ElasticsearchException("No persistent task found with id [{}]", persistentTaskId));
} else if (persistentTask.isAssigned()) {
String nodeId = persistentTask.getExecutorNode();
if (clusterService.localNode().getId().equals(nodeId)) {
getRunningTaskFromNode(persistentTaskId, listener);
} else {
runOnNodeWithTaskIfPossible(task, request, nodeId, listener);
}
} else {
listener.onFailure(new ElasticsearchException("Persistent task with id [{}] is not assigned to a node", persistentTaskId));
}
}

private Task getRunningPersistentTaskFromTaskManager(String persistentTaskId) {
Optional<Map.Entry<Long, CancellableTask>> optionalTask = taskManager.getCancellableTasks()
.entrySet()
.stream()
.filter(entry -> entry.getValue().getType().equals("persistent"))
.filter(
entry -> entry.getValue() instanceof AllocatedPersistentTask
&& persistentTaskId.equals((((AllocatedPersistentTask) entry.getValue()).getPersistentTaskId()))
)
.findAny();
return optionalTask.<Task>map(Map.Entry::getValue).orElse(null);
}

void getRunningTaskFromNode(String persistentTaskId, ActionListener<Response> listener) {
Task runningTask = getRunningPersistentTaskFromTaskManager(persistentTaskId);
if (runningTask == null) {
listener.onFailure(
new ResourceNotFoundException(
Strings.format(
"Persistent task [{}] is supposed to be running on node [{}], " + "but the task is not found on that node",
persistentTaskId,
clusterService.localNode().getId()
)
)
);
} else {
TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true);
listener.onResponse(new Response(new TaskResult(false, info)));
}
}

private void runOnNodeWithTaskIfPossible(Task thisTask, Request request, String nodeId, ActionListener<Response> listener) {
DiscoveryNode node = clusterService.state().nodes().get(nodeId);
if (node == null) {
listener.onFailure(
new ResourceNotFoundException(
Strings.format(
"Persistent task [{}] is supposed to be running on node [{}], but that node is not part of the cluster",
request.getPersistentTaskId(),
nodeId
)
)
);
} else {
Request nodeRequest = request.nodeRequest(clusterService.localNode().getId(), thisTask.getId());
transportService.sendRequest(
node,
GetReindexDataStreamStatusAction.NAME,
nodeRequest,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(listener, Response::new, EsExecutors.DIRECT_EXECUTOR_SERVICE)
);
}
}
}
Loading

0 comments on commit 55ea673

Please sign in to comment.