diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java index 2f3b63d27ca35..cb7445705537a 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java @@ -19,23 +19,19 @@ import org.elasticsearch.action.datastreams.MigrateToDataStreamAction; import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; import org.elasticsearch.action.datastreams.PromoteDataStreamAction; -import org.elasticsearch.action.datastreams.ReindexDataStreamAction; import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction; import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction; import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction; -import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; -import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.TimeValue; import org.elasticsearch.datastreams.action.CreateDataStreamTransportAction; @@ -44,7 +40,6 @@ import org.elasticsearch.datastreams.action.MigrateToDataStreamTransportAction; import org.elasticsearch.datastreams.action.ModifyDataStreamsTransportAction; import org.elasticsearch.datastreams.action.PromoteDataStreamTransportAction; -import org.elasticsearch.datastreams.action.ReindexDataStreamTransportAction; import org.elasticsearch.datastreams.action.TransportGetDataStreamsAction; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; @@ -78,27 +73,14 @@ import org.elasticsearch.datastreams.rest.RestMigrateToDataStreamAction; import org.elasticsearch.datastreams.rest.RestModifyDataStreamsAction; import org.elasticsearch.datastreams.rest.RestPromoteDataStreamAction; -import org.elasticsearch.datastreams.task.ReindexDataStreamPersistentTaskExecutor; -import org.elasticsearch.datastreams.task.ReindexDataStreamPersistentTaskState; -import org.elasticsearch.datastreams.task.ReindexDataStreamStatus; -import org.elasticsearch.datastreams.task.ReindexDataStreamTask; -import org.elasticsearch.datastreams.task.ReindexDataStreamTaskParams; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.health.HealthIndicatorService; import org.elasticsearch.index.IndexSettingProvider; -import org.elasticsearch.persistent.PersistentTaskParams; -import org.elasticsearch.persistent.PersistentTaskState; -import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.HealthPlugin; -import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xcontent.NamedXContentRegistry; -import org.elasticsearch.xcontent.ParseField; import java.io.IOException; import java.time.Clock; @@ -111,7 +93,7 @@ import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.DATA_STREAM_LIFECYCLE_ORIGIN; -public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlugin, PersistentTaskPlugin { +public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlugin { public static final Setting TIME_SERIES_POLL_INTERVAL = Setting.timeSetting( "time_series.poll_interval", @@ -262,7 +244,6 @@ public Collection createComponents(PluginServices services) { actions.add(new ActionHandler<>(PutDataStreamOptionsAction.INSTANCE, TransportPutDataStreamOptionsAction.class)); actions.add(new ActionHandler<>(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class)); } - actions.add(new ActionHandler<>(ReindexDataStreamAction.INSTANCE, ReindexDataStreamTransportAction.class)); return actions; } @@ -321,48 +302,4 @@ public void close() throws IOException { public Collection getHealthIndicatorServices() { return List.of(dataStreamLifecycleHealthIndicatorService.get()); } - - @Override - public List getNamedXContent() { - return List.of( - new NamedXContentRegistry.Entry( - PersistentTaskState.class, - new ParseField(ReindexDataStreamPersistentTaskState.NAME), - ReindexDataStreamPersistentTaskState::fromXContent - ), - new NamedXContentRegistry.Entry( - PersistentTaskParams.class, - new ParseField(ReindexDataStreamTaskParams.NAME), - ReindexDataStreamTaskParams::fromXContent - ) - ); - } - - @Override - public List getNamedWriteables() { - return List.of( - new NamedWriteableRegistry.Entry( - PersistentTaskState.class, - ReindexDataStreamPersistentTaskState.NAME, - ReindexDataStreamPersistentTaskState::new - ), - new NamedWriteableRegistry.Entry( - PersistentTaskParams.class, - ReindexDataStreamTaskParams.NAME, - ReindexDataStreamTaskParams::new - ), - new NamedWriteableRegistry.Entry(Task.Status.class, ReindexDataStreamStatus.NAME, ReindexDataStreamStatus::new) - ); - } - - @Override - public List> getPersistentTasksExecutor( - ClusterService clusterService, - ThreadPool threadPool, - Client client, - SettingsModule settingsModule, - IndexNameExpressionResolver expressionResolver - ) { - return List.of(new ReindexDataStreamPersistentTaskExecutor(client, clusterService, ReindexDataStreamTask.TASK_NAME, threadPool)); - } } diff --git a/x-pack/plugin/migrate/build.gradle b/x-pack/plugin/migrate/build.gradle new file mode 100644 index 0000000000000..56305ff93a1c3 --- /dev/null +++ b/x-pack/plugin/migrate/build.gradle @@ -0,0 +1,25 @@ +apply plugin: 'elasticsearch.internal-es-plugin' +apply plugin: 'elasticsearch.internal-cluster-test' + +esplugin { + name 'x-pack-migrate' + description 'Elasticsearch Expanded Pack Plugin - Index and Data Stream Migration' + classname 'org.elasticsearch.xpack.migrate.MigratePlugin' + extendedPlugins = ['x-pack-core'] + hasNativeController false + requiresKeystore true +} +base { + archivesName = 'x-pack-migrate' +} + +dependencies { + compileOnly project(path: xpackModule('core')) + testImplementation(testArtifact(project(xpackModule('core')))) + testImplementation project(xpackModule('ccr')) + testImplementation project(':modules:data-streams') + testImplementation project(path: ':modules:reindex') +} + +addQaCheckDependencies(project) + diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportActionIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportActionIT.java similarity index 89% rename from modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportActionIT.java rename to x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportActionIT.java index fdc96892d4b27..3b68fc9995b57 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportActionIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportActionIT.java @@ -1,13 +1,11 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. */ -package org.elasticsearch.datastreams.action; +package org.elasticsearch.xpack.migrate.action; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; @@ -17,21 +15,21 @@ import org.elasticsearch.action.admin.indices.rollover.RolloverRequestBuilder; import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; import org.elasticsearch.action.datastreams.CreateDataStreamAction; -import org.elasticsearch.action.datastreams.ReindexDataStreamAction; -import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamRequest; -import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.datastreams.DataStreamsPlugin; -import org.elasticsearch.datastreams.task.ReindexDataStreamTask; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.migrate.MigratePlugin; +import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamRequest; +import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamResponse; +import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTask; import java.util.Collection; import java.util.List; @@ -48,7 +46,7 @@ public class ReindexDataStreamTransportActionIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return List.of(DataStreamsPlugin.class); + return List.of(DataStreamsPlugin.class, MigratePlugin.class); } public void testNonExistentDataStream() { diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java new file mode 100644 index 0000000000000..4ba686a661a8e --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java @@ -0,0 +1,134 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.settings.SettingsModule; +import org.elasticsearch.features.NodeFeature; +import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.persistent.PersistentTaskState; +import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.PersistentTaskPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction; +import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamTransportAction; +import org.elasticsearch.xpack.migrate.action.GetReindexDataStreamStatusAction; +import org.elasticsearch.xpack.migrate.action.GetReindexDataStreamStatusTransportAction; +import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction; +import org.elasticsearch.xpack.migrate.action.ReindexDataStreamIndexAction; +import org.elasticsearch.xpack.migrate.action.ReindexDataStreamTransportAction; +import org.elasticsearch.xpack.migrate.action.SwapDataStreamIndexAction; +import org.elasticsearch.xpack.migrate.action.SwapDataStreamIndexTransportAction; +import org.elasticsearch.xpack.migrate.action.TransportReindexDataStreamIndexAction; +import org.elasticsearch.xpack.migrate.rest.RestCancelReindexDataStreamAction; +import org.elasticsearch.xpack.migrate.rest.RestGetReindexDataStreamStatusAction; +import org.elasticsearch.xpack.migrate.rest.RestReindexDataStreamAction; +import org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor; +import org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskState; +import org.elasticsearch.xpack.migrate.task.ReindexDataStreamStatus; +import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTask; +import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTaskParams; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Predicate; +import java.util.function.Supplier; + +public class MigratePlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin { + + @Override + public List getRestHandlers( + Settings unused, + NamedWriteableRegistry namedWriteableRegistry, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster, + Predicate clusterSupportsFeature + ) { + List handlers = new ArrayList<>(); + handlers.add(new RestReindexDataStreamAction()); + handlers.add(new RestGetReindexDataStreamStatusAction()); + handlers.add(new RestCancelReindexDataStreamAction()); + return handlers; + } + + @Override + public List> getActions() { + List> actions = new ArrayList<>(); + actions.add(new ActionHandler<>(ReindexDataStreamAction.INSTANCE, ReindexDataStreamTransportAction.class)); + actions.add(new ActionHandler<>(GetReindexDataStreamStatusAction.INSTANCE, GetReindexDataStreamStatusTransportAction.class)); + actions.add(new ActionHandler<>(CancelReindexDataStreamAction.INSTANCE, CancelReindexDataStreamTransportAction.class)); + actions.add(new ActionHandler<>(ReindexDataStreamIndexAction.INSTANCE, TransportReindexDataStreamIndexAction.class)); + actions.add(new ActionHandler<>(SwapDataStreamIndexAction.INSTANCE, SwapDataStreamIndexTransportAction.class)); + return actions; + } + + @Override + public List getNamedXContent() { + return List.of( + new NamedXContentRegistry.Entry( + PersistentTaskState.class, + new ParseField(ReindexDataStreamPersistentTaskState.NAME), + ReindexDataStreamPersistentTaskState::fromXContent + ), + new NamedXContentRegistry.Entry( + PersistentTaskParams.class, + new ParseField(ReindexDataStreamTaskParams.NAME), + ReindexDataStreamTaskParams::fromXContent + ) + ); + } + + @Override + public List getNamedWriteables() { + return List.of( + new NamedWriteableRegistry.Entry( + PersistentTaskState.class, + ReindexDataStreamPersistentTaskState.NAME, + ReindexDataStreamPersistentTaskState::new + ), + new NamedWriteableRegistry.Entry( + PersistentTaskParams.class, + ReindexDataStreamTaskParams.NAME, + ReindexDataStreamTaskParams::new + ), + new NamedWriteableRegistry.Entry(Task.Status.class, ReindexDataStreamStatus.NAME, ReindexDataStreamStatus::new) + ); + } + + @Override + public List> getPersistentTasksExecutor( + ClusterService clusterService, + ThreadPool threadPool, + Client client, + SettingsModule settingsModule, + IndexNameExpressionResolver expressionResolver + ) { + return List.of(new ReindexDataStreamPersistentTaskExecutor(client, clusterService, ReindexDataStreamTask.TASK_NAME, threadPool)); + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamAction.java new file mode 100644 index 0000000000000..6ade5388303f8 --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamAction.java @@ -0,0 +1,131 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.TaskResult; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +public class CancelReindexDataStreamAction extends ActionType { + + public static final CancelReindexDataStreamAction INSTANCE = new CancelReindexDataStreamAction(); + public static final String NAME = "indices:admin/data_stream/reindex_cancel"; + + public CancelReindexDataStreamAction() { + super(NAME); + } + + public static class Response extends ActionResponse implements ToXContentObject { + private final TaskResult task; + + public Response(TaskResult task) { + this.task = requireNonNull(task, "task is required"); + } + + public Response(StreamInput in) throws IOException { + super(in); + task = in.readOptionalWriteable(TaskResult::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(task); + } + + /** + * Get the actual result of the fetch. + */ + public TaskResult getTask() { + return task; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + task.getTask().status().toXContent(builder, params); + return builder; + } + + @Override + public int hashCode() { + return Objects.hashCode(task); + } + + @Override + public boolean equals(Object other) { + return other instanceof Response && task.equals(((Response) other).task); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + } + + public static class Request extends ActionRequest { + private final String persistentTaskId; + + public Request(String persistentTaskId) { + super(); + this.persistentTaskId = persistentTaskId; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.persistentTaskId = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(persistentTaskId); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public boolean getShouldStoreResult() { + return true; + } + + public String getPersistentTaskId() { + return persistentTaskId; + } + + @Override + public int hashCode() { + return Objects.hashCode(persistentTaskId); + } + + @Override + public boolean equals(Object other) { + return other instanceof Request && persistentTaskId.equals(((Request) other).persistentTaskId); + } + + public Request nodeRequest(String thisNodeId, long thisTaskId) { + Request copy = new Request(persistentTaskId); + copy.setParentTask(thisNodeId, thisTaskId); + return copy; + } + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamTransportAction.java new file mode 100644 index 0000000000000..99eb34c775bfc --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamTransportAction.java @@ -0,0 +1,127 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.Strings; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.tasks.TaskResult; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction.Request; +import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction.Response; +import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTask; + +import java.util.Map; +import java.util.Optional; + +public class CancelReindexDataStreamTransportAction extends HandledTransportAction { + private final ClusterService clusterService; + private final TransportService transportService; + + @Inject + public CancelReindexDataStreamTransportAction( + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters + ) { + super(CancelReindexDataStreamAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); + this.clusterService = clusterService; + this.transportService = transportService; + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + String persistentTaskId = request.getPersistentTaskId(); + PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state() + .getMetadata() + .custom(PersistentTasksCustomMetadata.TYPE); + PersistentTasksCustomMetadata.PersistentTask persistentTask = persistentTasksCustomMetadata.getTask(persistentTaskId); + if (persistentTask == null) { + listener.onFailure(new ElasticsearchException("No persistent task found with id [{}]", persistentTaskId)); + } else if (persistentTask.isAssigned()) { + String nodeId = persistentTask.getExecutorNode(); + if (clusterService.localNode().getId().equals(nodeId)) { + getRunningTaskFromNode(persistentTaskId, listener); + } else { + runOnNodeWithTaskIfPossible(task, request, nodeId, listener); + } + } else { + listener.onFailure(new ElasticsearchException("Persistent task with id [{}] is not assigned to a node", persistentTaskId)); + } + } + + private ReindexDataStreamTask getRunningPersistentTaskFromTaskManager(String persistentTaskId) { + Optional> optionalTask = taskManager.getCancellableTasks() + .entrySet() + .stream() + .filter(entry -> entry.getValue().getType().equals("persistent")) + .filter( + entry -> entry.getValue() instanceof ReindexDataStreamTask + && persistentTaskId.equals((((AllocatedPersistentTask) entry.getValue()).getPersistentTaskId())) + ) + .findAny(); + return optionalTask.map(entry -> (ReindexDataStreamTask) entry.getValue()).orElse(null); + } + + void getRunningTaskFromNode(String persistentTaskId, ActionListener listener) { + ReindexDataStreamTask runningTask = getRunningPersistentTaskFromTaskManager(persistentTaskId); + if (runningTask == null) { + listener.onFailure( + new ResourceNotFoundException( + Strings.format( + "Persistent task [{}] is supposed to be running on node [{}], " + "but the task is not found on that node", + persistentTaskId, + clusterService.localNode().getId() + ) + ) + ); + } else { + TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true); + runningTask.markAsCompleted(); + listener.onResponse(new Response(new TaskResult(true, info))); + } + } + + private void runOnNodeWithTaskIfPossible(Task thisTask, Request request, String nodeId, ActionListener listener) { + DiscoveryNode node = clusterService.state().nodes().get(nodeId); + if (node == null) { + listener.onFailure( + new ResourceNotFoundException( + Strings.format( + "Persistent task [{}] is supposed to be running on node [{}], but that node is not part of the cluster", + request.getPersistentTaskId(), + nodeId + ) + ) + ); + } else { + Request nodeRequest = request.nodeRequest(clusterService.localNode().getId(), thisTask.getId()); + transportService.sendRequest( + node, + CancelReindexDataStreamAction.NAME, + nodeRequest, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(listener, Response::new, EsExecutors.DIRECT_EXECUTOR_SERVICE) + ); + } + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetReindexDataStreamStatusAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetReindexDataStreamStatusAction.java new file mode 100644 index 0000000000000..e9056a8aa2d39 --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetReindexDataStreamStatusAction.java @@ -0,0 +1,143 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.TaskResult; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +public class GetReindexDataStreamStatusAction extends ActionType { + + public static final GetReindexDataStreamStatusAction INSTANCE = new GetReindexDataStreamStatusAction(); + public static final String NAME = "indices:admin/data_stream/reindex_status"; + + public GetReindexDataStreamStatusAction() { + super(NAME); + } + + public static class Response extends ActionResponse implements ToXContentObject { + private final TaskResult task; + + public Response(TaskResult task) { + this.task = requireNonNull(task, "task is required"); + } + + public Response(StreamInput in) throws IOException { + super(in); + task = in.readOptionalWriteable(TaskResult::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(task); + } + + /** + * Get the actual result of the fetch. + */ + public TaskResult getTask() { + return task; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + task.getTask().status().toXContent(builder, params); + return builder; + } + + @Override + public int hashCode() { + return Objects.hashCode(task); + } + + @Override + public boolean equals(Object other) { + return other instanceof Response && task.equals(((Response) other).task); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + } + + public static class Request extends ActionRequest implements IndicesRequest { + private final String persistentTaskId; + + public Request(String persistentTaskId) { + super(); + this.persistentTaskId = persistentTaskId; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.persistentTaskId = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(persistentTaskId); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public boolean getShouldStoreResult() { + return true; // do not wait_for_completion + } + + public String getPersistentTaskId() { + return persistentTaskId; + } + + @Override + public int hashCode() { + return Objects.hashCode(persistentTaskId); + } + + @Override + public boolean equals(Object other) { + return other instanceof Request && persistentTaskId.equals(((Request) other).persistentTaskId); + } + + public Request nodeRequest(String thisNodeId, long thisTaskId) { + Request copy = new Request(persistentTaskId); + copy.setParentTask(thisNodeId, thisTaskId); + return copy; + } + + @Override + public String[] indices() { + return new String[] { persistentTaskId.substring("reindex-data-stream-".length()) }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + } + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetReindexDataStreamStatusTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetReindexDataStreamStatusTransportAction.java new file mode 100644 index 0000000000000..02cac9f593805 --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetReindexDataStreamStatusTransportAction.java @@ -0,0 +1,125 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.Strings; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.tasks.TaskResult; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.migrate.action.GetReindexDataStreamStatusAction.Request; +import org.elasticsearch.xpack.migrate.action.GetReindexDataStreamStatusAction.Response; + +import java.util.Map; +import java.util.Optional; + +public class GetReindexDataStreamStatusTransportAction extends HandledTransportAction { + private final ClusterService clusterService; + private final TransportService transportService; + + @Inject + public GetReindexDataStreamStatusTransportAction( + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters + ) { + super(GetReindexDataStreamStatusAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); + this.clusterService = clusterService; + this.transportService = transportService; + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + String persistentTaskId = request.getPersistentTaskId(); + PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state() + .getMetadata() + .custom(PersistentTasksCustomMetadata.TYPE); + PersistentTasksCustomMetadata.PersistentTask persistentTask = persistentTasksCustomMetadata.getTask(persistentTaskId); + if (persistentTask == null) { + listener.onFailure(new ElasticsearchException("No persistent task found with id [{}]", persistentTaskId)); + } else if (persistentTask.isAssigned()) { + String nodeId = persistentTask.getExecutorNode(); + if (clusterService.localNode().getId().equals(nodeId)) { + getRunningTaskFromNode(persistentTaskId, listener); + } else { + runOnNodeWithTaskIfPossible(task, request, nodeId, listener); + } + } else { + listener.onFailure(new ElasticsearchException("Persistent task with id [{}] is not assigned to a node", persistentTaskId)); + } + } + + private Task getRunningPersistentTaskFromTaskManager(String persistentTaskId) { + Optional> optionalTask = taskManager.getCancellableTasks() + .entrySet() + .stream() + .filter(entry -> entry.getValue().getType().equals("persistent")) + .filter( + entry -> entry.getValue() instanceof AllocatedPersistentTask + && persistentTaskId.equals((((AllocatedPersistentTask) entry.getValue()).getPersistentTaskId())) + ) + .findAny(); + return optionalTask.map(Map.Entry::getValue).orElse(null); + } + + void getRunningTaskFromNode(String persistentTaskId, ActionListener listener) { + Task runningTask = getRunningPersistentTaskFromTaskManager(persistentTaskId); + if (runningTask == null) { + listener.onFailure( + new ResourceNotFoundException( + Strings.format( + "Persistent task [{}] is supposed to be running on node [{}], " + "but the task is not found on that node", + persistentTaskId, + clusterService.localNode().getId() + ) + ) + ); + } else { + TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true); + listener.onResponse(new Response(new TaskResult(false, info))); + } + } + + private void runOnNodeWithTaskIfPossible(Task thisTask, Request request, String nodeId, ActionListener listener) { + DiscoveryNode node = clusterService.state().nodes().get(nodeId); + if (node == null) { + listener.onFailure( + new ResourceNotFoundException( + Strings.format( + "Persistent task [{}] is supposed to be running on node [{}], but that node is not part of the cluster", + request.getPersistentTaskId(), + nodeId + ) + ) + ); + } else { + Request nodeRequest = request.nodeRequest(clusterService.localNode().getId(), thisTask.getId()); + transportService.sendRequest( + node, + GetReindexDataStreamStatusAction.NAME, + nodeRequest, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(listener, Response::new, EsExecutors.DIRECT_EXECUTOR_SERVICE) + ); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/ReindexDataStreamAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java similarity index 82% rename from server/src/main/java/org/elasticsearch/action/datastreams/ReindexDataStreamAction.java rename to x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java index 814c512c43bec..07c9d09c4e076 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/ReindexDataStreamAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java @@ -1,18 +1,18 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. */ -package org.elasticsearch.action.datastreams; +package org.elasticsearch.xpack.migrate.action; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xcontent.ToXContentObject; @@ -25,6 +25,7 @@ public class ReindexDataStreamAction extends ActionType { + + public static final String NAME = "indices:admin/data_stream/index/reindex"; + + public static final ActionType INSTANCE = new ReindexDataStreamIndexAction(); + + private ReindexDataStreamIndexAction() { + super(NAME); + } + + public static class Request extends ActionRequest implements IndicesRequest { + + private final String sourceIndex; + + public Request(String sourceIndex) { + this.sourceIndex = sourceIndex; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.sourceIndex = in.readString(); + } + + public static Request readFrom(StreamInput in) throws IOException { + return new Request(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(sourceIndex); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public String getSourceIndex() { + return sourceIndex; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(sourceIndex, request.sourceIndex); + } + + @Override + public int hashCode() { + return Objects.hash(sourceIndex); + } + + @Override + public String[] indices() { + return new String[] { sourceIndex }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + } + } + + public static class Response extends ActionResponse { + private final String destIndex; + + public Response(String destIndex) { + this.destIndex = destIndex; + } + + public Response(StreamInput in) throws IOException { + super(in); + this.destIndex = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(destIndex); + } + + public String getDestIndex() { + return destIndex; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return Objects.equals(destIndex, response.destIndex); + } + + @Override + public int hashCode() { + return Objects.hash(destIndex); + } + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java similarity index 80% rename from modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportAction.java rename to x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java index 0a86985c6c7b2..7f68007f821ba 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java @@ -1,32 +1,30 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. */ -package org.elasticsearch.datastreams.action; +package org.elasticsearch.xpack.migrate.action; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.datastreams.ReindexDataStreamAction; -import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamRequest; -import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.datastreams.task.ReindexDataStreamTask; -import org.elasticsearch.datastreams.task.ReindexDataStreamTaskParams; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamRequest; +import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamResponse; +import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTask; +import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTaskParams; /* * This transport action creates a new persistent task for reindexing the source data stream given in the request. On successful creation @@ -75,7 +73,8 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList sourceDataStreamName, transportService.getThreadPool().absoluteTimeInMillis(), totalIndices, - totalIndicesToBeUpgraded + totalIndicesToBeUpgraded, + ClientHelper.getPersistableSafeSecurityHeaders(transportService.getThreadPool().getThreadContext(), clusterService.state()) ); String persistentTaskId = getPersistentTaskId(sourceDataStreamName); persistentTasksService.sendStartRequest( diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/SwapDataStreamIndexAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/SwapDataStreamIndexAction.java new file mode 100644 index 0000000000000..530412b8be54e --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/SwapDataStreamIndexAction.java @@ -0,0 +1,140 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.TimeValue; + +import java.io.IOException; +import java.util.Objects; + +public class SwapDataStreamIndexAction extends ActionType { + + public static final String NAME = "indices:admin/data_stream/index/swap"; + + public static final ActionType INSTANCE = new SwapDataStreamIndexAction(); + + private SwapDataStreamIndexAction() { + super(NAME); + } + + public static class Request extends AcknowledgedRequest implements IndicesRequest { + + private final String dataStream; + private final String oldIndex; + private final String newIndex; + + public Request(String dataStream, String oldIndex, String newIndex) { + super(TimeValue.MAX_VALUE, TimeValue.MAX_VALUE); + this.dataStream = dataStream; + this.oldIndex = oldIndex; + this.newIndex = newIndex; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.dataStream = in.readString(); + this.oldIndex = in.readString(); + this.newIndex = in.readString(); + } + + public static Request readFrom(StreamInput in) throws IOException { + return new Request(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(dataStream); + out.writeString(oldIndex); + out.writeString(newIndex); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public String getDataStream() { + return dataStream; + } + + public String getOldIndex() { + return oldIndex; + } + + public String getNewIndex() { + return newIndex; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(dataStream, request.dataStream); + } + + @Override + public int hashCode() { + return Objects.hash(dataStream); + } + + @Override + public String[] indices() { + return new String[] { dataStream }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + } + } + + public static class Response extends ActionResponse { + private final String destIndex; + + public Response(String destIndex) { + this.destIndex = destIndex; + } + + public Response(StreamInput in) throws IOException { + super(in); + this.destIndex = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(destIndex); + } + + public String getDestIndex() { + return destIndex; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return Objects.equals(destIndex, response.destIndex); + } + + @Override + public int hashCode() { + return Objects.hash(destIndex); + } + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/SwapDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/SwapDataStreamIndexTransportAction.java new file mode 100644 index 0000000000000..6b90f7789364e --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/SwapDataStreamIndexTransportAction.java @@ -0,0 +1,142 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.SimpleBatchedExecutor; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterServiceTaskQueue; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.Index; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.List; + +public class SwapDataStreamIndexTransportAction extends TransportMasterNodeAction< + SwapDataStreamIndexAction.Request, + SwapDataStreamIndexAction.Response> { + private final MasterServiceTaskQueue reindexDataStreamClusterStateUpdateTaskQueue; + + private static final SimpleBatchedExecutor REINDEX_DATA_STREAM_STATE_UPDATE_TASK_EXECUTOR = + new SimpleBatchedExecutor<>() { + @Override + public Tuple executeTask(UpdateDataStreamTask task, ClusterState clusterState) throws Exception { + return Tuple.tuple(task.execute(clusterState), null); + } + + @Override + public void taskSucceeded(UpdateDataStreamTask task, Void unused) { + task.listener.onResponse(null); + } + }; + + @Inject + public SwapDataStreamIndexTransportAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + SwapDataStreamIndexAction.NAME, + true, + transportService, + clusterService, + threadPool, + actionFilters, + SwapDataStreamIndexAction.Request::new, + indexNameExpressionResolver, + SwapDataStreamIndexAction.Response::new, + transportService.getThreadPool().executor(ThreadPool.Names.GENERIC) + ); + this.reindexDataStreamClusterStateUpdateTaskQueue = clusterService.createTaskQueue( + "reindex-data-stream-state-update", + Priority.LOW, + REINDEX_DATA_STREAM_STATE_UPDATE_TASK_EXECUTOR + ); + } + + @Override + protected void masterOperation( + Task task, + SwapDataStreamIndexAction.Request request, + ClusterState state, + ActionListener listener + ) throws Exception { + reindexDataStreamClusterStateUpdateTaskQueue.submitTask( + Strings.format( + "Swapping indices in [%s]. Adding [%s] and removing[%s]", + request.getDataStream(), + request.getNewIndex(), + request.getOldIndex() + ), + new UpdateDataStreamTask( + ActionListener.wrap(unused -> listener.onResponse(new SwapDataStreamIndexAction.Response("")), listener::onFailure), + request.getDataStream(), + request.getOldIndex(), + request.getNewIndex() + ), + null + ); + } + + @Override + protected ClusterBlockException checkBlock(SwapDataStreamIndexAction.Request request, ClusterState state) { + return null; + } + + static class UpdateDataStreamTask implements ClusterStateTaskListener { + private final ActionListener listener; + private final String dataStream; + private final String oldIndex; + private final String newIndex; + + UpdateDataStreamTask(ActionListener listener, String dataStream, String oldIndex, String newIndex) { + this.listener = listener; + this.dataStream = dataStream; + this.oldIndex = oldIndex; + this.newIndex = newIndex; + } + + ClusterState execute(ClusterState currentState) throws Exception { + Metadata currentMetadata = currentState.metadata(); + DataStream oldDataStream = currentMetadata.dataStreams().get(dataStream); + List indicesWithoutOldIndex = oldDataStream.getIndices() + .stream() + .filter(index -> index.getName().equals(oldIndex) == false) + .toList(); + List newIndices = new ArrayList<>(indicesWithoutOldIndex); + newIndices.add(currentMetadata.index(newIndex).getIndex()); + DataStream newDataStream = oldDataStream.copy() + .setBackingIndices(DataStream.DataStreamIndices.backingIndicesBuilder(newIndices).build()) + .build(); + Metadata metadata = Metadata.builder(currentMetadata).put(newDataStream).build(); + return ClusterState.builder(currentState).metadata(metadata).build(); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/TransportReindexDataStreamIndexAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/TransportReindexDataStreamIndexAction.java new file mode 100644 index 0000000000000..497b9f7d0d22b --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/TransportReindexDataStreamIndexAction.java @@ -0,0 +1,211 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.ReindexAction; +import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +public class TransportReindexDataStreamIndexAction extends HandledTransportAction< + ReindexDataStreamIndexAction.Request, + ReindexDataStreamIndexAction.Response> { + + private static final Logger logger = LogManager.getLogger(TransportReindexDataStreamIndexAction.class); + + // copied from + // https://github.com/elastic/kibana/blob/8a8363f02cc990732eb9cbb60cd388643a336bed/x-pack + // /plugins/upgrade_assistant/server/lib/reindexing/index_settings.ts#L155 + private static final Set DISALLOWED_SETTINGS = Set.of( + // Private ES settings + "index.allocation.existing_shards_allocator", + "index.history.uuid", + "index.merge.enabled", + "index.provided_name", + "index.resize.source.name", + "index.resize.source.uuid", + "index.routing.allocation.initial_recovery._id", + "index.search.throttled", + "index.source_only", + "index.shrink.source.name", + "index.shrink.source.uuid", + "index.verified_before_close", + "index.store.snapshot.repository_name", + "index.store.snapshot.snapshot_name", + "index.store.snapshot.snapshot_uuid", + "index.store.snapshot.index_name", + "index.store.snapshot.index_uuid", + + // TODO verify set to correct value during index creation` + "index.creation_date", + "index.uuid", + "index.version.created", + "index.blocks.write", + "index.frozen", + + // Deprecated in 9.0 + "index.version.upgraded" + ); + + private static final IndicesOptions IGNORE_MISSING_OPTIONS = IndicesOptions.fromOptions(true, true, false, false); + private final ClusterService clusterService; + private final Client client; + + @Inject + public TransportReindexDataStreamIndexAction( + TransportService transportService, + ClusterService clusterService, + ActionFilters actionFilters, + Client client + ) { + super( + ReindexDataStreamIndexAction.NAME, + false, + transportService, + actionFilters, + ReindexDataStreamIndexAction.Request::new, + transportService.getThreadPool().executor(ThreadPool.Names.GENERIC) + ); + this.clusterService = clusterService; + this.client = client; + } + + @Override + protected void doExecute( + Task task, + ReindexDataStreamIndexAction.Request request, + ActionListener listener + ) { + var sourceIndexName = request.getSourceIndex(); + var destIndexName = generateDestIndexName(sourceIndexName); + IndexMetadata sourceIndex = clusterService.state().getMetadata().index(sourceIndexName); + + if (sourceIndex.getCreationVersion().isLegacyIndexVersion() == false) { + logger.warn( + "Reindexing datastream index with non-legacy version, index [{}], version [{}]", + sourceIndexName, + sourceIndex.getCreationVersion() + ); + } + + SubscribableListener.newForked(l -> setReadOnly(sourceIndexName, l)) + .andThen(l -> deleteDestIfExists(destIndexName, l)) + .andThen(l -> createIndex(sourceIndex, destIndexName, l)) + .andThen(l -> reindex(sourceIndexName, destIndexName, l)) + .andThen(l -> updateSettings(sourceIndex, destIndexName, l)) + // TODO handle searchable snapshots + .andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName)) + .addListener(listener); + } + + private void setReadOnly(String sourceIndexName, ActionListener listener) { + logger.info("Setting read only on source index [{}]", sourceIndexName); + final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build(); + var updateSettingsRequest = new UpdateSettingsRequest(readOnlySettings, sourceIndexName); + var errorMessage = String.format(Locale.ROOT, "Could not set read-only on source index [%s]", sourceIndexName); + client.admin().indices().updateSettings(updateSettingsRequest, failIfNotAcknowledged(listener, errorMessage)); + } + + private void deleteDestIfExists(String destIndexName, ActionListener listener) { + logger.info("Attempting to delete index [{}]", destIndexName); + var deleteIndexRequest = new DeleteIndexRequest(destIndexName).indicesOptions(IGNORE_MISSING_OPTIONS) + .masterNodeTimeout(TimeValue.MAX_VALUE); + var errorMessage = String.format(Locale.ROOT, "Failed to acknowledge delete of index [%s]", destIndexName); + client.admin().indices().delete(deleteIndexRequest, failIfNotAcknowledged(listener, errorMessage)); + } + + private void createIndex(IndexMetadata sourceIndex, String destIndexName, ActionListener listener) { + logger.info("Creating destination index [{}] for source index [{}]", destIndexName, sourceIndex.getIndex().getName()); + + // Create destination with subset of source index settings that can be added before reindex + var settings = getPreSettings(sourceIndex); + + var sourceMapping = sourceIndex.mapping(); + Map mapping = sourceMapping != null ? sourceMapping.rawSourceAsMap() : Map.of(); + var createIndexRequest = new CreateIndexRequest(destIndexName).settings(settings).mapping(mapping); + + var errorMessage = String.format(Locale.ROOT, "Could not create index [%s]", destIndexName); + client.admin().indices().create(createIndexRequest, failIfNotAcknowledged(listener, errorMessage)); + } + + private void reindex(String sourceIndexName, String destIndexName, ActionListener listener) { + logger.info("Reindex to destination index [{}] from source index [{}]", destIndexName, sourceIndexName); + var reindexRequest = new ReindexRequest(); + reindexRequest.setSourceIndices(sourceIndexName); + reindexRequest.getSearchRequest().allowPartialSearchResults(false); + reindexRequest.getSearchRequest().source().fetchSource(true); + reindexRequest.setDestIndex(destIndexName); + client.execute(ReindexAction.INSTANCE, reindexRequest, listener); + } + + private void updateSettings(IndexMetadata sourceIndex, String destIndexName, ActionListener listener) { + logger.info("Adding settings from source index that could not be added before reindex"); + + Settings postSettings = getPostSettings(sourceIndex); + if (postSettings.isEmpty()) { + listener.onResponse(null); + return; + } + + var updateSettingsRequest = new UpdateSettingsRequest(postSettings, destIndexName); + var errorMessage = String.format(Locale.ROOT, "Could not update settings on index [%s]", destIndexName); + client.admin().indices().updateSettings(updateSettingsRequest, failIfNotAcknowledged(listener, errorMessage)); + } + + // Filter source index settings to subset of settings that can be included during reindex + // TODO does tsdb start/end time get set in create request if copied into settings from source index + private Settings getPreSettings(IndexMetadata sourceIndex) { + return sourceIndex.getSettings().filter(settingName -> DISALLOWED_SETTINGS.contains(settingName) == false); + } + + private Settings getPostSettings(IndexMetadata sourceIndex) { + // TODO populate with real values + return Settings.EMPTY; + } + + public static String generateDestIndexName(String sourceIndex) { + return "upgrade-" + sourceIndex; + } + + private static ActionListener failIfNotAcknowledged( + ActionListener listener, + String errorMessage + ) { + return listener.delegateFailureAndWrap((delegate, response) -> { + if (response.isAcknowledged()) { + delegate.onResponse(null); + } + throw new ElasticsearchException(errorMessage); + }); + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestCancelReindexDataStreamAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestCancelReindexDataStreamAction.java new file mode 100644 index 0000000000000..b23769d5dfd16 --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestCancelReindexDataStreamAction.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.rest; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.POST; + +public class RestCancelReindexDataStreamAction extends BaseRestHandler { + + @Override + public String getName() { + return "cancel_reindex_data_stream_action"; + } + + @Override + public List routes() { + return List.of(new Route(POST, "/_reindex_data_stream/{persistent_task_id}/_cancel")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String persistentTaskId = request.param("persistent_task_id"); + CancelReindexDataStreamAction.Request cancelTaskRequest = new CancelReindexDataStreamAction.Request(persistentTaskId); + return channel -> client.execute(CancelReindexDataStreamAction.INSTANCE, cancelTaskRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestGetReindexDataStreamStatusAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestGetReindexDataStreamStatusAction.java new file mode 100644 index 0000000000000..9e36632920ffd --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestGetReindexDataStreamStatusAction.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.rest; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.migrate.action.GetReindexDataStreamStatusAction; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + +public class RestGetReindexDataStreamStatusAction extends BaseRestHandler { + + @Override + public String getName() { + return "get_reindex_data_stream_status_action"; + } + + @Override + public List routes() { + return List.of(new Route(GET, "/_reindex_data_stream_status/{persistent_task_id}")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String persistentTaskId = request.param("persistent_task_id"); + GetReindexDataStreamStatusAction.Request getTaskRequest = new GetReindexDataStreamStatusAction.Request(persistentTaskId); + return channel -> client.execute(GetReindexDataStreamStatusAction.INSTANCE, getTaskRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestReindexDataStreamAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestReindexDataStreamAction.java new file mode 100644 index 0000000000000..7f5dc89307a1a --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestReindexDataStreamAction.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.rest; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestBuilderListener; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction; +import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamResponse; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.POST; + +public class RestReindexDataStreamAction extends BaseRestHandler { + @Override + public String getName() { + return "reindex_data_stream_action"; + } + + @Override + public List routes() { + return List.of(new Route(POST, "/_reindex_data_stream")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + ReindexDataStreamAction.ReindexDataStreamRequest reindexRequest = new ReindexDataStreamAction.ReindexDataStreamRequest( + request.param("source") + ); + return channel -> client.execute( + ReindexDataStreamAction.INSTANCE, + reindexRequest, + new ReindexDataStreamRestToXContentListener(channel) + ); + } + + static class ReindexDataStreamRestToXContentListener extends RestBuilderListener { + + ReindexDataStreamRestToXContentListener(RestChannel channel) { + super(channel); + } + + @Override + public RestResponse buildResponse(ReindexDataStreamResponse response, XContentBuilder builder) throws Exception { + assert response.isFragment() == false; + response.toXContent(builder, channel.request()); + return new RestResponse(RestStatus.OK, builder); + } + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamClient.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamClient.java new file mode 100644 index 0000000000000..bb8fe74df6076 --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamClient.java @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.task; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.support.AbstractClient; +import org.elasticsearch.xpack.core.ClientHelper; + +import java.util.Map; + +public class ReindexDataStreamClient extends AbstractClient { + + private final Client client; + private final Map headers; + + public ReindexDataStreamClient(Client client, Map headers) { + super(client.settings(), client.threadPool()); + this.client = client; + this.headers = headers; + } + + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + ClientHelper.executeWithHeadersAsync(headers, "", client, action, request, listener); + } + +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java similarity index 59% rename from modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java rename to x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java index f10d2e7b356fb..91b12d7e083cb 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java @@ -1,17 +1,16 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. */ -package org.elasticsearch.datastreams.task; +package org.elasticsearch.xpack.migrate.task; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.datastreams.GetDataStreamAction; +import org.elasticsearch.action.support.CountDownActionListener; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.core.TimeValue; @@ -22,6 +21,8 @@ import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.migrate.action.ReindexDataStreamIndexAction; +import org.elasticsearch.xpack.migrate.action.SwapDataStreamIndexAction; import java.util.List; import java.util.Map; @@ -69,32 +70,80 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask GetDataStreamAction.Request request = new GetDataStreamAction.Request(TimeValue.MAX_VALUE, new String[] { sourceDataStream }); assert task instanceof ReindexDataStreamTask; final ReindexDataStreamTask reindexDataStreamTask = (ReindexDataStreamTask) task; - client.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> { + ReindexDataStreamClient reindexClient = new ReindexDataStreamClient(client, params.headers()); + reindexClient.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> { List dataStreamInfos = response.getDataStreams(); if (dataStreamInfos.size() == 1) { List indices = dataStreamInfos.getFirst().getDataStream().getIndices(); List indicesToBeReindexed = indices.stream() .filter(index -> clusterService.state().getMetadata().index(index).getCreationVersion().isLegacyIndexVersion()) .toList(); - reindexDataStreamTask.setPendingIndices(indicesToBeReindexed.stream().map(Index::getName).toList()); + reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size()); + CountDownActionListener listener = new CountDownActionListener( + indicesToBeReindexed.size() + 1, + ActionListener.wrap(response1 -> { + completeSuccessfulPersistentTask(reindexDataStreamTask); + }, exception -> { completeFailedPersistentTask(reindexDataStreamTask, exception); }) + ); + // TODO: put all these on a queue, only process N from queue at a time for (Index index : indicesToBeReindexed) { - // TODO This is just a placeholder. This is where the real data stream reindex logic will go - } + reindexDataStreamTask.incrementInProgressIndicesCount(); + reindexClient.execute( + ReindexDataStreamIndexAction.INSTANCE, + new ReindexDataStreamIndexAction.Request(index.getName()), + ActionListener.wrap(response1 -> { + updateDataStream(sourceDataStream, index.getName(), response1.getDestIndex(), ActionListener.wrap(unused -> { + reindexDataStreamTask.reindexSucceeded(); + listener.onResponse(null); + }, exception -> { + reindexDataStreamTask.reindexFailed(index.getName(), exception); + listener.onResponse(null); + }), reindexClient); - completeSuccessfulPersistentTask(reindexDataStreamTask); + }, exception -> { + reindexDataStreamTask.reindexFailed(index.getName(), exception); + listener.onResponse(null); + }) + ); + } + listener.onResponse(null); } else { completeFailedPersistentTask(reindexDataStreamTask, new ElasticsearchException("data stream does not exist")); } - }, reindexDataStreamTask::markAsFailed)); + }, exception -> completeFailedPersistentTask(reindexDataStreamTask, exception))); + } + + private void updateDataStream( + String dataStream, + String oldIndex, + String newIndex, + ActionListener listener, + ReindexDataStreamClient reindexClient + ) { + reindexClient.execute( + SwapDataStreamIndexAction.INSTANCE, + new SwapDataStreamIndexAction.Request(dataStream, oldIndex, newIndex), + new ActionListener<>() { + @Override + public void onResponse(SwapDataStreamIndexAction.Response response) { + listener.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + } + ); } private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) { - persistentTask.reindexSucceeded(); + persistentTask.allReindexesCompleted(); threadPool.schedule(persistentTask::markAsCompleted, getTimeToLive(persistentTask), threadPool.generic()); } private void completeFailedPersistentTask(ReindexDataStreamTask persistentTask, Exception e) { - persistentTask.reindexFailed(e); + persistentTask.taskFailed(e); threadPool.schedule(() -> persistentTask.markAsFailed(e), getTimeToLive(persistentTask), threadPool.generic()); } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskState.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskState.java similarity index 82% rename from modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskState.java rename to x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskState.java index d6f32a3d34a7a..130a8f7ce372b 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskState.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskState.java @@ -1,13 +1,11 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. */ -package org.elasticsearch.datastreams.task; +package org.elasticsearch.xpack.migrate.task; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatus.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamStatus.java similarity index 87% rename from modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatus.java rename to x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamStatus.java index 10dfded853a13..358062550b50a 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatus.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamStatus.java @@ -1,13 +1,11 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. */ -package org.elasticsearch.datastreams.task; +package org.elasticsearch.xpack.migrate.task; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java similarity index 67% rename from modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java rename to x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java index 2ae244679659f..0940fcddd2718 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java @@ -1,13 +1,11 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. */ -package org.elasticsearch.datastreams.task; +package org.elasticsearch.xpack.migrate.task; import org.elasticsearch.core.Tuple; import org.elasticsearch.persistent.AllocatedPersistentTask; @@ -17,6 +15,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; public class ReindexDataStreamTask extends AllocatedPersistentTask { public static final String TASK_NAME = "reindex-data-stream"; @@ -26,8 +25,8 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask { private final ThreadPool threadPool; private boolean complete = false; private Exception exception; - private List inProgress = new ArrayList<>(); - private List pending = List.of(); + private AtomicInteger inProgress = new AtomicInteger(0); + private AtomicInteger pending = new AtomicInteger(); private List> errors = new ArrayList<>(); public ReindexDataStreamTask( @@ -57,30 +56,36 @@ public ReindexDataStreamStatus getStatus() { totalIndicesToBeUpgraded, complete, exception, - inProgress.size(), - pending.size(), + inProgress.get(), + pending.get(), errors ); } - public void reindexSucceeded() { + public void allReindexesCompleted() { this.complete = true; } - public void reindexFailed(Exception e) { + public void taskFailed(Exception e) { this.complete = true; this.exception = e; } - public void setInProgressIndices(List inProgressIndices) { - this.inProgress = inProgressIndices; + public void reindexSucceeded() { + inProgress.decrementAndGet(); + } + + public void reindexFailed(String index, Exception error) { + this.errors.add(Tuple.tuple(index, error)); + inProgress.decrementAndGet(); } - public void setPendingIndices(List pendingIndices) { - this.pending = pendingIndices; + public void incrementInProgressIndicesCount() { + inProgress.incrementAndGet(); + pending.decrementAndGet(); } - public void addErrorIndex(String index, Exception error) { - this.errors.add(Tuple.tuple(index, error)); + public void setPendingIndicesCount(int size) { + pending.set(size); } } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParams.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTaskParams.java similarity index 68% rename from modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParams.java rename to x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTaskParams.java index 5efbc6b672216..7ab64c616038e 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParams.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTaskParams.java @@ -1,13 +1,11 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. */ -package org.elasticsearch.datastreams.task; +package org.elasticsearch.xpack.migrate.task; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; @@ -15,37 +13,58 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; +import java.util.Map; import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; -public record ReindexDataStreamTaskParams(String sourceDataStream, long startTime, int totalIndices, int totalIndicesToBeUpgraded) - implements - PersistentTaskParams { +public record ReindexDataStreamTaskParams( + String sourceDataStream, + long startTime, + int totalIndices, + int totalIndicesToBeUpgraded, + Map headers +) implements PersistentTaskParams { public static final String NAME = ReindexDataStreamTask.TASK_NAME; private static final String SOURCE_DATA_STREAM_FIELD = "source_data_stream"; private static final String START_TIME_FIELD = "start_time"; private static final String TOTAL_INDICES_FIELD = "total_indices"; private static final String TOTAL_INDICES_TO_BE_UPGRADED_FIELD = "total_indices_to_be_upgraded"; + private static final String HEADERS_FIELD = "headers"; + @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( NAME, true, - args -> new ReindexDataStreamTaskParams((String) args[0], (long) args[1], (int) args[2], (int) args[3]) + args -> new ReindexDataStreamTaskParams( + (String) args[0], + (long) args[1], + (int) args[2], + (int) args[3], + (Map) args[4] + ) ); static { PARSER.declareString(constructorArg(), new ParseField(SOURCE_DATA_STREAM_FIELD)); PARSER.declareLong(constructorArg(), new ParseField(START_TIME_FIELD)); PARSER.declareInt(constructorArg(), new ParseField(TOTAL_INDICES_FIELD)); PARSER.declareInt(constructorArg(), new ParseField(TOTAL_INDICES_TO_BE_UPGRADED_FIELD)); + PARSER.declareField( + ConstructingObjectParser.constructorArg(), + XContentParser::mapStrings, + new ParseField(HEADERS_FIELD), + ObjectParser.ValueType.OBJECT + ); } + @SuppressWarnings("unchecked") public ReindexDataStreamTaskParams(StreamInput in) throws IOException { - this(in.readString(), in.readLong(), in.readInt(), in.readInt()); + this(in.readString(), in.readLong(), in.readInt(), in.readInt(), (Map) in.readGenericValue()); } @Override @@ -64,6 +83,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(startTime); out.writeInt(totalIndices); out.writeInt(totalIndicesToBeUpgraded); + out.writeGenericValue(headers); } @Override @@ -73,6 +93,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws .field(START_TIME_FIELD, startTime) .field(TOTAL_INDICES_FIELD, totalIndices) .field(TOTAL_INDICES_TO_BE_UPGRADED_FIELD, totalIndicesToBeUpgraded) + .stringStringMap(HEADERS_FIELD, headers) .endObject(); } @@ -83,4 +104,8 @@ public String getSourceDataStream() { public static ReindexDataStreamTaskParams fromXContent(XContentParser parser) { return PARSER.apply(parser, null); } + + public Map getHeaders() { + return headers; + } } diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/ReindexDataStreamResponseTests.java b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamResponseTests.java similarity index 76% rename from server/src/test/java/org/elasticsearch/action/datastreams/ReindexDataStreamResponseTests.java rename to x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamResponseTests.java index fe839c28aab88..06844577c4e36 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/ReindexDataStreamResponseTests.java +++ b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamResponseTests.java @@ -1,21 +1,19 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. */ -package org.elasticsearch.action.datastreams; +package org.elasticsearch.xpack.migrate.action; -import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamResponse; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamResponse; import java.io.IOException; import java.util.Map; diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskStateTests.java b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskStateTests.java similarity index 74% rename from modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskStateTests.java rename to x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskStateTests.java index be11bff131909..a35cd6e5fa474 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskStateTests.java +++ b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskStateTests.java @@ -1,13 +1,11 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. */ -package org.elasticsearch.datastreams.task; +package org.elasticsearch.xpack.migrate.task; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractXContentSerializingTestCase; diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatusTests.java b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamStatusTests.java similarity index 92% rename from modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatusTests.java rename to x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamStatusTests.java index 8f0fabc2ce7ee..d81e9d35cd490 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatusTests.java +++ b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamStatusTests.java @@ -1,13 +1,11 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. */ -package org.elasticsearch.datastreams.task; +package org.elasticsearch.xpack.migrate.task; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.bytes.BytesReference; diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParamsTests.java b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTaskParamsTests.java similarity index 80% rename from modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParamsTests.java rename to x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTaskParamsTests.java index 55098bf4a68d5..c2b0d550658e0 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParamsTests.java +++ b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTaskParamsTests.java @@ -1,13 +1,11 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. */ -package org.elasticsearch.datastreams.task; +package org.elasticsearch.xpack.migrate.task; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.Writeable; @@ -31,7 +29,13 @@ protected Writeable.Reader instanceReader() { @Override protected ReindexDataStreamTaskParams createTestInstance() { - return new ReindexDataStreamTaskParams(randomAlphaOfLength(50), randomLong(), randomNonNegativeInt(), randomNonNegativeInt()); + return new ReindexDataStreamTaskParams( + randomAlphaOfLength(50), + randomLong(), + randomNonNegativeInt(), + randomNonNegativeInt(), + Map.of() + ); } @Override @@ -47,7 +51,7 @@ protected ReindexDataStreamTaskParams mutateInstance(ReindexDataStreamTaskParams case 3 -> totalIndices = totalIndicesToBeUpgraded + 1; default -> throw new UnsupportedOperationException(); } - return new ReindexDataStreamTaskParams(sourceDataStream, startTime, totalIndices, totalIndicesToBeUpgraded); + return new ReindexDataStreamTaskParams(sourceDataStream, startTime, totalIndices, totalIndicesToBeUpgraded, Map.of()); } @Override diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java index 40ad5bba29baa..e18af3e97db1e 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java @@ -11,18 +11,106 @@ import org.elasticsearch.client.Response; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamTestHelper; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.FormatNames; import org.elasticsearch.core.Booleans; import org.elasticsearch.core.Strings; +import org.elasticsearch.test.rest.ObjectPath; import org.hamcrest.Matchers; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.List; +import java.util.Map; +import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo; import static org.elasticsearch.upgrades.IndexingIT.assertCount; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase { + static final String TEMPLATE = """ + { + "settings":{ + "index": { + "mode": "time_series" + } + }, + "mappings":{ + "dynamic_templates": [ + { + "labels": { + "path_match": "pod.labels.*", + "mapping": { + "type": "keyword", + "time_series_dimension": true + } + } + } + ], + "properties": { + "@timestamp" : { + "type": "date" + }, + "metricset": { + "type": "keyword", + "time_series_dimension": true + }, + "k8s": { + "properties": { + "pod": { + "properties": { + "uid": { + "type": "keyword", + "time_series_dimension": true + }, + "name": { + "type": "keyword" + }, + "ip": { + "type": "ip" + }, + "network": { + "properties": { + "tx": { + "type": "long" + }, + "rx": { + "type": "long" + } + } + } + } + } + } + } + } + } + } + """; + + private static final String BULK = + """ + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "hamster", "uid":"947e4ced-1786-4e53-9e0c-5c447e959508", "ip": "10.10.55.1", "network": {"tx": 2005177954, "rx": 801479970}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cow", "uid":"947e4ced-1786-4e53-9e0c-5c447e959509", "ip": "10.10.55.1", "network": {"tx": 2006223737, "rx": 802337279}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "rat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959510", "ip": "10.10.55.2", "network": {"tx": 2012916202, "rx": 803685721}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434521831, "rx": 530575198}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "tiger", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea10", "ip": "10.10.55.3", "network": {"tx": 1434577921, "rx": 530600088}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "lion", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876e11", "ip": "10.10.55.3", "network": {"tx": 1434587694, "rx": 530604797}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "elephant", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876eb4", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}} + """; + public void testDataStreams() throws IOException { if (CLUSTER_TYPE == ClusterType.OLD) { String requestBody = """ @@ -164,4 +252,92 @@ public void testDataStreamValidationDoesNotBreakUpgrade() throws Exception { } } + public void testUpgradeDataStream() throws Exception { + String dataStreamName = "k8s"; + if (CLUSTER_TYPE == ClusterType.OLD) { + final String INDEX_TEMPLATE = """ + { + "index_patterns": ["$PATTERN"], + "template": $TEMPLATE, + "data_stream": { + } + }"""; + // Add composable index template + String templateName = "1"; + var putIndexTemplateRequest = new Request("POST", "/_index_template/" + templateName); + putIndexTemplateRequest.setJsonEntity(INDEX_TEMPLATE.replace("$TEMPLATE", TEMPLATE).replace("$PATTERN", dataStreamName)); + assertOK(client().performRequest(putIndexTemplateRequest)); + + performOldClustertOperations(templateName, dataStreamName); + } else if (CLUSTER_TYPE == ClusterType.MIXED) { + // nothing + } else if (CLUSTER_TYPE == ClusterType.UPGRADED) { + performUpgradedClusterOperations(dataStreamName); + } + } + + private void performUpgradedClusterOperations(String dataStreamName) throws IOException { + Request reindexRequest = new Request("POST", "/_reindex_data_stream?source=" + dataStreamName); + Response reindexResponse = client().performRequest(reindexRequest); + assertOK(reindexResponse); + Request statusRequest = new Request("GET", "/_reindex_data_stream_status/reindex-data-stream-" + dataStreamName + "?pretty"); + Response statusResponse = client().performRequest(statusRequest); + assertOK(statusResponse); + } + + private static void performOldClustertOperations(String templateName, String dataStreamName) throws IOException { + bulkLoadData(dataStreamName); + + var dataStreams = getDataStream(dataStreamName); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1)); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName)); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1)); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo(templateName)); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1)); + String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name"); + assertThat(firstBackingIndex, backingIndexEqualTo(dataStreamName, 1)); + assertSearch(dataStreamName, 8); + + for (int i = 0; i < 10; i++) { + rollover(dataStreamName); + bulkLoadData(dataStreamName); + } + + } + + private static void bulkLoadData(String dataStreamName) throws IOException { + var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk"); + bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now()))); + bulkRequest.addParameter("refresh", "true"); + var response = client().performRequest(bulkRequest); + assertOK(response); + var responseBody = entityAsMap(response); + assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false)); + } + + private static void rollover(String dataStreamName) throws IOException { + Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); + Response rolloverResponse = client().performRequest(rolloverRequest); + assertOK(rolloverResponse); + } + + private static Map getDataStream(String dataStreamName) throws IOException { + var getDataStreamsRequest = new Request("GET", "/_data_stream/" + dataStreamName); + var response = client().performRequest(getDataStreamsRequest); + assertOK(response); + return entityAsMap(response); + } + + static String formatInstant(Instant instant) { + return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant); + } + + private static void assertSearch(String dataStreamName, int expectedHitCount) throws IOException { + var searchRequest = new Request("GET", dataStreamName + "/_search"); + var response = client().performRequest(searchRequest); + assertOK(response); + var responseBody = entityAsMap(response); + assertThat(ObjectPath.evaluate(responseBody, "hits.total.value"), equalTo(expectedHitCount)); + } + }