Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cancellation for admin apis #13966

Merged
merged 29 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
23dbd9f
Support cancellation for admin apis
aasom143 Jun 4, 2024
3d1adad
Create task in rest action and give parent child corelation.
aasom143 Jun 19, 2024
c7cc8f6
Changes to support cancellation in admin apis
aasom143 Jul 4, 2024
194a45c
Revert "Changes to support cancellation in admin apis"
aasom143 Jul 24, 2024
558bff5
Revert "Create task in rest action and give parent child corelation."
aasom143 Jul 24, 2024
114824c
Revert "Support cancellation for admin apis"
aasom143 Jul 24, 2024
d04f8e0
Added cancellation on cat shards and nodes stats api
aasom143 Jul 24, 2024
df10662
Removed failure scenarios for ITs, will cover.
aasom143 Jul 24, 2024
00a5a50
Added Failure cases ITs
aasom143 Jul 25, 2024
62d6721
Merge branch 'main' into cancellation-1
aasom143 Jul 26, 2024
8ec121a
Removed latch from source code and fixed uts.
aasom143 Jul 31, 2024
a2cfa7b
Fixed comments and wrapped listner to fail after the timeout
aasom143 Aug 5, 2024
791be08
Merge branch 'main' into cancellation-1
aasom143 Aug 7, 2024
647950b
Added test for CatShardsResponse
aasom143 Aug 7, 2024
c47f0f0
Added flag to create cancellable task only if required
aasom143 Aug 8, 2024
de2ef54
Fixed comments from rajiv-kv
guptasom Aug 9, 2024
fe05bf0
Fixed comments from rajiv-kv - rev-2
aasom143 Aug 12, 2024
334476a
Fixed blocking comment for cancellation phase-2
aasom143 Aug 22, 2024
e943745
Fixed final comments to support cancellation phase-II
aasom143 Aug 26, 2024
d7c839f
Merge branch 'main' into cancellation-1
aasom143 Aug 26, 2024
9039bab
minor comment fix
aasom143 Aug 26, 2024
5d18423
minor comment fix
aasom143 Aug 27, 2024
007e9ca
minor
aasom143 Aug 27, 2024
923fe5a
Fixed comment from Shweta
aasom143 Aug 28, 2024
5705b92
Fixed comments from Shweta rev-2
aasom143 Aug 29, 2024
6492189
Fixed comments related to version check and ClusterManagerNodeReadReq…
aasom143 Aug 30, 2024
f689a76
Merge branch 'main' into cancellation-1
aasom143 Aug 30, 2024
24d16a2
Fixed test file
aasom143 Aug 31, 2024
e062a97
Merge branch 'main' into cancellation-1
aasom143 Sep 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- Support cancellation for cat shards and node stats API.([#13966](https://github.com/opensearch-project/OpenSearch/pull/13966))
- [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381))
- Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.tasks.TaskCancelledException;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.common.unit.TimeValue.timeValueMillis;
import static org.opensearch.search.SearchService.NO_TIMEOUT;

@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST)
public class TransportCatShardsActionIT extends OpenSearchIntegTestCase {

public void testCatShardsWithSuccessResponse() throws InterruptedException {
internalCluster().startClusterManagerOnlyNodes(1);
List<String> nodes = internalCluster().startDataOnlyNodes(3);
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
.build()
);
ensureGreen("test");

final CatShardsRequest shardsRequest = new CatShardsRequest();
shardsRequest.setCancelAfterTimeInterval(NO_TIMEOUT);
shardsRequest.setIndices(Strings.EMPTY_ARRAY);
CountDownLatch latch = new CountDownLatch(1);
client().execute(CatShardsAction.INSTANCE, shardsRequest, new ActionListener<CatShardsResponse>() {
@Override
public void onResponse(CatShardsResponse catShardsResponse) {
ClusterStateResponse clusterStateResponse = catShardsResponse.getClusterStateResponse();
IndicesStatsResponse indicesStatsResponse = catShardsResponse.getIndicesStatsResponse();
for (ShardRouting shard : clusterStateResponse.getState().routingTable().allShards()) {
assertEquals("test", shard.getIndexName());
assertNotNull(indicesStatsResponse.asMap().get(shard));
}
latch.countDown();
}

@Override
public void onFailure(Exception e) {
fail();
latch.countDown();
}
});
latch.await();
}
aasom143 marked this conversation as resolved.
Show resolved Hide resolved

public void testCatShardsWithTimeoutException() throws IOException, AssertionError, InterruptedException {
List<String> masterNodes = internalCluster().startClusterManagerOnlyNodes(1);
List<String> nodes = internalCluster().startDataOnlyNodes(3);
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
.build()
);
ensureGreen("test");

Settings clusterManagerDataPathSettings = internalCluster().dataPathSettings(masterNodes.get(0));
// Dropping master node to delay in cluster state call.
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(masterNodes.get(0)));
aasom143 marked this conversation as resolved.
Show resolved Hide resolved

