Skip to content

Commit

Permalink
Add API for clearing file cache on search nodes
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
  • Loading branch information
kotwanikunal committed Apr 28, 2023
1 parent d984f50 commit 0664b6b
Show file tree
Hide file tree
Showing 16 changed files with 519 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- [Extensions] Moving Extensions APIs to protobuf serialization. ([#6960](https://github.com/opensearch-project/OpenSearch/pull/6960))
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))
- API to clear filecache on search capable nodes ([#7323](https://github.com/opensearch-project/OpenSearch/pull/7323))

### Dependencies
- Bump `jackson` from 2.14.2 to 2.15.0 ([#7286](https://github.com/opensearch-project/OpenSearch/pull/7286)
Expand All @@ -99,4 +100,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,7 @@ public void testApiNamingConventions() throws Exception {
"ingest.processor_grok",
"nodes.info",
"nodes.stats",
"nodes.clear_filecache",
"nodes.hot_threads",
"nodes.usage",
"nodes.reload_secure_settings",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"nodes.clear_filecache":{
"documentation":{
"url": "https://opensearch.org/docs/latest/opensearch/rest-api/filecache#clear",
"description":"Clears all file caches for one or more search capable nodes."
},
"stability":"experimental",
"url":{
"paths":[
{
"path":"/_filecache/clear",
"methods":[
"POST"
]
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.hamcrest.MatcherAssert;
import org.opensearch.action.admin.cluster.node.filecache.clear.ClearNodeFileCacheResponse;
import org.opensearch.action.admin.cluster.node.filecache.clear.ClearNodesFileCacheRequest;
import org.opensearch.action.admin.cluster.node.filecache.clear.ClearNodesFileCacheResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
Expand Down Expand Up @@ -664,6 +667,41 @@ public void testCacheIndexFilesClearedOnDelete() throws Exception {
logger.info("--> validated that the cache file path doesn't exist");
}

/**
* Test scenario that calls the clear filecache API which prunes the file cache
* Ensures that the filecache was pruned on all the search capable nodes.
*/
public void testFileCacheClearAPI() throws Exception {
final int numReplicas = randomIntBetween(1, 4);
final int numShards = numReplicas + 1;
final String indexName = "test-idx";
final String restoredIndexName = indexName + "-copy";
final String repoName = "test-repo";
final String snapshotName = "test-snap";
final Client client = client();

internalCluster().ensureAtLeastNumSearchAndDataNodes(numShards);
createIndexWithDocsAndEnsureGreen(numReplicas, 100, indexName);
createRepositoryWithSettings(null, repoName);
takeSnapshot(client, snapshotName, repoName, indexName);
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
assertDocCount(restoredIndexName, 100L);
assertRemoteSnapshotIndexSettings(client, restoredIndexName);

ClearNodesFileCacheResponse nodesResponse = client.admin().cluster().clearFileCache(new ClearNodesFileCacheRequest()).actionGet();
int nodesClearedCount = 0;
for (ClearNodeFileCacheResponse nodeResponse : nodesResponse.getNodes()) {
if (nodeResponse.getNode().isSearchNode()) {
nodesClearedCount++;
assertTrue(nodeResponse.isCleared());
assertTrue(nodeResponse.getCount() >= 0);
}
}

assertEquals(nodesClearedCount, numShards);
deleteIndicesAndEnsureGreen(client, restoredIndexName);
}

/**
* Asserts the cache folder count to match the number of shards and the number of indices within the cache folder
* as provided.
Expand Down
5 changes: 5 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@
import org.opensearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsAction;
import org.opensearch.action.admin.indices.exists.indices.TransportIndicesExistsAction;
import org.opensearch.action.admin.cluster.node.filecache.clear.ClearNodesFileCacheAction;
import org.opensearch.action.admin.cluster.node.filecache.clear.TransportClearNodesFileCacheAction;
import org.opensearch.action.admin.indices.flush.FlushAction;
import org.opensearch.action.admin.indices.flush.TransportFlushAction;
import org.opensearch.action.admin.indices.forcemerge.ForceMergeAction;
Expand Down Expand Up @@ -311,6 +313,7 @@
import org.opensearch.rest.action.admin.cluster.RestAddVotingConfigExclusionAction;
import org.opensearch.rest.action.admin.cluster.RestCancelTasksAction;
import org.opensearch.rest.action.admin.cluster.RestCleanupRepositoryAction;
import org.opensearch.rest.action.admin.cluster.RestClearNodesFileCacheAction;
import org.opensearch.rest.action.admin.cluster.RestClearVotingConfigExclusionsAction;
import org.opensearch.rest.action.admin.cluster.RestCloneSnapshotAction;
import org.opensearch.rest.action.admin.cluster.RestClusterAllocationExplainAction;
Expand Down Expand Up @@ -580,6 +583,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
actions.register(RemoteInfoAction.INSTANCE, TransportRemoteInfoAction.class);
actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
actions.register(ClearNodesFileCacheAction.INSTANCE, TransportClearNodesFileCacheAction.class);
actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class);
actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
actions.register(ListTasksAction.INSTANCE, TransportListTasksAction.class);
Expand Down Expand Up @@ -769,6 +773,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestNodesInfoAction(settingsFilter));
registerHandler.accept(new RestRemoteClusterInfoAction());
registerHandler.accept(new RestNodesStatsAction());
registerHandler.accept(new RestClearNodesFileCacheAction());
registerHandler.accept(new RestNodesUsageAction());
registerHandler.accept(new RestNodesHotThreadsAction());
registerHandler.accept(new RestClusterAllocationExplainAction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.node.filecache.clear;

import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

public class ClearNodeFileCacheResponse extends BaseNodeResponse implements ToXContentFragment {

boolean cleared;

long count;

protected ClearNodeFileCacheResponse(StreamInput in) throws IOException {
super(in);
cleared = in.readBoolean();
count = in.readLong();
}

public ClearNodeFileCacheResponse(DiscoveryNode discoveryNode, boolean cleared, long count) {
super(discoveryNode);
this.cleared = cleared;
this.count = count;
}

public boolean isCleared() {
return cleared;
}

public long getCount() {
return count;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(cleared);
out.writeLong(count);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("name", getNode().getName());

builder.startArray("roles");
for (DiscoveryNodeRole role : getNode().getRoles()) {
builder.value(role.roleName());
}
builder.endArray();

builder.field("cleared", cleared);
builder.field("item_count", count);
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.node.filecache.clear;

import org.opensearch.action.ActionType;

/**
* Transport action for clearing filecache on nodes
*
* @opensearch.internal
*/
public class ClearNodesFileCacheAction extends ActionType<ClearNodesFileCacheResponse> {

public static final ClearNodesFileCacheAction INSTANCE = new ClearNodesFileCacheAction();
public static final String NAME = "cluster:admin/nodes/filecache/clear";

private ClearNodesFileCacheAction() {
super(NAME, ClearNodesFileCacheResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.node.filecache.clear;

import org.opensearch.action.support.nodes.BaseNodesRequest;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Request object for clearing filecache on nodes
*
* @opensearch.internal
*/
public class ClearNodesFileCacheRequest extends BaseNodesRequest<ClearNodesFileCacheRequest> {

public ClearNodesFileCacheRequest() {
super((String[]) null);
}

public ClearNodesFileCacheRequest(StreamInput in) throws IOException {
super(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.node.filecache.clear;

import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.ClusterName;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;

/**
* The response of a clear filecache action.
*
* @opensearch.internal
*/
public class ClearNodesFileCacheResponse extends BaseNodesResponse<ClearNodeFileCacheResponse> implements ToXContentFragment {

ClearNodesFileCacheResponse(StreamInput in) throws IOException {
super(in);
}

ClearNodesFileCacheResponse(
ClusterName clusterName,
List<ClearNodeFileCacheResponse> clearNodeFileCachResponses,
List<FailedNodeException> failures
) {
super(clusterName, clearNodeFileCachResponses, failures);
}

@Override
protected List<ClearNodeFileCacheResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readList(ClearNodeFileCacheResponse::new);
}

@Override
protected void writeNodesTo(StreamOutput out, List<ClearNodeFileCacheResponse> clearNodeFileCacheResponse) throws IOException {
out.writeList(clearNodeFileCacheResponse);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("nodes");
for (ClearNodeFileCacheResponse clearNodeFileCacheResponse : getNodes()) {
builder.startObject(clearNodeFileCacheResponse.getNode().getId());
clearNodeFileCacheResponse.toXContent(builder, params);
builder.endObject();
}
builder.endObject();

return builder;
}
}
Loading

0 comments on commit 0664b6b

Please sign in to comment.