Skip to content

Commit

Permalink
Fix flaky SR test testStartReplicaAfterPrimaryIndexesDocs.
Browse files Browse the repository at this point in the history
This test was failing because we are validating post recovery if a shard is able to perform segrep while also performing validation if a passed in checkopint.  In the post recovery test this checkpoint is always empty, yet the shard will be ahead of this checkpoint after docs are indexed.  This change differentiates shard validation from checkpoint validation.

Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Jan 6, 2023
1 parent b3e25bb commit c2202ee
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,6 @@ public void testCancellation() throws Exception {
assertDocCounts(docCount, primaryNode);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
Expand Down
34 changes: 27 additions & 7 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.common.Booleans;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.CheckedFunction;
Expand Down Expand Up @@ -1438,15 +1439,13 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
}
}


/**
* Checks if checkpoint should be processed
*
* @param requestCheckpoint received checkpoint that is checked for processing
* @return true if checkpoint should be processed
* Checks if this shard is able to perform segment replication.
* @return - True if the shard is able to perform segment replication.
*/
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
if (state().equals(IndexShardState.STARTED) == false) {
logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state()));
public boolean isSegmentReplicationAllowed() {
if (indexSettings.isSegRepEnabled() == false) {
return false;
}
if (getReplicationTracker().isPrimaryMode()) {
Expand All @@ -1457,6 +1456,27 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
logger.warn("Ignoring new replication checkpoint - primary shard cannot receive any checkpoints.");
return false;
}
if (state().equals(IndexShardState.STARTED) == false && (state() == IndexShardState.POST_RECOVERY && shardRouting.state() == ShardRoutingState.INITIALIZING) == false) {
logger.warn(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started or recovering {} {}", state(), shardRouting.state()));
return false;
}
if (getReplicationEngine().isEmpty()) {
logger.warn(() -> new ParameterizedMessage("Ignoring checkpoint, attempting to perform segrep with wrong engine type {}", getEngine().getClass()));
return false;
}
return true;
}

/**
* Checks if checkpoint should be processed
*
* @param requestCheckpoint received checkpoint that is checked for processing
* @return true if checkpoint should be processed
*/
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
if (isSegmentReplicationAllowed() == false) {
return false;
}
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
logger.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RoutingNode;
Expand Down Expand Up @@ -811,11 +810,7 @@ private void forceSegmentReplication(
StepListener<Void> forceSegRepListener
) {
IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id());
if (indexShard != null
&& indexShard.indexSettings().isSegRepEnabled()
&& shardRouting.primary() == false
&& shardRouting.state() == ShardRoutingState.INITIALIZING
&& indexShard.state() == IndexShardState.POST_RECOVERY) {
if (indexShard != null && indexShard.isSegmentReplicationAllowed()) {
segmentReplicationTargetService.startReplication(
ReplicationCheckpoint.empty(shardRouting.shardId()),
indexShard,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.DocIdSeqNoAndSource;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.engine.NRTReplicationEngine;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.mapper.MapperService;
Expand Down Expand Up @@ -90,6 +91,12 @@ public void testReplicationCheckpointNotNullForSegRep() throws IOException {
closeShards(indexShard);
}

public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException {
final IndexShard indexShard = newStartedShard(false, settings, new InternalEngineFactory());
assertFalse(indexShard.isSegmentReplicationAllowed());
closeShards(indexShard);
}

public void testSegmentReplication_Index_Update_Delete() throws Exception {
String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}";
try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) {
Expand Down

0 comments on commit c2202ee

Please sign in to comment.