diff --git a/modules/data-streams/src/main/java/module-info.java b/modules/data-streams/src/main/java/module-info.java index 2d49029c1023c..6798d7b1fe5b4 100644 --- a/modules/data-streams/src/main/java/module-info.java +++ b/modules/data-streams/src/main/java/module-info.java @@ -15,6 +15,7 @@ requires org.apache.lucene.core; exports org.elasticsearch.datastreams.action to org.elasticsearch.server; + exports org.elasticsearch.datastreams.task to org.elasticsearch.server; exports org.elasticsearch.datastreams.lifecycle.action to org.elasticsearch.server; exports org.elasticsearch.datastreams.lifecycle; exports org.elasticsearch.datastreams.options.action to org.elasticsearch.server; diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java index faafdb27e8eac..cd0db1be3d7f0 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java @@ -97,6 +97,7 @@ import org.elasticsearch.features.NodeFeature; import org.elasticsearch.health.HealthIndicatorService; import org.elasticsearch.index.IndexSettingProvider; +import org.elasticsearch.node.PluginComponentBinding; import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksExecutor; @@ -173,6 +174,7 @@ public static TimeValue getLookAheadTime(Settings settings) { private final SetOnce dataStreamLifecycleErrorsPublisher = new SetOnce<>(); private final SetOnce dataStreamLifecycleHealthIndicatorService = new SetOnce<>(); private final Settings settings; + private PersistentTasksExecutor persistentTaskExecutor; public DataStreamsPlugin(Settings settings) { this.settings = settings; @@ -250,6 +252,13 @@ public Collection createComponents(PluginServices services) { components.add(errorStoreInitialisationService.get()); components.add(dataLifecycleInitialisationService.get()); components.add(dataStreamLifecycleErrorsPublisher.get()); + persistentTaskExecutor = new ReindexDataStreamPersistentTaskExecutor( + services.client(), + services.clusterService(), + ReindexDataStreamTask.TASK_NAME, + services.threadPool() + ); + components.add(new PluginComponentBinding<>(ReindexDataStreamPersistentTaskExecutor.class, persistentTaskExecutor)); return components; } @@ -381,6 +390,6 @@ public List> getPersistentTasksExecutor( SettingsModule settingsModule, IndexNameExpressionResolver expressionResolver ) { - return List.of(new ReindexDataStreamPersistentTaskExecutor(client, clusterService, ReindexDataStreamTask.TASK_NAME, threadPool)); + return List.of(persistentTaskExecutor); } } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamClient.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamClient.java index bf6f1d42a0de5..6216575eac671 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamClient.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamClient.java @@ -13,32 +13,21 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.ClientHelperService; import org.elasticsearch.client.internal.support.AbstractClient; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.core.Assertions; import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.function.BiConsumer; -import java.util.function.Supplier; -import java.util.regex.Pattern; -import java.util.stream.Collectors; public class ReindexDataStreamClient extends AbstractClient { - private static final String RUN_AS_USER_HEADER = "es-security-runas-user"; - private static final String AUTHENTICATION_KEY = "_xpack_security_authentication"; - private static final String THREAD_CTX_KEY = "_xpack_security_secondary_authc"; - - private static final Set SECURITY_HEADER_FILTERS = Set.of(RUN_AS_USER_HEADER, AUTHENTICATION_KEY, THREAD_CTX_KEY); + private final ClientHelperService clientHelperService; private final Client client; private final Map headers; - public ReindexDataStreamClient(Client client, Map headers) { + public ReindexDataStreamClient(ClientHelperService clientHelperService, Client client, Map headers) { super(client.settings(), client.threadPool()); + this.clientHelperService = clientHelperService; this.client = client; this.headers = headers; } @@ -49,73 +38,7 @@ protected void Request request, ActionListener listener ) { - executeWithHeadersAsync(headers, client, action, request, listener); + clientHelperService.executeWithHeadersAsync(headers, "", client, action, request, listener); } - private static void executeWithHeadersAsync( - Map headers, - Client client, - ActionType action, - Request request, - ActionListener listener - ) { - executeWithHeadersAsync(client.threadPool().getThreadContext(), headers, request, listener, (r, l) -> client.execute(action, r, l)); - } - - private static void executeWithHeadersAsync( - ThreadContext threadContext, - Map headers, - Request request, - ActionListener listener, - BiConsumer> consumer - ) { - // No need to rewrite authentication header because it will be handled by Security Interceptor - final Map filteredHeaders = filterSecurityHeaders(headers); - filteredHeaders.forEach((k, v) -> System.out.printf("%-15s : %s%n", k, v)); - // No headers (e.g. security not installed/in use) so execute as origin - if (filteredHeaders.isEmpty()) { - consumer.accept(request, listener); - } else { - // Otherwise stash the context and copy in the saved headers before executing - final Supplier supplier = threadContext.newRestorableContext(false); - try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, filteredHeaders)) { - consumer.accept(request, new ContextPreservingActionListener<>(supplier, listener)); - } - } - } - - private static Map filterSecurityHeaders(Map headers) { - if (SECURITY_HEADER_FILTERS.containsAll(headers.keySet())) { - // fast-track to skip the artifice below - return headers; - } else { - return Objects.requireNonNull(headers) - .entrySet() - .stream() - .filter(e -> SECURITY_HEADER_FILTERS.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - } - - private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map headers) { - final ThreadContext.StoredContext storedContext = threadContext.stashContext(); - assertNoAuthorizationHeader(headers); - threadContext.copyHeaders(headers.entrySet()); - return storedContext; - } - - private static final Pattern authorizationHeaderPattern = Pattern.compile( - "\\s*" + Pattern.quote("Authorization") + "\\s*", - Pattern.CASE_INSENSITIVE - ); - - private static void assertNoAuthorizationHeader(Map headers) { - if (Assertions.ENABLED) { - for (String header : headers.keySet()) { - if (authorizationHeaderPattern.matcher(header).find()) { - assert false : "headers contain \"Authorization\""; - } - } - } - } } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java index 76f890002185e..0141ff92f6305 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java @@ -16,9 +16,11 @@ import org.elasticsearch.action.datastreams.SwapDataStreamIndexAction; import org.elasticsearch.action.support.CountDownActionListener; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.ClientHelperService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; +import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; @@ -34,6 +36,7 @@ public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExec private final Client client; private final ClusterService clusterService; private final ThreadPool threadPool; + private ClientHelperService clientHelperService; public ReindexDataStreamPersistentTaskExecutor(Client client, ClusterService clusterService, String taskName, ThreadPool threadPool) { super(taskName, threadPool.generic()); @@ -42,6 +45,11 @@ public ReindexDataStreamPersistentTaskExecutor(Client client, ClusterService clu this.threadPool = threadPool; } + @Inject + public void initialize(ClientHelperService clientHelperService) { + this.clientHelperService = clientHelperService; + } + @Override protected ReindexDataStreamTask createTask( long id, @@ -72,7 +80,7 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask GetDataStreamAction.Request request = new GetDataStreamAction.Request(TimeValue.MAX_VALUE, new String[] { sourceDataStream }); assert task instanceof ReindexDataStreamTask; final ReindexDataStreamTask reindexDataStreamTask = (ReindexDataStreamTask) task; - ReindexDataStreamClient reindexClient = new ReindexDataStreamClient(client, params.headers()); + ReindexDataStreamClient reindexClient = new ReindexDataStreamClient(clientHelperService, client, params.headers()); reindexClient.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> { List dataStreamInfos = response.getDataStreams(); if (dataStreamInfos.size() == 1) {