Skip to content

Commit

Permalink
Create task in rest action and give parent child corelation.
Browse files Browse the repository at this point in the history
  • Loading branch information
aasom143 committed Jun 19, 2024
1 parent da592e1 commit 9152bbf
Show file tree
Hide file tree
Showing 21 changed files with 420 additions and 14 deletions.
5 changes: 4 additions & 1 deletion server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.TransportShardsAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.TransportDeleteWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingAction;
Expand All @@ -106,6 +107,7 @@
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusAction;
import org.opensearch.action.admin.cluster.snapshots.status.TransportSnapshotsStatusAction;
import org.opensearch.action.admin.cluster.state.ClusterStateAction;
import org.opensearch.action.admin.cluster.state.ShardsAction;
import org.opensearch.action.admin.cluster.state.TransportClusterStateAction;
import org.opensearch.action.admin.cluster.stats.ClusterStatsAction;
import org.opensearch.action.admin.cluster.stats.TransportClusterStatsAction;
Expand Down Expand Up @@ -609,7 +611,8 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class);
actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
actions.register(ListTasksAction.INSTANCE, TransportListTasksAction.class);
actions.register(GetTaskAction.INSTANCE, TransportGetTaskAction.class);
// actions.register(GetTaskAction.INSTANCE, TransportGetTaskAction.class);
actions.register(ShardsAction.INSTANCE, TransportShardsAction.class);
actions.register(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);

