Skip to content

Commit

Permalink
Merge branch 'main' into Issue_5848
Browse files Browse the repository at this point in the history
  • Loading branch information
dreamer-89 authored Jan 19, 2023
2 parents 3647e7f + 7a511dd commit affed1c
Show file tree
Hide file tree
Showing 11 changed files with 273 additions and 75 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ out/
!.idea/vcs.xml
!.idea/icon.svg

# These files are generated in the main tree by IntelliJ
# These files are generated in the main tree by annotation processors
benchmarks/src/main/generated/*
benchmarks/bin/*
benchmarks/build-eclipse-default/*

# eclipse files
.project
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Adding index create block when all nodes have breached high disk watermark ([#5852](https://github.com/opensearch-project/OpenSearch/pull/5852))
- Added cluster manager throttling stats in nodes/stats API ([#5790](https://github.com/opensearch-project/OpenSearch/pull/5790))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@

import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.InternalClusterInfoService;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
Expand Down Expand Up @@ -82,12 +85,15 @@
import java.nio.file.NotDirectoryException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -126,7 +132,9 @@ public void removeFilesystemProvider() {
defaultFileSystem = null;
}

private static final long WATERMARK_BYTES = new ByteSizeValue(10, ByteSizeUnit.KB).getBytes();
// Increasing watermark limit to avoid flaky test case failures.
private static final long WATERMARK_BYTES = new ByteSizeValue(1, ByteSizeUnit.MB).getBytes();
private static final String INDEX_ROUTING_ALLOCATION_NODE_SETTING = "index.routing.allocation.include._name";

@Override
protected Settings nodeSettings(int nodeOrdinal) {
Expand Down Expand Up @@ -167,16 +175,7 @@ public void testHighWatermarkNotExceeded() throws Exception {
final Path dataNode0Path = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];

final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6)
.put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false)
.build()
);
final long minShardSize = createReasonableSizedShards(indexName);
final long minShardSize = createAndPopulateIndex(indexName, null);

// reduce disk size of node 0 so that no shards fit below the high watermark, forcing all shards onto the other data node
// (subtract the translog size since the disk threshold decider ignores this and may therefore move the shard back again)
Expand All @@ -188,6 +187,124 @@ public void testHighWatermarkNotExceeded() throws Exception {
assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, hasSize(1));
}

public void testIndexCreateBlockWhenAllNodesExceededHighWatermark() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
ensureStableCluster(3);

final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
.getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());

// Reduce disk space of all node until all of them is breaching high disk watermark.
for (final String dataNodeName : dataNodeNames) {
populateNode(dataNodeName);
}

// Wait for all nodes to breach high disk watermark.
assertBusy(() -> {
refreshDiskUsage();
assertTrue(
StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
.allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
);
}, 30L, TimeUnit.SECONDS);

// Validate if cluster block is applied on the cluster
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}

public void testIndexCreateBlockNotAppliedWhenAnyNodesBelowHighWatermark() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
ensureStableCluster(3);

final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
.getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());

// Validate cluster block is not applied on the cluster
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertFalse(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}

public void testIndexCreateBlockIsRemovedWhenAnyNodesNotExceedHighWatermark() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
final List<String> indexNames = new ArrayList<>();
ensureStableCluster(3);

final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
.getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());

// Reduce disk space of all node until all of them is breaching high disk watermark.
for (final String dataNodeName : dataNodeNames) {
final String indexName = populateNode(dataNodeName);
indexNames.add(indexName);
}

// Wait for all the node to breach high disk watermark.
assertBusy(() -> {
refreshDiskUsage();
assertTrue(
StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
.allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
);
}, 30L, TimeUnit.SECONDS);

// Validate if index create block is applied on the cluster
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));

// Delete indices to free space
deleteIndices(indexNames);

// Validate if index create block is removed on the cluster
assertBusy(() -> {
refreshDiskUsage();
ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertFalse(state1.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}, 30L, TimeUnit.SECONDS);
}

public void testIndexCreateBlockWithAReadOnlyBlock() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
ensureStableCluster(3);
final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
.getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());

// Create one of the index.
final String indexName = populateNode(dataNodeNames.get(0));

// Reduce disk space of all other node until all of them is breaching high disk watermark.
for (int i = 1; i < dataNodeNames.size(); i++) {
populateNode(dataNodeNames.get(i));
}

// Apply a read_only_allow_delete_block on one of the index
// (can happen if the corresponding node has breached flood stage watermark).
final Settings readOnlySettings = Settings.builder()
.put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, Boolean.TRUE.toString())
.build();
client().admin().indices().prepareUpdateSettings(indexName).setSettings(readOnlySettings).get();

assertBusy(() -> {
refreshDiskUsage();
assertTrue(
StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
.allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
);
}, 30L, TimeUnit.SECONDS);

// Validate index create block is applied on the cluster.
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}

public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
Expand All @@ -210,16 +327,7 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Excepti
final Path dataNode0Path = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];

final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6)
.put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false)
.build()
);
final long minShardSize = createReasonableSizedShards(indexName);
final long minShardSize = createAndPopulateIndex(indexName, null);

final CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
Expand Down Expand Up @@ -274,6 +382,40 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Excepti
assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, hasSize(1));
}

private void deleteIndices(final List<String> indexNames) throws ExecutionException, InterruptedException {
for (String indexName : indexNames) {
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName)).get());
assertFalse("index [" + indexName + "] should have been deleted", indexExists(indexName));
}
}

private String populateNode(final String dataNodeName) throws Exception {
final Path dataNodePath = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
long minShardSize = createAndPopulateIndex(indexName, dataNodeName);
fileSystemProvider.getTestFileStore(dataNodePath).setTotalSpace(minShardSize + WATERMARK_BYTES - 1L);
refreshDiskUsage();
return indexName;
}

private long createAndPopulateIndex(final String indexName, final String nodeName) throws Exception {

final Settings.Builder indexSettingBuilder = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false);

// Depending on node name specified or not, we determine whether to enable node name based shard routing for index.
if (nodeName != null) {
indexSettingBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(INDEX_ROUTING_ALLOCATION_NODE_SETTING, nodeName);
} else {
indexSettingBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6);
}

createIndex(indexName, indexSettingBuilder.build());
return createReasonableSizedShards(indexName);
}

private Set<ShardRouting> getShardRoutings(final String nodeId, final String indexName) {
final Set<ShardRouting> shardRoutings = new HashSet<>();
for (IndexShardRoutingTable indexShardRoutingTable : client().admin()
Expand Down
Loading

0 comments on commit affed1c

Please sign in to comment.