From 55ea67322ee333e114601bf163cf40b6867489c9 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Fri, 22 Nov 2024 15:41:01 -0600 Subject: [PATCH] Prototype of reindex fully wired up --- .../datastreams/DataStreamsPlugin.java | 27 ++- ...ancelReindexDataStreamTransportAction.java | 130 ++++++++++++++ ...eindexDataStreamStatusTransportAction.java | 128 ++++++++++++++ .../SwapDataStreamIndexTransportAction.java | 145 ++++++++++++++++ .../RestCancelReindexDataStreamAction.java | 41 +++++ .../RestGetReindexDataStreamStatusAction.java | 41 +++++ .../rest/RestReindexDataStreamAction.java | 64 +++++++ ...indexDataStreamPersistentTaskExecutor.java | 60 ++++++- .../task/ReindexDataStreamTask.java | 34 ++-- .../CancelReindexDataStreamAction.java | 133 +++++++++++++++ .../GetReindexDataStreamStatusAction.java | 145 ++++++++++++++++ .../datastreams/ReindexDataStreamAction.java | 16 +- .../ReindexDataStreamIndexAction.java | 14 +- .../SwapDataStreamIndexAction.java | 142 ++++++++++++++++ .../ReindexDataStreamRequestTests.java | 33 ++++ .../core/security/user/InternalUsers.java | 51 ++++++ .../core/security/user/UsernamesField.java | 2 + .../security/authz/AuthorizationUtils.java | 4 + .../xpack/security/authz/RBACEngine.java | 5 +- .../upgrades/DataStreamsUpgradeIT.java | 160 ++++++++++++++++++ 20 files changed, 1350 insertions(+), 25 deletions(-) create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/CancelReindexDataStreamTransportAction.java create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/GetReindexDataStreamStatusTransportAction.java create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/SwapDataStreamIndexTransportAction.java create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestCancelReindexDataStreamAction.java create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetReindexDataStreamStatusAction.java create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestReindexDataStreamAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/datastreams/CancelReindexDataStreamAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/datastreams/GetReindexDataStreamStatusAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/datastreams/SwapDataStreamIndexAction.java create mode 100644 server/src/test/java/org/elasticsearch/action/datastreams/ReindexDataStreamRequestTests.java diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java index 2ec21ad43f17c..bc6ddfd3069e7 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java @@ -12,15 +12,18 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.datastreams.CancelReindexDataStreamAction; import org.elasticsearch.action.datastreams.CreateDataStreamAction; import org.elasticsearch.action.datastreams.DataStreamsStatsAction; import org.elasticsearch.action.datastreams.DeleteDataStreamAction; import org.elasticsearch.action.datastreams.GetDataStreamAction; +import org.elasticsearch.action.datastreams.GetReindexDataStreamStatusAction; import org.elasticsearch.action.datastreams.MigrateToDataStreamAction; import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; import org.elasticsearch.action.datastreams.PromoteDataStreamAction; import org.elasticsearch.action.datastreams.ReindexDataStreamAction; import org.elasticsearch.action.datastreams.ReindexDataStreamIndexAction; +import org.elasticsearch.action.datastreams.SwapDataStreamIndexAction; import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction; import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction; import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction; @@ -39,13 +42,16 @@ import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.datastreams.action.CancelReindexDataStreamTransportAction; import org.elasticsearch.datastreams.action.CreateDataStreamTransportAction; import org.elasticsearch.datastreams.action.DataStreamsStatsTransportAction; import org.elasticsearch.datastreams.action.DeleteDataStreamTransportAction; +import org.elasticsearch.datastreams.action.GetReindexDataStreamStatusTransportAction; import org.elasticsearch.datastreams.action.MigrateToDataStreamTransportAction; import org.elasticsearch.datastreams.action.ModifyDataStreamsTransportAction; import org.elasticsearch.datastreams.action.PromoteDataStreamTransportAction; import org.elasticsearch.datastreams.action.ReindexDataStreamTransportAction; +import org.elasticsearch.datastreams.action.SwapDataStreamIndexTransportAction; import org.elasticsearch.datastreams.action.TransportGetDataStreamsAction; import org.elasticsearch.datastreams.action.TransportReindexDataStreamIndexAction; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; @@ -73,13 +79,16 @@ import org.elasticsearch.datastreams.options.rest.RestDeleteDataStreamOptionsAction; import org.elasticsearch.datastreams.options.rest.RestGetDataStreamOptionsAction; import org.elasticsearch.datastreams.options.rest.RestPutDataStreamOptionsAction; +import org.elasticsearch.datastreams.rest.RestCancelReindexDataStreamAction; import org.elasticsearch.datastreams.rest.RestCreateDataStreamAction; import org.elasticsearch.datastreams.rest.RestDataStreamsStatsAction; import org.elasticsearch.datastreams.rest.RestDeleteDataStreamAction; import org.elasticsearch.datastreams.rest.RestGetDataStreamsAction; +import org.elasticsearch.datastreams.rest.RestGetReindexDataStreamStatusAction; import org.elasticsearch.datastreams.rest.RestMigrateToDataStreamAction; import org.elasticsearch.datastreams.rest.RestModifyDataStreamsAction; import org.elasticsearch.datastreams.rest.RestPromoteDataStreamAction; +import org.elasticsearch.datastreams.rest.RestReindexDataStreamAction; import org.elasticsearch.datastreams.task.ReindexDataStreamPersistentTaskExecutor; import org.elasticsearch.datastreams.task.ReindexDataStreamPersistentTaskState; import org.elasticsearch.datastreams.task.ReindexDataStreamStatus; @@ -111,6 +120,7 @@ import java.util.function.Predicate; import java.util.function.Supplier; +import static org.elasticsearch.action.datastreams.ReindexDataStreamAction.REINDEX_DATA_STREAM_ORIGIN; import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.DATA_STREAM_LIFECYCLE_ORIGIN; public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlugin, PersistentTaskPlugin { @@ -264,8 +274,11 @@ public Collection createComponents(PluginServices services) { actions.add(new ActionHandler<>(PutDataStreamOptionsAction.INSTANCE, TransportPutDataStreamOptionsAction.class)); actions.add(new ActionHandler<>(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class)); } - actions.add(new ActionHandler<>(ReindexDataStreamIndexAction.INSTANCE, TransportReindexDataStreamIndexAction.class)); actions.add(new ActionHandler<>(ReindexDataStreamAction.INSTANCE, ReindexDataStreamTransportAction.class)); + actions.add(new ActionHandler<>(GetReindexDataStreamStatusAction.INSTANCE, GetReindexDataStreamStatusTransportAction.class)); + actions.add(new ActionHandler<>(CancelReindexDataStreamAction.INSTANCE, CancelReindexDataStreamTransportAction.class)); + actions.add(new ActionHandler<>(ReindexDataStreamIndexAction.INSTANCE, TransportReindexDataStreamIndexAction.class)); + actions.add(new ActionHandler<>(SwapDataStreamIndexAction.INSTANCE, SwapDataStreamIndexTransportAction.class)); return actions; } @@ -303,6 +316,9 @@ public List getRestHandlers( handlers.add(new RestPutDataStreamOptionsAction()); handlers.add(new RestDeleteDataStreamOptionsAction()); } + handlers.add(new RestReindexDataStreamAction()); + handlers.add(new RestGetReindexDataStreamStatusAction()); + handlers.add(new RestCancelReindexDataStreamAction()); return handlers; } @@ -366,6 +382,13 @@ public List> getPersistentTasksExecutor( SettingsModule settingsModule, IndexNameExpressionResolver expressionResolver ) { - return List.of(new ReindexDataStreamPersistentTaskExecutor(client, clusterService, ReindexDataStreamTask.TASK_NAME, threadPool)); + return List.of( + new ReindexDataStreamPersistentTaskExecutor( + new OriginSettingClient(client, REINDEX_DATA_STREAM_ORIGIN), + clusterService, + ReindexDataStreamTask.TASK_NAME, + threadPool + ) + ); } } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/CancelReindexDataStreamTransportAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/CancelReindexDataStreamTransportAction.java new file mode 100644 index 0000000000000..8ce71f1318575 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/CancelReindexDataStreamTransportAction.java @@ -0,0 +1,130 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.action; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.datastreams.CancelReindexDataStreamAction; +import org.elasticsearch.action.datastreams.CancelReindexDataStreamAction.Request; +import org.elasticsearch.action.datastreams.CancelReindexDataStreamAction.Response; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.Strings; +import org.elasticsearch.datastreams.task.ReindexDataStreamTask; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.tasks.TaskResult; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; + +import java.util.Map; +import java.util.Optional; + +public class CancelReindexDataStreamTransportAction extends HandledTransportAction { + private final ClusterService clusterService; + private final TransportService transportService; + + @Inject + public CancelReindexDataStreamTransportAction( + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters + ) { + super(CancelReindexDataStreamAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); + this.clusterService = clusterService; + this.transportService = transportService; + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + String persistentTaskId = request.getPersistentTaskId(); + PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state() + .getMetadata() + .custom(PersistentTasksCustomMetadata.TYPE); + PersistentTasksCustomMetadata.PersistentTask persistentTask = persistentTasksCustomMetadata.getTask(persistentTaskId); + if (persistentTask == null) { + listener.onFailure(new ElasticsearchException("No persistent task found with id [{}]", persistentTaskId)); + } else if (persistentTask.isAssigned()) { + String nodeId = persistentTask.getExecutorNode(); + if (clusterService.localNode().getId().equals(nodeId)) { + getRunningTaskFromNode(persistentTaskId, listener); + } else { + runOnNodeWithTaskIfPossible(task, request, nodeId, listener); + } + } else { + listener.onFailure(new ElasticsearchException("Persistent task with id [{}] is not assigned to a node", persistentTaskId)); + } + } + + private ReindexDataStreamTask getRunningPersistentTaskFromTaskManager(String persistentTaskId) { + Optional> optionalTask = taskManager.getCancellableTasks() + .entrySet() + .stream() + .filter(entry -> entry.getValue().getType().equals("persistent")) + .filter( + entry -> entry.getValue() instanceof ReindexDataStreamTask + && persistentTaskId.equals((((AllocatedPersistentTask) entry.getValue()).getPersistentTaskId())) + ) + .findAny(); + return optionalTask.map(entry -> (ReindexDataStreamTask) entry.getValue()).orElse(null); + } + + void getRunningTaskFromNode(String persistentTaskId, ActionListener listener) { + ReindexDataStreamTask runningTask = getRunningPersistentTaskFromTaskManager(persistentTaskId); + if (runningTask == null) { + listener.onFailure( + new ResourceNotFoundException( + Strings.format( + "Persistent task [{}] is supposed to be running on node [{}], " + "but the task is not found on that node", + persistentTaskId, + clusterService.localNode().getId() + ) + ) + ); + } else { + TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true); + runningTask.markAsCompleted(); + listener.onResponse(new Response(new TaskResult(true, info))); + } + } + + private void runOnNodeWithTaskIfPossible(Task thisTask, Request request, String nodeId, ActionListener listener) { + DiscoveryNode node = clusterService.state().nodes().get(nodeId); + if (node == null) { + listener.onFailure( + new ResourceNotFoundException( + Strings.format( + "Persistent task [{}] is supposed to be running on node [{}], but that node is not part of the cluster", + request.getPersistentTaskId(), + nodeId + ) + ) + ); + } else { + Request nodeRequest = request.nodeRequest(clusterService.localNode().getId(), thisTask.getId()); + transportService.sendRequest( + node, + CancelReindexDataStreamAction.NAME, + nodeRequest, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(listener, Response::new, EsExecutors.DIRECT_EXECUTOR_SERVICE) + ); + } + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/GetReindexDataStreamStatusTransportAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/GetReindexDataStreamStatusTransportAction.java new file mode 100644 index 0000000000000..a150cbf9b64dc --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/GetReindexDataStreamStatusTransportAction.java @@ -0,0 +1,128 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.action; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.datastreams.GetReindexDataStreamStatusAction; +import org.elasticsearch.action.datastreams.GetReindexDataStreamStatusAction.Request; +import org.elasticsearch.action.datastreams.GetReindexDataStreamStatusAction.Response; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.Strings; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.tasks.TaskResult; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; + +import java.util.Map; +import java.util.Optional; + +public class GetReindexDataStreamStatusTransportAction extends HandledTransportAction { + private final ClusterService clusterService; + private final TransportService transportService; + + @Inject + public GetReindexDataStreamStatusTransportAction( + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters + ) { + super(GetReindexDataStreamStatusAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); + this.clusterService = clusterService; + this.transportService = transportService; + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + String persistentTaskId = request.getPersistentTaskId(); + PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state() + .getMetadata() + .custom(PersistentTasksCustomMetadata.TYPE); + PersistentTasksCustomMetadata.PersistentTask persistentTask = persistentTasksCustomMetadata.getTask(persistentTaskId); + if (persistentTask == null) { + listener.onFailure(new ElasticsearchException("No persistent task found with id [{}]", persistentTaskId)); + } else if (persistentTask.isAssigned()) { + String nodeId = persistentTask.getExecutorNode(); + if (clusterService.localNode().getId().equals(nodeId)) { + getRunningTaskFromNode(persistentTaskId, listener); + } else { + runOnNodeWithTaskIfPossible(task, request, nodeId, listener); + } + } else { + listener.onFailure(new ElasticsearchException("Persistent task with id [{}] is not assigned to a node", persistentTaskId)); + } + } + + private Task getRunningPersistentTaskFromTaskManager(String persistentTaskId) { + Optional> optionalTask = taskManager.getCancellableTasks() + .entrySet() + .stream() + .filter(entry -> entry.getValue().getType().equals("persistent")) + .filter( + entry -> entry.getValue() instanceof AllocatedPersistentTask + && persistentTaskId.equals((((AllocatedPersistentTask) entry.getValue()).getPersistentTaskId())) + ) + .findAny(); + return optionalTask.map(Map.Entry::getValue).orElse(null); + } + + void getRunningTaskFromNode(String persistentTaskId, ActionListener listener) { + Task runningTask = getRunningPersistentTaskFromTaskManager(persistentTaskId); + if (runningTask == null) { + listener.onFailure( + new ResourceNotFoundException( + Strings.format( + "Persistent task [{}] is supposed to be running on node [{}], " + "but the task is not found on that node", + persistentTaskId, + clusterService.localNode().getId() + ) + ) + ); + } else { + TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true); + listener.onResponse(new Response(new TaskResult(false, info))); + } + } + + private void runOnNodeWithTaskIfPossible(Task thisTask, Request request, String nodeId, ActionListener listener) { + DiscoveryNode node = clusterService.state().nodes().get(nodeId); + if (node == null) { + listener.onFailure( + new ResourceNotFoundException( + Strings.format( + "Persistent task [{}] is supposed to be running on node [{}], but that node is not part of the cluster", + request.getPersistentTaskId(), + nodeId + ) + ) + ); + } else { + Request nodeRequest = request.nodeRequest(clusterService.localNode().getId(), thisTask.getId()); + transportService.sendRequest( + node, + GetReindexDataStreamStatusAction.NAME, + nodeRequest, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(listener, Response::new, EsExecutors.DIRECT_EXECUTOR_SERVICE) + ); + } + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/SwapDataStreamIndexTransportAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/SwapDataStreamIndexTransportAction.java new file mode 100644 index 0000000000000..0b27f81a2b0ac --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/SwapDataStreamIndexTransportAction.java @@ -0,0 +1,145 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.datastreams.SwapDataStreamIndexAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.SimpleBatchedExecutor; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterServiceTaskQueue; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.Index; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.List; + +public class SwapDataStreamIndexTransportAction extends TransportMasterNodeAction< + SwapDataStreamIndexAction.Request, + SwapDataStreamIndexAction.Response> { + private final MasterServiceTaskQueue reindexDataStreamClusterStateUpdateTaskQueue; + + private static final SimpleBatchedExecutor REINDEX_DATA_STREAM_STATE_UPDATE_TASK_EXECUTOR = + new SimpleBatchedExecutor<>() { + @Override + public Tuple executeTask(UpdateDataStreamTask task, ClusterState clusterState) throws Exception { + return Tuple.tuple(task.execute(clusterState), null); + } + + @Override + public void taskSucceeded(UpdateDataStreamTask task, Void unused) { + task.listener.onResponse(null); + } + }; + + @Inject + public SwapDataStreamIndexTransportAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + SwapDataStreamIndexAction.NAME, + true, + transportService, + clusterService, + threadPool, + actionFilters, + SwapDataStreamIndexAction.Request::new, + indexNameExpressionResolver, + SwapDataStreamIndexAction.Response::new, + transportService.getThreadPool().executor(ThreadPool.Names.GENERIC) + ); + this.reindexDataStreamClusterStateUpdateTaskQueue = clusterService.createTaskQueue( + "reindex-data-stream-state-update", + Priority.LOW, + REINDEX_DATA_STREAM_STATE_UPDATE_TASK_EXECUTOR + ); + } + + @Override + protected void masterOperation( + Task task, + SwapDataStreamIndexAction.Request request, + ClusterState state, + ActionListener listener + ) throws Exception { + reindexDataStreamClusterStateUpdateTaskQueue.submitTask( + Strings.format( + "Swapping indices in [%s]. Adding [%s] and removing[%s]", + request.getDataStream(), + request.getNewIndex(), + request.getOldIndex() + ), + new UpdateDataStreamTask( + ActionListener.wrap(unused -> listener.onResponse(new SwapDataStreamIndexAction.Response("")), listener::onFailure), + request.getDataStream(), + request.getOldIndex(), + request.getNewIndex() + ), + null + ); + } + + @Override + protected ClusterBlockException checkBlock(SwapDataStreamIndexAction.Request request, ClusterState state) { + return null; + } + + static class UpdateDataStreamTask implements ClusterStateTaskListener { + private final ActionListener listener; + private final String dataStream; + private final String oldIndex; + private final String newIndex; + + UpdateDataStreamTask(ActionListener listener, String dataStream, String oldIndex, String newIndex) { + this.listener = listener; + this.dataStream = dataStream; + this.oldIndex = oldIndex; + this.newIndex = newIndex; + } + + ClusterState execute(ClusterState currentState) throws Exception { + Metadata currentMetadata = currentState.metadata(); + DataStream oldDataStream = currentMetadata.dataStreams().get(dataStream); + List indicesWithoutOldIndex = oldDataStream.getIndices() + .stream() + .filter(index -> index.getName().equals(oldIndex) == false) + .toList(); + List newIndices = new ArrayList<>(indicesWithoutOldIndex); + newIndices.add(currentMetadata.index(newIndex).getIndex()); + DataStream newDataStream = oldDataStream.copy() + .setBackingIndices(DataStream.DataStreamIndices.backingIndicesBuilder(newIndices).build()) + .build(); + Metadata metadata = Metadata.builder(currentMetadata).put(newDataStream).build(); + return ClusterState.builder(currentState).metadata(metadata).build(); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestCancelReindexDataStreamAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestCancelReindexDataStreamAction.java new file mode 100644 index 0000000000000..2fa45ec2aafbc --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestCancelReindexDataStreamAction.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.rest; + +import org.elasticsearch.action.datastreams.CancelReindexDataStreamAction; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.POST; + +public class RestCancelReindexDataStreamAction extends BaseRestHandler { + + @Override + public String getName() { + return "cancel_reindex_data_stream_action"; + } + + @Override + public List routes() { + return List.of(new Route(POST, "/_reindex_data_stream/{persistent_task_id}/_cancel")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String persistentTaskId = request.param("persistent_task_id"); + CancelReindexDataStreamAction.Request cancelTaskRequest = new CancelReindexDataStreamAction.Request(persistentTaskId); + return channel -> client.execute(CancelReindexDataStreamAction.INSTANCE, cancelTaskRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetReindexDataStreamStatusAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetReindexDataStreamStatusAction.java new file mode 100644 index 0000000000000..177ae45e318d8 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetReindexDataStreamStatusAction.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.rest; + +import org.elasticsearch.action.datastreams.GetReindexDataStreamStatusAction; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + +public class RestGetReindexDataStreamStatusAction extends BaseRestHandler { + + @Override + public String getName() { + return "get_reindex_data_stream_status_action"; + } + + @Override + public List routes() { + return List.of(new Route(GET, "/_reindex_data_stream_status/{persistent_task_id}")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String persistentTaskId = request.param("persistent_task_id"); + GetReindexDataStreamStatusAction.Request getTaskRequest = new GetReindexDataStreamStatusAction.Request(persistentTaskId); + return channel -> client.execute(GetReindexDataStreamStatusAction.INSTANCE, getTaskRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestReindexDataStreamAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestReindexDataStreamAction.java new file mode 100644 index 0000000000000..c98e0b71b8efa --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestReindexDataStreamAction.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.rest; + +import org.elasticsearch.action.datastreams.ReindexDataStreamAction; +import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamResponse; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestBuilderListener; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.POST; + +public class RestReindexDataStreamAction extends BaseRestHandler { + @Override + public String getName() { + return "reindex_data_stream_action"; + } + + @Override + public List routes() { + return List.of(new Route(POST, "/_reindex_data_stream")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + ReindexDataStreamAction.ReindexDataStreamRequest reindexRequest = new ReindexDataStreamAction.ReindexDataStreamRequest( + request.param("source") + ); + return channel -> client.execute( + ReindexDataStreamAction.INSTANCE, + reindexRequest, + new ReindexDataStreamRestToXContentListener(channel) + ); + } + + static class ReindexDataStreamRestToXContentListener extends RestBuilderListener { + + ReindexDataStreamRestToXContentListener(RestChannel channel) { + super(channel); + } + + @Override + public RestResponse buildResponse(ReindexDataStreamResponse response, XContentBuilder builder) throws Exception { + assert response.isFragment() == false; + response.toXContent(builder, channel.request()); + return new RestResponse(RestStatus.OK, builder); + } + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java index f10d2e7b356fb..743a04a9ef3d9 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java @@ -12,6 +12,9 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.datastreams.GetDataStreamAction; +import org.elasticsearch.action.datastreams.ReindexDataStreamIndexAction; +import org.elasticsearch.action.datastreams.SwapDataStreamIndexAction; +import org.elasticsearch.action.support.CountDownActionListener; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.core.TimeValue; @@ -74,27 +77,68 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask if (dataStreamInfos.size() == 1) { List indices = dataStreamInfos.getFirst().getDataStream().getIndices(); List indicesToBeReindexed = indices.stream() - .filter(index -> clusterService.state().getMetadata().index(index).getCreationVersion().isLegacyIndexVersion()) + // .filter(index -> clusterService.state().getMetadata().index(index).getCreationVersion().isLegacyIndexVersion()) .toList(); - reindexDataStreamTask.setPendingIndices(indicesToBeReindexed.stream().map(Index::getName).toList()); + reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size()); + CountDownActionListener listener = new CountDownActionListener( + indicesToBeReindexed.size() + 1, + ActionListener.wrap(response1 -> { + completeSuccessfulPersistentTask(reindexDataStreamTask); + }, exception -> { completeFailedPersistentTask(reindexDataStreamTask, exception); }) + ); + // TODO: put all these on a queue, only process N from queue at a time for (Index index : indicesToBeReindexed) { - // TODO This is just a placeholder. This is where the real data stream reindex logic will go - } + reindexDataStreamTask.incrementInProgressIndicesCount(); + client.execute( + ReindexDataStreamIndexAction.INSTANCE, + new ReindexDataStreamIndexAction.Request(index.getName()), + ActionListener.wrap(response1 -> { + updateDataStream(sourceDataStream, index.getName(), response1.getDestIndex(), ActionListener.wrap(unused -> { + reindexDataStreamTask.reindexSucceeded(); + listener.onResponse(null); + }, exception -> { + reindexDataStreamTask.reindexFailed(index.getName(), exception); + listener.onResponse(null); + })); - completeSuccessfulPersistentTask(reindexDataStreamTask); + }, exception -> { + reindexDataStreamTask.reindexFailed(index.getName(), exception); + listener.onResponse(null); + }) + ); + } + listener.onResponse(null); } else { completeFailedPersistentTask(reindexDataStreamTask, new ElasticsearchException("data stream does not exist")); } - }, reindexDataStreamTask::markAsFailed)); + }, exception -> completeFailedPersistentTask(reindexDataStreamTask, exception))); + } + + private void updateDataStream(String dataStream, String oldIndex, String newIndex, ActionListener listener) { + client.execute( + SwapDataStreamIndexAction.INSTANCE, + new SwapDataStreamIndexAction.Request(dataStream, oldIndex, newIndex), + new ActionListener<>() { + @Override + public void onResponse(SwapDataStreamIndexAction.Response response) { + listener.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + } + ); } private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) { - persistentTask.reindexSucceeded(); + persistentTask.allReindexesCompleted(); threadPool.schedule(persistentTask::markAsCompleted, getTimeToLive(persistentTask), threadPool.generic()); } private void completeFailedPersistentTask(ReindexDataStreamTask persistentTask, Exception e) { - persistentTask.reindexFailed(e); + persistentTask.taskFailed(e); threadPool.schedule(() -> persistentTask.markAsFailed(e), getTimeToLive(persistentTask), threadPool.generic()); } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java index 2ae244679659f..b26d999def888 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; public class ReindexDataStreamTask extends AllocatedPersistentTask { public static final String TASK_NAME = "reindex-data-stream"; @@ -26,8 +27,8 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask { private final ThreadPool threadPool; private boolean complete = false; private Exception exception; - private List inProgress = new ArrayList<>(); - private List pending = List.of(); + private AtomicInteger inProgress = new AtomicInteger(0); + private AtomicInteger pending = new AtomicInteger(); private List> errors = new ArrayList<>(); public ReindexDataStreamTask( @@ -57,30 +58,39 @@ public ReindexDataStreamStatus getStatus() { totalIndicesToBeUpgraded, complete, exception, - inProgress.size(), - pending.size(), + inProgress.get(), + pending.get(), errors ); } - public void reindexSucceeded() { + public void allReindexesCompleted() { this.complete = true; } - public void reindexFailed(Exception e) { + public void taskFailed(Exception e) { this.complete = true; this.exception = e; } - public void setInProgressIndices(List inProgressIndices) { - this.inProgress = inProgressIndices; + public void reindexSucceeded() { + inProgress.decrementAndGet(); + } + + public void reindexFailed(String index, Exception error) { + System.out.println("****************** In ReindexDataStreamTask *************************** "); + error.printStackTrace(System.out); + new RuntimeException("****************** In ReindexDataStreamTask ***************************").printStackTrace(System.out); + this.errors.add(Tuple.tuple(index, error)); + inProgress.decrementAndGet(); } - public void setPendingIndices(List pendingIndices) { - this.pending = pendingIndices; + public void incrementInProgressIndicesCount() { + inProgress.incrementAndGet(); + pending.decrementAndGet(); } - public void addErrorIndex(String index, Exception error) { - this.errors.add(Tuple.tuple(index, error)); + public void setPendingIndicesCount(int size) { + pending.set(size); } } diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/CancelReindexDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/CancelReindexDataStreamAction.java new file mode 100644 index 0000000000000..0da6fa0b451bf --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/datastreams/CancelReindexDataStreamAction.java @@ -0,0 +1,133 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.TaskResult; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +public class CancelReindexDataStreamAction extends ActionType { + + public static final CancelReindexDataStreamAction INSTANCE = new CancelReindexDataStreamAction(); + public static final String NAME = "indices:admin/data_stream/reindex_cancel"; + + public CancelReindexDataStreamAction() { + super(NAME); + } + + public static class Response extends ActionResponse implements ToXContentObject { + private final TaskResult task; + + public Response(TaskResult task) { + this.task = requireNonNull(task, "task is required"); + } + + public Response(StreamInput in) throws IOException { + super(in); + task = in.readOptionalWriteable(TaskResult::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(task); + } + + /** + * Get the actual result of the fetch. + */ + public TaskResult getTask() { + return task; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + task.getTask().status().toXContent(builder, params); + return builder; + } + + @Override + public int hashCode() { + return Objects.hashCode(task); + } + + @Override + public boolean equals(Object other) { + return other instanceof Response && task.equals(((Response) other).task); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + } + + public static class Request extends ActionRequest { + private final String persistentTaskId; + + public Request(String persistentTaskId) { + super(); + this.persistentTaskId = persistentTaskId; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.persistentTaskId = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(persistentTaskId); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public boolean getShouldStoreResult() { + return true; + } + + public String getPersistentTaskId() { + return persistentTaskId; + } + + @Override + public int hashCode() { + return Objects.hashCode(persistentTaskId); + } + + @Override + public boolean equals(Object other) { + return other instanceof Request && persistentTaskId.equals(((Request) other).persistentTaskId); + } + + public Request nodeRequest(String thisNodeId, long thisTaskId) { + Request copy = new Request(persistentTaskId); + copy.setParentTask(thisNodeId, thisTaskId); + return copy; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/GetReindexDataStreamStatusAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/GetReindexDataStreamStatusAction.java new file mode 100644 index 0000000000000..39af843d4f76b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/datastreams/GetReindexDataStreamStatusAction.java @@ -0,0 +1,145 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.TaskResult; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +public class GetReindexDataStreamStatusAction extends ActionType { + + public static final GetReindexDataStreamStatusAction INSTANCE = new GetReindexDataStreamStatusAction(); + public static final String NAME = "indices:admin/data_stream/reindex_status"; + + public GetReindexDataStreamStatusAction() { + super(NAME); + } + + public static class Response extends ActionResponse implements ToXContentObject { + private final TaskResult task; + + public Response(TaskResult task) { + this.task = requireNonNull(task, "task is required"); + } + + public Response(StreamInput in) throws IOException { + super(in); + task = in.readOptionalWriteable(TaskResult::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(task); + } + + /** + * Get the actual result of the fetch. + */ + public TaskResult getTask() { + return task; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + task.getTask().status().toXContent(builder, params); + return builder; + } + + @Override + public int hashCode() { + return Objects.hashCode(task); + } + + @Override + public boolean equals(Object other) { + return other instanceof Response && task.equals(((Response) other).task); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + } + + public static class Request extends ActionRequest implements IndicesRequest { + private final String persistentTaskId; + + public Request(String persistentTaskId) { + super(); + this.persistentTaskId = persistentTaskId; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.persistentTaskId = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(persistentTaskId); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public boolean getShouldStoreResult() { + return true; // do not wait_for_completion + } + + public String getPersistentTaskId() { + return persistentTaskId; + } + + @Override + public int hashCode() { + return Objects.hashCode(persistentTaskId); + } + + @Override + public boolean equals(Object other) { + return other instanceof Request && persistentTaskId.equals(((Request) other).persistentTaskId); + } + + public Request nodeRequest(String thisNodeId, long thisTaskId) { + Request copy = new Request(persistentTaskId); + copy.setParentTask(thisNodeId, thisTaskId); + return copy; + } + + @Override + public String[] indices() { + return new String[] { persistentTaskId.substring("reindex-data-stream-".length()) }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/ReindexDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/ReindexDataStreamAction.java index 814c512c43bec..124c579dbb068 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/ReindexDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/ReindexDataStreamAction.java @@ -13,6 +13,8 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xcontent.ToXContentObject; @@ -25,6 +27,7 @@ public class ReindexDataStreamAction extends ActionType { + + public static final String NAME = "indices:admin/data_stream/index/swap"; + + public static final ActionType INSTANCE = new SwapDataStreamIndexAction(); + + private SwapDataStreamIndexAction() { + super(NAME); + } + + public static class Request extends AcknowledgedRequest implements IndicesRequest { + + private final String dataStream; + private final String oldIndex; + private final String newIndex; + + public Request(String dataStream, String oldIndex, String newIndex) { + super(TimeValue.MAX_VALUE, TimeValue.MAX_VALUE); + this.dataStream = dataStream; + this.oldIndex = oldIndex; + this.newIndex = newIndex; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.dataStream = in.readString(); + this.oldIndex = in.readString(); + this.newIndex = in.readString(); + } + + public static Request readFrom(StreamInput in) throws IOException { + return new Request(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(dataStream); + out.writeString(oldIndex); + out.writeString(newIndex); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public String getDataStream() { + return dataStream; + } + + public String getOldIndex() { + return oldIndex; + } + + public String getNewIndex() { + return newIndex; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(dataStream, request.dataStream); + } + + @Override + public int hashCode() { + return Objects.hash(dataStream); + } + + @Override + public String[] indices() { + return new String[] { dataStream }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + } + } + + public static class Response extends ActionResponse { + private final String destIndex; + + public Response(String destIndex) { + this.destIndex = destIndex; + } + + public Response(StreamInput in) throws IOException { + super(in); + this.destIndex = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(destIndex); + } + + public String getDestIndex() { + return destIndex; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return Objects.equals(destIndex, response.destIndex); + } + + @Override + public int hashCode() { + return Objects.hash(destIndex); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/ReindexDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/ReindexDataStreamRequestTests.java new file mode 100644 index 0000000000000..5198117ed107b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/datastreams/ReindexDataStreamRequestTests.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamRequest; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; + +public class ReindexDataStreamRequestTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return ReindexDataStreamRequest::new; + } + + @Override + protected ReindexDataStreamRequest createTestInstance() { + return new ReindexDataStreamRequest(randomAlphaOfLength(50)); + } + + @Override + protected ReindexDataStreamRequest mutateInstance(ReindexDataStreamRequest instance) throws IOException { + return createTestInstance(); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java index 23431e184422a..c10dac7291734 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java @@ -180,6 +180,56 @@ public class InternalUsers { ) ); + public static final InternalUser REINDEX_DATA_STREAM_USER = new InternalUser( + UsernamesField.REINDEX_DATA_STREAM_NAME, + new RoleDescriptor( + UsernamesField.REINDEX_DATA_STREAM_ROLE, + new String[] {}, + new RoleDescriptor.IndicesPrivileges[] { + RoleDescriptor.IndicesPrivileges.builder() + .indices("*") + .privileges( + "delete_index", + "view_index_metadata", + "manage", + "all", + RolloverAction.NAME, + ForceMergeAction.NAME + "*", + // indices stats is used by rollover, so we need to grant it here + IndicesStatsAction.NAME + "*", + TransportUpdateSettingsAction.TYPE.name(), + DownsampleAction.NAME, + TransportAddIndexBlockAction.TYPE.name() + ) + .allowRestrictedIndices(false) + .build(), + RoleDescriptor.IndicesPrivileges.builder() + .indices( + // System data stream for result history of fleet actions (see Fleet#fleetActionsResultsDescriptor) + ".fleet-actions-results", + // System data streams for storing uploaded file data for Agent diagnostics and Endpoint response actions + ".fleet-fileds*" + ) + .privileges( + "delete_index", + RolloverAction.NAME, + ForceMergeAction.NAME + "*", + // indices stats is used by rollover, so we need to grant it here + IndicesStatsAction.NAME + "*", + TransportUpdateSettingsAction.TYPE.name(), + DownsampleAction.NAME, + TransportAddIndexBlockAction.TYPE.name() + ) + .allowRestrictedIndices(true) + .build() }, + null, + null, + new String[] {}, + MetadataUtils.DEFAULT_RESERVED_METADATA, + Map.of() + ) + ); + /** * Internal user that can rollover an index/data stream. */ @@ -234,6 +284,7 @@ public class InternalUsers { ASYNC_SEARCH_USER, STORAGE_USER, DATA_STREAM_LIFECYCLE_USER, + REINDEX_DATA_STREAM_USER, SYNONYMS_USER, LAZY_ROLLOVER_USER ).collect(Collectors.toUnmodifiableMap(InternalUser::principal, Function.identity())); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/UsernamesField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/UsernamesField.java index 22e3c2df22ec3..69e9b518f0c79 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/UsernamesField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/UsernamesField.java @@ -18,6 +18,8 @@ public final class UsernamesField { public static final String XPACK_SECURITY_ROLE = "_xpack_security"; public static final String DATA_STREAM_LIFECYCLE_NAME = "_data_stream_lifecycle"; public static final String DATA_STREAM_LIFECYCLE_ROLE = "_data_stream_lifecycle"; + public static final String REINDEX_DATA_STREAM_NAME = "_reindex_data_stream"; + public static final String REINDEX_DATA_STREAM_ROLE = "_reindex_data_stream"; public static final String SECURITY_PROFILE_NAME = "_security_profile"; public static final String SECURITY_PROFILE_ROLE = "_security_profile"; public static final String XPACK_NAME = "_xpack"; diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java index 4173f3db45409..89a1509e1c80b 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java @@ -21,6 +21,7 @@ import static org.elasticsearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction.TASKS_ORIGIN; import static org.elasticsearch.action.bulk.TransportBulkAction.LAZY_ROLLOVER_ORIGIN; +import static org.elasticsearch.action.datastreams.ReindexDataStreamAction.REINDEX_DATA_STREAM_ORIGIN; import static org.elasticsearch.action.support.replication.PostWriteRefresh.POST_WRITE_REFRESH_ORIGIN; import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.DATA_STREAM_LIFECYCLE_ORIGIN; import static org.elasticsearch.ingest.IngestService.INGEST_ORIGIN; @@ -136,6 +137,9 @@ public static void switchUserBasedOnActionOriginAndExecute( case DATA_STREAM_LIFECYCLE_ORIGIN: securityContext.executeAsInternalUser(InternalUsers.DATA_STREAM_LIFECYCLE_USER, version, consumer); break; + case REINDEX_DATA_STREAM_ORIGIN: + securityContext.executeAsInternalUser(InternalUsers.REINDEX_DATA_STREAM_USER, version, consumer); + break; case LAZY_ROLLOVER_ORIGIN: securityContext.executeAsInternalUser(InternalUsers.LAZY_ROLLOVER_USER, version, consumer); break; diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java index fa6187798da25..adcfaf0a0799d 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java @@ -372,7 +372,10 @@ public void authorizeIndexAction( } else { assert false : "only scroll and async-search related requests are known indices api that don't " - + "support retrieving the indices they relate to"; + + "support retrieving the indices they relate to: " + + action + + ", for request " + + request.getClass(); listener.onFailure( new IllegalStateException( "only scroll and async-search related requests are known indices " diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java index 40ad5bba29baa..f8256ec7a7198 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java @@ -11,18 +11,106 @@ import org.elasticsearch.client.Response; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamTestHelper; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.FormatNames; import org.elasticsearch.core.Booleans; import org.elasticsearch.core.Strings; +import org.elasticsearch.test.rest.ObjectPath; import org.hamcrest.Matchers; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.List; +import java.util.Map; +import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo; import static org.elasticsearch.upgrades.IndexingIT.assertCount; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase { + static final String TEMPLATE = """ + { + "settings":{ + "index": { + "mode": "time_series" + } + }, + "mappings":{ + "dynamic_templates": [ + { + "labels": { + "path_match": "pod.labels.*", + "mapping": { + "type": "keyword", + "time_series_dimension": true + } + } + } + ], + "properties": { + "@timestamp" : { + "type": "date" + }, + "metricset": { + "type": "keyword", + "time_series_dimension": true + }, + "k8s": { + "properties": { + "pod": { + "properties": { + "uid": { + "type": "keyword", + "time_series_dimension": true + }, + "name": { + "type": "keyword" + }, + "ip": { + "type": "ip" + }, + "network": { + "properties": { + "tx": { + "type": "long" + }, + "rx": { + "type": "long" + } + } + } + } + } + } + } + } + } + } + """; + + private static final String BULK = + """ + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "hamster", "uid":"947e4ced-1786-4e53-9e0c-5c447e959508", "ip": "10.10.55.1", "network": {"tx": 2005177954, "rx": 801479970}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cow", "uid":"947e4ced-1786-4e53-9e0c-5c447e959509", "ip": "10.10.55.1", "network": {"tx": 2006223737, "rx": 802337279}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "rat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959510", "ip": "10.10.55.2", "network": {"tx": 2012916202, "rx": 803685721}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434521831, "rx": 530575198}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "tiger", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea10", "ip": "10.10.55.3", "network": {"tx": 1434577921, "rx": 530600088}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "lion", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876e11", "ip": "10.10.55.3", "network": {"tx": 1434587694, "rx": 530604797}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "elephant", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876eb4", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}} + """; + public void testDataStreams() throws IOException { if (CLUSTER_TYPE == ClusterType.OLD) { String requestBody = """ @@ -164,4 +252,76 @@ public void testDataStreamValidationDoesNotBreakUpgrade() throws Exception { } } + public void testUpgradeDataStream() throws Exception { + String dataStreamName = "k8s"; + if (CLUSTER_TYPE == ClusterType.OLD) { + final String INDEX_TEMPLATE = """ + { + "index_patterns": ["$PATTERN"], + "template": $TEMPLATE, + "data_stream": { + } + }"""; + // Add composable index template + String templateName = "1"; + var putIndexTemplateRequest = new Request("POST", "/_index_template/" + templateName); + putIndexTemplateRequest.setJsonEntity(INDEX_TEMPLATE.replace("$TEMPLATE", TEMPLATE).replace("$PATTERN", dataStreamName)); + assertOK(client().performRequest(putIndexTemplateRequest)); + + performOldClustertOperations(templateName, dataStreamName); + } else if (CLUSTER_TYPE == ClusterType.MIXED) { + // nothing + } else if (CLUSTER_TYPE == ClusterType.UPGRADED) { + performUpgradedClusterOperations(dataStreamName); + } + } + + private void performUpgradedClusterOperations(String dataStreamName) throws IOException { + Request reindexRequest = new Request("POST", "/_reindex_data_stream?source=" + dataStreamName); + Response reindexResponse = client().performRequest(reindexRequest); + assertOK(reindexResponse); + Request statusRequest = new Request("GET", "/_reindex_data_stream_status/reindex-data-stream-" + dataStreamName + "?pretty"); + Response statusResponse = client().performRequest(statusRequest); + assertOK(statusResponse); + } + + private static void performOldClustertOperations(String templateName, String dataStreamName) throws IOException { + var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk"); + bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now()))); + bulkRequest.addParameter("refresh", "true"); + var response = client().performRequest(bulkRequest); + assertOK(response); + var responseBody = entityAsMap(response); + assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false)); + + var dataStreams = getDataStream(dataStreamName); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1)); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName)); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1)); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo(templateName)); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1)); + String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name"); + assertThat(firstBackingIndex, backingIndexEqualTo(dataStreamName, 1)); + assertSearch(dataStreamName, 8); + } + + private static Map getDataStream(String dataStreamName) throws IOException { + var getDataStreamsRequest = new Request("GET", "/_data_stream/" + dataStreamName); + var response = client().performRequest(getDataStreamsRequest); + assertOK(response); + return entityAsMap(response); + } + + static String formatInstant(Instant instant) { + return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant); + } + + private static void assertSearch(String dataStreamName, int expectedHitCount) throws IOException { + var searchRequest = new Request("GET", dataStreamName + "/_search"); + var response = client().performRequest(searchRequest); + assertOK(response); + var responseBody = entityAsMap(response); + assertThat(ObjectPath.evaluate(responseBody, "hits.total.value"), equalTo(expectedHitCount)); + } + }