diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java
index bc0557ddc2afa..fc0a574c191b1 100644
--- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java
@@ -34,6 +34,7 @@
 import org.apache.lucene.index.CorruptIndexException;
 import org.opensearch.Version;
+import org.opensearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
 import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
 import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
 import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
@@ -55,7 +56,9 @@
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.node.DiscoveryNode;
 import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.cluster.routing.ShardRoutingState;
 import org.opensearch.cluster.routing.UnassignedInfo;
+import org.opensearch.cluster.routing.allocation.AllocationDecision;
 import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
 import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.settings.Settings;
@@ -98,6 +101,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static java.util.Collections.emptyMap;
@@ -105,8 +109,10 @@
 import static org.opensearch.cluster.coordination.ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING;
 import static org.opensearch.cluster.health.ClusterHealthStatus.GREEN;
 import static org.opensearch.cluster.health.ClusterHealthStatus.RED;
+import static org.opensearch.cluster.health.ClusterHealthStatus.YELLOW;
 import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
 import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
+import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
 import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.opensearch.gateway.GatewayRecoveryTestUtils.corruptShard;
 import static org.opensearch.gateway.GatewayRecoveryTestUtils.getDiscoveryNodes;
@@ -753,7 +759,7 @@ public void testMessyElectionsStillMakeClusterGoGreen() throws Exception {
             Settings.builder()
                 .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                 .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
-                .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
+                .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
                 .build()
         );
@@ -843,6 +850,87 @@ public void testBatchModeDisabled() throws Exception {
         ensureGreen("test");
     }
 
+    public void testMultipleReplicaShardAssignmentWithDelayedAllocationAndDifferentNodeStartTimeInBatchMode() throws Exception {
+        internalCluster().startClusterManagerOnlyNodes(
+            1,
+            Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build()
+        );
+        internalCluster().startDataOnlyNodes(6);
+        createIndex(
+            "test",
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 3)
+                .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
+                .build()
+        );
+        ensureGreen("test");
+
+        List<String> nodesWithReplicaShards = findNodesWithShard(false);
+        Settings replicaNode0DataPathSettings = internalCluster().dataPathSettings(nodesWithReplicaShards.get(0));
+        Settings replicaNode1DataPathSettings = internalCluster().dataPathSettings(nodesWithReplicaShards.get(1));
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(0)));
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(1)));
+
+        ensureStableCluster(5);
+
+        logger.info("--> explicitly triggering reroute");
+        ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
+        assertTrue(clusterRerouteResponse.isAcknowledged());
+
+        ClusterHealthResponse health = client().admin().cluster().health(Requests.clusterHealthRequest().timeout("5m")).actionGet();
+        assertFalse(health.isTimedOut());
+        assertEquals(YELLOW, health.getStatus());
+        assertEquals(2, health.getUnassignedShards());
+        // the replicas must stay unassigned because their allocation is delayed (ALLOCATION_DELAYED)
+        ClusterAllocationExplainResponse allocationExplainResponse = client().admin()
+            .cluster()
+            .prepareAllocationExplain()
+            .setIndex("test")
+            .setShard(0)
+            .setPrimary(false)
+            .get();
+        assertEquals(
+            AllocationDecision.ALLOCATION_DELAYED,
+            allocationExplainResponse.getExplanation().getShardAllocationDecision().getAllocateDecision().getAllocationDecision()
+        );
+
+        logger.info("--> restarting the first stopped node");
+        internalCluster().startDataOnlyNode(
+            Settings.builder().put("node.name", nodesWithReplicaShards.get(0)).put(replicaNode0DataPathSettings).build()
+        );
+        clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
+        assertTrue(clusterRerouteResponse.isAcknowledged());
+        ensureStableCluster(6);
+        waitUntil(
+            () -> client().admin().cluster().health(Requests.clusterHealthRequest().timeout("5m")).actionGet().getInitializingShards() == 0
+        );
+
+        health = client().admin().cluster().health(Requests.clusterHealthRequest().timeout("5m")).actionGet();
+        assertFalse(health.isTimedOut());
+        assertEquals(YELLOW, health.getStatus());
+        assertEquals(1, health.getUnassignedShards());
+        assertEquals(1, health.getDelayedUnassignedShards());
+        allocationExplainResponse = client().admin()
+            .cluster()
+            .prepareAllocationExplain()
+            .setIndex("test")
+            .setShard(0)
+            .setPrimary(false)
+            .get();
+        assertEquals(
+            AllocationDecision.ALLOCATION_DELAYED,
+            allocationExplainResponse.getExplanation().getShardAllocationDecision().getAllocateDecision().getAllocationDecision()
+        );
+
+        logger.info("--> restarting the second stopped node");
+        internalCluster().startDataOnlyNode(
+            Settings.builder().put("node.name", nodesWithReplicaShards.get(1)).put(replicaNode1DataPathSettings).build()
+        );
+        ensureStableCluster(7);
+        ensureGreen("test");
+    }
+
     public void testNBatchesCreationAndAssignment() throws Exception {
         // we will reduce batch size to 5 to make sure we have enough batches to test assignment
         // Total number of primary shards = 50 (50 indices*1)
@@ -1293,4 +1381,14 @@ private void prepareIndex(String indexName, int numberOfPrimaryShards) {
         index(indexName, "type", "1", Collections.emptyMap());
         flush(indexName);
     }
+
+    private List<String> findNodesWithShard(final boolean primary) {
+        ClusterState state = client().admin().cluster().prepareState().get().getState();
+        List<ShardRouting> startedShards = state.routingTable().shardsWithState(ShardRoutingState.STARTED);
+        List<ShardRouting> requiredStartedShards = startedShards.stream()
+            .filter(startedShard -> startedShard.primary() == primary)
+            .collect(Collectors.toList());
+        Collections.shuffle(requiredStartedShards, random());
+        return requiredStartedShards.stream().map(shard -> state.nodes().get(shard.currentNodeId()).getName()).collect(Collectors.toList());
+    }
 }
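Note on the test above: the 60m index.unassigned.node_left.delayed_timeout keeps the two stopped replicas in a delayed-unassigned state for the whole run, so the only way the cluster can go green is for the original nodes to return with their old data paths. A minimal sketch of the setting the assertions hinge on (the builder below is illustrative, not part of the PR):

    // Per-index delay that gates replica re-assignment after a node leaves.
    Settings indexSettings = Settings.builder()
        .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
        .build();
    TimeValue delay = UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexSettings);
    // While this delay is counting down, the allocation explain API reports
    // AllocationDecision.ALLOCATION_DELAYED for the affected replicas.
    assert delay.equals(TimeValue.timeValueMinutes(60));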
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java
index 901b36f872622..9dcbe380477dc 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java
@@ -9,6 +9,8 @@
 package org.opensearch.remotemigration;
 
 import org.opensearch.action.DocWriteResponse;
+import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
 import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
 import org.opensearch.action.bulk.BulkRequest;
@@ -16,11 +18,15 @@
 import org.opensearch.action.delete.DeleteResponse;
 import org.opensearch.action.index.IndexRequest;
 import org.opensearch.action.index.IndexResponse;
+import org.opensearch.client.Requests;
 import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.health.ClusterHealthStatus;
 import org.opensearch.cluster.metadata.RepositoryMetadata;
 import org.opensearch.cluster.routing.RoutingNode;
+import org.opensearch.common.Priority;
 import org.opensearch.common.UUIDs;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
 import org.opensearch.repositories.fs.ReloadableFsRepository;
 import org.opensearch.test.OpenSearchIntegTestCase;
 import org.junit.Before;
@@ -39,6 +45,7 @@
 import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
 import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
 
 public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
     protected static final String REPOSITORY_NAME = "test-remote-store-repo";
@@ -114,6 +121,10 @@ public void initDocRepToRemoteMigration() {
         );
     }
 
+    public ClusterHealthStatus ensureGreen(String... indices) {
+        return ensureGreen(TimeValue.timeValueSeconds(60), indices);
+    }
+
     public BulkResponse indexBulk(String indexName, int numDocs) {
         BulkRequest bulkRequest = new BulkRequest();
         for (int i = 0; i < numDocs; i++) {
@@ -181,14 +192,12 @@ private Thread getIndexingThread() {
                 long currentDocCount = indexedDocs.incrementAndGet();
                 if (currentDocCount > 0 && currentDocCount % refreshFrequency == 0) {
                     if (rarely()) {
-                        logger.info("--> [iteration {}] flushing index", currentDocCount);
                         client().admin().indices().prepareFlush(indexName).get();
+                        logger.info("Completed ingestion of {} docs. Flushing now", currentDocCount);
                     } else {
-                        logger.info("--> [iteration {}] refreshing index", currentDocCount);
                         client().admin().indices().prepareRefresh(indexName).get();
                     }
                 }
-                logger.info("Completed ingestion of {} docs", currentDocCount);
             }
         });
     }
@@ -218,4 +227,21 @@ public void stopShardRebalancing() {
             .get()
         );
     }
+
+    public ClusterHealthStatus waitForRelocation() {
+        ClusterHealthRequest request = Requests.clusterHealthRequest()
+            .waitForNoRelocatingShards(true)
+            .timeout(TimeValue.timeValueSeconds(60))
+            .waitForEvents(Priority.LANGUID);
+        ClusterHealthResponse actionGet = client().admin().cluster().health(request).actionGet();
+        if (actionGet.isTimedOut()) {
+            logger.info(
+                "waitForRelocation timed out, cluster state:\n{}\n{}",
+                client().admin().cluster().prepareState().get().getState(),
+                client().admin().cluster().preparePendingClusterTasks().get()
+            );
+            assertThat("timed out waiting for relocation", actionGet.isTimedOut(), equalTo(false));
+        }
+        return actionGet.getStatus();
+    }
 }
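The two helpers added above (the bounded ensureGreen override and waitForRelocation) give migration tests a 60-second, diagnosable wait instead of hand-rolled health requests. A usage sketch, assuming placeholder node names "node-a" and "node-b":

    // Move the primary, then block until nothing is relocating; on timeout the
    // helper logs the cluster state and pending tasks before failing the test.
    client().admin().cluster().prepareReroute()
        .add(new MoveAllocationCommand("test", 0, "node-a", "node-b"))
        .execute()
        .actionGet();
    waitForRelocation();   // returns the resulting ClusterHealthStatus
    ensureGreen("test");   // now bounded to 60 seconds by the override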
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java
index cea653c0ead4b..fa3b9368ded47 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java
@@ -99,16 +99,7 @@ public void testRemotePrimaryRelocation() throws Exception {
             .add(new MoveAllocationCommand("test", 0, primaryNodeName("test"), remoteNode))
             .execute()
             .actionGet();
-        ClusterHealthResponse clusterHealthResponse = client().admin()
-            .cluster()
-            .prepareHealth()
-            .setTimeout(TimeValue.timeValueSeconds(60))
-            .setWaitForEvents(Priority.LANGUID)
-            .setWaitForNoRelocatingShards(true)
-            .execute()
-            .actionGet();
-
-        assertEquals(0, clusterHealthResponse.getRelocatingShards());
+        waitForRelocation();
         assertEquals(remoteNode, primaryNodeName("test"));
         logger.info("--> relocation from docrep to remote complete");
 
@@ -123,16 +114,7 @@ public void testRemotePrimaryRelocation() throws Exception {
             .add(new MoveAllocationCommand("test", 0, remoteNode, remoteNode2))
             .execute()
             .actionGet();
-        clusterHealthResponse = client().admin()
-            .cluster()
-            .prepareHealth()
-            .setTimeout(TimeValue.timeValueSeconds(60))
-            .setWaitForEvents(Priority.LANGUID)
-            .setWaitForNoRelocatingShards(true)
-            .execute()
-            .actionGet();
-
-        assertEquals(0, clusterHealthResponse.getRelocatingShards());
+        waitForRelocation();
         assertEquals(remoteNode2, primaryNodeName("test"));
         logger.info("--> relocation from remote to remote complete");
 
@@ -155,7 +137,6 @@ public void testRemotePrimaryRelocation() throws Exception {
 
     public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
         String docRepNode = internalCluster().startNode();
-        Client client = internalCluster().client(docRepNode);
 
         ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
         updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
         assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
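The three allocator files that follow are the heart of this PR. Previously the shared batch path computed every allocation decision up front and only afterwards walked the unassigned iterator, so the decision for shard N could not observe the routing-state changes (for example throttling counters) caused by executing shard N-1's decision. Both batch allocators now follow roughly this shape (a sketch; eligibilityDecision stands in for the concrete per-allocator method):

    void allocateUnassignedBatchSketch(List<ShardRouting> batch, RoutingAllocation allocation) {
        // Phase 1: cheap eligibility screen, no data fetch yet.
        List<ShardRouting> eligible = new ArrayList<>();
        Map<ShardRouting, AllocateUnassignedDecision> ineligible = new HashMap<>();
        for (ShardRouting shard : batch) {
            AllocateUnassignedDecision early = eligibilityDecision(shard, allocation); // null == eligible
            if (early != null) {
                ineligible.put(shard, early);
            } else {
                eligible.add(shard);
            }
        }
        // Phase 2: one bulk metadata fetch, for the eligible shards only.
        // Phase 3: decide AND execute shard-by-shard on the live unassigned
        // iterator, so each decision sees the effects of the previous ones.
    }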
diff --git a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java
index eed5de65258fc..58982e869794f 100644
--- a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java
+++ b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java
@@ -36,7 +36,6 @@
 import org.apache.logging.log4j.Logger;
 import org.opensearch.cluster.routing.RecoverySource;
 import org.opensearch.cluster.routing.RoutingNode;
-import org.opensearch.cluster.routing.RoutingNodes;
 import org.opensearch.cluster.routing.ShardRouting;
 import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
 import org.opensearch.cluster.routing.allocation.AllocationDecision;
@@ -46,9 +45,7 @@
 import org.opensearch.cluster.routing.allocation.decider.Decision;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * An abstract class that implements basic functionality for allocating
@@ -81,38 +78,7 @@ public void allocateUnassigned(
         executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
     }
 
-    /**
-     * Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists
-     * @param shardRoutings the shards to allocate
-     * @param allocation the allocation state container object
-     */
-    public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
-        // make Allocation Decisions for all shards
-        HashMap<ShardRouting, AllocateUnassignedDecision> decisionMap = makeAllocationDecision(shardRoutings, allocation, logger);
-        assert shardRoutings.size() == decisionMap.size() : "make allocation decision didn't return allocation decision for "
-            + "some shards";
-        // get all unassigned shards iterator
-        RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
-
-        while (iterator.hasNext()) {
-            ShardRouting shard = iterator.next();
-            try {
-                if (decisionMap.isEmpty() == false) {
-                    if (decisionMap.containsKey(shard)) {
-                        executeDecision(shard, decisionMap.remove(shard), allocation, iterator);
-                    }
-                } else {
-                    // no need to keep iterating the unassigned shards, if we don't have anything in decision map
-                    break;
-                }
-            } catch (Exception e) {
-                logger.error("Failed to execute decision for shard {} while initializing {}", shard, e);
-                throw e;
-            }
-        }
-    }
-
-    private void executeDecision(
+    protected void executeDecision(
         ShardRouting shardRouting,
         AllocateUnassignedDecision allocateUnassignedDecision,
         RoutingAllocation allocation,
@@ -135,8 +101,6 @@ private void executeDecision(
         }
     }
 
-    public void allocateUnassignedBatch(String batchId, RoutingAllocation allocation) {}
-
     protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation allocation) {
         if (shardRouting.primary()) {
             if (shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
@@ -165,21 +129,6 @@ public abstract AllocateUnassignedDecision makeAllocationDecision(
         Logger logger
     );
 
-    public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
-        List<ShardRouting> unassignedShardBatch,
-        RoutingAllocation allocation,
-        Logger logger
-    ) {
-
-        return (HashMap<ShardRouting, AllocateUnassignedDecision>) unassignedShardBatch.stream()
-            .collect(
-                Collectors.toMap(
-                    unassignedShard -> unassignedShard,
-                    unassignedShard -> makeAllocationDecision(unassignedShard, allocation, logger)
-                )
-            );
-    }
-
     /**
      * Builds decisions for all nodes in the cluster, so that the explain API can provide information on
      * allocation decisions for each node, while still waiting to allocate the shard (e.g. due to fetching shard data).
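executeDecision is widened from private to protected so each batch allocator can apply a decision while it still holds the UnassignedIterator, which is the only safe way to mutate the unassigned list during iteration. The contract a subclass follows looks like this (belongsToBatch and decideFor are hypothetical stand-ins for the subclass logic):

    RoutingNodes.UnassignedShards.UnassignedIterator it = allocation.routingNodes().unassigned().iterator();
    while (it.hasNext()) {
        ShardRouting shard = it.next();
        if (belongsToBatch(shard)) {
            // Initializes the shard, marks it ignored, or leaves it unassigned,
            // updating RoutingNodes so later iterations observe the new state.
            executeDecision(shard, decideFor(shard), allocation, it);
        }
    }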
diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java
index 1979f33484d49..27f9bedc4e495 100644
--- a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java
+++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java
@@ -14,6 +14,7 @@
 import org.opensearch.cluster.routing.ShardRouting;
 import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
 import org.opensearch.cluster.routing.allocation.RoutingAllocation;
+import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.gateway.AsyncShardFetch.FetchResult;
 import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard;
 import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard;
@@ -61,50 +62,59 @@ protected FetchResult<NodeGatewayStartedShardsBatch> fetchData(
+        final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(
+            List.of(unassignedShard),
+            Collections.emptyList(),
+            allocation
+        );
+        List<NodeGatewayStartedShard> nodeGatewayStartedShards = adaptToNodeShardStates(unassignedShard, shardsState);
+        return getAllocationDecision(unassignedShard, allocation, nodeGatewayStartedShards, logger);
     }
 
     /**
-     * Build allocation decisions for all the shards present in the batch identified by batchId.
+     * Allocate a batch of unassigned shards to nodes where valid copies of the shard already exist
      *
-     * @param shards     set of shards given for allocation
-     * @param allocation current allocation of all the shards
-     * @param logger     logger used for logging
-     * @return shard to allocation decision map
+     * @param shardRoutings the shards to allocate
+     * @param allocation the allocation state container object
      */
-    @Override
-    public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
-        List<ShardRouting> shards,
-        RoutingAllocation allocation,
-        Logger logger
-    ) {
-        HashMap<ShardRouting, AllocateUnassignedDecision> shardAllocationDecisions = new HashMap<>();
+    public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
+        HashMap<ShardId, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();
         List<ShardRouting> eligibleShards = new ArrayList<>();
         List<ShardRouting> inEligibleShards = new ArrayList<>();
         // identify ineligible shards
-        for (ShardRouting shard : shards) {
+        for (ShardRouting shard : shardRoutings) {
             AllocateUnassignedDecision decision = getInEligibleShardDecision(shard, allocation);
             if (decision != null) {
+                ineligibleShardAllocationDecisions.put(shard.shardId(), decision);
                 inEligibleShards.add(shard);
-                shardAllocationDecisions.put(shard, decision);
             } else {
                 eligibleShards.add(shard);
             }
         }
 
-        // Do not call fetchData if there are no eligible shards
-        if (eligibleShards.isEmpty()) {
-            return shardAllocationDecisions;
-        }
+        // only fetch data for eligible shards
         final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(eligibleShards, inEligibleShards, allocation);
 
-        // process the received data
-        for (ShardRouting unassignedShard : eligibleShards) {
-            List<NodeGatewayStartedShard> nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
-            // get allocation decision for this shard
-            shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger));
+        RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
+        while (iterator.hasNext()) {
+            ShardRouting unassignedShard = iterator.next();
+            AllocateUnassignedDecision allocationDecision;
+
+            if (shardRoutings.contains(unassignedShard)) {
+                assert unassignedShard.primary();
+                if (ineligibleShardAllocationDecisions.containsKey(unassignedShard.shardId())) {
+                    allocationDecision = ineligibleShardAllocationDecisions.get(unassignedShard.shardId());
+                } else {
+                    List<NodeGatewayStartedShard> nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
+                    allocationDecision = getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger);
+                }
+                executeDecision(unassignedShard, allocationDecision, allocation, iterator);
+            }
         }
-        return shardAllocationDecisions;
     }
 
     /**
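Two details of the primary path above are easy to miss: decisions for ineligible shards are keyed by ShardId rather than ShardRouting, since a batch contains at most one primary per shard id and the routing instance produced by the unassigned iterator need not compare equal to the one handed into the batch; and fetchData is still issued once for the whole batch before the iterator walk, so the per-shard work inside the loop is lookup only. A sketch of the keying choice (my reading, not asserted by the PR):

    // Keyed by ShardId so the lookup still succeeds even if the iterator yields
    // a different ShardRouting instance for the same primary shard.
    HashMap<ShardId, AllocateUnassignedDecision> ineligibleDecisions = new HashMap<>();
    ineligibleDecisions.put(shard.shardId(), decision);
    AllocateUnassignedDecision cached = ineligibleDecisions.get(unassignedShard.shardId());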
diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java
index be7867b7823f6..f2cb3d053440d 100644
--- a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java
+++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java
@@ -10,6 +10,7 @@
 
 import org.apache.logging.log4j.Logger;
 import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.routing.RoutingNodes;
 import org.opensearch.cluster.routing.ShardRouting;
 import org.opensearch.cluster.routing.UnassignedInfo;
 import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
@@ -29,6 +30,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 /**
  * Allocates replica shards in a batch mode
@@ -42,7 +45,7 @@ public abstract class ReplicaShardBatchAllocator extends ReplicaShardAllocator {
      * match. Today, a better match is one that can perform a no-op recovery while the previous recovery
      * has to copy segment files.
      *
-     * @param allocation the overall routing allocation
+     * @param allocation   the overall routing allocation
      * @param shardBatches a list of shard batches to check for existing recoveries
      */
     public void processExistingRecoveries(RoutingAllocation allocation, List<List<ShardRouting>> shardBatches) {
@@ -98,71 +101,92 @@ protected FetchResult<NodeStoreFilesMetadataBatch> fetchData(
+        Supplier<Map<DiscoveryNode, StoreFilesMetadata>> fetchDataResultSupplier = () -> {
+            return convertToNodeStoreFilesMetadataMap(
+                unassignedShard,
+                fetchData(List.of(unassignedShard), Collections.emptyList(), allocation)
+            );
+        };
+        return getUnassignedShardAllocationDecision(unassignedShard, allocation, fetchDataResultSupplier);
     }
 
-    @Override
-    public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
-        List<ShardRouting> shards,
-        RoutingAllocation allocation,
-        Logger logger
-    ) {
-        HashMap<ShardRouting, AllocateUnassignedDecision> shardAllocationDecisions = new HashMap<>();
-        final boolean explain = allocation.debugDecision();
+    /**
+     * Allocate a batch of unassigned shards to nodes where valid copies of the shard already exist
+     *
+     * @param shardRoutings the shards to allocate
+     * @param allocation the allocation state container object
+     */
+    public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
         List<ShardRouting> eligibleShards = new ArrayList<>();
         List<ShardRouting> ineligibleShards = new ArrayList<>();
-        HashMap<ShardRouting, Tuple<Decision, Map<String, NodeAllocationResult>>> nodeAllocationDecisions = new HashMap<>();
-        for (ShardRouting shard : shards) {
-            if (!isResponsibleFor(shard)) {
-                // this allocator n is not responsible for allocating this shard
+        Map<ShardRouting, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();
+
+        for (ShardRouting shard : shardRoutings) {
+            AllocateUnassignedDecision shardDecisionWithoutFetch = getUnassignedShardAllocationDecision(shard, allocation, null);
+            // Without fetchData, decision for in-eligible shards is non-null from our preliminary checks and null for eligible shards.
+            if (shardDecisionWithoutFetch != null) {
                 ineligibleShards.add(shard);
-                shardAllocationDecisions.put(shard, AllocateUnassignedDecision.NOT_TAKEN);
-                continue;
+                ineligibleShardAllocationDecisions.put(shard, shardDecisionWithoutFetch);
+            } else {
+                eligibleShards.add(shard);
             }
+        }
 
-            Tuple<Decision, Map<String, NodeAllocationResult>> result = canBeAllocatedToAtLeastOneNode(shard, allocation);
-            Decision allocationDecision = result.v1();
-            if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shard))) {
-                // only return early if we are not in explain mode, or we are in explain mode but we have not
-                // yet attempted to fetch any shard data
-                logger.trace("{}: ignoring allocation, can't be allocated on any node", shard);
-                shardAllocationDecisions.put(
-                    shard,
-                    AllocateUnassignedDecision.no(
-                        UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()),
-                        result.v2() != null ? new ArrayList<>(result.v2().values()) : null
-                    )
-                );
-                continue;
-            }
-            // storing the nodeDecisions in nodeAllocationDecisions if the decision is not YES
-            // so that we don't have to compute the decisions again
-            nodeAllocationDecisions.put(shard, result);
-
-            eligibleShards.add(shard);
-        }
-
-        // Do not call fetchData if there are no eligible shards
-        if (eligibleShards.isEmpty()) {
-            return shardAllocationDecisions;
-        }
-        // only fetch data for eligible shards
-        final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(eligibleShards, ineligibleShards, allocation);
-
-        for (ShardRouting unassignedShard : eligibleShards) {
-            Tuple<Decision, Map<String, NodeAllocationResult>> result = nodeAllocationDecisions.get(unassignedShard);
-            shardAllocationDecisions.put(
-                unassignedShard,
-                getAllocationDecision(
-                    unassignedShard,
-                    allocation,
-                    convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState),
-                    result,
-                    logger
-                )
-            );
-        }
-        return shardAllocationDecisions;
+        // only fetch data for eligible shards
+        final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(eligibleShards, ineligibleShards, allocation);
+
+        List<ShardId> shardIdsFromBatch = shardRoutings.stream().map(shardRouting -> shardRouting.shardId()).collect(Collectors.toList());
+        RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
+        while (iterator.hasNext()) {
+            ShardRouting unassignedShard = iterator.next();
+            // There will be only one entry for the shard in the unassigned shards batch
+            // for a shard with multiple unassigned replicas, hence we are comparing the shard ids
+            // instead of ShardRouting in order to evaluate shard assignment for all unassigned replicas of a shard.
+            if (!unassignedShard.primary() && shardIdsFromBatch.contains(unassignedShard.shardId())) {
+                AllocateUnassignedDecision allocateUnassignedDecision;
+                if (ineligibleShardAllocationDecisions.containsKey(unassignedShard)) {
+                    allocateUnassignedDecision = ineligibleShardAllocationDecisions.get(unassignedShard);
+                } else {
+                    // The shard's eligibility is recomputed here because the routing allocation state
+                    // is updated while decisions are executed, which can change the allocation
+                    // eligibility of the remaining unassigned shards.
+                    allocateUnassignedDecision = getUnassignedShardAllocationDecision(
+                        unassignedShard,
+                        allocation,
+                        () -> convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState)
+                    );
+                }
+                executeDecision(unassignedShard, allocateUnassignedDecision, allocation, iterator);
+            }
+        }
+    }
+
+    private AllocateUnassignedDecision getUnassignedShardAllocationDecision(
+        ShardRouting shardRouting,
+        RoutingAllocation allocation,
+        Supplier<Map<DiscoveryNode, StoreFilesMetadata>> nodeStoreFileMetaDataMapSupplier
+    ) {
+        if (!isResponsibleFor(shardRouting)) {
+            return AllocateUnassignedDecision.NOT_TAKEN;
+        }
+        Tuple<Decision, Map<String, NodeAllocationResult>> result = canBeAllocatedToAtLeastOneNode(shardRouting, allocation);
+
+        final boolean explain = allocation.debugDecision();
+        Decision allocationDecision = result.v1();
+        if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shardRouting))) {
+            // only return early if we are not in explain mode, or we are in explain mode but we have not
+            // yet attempted to fetch any shard data
+            logger.trace("{}: ignoring allocation, can't be allocated on any node", shardRouting);
+            return AllocateUnassignedDecision.no(
+                UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()),
+                result.v2() != null ? new ArrayList<>(result.v2().values()) : null
+            );
+        }
+        if (nodeStoreFileMetaDataMapSupplier != null) {
+            Map<DiscoveryNode, StoreFilesMetadata> discoveryNodeStoreFilesMetadataMap = nodeStoreFileMetaDataMapSupplier.get();
+            return getAllocationDecision(shardRouting, allocation, discoveryNodeStoreFilesMetadataMap, result, logger);
+        }
+        return null;
     }
 
     private Map<DiscoveryNode, StoreFilesMetadata> convertToNodeStoreFilesMetadataMap(
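The Supplier indirection above lets one decision method serve both call paths: the batch path first passes null to get the fetch-free preliminary decision, and only shards that survive that screen are resolved against the already-fetched batch data; the single-shard path wraps a real fetch that executes only if the preliminary checks pass. Condensed from the diff:

    // Batch path: preliminary screen, no data fetch.
    AllocateUnassignedDecision early = getUnassignedShardAllocationDecision(shard, allocation, null);

    // Single-shard path: the fetch runs lazily, inside the supplier, and only
    // when the shard turns out to be eligible.
    Supplier<Map<DiscoveryNode, StoreFilesMetadata>> lazyFetch = () -> convertToNodeStoreFilesMetadataMap(
        shard,
        fetchData(List.of(shard), Collections.emptyList(), allocation)
    );
    AllocateUnassignedDecision full = getUnassignedShardAllocationDecision(shard, allocation, lazyFetch);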
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index 49cb710c915fc..82b68b32f3bf8 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -2146,7 +2146,7 @@ public void waitForRemoteStoreSync(Runnable onProgress) throws IOException {
             segmentUploadeCount = directory.getSegmentsUploadedToRemoteStore().size();
         }
         try {
-            Thread.sleep(TimeValue.timeValueSeconds(30).seconds());
+            Thread.sleep(TimeValue.timeValueSeconds(30).millis());
         } catch (InterruptedException ie) {
             throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie);
         }
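The one-line IndexShard change fixes a unit mismatch: TimeValue.seconds() returns the duration expressed in seconds, while Thread.sleep takes milliseconds, so the old code polled every ~30 ms instead of every 30 s. Illustration:

    TimeValue interval = TimeValue.timeValueSeconds(30);
    interval.seconds(); // 30     -> Thread.sleep(30) pauses ~30 milliseconds
    interval.millis();  // 30000  -> Thread.sleep(30000) pauses the intended 30 seconds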
diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java
index 522ad2a64ea5d..e90850de3fe33 100644
--- a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java
+++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java
@@ -9,6 +9,7 @@
 
 import org.apache.lucene.codecs.Codec;
 import org.opensearch.Version;
+import org.opensearch.cluster.ClusterInfo;
 import org.opensearch.cluster.ClusterName;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.OpenSearchAllocationTestCase;
@@ -19,12 +20,15 @@
 import org.opensearch.cluster.routing.RoutingNodes;
 import org.opensearch.cluster.routing.RoutingTable;
 import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.cluster.routing.ShardRoutingState;
 import org.opensearch.cluster.routing.UnassignedInfo;
 import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
 import org.opensearch.cluster.routing.allocation.AllocationDecision;
 import org.opensearch.cluster.routing.allocation.RoutingAllocation;
 import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
+import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
 import org.opensearch.common.Nullable;
+import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.util.set.Sets;
 import org.opensearch.core.index.shard.ShardId;
@@ -44,6 +48,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.opensearch.cluster.routing.UnassignedInfo.Reason.CLUSTER_RECOVERED;
 
@@ -87,42 +92,28 @@ private void allocateAllUnassignedBatch(final RoutingAllocation allocation) {
 
     public void testMakeAllocationDecisionDataFetching() {
         final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1");
-
-        List<ShardRouting> shards = new ArrayList<>();
-        allocateAllUnassignedBatch(allocation);
         ShardRouting shard = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard();
-        shards.add(shard);
-        HashMap<ShardRouting, AllocateUnassignedDecision> allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger);
-        // verify we get decisions for all the shards
-        assertEquals(shards.size(), allDecisions.size());
-        assertEquals(shards, new ArrayList<>(allDecisions.keySet()));
-        assertEquals(AllocationDecision.AWAITING_INFO, allDecisions.get(shard).getAllocationDecision());
+        AllocateUnassignedDecision allocateUnassignedDecision = batchAllocator.makeAllocationDecision(shard, allocation, logger);
+        assertEquals(AllocationDecision.AWAITING_INFO, allocateUnassignedDecision.getAllocationDecision());
     }
 
     public void testMakeAllocationDecisionForReplicaShard() {
         final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1");
 
         List<ShardRouting> replicaShards = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).replicaShards();
-        List<ShardRouting> shards = new ArrayList<>(replicaShards);
-        HashMap<ShardRouting, AllocateUnassignedDecision> allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger);
-        // verify we get decisions for all the shards
-        assertEquals(shards.size(), allDecisions.size());
-        assertEquals(shards, new ArrayList<>(allDecisions.keySet()));
-        assertFalse(allDecisions.get(replicaShards.get(0)).isDecisionTaken());
+        for (ShardRouting shardRouting : replicaShards) {
+            AllocateUnassignedDecision allocateUnassignedDecision = batchAllocator.makeAllocationDecision(shardRouting, allocation, logger);
+            assertFalse(allocateUnassignedDecision.isDecisionTaken());
+        }
     }
 
     public void testMakeAllocationDecisionDataFetched() {
         final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1");
 
-        List<ShardRouting> shards = new ArrayList<>();
         ShardRouting shard = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard();
-        shards.add(shard);
         batchAllocator.addData(node1, "allocId1", true, new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName()));
-        HashMap<ShardRouting, AllocateUnassignedDecision> allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger);
-        // verify we get decisions for all the shards
-        assertEquals(shards.size(), allDecisions.size());
-        assertEquals(shards, new ArrayList<>(allDecisions.keySet()));
-        assertEquals(AllocationDecision.YES, allDecisions.get(shard).getAllocationDecision());
+        AllocateUnassignedDecision allocateUnassignedDecision = batchAllocator.makeAllocationDecision(shard, allocation, logger);
+        assertEquals(AllocationDecision.YES, allocateUnassignedDecision.getAllocationDecision());
     }
 
     public void testMakeAllocationDecisionDataFetchedMultipleShards() {
@@ -149,13 +140,88 @@ public void testMakeAllocationDecisionDataFetchedMultipleShards() {
                 null
             );
         }
-        HashMap<ShardRouting, AllocateUnassignedDecision> allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger);
-        // verify we get decisions for all the shards
-        assertEquals(shards.size(), allDecisions.size());
-        assertEquals(new HashSet<>(shards), allDecisions.keySet());
-        for (ShardRouting shard : shards) {
-            assertEquals(AllocationDecision.YES, allDecisions.get(shard).getAllocationDecision());
+        for (ShardRouting shardRouting : shards) {
+            AllocateUnassignedDecision allocateUnassignedDecision = batchAllocator.makeAllocationDecision(shardRouting, allocation, logger);
+            assertEquals(AllocationDecision.YES, allocateUnassignedDecision.getAllocationDecision());
+        }
+    }
+
+    public void testInitializePrimaryShards() {
+        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(), clusterSettings, random());
+        setUpShards(2);
+        final RoutingAllocation routingAllocation = routingAllocationWithMultiplePrimaries(
+            allocationDeciders,
+            CLUSTER_RECOVERED,
+            2,
+            0,
+            "allocId-0",
+            "allocId-1"
+        );
+
+        for (ShardId shardId : shardsInBatch) {
+            batchAllocator.addShardData(
+                node1,
+                "allocId-" + shardId.id(),
+                shardId,
+                true,
+                new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName()),
+                null
+            );
+        }
+
+        allocateAllUnassignedBatch(routingAllocation);
+
+        assertEquals(0, routingAllocation.routingNodes().unassigned().size());
+        List<ShardRouting> initializingShards = routingAllocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING);
+        assertEquals(2, initializingShards.size());
+        assertTrue(shardsInBatch.contains(initializingShards.get(0).shardId()));
+        assertTrue(shardsInBatch.contains(initializingShards.get(1).shardId()));
+        assertEquals(2, routingAllocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()));
+    }
+
+    public void testAllocateUnassignedBatchThrottlingAllocationDeciderIsHonoured() {
+        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        AllocationDeciders allocationDeciders = randomAllocationDeciders(
+            Settings.builder()
+                .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 1)
+                .build(),
+            clusterSettings,
+            random()
+        );
+        setUpShards(2);
+        final RoutingAllocation routingAllocation = routingAllocationWithMultiplePrimaries(
+            allocationDeciders,
+            CLUSTER_RECOVERED,
+            2,
+            0,
+            "allocId-0",
+            "allocId-1"
+        );
+
+        for (ShardId shardId : shardsInBatch) {
+            batchAllocator.addShardData(
+                node1,
+                "allocId-" + shardId.id(),
+                shardId,
+                true,
+                new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName()),
+                null
+            );
         }
+
+        allocateAllUnassignedBatch(routingAllocation);
+
+        // Verify the throttling decider kicked in: initial primary recoveries on the node
+        // stay at the configured limit of 1, so only one shard starts initializing
+        assertEquals(1, routingAllocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()));
+        List<ShardRouting> initializingShards = routingAllocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING);
+        assertEquals(1, initializingShards.size());
+        Set<String> nodesWithInitialisingShards = initializingShards.stream().map(ShardRouting::currentNodeId).collect(Collectors.toSet());
+        assertEquals(1, nodesWithInitialisingShards.size());
+        assertEquals(Collections.singleton(node1.getId()), nodesWithInitialisingShards);
+        List<ShardRouting> ignoredShards = routingAllocation.routingNodes().unassigned().ignored();
+        assertEquals(1, ignoredShards.size());
+        assertEquals(UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED, ignoredShards.get(0).unassignedInfo().getLastAllocationStatus());
     }
 
@@ -235,7 +301,7 @@ private RoutingAllocation routingAllocationWithMultiplePrimaries(
             .routingTable(routingTableBuilder.build())
             .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3))
             .build();
-        return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, null, null, System.nanoTime());
+        return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, null, System.nanoTime());
     }
 
     class TestBatchAllocator extends PrimaryShardBatchAllocator {
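The new throttling test above only passes because decisions are now executed through the live iterator: with initial primary recoveries per node pinned to 1 and both primaries assignable only to node1, executing the first decision bumps node1's recovery counter, so the second primary is throttled rather than also being initialized. The knob it exercises, for reference:

    // One initial primary recovery per node; the second primary in the batch must
    // end up DECIDERS_THROTTLED and sit in unassigned().ignored().
    Settings settings = Settings.builder()
        .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 1)
        .build();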
diff --git a/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java
index 464038c93228b..2e148c2bc8130 100644
--- a/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java
+++ b/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java
@@ -28,16 +28,19 @@
 import org.opensearch.cluster.routing.ShardRoutingState;
 import org.opensearch.cluster.routing.TestShardRouting;
 import org.opensearch.cluster.routing.UnassignedInfo;
+import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
 import org.opensearch.cluster.routing.allocation.RoutingAllocation;
 import org.opensearch.cluster.routing.allocation.decider.AllocationDecider;
 import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
 import org.opensearch.cluster.routing.allocation.decider.Decision;
 import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
+import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
 import org.opensearch.common.Nullable;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.util.set.Sets;
+import org.opensearch.core.index.Index;
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.index.engine.Engine;
 import org.opensearch.index.seqno.ReplicationTracker;
@@ -72,6 +75,7 @@ public class ReplicaShardBatchAllocatorTests extends OpenSearchAllocationTestCase {
     private static final org.apache.lucene.util.Version MIN_SUPPORTED_LUCENE_VERSION = org.opensearch.Version.CURRENT
         .minimumIndexCompatibilityVersion().luceneVersion;
     private final ShardId shardId = new ShardId("test", "_na_", 0);
+    private static Set<ShardId> shardsInBatch;
     private final DiscoveryNode node1 = newNode("node1");
     private final DiscoveryNode node2 = newNode("node2");
     private final DiscoveryNode node3 = newNode("node3");
@@ -83,6 +87,14 @@ public void buildTestAllocator() {
         this.testBatchAllocator = new TestBatchAllocator();
     }
 
+    public static void setUpShards(int numberOfShards) {
+        shardsInBatch = new HashSet<>();
+        for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) {
+            ShardId shardId = new ShardId("test", "_na_", shardNumber);
+            shardsInBatch.add(shardId);
+        }
+    }
+
     private void allocateAllUnassignedBatch(final RoutingAllocation allocation) {
         final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
         List<ShardRouting> shardToBatch = new ArrayList<>();
@@ -115,8 +127,6 @@ public void testAsyncFetchWithNoShardOnIndexCreation() {
         );
         testBatchAllocator.clean();
         allocateAllUnassignedBatch(allocation);
-        assertThat(testBatchAllocator.getFetchDataCalledAndClean(), equalTo(false));
-        assertThat(testBatchAllocator.getShardEligibleFetchDataCountAndClean(), equalTo(0));
         assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1));
         assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).get(0).shardId(), equalTo(shardId));
     }
@@ -634,6 +644,60 @@ public void testDoNotCancelForBrokenNode() {
         assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED), empty());
     }
 
+    public void testAllocateUnassignedBatchThrottlingAllocationDeciderIsHonoured() throws InterruptedException {
+        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        AllocationDeciders allocationDeciders = randomAllocationDeciders(
+            Settings.builder()
+                .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 1)
+                .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 1)
+                .build(),
+            clusterSettings,
+            random()
+        );
+        setUpShards(2);
+        final RoutingAllocation routingAllocation = twoPrimaryAndOneUnAssignedReplica(allocationDeciders);
+        for (ShardId shardIdFromBatch : shardsInBatch) {
+            testBatchAllocator.addShardData(
+                node1,
+                shardIdFromBatch,
+                "MATCH",
+                null,
+                new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)
+            )
+                .addShardData(
+                    node2,
+                    shardIdFromBatch,
+                    "NO_MATCH",
+                    null,
+                    new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)
+                )
+                .addShardData(
+                    node3,
+                    shardIdFromBatch,
+                    "MATCH",
+                    null,
+                    new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)
+                );
+        }
+        allocateAllUnassignedBatch(routingAllocation);
+        // Verify the throttling decider throttled the second replica: incoming recoveries
+        // on a node must stay within the allowed concurrent recoveries
+        assertEquals(0, routingAllocation.routingNodes().getIncomingRecoveries(node2.getId()));
+        assertEquals(1, routingAllocation.routingNodes().getIncomingRecoveries(node3.getId()));
+        List<ShardRouting> initializingShards = routingAllocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING);
+        assertEquals(1, initializingShards.size());
+        List<ShardRouting> ignoredShardRoutings = routingAllocation.routingNodes().unassigned().ignored();
+        assertEquals(1, ignoredShardRoutings.size());
+        // Allocation status for ignored replica shards is not updated after running the deciders; they just get marked as ignored.
+        assertEquals(UnassignedInfo.AllocationStatus.NO_ATTEMPT, ignoredShardRoutings.get(0).unassignedInfo().getLastAllocationStatus());
+        AllocateUnassignedDecision allocateUnassignedDecision = testBatchAllocator.makeAllocationDecision(
+            ignoredShardRoutings.get(0),
+            routingAllocation,
+            logger
+        );
+        assertEquals(UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED, allocateUnassignedDecision.getAllocationStatus());
+    }
+
     private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders) {
         return onePrimaryOnNode1And1Replica(deciders, Settings.EMPTY, UnassignedInfo.Reason.CLUSTER_RECOVERED);
     }
@@ -692,6 +756,77 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide
         );
     }
 
+    private RoutingAllocation twoPrimaryAndOneUnAssignedReplica(AllocationDeciders deciders) throws InterruptedException {
+        Map<ShardId, ShardRouting> shardIdShardRoutingMap = new HashMap<>();
+        Index index = shardId.getIndex();
+
+        // Create a started ShardRouting for each primary shard
+        for (ShardId shardIdFromBatch : shardsInBatch) {
+            shardIdShardRoutingMap.put(
+                shardIdFromBatch,
+                TestShardRouting.newShardRouting(shardIdFromBatch, node1.getId(), true, ShardRoutingState.STARTED)
+            );
+        }
+
+        // Create the index metadata
+        IndexMetadata.Builder indexMetadata = IndexMetadata.builder(index.getName())
+            .settings(settings(Version.CURRENT).put(Settings.EMPTY))
+            .numberOfShards(2)
+            .numberOfReplicas(1);
+        for (ShardId shardIdFromBatch : shardsInBatch) {
+            indexMetadata.putInSyncAllocationIds(
+                shardIdFromBatch.id(),
+                Sets.newHashSet(shardIdShardRoutingMap.get(shardIdFromBatch).allocationId().getId())
+            );
+        }
+        Metadata metadata = Metadata.builder().put(indexMetadata).build();
+
+        // Create the index routing table
+        IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
+        for (ShardId shardIdFromBatch : shardsInBatch) {
+            IndexShardRoutingTable.Builder indexShardRoutingTableBuilder = new IndexShardRoutingTable.Builder(shardIdFromBatch);
+            // Add a primary shard in started state
+            indexShardRoutingTableBuilder.addShard(shardIdShardRoutingMap.get(shardIdFromBatch));
+            // Add replicas of the primary shard in unassigned state.
+            for (int i = 0; i < 1; i++) {
+                indexShardRoutingTableBuilder.addShard(
+                    ShardRouting.newUnassigned(
+                        shardIdFromBatch,
+                        false,
+                        RecoverySource.PeerRecoverySource.INSTANCE,
+                        new UnassignedInfo(
+                            UnassignedInfo.Reason.CLUSTER_RECOVERED,
+                            null,
+                            null,
+                            0,
+                            System.nanoTime(),
+                            System.currentTimeMillis(),
+                            false,
+                            UnassignedInfo.AllocationStatus.NO_ATTEMPT,
+                            Collections.emptySet()
+                        )
+                    )
+                );
+            }
+            indexRoutingTableBuilder.addIndexShard(indexShardRoutingTableBuilder.build());
+        }
+
+        RoutingTable routingTable = RoutingTable.builder().add(indexRoutingTableBuilder.build()).build();
+        ClusterState state = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .metadata(metadata)
+            .routingTable(routingTable)
+            .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3))
+            .build();
+        return new RoutingAllocation(
+            deciders,
+            new RoutingNodes(state, false),
+            state,
+            ClusterInfo.EMPTY,
+            SnapshotShardSizeInfo.EMPTY,
+            System.nanoTime()
+        );
+    }
+
     private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders, UnassignedInfo unassignedInfo) {
         ShardRouting primaryShard = TestShardRouting.newShardRouting(shardId, node1.getId(), true, ShardRoutingState.STARTED);
         Metadata metadata = Metadata.builder()
@@ -755,7 +890,7 @@ static String randomSyncId() {
     }
 
     class TestBatchAllocator extends ReplicaShardBatchAllocator {
-        private Map<DiscoveryNode, TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata> data = null;
+        private Map<DiscoveryNode, TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch> data = null;
         private AtomicBoolean fetchDataCalled = new AtomicBoolean(false);
         private AtomicInteger eligibleShardFetchDataCount = new AtomicInteger(0);
@@ -800,6 +935,55 @@ public TestBatchAllocator addData(
             }
             data.put(
                 node,
+                new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch(
+                    node,
+                    Map.of(
+                        shardId,
+                        new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata(
+                            new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata(
+                                shardId,
+                                new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()),
+                                peerRecoveryRetentionLeases
+                            ),
+                            storeFileFetchException
+                        )
+                    )
+                )
+            );
+            return this;
+        }
+
+        public TestBatchAllocator addShardData(
+            DiscoveryNode node,
+            ShardId shardId,
+            String syncId,
+            @Nullable Exception storeFileFetchException,
+            StoreFileMetadata... files
+        ) {
+            return addShardData(node, Collections.emptyList(), shardId, syncId, storeFileFetchException, files);
+        }
+
+        public TestBatchAllocator addShardData(
+            DiscoveryNode node,
+            List<RetentionLease> peerRecoveryRetentionLeases,
+            ShardId shardId,
+            String syncId,
+            @Nullable Exception storeFileFetchException,
+            StoreFileMetadata... files
+        ) {
+            if (data == null) {
+                data = new HashMap<>();
+            }
+            Map<String, StoreFileMetadata> filesAsMap = new HashMap<>();
+            for (StoreFileMetadata file : files) {
+                filesAsMap.put(file.name(), file);
+            }
+            Map<String, String> commitData = new HashMap<>();
+            if (syncId != null) {
+                commitData.put(Engine.SYNC_COMMIT_ID, syncId);
+            }
+
+            TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata = new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata(
                 new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata(
                     shardId,
@@ -807,8 +991,19 @@ public TestBatchAllocator addData(
                     peerRecoveryRetentionLeases
                 ),
                 storeFileFetchException
-            )
+            );
+            Map<ShardId, TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata> shardIdNodeStoreFilesMetadataHashMap =
+                new HashMap<>();
+            if (data.containsKey(node)) {
+                NodeStoreFilesMetadataBatch nodeStoreFilesMetadataBatch = data.get(node);
+                shardIdNodeStoreFilesMetadataHashMap.putAll(nodeStoreFilesMetadataBatch.getNodeStoreFilesMetadataBatch());
+            }
+            shardIdNodeStoreFilesMetadataHashMap.put(shardId, nodeStoreFilesMetadata);
+            data.put(
+                node,
+                new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch(node, shardIdNodeStoreFilesMetadataHashMap)
             );
+            return this;
         }
 
@@ -820,25 +1015,7 @@ protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch> fetchData(
         ) {
             fetchDataCalled.set(true);
             eligibleShardFetchDataCount.set(eligibleShards.size());
-            Map<DiscoveryNode, TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch> tData = null;
-            if (data != null) {
-                tData = new HashMap<>();
-                for (Map.Entry<DiscoveryNode, TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata> entry : data.entrySet()) {
-                    Map<ShardId, TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata> shardData = Map.of(
-                        shardId,
-                        entry.getValue()
-                    );
-                    tData.put(
-                        entry.getKey(),
-                        new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch(entry.getKey(), shardData)
-                    );
-                }
-            }
-            return new AsyncShardFetch.FetchResult<>(tData, new HashMap<>() {
-                {
-                    put(shardId, Collections.emptySet());
-                }
-            });
+            return new AsyncShardFetch.FetchResult<>(data, Collections.<ShardId, Set<String>>emptyMap());
         }
 
         @Override
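A closing note on the test fixture: addShardData differs from the older addData in that it accumulates one NodeStoreFilesMetadataBatch per node across calls, mirroring what the real batched transport action returns for several shards at once. Usage sketch, where shardA, shardB and storeFile(...) are hypothetical placeholders:

    testBatchAllocator
        .addShardData(node1, shardA, "MATCH", null, storeFile("file1"))
        .addShardData(node1, shardB, "MATCH", null, storeFile("file1"));
    // fetchData() now returns a single batch for node1 covering both shards,
    // which is the shape allocateUnassignedBatch consumes.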