Skip to content

Commit

Permalink
Fixing flaky test cases for DiskThresholdDeciderIT
Browse files Browse the repository at this point in the history
Signed-off-by: Rishav Sagar <rissag@amazon.com>
  • Loading branch information
Rishav Sagar committed Jan 25, 2023
1 parent 715ff72 commit ce20ab5
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,22 @@
import org.apache.lucene.tests.mockfile.FilterPath;
import org.apache.lucene.util.Constants;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.support.IndicesOptions;

import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.InternalClusterInfoService;
import org.opensearch.cluster.MockInternalClusterInfoService;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
Expand All @@ -59,6 +67,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;
Expand Down Expand Up @@ -88,7 +97,6 @@
import java.util.List;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
Expand All @@ -98,6 +106,7 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.opensearch.index.store.Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -132,8 +141,7 @@ public void removeFilesystemProvider() {
defaultFileSystem = null;
}

// Increasing watermark limit to avoid flaky test case failures.
private static final long WATERMARK_BYTES = new ByteSizeValue(1, ByteSizeUnit.MB).getBytes();
private static final long WATERMARK_BYTES = new ByteSizeValue(10, ByteSizeUnit.KB).getBytes();
private static final String INDEX_ROUTING_ALLOCATION_NODE_SETTING = "index.routing.allocation.include._name";

@Override
Expand All @@ -158,7 +166,7 @@ protected Settings nodeSettings(int nodeOrdinal) {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(InternalSettingsPlugin.class);
return List.of(InternalSettingsPlugin.class, MockInternalClusterInfoService.TestPlugin.class);
}

public void testHighWatermarkNotExceeded() throws Exception {
Expand All @@ -177,8 +185,9 @@ public void testHighWatermarkNotExceeded() throws Exception {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final long minShardSize = createAndPopulateIndex(indexName, null);

// reduce disk size of node 0 so that no shards fit below the high watermark, forcing all shards onto the other data node
// (subtract the translog size since the disk threshold decider ignores this and may therefore move the shard back again)
// reduce disk size of node 0 so that no shards fit below the high watermark, forcing all shards onto the other
// data node (subtract the translog size since the disk threshold decider ignores this and may therefore move
// the shard back again).
fileSystemProvider.getTestFileStore(dataNode0Path).setTotalSpace(minShardSize + WATERMARK_BYTES - 1L);
assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, empty());

Expand All @@ -189,35 +198,28 @@ public void testHighWatermarkNotExceeded() throws Exception {

public void testIndexCreateBlockWhenAllNodesExceededHighWatermark() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
ensureStableCluster(3);

final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
.getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());
final Settings settings = Settings.builder()
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
.build();

final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2, settings);
ensureStableCluster(3);
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
// Reduce disk space of all node until all of them is breaching high disk watermark.
for (final String dataNodeName : dataNodeNames) {
populateNode(dataNodeName);
}

// Wait for all nodes to breach high disk watermark.
clusterInfoService.prepareClusterInfoAndRefresh(logger, getIndicesStatResponse(), getNodesStatResponse());
assertBusy(() -> {
refreshDiskUsage();
assertTrue(
StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
.allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
);
ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertFalse(state1.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}, 30L, TimeUnit.SECONDS);

// Validate if cluster block is applied on the cluster
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}

public void testIndexCreateBlockNotAppliedWhenAnyNodesBelowHighWatermark() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
internalCluster().startDataOnlyNodes(2);
ensureStableCluster(3);

final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
Expand All @@ -231,56 +233,49 @@ public void testIndexCreateBlockNotAppliedWhenAnyNodesBelowHighWatermark() throw

public void testIndexCreateBlockIsRemovedWhenAnyNodesNotExceedHighWatermark() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
final Settings settings = Settings.builder()
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
.build();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2, settings);
final List<String> indexNames = new ArrayList<>();
ensureStableCluster(3);

final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
.getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());

final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
// Reduce disk space of all node until all of them is breaching high disk watermark.
for (final String dataNodeName : dataNodeNames) {
final String indexName = populateNode(dataNodeName);
indexNames.add(indexName);
}

// Wait for all the node to breach high disk watermark.
clusterInfoService.prepareClusterInfoAndRefresh(logger, getIndicesStatResponse(), getNodesStatResponse());
// Validate if cluster block is applied on the cluster
assertBusy(() -> {
refreshDiskUsage();
assertTrue(
StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
.allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
);
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}, 30L, TimeUnit.SECONDS);

// Validate if index create block is applied on the cluster
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));

