diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java index 89db3198662fa..d9474b32bdbf6 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java @@ -61,6 +61,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; @@ -70,93 +71,112 @@ * @opensearch.internal */ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator { + protected boolean shouldSkipFetchForRecovery(ShardRouting shard) { + if (shard.primary()) { + return true; + } + if (shard.initializing() == false) { + return true; + } + if (shard.relocatingNodeId() != null) { + return true; + } + if (shard.unassignedInfo() != null && shard.unassignedInfo().getReason() == UnassignedInfo.Reason.INDEX_CREATED) { + // if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one... + return true; + } + return false; + } + + protected Runnable cancelExistingRecoveryForBetterMatch( + ShardRouting shard, + RoutingAllocation allocation, + Map nodeShardStores + ) { + if (nodeShardStores == null) { + logger.trace("{}: fetching new stores for initializing shard", shard); + return null; + } + Metadata metadata = allocation.metadata(); + RoutingNodes routingNodes = allocation.routingNodes(); + ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId()); + assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary"; + assert primaryShard.currentNodeId() != null; + final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); + + final StoreFilesMetadata primaryStore = findStore(primaryNode, nodeShardStores); + if (primaryStore == null) { + // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) + // just let the recovery find it out, no need to do anything about it for the initializing shard + logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard); + return null; + } + + MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, true, primaryNode, primaryStore, nodeShardStores, false); + if (matchingNodes.getNodeWithHighestMatch() != null) { + DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId()); + DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch(); + // current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider + if (currentNode.equals(nodeWithHighestMatch) == false + && matchingNodes.canPerformNoopRecovery(nodeWithHighestMatch) + && canPerformOperationBasedRecovery(primaryStore, nodeShardStores, currentNode) == false) { + // we found a better match that can perform noop recovery, cancel the existing allocation. + logger.debug( + "cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]", + currentNode, + nodeWithHighestMatch + ); + final Set failedNodeIds = shard.unassignedInfo() == null + ? Collections.emptySet() + : shard.unassignedInfo().getFailedNodeIds(); + UnassignedInfo unassignedInfo = new UnassignedInfo( + UnassignedInfo.Reason.REALLOCATED_REPLICA, + "existing allocation of replica to [" + + currentNode + + "] cancelled, can perform a noop recovery on [" + + nodeWithHighestMatch + + "]", + null, + 0, + allocation.getCurrentNanoTime(), + System.currentTimeMillis(), + false, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + failedNodeIds + ); + // don't cancel shard in the loop as it will cause a ConcurrentModificationException + return () -> routingNodes.failShard( + logger, + shard, + unassignedInfo, + metadata.getIndexSafe(shard.index()), + allocation.changes() + ); + } + } + return null; + } + /** * Process existing recoveries of replicas and see if we need to cancel them if we find a better * match. Today, a better match is one that can perform a no-op recovery while the previous recovery * has to copy segment files. */ public void processExistingRecoveries(RoutingAllocation allocation) { - Metadata metadata = allocation.metadata(); RoutingNodes routingNodes = allocation.routingNodes(); List shardCancellationActions = new ArrayList<>(); for (RoutingNode routingNode : routingNodes) { for (ShardRouting shard : routingNode) { - if (shard.primary()) { - continue; - } - if (shard.initializing() == false) { - continue; - } - if (shard.relocatingNodeId() != null) { - continue; - } - - // if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one... - if (shard.unassignedInfo() != null && shard.unassignedInfo().getReason() == UnassignedInfo.Reason.INDEX_CREATED) { + if (shouldSkipFetchForRecovery(shard)) { continue; } AsyncShardFetch.FetchResult shardStores = fetchData(shard, allocation); - if (shardStores.hasData() == false) { - logger.trace("{}: fetching new stores for initializing shard", shard); - continue; // still fetching - } - - ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId()); - assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary"; - assert primaryShard.currentNodeId() != null; - final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); - final StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores); - if (primaryStore == null) { - // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) - // just let the recovery find it out, no need to do anything about it for the initializing shard - logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard); - continue; - } + Map nodeShardStores = convertToNodeStoreFilesMetadataMap(shardStores); - MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, true, primaryNode, primaryStore, shardStores, false); - if (matchingNodes.getNodeWithHighestMatch() != null) { - DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId()); - DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch(); - // current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider - if (currentNode.equals(nodeWithHighestMatch) == false - && matchingNodes.canPerformNoopRecovery(nodeWithHighestMatch) - && canPerformOperationBasedRecovery(primaryStore, shardStores, currentNode) == false) { - // we found a better match that can perform noop recovery, cancel the existing allocation. - logger.debug( - "cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]", - currentNode, - nodeWithHighestMatch - ); - final Set failedNodeIds = shard.unassignedInfo() == null - ? Collections.emptySet() - : shard.unassignedInfo().getFailedNodeIds(); - UnassignedInfo unassignedInfo = new UnassignedInfo( - UnassignedInfo.Reason.REALLOCATED_REPLICA, - "existing allocation of replica to [" - + currentNode - + "] cancelled, can perform a noop recovery on [" - + nodeWithHighestMatch - + "]", - null, - 0, - allocation.getCurrentNanoTime(), - System.currentTimeMillis(), - false, - UnassignedInfo.AllocationStatus.NO_ATTEMPT, - failedNodeIds - ); - // don't cancel shard in the loop as it will cause a ConcurrentModificationException - shardCancellationActions.add( - () -> routingNodes.failShard( - logger, - shard, - unassignedInfo, - metadata.getIndexSafe(shard.index()), - allocation.changes() - ) - ); - } + Runnable cancellationAction = cancelExistingRecoveryForBetterMatch(shard, allocation, nodeShardStores); + if (cancellationAction != null) { + shardCancellationActions.add(cancellationAction); } } } @@ -168,7 +188,7 @@ && canPerformOperationBasedRecovery(primaryStore, shardStores, currentNode) == f /** * Is the allocator responsible for allocating the given {@link ShardRouting}? */ - private static boolean isResponsibleFor(final ShardRouting shard) { + protected static boolean isResponsibleFor(final ShardRouting shard) { return shard.primary() == false // must be a replica && shard.unassigned() // must be unassigned // if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one... @@ -186,12 +206,11 @@ public AllocateUnassignedDecision makeAllocationDecision( return AllocateUnassignedDecision.NOT_TAKEN; } - final RoutingNodes routingNodes = allocation.routingNodes(); - final boolean explain = allocation.debugDecision(); // pre-check if it can be allocated to any node that currently exists, so we won't list the store for it for nothing Tuple> result = canBeAllocatedToAtLeastOneNode(unassignedShard, allocation); Decision allocateDecision = result.v1(); - if (allocateDecision.type() != Decision.Type.YES && (explain == false || hasInitiatedFetching(unassignedShard) == false)) { + if (allocateDecision.type() != Decision.Type.YES + && (allocation.debugDecision() == false || hasInitiatedFetching(unassignedShard) == false)) { // only return early if we are not in explain mode, or we are in explain mode but we have not // yet attempted to fetch any shard data logger.trace("{}: ignoring allocation, can't be allocated on any node", unassignedShard); @@ -202,28 +221,41 @@ public AllocateUnassignedDecision makeAllocationDecision( } AsyncShardFetch.FetchResult shardStores = fetchData(unassignedShard, allocation); - if (shardStores.hasData() == false) { + Map nodeShardStores = convertToNodeStoreFilesMetadataMap(shardStores); + return getAllocationDecision(unassignedShard, allocation, nodeShardStores, result, logger); + } + + protected AllocateUnassignedDecision getAllocationDecision( + ShardRouting unassignedShard, + RoutingAllocation allocation, + Map nodeShardStores, + Tuple> allocationDecision, + Logger logger + ) { + if (nodeShardStores == null) { + // node shard stores is null when we don't have data yet and still fetching the shard stores logger.trace("{}: ignoring allocation, still fetching shard stores", unassignedShard); allocation.setHasPendingAsyncFetch(); List nodeDecisions = null; - if (explain) { + if (allocation.debugDecision()) { nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation); } return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions); } - + final RoutingNodes routingNodes = allocation.routingNodes(); + final boolean explain = allocation.debugDecision(); ShardRouting primaryShard = routingNodes.activePrimary(unassignedShard.shardId()); if (primaryShard == null) { assert explain : "primary should only be null here if we are in explain mode, so we didn't " + "exit early when canBeAllocatedToAtLeastOneNode didn't return a YES decision"; return AllocateUnassignedDecision.no( - UnassignedInfo.AllocationStatus.fromDecision(allocateDecision.type()), - new ArrayList<>(result.v2().values()) + UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.v1().type()), + new ArrayList<>(allocationDecision.v2().values()) ); } assert primaryShard.currentNodeId() != null; final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); - final StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores); + final StoreFilesMetadata primaryStore = findStore(primaryNode, nodeShardStores); if (primaryStore == null) { // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) // we want to let the replica be allocated in order to expose the actual problem with the primary that the replica @@ -239,14 +271,17 @@ public AllocateUnassignedDecision makeAllocationDecision( false, primaryNode, primaryStore, - shardStores, + nodeShardStores, explain ); assert explain == false || matchingNodes.nodeDecisions != null : "in explain mode, we must have individual node decisions"; - List nodeDecisions = augmentExplanationsWithStoreInfo(result.v2(), matchingNodes.nodeDecisions); - if (allocateDecision.type() != Decision.Type.YES) { - return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.fromDecision(allocateDecision.type()), nodeDecisions); + List nodeDecisions = augmentExplanationsWithStoreInfo(allocationDecision.v2(), matchingNodes.nodeDecisions); + if (allocationDecision.v1().type() != Decision.Type.YES) { + return AllocateUnassignedDecision.no( + UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.v1().type()), + nodeDecisions + ); } else if (matchingNodes.getNodeWithHighestMatch() != null) { RoutingNode nodeWithHighestMatch = allocation.routingNodes().node(matchingNodes.getNodeWithHighestMatch().getId()); // we only check on THROTTLE since we checked before on NO @@ -301,7 +336,7 @@ public AllocateUnassignedDecision makeAllocationDecision( * YES or THROTTLE). If in explain mode, also returns the node-level explanations as the second element * in the returned tuple. */ - private static Tuple> canBeAllocatedToAtLeastOneNode( + protected static Tuple> canBeAllocatedToAtLeastOneNode( ShardRouting shard, RoutingAllocation allocation ) { @@ -357,12 +392,11 @@ private static List augmentExplanationsWithStoreInfo( /** * Finds the store for the assigned shard in the fetched data, returns null if none is found. */ - private static StoreFilesMetadata findStore(DiscoveryNode node, AsyncShardFetch.FetchResult data) { - NodeStoreFilesMetadata nodeFilesStore = data.getData().get(node); - if (nodeFilesStore == null) { + private static StoreFilesMetadata findStore(DiscoveryNode node, Map data) { + if (!data.containsKey(node)) { return null; } - return nodeFilesStore.storeFilesMetadata(); + return data.get(node); } private MatchingNodes findMatchingNodes( @@ -371,19 +405,19 @@ private MatchingNodes findMatchingNodes( boolean noMatchFailedNodes, DiscoveryNode primaryNode, StoreFilesMetadata primaryStore, - AsyncShardFetch.FetchResult data, + Map data, boolean explain ) { Map matchingNodes = new HashMap<>(); Map nodeDecisions = explain ? new HashMap<>() : null; - for (Map.Entry nodeStoreEntry : data.getData().entrySet()) { + for (Map.Entry nodeStoreEntry : data.entrySet()) { DiscoveryNode discoNode = nodeStoreEntry.getKey(); if (noMatchFailedNodes && shard.unassignedInfo() != null && shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) { continue; } - StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().storeFilesMetadata(); + StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue(); // we don't have any files at all, it is an empty index if (storeFilesMetadata.isEmpty()) { continue; @@ -438,6 +472,19 @@ private MatchingNodes findMatchingNodes( return new MatchingNodes(matchingNodes, nodeDecisions); } + private Map convertToNodeStoreFilesMetadataMap( + AsyncShardFetch.FetchResult data + ) { + if (data.hasData() == false) { + // if we don't have data yet return null + return null; + } + return data.getData() + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().storeFilesMetadata())); + } + private static long computeMatchingBytes(StoreFilesMetadata primaryStore, StoreFilesMetadata storeFilesMetadata) { long sizeMatched = 0; for (StoreFileMetadata storeFileMetadata : storeFilesMetadata) { @@ -470,14 +517,14 @@ private static MatchingNode computeMatchingNode( private static boolean canPerformOperationBasedRecovery( StoreFilesMetadata primaryStore, - AsyncShardFetch.FetchResult shardStores, + Map shardStores, DiscoveryNode targetNode ) { - final NodeStoreFilesMetadata targetNodeStore = shardStores.getData().get(targetNode); - if (targetNodeStore == null || targetNodeStore.storeFilesMetadata().isEmpty()) { + final StoreFilesMetadata targetNodeStore = shardStores.get(targetNode); + if (targetNodeStore == null || targetNodeStore.isEmpty()) { return false; } - if (hasMatchingSyncId(primaryStore, targetNodeStore.storeFilesMetadata())) { + if (hasMatchingSyncId(primaryStore, targetNodeStore)) { return true; } return primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(targetNode) >= 0; @@ -490,7 +537,10 @@ private static boolean canPerformOperationBasedRecovery( */ protected abstract boolean hasInitiatedFetching(ShardRouting shard); - private static class MatchingNode { + /** + * A class to enacapsulate the details regarding the a MatchNode for shard assignment + */ + protected static class MatchingNode { static final Comparator COMPARATOR = Comparator.comparing(m -> m.isNoopRecovery) .thenComparing(m -> m.retainingSeqNo) .thenComparing(m -> m.matchingBytes);