From 3c8eafdb7d59647b169e8dd9e5b00e64b82a01e6 Mon Sep 17 00:00:00 2001
From: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com>
Date: Tue, 16 Apr 2024 09:30:27 +0530
Subject: [PATCH] [Remote Store - Dual Replication] Create missing Retention Leases for docrep shard copies during failover (#13159)

Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com>
---
 .../RemoteDualReplicationIT.java        | 60 +++++++++++++++++--
 .../index/seqno/ReplicationTracker.java | 35 +++++++++--
 2 files changed, 86 insertions(+), 9 deletions(-)

diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java
index 34b60d5f3e9b3..e316bae5d8ebc 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java
@@ -41,6 +41,7 @@ public class RemoteDualReplicationIT extends MigrationBaseTestCase {
     private final String REMOTE_PRI_DOCREP_REP = "remote-primary-docrep-replica";
     private final String REMOTE_PRI_DOCREP_REMOTE_REP = "remote-primary-docrep-remote-replica";
     private final String FAILOVER_REMOTE_TO_DOCREP = "failover-remote-to-docrep";
+    private final String FAILOVER_REMOTE_TO_REMOTE = "failover-remote-to-remote";
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
@@ -241,14 +242,63 @@ RLs on remote enabled copies are brought up to (GlobalCkp + 1) upon a flush requ
          */
         extraSettings = Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.getKey(), "3s").build();
         testRemotePrimaryDocRepAndRemoteReplica();
-        DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
-        assertBusy(() -> {
-            for (ShardStats shardStats : internalCluster().client()
+        pollAndCheckRetentionLeases(REMOTE_PRI_DOCREP_REMOTE_REP);
+    }
+
+    public void testMissingRetentionLeaseCreatedOnFailedOverRemoteReplica() throws Exception {
+        internalCluster().startClusterManagerOnlyNode();
+
+        logger.info("---> Starting docrep data node");
+        internalCluster().startDataOnlyNode();
+
+        Settings zeroReplicasAndOverridenSyncIntervals = Settings.builder()
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+            .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")
+            .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
+            .build();
+        createIndex(FAILOVER_REMOTE_TO_REMOTE, zeroReplicasAndOverridenSyncIntervals);
+        ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
+
+        indexBulk(FAILOVER_REMOTE_TO_REMOTE, 100);
+
+        logger.info("---> Starting first remote node");
+        initDocRepToRemoteMigration();
+        addRemote = true;
+        String firstRemoteNode = internalCluster().startDataOnlyNode();
+        String primaryShardHostingNode = primaryNodeName(FAILOVER_REMOTE_TO_REMOTE);
+        logger.info("---> Moving primary copy from {} to remote enabled node {}", primaryShardHostingNode, firstRemoteNode);
+        assertAcked(
+            internalCluster().client()
+                .admin()
+                .cluster()
+                .prepareReroute()
+                .add(new MoveAllocationCommand(FAILOVER_REMOTE_TO_REMOTE, 0, primaryShardHostingNode, firstRemoteNode))
+                .get()
+        );
+        ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
+        assertReplicaAndPrimaryConsistency(FAILOVER_REMOTE_TO_REMOTE, 100, 0);
+
+        String secondRemoteNode = internalCluster().startDataOnlyNode();
+        Settings twoReplicas = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).build();
+        assertAcked(
+            internalCluster().client()
                 .admin()
                 .indices()
-                .prepareStats(REMOTE_PRI_DOCREP_REMOTE_REP)
+                .prepareUpdateSettings()
+                .setIndices(FAILOVER_REMOTE_TO_REMOTE)
+                .setSettings(twoReplicas)
                 .get()
-                .getShards()) {
+        );
+        ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
+
+        logger.info("---> Checking retention leases");
+        pollAndCheckRetentionLeases(FAILOVER_REMOTE_TO_REMOTE);
+    }
+
+    private void pollAndCheckRetentionLeases(String indexName) throws Exception {
+        DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
+        assertBusy(() -> {
+            for (ShardStats shardStats : internalCluster().client().admin().indices().prepareStats(indexName).get().getShards()) {
                 ShardRouting shardRouting = shardStats.getShardRouting();
                 DiscoveryNode discoveryNode = nodes.get(shardRouting.currentNodeId());
                 RetentionLeases retentionLeases = shardStats.getRetentionLeaseStats().retentionLeases();
diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
index 599c54f293e89..39b1cc130d6a5 100644
--- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
+++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
@@ -255,6 +255,12 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
 
     private final Function<String, Boolean> isShardOnRemoteEnabledNode;
 
+    /**
+     * Flag to indicate whether {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)}
+     * has been run successfully
+     */
+    private boolean createdMissingRetentionLeases;
+
     /**
      * Get all retention leases tracked on this shard.
      *
@@ -955,7 +961,13 @@ private boolean invariant() {
             assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked";
         }
 
-        if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) {
+        if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases
+            // Skip assertion if createMissingPeerRecoveryRetentionLeases has not yet run after activating the primary context.
+            // This is required since, during an ongoing remote store migration,
+            // a remote enabled primary taking over the primary context from another remote enabled shard copy
+            // might not have retention leases for docrep shard copies
+            // (since all RetentionLease sync actions are blocked on remote shard copies)
+            && createdMissingRetentionLeases) {
             // all tracked shard copies have a corresponding peer-recovery retention lease
             for (final ShardRouting shardRouting : routingTable.assignedShards()) {
                 final CheckpointState cps = checkpoints.get(shardRouting.allocationId().getId());
@@ -1843,19 +1855,34 @@ private synchronized void setHasAllPeerRecoveryRetentionLeases() {
         assert invariant();
     }
 
+    private synchronized void setCreatedMissingRetentionLeases() {
+        createdMissingRetentionLeases = true;
+        assert invariant();
+    }
+
     public synchronized boolean hasAllPeerRecoveryRetentionLeases() {
         return hasAllPeerRecoveryRetentionLeases;
     }
 
     /**
-     * Create any required peer-recovery retention leases that do not currently exist because we just did a rolling upgrade from a version
-     * prior to {@code LegacyESVersion#V_7_4_0} that does not create peer-recovery retention leases.
+     * Create any required peer-recovery retention leases that do not currently exist. This can happen if either:
+     * - We just did a rolling upgrade from a version prior to {@code LegacyESVersion#V_7_4_0} that does not create peer-recovery retention leases.
+     * - In a mixed mode cluster (during remote store migration), a remote enabled primary shard copy fails over to another remote enabled shard copy,
+     *   but the replication group still has other shard copies on docrep nodes
      */
     public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener<Void> listener) {
-        if (hasAllPeerRecoveryRetentionLeases == false) {
+        // Create missing RetentionLeases if the primary is on a remote enabled node
+        // and the replication group has at least one shard copy on a docrep enabled node.
+        // No-op if retention leases for the tracked shard copies already exist.
+        boolean createMissingRetentionLeasesDuringMigration = indexSettings.isAssignedOnRemoteNode()
+            && replicationGroup.getReplicationTargets()
+                .stream()
+                .anyMatch(shardRouting -> isShardOnRemoteEnabledNode.apply(shardRouting.currentNodeId()) == false);
+        if (hasAllPeerRecoveryRetentionLeases == false || createMissingRetentionLeasesDuringMigration) {
             final List<ShardRouting> shardRoutings = routingTable.assignedShards();
             final GroupedActionListener<ReplicationResponse> groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> {
                 setHasAllPeerRecoveryRetentionLeases();
+                setCreatedMissingRetentionLeases();
                 listener.onResponse(null);
             }, listener::onFailure), shardRoutings.size());
             for (ShardRouting shardRouting : shardRoutings) {