Skip to content

Commit

Permalink
Add shard routing primary check when processing a checkpoint. (#4716)
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 committed Oct 10, 2022
1 parent 2a7e46b commit a4b990d
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386))
- [Segment Replication] Fix NoSuchFileExceptions with segment replication when computing primary metadata snapshots ([#4366](https://github.com/opensearch-project/OpenSearch/pull/4366))
- [Segment Replication] Fix timeout issue by calculating time needed to process getSegmentFiles ([#4434](https://github.com/opensearch-project/OpenSearch/pull/4434))
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on segments_N from primary shards.
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on segments_N from primary shards ([#4450](https://github.com/opensearch-project/OpenSearch/pull/4450))
- [Segment Replication] Adding check to make sure checkpoint is not processed when a shard's shard routing is primary ([#4716](https://github.com/opensearch-project/OpenSearch/pull/4716))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1432,6 +1432,10 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints.");
return false;
}
if (this.routingEntry().primary()) {
logger.warn("Ignoring new replication checkpoint - primary shard cannot receive any checkpoints.");
return false;
}
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
logger.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;

public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase {

Expand Down Expand Up @@ -208,6 +209,36 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException {
closeShards(shard);
}

/**
* here we are starting a new primary shard and testing that we don't process a checkpoint on a shard when it's shard routing is primary.
*/
public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
IndexShard primaryShard = newStartedShard(true);
SegmentReplicationTargetService sut;
sut = prepareForReplication(primaryShard);
SegmentReplicationTargetService spy = spy(sut);

// Starting a new shard in PrimaryMode and shard routing primary.
IndexShard spyShard = spy(primaryShard);
String id = primaryShard.routingEntry().allocationId().getId();

// Starting relocation handoff
primaryShard.getReplicationTracker().startRelocationHandoff(id);

// Completing relocation handoff.
primaryShard.getReplicationTracker().completeRelocationHandoff();

// Assert that primary shard is no longer in Primary Mode and shard routing is still Primary
assertEquals(false, primaryShard.getReplicationTracker().isPrimaryMode());
assertEquals(true, primaryShard.routingEntry().primary());

spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, 0L), spyShard);

// Verify that checkpoint is not processed as shard routing is primary.
verify(spy, times(0)).startReplication(any(), any(), any());
closeShards(primaryShard);
}

public void testReplicaReceivesGenIncrease() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
shards.startAll();
Expand Down

0 comments on commit a4b990d

Please sign in to comment.