// Delete indices to free space
deleteIndices(indexNames);

clusterInfoService.prepareClusterInfoAndRefresh(logger, getIndicesStatResponse(), getNodesStatResponse());
// Validate if index create block is removed on the cluster
assertBusy(() -> {
refreshDiskUsage();
ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertFalse(state1.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}, 30L, TimeUnit.SECONDS);
}

public void testIndexCreateBlockWithAReadOnlyBlock() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
final Settings settings = Settings.builder()
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
.build();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2, settings);
ensureStableCluster(3);
final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
.getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();

// Create one of the index.
final String indexName = populateNode(dataNodeNames.get(0));

// Reduce disk space of all other node until all of them is breaching high disk watermark.
// Reduce disk space of all other node until all of them is breaching high disk watermark
for (int i = 1; i < dataNodeNames.size(); i++) {
populateNode(dataNodeNames.get(i));
}
Expand All @@ -291,18 +286,12 @@ public void testIndexCreateBlockWithAReadOnlyBlock() throws Exception {
.put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, Boolean.TRUE.toString())
.build();
client().admin().indices().prepareUpdateSettings(indexName).setSettings(readOnlySettings).get();

clusterInfoService.prepareClusterInfoAndRefresh(logger, getIndicesStatResponse(), getNodesStatResponse());
// Validate index create block is applied on the cluster
assertBusy(() -> {
refreshDiskUsage();
assertTrue(
StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
.allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
);
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}, 30L, TimeUnit.SECONDS);

// Validate index create block is applied on the cluster.
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}

public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception {
Expand Down Expand Up @@ -382,6 +371,34 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Excepti
assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, hasSize(1));
}

private void ensureAllNodesAboveHighWatermark() throws Exception {
final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
.getCurrentClusterManagerNodeInstance(ClusterInfoService.class);

assertBusy(() -> {
assertTrue(
StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
.allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
);
}, 2L, TimeUnit.MINUTES);
}

private IndicesStatsResponse getIndicesStatResponse() throws ExecutionException, InterruptedException {
final IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.clear();
indicesStatsRequest.store(true);
indicesStatsRequest.indicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_CLOSED_HIDDEN);
return client().admin().indices().stats(indicesStatsRequest).get();
}

private NodesStatsResponse getNodesStatResponse() throws ExecutionException, InterruptedException {
final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true");
nodesStatsRequest.clear();
nodesStatsRequest.addMetric(NodesStatsRequest.Metric.FS.metricName());
nodesStatsRequest.timeout(TimeValue.timeValueSeconds(10));
return client().admin().cluster().nodesStats(nodesStatsRequest).get();
}

private void deleteIndices(final List<String> indexNames) throws ExecutionException, InterruptedException {
for (String indexName : indexNames) {
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName)).get());
Expand All @@ -394,7 +411,6 @@ private String populateNode(final String dataNodeName) throws Exception {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
long minShardSize = createAndPopulateIndex(indexName, dataNodeName);
fileSystemProvider.getTestFileStore(dataNodePath).setTotalSpace(minShardSize + WATERMARK_BYTES - 1L);
refreshDiskUsage();
return indexName;
}

Expand All @@ -405,14 +421,28 @@ private long createAndPopulateIndex(final String indexName, final String nodeNam
.put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false);

// Depending on node name specified or not, we determine whether to enable node name based shard routing for index.
// Depending on node name specified or not, we determine whether to enable node name based shard routing for index
// and whether reallocation is disabled on that index or not.
if (nodeName != null) {
indexSettingBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(INDEX_ROUTING_ALLOCATION_NODE_SETTING, nodeName);
createIndex(indexName, indexSettingBuilder.build());
assertAcked(
client().admin()
.indices()
.updateSettings(
new UpdateSettingsRequest(indexName).settings(
Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")
)
)
.get()
);

ensureGreen(indexName);
} else {
indexSettingBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6);
createIndex(indexName, indexSettingBuilder.build());
}

createIndex(indexName, indexSettingBuilder.build());
return createReasonableSizedShards(indexName);
}

Expand Down Expand Up @@ -505,6 +535,10 @@ private void assertBusyWithDiskUsageRefresh(String nodeName, String indexName, M
}, 30L, TimeUnit.SECONDS);
}

private MockInternalClusterInfoService getMockInternalClusterInfoService() {
return (MockInternalClusterInfoService) internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
}

private static class TestFileStore extends FilterFileStore {

private final Path path;
Expand Down
Loading

0 comments on commit ce20ab5

Please sign in to comment.