diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 5e2b62614fc47..e7db78f2a9b0c 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -87,6 +87,7 @@ import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction; import org.opensearch.action.admin.cluster.shards.TransportClusterSearchShardsAction; +import org.opensearch.action.admin.cluster.shards.TransportShardsAction; import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingAction; import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.TransportDeleteWeightedRoutingAction; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingAction; @@ -106,6 +107,7 @@ import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusAction; import org.opensearch.action.admin.cluster.snapshots.status.TransportSnapshotsStatusAction; import org.opensearch.action.admin.cluster.state.ClusterStateAction; +import org.opensearch.action.admin.cluster.state.ShardsAction; import org.opensearch.action.admin.cluster.state.TransportClusterStateAction; import org.opensearch.action.admin.cluster.stats.ClusterStatsAction; import org.opensearch.action.admin.cluster.stats.TransportClusterStatsAction; @@ -609,7 +611,8 @@ public void reg actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class); actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class); actions.register(ListTasksAction.INSTANCE, TransportListTasksAction.class); - actions.register(GetTaskAction.INSTANCE, TransportGetTaskAction.class); +// actions.register(GetTaskAction.INSTANCE, TransportGetTaskAction.class); + actions.register(ShardsAction.INSTANCE, TransportShardsAction.class); actions.register(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class); actions.register(AddVotingConfigExclusionsAction.INSTANCE, TransportAddVotingConfigExclusionsAction.class); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java index 1b74f418444f7..4678406749a4e 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -44,6 +44,7 @@ import java.io.IOException; import java.util.*; +import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; /** @@ -57,7 +58,7 @@ public class NodesStatsRequest extends BaseNodesRequest { private CommonStatsFlags indices = new CommonStatsFlags(); private final Set requestedMetrics = new HashSet<>(); - private TimeValue cancelAfterTimeInterval; + private TimeValue cancelAfterTimeInterval = null; public NodesStatsRequest() { super((String[]) null); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/CreateTaskRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/CreateTaskRequest.java new file mode 100644 index 0000000000000..6d25659fab42b --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/CreateTaskRequest.java @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.node.tasks; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.tasks.TaskId; +import org.opensearch.rest.action.admin.cluster.ClusterTask; + +import java.io.IOException; +import java.util.Map; + +/** + * Creates a new task. + * + * @opensearch.api + */ +@PublicApi(since = "1.0.0") +public class CreateTaskRequest extends ActionRequest { + + private TimeValue cancelAfterTimeInterval = null; + private String actionName; + + public CreateTaskRequest() {} + + public CreateTaskRequest(StreamInput in) throws IOException { + super(in); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public ClusterTask createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ClusterTask(id, type, action, parentTaskId, headers, cancelAfterTimeInterval); + } + + public void setCancelAfterTimeInterval(TimeValue cancelAfterTimeInterval) { + this.cancelAfterTimeInterval = cancelAfterTimeInterval; + } + + public TimeValue getCancelAfterTimeInterval() { + return cancelAfterTimeInterval; + } + + public void setActionName(String actionName) { + this.actionName = actionName; + } + + public String getActionName() { + return this.actionName; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/CreateTaskResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/CreateTaskResponse.java new file mode 100644 index 0000000000000..68239c2d917ec --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/CreateTaskResponse.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.node.tasks; + +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskResult; + +import java.io.IOException; + +/** + * Returns a new task. + * + * @opensearch.api + */ +@PublicApi(since = "1.0.0") +public class CreateTaskResponse extends ActionResponse { + + private final Task task; + + public CreateTaskResponse(Task task) { + this.task = task; + } + +// public CreateTaskResponse(StreamInput in) throws IOException { +// super(in); +//// task = in.readOptionalWriteable(Task); +// } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable((Writeable) task); + } + + public Task getTask() { + return task; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/get/GetTaskRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/get/GetTaskRequest.java index 13c6d645d4c3a..0e9d7135249e4 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/get/GetTaskRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/get/GetTaskRequest.java @@ -39,8 +39,11 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.tasks.TaskId; +import org.opensearch.rest.action.admin.cluster.ClusterTask; +import org.opensearch.tasks.Task; import java.io.IOException; +import java.util.Map; import static org.opensearch.action.ValidateActions.addValidationError; @@ -54,6 +57,9 @@ public class GetTaskRequest extends ActionRequest { private TaskId taskId = TaskId.EMPTY_TASK_ID; private boolean waitForCompletion = false; private TimeValue timeout = null; + private TimeValue cancelAfterTimeInterval = null; + + private Task task; /** * Get the TaskId to look up. @@ -109,6 +115,19 @@ public GetTaskRequest setTimeout(TimeValue timeout) { return this; } + @Override + public ClusterTask createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ClusterTask(id, type, action, parentTaskId, headers, cancelAfterTimeInterval); + } + + public void setCancelAfterTimeInterval(TimeValue cancelAfterTimeInterval) { + this.cancelAfterTimeInterval = cancelAfterTimeInterval; + } + + public TimeValue getCancelAfterTimeInterval() { + return cancelAfterTimeInterval; + } + GetTaskRequest nodeRequest(String thisNodeId, long thisTaskId) { GetTaskRequest copy = new GetTaskRequest(); copy.setParentTask(thisNodeId, thisTaskId); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/get/GetTaskResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/get/GetTaskResponse.java index 80901373e14d5..f015d45c636de 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/get/GetTaskResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/get/GetTaskResponse.java @@ -40,6 +40,7 @@ import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskResult; import java.io.IOException; @@ -56,6 +57,8 @@ public class GetTaskResponse extends ActionResponse implements ToXContentObject private final TaskResult task; + private Task new_task; + public GetTaskResponse(TaskResult task) { this.task = requireNonNull(task, "task is required"); } @@ -77,6 +80,14 @@ public TaskResult getTask() { return task; } + public void setNewTask(Task task) { + this.new_task = task; + } + + public Task getNewTask() { + return new_task; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportShardsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportShardsAction.java new file mode 100644 index 0000000000000..5a44317f66521 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportShardsAction.java @@ -0,0 +1,114 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.admin.cluster.node.tasks.CreateTaskRequest; +import org.opensearch.action.admin.cluster.node.tasks.CreateTaskResponse; +import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskAction; +import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskRequest; +import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskResponse; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.TransportSearchAction; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; +import org.opensearch.client.Client; +import org.opensearch.client.OriginSettingClient; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Table; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.logging.DeprecationLogger; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.Strings; +import org.opensearch.core.tasks.TaskId; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.action.RestActionListener; +import org.opensearch.rest.action.RestResponseListener; +import org.opensearch.rest.action.cat.AbstractCatAction; +import org.opensearch.rest.action.cat.RestShardsAction; +import org.opensearch.rest.action.cat.RestTable; +import org.opensearch.tasks.*; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import static org.opensearch.rest.BaseRestHandler.parseDeprecatedMasterTimeoutParameter; + +public class TransportShardsAction extends HandledTransportAction { + + private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(TransportShardsAction.class); + private final TransportService transportService; + private final ClusterService clusterService; + private final ThreadPool threadPool; +// private final ActionFilters actionFilter; + private final NodeClient client; + + @Inject + public TransportShardsAction( + NodeClient client, + ThreadPool threadPool, + TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService + ) { + super(GetTaskAction.NAME, transportService, actionFilters, GetTaskRequest::new); + this.threadPool = threadPool; + this.clusterService = clusterService; + this.transportService = transportService; + this.client = client; +// this.client = new OriginSettingClient(client, GetTaskAction.TASKS_ORIGIN); + } + +// @Override +// protected void doExecute(Task task, final Request request, ActionListener listener) { +// executeRequest(task, request, listener); +// } + + @Override + protected void doExecute(Task task, GetTaskRequest request, ActionListener listener) { + Task new_task = transportService.getTaskManager().register("transport", "Admin-action", request); + TaskResult result = new TaskResult(false, new TaskInfo(new TaskId(clusterService.localNode().getId(), new_task.getId()), new_task.getType(), new_task.getAction(), new_task.getDescription(), new_task.getStatus(), new_task.getStartTime(), new_task.getStartTimeNanos(), true, false, new_task.getParentTaskId(), null, null)); + GetTaskResponse response = new GetTaskResponse(result); +// new_task.getCancellationTimeout() + response.setNewTask(new_task); + listener.onResponse(response); + + } + +// @Override +// protected void doExecute(Task task, CreateTaskRequest request, ActionListener listener) { +// Task new_task = transportService.getTaskManager().register("transport", "Admin-action", request); +// TaskResult result = new TaskResult(false, new TaskInfo(new TaskId(clusterService.localNode().getId(), new_task.getId()), new_task.getType(), new_task.getAction(), new_task.getDescription(), new_task.getStatus(), new_task.getStartTime(), new_task.getStartTimeNanos(), true, false, new_task.getParentTaskId(), null, null)); +//// GetTaskResponse response = new GetTaskResponse(result); +// CreateTaskResponse response = new CreateTaskResponse(new_task); +// listener.onResponse(response); +// } + +// private void executeRequest( +// Task task, +// Request request, +// ActionListener originalListener +// ) { +// Task new_task = transportService.getTaskManager().register("transport", "Admin-action", request); +// originalListener.onResponse(new_task); +// } + + + +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/ClusterStateRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/ClusterStateRequest.java index 90a52f7406d57..cdfe1be6eda4b 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/ClusterStateRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/ClusterStateRequest.java @@ -64,6 +64,8 @@ public class ClusterStateRequest extends ClusterManagerNodeReadRequest { + public static final ShardsAction INSTANCE = new ShardsAction(); + public static final String NAME = "cluster:monitor/create/task"; + + private ShardsAction() { + super(NAME, GetTaskResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequest.java index 2b64464a76899..c01d07d98e9d0 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequest.java @@ -36,8 +36,11 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.tasks.TaskId; +import org.opensearch.rest.action.admin.cluster.ClusterTask; import java.io.IOException; +import java.util.Map; /** * A request to get indices level stats. Allow to enable different stats to be returned. @@ -63,6 +66,10 @@ public IndicesStatsRequest(StreamInput in) throws IOException { flags = new CommonStatsFlags(in); } + @Override + public ClusterTask createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ClusterTask(id, type, action, parentTaskId, headers); + } /** * Sets all flags to return all stats. */ diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 143b01af3f62f..fcc60b87f8ba6 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -327,7 +327,7 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< listener = TimeoutTaskCancellationUtility.wrapWithCancellationListener( client, (CancellableTask) task, - clusterService.getClusterSettings(), +// clusterService.getClusterSettings(), listener ); } diff --git a/server/src/main/java/org/opensearch/action/support/TimeoutTaskCancellationUtility.java b/server/src/main/java/org/opensearch/action/support/TimeoutTaskCancellationUtility.java index a317a45eab31f..3612eb721c0e2 100644 --- a/server/src/main/java/org/opensearch/action/support/TimeoutTaskCancellationUtility.java +++ b/server/src/main/java/org/opensearch/action/support/TimeoutTaskCancellationUtility.java @@ -23,6 +23,7 @@ import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -43,17 +44,17 @@ public class TimeoutTaskCancellationUtility { * generic thread pool * @param client - {@link NodeClient} * @param taskToCancel - task to schedule cancellation for - * @param clusterSettings - {@link ClusterSettings} +// * @param clusterSettings - {@link ClusterSettings} * @param listener - original listener associated with the task * @return wrapped listener */ public static ActionListener wrapWithCancellationListener( NodeClient client, CancellableTask taskToCancel, - ClusterSettings clusterSettings, +// ClusterSettings clusterSettings, ActionListener listener ) { - final TimeValue globalTimeout = clusterSettings.get(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING); + final TimeValue globalTimeout = TimeValue.timeValueSeconds(30); final TimeValue timeoutInterval = (taskToCancel.getCancellationTimeout() == null) ? globalTimeout : taskToCancel.getCancellationTimeout(); @@ -100,6 +101,25 @@ public static ActionListener wrapWithCancellationListener( return listenerToReturn; } + public static ActionListener wrapWithCancellationListener( + NodeClient client, + CountDownLatch latch, + TimeValue timeoutInterval, + ActionListener listener + ) { + try { + final TimeoutRunnableListener wrappedListener = new TimeoutRunnableListener<>(timeoutInterval, listener, () -> { + // timeout expired + latch.countDown(); + }); + client.threadPool().schedule(wrappedListener, timeoutInterval, ThreadPool.Names.GENERIC); + } catch (Exception ex) { + // if there is any exception in scheduling thread then continue without it + logger.warn("Failed to schedule the thread, will continue without it"); + } + return listener; + } + /** * Timeout listener which executes the provided runnable after timeout is expired and if a response/failure is not yet received. * If either a response/failure is received before timeout then the scheduled task is cancelled and response/failure is sent back to diff --git a/server/src/main/java/org/opensearch/action/support/TransportAction.java b/server/src/main/java/org/opensearch/action/support/TransportAction.java index f71347f6f1d07..9055653f1492a 100644 --- a/server/src/main/java/org/opensearch/action/support/TransportAction.java +++ b/server/src/main/java/org/opensearch/action/support/TransportAction.java @@ -97,6 +97,8 @@ public final Task execute(Request request, ActionListener listener) { final Releasable unregisterChildNode = registerChildNode(request.getParentTask()); final Task task; + logger.info("Creating task for action: {}", actionName); + try { task = taskManager.register("transport", actionName, request); } catch (TaskCancelledException e) { @@ -138,6 +140,7 @@ public void onFailure(Exception e) { */ public final Task execute(Request request, TaskListener listener) { final Releasable unregisterChildNode = registerChildNode(request.getParentTask()); + logger.info("Creating task for action1: {}", actionName); final Task task; try { task = taskManager.register("transport", actionName, request); diff --git a/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index c08cfb7af0e3d..c2de438454a74 100644 --- a/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -380,11 +380,13 @@ public NodeResponse read(StreamInput in) throws IOException { @Override public void handleResponse(NodeResponse response) { + logger.info("TransportBroadcastByNodeAction testing: Response received"); onNodeResponse(node, nodeIndex, response); } @Override public void handleException(TransportException exp) { + logger.info("TransportBroadcastByNodeAction testing: Exception occurred"); onNodeFailure(node, nodeIndex, exp); } diff --git a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java index b3c348f2fadf5..730a70ca6962f 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionRunnable; import org.opensearch.action.FailedNodeException; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; import org.opensearch.action.support.TimeoutTaskCancellationUtility; @@ -215,7 +216,7 @@ protected void doExecute(Task task, NodesRequest request, ActionListener listener); + void createTask(GetTaskRequest request, ActionListener listener); + /** * Fetch a task by id. */ diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index 6c6049f04231b..ca0d9b3ba0816 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -157,10 +157,7 @@ import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest; import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequestBuilder; import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; -import org.opensearch.action.admin.cluster.state.ClusterStateAction; -import org.opensearch.action.admin.cluster.state.ClusterStateRequest; -import org.opensearch.action.admin.cluster.state.ClusterStateRequestBuilder; -import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.admin.cluster.state.*; import org.opensearch.action.admin.cluster.stats.ClusterStatsAction; import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest; import org.opensearch.action.admin.cluster.stats.ClusterStatsRequestBuilder; @@ -1004,6 +1001,11 @@ public void getTask(final GetTaskRequest request, final ActionListener listener) { + execute(ShardsAction.INSTANCE, request, listener); + } + @Override public GetTaskRequestBuilder prepareGetTask(String taskId) { return prepareGetTask(new TaskId(taskId)); diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java index 75dcadfef55e4..2cbcb7068e7c8 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java @@ -33,11 +33,14 @@ package org.opensearch.rest.action.admin.cluster; import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.action.admin.indices.stats.CommonStatsFlags.Flag; +import org.opensearch.action.support.TimeoutTaskCancellationUtility; import org.opensearch.client.node.NodeClient; import org.opensearch.common.cache.CacheType; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.RestRequest; @@ -51,6 +54,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; import java.util.function.Consumer; import static java.util.Arrays.asList; diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java index e11012a23fce7..d75bc91247653 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java @@ -77,6 +77,7 @@ import java.util.List; import java.util.Locale; +import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; import static java.util.Collections.singletonList; diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java index 4413c8eb370be..0fe9a845b9e9a 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java @@ -32,19 +32,31 @@ package org.opensearch.rest.action.cat; +import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction; +import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskRequest; +import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskResponse; import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.admin.cluster.state.ShardsAction; import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.TimeoutTaskCancellationUtility; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.common.Table; +import org.opensearch.common.inject.Inject; import org.opensearch.common.logging.DeprecationLogger; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; +import org.opensearch.core.tasks.TaskId; import org.opensearch.index.cache.query.QueryCacheStats; import org.opensearch.index.engine.CommitStats; import org.opensearch.index.engine.Engine; @@ -57,6 +69,7 @@ import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.shard.DocsStats; +import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.index.store.StoreStats; import org.opensearch.index.warmer.WarmerStats; import org.opensearch.rest.RestRequest; @@ -64,10 +77,13 @@ import org.opensearch.rest.action.RestActionListener; import org.opensearch.rest.action.RestResponseListener; import org.opensearch.search.suggest.completion.CompletionStats; +import org.opensearch.tasks.CancellableTask; +import org.opensearch.tasks.Task; import java.time.Instant; import java.util.List; import java.util.Locale; +import java.util.concurrent.CountDownLatch; import java.util.function.Function; import static java.util.Arrays.asList; @@ -112,14 +128,68 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli clusterStateRequest.clusterManagerNodeTimeout( request.paramAsTime("cluster_manager_timeout", clusterStateRequest.clusterManagerNodeTimeout()) ); + clusterStateRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", TimeValue.timeValueSeconds(30))); parseDeprecatedMasterTimeoutParameter(clusterStateRequest, request, deprecationLogger, getName()); clusterStateRequest.clear().nodes(true).routingTable(true).indices(indices); +// clusterStateRequest.setParentTask(); + GetTaskRequest getTaskRequest = new GetTaskRequest(); + getTaskRequest.setTaskId(new TaskId("task:11")); + getTaskRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", TimeValue.timeValueSeconds(30))); + CountDownLatch latch = new CountDownLatch(1); + final GetTaskResponse[] response = new GetTaskResponse[1]; + client.admin().cluster().createTask(getTaskRequest, new ActionListener() { + @Override + public void onResponse(GetTaskResponse getTaskResponse) { + response[0] = getTaskResponse; + latch.countDown(); + logger.info("Task ID is: {}", getTaskResponse.getTask().getTask().getId()); + } + + @Override + public void onFailure(Exception e) { + + } + }); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + + } + CancellableTask task = (CancellableTask) response[0].getNewTask(); + ActionListener listener = TimeoutTaskCancellationUtility.wrapWithCancellationListener( + client, + task, + new ActionListener() { + @Override + public void onResponse(RestResponse restResponse) { + logger.info("Task cancellation scheduled successfully."); + } + + @Override + public void onFailure(Exception e) { + + } + } + ); + clusterStateRequest.setParentTask(client.getLocalNodeId(), response[0].getNewTask().getId()); +// try { +// Thread.sleep(2000); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener(channel) { @Override public void processResponse(final ClusterStateResponse clusterStateResponse) { IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); indicesStatsRequest.all(); indicesStatsRequest.indices(indices); + indicesStatsRequest.setParentTask(client.getLocalNodeId(), response[0].getNewTask().getId()); +// try { +// Thread.sleep(2000); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } client.admin().indices().stats(indicesStatsRequest, new RestResponseListener(channel) { @Override public RestResponse buildResponse(IndicesStatsResponse indicesStatsResponse) throws Exception { diff --git a/server/src/main/java/org/opensearch/tasks/TaskManager.java b/server/src/main/java/org/opensearch/tasks/TaskManager.java index a49968ab85e89..491758dfa0483 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskManager.java +++ b/server/src/main/java/org/opensearch/tasks/TaskManager.java @@ -661,6 +661,7 @@ public CancellableTask getTask() { } synchronized void registerChildNode(DiscoveryNode node) { + logger.info("registerChildNode {}", banChildren); if (banChildren) { throw new TaskCancelledException("The parent task was cancelled, shouldn't start any child tasks"); }