Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cancellation for admin apis #13966

Merged
merged 29 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
23dbd9f
Support cancellation for admin apis
aasom143 Jun 4, 2024
3d1adad
Create task in rest action and give parent child corelation.
aasom143 Jun 19, 2024
c7cc8f6
Changes to support cancellation in admin apis
aasom143 Jul 4, 2024
194a45c
Revert "Changes to support cancellation in admin apis"
aasom143 Jul 24, 2024
558bff5
Revert "Create task in rest action and give parent child corelation."
aasom143 Jul 24, 2024
114824c
Revert "Support cancellation for admin apis"
aasom143 Jul 24, 2024
d04f8e0
Added cancellation on cat shards and nodes stats api
aasom143 Jul 24, 2024
df10662
Removed failure scenarios for ITs, will cover.
aasom143 Jul 24, 2024
00a5a50
Added Failure cases ITs
aasom143 Jul 25, 2024
62d6721
Merge branch 'main' into cancellation-1
aasom143 Jul 26, 2024
8ec121a
Removed latch from source code and fixed uts.
aasom143 Jul 31, 2024
a2cfa7b
Fixed comments and wrapped listner to fail after the timeout
aasom143 Aug 5, 2024
791be08
Merge branch 'main' into cancellation-1
aasom143 Aug 7, 2024
647950b
Added test for CatShardsResponse
aasom143 Aug 7, 2024
c47f0f0
Added flag to create cancellable task only if required
aasom143 Aug 8, 2024
de2ef54
Fixed comments from rajiv-kv
guptasom Aug 9, 2024
fe05bf0
Fixed comments from rajiv-kv - rev-2
aasom143 Aug 12, 2024
334476a
Fixed blocking comment for cancellation phase-2
aasom143 Aug 22, 2024
e943745
Fixed final comments to support cancellation phase-II
aasom143 Aug 26, 2024
d7c839f
Merge branch 'main' into cancellation-1
aasom143 Aug 26, 2024
9039bab
minor comment fix
aasom143 Aug 26, 2024
5d18423
minor comment fix
aasom143 Aug 27, 2024
007e9ca
minor
aasom143 Aug 27, 2024
923fe5a
Fixed comment from Shweta
aasom143 Aug 28, 2024
5705b92
Fixed comments from Shweta rev-2
aasom143 Aug 29, 2024
6492189
Fixed comments related to version check and ClusterManagerNodeReadReq…
aasom143 Aug 30, 2024
f689a76
Merge branch 'main' into cancellation-1
aasom143 Aug 30, 2024
24d16a2
Fixed test file
aasom143 Aug 31, 2024
e062a97
Merge branch 'main' into cancellation-1
aasom143 Sep 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Support cancellation for cat shards and node stats API ([#13966](https://github.com/opensearch-project/OpenSearch/pull/13966))
aasom143 marked this conversation as resolved.
Show resolved Hide resolved

### Dependencies
- Bump `org.apache.commons:commons-lang3` from 3.14.0 to 3.15.0 ([#14861](https://github.com/opensearch-project/OpenSearch/pull/14861))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.tasks.TaskCancelledException;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.common.unit.TimeValue.timeValueMillis;
import static org.opensearch.search.SearchService.NO_TIMEOUT;
import static org.junit.Assert.fail;

@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST)
public class TransportCatShardsActionIT extends OpenSearchIntegTestCase {

public void testCatShardsWithSuccessResponse() throws InterruptedException {
internalCluster().startClusterManagerOnlyNodes(1);
List<String> nodes = internalCluster().startDataOnlyNodes(3);
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
.build()
);
ensureGreen("test");

final CatShardsRequest shardsRequest = new CatShardsRequest();
shardsRequest.setCancelAfterTimeInterval(NO_TIMEOUT);
shardsRequest.setIndices(Strings.EMPTY_ARRAY);
CountDownLatch latch = new CountDownLatch(1);
client().execute(CatShardsAction.INSTANCE, shardsRequest, new ActionListener<CatShardsResponse>() {
@Override
public void onResponse(CatShardsResponse catShardsResponse) {
ClusterStateResponse clusterStateResponse = catShardsResponse.getClusterStateResponse();
IndicesStatsResponse indicesStatsResponse = catShardsResponse.getIndicesStatsResponse();
for (ShardRouting shard : clusterStateResponse.getState().routingTable().allShards()) {
assertEquals("test", shard.getIndexName());
assertNotNull(indicesStatsResponse.asMap().get(shard));
}
latch.countDown();
}

@Override
public void onFailure(Exception e) {
fail();
latch.countDown();
}
});
latch.await();
}
aasom143 marked this conversation as resolved.
Show resolved Hide resolved

public void testCatShardsWithTimeoutException() throws IOException, AssertionError, InterruptedException {
List<String> masterNodes = internalCluster().startClusterManagerOnlyNodes(1);
List<String> nodes = internalCluster().startDataOnlyNodes(3);
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
.build()
);
ensureGreen("test");

Settings clusterManagerDataPathSettings = internalCluster().dataPathSettings(masterNodes.get(0));
// Dropping master node to delay in cluster state call.
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(masterNodes.get(0)));
aasom143 marked this conversation as resolved.
Show resolved Hide resolved

new Thread(() -> {
try {
// Ensures the cancellation timeout expires.
Thread.sleep(1000);
aasom143 marked this conversation as resolved.
Show resolved Hide resolved
// Starting master node to proceed in cluster state call.
internalCluster().startClusterManagerOnlyNode(
aasom143 marked this conversation as resolved.
Show resolved Hide resolved
Settings.builder().put("node.name", masterNodes.get(0)).put(clusterManagerDataPathSettings).build()
);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();

final CatShardsRequest shardsRequest = new CatShardsRequest();
shardsRequest.setCancelAfterTimeInterval(timeValueMillis(1000));
shardsRequest.setIndices(Strings.EMPTY_ARRAY);
CountDownLatch latch = new CountDownLatch(1);
client().execute(CatShardsAction.INSTANCE, shardsRequest, new ActionListener<CatShardsResponse>() {
aasom143 marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void onResponse(CatShardsResponse catShardsResponse) {
// onResponse should not be called.
fail();
aasom143 marked this conversation as resolved.
Show resolved Hide resolved
latch.countDown();
}

@Override
public void onFailure(Exception e) {
boolean timeoutException = (e.getClass() == TaskCancelledException.class);
if (e.getCause() != null) {
timeoutException = timeoutException
|| (e.getCause().getMessage().contains("The parent task was cancelled, shouldn't start any child tasks"));
}
assertTrue(timeoutException);
latch.countDown();
}
});
latch.await();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@
import org.opensearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.opensearch.action.admin.cluster.shards.CatShardsAction;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.TransportCatShardsAction;
import org.opensearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.TransportDeleteWeightedRoutingAction;
Expand Down Expand Up @@ -646,6 +648,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(ClusterGetWeightedRoutingAction.INSTANCE, TransportGetWeightedRoutingAction.class);
actions.register(ClusterDeleteWeightedRoutingAction.INSTANCE, TransportDeleteWeightedRoutingAction.class);
actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
actions.register(CatShardsAction.INSTANCE, TransportCatShardsAction.class);
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
actions.register(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.ActionType;

/**
 * Action type for cat shards, identified by the name {@code cluster:monitor/shards}.
 * <p>
 * The constructor is private; use the shared {@link #INSTANCE}.
 *
 * @opensearch.internal
 */
public class CatShardsAction extends ActionType<CatShardsResponse> {
    /** Name passed to the {@link ActionType} constructor for this action. */
    public static final String NAME = "cluster:monitor/shards";

    /** The single shared instance of this action type. */
    public static final CatShardsAction INSTANCE = new CatShardsAction();

    private CatShardsAction() {
        super(NAME, CatShardsResponse::new);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.rest.action.admin.cluster.ClusterAdminTask;

import java.io.IOException;
import java.util.Map;

import static org.opensearch.search.SearchService.NO_TIMEOUT;

/**
* A request of _cat/shards.
*
* @opensearch.api
*/
public class CatShardsRequest extends ClusterManagerNodeReadRequest<CatShardsRequest> {
aasom143 marked this conversation as resolved.
Show resolved Hide resolved

private String[] indices;
private TimeValue cancelAfterTimeInterval = NO_TIMEOUT;

public CatShardsRequest() {}

public CatShardsRequest(StreamInput in) throws IOException {
super(in);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

public void setClusterManagerNodeTimeout(TimeValue timeout) {
this.clusterManagerNodeTimeout = timeout;
}

public TimeValue getClusterManagerNodeTimeout() {
return this.clusterManagerNodeTimeout;
}

public void setLocal(boolean local) {
this.local = local;
}

public boolean getLocal() {
return this.local;
}

public void setIndices(String[] indices) {
this.indices = indices;
}

public String[] getIndices() {
return this.indices;
}

public void setCancelAfterTimeInterval(TimeValue timeout) {
aasom143 marked this conversation as resolved.
Show resolved Hide resolved
this.cancelAfterTimeInterval = timeout;
}

@Override
public ClusterAdminTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ClusterAdminTask(id, type, action, parentTaskId, headers, this.cancelAfterTimeInterval);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* A response of a cat shards request.
*
* @opensearch.api
*/
public class CatShardsResponse extends ActionResponse {
aasom143 marked this conversation as resolved.
Show resolved Hide resolved

private ClusterStateResponse clusterStateResponse = null;

private IndicesStatsResponse indicesStatsResponse = null;

@Override
public void writeTo(StreamOutput out) throws IOException {
aasom143 marked this conversation as resolved.
Show resolved Hide resolved

}

public CatShardsResponse() {}

public CatShardsResponse(StreamInput in) throws IOException {
super(in);
aasom143 marked this conversation as resolved.
Show resolved Hide resolved
}

public void setClusterStateResponse(ClusterStateResponse clusterStateResponse) {
this.clusterStateResponse = clusterStateResponse;
}

public ClusterStateResponse getClusterStateResponse() {
return this.clusterStateResponse;
}

public void setIndicesStatsResponse(IndicesStatsResponse indicesStatsResponse) {
this.indicesStatsResponse = indicesStatsResponse;
}

public IndicesStatsResponse getIndicesStatsResponse() {
return this.indicesStatsResponse;
}
}
Loading
Loading