diff --git a/CHANGELOG.md b/CHANGELOG.md index 19dc88c2f10f8..8e6d41aa9bb7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -87,7 +87,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386)) - [Segment Replication] Fix NoSuchFileExceptions with segment replication when computing primary metadata snapshots ([#4366](https://github.com/opensearch-project/OpenSearch/pull/4366)) - [Segment Replication] Fix timeout issue by calculating time needed to process getSegmentFiles ([#4434](https://github.com/opensearch-project/OpenSearch/pull/4434)) -- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on segments_N from primary shards. +- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on segments_N from primary shards ([#4450](https://github.com/opensearch-project/OpenSearch/pull/4450)) +- [Segment Replication] Adding check to make sure checkpoint is not processed when a shard's shard routing is primary ([#4716](https://github.com/opensearch-project/OpenSearch/pull/4716)) ### Security diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index c57255806719e..8e91d071a6b9a 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1432,6 +1432,10 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints."); return false; } + if (this.routingEntry().primary()) { + logger.warn("Ignoring new replication checkpoint - primary shard cannot receive any checkpoints."); + return false; + } ReplicationCheckpoint localCheckpoint = 
getLatestReplicationCheckpoint(); if (localCheckpoint.isAheadOf(requestCheckpoint)) { logger.trace( diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 007317f6e71cd..da04ea1b9914b 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -62,6 +62,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase { @@ -208,6 +209,36 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException { closeShards(shard); } + /** + * Starts a new primary shard and verifies that a checkpoint is not processed by a shard whose shard routing is primary. + */ + public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { + IndexShard primaryShard = newStartedShard(true); + SegmentReplicationTargetService sut; + sut = prepareForReplication(primaryShard); + SegmentReplicationTargetService spy = spy(sut); + + // Spy on the started shard, which is in primary mode with primary shard routing. + IndexShard spyShard = spy(primaryShard); + String id = primaryShard.routingEntry().allocationId().getId(); + + // Starting relocation handoff + primaryShard.getReplicationTracker().startRelocationHandoff(id); + + // Completing relocation handoff. 
+ primaryShard.getReplicationTracker().completeRelocationHandoff(); + + // Assert that the shard is no longer in primary mode while its shard routing is still primary. + assertEquals(false, primaryShard.getReplicationTracker().isPrimaryMode()); + assertEquals(true, primaryShard.routingEntry().primary()); + + spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, 0L), spyShard); + + // Verify that the checkpoint is not processed because the shard routing is primary. + verify(spy, times(0)).startReplication(any(), any(), any()); + closeShards(primaryShard); + } + public void testReplicaReceivesGenIncrease() throws Exception { try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { shards.startAll();