Skip to content

Commit

Permalink
[Remote Store] Fix shards condition in stats api (opensearch-project#…
Browse files Browse the repository at this point in the history
…7739)

Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 authored and stephen-crawford committed May 31, 2023
1 parent d65e384 commit e572a67
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,6 @@ protected void assertDocCounts(int expectedDocCount, String... nodeNames) {
}
}

protected ClusterState getClusterState() {
return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
}

protected DiscoveryNode getNodeContainingPrimaryShard() {
final ClusterState state = getClusterState();
final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,17 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.nio.file.Path;
import java.util.Collection;

import static java.util.Arrays.asList;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
protected static final String REPOSITORY_NAME = "test-remore-store-repo";
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return asList(MockTransportService.TestPlugin.class);
}

@Override
protected boolean addMockInternalEngine() {
return false;
Expand All @@ -47,6 +38,10 @@ protected Settings featureFlagSettings() {
}

public Settings indexSettings() {
return defaultIndexSettings();
}

private Settings defaultIndexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
Expand All @@ -59,6 +54,22 @@ public Settings indexSettings() {
.build();
}

protected Settings remoteStoreIndexSettings(int numberOfReplicas) {
return Settings.builder()
.put(defaultIndexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.build();
}

protected Settings remoteTranslogIndexSettings(int numberOfReplicas) {
return Settings.builder()
.put(remoteStoreIndexSettings(numberOfReplicas))
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,6 @@ public Settings indexSettings() {
return remoteStoreIndexSettings(0);
}

private Settings remoteStoreIndexSettings(int numberOfReplicas) {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.build();
}

private Settings remoteTranslogIndexSettings(int numberOfReplicas) {
return Settings.builder()
.put(remoteStoreIndexSettings(numberOfReplicas))
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.UUIDs;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";

public void testStatsResponseFromAllNodes() {

// Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
internalCluster().startDataOnlyNodes(3);
if (randomBoolean()) {
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
} else {
createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
}
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

// Indexing documents along with refreshes and flushes.
for (int i = 0; i < randomIntBetween(5, 10); i++) {
if (randomBoolean()) {
flush(INDEX_NAME);
} else {
refresh(INDEX_NAME);
}
int numberOfOperations = randomIntBetween(20, 50);
for (int j = 0; j < numberOfOperations; j++) {
indexSingleDoc();
}
}

// Step 2 - We find all the nodes that are present in the cluster. We make the remote store stats api call from
// each of the node in the cluster and check that the response is coming as expected.
ClusterState state = getClusterState();
List<String> nodes = state.nodes().getNodes().values().stream().map(DiscoveryNode::getName).collect(Collectors.toList());
String shardId = "0";
for (String node : nodes) {
RemoteStoreStatsResponse response = client(node).admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get();
assertTrue(response.getSuccessfulShards() > 0);
assertTrue(response.getShards() != null && response.getShards().length != 0);
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId);
List<RemoteStoreStats> matches = Arrays.stream(response.getShards())
.filter(stat -> indexShardId.equals(stat.getStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
RemoteRefreshSegmentTracker.Stats stats = matches.get(0).getStats();
assertEquals(0, stats.refreshTimeLagMs);
assertEquals(stats.localRefreshNumber, stats.remoteRefreshNumber);
assertTrue(stats.uploadBytesStarted > 0);
assertEquals(0, stats.uploadBytesFailed);
assertTrue(stats.uploadBytesSucceeded > 0);
assertTrue(stats.totalUploadsStarted > 0);
assertEquals(0, stats.totalUploadsFailed);
assertTrue(stats.totalUploadsSucceeded > 0);
assertEquals(0, stats.rejectionCount);
assertEquals(0, stats.consecutiveFailuresCount);
assertEquals(0, stats.bytesLag);
assertTrue(stats.uploadBytesMovingAverage > 0);
assertTrue(stats.uploadBytesPerSecMovingAverage > 0);
assertTrue(stats.uploadTimeMovingAverage > 0);
}
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.PlainShardsIterator;
import org.opensearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -89,14 +90,17 @@ protected ShardsIterator shards(ClusterState clusterState, RemoteStoreStatsReque
}
return new PlainShardsIterator(
newShardRoutings.stream()
.filter(shardRouting -> remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardRouting.shardId()) != null)
.filter(
shardRouting -> !request.local()
|| (shardRouting.currentNodeId() == null
|| shardRouting.currentNodeId().equals(clusterState.getNodes().getLocalNodeId()))
)
.filter(ShardRouting::primary)
.filter(shardRouting -> indicesService.indexService(shardRouting.index()).getIndexSettings().isRemoteStoreEnabled())
.filter(
shardRouting -> Boolean.parseBoolean(
clusterState.getMetadata().index(shardRouting.index()).getSettings().get(IndexMetadata.SETTING_REMOTE_STORE_ENABLED)
)
)
.collect(Collectors.toList())
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2474,4 +2474,8 @@ protected String replicaNodeName(String indexName) {
return clusterState.getRoutingNodes().node(nodeId).node().getName();
}

protected ClusterState getClusterState() {
return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
}

}

0 comments on commit e572a67

Please sign in to comment.