CountDownLatch latch = new CountDownLatch(2);
new Thread(() -> {
try {
// Ensures the cancellation timeout expires.
Thread.sleep(2000);
// Starting master node to proceed in cluster state call.
internalCluster().startClusterManagerOnlyNode(
aasom143 marked this conversation as resolved.
Show resolved Hide resolved
Settings.builder().put("node.name", masterNodes.get(0)).put(clusterManagerDataPathSettings).build()
);
latch.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();

final CatShardsRequest shardsRequest = new CatShardsRequest();
TimeValue timeoutInterval = timeValueMillis(1000);
shardsRequest.setCancelAfterTimeInterval(timeoutInterval);
shardsRequest.clusterManagerNodeTimeout(timeValueMillis(2500));
shardsRequest.setIndices(Strings.EMPTY_ARRAY);
client().execute(CatShardsAction.INSTANCE, shardsRequest, new ActionListener<CatShardsResponse>() {
aasom143 marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void onResponse(CatShardsResponse catShardsResponse) {
// onResponse should not be called.
latch.countDown();
throw new AssertionError(
"The cat shards action is expected to fail with a TaskCancelledException, but it received a successful response instead."
);
}

@Override
public void onFailure(Exception e) {
assertSame(e.getClass(), TaskCancelledException.class);
assertEquals(e.getMessage(), "Cancellation timeout of " + timeoutInterval + " is expired");
latch.countDown();
}
});
latch.await();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@
import org.opensearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.opensearch.action.admin.cluster.shards.CatShardsAction;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.TransportCatShardsAction;
import org.opensearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.TransportDeleteWeightedRoutingAction;
Expand Down Expand Up @@ -646,6 +648,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(ClusterGetWeightedRoutingAction.INSTANCE, TransportGetWeightedRoutingAction.class);
actions.register(ClusterDeleteWeightedRoutingAction.INSTANCE, TransportDeleteWeightedRoutingAction.class);
actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
actions.register(CatShardsAction.INSTANCE, TransportCatShardsAction.class);
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
actions.register(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.ActionType;

/**
* Transport action for cat shards
*
* @opensearch.internal
*/
public class CatShardsAction extends ActionType<CatShardsResponse> {
public static final CatShardsAction INSTANCE = new CatShardsAction();
public static final String NAME = "cluster:monitor/shards";

private CatShardsAction() {
super(NAME, CatShardsResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.rest.action.admin.cluster.ClusterAdminTask;

import java.io.IOException;
import java.util.Map;

/**
* A request of _cat/shards.
*
* @opensearch.api
*/
public class CatShardsRequest extends ClusterManagerNodeReadRequest<CatShardsRequest> {
aasom143 marked this conversation as resolved.
Show resolved Hide resolved

private String[] indices;
private TimeValue cancelAfterTimeInterval;

public CatShardsRequest() {}

public CatShardsRequest(StreamInput in) throws IOException {
super(in);
}

Check warning on line 35 in server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java#L34-L35

Added lines #L34 - L35 were not covered by tests

@Override
public ActionRequestValidationException validate() {
return null;

Check warning on line 39 in server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java#L39

Added line #L39 was not covered by tests
}

public void setIndices(String[] indices) {
this.indices = indices;
}

public String[] getIndices() {
return this.indices;

Check warning on line 47 in server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java#L47

Added line #L47 was not covered by tests
}

public void setCancelAfterTimeInterval(TimeValue timeout) {
aasom143 marked this conversation as resolved.
Show resolved Hide resolved
this.cancelAfterTimeInterval = timeout;
}

public TimeValue getCancelAfterTimeInterval() {
return this.cancelAfterTimeInterval;

Check warning on line 55 in server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java#L55

Added line #L55 was not covered by tests
}

@Override
public ClusterAdminTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ClusterAdminTask(id, type, action, parentTaskId, headers, this.cancelAfterTimeInterval);

Check warning on line 60 in server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java#L60

Added line #L60 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* A response of a cat shards request.
*
* @opensearch.api
*/
public class CatShardsResponse extends ActionResponse {
aasom143 marked this conversation as resolved.
Show resolved Hide resolved

private ClusterStateResponse clusterStateResponse = null;

private IndicesStatsResponse indicesStatsResponse = null;

public CatShardsResponse() {}

public CatShardsResponse(StreamInput in) throws IOException {
super(in);
aasom143 marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 34 in server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsResponse.java#L33-L34

Added lines #L33 - L34 were not covered by tests

@Override
public void writeTo(StreamOutput out) throws IOException {
clusterStateResponse.writeTo(out);
indicesStatsResponse.writeTo(out);
}

Check warning on line 40 in server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsResponse.java#L38-L40

Added lines #L38 - L40 were not covered by tests

public void setClusterStateResponse(ClusterStateResponse clusterStateResponse) {
this.clusterStateResponse = clusterStateResponse;
}

public ClusterStateResponse getClusterStateResponse() {
return this.clusterStateResponse;
}

public void setIndicesStatsResponse(IndicesStatsResponse indicesStatsResponse) {
this.indicesStatsResponse = indicesStatsResponse;
}

public IndicesStatsResponse getIndicesStatsResponse() {
return this.indicesStatsResponse;
}
}
Loading
Loading