
Commit

Merge remote-tracking branch 'upstream/main' into fix_flaky
gaobinlong committed Jun 13, 2024
2 parents c1ffe62 + 18c5bb6 · commit 9415706
Showing 9 changed files with 537 additions and 206 deletions.
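Of the nine changed files, four are rendered below: a new batch-mode delayed-allocation integration test in the gateway recovery tests, shared ensureGreen and waitForRelocation helpers in the remote-migration test base class, relocation waits in the remote primary relocation test simplified to use the new helper, and removal of the unused batch-allocation overloads from the base gateway shard allocator.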
---------------------------------------- changed file ----------------------------------------
@@ -34,6 +34,7 @@

import org.apache.lucene.index.CorruptIndexException;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
@@ -55,7 +56,9 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
@@ -98,15 +101,18 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.cluster.coordination.ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING;
import static org.opensearch.cluster.health.ClusterHealthStatus.GREEN;
import static org.opensearch.cluster.health.ClusterHealthStatus.RED;
import static org.opensearch.cluster.health.ClusterHealthStatus.YELLOW;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.gateway.GatewayRecoveryTestUtils.corruptShard;
import static org.opensearch.gateway.GatewayRecoveryTestUtils.getDiscoveryNodes;
@@ -753,6 +759,7 @@ public void testMessyElectionsStillMakeClusterGoGreen() throws Exception {
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.build()
);
@@ -843,6 +850,87 @@ public void testBatchModeDisabled() throws Exception {
ensureGreen("test");
}

public void testMultipleReplicaShardAssignmentWithDelayedAllocationAndDifferentNodeStartTimeInBatchMode() throws Exception {
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build()
);
internalCluster().startDataOnlyNodes(6);
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 3)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
.build()
);
ensureGreen("test");

List<String> nodesWithReplicaShards = findNodesWithShard(false);
Settings replicaNode0DataPathSettings = internalCluster().dataPathSettings(nodesWithReplicaShards.get(0));
Settings replicaNode1DataPathSettings = internalCluster().dataPathSettings(nodesWithReplicaShards.get(1));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(0)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(1)));

ensureStableCluster(5);

logger.info("--> explicitly triggering reroute");
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());

ClusterHealthResponse health = client().admin().cluster().health(Requests.clusterHealthRequest().timeout("5m")).actionGet();
assertFalse(health.isTimedOut());
assertEquals(YELLOW, health.getStatus());
assertEquals(2, health.getUnassignedShards());
        // the shards should remain unassigned because their allocation is delayed (ALLOCATION_DELAYED)
ClusterAllocationExplainResponse allocationExplainResponse = client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test")
.setShard(0)
.setPrimary(false)
.get();
assertEquals(
AllocationDecision.ALLOCATION_DELAYED,
allocationExplainResponse.getExplanation().getShardAllocationDecision().getAllocateDecision().getAllocationDecision()
);

logger.info("--> restarting the node 1");
internalCluster().startDataOnlyNode(
Settings.builder().put("node.name", nodesWithReplicaShards.get(0)).put(replicaNode0DataPathSettings).build()
);
clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());
ensureStableCluster(6);
waitUntil(
() -> client().admin().cluster().health(Requests.clusterHealthRequest().timeout("5m")).actionGet().getInitializingShards() == 0
);

health = client().admin().cluster().health(Requests.clusterHealthRequest().timeout("5m")).actionGet();
assertFalse(health.isTimedOut());
assertEquals(YELLOW, health.getStatus());
assertEquals(1, health.getUnassignedShards());
assertEquals(1, health.getDelayedUnassignedShards());
allocationExplainResponse = client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test")
.setShard(0)
.setPrimary(false)
.get();
assertEquals(
AllocationDecision.ALLOCATION_DELAYED,
allocationExplainResponse.getExplanation().getShardAllocationDecision().getAllocateDecision().getAllocationDecision()
);

logger.info("--> restarting the node 0");
internalCluster().startDataOnlyNode(
Settings.builder().put("node.name", nodesWithReplicaShards.get(1)).put(replicaNode1DataPathSettings).build()
);
ensureStableCluster(7);
ensureGreen("test");
}
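The 60-minute delay window used above is a dynamic, per-index setting, so it can also be changed on a live index. A minimal sketch, not part of this commit, using the same "test" index (the "0ms" value is illustrative):

    // Illustrative only: collapse the delayed-allocation window so that replicas
    // left unassigned by the stopped nodes become immediately assignable.
    client().admin()
        .indices()
        .prepareUpdateSettings("test")
        .setSettings(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0ms"))
        .get();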

public void testNBatchesCreationAndAssignment() throws Exception {
// we will reduce batch size to 5 to make sure we have enough batches to test assignment
// Total number of primary shards = 50 (50 indices*1)
@@ -1293,4 +1381,14 @@ private void prepareIndex(String indexName, int numberOfPrimaryShards) {
index(indexName, "type", "1", Collections.emptyMap());
flush(indexName);
}

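    // Returns the names of the nodes currently hosting started primary (primary == true)
    // or replica (primary == false) shard copies, shuffled into random order.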
private List<String> findNodesWithShard(final boolean primary) {
ClusterState state = client().admin().cluster().prepareState().get().getState();
List<ShardRouting> startedShards = state.routingTable().shardsWithState(ShardRoutingState.STARTED);
List<ShardRouting> requiredStartedShards = startedShards.stream()
.filter(startedShard -> startedShard.primary() == primary)
.collect(Collectors.toList());
Collections.shuffle(requiredStartedShards, random());
return requiredStartedShards.stream().map(shard -> state.nodes().get(shard.currentNodeId()).getName()).collect(Collectors.toList());
}
}
---------------------------------------- changed file ----------------------------------------
@@ -9,18 +9,24 @@
package org.opensearch.remotemigration;

import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.common.Priority;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;
@@ -39,6 +45,7 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
@@ -114,6 +121,10 @@ public void initDocRepToRemoteMigration() {
);
}

public ClusterHealthStatus ensureGreen(String... indices) {
return ensureGreen(TimeValue.timeValueSeconds(60), indices);
}
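This override pins the default green-health wait for these migration tests at 60 seconds. The inherited overload remains available when a test needs a different timeout, for example (the 120s value is illustrative):

    ensureGreen(TimeValue.timeValueSeconds(120), "test");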

public BulkResponse indexBulk(String indexName, int numDocs) {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numDocs; i++) {
@@ -181,14 +192,12 @@ private Thread getIndexingThread() {
long currentDocCount = indexedDocs.incrementAndGet();
if (currentDocCount > 0 && currentDocCount % refreshFrequency == 0) {
if (rarely()) {
logger.info("--> [iteration {}] flushing index", currentDocCount);
client().admin().indices().prepareFlush(indexName).get();
logger.info("Completed ingestion of {} docs. Flushing now", currentDocCount);
} else {
logger.info("--> [iteration {}] refreshing index", currentDocCount);
client().admin().indices().prepareRefresh(indexName).get();
}
}
logger.info("Completed ingestion of {} docs", currentDocCount);
}
});
}
@@ -218,4 +227,21 @@ public void stopShardRebalancing() {
.get()
);
}

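    // Waits up to 60s (after pending cluster-state tasks down to LANGUID priority have
    // drained) for all shard relocations to finish; on timeout, logs the cluster state
    // and pending tasks, then fails the assertion.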
public ClusterHealthStatus waitForRelocation() {
ClusterHealthRequest request = Requests.clusterHealthRequest()
.waitForNoRelocatingShards(true)
.timeout(TimeValue.timeValueSeconds(60))
.waitForEvents(Priority.LANGUID);
ClusterHealthResponse actionGet = client().admin().cluster().health(request).actionGet();
if (actionGet.isTimedOut()) {
logger.info(
"waitForRelocation timed out, cluster state:\n{}\n{}",
client().admin().cluster().prepareState().get().getState(),
client().admin().cluster().preparePendingClusterTasks().get()
);
assertThat("timed out waiting for relocation", actionGet.isTimedOut(), equalTo(false));
}
return actionGet.getStatus();
}
}
---------------------------------------- changed file ----------------------------------------
@@ -99,16 +99,7 @@ public void testRemotePrimaryRelocation() throws Exception {
.add(new MoveAllocationCommand("test", 0, primaryNodeName("test"), remoteNode))
.execute()
.actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();

assertEquals(0, clusterHealthResponse.getRelocatingShards());
waitForRelocation();
assertEquals(remoteNode, primaryNodeName("test"));
logger.info("--> relocation from docrep to remote complete");

@@ -123,16 +114,7 @@
.add(new MoveAllocationCommand("test", 0, remoteNode, remoteNode2))
.execute()
.actionGet();
clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();

assertEquals(0, clusterHealthResponse.getRelocatingShards());
waitForRelocation();
assertEquals(remoteNode2, primaryNodeName("test"));

logger.info("--> relocation from remote to remote complete");
@@ -155,7 +137,6 @@

public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
String docRepNode = internalCluster().startNode();
Client client = internalCluster().client(docRepNode);
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
---------------------------------------- changed file ----------------------------------------
@@ -36,7 +36,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
@@ -46,9 +45,7 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;

/**
* An abstract class that implements basic functionality for allocating
@@ -81,38 +78,7 @@ public void allocateUnassigned(
executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
}

/**
* Allocates a batch of unassigned shards to nodes where valid copies of the shards already exist
* @param shardRoutings the shards to allocate
* @param allocation the allocation state container object
*/
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
// make Allocation Decisions for all shards
HashMap<ShardRouting, AllocateUnassignedDecision> decisionMap = makeAllocationDecision(shardRoutings, allocation, logger);
assert shardRoutings.size() == decisionMap.size() : "make allocation decision didn't return allocation decision for "
+ "some shards";
// get all unassigned shards iterator
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();

while (iterator.hasNext()) {
ShardRouting shard = iterator.next();
try {
if (decisionMap.isEmpty() == false) {
if (decisionMap.containsKey(shard)) {
executeDecision(shard, decisionMap.remove(shard), allocation, iterator);
}
} else {
// no need to keep iterating the unassigned shards, if we don't have anything in decision map
break;
}
} catch (Exception e) {
logger.error("Failed to execute decision for shard {} while initializing {}", shard, e);
throw e;
}
}
}

private void executeDecision(
protected void executeDecision(
ShardRouting shardRouting,
AllocateUnassignedDecision allocateUnassignedDecision,
RoutingAllocation allocation,
Expand All @@ -135,8 +101,6 @@ private void executeDecision(
}
}

public void allocateUnassignedBatch(String batchId, RoutingAllocation allocation) {}

protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation allocation) {
if (shardRouting.primary()) {
if (shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
@@ -165,21 +129,6 @@ public abstract AllocateUnassignedDecision makeAllocationDecision(
Logger logger
);

public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
List<ShardRouting> unassignedShardBatch,
RoutingAllocation allocation,
Logger logger
) {

return (HashMap<ShardRouting, AllocateUnassignedDecision>) unassignedShardBatch.stream()
.collect(
Collectors.toMap(
unassignedShard -> unassignedShard,
unassignedShard -> makeAllocationDecision(unassignedShard, allocation, logger)
)
);
}

/**
* Builds decisions for all nodes in the cluster, so that the explain API can provide information on
* allocation decisions for each node, while still waiting to allocate the shard (e.g. due to fetching shard data).
(Diffs for the remaining five changed files are not rendered on this page.)
