From de44084baacbe83896f5e2c5805357b834ad8b6e Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 5 Jan 2023 15:28:45 -0800 Subject: [PATCH] Fix flaky SR test testStartReplicaAfterPrimaryIndexesDocs. This test was failing because we are validating post recovery if a shard is able to perform segrep while also performing validation if a passed in checkopint. In the post recovery test this checkpoint is always empty, yet the shard will be ahead of this checkpoint after docs are indexed. This change differentiates shard validation from checkpoint validation. Signed-off-by: Marc Handalian Fix spotless. Signed-off-by: Marc Handalian Fix testIsSegmentReplicationAllowed_WrongEngineType. Signed-off-by: Marc Handalian Update warn logs in isSegmentReplicationAllowed. Signed-off-by: Marc Handalian --- CHANGELOG.md | 1 + .../replication/SegmentReplicationIT.java | 1 - .../opensearch/index/shard/IndexShard.java | 50 +++++++++++++++---- .../cluster/IndicesClusterStateService.java | 7 +-- .../SegmentReplicationIndexShardTests.java | 7 +++ 5 files changed, 50 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index af5484aa0fd1a..6598c70fa4760 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Support OpenSSL Provider with default Netty allocator ([#5460](https://github.com/opensearch-project/OpenSearch/pull/5460)) - Increasing timeout of testQuorumRecovery to 90 seconds from 30 ([#5651](https://github.com/opensearch-project/OpenSearch/pull/5651)) - [Segment Replication] Fix for peer recovery ([#5344](https://github.com/opensearch-project/OpenSearch/pull/5344)) +- Fixed flaky test SegmentReplicationIT.testStartReplicaAfterPrimaryIndexesDocs ([#5722](https://github.com/opensearch-project/OpenSearch/pull/5722)) ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index dc634fb10e387..340ef1c0149ef 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -521,7 +521,6 @@ public void testCancellation() throws Exception { assertDocCounts(docCount, primaryNode); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { final String primaryNode = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 5a5356282681e..72e8854d9a3ec 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -74,6 +74,7 @@ import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.common.Booleans; import org.opensearch.common.CheckedConsumer; import org.opensearch.common.CheckedFunction; @@ -1454,22 +1455,53 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { } /** - * Checks if checkpoint should be processed - * - * @param requestCheckpoint received checkpoint that is checked for processing - * @return true if checkpoint should be processed + * Checks if this shard is able to perform segment replication. + * @return - True if the shard is able to perform segment replication. */ - public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { - if (state().equals(IndexShardState.STARTED) == false) { - logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state())); + public boolean isSegmentReplicationAllowed() { + if (indexSettings.isSegRepEnabled() == false) { + logger.warn("Attempting to perform segment replication when it is not enabled on the index"); return false; } if (getReplicationTracker().isPrimaryMode()) { - logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints."); + logger.warn("Shard is in primary mode and cannot perform segment replication as a replica."); return false; } if (this.routingEntry().primary()) { - logger.warn("Ignoring new replication checkpoint - primary shard cannot receive any checkpoints."); + logger.warn("Shard is marked as primary and cannot perform segment replication as a replica"); + return false; + } + if (state().equals(IndexShardState.STARTED) == false + && (state() == IndexShardState.POST_RECOVERY && shardRouting.state() == ShardRoutingState.INITIALIZING) == false) { + logger.warn( + () -> new ParameterizedMessage( + "Shard is not started or recovering {} {} and cannot perform segment replication as a replica", + state(), + shardRouting.state() + ) + ); + return false; + } + if (getReplicationEngine().isEmpty()) { + logger.warn( + () -> new ParameterizedMessage( + "Shard does not have the correct engine type to perform segment replication {}.", + getEngine().getClass() + ) + ); + return false; + } + return true; + } + + /** + * Checks if checkpoint should be processed + * + * @param requestCheckpoint received checkpoint that is checked for processing + * @return true if checkpoint should be processed + */ + public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { + if (isSegmentReplicationAllowed() == false) { return false; } ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 83f4e0c7cbed9..e8adcbdc1c89a 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -47,7 +47,6 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RoutingNode; @@ -811,11 +810,7 @@ private void forceSegmentReplication( StepListener forceSegRepListener ) { IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id()); - if (indexShard != null - && indexShard.indexSettings().isSegRepEnabled() - && shardRouting.primary() == false - && shardRouting.state() == ShardRoutingState.INITIALIZING - && indexShard.state() == IndexShardState.POST_RECOVERY) { + if (indexShard != null && indexShard.isSegmentReplicationAllowed()) { segmentReplicationTargetService.startReplication( ReplicationCheckpoint.empty(shardRouting.shardId()), indexShard, diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index c46f97b5ec785..44771faf36871 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -27,6 +27,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.DocIdSeqNoAndSource; import org.opensearch.index.engine.InternalEngine; +import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.engine.NRTReplicationEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.mapper.MapperService; @@ -97,6 +98,12 @@ public void testReplicationCheckpointNotNullForSegRep() throws IOException { closeShards(indexShard); } + public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException { + final IndexShard indexShard = newShard(false, settings, new InternalEngineFactory()); + assertFalse(indexShard.isSegmentReplicationAllowed()); + closeShards(indexShard); + } + public void testSegmentReplication_Index_Update_Delete() throws Exception { String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) {