Skip to content

Commit

Permalink
Using the credentials of the user who calls reindex data stream
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Dec 3, 2024
1 parent c30ef96 commit e697ada
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamRequest;
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamResponse;
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTask;
Expand Down Expand Up @@ -72,7 +73,8 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList
sourceDataStreamName,
transportService.getThreadPool().absoluteTimeInMillis(),
totalIndices,
totalIndicesToBeUpgraded
totalIndicesToBeUpgraded,
ClientHelper.getPersistableSafeSecurityHeaders(transportService.getThreadPool().getThreadContext(), clusterService.state())
);
String persistentTaskId = getPersistentTaskId(sourceDataStreamName);
persistentTasksService.sendStartRequest(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.migrate.task;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.support.AbstractClient;
import org.elasticsearch.xpack.core.ClientHelper;

import java.util.Map;

public class ReindexDataStreamClient extends AbstractClient {

private final Client client;
private final Map<String, String> headers;

public ReindexDataStreamClient(Client client, Map<String, String> headers) {
super(client.settings(), client.threadPool());
this.client = client;
this.headers = headers;
}

@Override
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
ClientHelper.executeWithHeadersAsync(headers, null, client, action, request, listener);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
GetDataStreamAction.Request request = new GetDataStreamAction.Request(TimeValue.MAX_VALUE, new String[] { sourceDataStream });
assert task instanceof ReindexDataStreamTask;
final ReindexDataStreamTask reindexDataStreamTask = (ReindexDataStreamTask) task;
client.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> {
ReindexDataStreamClient reindexClient = new ReindexDataStreamClient(client, params.headers());
reindexClient.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> {
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = response.getDataStreams();
if (dataStreamInfos.size() == 1) {
List<Index> indices = dataStreamInfos.getFirst().getDataStream().getIndices();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,58 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.Map;

import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;

public record ReindexDataStreamTaskParams(String sourceDataStream, long startTime, int totalIndices, int totalIndicesToBeUpgraded)
implements
PersistentTaskParams {
public record ReindexDataStreamTaskParams(
String sourceDataStream,
long startTime,
int totalIndices,
int totalIndicesToBeUpgraded,
Map<String, String> headers
) implements PersistentTaskParams {

public static final String NAME = ReindexDataStreamTask.TASK_NAME;
private static final String SOURCE_DATA_STREAM_FIELD = "source_data_stream";
private static final String START_TIME_FIELD = "start_time";
private static final String TOTAL_INDICES_FIELD = "total_indices";
private static final String TOTAL_INDICES_TO_BE_UPGRADED_FIELD = "total_indices_to_be_upgraded";
private static final String HEADERS_FIELD = "headers";
@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<ReindexDataStreamTaskParams, Void> PARSER = new ConstructingObjectParser<>(
NAME,
true,
args -> new ReindexDataStreamTaskParams((String) args[0], (long) args[1], (int) args[2], (int) args[3])
args -> new ReindexDataStreamTaskParams(
(String) args[0],
(long) args[1],
(int) args[2],
(int) args[3],
(Map<String, String>) args[4]
)
);
static {
PARSER.declareString(constructorArg(), new ParseField(SOURCE_DATA_STREAM_FIELD));
PARSER.declareLong(constructorArg(), new ParseField(START_TIME_FIELD));
PARSER.declareInt(constructorArg(), new ParseField(TOTAL_INDICES_FIELD));
PARSER.declareInt(constructorArg(), new ParseField(TOTAL_INDICES_TO_BE_UPGRADED_FIELD));
PARSER.declareField(
ConstructingObjectParser.constructorArg(),
XContentParser::mapStrings,
new ParseField(HEADERS_FIELD),
ObjectParser.ValueType.OBJECT
);
}

@SuppressWarnings("unchecked")
public ReindexDataStreamTaskParams(StreamInput in) throws IOException {
this(in.readString(), in.readLong(), in.readInt(), in.readInt());
this(in.readString(), in.readLong(), in.readInt(), in.readInt(), (Map<String, String>) in.readGenericValue());
}

@Override
Expand All @@ -62,6 +83,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(startTime);
out.writeInt(totalIndices);
out.writeInt(totalIndicesToBeUpgraded);
out.writeGenericValue(headers);
}

@Override
Expand All @@ -71,6 +93,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field(START_TIME_FIELD, startTime)
.field(TOTAL_INDICES_FIELD, totalIndices)
.field(TOTAL_INDICES_TO_BE_UPGRADED_FIELD, totalIndicesToBeUpgraded)
.stringStringMap(HEADERS_FIELD, headers)
.endObject();
}

Expand All @@ -81,4 +104,8 @@ public String getSourceDataStream() {
public static ReindexDataStreamTaskParams fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

public Map<String, String> getHeaders() {
return headers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ protected Writeable.Reader<ReindexDataStreamTaskParams> instanceReader() {

@Override
protected ReindexDataStreamTaskParams createTestInstance() {
return new ReindexDataStreamTaskParams(randomAlphaOfLength(50), randomLong(), randomNonNegativeInt(), randomNonNegativeInt());
return new ReindexDataStreamTaskParams(
randomAlphaOfLength(50),
randomLong(),
randomNonNegativeInt(),
randomNonNegativeInt(),
getTestHeaders()
);
}

@Override
Expand All @@ -38,21 +44,35 @@ protected ReindexDataStreamTaskParams mutateInstance(ReindexDataStreamTaskParams
long startTime = instance.startTime();
int totalIndices = instance.totalIndices();
int totalIndicesToBeUpgraded = instance.totalIndicesToBeUpgraded();
Map<String, String> headers = instance.headers();
switch (randomIntBetween(0, 3)) {
case 0 -> sourceDataStream = randomAlphaOfLength(50);
case 1 -> startTime = randomLong();
case 2 -> totalIndices = totalIndices + 1;
case 3 -> totalIndices = totalIndicesToBeUpgraded + 1;
case 4 -> headers = headers.isEmpty() ? getTestHeaders(false) : getTestHeaders();
default -> throw new UnsupportedOperationException();
}
return new ReindexDataStreamTaskParams(sourceDataStream, startTime, totalIndices, totalIndicesToBeUpgraded);
return new ReindexDataStreamTaskParams(sourceDataStream, startTime, totalIndices, totalIndicesToBeUpgraded, headers);
}

@Override
protected ReindexDataStreamTaskParams doParseInstance(XContentParser parser) {
return ReindexDataStreamTaskParams.fromXContent(parser);
}

private Map<String, String> getTestHeaders() {
return getTestHeaders(randomBoolean());
}

private Map<String, String> getTestHeaders(boolean empty) {
if (empty) {
return Map.of();
} else {
return Map.of(randomAlphaOfLength(20), randomAlphaOfLength(30));
}
}

public void testToXContent() throws IOException {
ReindexDataStreamTaskParams params = createTestInstance();
try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent)) {
Expand Down

0 comments on commit e697ada

Please sign in to comment.