actions.register(AddVotingConfigExclusionsAction.INSTANCE, TransportAddVotingConfigExclusionsAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import java.io.IOException;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

/**
Expand All @@ -57,7 +58,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
private CommonStatsFlags indices = new CommonStatsFlags();
private final Set<String> requestedMetrics = new HashSet<>();

private TimeValue cancelAfterTimeInterval;
private TimeValue cancelAfterTimeInterval = null;

public NodesStatsRequest() {
super((String[]) null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.node.tasks;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.rest.action.admin.cluster.ClusterTask;

import java.io.IOException;
import java.util.Map;

/**
* Creates a new task.
*
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class CreateTaskRequest extends ActionRequest {

private TimeValue cancelAfterTimeInterval = null;
private String actionName;

public CreateTaskRequest() {}

public CreateTaskRequest(StreamInput in) throws IOException {
super(in);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public ClusterTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ClusterTask(id, type, action, parentTaskId, headers, cancelAfterTimeInterval);
}

public void setCancelAfterTimeInterval(TimeValue cancelAfterTimeInterval) {
this.cancelAfterTimeInterval = cancelAfterTimeInterval;
}

public TimeValue getCancelAfterTimeInterval() {
return cancelAfterTimeInterval;
}

public void setActionName(String actionName) {
this.actionName = actionName;
}

public String getActionName() {
return this.actionName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.node.tasks;

import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResult;

import java.io.IOException;

/**
* Returns a new task.
*
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class CreateTaskResponse extends ActionResponse {

private final Task task;

public CreateTaskResponse(Task task) {
this.task = task;
}

// public CreateTaskResponse(StreamInput in) throws IOException {
// super(in);
//// task = in.readOptionalWriteable(Task);
// }

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable((Writeable) task);
}

public Task getTask() {
return task;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.rest.action.admin.cluster.ClusterTask;
import org.opensearch.tasks.Task;

import java.io.IOException;
import java.util.Map;

import static org.opensearch.action.ValidateActions.addValidationError;

Expand All @@ -54,6 +57,9 @@ public class GetTaskRequest extends ActionRequest {
private TaskId taskId = TaskId.EMPTY_TASK_ID;
private boolean waitForCompletion = false;
private TimeValue timeout = null;
private TimeValue cancelAfterTimeInterval = null;

private Task task;

/**
* Get the TaskId to look up.
Expand Down Expand Up @@ -109,6 +115,19 @@ public GetTaskRequest setTimeout(TimeValue timeout) {
return this;
}

@Override
public ClusterTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ClusterTask(id, type, action, parentTaskId, headers, cancelAfterTimeInterval);
}

public void setCancelAfterTimeInterval(TimeValue cancelAfterTimeInterval) {
this.cancelAfterTimeInterval = cancelAfterTimeInterval;
}

public TimeValue getCancelAfterTimeInterval() {
return cancelAfterTimeInterval;
}

GetTaskRequest nodeRequest(String thisNodeId, long thisTaskId) {
GetTaskRequest copy = new GetTaskRequest();
copy.setParentTask(thisNodeId, thisTaskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResult;

import java.io.IOException;
Expand All @@ -56,6 +57,8 @@ public class GetTaskResponse extends ActionResponse implements ToXContentObject

private final TaskResult task;

private Task new_task;

public GetTaskResponse(TaskResult task) {
this.task = requireNonNull(task, "task is required");
}
Expand All @@ -77,6 +80,14 @@ public TaskResult getTask() {
return task;
}

public void setNewTask(Task task) {
this.new_task = task;
}

public Task getNewTask() {
return new_task;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.admin.cluster.node.tasks.CreateTaskRequest;
import org.opensearch.action.admin.cluster.node.tasks.CreateTaskResponse;
import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskAction;
import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.client.Client;
import org.opensearch.client.OriginSettingClient;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Table;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.Strings;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestResponse;
import org.opensearch.rest.action.RestActionListener;
import org.opensearch.rest.action.RestResponseListener;
import org.opensearch.rest.action.cat.AbstractCatAction;
import org.opensearch.rest.action.cat.RestShardsAction;
import org.opensearch.rest.action.cat.RestTable;
import org.opensearch.tasks.*;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import static org.opensearch.rest.BaseRestHandler.parseDeprecatedMasterTimeoutParameter;

public class TransportShardsAction extends HandledTransportAction<GetTaskRequest, GetTaskResponse> {

private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(TransportShardsAction.class);
private final TransportService transportService;
private final ClusterService clusterService;
private final ThreadPool threadPool;
// private final ActionFilters actionFilter;
private final NodeClient client;

@Inject
public TransportShardsAction(
NodeClient client,
ThreadPool threadPool,
TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService
) {
super(GetTaskAction.NAME, transportService, actionFilters, GetTaskRequest::new);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.transportService = transportService;
this.client = client;
// this.client = new OriginSettingClient(client, GetTaskAction.TASKS_ORIGIN);
}

// @Override
// protected void doExecute(Task task, final Request request, ActionListener<Task> listener) {
// executeRequest(task, request, listener);
// }

@Override
protected void doExecute(Task task, GetTaskRequest request, ActionListener<GetTaskResponse> listener) {
Task new_task = transportService.getTaskManager().register("transport", "Admin-action", request);
TaskResult result = new TaskResult(false, new TaskInfo(new TaskId(clusterService.localNode().getId(), new_task.getId()), new_task.getType(), new_task.getAction(), new_task.getDescription(), new_task.getStatus(), new_task.getStartTime(), new_task.getStartTimeNanos(), true, false, new_task.getParentTaskId(), null, null));
GetTaskResponse response = new GetTaskResponse(result);
// new_task.getCancellationTimeout()
response.setNewTask(new_task);
listener.onResponse(response);

}

// @Override
// protected void doExecute(Task task, CreateTaskRequest request, ActionListener<CreateTaskResponse> listener) {
// Task new_task = transportService.getTaskManager().register("transport", "Admin-action", request);
// TaskResult result = new TaskResult(false, new TaskInfo(new TaskId(clusterService.localNode().getId(), new_task.getId()), new_task.getType(), new_task.getAction(), new_task.getDescription(), new_task.getStatus(), new_task.getStartTime(), new_task.getStartTimeNanos(), true, false, new_task.getParentTaskId(), null, null));
//// GetTaskResponse response = new GetTaskResponse(result);
// CreateTaskResponse response = new CreateTaskResponse(new_task);
// listener.onResponse(response);
// }

// private void executeRequest(
// Task task,
// Request request,
// ActionListener<Task> originalListener
// ) {
// Task new_task = transportService.getTaskManager().register("transport", "Admin-action", request);
// originalListener.onResponse(new_task);
// }



}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class ClusterStateRequest extends ClusterManagerNodeReadRequest<ClusterSt
private String[] indices = Strings.EMPTY_ARRAY;
private IndicesOptions indicesOptions = IndicesOptions.lenientExpandOpen();

private TimeValue cancelAfterTimeInterval;

public ClusterStateRequest() {}

public ClusterStateRequest(StreamInput in) throws IOException {
Expand Down Expand Up @@ -118,6 +120,14 @@ public ClusterStateRequest clear() {
return this;
}

public void setCancelAfterTimeInterval(TimeValue cancelAfterTimeInterval) {
this.cancelAfterTimeInterval = cancelAfterTimeInterval;
}

public TimeValue getCancelAfterTimeInterval() {
return cancelAfterTimeInterval;
}

public boolean routingTable() {
return routingTable;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.state;

import org.opensearch.action.ActionType;
import org.opensearch.action.admin.cluster.node.tasks.CreateTaskResponse;
import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.opensearch.tasks.Task;

public class ShardsAction extends ActionType<GetTaskResponse> {
public static final ShardsAction INSTANCE = new ShardsAction();
public static final String NAME = "cluster:monitor/create/task";

private ShardsAction() {
super(NAME, GetTaskResponse::new);
}
}
Loading

0 comments on commit 9152bbf

Please sign in to comment.