diff --git a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java index 5a20112b19219..503aa7274c5ee 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java @@ -42,21 +42,29 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RerouteService; +import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator; import org.opensearch.cluster.routing.allocation.FailedShard; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.common.Priority; +import org.opensearch.common.UUIDs; import org.opensearch.common.inject.Inject; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.set.Sets; import org.opensearch.common.lease.Releasables; -import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.shard.ShardId; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch; import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.Spliterators; import java.util.concurrent.ConcurrentMap; @@ -73,29 +81,45 @@ public class GatewayAllocator implements ExistingShardsAllocator { public static final String ALLOCATOR_NAME = "gateway_allocator"; private static final Logger logger = LogManager.getLogger(GatewayAllocator.class); + private static final long MAX_BATCH_SIZE = 2000; // will change it to a dynamic setting later private final RerouteService rerouteService; private final PrimaryShardAllocator primaryShardAllocator; private final ReplicaShardAllocator replicaShardAllocator; + private final PrimaryShardBatchAllocator primaryBatchShardAllocator; + private final ReplicaShardBatchAllocator replicaBatchShardAllocator; + private final TransportNodesListGatewayStartedShardsBatch batchStartedAction; + private final TransportNodesListShardStoreMetadataBatch batchStoreAction; + private final ConcurrentMap< ShardId, AsyncShardFetch> asyncFetchStarted = ConcurrentCollections - .newConcurrentMap(); + .newConcurrentMap(); private final ConcurrentMap> asyncFetchStore = ConcurrentCollections.newConcurrentMap(); private Set lastSeenEphemeralIds = Collections.emptySet(); + private final ConcurrentMap startedShardBatchLookup = ConcurrentCollections.newConcurrentMap(); + private final ConcurrentMap batchIdToStartedShardBatch = ConcurrentCollections.newConcurrentMap(); + private final ConcurrentMap storeShardBatchLookup = ConcurrentCollections.newConcurrentMap(); + private final ConcurrentMap batchIdToStoreShardBatch = ConcurrentCollections.newConcurrentMap(); @Inject public GatewayAllocator( RerouteService rerouteService, TransportNodesListGatewayStartedShards startedAction, - TransportNodesListShardStoreMetadata storeAction + TransportNodesListShardStoreMetadata storeAction, + TransportNodesListGatewayStartedShardsBatch batchStartedAction, + 
TransportNodesListShardStoreMetadataBatch batchStoreAction ) { this.rerouteService = rerouteService; this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction); this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction); + this.batchStartedAction = batchStartedAction; + this.primaryBatchShardAllocator = new InternalPrimaryBatchShardAllocator(); + this.batchStoreAction = batchStoreAction; + this.replicaBatchShardAllocator = new InternalReplicaBatchShardAllocator(); } @Override @@ -104,6 +128,10 @@ public void cleanCaches() { asyncFetchStarted.clear(); Releasables.close(asyncFetchStore.values()); asyncFetchStore.clear(); + batchIdToStartedShardBatch.clear(); + batchIdToStoreShardBatch.clear(); + startedShardBatchLookup.clear(); + storeShardBatchLookup.clear(); } // for tests @@ -111,6 +139,10 @@ protected GatewayAllocator() { this.rerouteService = null; this.primaryShardAllocator = null; this.replicaShardAllocator = null; + this.batchStartedAction = null; + this.primaryBatchShardAllocator = null; + this.batchStoreAction = null; + this.replicaBatchShardAllocator = null; } @Override @@ -130,6 +162,7 @@ public void applyStartedShards(final List startedShards, final Rou for (ShardRouting startedShard : startedShards) { Releasables.close(asyncFetchStarted.remove(startedShard.shardId())); Releasables.close(asyncFetchStore.remove(startedShard.shardId())); + safelyRemoveShardFromBatch(startedShard); } } @@ -138,6 +171,7 @@ public void applyFailedShards(final List failedShards, final Routin for (FailedShard failedShard : failedShards) { Releasables.close(asyncFetchStarted.remove(failedShard.getRoutingEntry().shardId())); Releasables.close(asyncFetchStore.remove(failedShard.getRoutingEntry().shardId())); + safelyRemoveShardFromBatch(failedShard.getRoutingEntry()); } } @@ -145,15 +179,30 @@ public void applyFailedShards(final List failedShards, final Routin public void beforeAllocation(final RoutingAllocation allocation) { assert primaryShardAllocator != null; assert replicaShardAllocator != null; + assert primaryBatchShardAllocator != null; + assert replicaBatchShardAllocator != null; ensureAsyncFetchStorePrimaryRecency(allocation); } @Override public void afterPrimariesBeforeReplicas(RoutingAllocation allocation) { - assert replicaShardAllocator != null; - if (allocation.routingNodes().hasInactiveShards()) { - // cancel existing recoveries if we have a better match - replicaShardAllocator.processExistingRecoveries(allocation); + // ToDo: fetch from settings + boolean batchMode = true; + if (batchMode) { + assert replicaBatchShardAllocator != null; + List> storedShardBatches = batchIdToStoreShardBatch.values().stream() + .map(ShardsBatch::getBatchedShardRoutings) + .collect(Collectors.toList()); + if (allocation.routingNodes().hasInactiveShards()) { + // cancel existing recoveries if we have a better match + replicaBatchShardAllocator.processExistingRecoveries(allocation, storedShardBatches); + } + } else { + assert replicaShardAllocator != null; + if (allocation.routingNodes().hasInactiveShards()) { + // cancel existing recoveries if we have a better match + replicaShardAllocator.processExistingRecoveries(allocation); + } } } @@ -168,6 +217,99 @@ public void allocateUnassigned( innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler); } + @Override + public void allocateUnassignedBatch(final RoutingAllocation allocation, boolean primary) { + // create batches for unassigned shards + 
createBatches(allocation, primary); + + assert primaryBatchShardAllocator != null; + assert replicaBatchShardAllocator != null; + if (primary) { + batchIdToStartedShardBatch.values().forEach(shardsBatch -> primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation)); + } else { + batchIdToStoreShardBatch.values().forEach(batch -> replicaBatchShardAllocator.allocateUnassignedBatch(batch.getBatchedShardRoutings(), allocation)); + } + } + + private void createBatches(RoutingAllocation allocation, boolean primary) { + RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned(); + // fetch all currently batched shards + Set currentBatchedShards = primary ? startedShardBatchLookup.keySet() : storeShardBatchLookup.keySet(); + Set shardsToBatch = Sets.newHashSet(); + // add all unassigned shards to the batch if they are not already in a batch + unassigned.forEach(shardRouting -> { + if ((currentBatchedShards.contains(shardRouting.shardId()) == false) && (shardRouting.primary() == primary)) { + assert shardRouting.unassigned(); + shardsToBatch.add(shardRouting); + } + }); + Iterator iterator = shardsToBatch.iterator(); + long batchSize = MAX_BATCH_SIZE; + Map addToCurrentBatch = new HashMap<>(); + while (iterator.hasNext()) { + ShardRouting currentShard = iterator.next(); + if (batchSize > 0) { + ShardBatchEntry shardBatchEntry = new ShardBatchEntry(IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(currentShard.index()).getSettings()) + , currentShard); + addToCurrentBatch.put(currentShard.shardId(), shardBatchEntry); + batchSize--; + iterator.remove(); + } + // seal the batch if it is full or this was the last shard in the unassigned list + if (batchSize == 0 || iterator.hasNext() == false) { + String batchUUId = UUIDs.base64UUID(); + + ShardsBatch shardsBatch = new ShardsBatch(batchUUId, addToCurrentBatch, primary); + // add the batch to the list of current batches + addBatch(shardsBatch, primary); + addShardsIdsToLookup(addToCurrentBatch.keySet(), batchUUId, primary); + addToCurrentBatch.clear(); + batchSize = MAX_BATCH_SIZE; + } + } + } + + private void addBatch(ShardsBatch shardsBatch, boolean primary) { + ConcurrentMap batches = primary ? batchIdToStartedShardBatch : batchIdToStoreShardBatch; + if (batches.containsKey(shardsBatch.getBatchId())) { + throw new IllegalStateException("Batch already exists. BatchId = " + shardsBatch.getBatchId()); + } + batches.put(shardsBatch.getBatchId(), shardsBatch); + } + + private void addShardsIdsToLookup(Set shards, String batchId, boolean primary) { + ConcurrentMap lookupMap = primary ? startedShardBatchLookup : storeShardBatchLookup; + shards.forEach(shardId -> { + if (lookupMap.containsKey(shardId)) { + throw new IllegalStateException("Shard is already batched. ShardId = " + shardId + ", batchId = " + lookupMap.get(shardId)); + } + lookupMap.put(shardId, batchId); + }); + } + + /** + * Safely remove a shard from the appropriate batch. + * If the shard is not in a batch, this is a no-op. + * Cleans up the batch if it is empty after removing the shard. + * This method should be called when removing the shard from the batch instead of {@link ShardsBatch#removeFromBatch(ShardRouting)} + * so that we can clean up the batch if it is empty and release the fetching resources. + * @param shardRouting shard to be removed + */ + private void safelyRemoveShardFromBatch(ShardRouting shardRouting) { + String batchId = shardRouting.primary() ? 
startedShardBatchLookup.get(shardRouting.shardId()) : storeShardBatchLookup.get(shardRouting.shardId()); + if (batchId == null) { + return; + } + ConcurrentMap batches = shardRouting.primary() ? batchIdToStartedShardBatch : batchIdToStoreShardBatch; + ShardsBatch batch = batches.get(batchId); + batch.removeFromBatch(shardRouting); + // remove the batch if it is empty + if (batch.getBatchedShards().isEmpty()) { + Releasables.close(batch.getAsyncFetcher()); + batches.remove(batchId); + } + } + // allow for testing infra to change shard allocators implementation protected static void innerAllocatedUnassigned( RoutingAllocation allocation, @@ -216,7 +358,13 @@ private void ensureAsyncFetchStorePrimaryRecency(RoutingAllocation allocation) { Sets.difference(newEphemeralIds, lastSeenEphemeralIds) ) ); + asyncFetchStore.values().forEach(fetch -> clearCacheForPrimary(fetch, allocation)); + storeShardBatchLookup.values().forEach(batch -> + clearCacheForBatchPrimary(batchIdToStoreShardBatch.get(batch), allocation) + ); + + // recalc to also (lazily) clear out old nodes. this.lastSeenEphemeralIds = newEphemeralIds; } @@ -232,6 +380,18 @@ private static void clearCacheForPrimary( } } + private static void clearCacheForBatchPrimary( + ShardsBatch batch, + RoutingAllocation allocation + ) { + List primaries = batch.getBatchedShards().stream() + .map(allocation.routingNodes()::activePrimary) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + AsyncBatchShardFetch fetch = batch.getAsyncFetcher(); + primaries.forEach(node -> fetch.clearCacheForNode(node.currentNodeId())); + } + private boolean hasNewNodes(DiscoveryNodes nodes) { for (final DiscoveryNode node : nodes.getDataNodes().values()) { if (lastSeenEphemeralIds.contains(node.getEphemeralId()) == false) { @@ -268,6 +428,32 @@ protected void reroute(ShardId shardId, String reason) { } } + class InternalBatchAsyncFetch extends AsyncBatchShardFetch { + + InternalBatchAsyncFetch(Logger logger, + String type, + Map map, + AsyncBatchShardFetch.Lister, T> action, + String batchUUId + ) { + super(logger, type, map, action, batchUUId); + } + + @Override + protected void reroute(String batchUUId, String reason) { + logger.trace("{} scheduling reroute for {}", batchUUId, reason); + assert rerouteService != null; + rerouteService.reroute( + "async_shard_fetch", + Priority.HIGH, + ActionListener.wrap( + r -> logger.trace("{} scheduled reroute completed for {}", batchUUId, reason), + e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", batchUUId, reason), e) + ) + ); + } + } + class InternalPrimaryShardAllocator extends PrimaryShardAllocator { private final TransportNodesListGatewayStartedShards startedAction; @@ -303,6 +489,59 @@ protected AsyncShardFetch.FetchResult fetchData(Set shardsEligibleForFetch, + Set inEligibleShards, + RoutingAllocation allocation) { + ShardRouting shardRouting = shardsEligibleForFetch.iterator().hasNext() ? shardsEligibleForFetch.iterator().next() : null; + shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting; + if (shardRouting == null) { + return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap()); + } + + String batchId = startedShardBatchLookup.getOrDefault(shardRouting.shardId(), null); + if (batchId == null) { + logger.debug("Shard {} has no batch id", shardRouting); + throw new IllegalStateException("Shard " + shardRouting + " has no batch id. 
Shard should be batched before fetching"); + } + + + if (batchIdToStartedShardBatch.containsKey(batchId) == false) { + logger.debug("Batch {} has no started shard batch", batchId); + throw new IllegalStateException("Batch " + batchId + " has no started shard batch"); + } + + ShardsBatch shardsBatch = batchIdToStartedShardBatch.get(batchId); + // remove ineligible shards which this allocator is not responsible for + inEligibleShards.forEach(GatewayAllocator.this::safelyRemoveShardFromBatch); + + if (shardsBatch.getBatchedShards().isEmpty() && shardsEligibleForFetch.isEmpty()) { + logger.debug("Batch {} is empty", batchId); + return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap()); + } + + Map> shardToIgnoreNodes = new HashMap<>(); + + for (ShardId shardId : shardsBatch.asyncBatch.shardsToCustomDataPathMap.keySet()) { + shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId)); + } + AsyncBatchShardFetch asyncFetcher = shardsBatch.getAsyncFetcher(); + AsyncBatchShardFetch.FetchResult shardBatchState = asyncFetcher.fetchData( + allocation.nodes(), + shardToIgnoreNodes + ); + + if (shardBatchState.hasData()) { + shardBatchState.processAllocation(allocation); + } + return (AsyncBatchShardFetch.FetchResult) shardBatchState; + } + + } + class InternalReplicaShardAllocator extends ReplicaShardAllocator { private final TransportNodesListShardStoreMetadata storeAction; @@ -341,4 +580,180 @@ protected boolean hasInitiatedFetching(ShardRouting shard) { return asyncFetchStore.get(shard.shardId()) != null; } } + + class InternalReplicaBatchShardAllocator extends ReplicaShardBatchAllocator { + + @Override + @SuppressWarnings("unchecked") + protected AsyncBatchShardFetch.FetchResult fetchData(Set shardsEligibleForFetch, + Set inEligibleShards, + RoutingAllocation allocation) { + // get the batch id from any one shard; all shards in the set are assumed to belong to the same batch + ShardRouting shardRouting = shardsEligibleForFetch.iterator().hasNext() ? shardsEligibleForFetch.iterator().next() : null; + shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting; + if (shardRouting == null) { + return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap()); + } + + String batchId = storeShardBatchLookup.getOrDefault(shardRouting.shardId(), null); + if (batchId == null) { + logger.debug("Shard {} has no batch id", shardRouting); + throw new IllegalStateException("Shard " + shardRouting + " has no batch id. 
Shard should be batched before fetching"); + } + + if (batchIdToStoreShardBatch.containsKey(batchId) == false) { + logger.debug("Batch {} has no shard store batch", batchId); + throw new IllegalStateException("Batch " + batchId + " has no shard store batch"); + } + + ShardsBatch shardsBatch = batchIdToStoreShardBatch.get(batchId); + // remove ineligible shards which this allocator is not responsible for + inEligibleShards.forEach(GatewayAllocator.this::safelyRemoveShardFromBatch); + + if (shardsBatch.getBatchedShards().isEmpty() && shardsEligibleForFetch.isEmpty()) { + logger.debug("Batch {} is empty", batchId); + return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap()); + } + Map> shardToIgnoreNodes = new HashMap<>(); + for (ShardId shardId : shardsBatch.asyncBatch.shardsToCustomDataPathMap.keySet()) { + shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId)); + } + AsyncBatchShardFetch asyncFetcher = shardsBatch.getAsyncFetcher(); + AsyncBatchShardFetch.FetchResult shardBatchStores = asyncFetcher.fetchData( + allocation.nodes(), + shardToIgnoreNodes + ); + if (shardBatchStores.hasData()) { + shardBatchStores.processAllocation(allocation); + } + return (AsyncBatchShardFetch.FetchResult) shardBatchStores; + } + + @Override + protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { + return null; + } + + @Override + protected boolean hasInitiatedFetching(ShardRouting shard) { + return false; + } + } + + /** + * Holds information about a batch of shards to be allocated. + * The async fetcher is used to fetch the data for the batch. + */ + private class ShardsBatch { + private final String batchId; + boolean primary; + + private final AsyncBatchShardFetch asyncBatch; + + private final Map batchInfo; + + public ShardsBatch(String batchId, Map shardsWithInfo, boolean primary) { + this.batchId = batchId; + this.batchInfo = new HashMap<>(shardsWithInfo); + // create a ShardId -> customDataPath map for async fetch + Map shardIdsMap = batchInfo.entrySet().stream().collect(Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().getCustomDataPath() + )); + this.primary = primary; + if (primary) { + asyncBatch = new InternalBatchAsyncFetch<>( + logger, + "batch_shards_started", + shardIdsMap, + batchStartedAction, + batchId); + } else { + asyncBatch = new InternalBatchAsyncFetch<>( + logger, + "batch_shards_store", + shardIdsMap, + batchStoreAction, + batchId); + + } + } + + public void removeFromBatch(ShardRouting shard) { + + batchInfo.remove(shard.shardId()); + asyncBatch.shardsToCustomDataPathMap.remove(shard.shardId()); + assert shard.primary() == primary : "Illegal call to remove a shard from a batch of the other type"; + // remove from lookup + if (this.primary) { + startedShardBatchLookup.remove(shard.shardId()); + } else { + storeShardBatchLookup.remove(shard.shardId()); + } + // assert that the fetcher and the batch track the same shards + assert batchInfo.size() == asyncBatch.shardsToCustomDataPathMap.size() : "Shards size is not equal to fetcher size"; + } + + Set getBatchedShardRoutings() { + return batchInfo.values().stream().map(ShardBatchEntry::getShardRouting).collect(Collectors.toSet()); + } + + Set getBatchedShards() { + return batchInfo.keySet(); + } + + public String getBatchId() { + return batchId; + } + + AsyncBatchShardFetch getAsyncFetcher() { + return asyncBatch; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || o instanceof ShardsBatch == false) { + return 
false; + } + ShardsBatch shardsBatch = (ShardsBatch) o; + return batchId.equals(shardsBatch.getBatchId()) && batchInfo.keySet().equals(shardsBatch.getBatchedShards()); + } + + @Override + public int hashCode() { + return Objects.hash(batchId); + } + + @Override + public String toString() { + return "batchId: " + batchId; + } + + } + + /** + * Holds information about a shard to be allocated in a batch. + */ + private class ShardBatchEntry { + + private final String customDataPath; + private final ShardRouting shardRouting; + + public ShardBatchEntry(String customDataPath, ShardRouting shardRouting) { + this.customDataPath = customDataPath; + this.shardRouting = shardRouting; + } + + public ShardRouting getShardRouting() { + return shardRouting; + } + + public String getCustomDataPath() { + return customDataPath; + } + } + } diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java index 5216dd2fcb4b5..c8d335c698a13 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java @@ -336,7 +336,7 @@ private static Tuple> canBeAllocated * Takes the store info for nodes that have a shard store and adds them to the node decisions, * leaving the node explanations untouched for those nodes that do not have any store information. */ - private static List augmentExplanationsWithStoreInfo( + public static List augmentExplanationsWithStoreInfo( Map nodeDecisions, Map withShardStores ) { @@ -499,7 +499,7 @@ private static boolean canPerformOperationBasedRecovery( */ protected abstract boolean hasInitiatedFetching(ShardRouting shard); - private static class MatchingNode { + public static class MatchingNode { static final Comparator COMPARATOR = Comparator.comparing(m -> m.isNoopRecovery) .thenComparing(m -> m.retainingSeqNo) .thenComparing(m -> m.matchingBytes); @@ -556,5 +556,9 @@ boolean canPerformNoopRecovery(DiscoveryNode node) { public boolean hasAnyData() { return matchingNodes.isEmpty() == false; } + + public Map getNodeDecisions() { + return this.nodeDecisions; + } } } diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java new file mode 100644 index 0000000000000..1ed28b281f710 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java @@ -0,0 +1,518 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.gateway; + +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; +import org.opensearch.cluster.routing.allocation.NodeAllocationResult; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.gateway.AsyncBatchShardFetch.FetchResult; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; + +public abstract class ReplicaShardBatchAllocator extends BaseGatewayShardAllocator { + + /** + * Process existing recoveries of replicas and see if we need to cancel them if we find a better + * match. Today, a better match is one that can perform a no-op recovery while the previous recovery + * has to copy segment files. + */ + public void processExistingRecoveries(RoutingAllocation allocation, List> shardBatches) { + Metadata metadata = allocation.metadata(); + RoutingNodes routingNodes = allocation.routingNodes(); + List shardCancellationActions = new ArrayList<>(); + for (Set shardBatch : shardBatches) { + Set eligibleFetchShards = new HashSet<>(); + Set ineligibleShards = new HashSet<>(); + for (ShardRouting shard : shardBatch) { + if (shard.primary()) { + ineligibleShards.add(shard); + continue; + } + if (shard.initializing() == false) { + ineligibleShards.add(shard); + continue; + } + if (shard.relocatingNodeId() != null) { + ineligibleShards.add(shard); + continue; + } + + // if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one... 
+ if (shard.unassignedInfo() != null && shard.unassignedInfo().getReason() == UnassignedInfo.Reason.INDEX_CREATED) { + ineligibleShards.add(shard); + continue; + } + eligibleFetchShards.add(shard); + } + AsyncBatchShardFetch.FetchResult shardState = fetchData(eligibleFetchShards, ineligibleShards, allocation); + if (shardState.hasData() == false) { + logger.trace("{}: fetching new stores for initializing shard batch", eligibleFetchShards); + continue; // still fetching + } + for (ShardRouting shard : eligibleFetchShards) { + ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId()); + assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary"; + assert primaryShard.currentNodeId() != null; + final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); + final TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore = findStore(primaryNode, shardState, shard); + if (primaryStore == null) { + // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) + // just let the recovery find it out, no need to do anything about it for the initializing shard + logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard); + continue; + } + ReplicaShardAllocator.MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, true, primaryNode, primaryStore, shardState, false); + if (matchingNodes.getNodeWithHighestMatch() != null) { + DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId()); + DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch(); + // current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider + if (currentNode.equals(nodeWithHighestMatch) == false + && matchingNodes.canPerformNoopRecovery(nodeWithHighestMatch) + && canPerformOperationBasedRecovery(primaryStore, shardState, currentNode, shard) == false) { + // we found a better match that can perform noop recovery, cancel the existing allocation. + logger.debug( + "cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]", + currentNode, + nodeWithHighestMatch + ); + final Set failedNodeIds = shard.unassignedInfo() == null + ? Collections.emptySet() + : shard.unassignedInfo().getFailedNodeIds(); + UnassignedInfo unassignedInfo = new UnassignedInfo( + UnassignedInfo.Reason.REALLOCATED_REPLICA, + "existing allocation of replica to [" + + currentNode + + "] cancelled, can perform a noop recovery on [" + + nodeWithHighestMatch + + "]", + null, + 0, + allocation.getCurrentNanoTime(), + System.currentTimeMillis(), + false, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + failedNodeIds + ); + // don't cancel shard in the loop as it will cause a ConcurrentModificationException + shardCancellationActions.add( + () -> routingNodes.failShard( + logger, + shard, + unassignedInfo, + metadata.getIndexSafe(shard.index()), + allocation.changes() + ) + ); + } + } + } + } + for (Runnable action : shardCancellationActions) { + action.run(); + } + } + + private static boolean isResponsibleFor(final ShardRouting shard) { + return !shard.primary() // must be a replica + && shard.unassigned() // must be unassigned + // if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one... 
+ && shard.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED; + } + + abstract protected FetchResult fetchData(Set shardEligibleForFetch, + Set inEligibleShards, + RoutingAllocation allocation); + + @Override + public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) { + return null; + } + + @Override + public HashMap makeAllocationDecision(Set shards, RoutingAllocation allocation, Logger logger) { + HashMap shardAllocationDecisions = new HashMap<>(); + final boolean explain = allocation.debugDecision(); + final RoutingNodes routingNodes = allocation.routingNodes(); + Set shardsEligibleForFetch = new HashSet<>(); + Set shardsNotEligibleForFetch = new HashSet<>(); + HashMap>> nodeAllocationDecisions = new HashMap<>(); + for(ShardRouting shard : shards) { + if (!isResponsibleFor(shard)) { + // this allocator n is not responsible for allocating this shard + shardsNotEligibleForFetch.add(shard); + shardAllocationDecisions.put(shard, AllocateUnassignedDecision.NOT_TAKEN); + continue; + } + + Tuple> result = canBeAllocatedToAtLeastOneNode(shard, allocation); + Decision allocationDecision = result.v1(); + if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shard))){ + // only return early if we are not in explain mode, or we are in explain mode but we have not + // yet attempted to fetch any shard data + logger.trace("{}: ignoring allocation, can't be allocated on any node", shard); + shardsNotEligibleForFetch.add(shard); + shardAllocationDecisions.put(shard, + AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()), + result.v2() != null ? new ArrayList<>(result.v2().values()) : null)); + continue; + } + // storing the nodeDecisions in nodeAllocationDecisions if the decision is not YES + // so that we don't have to compute the decisions again + // ToDo: Check if we need to store or computing again will be cheaper/better + nodeAllocationDecisions.put(shard, result); + + shardsEligibleForFetch.add(shard); + } + + // only fetch data for eligible shards + final FetchResult shardsState = fetchData(shardsEligibleForFetch, shardsNotEligibleForFetch, allocation); + + // ToDo: Analyze if we need to create hashmaps here or sequential is better +// Map primaryNodesMap = shardsEligibleForFetch.stream() +// .map(x -> routingNodes.activePrimary(x.shardId())) +// .filter(Objects::nonNull) +// .filter(node -> node.currentNodeId() != null) +// .collect(Collectors.toMap(Function.identity(), node -> allocation.nodes().get(node.currentNodeId()))); +// +// Map primaryStoreMap = findStoresBatch(primaryNodesMap, shardsState); + + for (ShardRouting unassignedShard : shardsEligibleForFetch) { + if (!shardsState.hasData()) { + logger.trace("{}: ignoring allocation, still fetching shard stores", unassignedShard); + allocation.setHasPendingAsyncFetch(); + List nodeDecisions = null; + if (explain) { + nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation); + } + shardAllocationDecisions.put(unassignedShard, + AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions)); + continue; + } + Tuple> result = nodeAllocationDecisions.get(unassignedShard); + ShardRouting primaryShard = routingNodes.activePrimary(unassignedShard.shardId()); + if (primaryShard == null) { + assert explain : "primary should only be null here if we are in explain mode, so we didn't " + + "exit early when 
canBeAllocatedToAtLeastOneNode didn't return a YES decision"; + shardAllocationDecisions.put(unassignedShard, AllocateUnassignedDecision.no( + UnassignedInfo.AllocationStatus.fromDecision(result.v1().type()), + result.v2() != null ? new ArrayList<>(result.v2().values()) : null + )); + continue; + } + assert primaryShard.currentNodeId() != null; + final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); + final TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore = findStore(primaryNode, shardsState, unassignedShard); + if (primaryStore == null) { + // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) + // we want to let the replica be allocated in order to expose the actual problem with the primary that the replica + // will try and recover from + // Note, this is the existing behavior, as exposed in running CorruptFileTest#testNoPrimaryData + logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", unassignedShard); + shardAllocationDecisions.put(unassignedShard, AllocateUnassignedDecision.NOT_TAKEN); + continue; + } + + // find the matching nodes + ReplicaShardAllocator.MatchingNodes matchingNodes = findMatchingNodes( + unassignedShard, + allocation, + false, + primaryNode, + primaryStore, + shardsState, + explain + ); + + assert explain == false || matchingNodes.getNodeDecisions() != null : "in explain mode, we must have individual node decisions"; + + List nodeDecisions = ReplicaShardAllocator.augmentExplanationsWithStoreInfo(result.v2(), matchingNodes.getNodeDecisions()); + if (result.v1().type() != Decision.Type.YES) { + shardAllocationDecisions.put(unassignedShard, AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.fromDecision(result.v1().type()), nodeDecisions)); + continue; + } else if (matchingNodes.getNodeWithHighestMatch() != null) { + RoutingNode nodeWithHighestMatch = allocation.routingNodes().node(matchingNodes.getNodeWithHighestMatch().getId()); + // we only check on THROTTLE since we checked before on NO + Decision decision = allocation.deciders().canAllocate(unassignedShard, nodeWithHighestMatch, allocation); + if (decision.type() == Decision.Type.THROTTLE) { + logger.debug( + "[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store", + unassignedShard.index(), + unassignedShard.id(), + unassignedShard, + nodeWithHighestMatch.node() + ); + // we are throttling this, as we have enough other shards to allocate to this node, so ignore it for now + shardAllocationDecisions.put(unassignedShard, AllocateUnassignedDecision.throttle(nodeDecisions)); + continue; + } else { + logger.debug( + "[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store", + unassignedShard.index(), + unassignedShard.id(), + unassignedShard, + nodeWithHighestMatch.node() + ); + // we found a match + shardAllocationDecisions.put(unassignedShard, AllocateUnassignedDecision.yes(nodeWithHighestMatch.node(), null, nodeDecisions, true)); + continue; + } + } else if (matchingNodes.hasAnyData() == false && unassignedShard.unassignedInfo().isDelayed()) { + // if we didn't manage to find *any* data (regardless of matching sizes), and the replica is + // unassigned due to a node leaving, so we delay allocation of this replica to see if the + // node with the shard copy will rejoin so we can re-use the copy it has + logger.debug("{}: allocation of [{}] is delayed", unassignedShard.shardId(), 
unassignedShard); + long remainingDelayMillis = 0L; + long totalDelayMillis = 0L; + if (explain) { + UnassignedInfo unassignedInfo = unassignedShard.unassignedInfo(); + Metadata metadata = allocation.metadata(); + IndexMetadata indexMetadata = metadata.index(unassignedShard.index()); + totalDelayMillis = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetadata.getSettings()).getMillis(); + long remainingDelayNanos = unassignedInfo.getRemainingDelay(System.nanoTime(), indexMetadata.getSettings()); + remainingDelayMillis = TimeValue.timeValueNanos(remainingDelayNanos).millis(); + } + shardAllocationDecisions.put(unassignedShard, AllocateUnassignedDecision.delayed(remainingDelayMillis, totalDelayMillis, nodeDecisions)); + continue; + } + + shardAllocationDecisions.put(unassignedShard, AllocateUnassignedDecision.NOT_TAKEN); + } + return shardAllocationDecisions; + } + + private ReplicaShardAllocator.MatchingNodes findMatchingNodes( + ShardRouting shard, + RoutingAllocation allocation, + boolean noMatchFailedNodes, + DiscoveryNode primaryNode, + TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore, + FetchResult data, + boolean explain + ) { + Map matchingNodes = new HashMap<>(); + Map nodeDecisions = explain ? new HashMap<>() : null; + for (Map.Entry nodeStoreEntry : data.getData().entrySet()) { + DiscoveryNode discoNode = nodeStoreEntry.getKey(); + if (noMatchFailedNodes + && shard.unassignedInfo() != null + && shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) { + continue; + } + TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue() + .getNodeStoreFilesMetadataBatch().get(shard.shardId()).storeFilesMetadata(); + // we don't have any files at all, it is an empty index + if (storeFilesMetadata.isEmpty()) { + continue; + } + + RoutingNode node = allocation.routingNodes().node(discoNode.getId()); + if (node == null) { + continue; + } + + // check if we can allocate on that node... 
+ // we only check for NO, since if this node is THROTTLING and it has enough "same data" + // then we will try and assign it next time + Decision decision = allocation.deciders().canAllocate(shard, node, allocation); + ReplicaShardAllocator.MatchingNode matchingNode = null; + if (explain) { + matchingNode = computeMatchingNode(primaryNode, primaryStore, discoNode, storeFilesMetadata); + NodeAllocationResult.ShardStoreInfo shardStoreInfo = new NodeAllocationResult.ShardStoreInfo(matchingNode.matchingBytes); + nodeDecisions.put(node.nodeId(), new NodeAllocationResult(discoNode, shardStoreInfo, decision)); + } + + if (decision.type() == Decision.Type.NO) { + continue; + } + + if (matchingNode == null) { + matchingNode = computeMatchingNode(primaryNode, primaryStore, discoNode, storeFilesMetadata); + } + matchingNodes.put(discoNode, matchingNode); + if (logger.isTraceEnabled()) { + if (matchingNode.isNoopRecovery) { + logger.trace("{}: node [{}] can perform a noop recovery", shard, discoNode.getName()); + } else if (matchingNode.retainingSeqNo >= 0) { + logger.trace( + "{}: node [{}] can perform operation-based recovery with retaining sequence number [{}]", + shard, + discoNode.getName(), + matchingNode.retainingSeqNo + ); + } else { + logger.trace( + "{}: node [{}] has [{}/{}] bytes of re-usable data", + shard, + discoNode.getName(), + new ByteSizeValue(matchingNode.matchingBytes), + matchingNode.matchingBytes + ); + } + } + } + + return new ReplicaShardAllocator.MatchingNodes(matchingNodes, nodeDecisions); + } + + private static ReplicaShardAllocator.MatchingNode computeMatchingNode( + DiscoveryNode primaryNode, + TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore, + DiscoveryNode replicaNode, + TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata replicaStore + ) { + final long retainingSeqNoForPrimary = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(primaryNode); + final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(replicaNode); + final boolean isNoopRecovery = (retainingSeqNoForReplica >= retainingSeqNoForPrimary && retainingSeqNoForPrimary >= 0) + || hasMatchingSyncId(primaryStore, replicaStore); + final long matchingBytes = computeMatchingBytes(primaryStore, replicaStore); + return new ReplicaShardAllocator.MatchingNode(matchingBytes, retainingSeqNoForReplica, isNoopRecovery); + } + + private static boolean hasMatchingSyncId( + TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore, + TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata replicaStore + ) { + String primarySyncId = primaryStore.syncId(); + return primarySyncId != null && primarySyncId.equals(replicaStore.syncId()); + } + + private static long computeMatchingBytes( + TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore, + TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata storeFilesMetadata + ) { + long sizeMatched = 0; + for (StoreFileMetadata storeFileMetadata : storeFilesMetadata) { + String metadataFileName = storeFileMetadata.name(); + if (primaryStore.fileExists(metadataFileName) && primaryStore.file(metadataFileName).isSame(storeFileMetadata)) { + sizeMatched += storeFileMetadata.length(); + } + } + return sizeMatched; + } + + /** + * Determines if the shard can be allocated on at least one node based on the allocation deciders. + * + * Returns the best allocation decision for allocating the shard on any node (i.e. 
YES if at least one + * node decided YES, THROTTLE if at least one node decided THROTTLE, and NO if none of the nodes decided + * YES or THROTTLE). If in explain mode, also returns the node-level explanations as the second element + * in the returned tuple. + */ + private static Tuple> canBeAllocatedToAtLeastOneNode( + ShardRouting shard, + RoutingAllocation allocation + ) { + Decision madeDecision = Decision.NO; + final boolean explain = allocation.debugDecision(); + Map nodeDecisions = explain ? new HashMap<>() : null; + for (final DiscoveryNode cursor : allocation.nodes().getDataNodes().values()) { + RoutingNode node = allocation.routingNodes().node(cursor.getId()); + if (node == null) { + continue; + } + // if we can't allocate it on a node, ignore it, for example, this handles + // cases for only allocating a replica after a primary + Decision decision = allocation.deciders().canAllocate(shard, node, allocation); + if (decision.type() == Decision.Type.YES && madeDecision.type() != Decision.Type.YES) { + if (explain) { + madeDecision = decision; + } else { + return Tuple.tuple(decision, null); + } + } else if (madeDecision.type() == Decision.Type.NO && decision.type() == Decision.Type.THROTTLE) { + madeDecision = decision; + } + if (explain) { + nodeDecisions.put(node.nodeId(), new NodeAllocationResult(node.node(), null, decision)); + } + } + return Tuple.tuple(madeDecision, nodeDecisions); + } + + protected abstract AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation); + + protected abstract boolean hasInitiatedFetching(ShardRouting shard); + +// private static Map findStoresBatch(Map shardToNodeMap, +// FetchResult data) { +// Map shardStores = new HashMap<>(); +// shardToNodeMap.entrySet().forEach(entry -> { +// NodeStoreFilesMetadataBatch nodeFilesStore = data.getData().get(entry.getValue()); +// if (nodeFilesStore == null) { +// shardStores.put(entry.getKey(), null); +// } else { +// shardStores.put(entry.getKey(), nodeFilesStore.getNodeStoreFilesMetadataBatch().get(entry.getKey().shardId())); +// } +// }); +// return shardStores; +// } + + private static TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata findStore( + DiscoveryNode node, + FetchResult data, + ShardRouting shard + ) { + NodeStoreFilesMetadataBatch nodeFilesStore = data.getData().get(node); + if (nodeFilesStore == null) { + return null; + } + TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeFileStoreMetadata = nodeFilesStore.getNodeStoreFilesMetadataBatch().get(shard.shardId()); + if (nodeFileStoreMetadata.getStoreFileFetchException() != null) { + // Do we need to throw an exception here, to handle this case differently? 
+ return null; + } + return nodeFileStoreMetadata.storeFilesMetadata(); + } + + private static boolean canPerformOperationBasedRecovery( + TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore, + FetchResult data, + DiscoveryNode targetNode, + ShardRouting shard + ) { + final NodeStoreFilesMetadataBatch nodeFilesStore = data.getData().get(targetNode); + if (nodeFilesStore == null) { + return false; + } + TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeFileStoreMetadata = nodeFilesStore.getNodeStoreFilesMetadataBatch().get(shard.shardId()); + if (nodeFileStoreMetadata.getStoreFileFetchException() != null) { + return false; + } + TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata targetNodeStore = nodeFileStoreMetadata.storeFilesMetadata(); + if (targetNodeStore == null || targetNodeStore.isEmpty()) { + return false; + } + if (hasMatchingSyncId(primaryStore, targetNodeStore)) { + return true; + } + return primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(targetNode) >= 0; + } +}
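The standalone sketch below is not part of the patch; all names in it are hypothetical. It illustrates the same pattern the createBatches() method above follows: slice the not-yet-batched shards into groups of at most MAX_BATCH_SIZE, key each group with a freshly generated UUID, and keep a reverse lookup from shard id to batch id so that a later removal (as safelyRemoveShardFromBatch() does) can find the owning batch with a single map lookup.

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Illustrative only: fixed-size batching keyed by UUID with a reverse id -> batchId lookup.
public class BatchingSketch {
    static final int MAX_BATCH_SIZE = 2000; // same default the patch currently hard-codes

    // Slices the ids that are not yet in any batch into groups of at most MAX_BATCH_SIZE,
    // keys each group by a random UUID and records every member in the reverse lookup map.
    public static Map<String, List<String>> createBatches(Collection<String> allIds, Map<String, String> idToBatchLookup) {
        List<String> toBatch = new ArrayList<>();
        for (String id : allIds) {
            if (idToBatchLookup.containsKey(id) == false) {
                toBatch.add(id); // skip ids that already belong to a batch
            }
        }
        Map<String, List<String>> batches = new HashMap<>();
        List<String> current = new ArrayList<>();
        Iterator<String> it = toBatch.iterator();
        while (it.hasNext()) {
            current.add(it.next());
            // seal the batch when it is full or when this was the last id
            if (current.size() == MAX_BATCH_SIZE || it.hasNext() == false) {
                String batchId = UUID.randomUUID().toString();
                batches.put(batchId, current);
                for (String member : current) {
                    idToBatchLookup.put(member, batchId);
                }
                current = new ArrayList<>();
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        Map<String, String> lookup = new HashMap<>();
        Map<String, List<String>> batches = createBatches(List.of("shard-0", "shard-1", "shard-2"), lookup);
        System.out.println(batches.size() + " batch(es), lookup = " + lookup);
    }
}

Removing an id from both the lookup map and its batch, and dropping the batch once it is empty, mirrors what the patch does for the real ShardsBatch objects and their async fetchers.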