Fix flaky test cases for DiskThresholdDeciderIT #5952

Merged 1 commit on Jan 27, 2023
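At a high level, the patch swaps the listener-driven InternalClusterInfoService refreshes for a MockInternalClusterInfoService that the tests refresh explicitly, and it disables the disk threshold decider on the nodes under test so shard relocations cannot undo the setup mid-assertion. Below is a minimal sketch of that pattern, assuming the standard OpenSearch internal-cluster test framework: the class and test names are invented for illustration, the import paths are assumed to follow the usual OpenSearch package layout, and the individual calls are lifted from the diff that follows.

// Illustrative sketch only, not part of the patch.
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.MockInternalClusterInfoService;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class DiskWatermarkRefreshSketchIT extends OpenSearchIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        // Register the mock ClusterInfoService so the test can trigger disk-usage refreshes on demand.
        return List.of(InternalSettingsPlugin.class, MockInternalClusterInfoService.TestPlugin.class);
    }

    private MockInternalClusterInfoService getMockInternalClusterInfoService() {
        return (MockInternalClusterInfoService) internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
    }

    public void testIndexCreateBlockSketch() throws Exception {
        // Disable the disk threshold decider so relocations cannot race the assertions below.
        final Settings settings = Settings.builder()
            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
            .build();
        internalCluster().startClusterManagerOnlyNode(settings);
        internalCluster().startDataOnlyNodes(2, settings);
        ensureStableCluster(3);

        // ... shrink each data node's reported total disk space past the high watermark here ...

        // Refresh disk usage once, explicitly, instead of piggybacking on cluster-state events.
        getMockInternalClusterInfoService().refresh();
        assertBusy(() -> {
            final ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
            assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
        }, 30L, TimeUnit.SECONDS);
    }
}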
@@ -40,11 +40,14 @@
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.index.IndexRequestBuilder;

import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.InternalClusterInfoService;
import org.opensearch.cluster.MockInternalClusterInfoService;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
@@ -88,7 +91,6 @@
import java.util.List;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
@@ -98,6 +100,7 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.opensearch.index.store.Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
@@ -132,8 +135,7 @@ public void removeFilesystemProvider() {
defaultFileSystem = null;
}

// Increasing watermark limit to avoid flaky test case failures.
private static final long WATERMARK_BYTES = new ByteSizeValue(1, ByteSizeUnit.MB).getBytes();
private static final long WATERMARK_BYTES = new ByteSizeValue(10, ByteSizeUnit.KB).getBytes();
private static final String INDEX_ROUTING_ALLOCATION_NODE_SETTING = "index.routing.allocation.include._name";

@Override
@@ -158,7 +160,7 @@ protected Settings nodeSettings(int nodeOrdinal) {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(InternalSettingsPlugin.class);
return List.of(InternalSettingsPlugin.class, MockInternalClusterInfoService.TestPlugin.class);
}

public void testHighWatermarkNotExceeded() throws Exception {
@@ -177,8 +179,9 @@ public void testHighWatermarkNotExceeded() throws Exception {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final long minShardSize = createAndPopulateIndex(indexName, null);

// reduce disk size of node 0 so that no shards fit below the high watermark, forcing all shards onto the other data node
// (subtract the translog size since the disk threshold decider ignores this and may therefore move the shard back again)
// reduce disk size of node 0 so that no shards fit below the high watermark, forcing all shards onto the other
// data node (subtract the translog size since the disk threshold decider ignores this and may therefore move
// the shard back again).
fileSystemProvider.getTestFileStore(dataNode0Path).setTotalSpace(minShardSize + WATERMARK_BYTES - 1L);
assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, empty());

@@ -188,36 +191,28 @@
}

public void testIndexCreateBlockWhenAllNodesExceededHighWatermark() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
ensureStableCluster(3);

final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
.getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());
final Settings settings = Settings.builder()
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
.build();

internalCluster().startClusterManagerOnlyNode(settings);
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2, settings);
ensureStableCluster(3);
// Reduce the disk space of every node until all of them are breaching the high disk watermark.
for (final String dataNodeName : dataNodeNames) {
populateNode(dataNodeName);
}

// Wait for all nodes to breach the high disk watermark.
getMockInternalClusterInfoService().refresh();
assertBusy(() -> {
refreshDiskUsage();
assertTrue(
StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
.allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
);
ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertFalse(state1.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}, 30L, TimeUnit.SECONDS);

// Validate that the index create block is applied on the cluster
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}

public void testIndexCreateBlockNotAppliedWhenAnyNodesBelowHighWatermark() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
internalCluster().startDataOnlyNodes(2);
ensureStableCluster(3);

final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
@@ -230,57 +225,50 @@ public void testIndexCreateBlockNotAppliedWhenAnyNodesBelowHighWatermark() throw
}

public void testIndexCreateBlockIsRemovedWhenAnyNodesNotExceedHighWatermark() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
final Settings settings = Settings.builder()
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
.build();

internalCluster().startClusterManagerOnlyNode(settings);
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2, settings);
final List<String> indexNames = new ArrayList<>();
ensureStableCluster(3);

final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
.getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());

// Reduce the disk space of every node until all of them are breaching the high disk watermark.
for (final String dataNodeName : dataNodeNames) {
final String indexName = populateNode(dataNodeName);
indexNames.add(indexName);
}

// Wait for all the nodes to breach the high disk watermark.
getMockInternalClusterInfoService().refresh();
// Validate that the index create block is applied on the cluster
assertBusy(() -> {
refreshDiskUsage();
assertTrue(
StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
.allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
);
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}, 30L, TimeUnit.SECONDS);

// Validate that the index create block is applied on the cluster
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));

// Delete indices to free space
deleteIndices(indexNames);

getMockInternalClusterInfoService().refresh();
// Validate that the index create block is removed from the cluster
assertBusy(() -> {
refreshDiskUsage();
ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertFalse(state1.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}, 30L, TimeUnit.SECONDS);
}

public void testIndexCreateBlockWithAReadOnlyBlock() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
final Settings settings = Settings.builder()
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
.build();

internalCluster().startClusterManagerOnlyNode(settings);
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2, settings);
ensureStableCluster(3);
final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
.getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());

// Create an index on one of the data nodes.
final String indexName = populateNode(dataNodeNames.get(0));

// Reduce the disk space of every other node until all of them are breaching the high disk watermark.
// Reduce the disk space of every other node until all of them are breaching the high disk watermark
for (int i = 1; i < dataNodeNames.size(); i++) {
populateNode(dataNodeNames.get(i));
}
@@ -291,18 +279,12 @@ public void testIndexCreateBlockWithAReadOnlyBlock() throws Exception {
.put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, Boolean.TRUE.toString())
.build();
client().admin().indices().prepareUpdateSettings(indexName).setSettings(readOnlySettings).get();

getMockInternalClusterInfoService().refresh();
// Validate index create block is applied on the cluster
assertBusy(() -> {
refreshDiskUsage();
assertTrue(
StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
.allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
);
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}, 30L, TimeUnit.SECONDS);

// Validate index create block is applied on the cluster.
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}

public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception {
@@ -394,7 +376,6 @@ private String populateNode(final String dataNodeName) throws Exception {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
long minShardSize = createAndPopulateIndex(indexName, dataNodeName);
fileSystemProvider.getTestFileStore(dataNodePath).setTotalSpace(minShardSize + WATERMARK_BYTES - 1L);
refreshDiskUsage();
return indexName;
}

Expand All @@ -405,14 +386,28 @@ private long createAndPopulateIndex(final String indexName, final String nodeNam
.put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false);

// Depending on whether a node name is specified, we determine whether to enable node-name-based shard routing for the index.
// Depending on whether a node name is specified, we determine whether to enable node-name-based shard routing for the index
// and whether reallocation is disabled on that index.
if (nodeName != null) {
indexSettingBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(INDEX_ROUTING_ALLOCATION_NODE_SETTING, nodeName);
createIndex(indexName, indexSettingBuilder.build());
assertAcked(
client().admin()
.indices()
.updateSettings(
new UpdateSettingsRequest(indexName).settings(
Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")
)
)
.get()
);

ensureGreen(indexName);
} else {
indexSettingBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6);
createIndex(indexName, indexSettingBuilder.build());
}

createIndex(indexName, indexSettingBuilder.build());
return createReasonableSizedShards(indexName);
}

@@ -438,7 +433,7 @@ private Set<ShardRouting> getShardRoutings(final String nodeId, final String ind
}

/**
* Index documents until all the shards are at least WATERMARK_BYTES in size, and return the size of the smallest shard
* Index documents until all the shards are at least WATERMARK_BYTES in size, and return the size of the smallest shard.
*/
private long createReasonableSizedShards(final String indexName) throws InterruptedException {
while (true) {
@@ -505,6 +500,10 @@ private void assertBusyWithDiskUsageRefresh(String nodeName, String indexName, M
}, 30L, TimeUnit.SECONDS);
}

private MockInternalClusterInfoService getMockInternalClusterInfoService() {
return (MockInternalClusterInfoService) internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
}

private static class TestFileStore extends FilterFileStore {

private final Path path;
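As an aside on the watermark arithmetic used throughout these tests: setTotalSpace(minShardSize + WATERMARK_BYTES - 1L) shrinks a node's reported disk so that, once it holds a shard of at least minShardSize bytes, its free space is necessarily below the high watermark. A worked sketch of that reasoning, in plain Java arithmetic and not part of the patch, with variable names mirroring the test above:

long totalSpace = minShardSize + WATERMARK_BYTES - 1L; // what the tests pass to setTotalSpace(...)
long freeBytes = totalSpace - minShardSize;            // at most WATERMARK_BYTES - 1 once the shard is allocated
assert freeBytes < WATERMARK_BYTES;                    // matches the cur.value.getFreeBytes() < WATERMARK_BYTES checks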