diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
index 561e4349a4890..d29d6924a68cb 100644
--- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
@@ -40,11 +40,14 @@
 import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
 import org.opensearch.action.admin.indices.stats.ShardStats;
 import org.opensearch.action.index.IndexRequestBuilder;
+
 import org.opensearch.cluster.ClusterInfoService;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.InternalClusterInfoService;
+import org.opensearch.cluster.MockInternalClusterInfoService;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.metadata.Metadata;
 import org.opensearch.cluster.routing.IndexShardRoutingTable;
@@ -88,7 +91,6 @@
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Locale;
 import java.util.Map;
@@ -98,6 +100,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING;
 import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
 import static org.opensearch.index.store.Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING;
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
@@ -132,8 +135,7 @@ public void removeFilesystemProvider() {
         defaultFileSystem = null;
     }
 
-    // Increasing watermark limit to avoid flaky test case failures.
-    private static final long WATERMARK_BYTES = new ByteSizeValue(1, ByteSizeUnit.MB).getBytes();
+    private static final long WATERMARK_BYTES = new ByteSizeValue(10, ByteSizeUnit.KB).getBytes();
     private static final String INDEX_ROUTING_ALLOCATION_NODE_SETTING = "index.routing.allocation.include._name";
 
     @Override
@@ -158,7 +160,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return Collections.singletonList(InternalSettingsPlugin.class);
+        return List.of(InternalSettingsPlugin.class, MockInternalClusterInfoService.TestPlugin.class);
     }
 
     public void testHighWatermarkNotExceeded() throws Exception {
@@ -177,8 +179,9 @@ public void testHighWatermarkNotExceeded() throws Exception {
         final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
         final long minShardSize = createAndPopulateIndex(indexName, null);
 
-        // reduce disk size of node 0 so that no shards fit below the high watermark, forcing all shards onto the other data node
-        // (subtract the translog size since the disk threshold decider ignores this and may therefore move the shard back again)
+        // reduce disk size of node 0 so that no shards fit below the high watermark, forcing all shards onto the other
+        // data node (subtract the translog size since the disk threshold decider ignores this and may therefore move
+        // the shard back again).
         fileSystemProvider.getTestFileStore(dataNode0Path).setTotalSpace(minShardSize + WATERMARK_BYTES - 1L);
 
         assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, empty());
@@ -188,36 +191,28 @@
     }
 
     public void testIndexCreateBlockWhenAllNodesExceededHighWatermark() throws Exception {
-        internalCluster().startClusterManagerOnlyNode();
-        final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
-        ensureStableCluster(3);
-
-        final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
-            .getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
-        internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());
+        final Settings settings = Settings.builder()
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
+            .build();
+        internalCluster().startClusterManagerOnlyNode(settings);
+        final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2, settings);
+        ensureStableCluster(3);
 
         // Reduce disk space of all node until all of them is breaching high disk watermark.
         for (final String dataNodeName : dataNodeNames) {
             populateNode(dataNodeName);
         }
 
-        // Wait for all nodes to breach high disk watermark.
+        getMockInternalClusterInfoService().refresh();
         assertBusy(() -> {
-            refreshDiskUsage();
-            assertTrue(
-                StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
-                    .allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
-            );
+            ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).get().getState();
+            assertFalse(state1.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
         }, 30L, TimeUnit.SECONDS);
-
-        // Validate if cluster block is applied on the cluster
-        ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
-        assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
     }
 
     public void testIndexCreateBlockNotAppliedWhenAnyNodesBelowHighWatermark() throws Exception {
         internalCluster().startClusterManagerOnlyNode();
-        final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
+        internalCluster().startDataOnlyNodes(2);
         ensureStableCluster(3);
 
         final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
@@ -230,57 +225,50 @@ public void testIndexCreateBlockNotAppliedWhenAnyNodesBelowHighWatermark() throw
     }
 
     public void testIndexCreateBlockIsRemovedWhenAnyNodesNotExceedHighWatermark() throws Exception {
-        internalCluster().startClusterManagerOnlyNode();
-        final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
+        final Settings settings = Settings.builder()
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
+            .build();
+
+        internalCluster().startClusterManagerOnlyNode(settings);
+        final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2, settings);
         final List<String> indexNames = new ArrayList<>();
         ensureStableCluster(3);
 
-        final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
-            .getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
-        internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());
-
         // Reduce disk space of all node until all of them is breaching high disk watermark.
         for (final String dataNodeName : dataNodeNames) {
             final String indexName = populateNode(dataNodeName);
             indexNames.add(indexName);
         }
 
