Skip to content

Commit

Permalink
Support cancellation for admin apis (#13966)
Browse files Browse the repository at this point in the history
* Support cancellation for admin apis - add implementation for _cat/shards

Signed-off-by: Somesh Gupta <someshgupta987@gmail.com>
  • Loading branch information
aasom143 committed Sep 3, 2024
1 parent bcd09ab commit 9f81479
Show file tree
Hide file tree
Showing 17 changed files with 601 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- Support cancellation for cat shards and node stats API.([#13966](https://github.com/opensearch-project/OpenSearch/pull/13966))
- [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381))
- Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.tasks.TaskCancelledException;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.common.unit.TimeValue.timeValueMillis;
import static org.opensearch.search.SearchService.NO_TIMEOUT;

@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST)
public class TransportCatShardsActionIT extends OpenSearchIntegTestCase {

public void testCatShardsWithSuccessResponse() throws InterruptedException {
internalCluster().startClusterManagerOnlyNodes(1);
List<String> nodes = internalCluster().startDataOnlyNodes(3);
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
.build()
);
ensureGreen("test");

final CatShardsRequest shardsRequest = new CatShardsRequest();
shardsRequest.setCancelAfterTimeInterval(NO_TIMEOUT);
shardsRequest.setIndices(Strings.EMPTY_ARRAY);
CountDownLatch latch = new CountDownLatch(1);
client().execute(CatShardsAction.INSTANCE, shardsRequest, new ActionListener<CatShardsResponse>() {
@Override
public void onResponse(CatShardsResponse catShardsResponse) {
ClusterStateResponse clusterStateResponse = catShardsResponse.getClusterStateResponse();
IndicesStatsResponse indicesStatsResponse = catShardsResponse.getIndicesStatsResponse();
for (ShardRouting shard : clusterStateResponse.getState().routingTable().allShards()) {
assertEquals("test", shard.getIndexName());
assertNotNull(indicesStatsResponse.asMap().get(shard));
}
latch.countDown();
}

@Override
public void onFailure(Exception e) {
fail();
latch.countDown();
}
});
latch.await();
}

public void testCatShardsWithTimeoutException() throws IOException, AssertionError, InterruptedException {
List<String> masterNodes = internalCluster().startClusterManagerOnlyNodes(1);
List<String> nodes = internalCluster().startDataOnlyNodes(3);
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
.build()
);
ensureGreen("test");

Settings clusterManagerDataPathSettings = internalCluster().dataPathSettings(masterNodes.get(0));
// Dropping master node to delay in cluster state call.
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(masterNodes.get(0)));

CountDownLatch latch = new CountDownLatch(2);
new Thread(() -> {
try {
// Ensures the cancellation timeout expires.
Thread.sleep(2000);
// Starting master node to proceed in cluster state call.
internalCluster().startClusterManagerOnlyNode(
Settings.builder().put("node.name", masterNodes.get(0)).put(clusterManagerDataPathSettings).build()
);
latch.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();

final CatShardsRequest shardsRequest = new CatShardsRequest();
TimeValue timeoutInterval = timeValueMillis(1000);
shardsRequest.setCancelAfterTimeInterval(timeoutInterval);
shardsRequest.clusterManagerNodeTimeout(timeValueMillis(2500));
shardsRequest.setIndices(Strings.EMPTY_ARRAY);
client().execute(CatShardsAction.INSTANCE, shardsRequest, new ActionListener<CatShardsResponse>() {
@Override
public void onResponse(CatShardsResponse catShardsResponse) {
// onResponse should not be called.
latch.countDown();
throw new AssertionError(
"The cat shards action is expected to fail with a TaskCancelledException, but it received a successful response instead."
);
}

@Override
public void onFailure(Exception e) {
assertSame(e.getClass(), TaskCancelledException.class);
assertEquals(e.getMessage(), "Cancellation timeout of " + timeoutInterval + " is expired");
latch.countDown();
}
});
latch.await();
}

}
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@
import org.opensearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.opensearch.action.admin.cluster.shards.CatShardsAction;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.TransportCatShardsAction;
import org.opensearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.TransportDeleteWeightedRoutingAction;
Expand Down Expand Up @@ -646,6 +648,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(ClusterGetWeightedRoutingAction.INSTANCE, TransportGetWeightedRoutingAction.class);
actions.register(ClusterDeleteWeightedRoutingAction.INSTANCE, TransportDeleteWeightedRoutingAction.class);
actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
actions.register(CatShardsAction.INSTANCE, TransportCatShardsAction.class);
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
actions.register(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.ActionType;

/**
* Transport action for cat shards
*
* @opensearch.internal
*/
public class CatShardsAction extends ActionType<CatShardsResponse> {
public static final CatShardsAction INSTANCE = new CatShardsAction();
public static final String NAME = "cluster:monitor/shards";

private CatShardsAction() {
super(NAME, CatShardsResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.rest.action.admin.cluster.ClusterAdminTask;

import java.io.IOException;
import java.util.Map;

/**
* A request of _cat/shards.
*
* @opensearch.api
*/
public class CatShardsRequest extends ClusterManagerNodeReadRequest<CatShardsRequest> {

private String[] indices;
private TimeValue cancelAfterTimeInterval;

public CatShardsRequest() {}

public CatShardsRequest(StreamInput in) throws IOException {
super(in);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

public void setIndices(String[] indices) {
this.indices = indices;
}

public String[] getIndices() {
return this.indices;
}

public void setCancelAfterTimeInterval(TimeValue timeout) {
this.cancelAfterTimeInterval = timeout;
}

public TimeValue getCancelAfterTimeInterval() {
return this.cancelAfterTimeInterval;
}

@Override
public ClusterAdminTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ClusterAdminTask(id, type, action, parentTaskId, headers, this.cancelAfterTimeInterval);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* A response of a cat shards request.
*
* @opensearch.api
*/
public class CatShardsResponse extends ActionResponse {

private ClusterStateResponse clusterStateResponse = null;

private IndicesStatsResponse indicesStatsResponse = null;

public CatShardsResponse() {}

public CatShardsResponse(StreamInput in) throws IOException {
super(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
clusterStateResponse.writeTo(out);
indicesStatsResponse.writeTo(out);
}

public void setClusterStateResponse(ClusterStateResponse clusterStateResponse) {
this.clusterStateResponse = clusterStateResponse;
}

public ClusterStateResponse getClusterStateResponse() {
return this.clusterStateResponse;
}

public void setIndicesStatsResponse(IndicesStatsResponse indicesStatsResponse) {
this.indicesStatsResponse = indicesStatsResponse;
}

public IndicesStatsResponse getIndicesStatsResponse() {
return this.indicesStatsResponse;
}
}
Loading

0 comments on commit 9f81479

Please sign in to comment.