diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java index d532b001f5aaa..7f68007f821ba 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java @@ -20,6 +20,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamRequest; import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamResponse; import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTask; @@ -72,7 +73,8 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList sourceDataStreamName, transportService.getThreadPool().absoluteTimeInMillis(), totalIndices, - totalIndicesToBeUpgraded + totalIndicesToBeUpgraded, + ClientHelper.getPersistableSafeSecurityHeaders(transportService.getThreadPool().getThreadContext(), clusterService.state()) ); String persistentTaskId = getPersistentTaskId(sourceDataStreamName); persistentTasksService.sendStartRequest( diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamClient.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamClient.java new file mode 100644 index 0000000000000..b915729887aa2 --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamClient.java @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.task; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.support.AbstractClient; +import org.elasticsearch.xpack.core.ClientHelper; + +import java.util.Map; + +public class ReindexDataStreamClient extends AbstractClient { + + private final Client client; + private final Map headers; + + public ReindexDataStreamClient(Client client, Map headers) { + super(client.settings(), client.threadPool()); + this.client = client; + this.headers = headers; + } + + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + ClientHelper.executeWithHeadersAsync(headers, null, client, action, request, listener); + } + +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java index e2a41ea186643..ce21ff0220276 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java @@ -67,7 +67,8 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask GetDataStreamAction.Request request = new GetDataStreamAction.Request(TimeValue.MAX_VALUE, new String[] { sourceDataStream }); assert task instanceof ReindexDataStreamTask; final ReindexDataStreamTask reindexDataStreamTask = (ReindexDataStreamTask) task; - client.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> { + ReindexDataStreamClient reindexClient = new ReindexDataStreamClient(client, params.headers()); + reindexClient.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> { List dataStreamInfos = response.getDataStreams(); if (dataStreamInfos.size() == 1) { List indices = dataStreamInfos.getFirst().getDataStream().getIndices(); diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTaskParams.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTaskParams.java index 0f26713a75184..7ab64c616038e 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTaskParams.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTaskParams.java @@ -13,37 +13,58 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; +import java.util.Map; import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; -public record ReindexDataStreamTaskParams(String sourceDataStream, long startTime, int totalIndices, int totalIndicesToBeUpgraded) - implements - PersistentTaskParams { +public record ReindexDataStreamTaskParams( + String sourceDataStream, + long startTime, + int totalIndices, + int totalIndicesToBeUpgraded, + Map headers +) implements PersistentTaskParams { public static final String NAME = ReindexDataStreamTask.TASK_NAME; private static final String SOURCE_DATA_STREAM_FIELD = "source_data_stream"; private static final String START_TIME_FIELD = "start_time"; private static final String TOTAL_INDICES_FIELD = "total_indices"; private static final String TOTAL_INDICES_TO_BE_UPGRADED_FIELD = "total_indices_to_be_upgraded"; + private static final String HEADERS_FIELD = "headers"; + @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( NAME, true, - args -> new ReindexDataStreamTaskParams((String) args[0], (long) args[1], (int) args[2], (int) args[3]) + args -> new ReindexDataStreamTaskParams( + (String) args[0], + (long) args[1], + (int) args[2], + (int) args[3], + (Map) args[4] + ) ); static { PARSER.declareString(constructorArg(), new ParseField(SOURCE_DATA_STREAM_FIELD)); PARSER.declareLong(constructorArg(), new ParseField(START_TIME_FIELD)); PARSER.declareInt(constructorArg(), new ParseField(TOTAL_INDICES_FIELD)); PARSER.declareInt(constructorArg(), new ParseField(TOTAL_INDICES_TO_BE_UPGRADED_FIELD)); + PARSER.declareField( + ConstructingObjectParser.constructorArg(), + XContentParser::mapStrings, + new ParseField(HEADERS_FIELD), + ObjectParser.ValueType.OBJECT + ); } + @SuppressWarnings("unchecked") public ReindexDataStreamTaskParams(StreamInput in) throws IOException { - this(in.readString(), in.readLong(), in.readInt(), in.readInt()); + this(in.readString(), in.readLong(), in.readInt(), in.readInt(), (Map) in.readGenericValue()); } @Override @@ -62,6 +83,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(startTime); out.writeInt(totalIndices); out.writeInt(totalIndicesToBeUpgraded); + out.writeGenericValue(headers); } @Override @@ -71,6 +93,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws .field(START_TIME_FIELD, startTime) .field(TOTAL_INDICES_FIELD, totalIndices) .field(TOTAL_INDICES_TO_BE_UPGRADED_FIELD, totalIndicesToBeUpgraded) + .stringStringMap(HEADERS_FIELD, headers) .endObject(); } @@ -81,4 +104,8 @@ public String getSourceDataStream() { public static ReindexDataStreamTaskParams fromXContent(XContentParser parser) { return PARSER.apply(parser, null); } + + public Map getHeaders() { + return headers; + } } diff --git a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTaskParamsTests.java b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTaskParamsTests.java index fc39b5d8cb703..8fb80018d64b9 100644 --- a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTaskParamsTests.java +++ b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTaskParamsTests.java @@ -29,7 +29,13 @@ protected Writeable.Reader instanceReader() { @Override protected ReindexDataStreamTaskParams createTestInstance() { - return new ReindexDataStreamTaskParams(randomAlphaOfLength(50), randomLong(), randomNonNegativeInt(), randomNonNegativeInt()); + return new ReindexDataStreamTaskParams( + randomAlphaOfLength(50), + randomLong(), + randomNonNegativeInt(), + randomNonNegativeInt(), + getTestHeaders() + ); } @Override @@ -38,14 +44,16 @@ protected ReindexDataStreamTaskParams mutateInstance(ReindexDataStreamTaskParams long startTime = instance.startTime(); int totalIndices = instance.totalIndices(); int totalIndicesToBeUpgraded = instance.totalIndicesToBeUpgraded(); + Map headers = instance.headers(); switch (randomIntBetween(0, 3)) { case 0 -> sourceDataStream = randomAlphaOfLength(50); case 1 -> startTime = randomLong(); case 2 -> totalIndices = totalIndices + 1; case 3 -> totalIndices = totalIndicesToBeUpgraded + 1; + case 4 -> headers = headers.isEmpty() ? getTestHeaders(false) : getTestHeaders(); default -> throw new UnsupportedOperationException(); } - return new ReindexDataStreamTaskParams(sourceDataStream, startTime, totalIndices, totalIndicesToBeUpgraded); + return new ReindexDataStreamTaskParams(sourceDataStream, startTime, totalIndices, totalIndicesToBeUpgraded, headers); } @Override @@ -53,6 +61,18 @@ protected ReindexDataStreamTaskParams doParseInstance(XContentParser parser) { return ReindexDataStreamTaskParams.fromXContent(parser); } + private Map getTestHeaders() { + return getTestHeaders(randomBoolean()); + } + + private Map getTestHeaders(boolean empty) { + if (empty) { + return Map.of(); + } else { + return Map.of(randomAlphaOfLength(20), randomAlphaOfLength(30)); + } + } + public void testToXContent() throws IOException { ReindexDataStreamTaskParams params = createTestInstance(); try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent)) {