-        // Wait for all the node to breach high disk watermark.
+        getMockInternalClusterInfoService().refresh();
+        // Validate if cluster block is applied on the cluster
        assertBusy(() -> {
-            refreshDiskUsage();
-            assertTrue(
-                StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
-                    .allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
-            );
+            ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
+            assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
         }, 30L, TimeUnit.SECONDS);
 
-        // Validate if index create block is applied on the cluster
-        ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
-        assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
-
         // Delete indices to free space
         deleteIndices(indexNames);
-
+        getMockInternalClusterInfoService().refresh();
         // Validate if index create block is removed on the cluster
         assertBusy(() -> {
-            refreshDiskUsage();
             ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).get().getState();
             assertFalse(state1.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
         }, 30L, TimeUnit.SECONDS);
     }
 
     public void testIndexCreateBlockWithAReadOnlyBlock() throws Exception {
-        internalCluster().startClusterManagerOnlyNode();
-        final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
+        final Settings settings = Settings.builder()
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
+            .build();
+
+        internalCluster().startClusterManagerOnlyNode(settings);
+        final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2, settings);
         ensureStableCluster(3);
 
-        final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
-            .getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
-        internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());
         // Create one of the index.
         final String indexName = populateNode(dataNodeNames.get(0));
-
-        // Reduce disk space of all other node until all of them is breaching high disk watermark.
+        // Reduce disk space of all other node until all of them is breaching high disk watermark
         for (int i = 1; i < dataNodeNames.size(); i++) {
             populateNode(dataNodeNames.get(i));
         }
@@ -291,18 +279,12 @@ public void testIndexCreateBlockWithAReadOnlyBlock() throws Exception {
             .put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, Boolean.TRUE.toString())
             .build();
         client().admin().indices().prepareUpdateSettings(indexName).setSettings(readOnlySettings).get();
-
+        getMockInternalClusterInfoService().refresh();
+        // Validate index create block is applied on the cluster
         assertBusy(() -> {
-            refreshDiskUsage();
-            assertTrue(
-                StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
-                    .allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
-            );
+            ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
+            assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
         }, 30L, TimeUnit.SECONDS);
-
-        // Validate index create block is applied on the cluster.
-        ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
-        assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
     }
 
     public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception {
@@ -394,7 +376,6 @@ private String populateNode(final String dataNodeName) throws Exception {
         final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
         long minShardSize = createAndPopulateIndex(indexName, dataNodeName);
         fileSystemProvider.getTestFileStore(dataNodePath).setTotalSpace(minShardSize + WATERMARK_BYTES - 1L);
-        refreshDiskUsage();
         return indexName;
     }
 
@@ -405,14 +386,28 @@ private long createAndPopulateIndex(final String indexName, final String nodeNam
             .put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
             .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false);
 
-        // Depending on node name specified or not, we determine whether to enable node name based shard routing for index.
+        // Depending on node name specified or not, we determine whether to enable node name based shard routing for index
+        // and whether reallocation is disabled on that index or not.
         if (nodeName != null) {
             indexSettingBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(INDEX_ROUTING_ALLOCATION_NODE_SETTING, nodeName);
+            createIndex(indexName, indexSettingBuilder.build());
+            assertAcked(
+                client().admin()
+                    .indices()
+                    .updateSettings(
+                        new UpdateSettingsRequest(indexName).settings(
+                            Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")
+                        )
+                    )
+                    .get()
+            );
+
+            ensureGreen(indexName);
         } else {
             indexSettingBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6);
+            createIndex(indexName, indexSettingBuilder.build());
         }
 
-        createIndex(indexName, indexSettingBuilder.build());
         return createReasonableSizedShards(indexName);
     }
 
@@ -438,7 +433,7 @@ private Set<ShardRouting> getShardRoutings(final String nodeId, final String ind
     }
 
     /**
-     * Index documents until all the shards are at least WATERMARK_BYTES in size, and return the size of the smallest shard
+     * Index documents until all the shards are at least WATERMARK_BYTES in size, and return the size of the smallest shard.
      */
     private long createReasonableSizedShards(final String indexName) throws InterruptedException {
         while (true) {
@@ -505,6 +500,10 @@ private void assertBusyWithDiskUsageRefresh(String nodeName, String indexName, M
         }, 30L, TimeUnit.SECONDS);
     }
 
+    private MockInternalClusterInfoService getMockInternalClusterInfoService() {
+        return (MockInternalClusterInfoService) internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
+    }
+
     private static class TestFileStore extends FilterFileStore {
 
         private final Path path;