Skip to content

Commit

Permalink
adding files
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Nov 13, 2024
1 parent 54b8a88 commit 3d482bb
Show file tree
Hide file tree
Showing 15 changed files with 1,022 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.datastreams.action;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction;
import org.elasticsearch.action.datastreams.ReindexDataStreamAction;
import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamRequest;
import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamResponse;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.Collection;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

public class ReindexDataStreamTransportActionIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(DataStreamsPlugin.class);
}

public void testStuff() {
ReindexDataStreamRequest reindexDataStreamRequest = new ReindexDataStreamRequest("nonexistent_source");
ReindexDataStreamResponse response = client().execute(
new ActionType<ReindexDataStreamResponse>(ReindexDataStreamAction.NAME),
reindexDataStreamRequest
).actionGet();
String taskId = response.getTaskId();
GetTaskRequest getTaskRequest = new GetTaskRequest();
getTaskRequest.setPersistentTaskId(taskId);
GetTaskResponse getTaskResponse = client().execute(TransportGetTaskAction.TYPE, getTaskRequest).actionGet();
assertThat(Map.of(), equalTo(getTaskResponse.getTask().getErrorAsMap()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.datastreams.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.datastreams.ReindexDataStreamAction;
import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamRequest;
import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.datastreams.task.ReindexDataStreamTask;
import org.elasticsearch.datastreams.task.ReindexDataStreamTaskParams;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

public class ReindexDataStreamTransportAction extends HandledTransportAction<ReindexDataStreamRequest, ReindexDataStreamResponse> {
private final PersistentTasksService persistentTasksService;

@Inject
public ReindexDataStreamTransportAction(
TransportService transportService,
ActionFilters actionFilters,
PersistentTasksService persistentTasksService
) {
super(
ReindexDataStreamAction.NAME,
true,
transportService,
actionFilters,
ReindexDataStreamRequest::new,
transportService.getThreadPool().executor(ThreadPool.Names.GENERIC)
);
this.persistentTasksService = persistentTasksService;
}

@Override
protected void doExecute(Task task, ReindexDataStreamRequest request, ActionListener<ReindexDataStreamResponse> listener) {
ReindexDataStreamTaskParams params = new ReindexDataStreamTaskParams(request.getSourceDataStream());
try {
String persistentTaskId = getPersistentTaskId(request.getSourceDataStream());
persistentTasksService.sendStartRequest(
persistentTaskId,
ReindexDataStreamTask.TASK_NAME,
params,
null,
ActionListener.wrap(
startedTask -> persistentTasksService.waitForPersistentTaskCondition(
startedTask.getId(),
PersistentTasksCustomMetadata.PersistentTask::isAssigned,
null,
new PersistentTasksService.WaitForPersistentTaskListener<>() {
@Override
public void onResponse(
PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> persistentTaskParamsPersistentTask
) {
listener.onResponse(new ReindexDataStreamResponse(persistentTaskId));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(new ElasticsearchException("Task [" + persistentTaskId + "] failed starting", e));
}
}

),
e -> listener.onFailure(new ElasticsearchException("Task [" + persistentTaskId + "] failed starting", e))
)
);
} catch (ResourceAlreadyExistsException e) {
listener.onFailure(e);
}
}

private String getPersistentTaskId(String dataStreamName) throws ResourceAlreadyExistsException {
return "reindex-data-stream-" + dataStreamName;
// TODO: Do we want to make an attempt to make these unqiue, and allow multiple to be running at once as long as all but one are
// complete?
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.datastreams.rest;

import org.elasticsearch.action.datastreams.ReindexDataStreamAction;
import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamResponse;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.POST;

public class RestReindexDataStreamAction extends BaseRestHandler {
@Override
public String getName() {
return "reindex_data_stream_action";
}

@Override
public List<Route> routes() {
return List.of(new Route(POST, "/_reindex_data_stream"));
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
ReindexDataStreamAction.ReindexDataStreamRequest reindexRequest = new ReindexDataStreamAction.ReindexDataStreamRequest(
request.param("source")
);
return channel -> client.execute(
ReindexDataStreamAction.INSTANCE,
reindexRequest,
new ReindexDataStreamRestToXContentListener(channel)
);
}

static class ReindexDataStreamRestToXContentListener extends RestBuilderListener<ReindexDataStreamResponse> {

ReindexDataStreamRestToXContentListener(RestChannel channel) {
super(channel);
}

@Override
public RestResponse buildResponse(ReindexDataStreamResponse response, XContentBuilder builder) throws Exception {
assert response.isFragment() == false;
response.toXContent(builder, channel.request());
return new RestResponse(RestStatus.OK, builder);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.datastreams.task;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;
import java.util.Map;

public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExecutor<ReindexDataStreamTaskParams> {
private final Client client;
private final ClusterService clusterService;
private final ThreadPool threadPool;

public ReindexDataStreamPersistentTaskExecutor(Client client, ClusterService clusterService, String taskName, ThreadPool threadPool) {
super(taskName, threadPool.generic());
this.client = client;
this.clusterService = clusterService;
this.threadPool = threadPool;
}

@Override
protected ReindexDataStreamTask createTask(
long id,
String type,
String action,
TaskId parentTaskId,
PersistentTasksCustomMetadata.PersistentTask<ReindexDataStreamTaskParams> taskInProgress,
Map<String, String> headers
) {
return new ReindexDataStreamTask(threadPool, id, type, action, "id=" + taskInProgress.getId(), parentTaskId, headers);
}

@Override
protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTaskParams params, PersistentTaskState state) {
String sourceDataStream = params.getSourceDataStream();
GetDataStreamAction.Request request = new GetDataStreamAction.Request(TimeValue.MAX_VALUE, new String[] { sourceDataStream });
assert task instanceof ReindexDataStreamTask;
final ReindexDataStreamTask reindexDataStreamTask = (ReindexDataStreamTask) task;
client.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> {
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = response.getDataStreams();
if (dataStreamInfos.size() == 1) {
List<Index> indices = dataStreamInfos.getFirst().getDataStream().getIndices();
List<Index> indicesToBeReindexed = indices.stream()
.filter(index -> clusterService.state().getMetadata().index(index).getCreationVersion().isLegacyIndexVersion())
.toList();
reindexDataStreamTask.setPendingIndices(indicesToBeReindexed.stream().map(Index::getName).toList());
for (Index index : indicesToBeReindexed) {
// TODO This is just a placeholder. This is where the real data stream reindex logic will go
}
reindexDataStreamTask.markAsCompleted();
} else {
reindexDataStreamTask.markAsFailed(new ElasticsearchException("data stream does not exist"));
}
}, reindexDataStreamTask::markAsFailed));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.datastreams.task;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;

public class ReindexDataStreamPersistentTaskState implements Task.Status, PersistentTaskState {
public static final String NAME = ReindexDataStreamTask.TASK_NAME;

@Override
public String getWriteableName() {
return ReindexDataStreamTask.TASK_NAME;
}

@Override
public void writeTo(StreamOutput out) throws IOException {}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder;
}

public static PersistentTaskState fromXContent(XContentParser xContentParser) {
return new ReindexDataStreamPersistentTaskState();
}

public static PersistentTaskState readFromStream(StreamInput streamInput) {
return new ReindexDataStreamPersistentTaskState();
}
}
Loading

0 comments on commit 3d482bb

Please sign in to comment.