From 9c1f30283fb00eca0b5203c7dbe991cbfc7746d0 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 11 Sep 2024 09:52:20 -0700 Subject: [PATCH] search replica recovery Signed-off-by: Marc Handalian --- gradle/run.gradle | 1 + .../opensearch/index/shard/IndexShardIT.java | 4 +- .../SearchReplicaReplicationIT.java | 4 +- .../cluster/routing/IndexRoutingTable.java | 2 +- .../routing/IndexShardRoutingTable.java | 10 +++-- .../cluster/routing/ShardRouting.java | 4 +- .../allocation/IndexMetadataUpdater.java | 13 ++++++ .../decider/ThrottlingAllocationDecider.java | 17 +++++++- .../org/opensearch/index/IndexModule.java | 5 ++- .../org/opensearch/index/IndexService.java | 13 +++--- .../opensearch/index/shard/IndexShard.java | 40 +++++++++++++++---- .../opensearch/index/shard/StoreRecovery.java | 2 +- .../opensearch/indices/IndicesService.java | 5 ++- .../replication/SegmentReplicator.java | 4 +- .../opensearch/index/IndexModuleTests.java | 2 +- .../replication/SegmentReplicatorTests.java | 6 +-- .../index/shard/IndexShardTestCase.java | 4 +- 17 files changed, 102 insertions(+), 34 deletions(-) diff --git a/gradle/run.gradle b/gradle/run.gradle index 34651f1d94964..e647826ab5c7a 100644 --- a/gradle/run.gradle +++ b/gradle/run.gradle @@ -45,6 +45,7 @@ testClusters { plugin('plugins:'.concat(p)) } } + systemProperty("opensearch.experimental.feature.read.write.split.enabled", "true") } } diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index f97950f2652a3..72c88b95d9376 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -716,8 +716,8 @@ public static final IndexShard newIndexShard( null, DefaultRemoteStoreSettings.INSTANCE, false, - IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting) - ); + IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting), + (s, r) -> {}); } private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) { diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java index a1b512c326ac5..bac0ea68cc328 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java @@ -27,7 +27,7 @@ public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT { @Before public void randomizeRemoteStoreEnabled() { - useRemoteStore = randomBoolean(); + useRemoteStore = false; } @Override @@ -70,7 +70,7 @@ public void testReplication() throws Exception { internalCluster().startClusterManagerOnlyNode(); final String primary = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME); - ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME);it final String replica = internalCluster().startDataOnlyNode(); ensureGreen(INDEX_NAME); diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java index 9cc3bb21e2d12..375deb94c2a4a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java @@ -617,7 +617,7 @@ private Builder initializeEmpty(IndexMetadata indexMetadata, UnassignedInfo unas shardId, false, true, - PeerRecoverySource.INSTANCE, // TODO: Update to remote store if enabled + EmptyStoreRecoverySource.INSTANCE, unassignedInfo ) ); diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index f25cb14f65eca..0d1574cf87cdb 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -131,13 +131,15 @@ public class IndexShardRoutingTable extends AbstractDiffable sor = newShardRoutingTable.searchOnlyReplicas().stream() + .filter(r -> r.allocationId() != null) + .map(r -> r.allocationId().getId()).collect(Collectors.toSet()); + assert newShardRoutingTable.assignedShards() .stream() .filter(ShardRouting::isRelocationTarget) @@ -279,12 +284,20 @@ private IndexMetadata.Builder updateInSyncAllocations( + " than maximum possible active shards " + maxActiveShards; Set assignedAllocations = assignedShards.stream().map(s -> s.allocationId().getId()).collect(Collectors.toSet()); + logger.info("Filtering out {}", sor); inSyncAllocationIds = inSyncAllocationIds.stream() + .filter(id -> sor.contains(id) == false) .sorted(Comparator.comparing(assignedAllocations::contains).reversed()) // values with routing entries first .limit(maxActiveShards) .collect(Collectors.toSet()); } + logger.info("WTF {} {}", inSyncAllocationIds, sor); + inSyncAllocationIds = inSyncAllocationIds.stream() + .filter(id -> sor.contains(id) == false) + .collect(Collectors.toSet()); + logger.info("WTF {} {}", inSyncAllocationIds, sor); + // only remove allocation id of failed active primary if there is at least one active shard remaining. Assume for example that // the primary fails but there is no new primary to fail over to. If we were to remove the allocation id of the primary from the // in-sync set, this could create an empty primary on the next allocation. diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index 4bde1e282fe78..19aaf417ad75a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -190,8 +190,14 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing return allocation.decision(YES, NAME, "below primary recovery limit of [%d]", primariesInitialRecoveries); } } else { + if (shardRouting.isSearchOnly()) { + // search replicas recover from store and trigger a round of segRep before being marked active. + // the replication source can be either be another node for node-node replication or directly from the store. + // In a node-node case outgoing replication events are throttled by bytes/s transferred. + } // Peer recovery - assert initializingShard(shardRouting, node.nodeId()).recoverySource().getType() == RecoverySource.Type.PEER; + assert initializingShard(shardRouting, node.nodeId()).recoverySource().getType() == RecoverySource.Type.PEER || + initializingShard(shardRouting, node.nodeId()).recoverySource().getType() == RecoverySource.Type.EMPTY_STORE; if (shardRouting.unassignedReasonIndexCreated()) { return allocateInitialShardCopies(shardRouting, node, allocation); @@ -274,6 +280,15 @@ private Decision allocateShardCopies( ); } } else { + if (shardRouting.isSearchOnly()) { + return allocation.decision( + YES, + NAME, + "below shard recovery limit of incoming: [%d < %d]", + currentInRecoveries, + inRecoveriesLimit + ); + } // search for corresponding recovery source (= primary shard) and check number of outgoing recoveries on that node ShardRouting primaryShard = allocation.routingNodes().activePrimary(shardRouting.shardId()); if (primaryShard == null) { diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 79de97dc96fba..3b3847aced286 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -104,6 +104,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.BooleanSupplier; import java.util.function.Consumer; @@ -653,7 +654,7 @@ public IndexService newIndexService( clusterDefaultRefreshIntervalSupplier, recoverySettings, remoteStoreSettings, - (s) -> {} + (s, r) -> {} ); } @@ -679,7 +680,7 @@ public IndexService newIndexService( Supplier clusterDefaultRefreshIntervalSupplier, RecoverySettings recoverySettings, RemoteStoreSettings remoteStoreSettings, - Consumer replicator + BiConsumer replicator ) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index f1b36194bf62d..174456946d366 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -126,6 +126,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.BooleanSupplier; import java.util.function.Consumer; @@ -196,7 +197,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final RemoteStoreSettings remoteStoreSettings; private final FileCache fileCache; private final CompositeIndexSettings compositeIndexSettings; - private final Consumer replicator; + private final BiConsumer replicator; public IndexService( IndexSettings indexSettings, @@ -235,7 +236,7 @@ public IndexService( RemoteStoreSettings remoteStoreSettings, FileCache fileCache, CompositeIndexSettings compositeIndexSettings, - Consumer replicator + BiConsumer replicator ) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; @@ -395,7 +396,7 @@ public IndexService( remoteStoreSettings, null, null, - s -> {} + (s, r) -> {} ); } @@ -691,7 +692,8 @@ protected void closeInternal() { recoverySettings, remoteStoreSettings, seedRemote, - discoveryNodes + discoveryNodes, + replicator ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); @@ -1408,8 +1410,9 @@ private void maybeSyncSegments(boolean force) { if (getRefreshInterval().millis() > 0 || force) { for (IndexShard shard : this.shards.values()) { try { + logger.info("Is shard active {} {}", shard.routingEntry().isSearchOnly(), shard.routingEntry().active()); if (shard.routingEntry().isSearchOnly() && shard.routingEntry().active()) { - replicator.accept(shard); + shard.syncSegments(); } } catch (IndexShardClosedException | AlreadyClosedException ex) { // do nothing diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 6e12e4ed3da1a..6410e3c6c03b9 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -281,6 +281,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final PendingReplicationActions pendingReplicationActions; private final ReplicationTracker replicationTracker; private final SegmentReplicationCheckpointPublisher checkpointPublisher; + private final BiConsumer replicator; protected volatile ShardRouting shardRouting; protected volatile IndexShardState state; @@ -391,8 +392,8 @@ public IndexShard( final RecoverySettings recoverySettings, final RemoteStoreSettings remoteStoreSettings, boolean seedRemote, - final DiscoveryNodes discoveryNodes - ) throws IOException { + final DiscoveryNodes discoveryNodes, + final BiConsumer replicator) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); this.shardRouting = shardRouting; @@ -493,6 +494,7 @@ public boolean shouldCache(Query query) { this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings); this.shardMigrationState = getShardMigrationState(indexSettings, seedRemote); this.discoveryNodes = discoveryNodes; + this.replicator = replicator; } public ThreadPool getThreadPool() { @@ -2010,6 +2012,10 @@ public void resetToWriteableEngine() throws IOException, InterruptedException, T indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); }); } + public void syncSegments() { + replicator.accept(this, () -> {}); + } + /** * Wrapper for a non-closing reader * @@ -2514,8 +2520,10 @@ public void openEngineAndRecoverFromTranslog(boolean syncFromRemote) throws IOEx translogConfig.setDownloadRemoteTranslogOnInit(true); } - getEngine().translogManager() - .recoverFromTranslog(translogRecoveryRunner, getEngine().getProcessedLocalCheckpoint(), Long.MAX_VALUE); + if (routingEntry().isSearchOnly() == false) { + getEngine().translogManager() + .recoverFromTranslog(translogRecoveryRunner, getEngine().getProcessedLocalCheckpoint(), Long.MAX_VALUE); + } } /** @@ -2889,10 +2897,28 @@ public void recoverFromLocalShards( public void recoverFromStore(ActionListener listener) { // we are the first primary, recover from the gateway // if its post api allocation, the index should exists - assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; + assert shardRouting.primary() || shardRouting.isSearchOnly() : "recover from store only makes sense if the shard is a primary shard"; assert shardRouting.initializing() : "can only start recovery on initializing shard"; StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - storeRecovery.recoverFromStore(this, listener); + ActionListener wrappedListener = ActionListener.wrap( + success -> { + if (success) { + if (routingEntry().isSearchOnly()) { + replicator.accept(this, () -> { + logger.info("Finished replication as part of recovery"); + listener.onResponse(true); + }); + } else { + listener.onResponse(true); + } + } else { + listener.onResponse(false); + } + }, + listener::onFailure + ); + + storeRecovery.recoverFromStore(this, wrappedListener); } public void restoreFromRemoteStore(ActionListener listener) { @@ -3873,7 +3899,7 @@ private void executeRecovery( markAsRecovering(reason, recoveryState); // mark the shard as recovering on the cluster state thread threadPool.generic().execute(ActionRunnable.wrap(ActionListener.wrap(r -> { if (r) { - recoveryListener.onDone(recoveryState); + recoveryListener.onDone(recoveryState); } }, e -> recoveryListener.onFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action)); } diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 6933e4e161dd1..1bc378bf8b99a 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -544,7 +544,7 @@ private boolean canRecover(IndexShard indexShard) { // got closed on us, just ignore this recovery return false; } - if (indexShard.routingEntry().primary() == false) { + if (indexShard.routingEntry().primary() == false && indexShard.routingEntry().isSearchOnly() == false) { throw new IndexShardRecoveryException(shardId, "Trying to recover when the shard is in backup state", null); } return true; diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 4593aedfe1f83..bddf19f0e25c1 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -188,6 +188,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -359,7 +360,7 @@ public class IndicesService extends AbstractLifecycleComponent private final SearchRequestStats searchRequestStats; private final FileCache fileCache; private final CompositeIndexSettings compositeIndexSettings; - private final Consumer replicator; + private final BiConsumer replicator; @Override protected void doStart() { @@ -397,7 +398,7 @@ public IndicesService( RemoteStoreSettings remoteStoreSettings, FileCache fileCache, CompositeIndexSettings compositeIndexSettings, - Consumer replicator + BiConsumer replicator ) { this.settings = settings; this.threadPool = threadPool; diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java index ad3bc1933208c..67be77d828860 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java @@ -57,7 +57,7 @@ public SegmentReplicator(ThreadPool threadPool) { * Starts a replication event for the given shard. * @param shard - {@link IndexShard} replica shard */ - public void startReplication(IndexShard shard) { + public void startReplication(IndexShard shard, Runnable runnable) { if (sourceFactory.get() == null) return; startReplication( shard, @@ -67,6 +67,7 @@ public void startReplication(IndexShard shard) { @Override public void onReplicationDone(SegmentReplicationState state) { logger.trace("Completed replication for {}", shard.shardId()); + runnable.run(); } @Override @@ -75,6 +76,7 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile if (sendShardFailure) { shard.failShard("unrecoverable replication failure", e); } + runnable.run(); } } ); diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index bd86d3d396987..da856c9f1939f 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -265,7 +265,7 @@ private IndexService newIndexService(IndexModule module) throws IOException { () -> IndexSettings.DEFAULT_REFRESH_INTERVAL, DefaultRecoverySettings.INSTANCE, DefaultRemoteStoreSettings.INSTANCE, - s -> {} + (s, r) -> {} ); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java index 7acee449a1b46..d1b87b3732960 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java @@ -52,7 +52,7 @@ public void testStartReplicationWithoutSourceFactory() { SegmentReplicator segmentReplicator = new SegmentReplicator(threadpool); IndexShard shard = mock(IndexShard.class); - segmentReplicator.startReplication(shard); + segmentReplicator.startReplication(shard, () -> {}); Mockito.verifyNoInteractions(mock); } @@ -101,7 +101,7 @@ public void getSegmentFiles( } }); segmentReplicator.setSourceFactory(factory); - segmentReplicator.startReplication(replica); + segmentReplicator.startReplication(replica, () -> {}); assertBusy(() -> assertDocCount(replica, numDocs)); closeShards(primary, replica); } @@ -145,7 +145,7 @@ public void getSegmentFiles( AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false); replica.addShardFailureCallback((ig) -> failureCallbackTriggered.set(true)); segmentReplicator.setSourceFactory(factory); - segmentReplicator.startReplication(replica); + segmentReplicator.startReplication(replica, () -> {}); assertBusy(() -> assertTrue(failureCallbackTriggered.get())); closeShards(primary, replica); } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 655a9eb7d5d38..93dc98d03e727 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -717,8 +717,8 @@ protected IndexShard newShard( DefaultRecoverySettings.INSTANCE, DefaultRemoteStoreSettings.INSTANCE, false, - discoveryNodes - ); + discoveryNodes, + (s, r) -> {}); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); if (remoteStoreStatsTrackerFactory != null) { remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard);