Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky Segment Replication test testStartReplicaAfterPrimaryIndexesDocs. #5722

Merged
merged 2 commits into from
Jan 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,6 @@ public void testCancellation() throws Exception {
assertDocCounts(docCount, primaryNode);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
Expand Down
50 changes: 41 additions & 9 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.common.Booleans;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.CheckedFunction;
Expand Down Expand Up @@ -1454,22 +1455,53 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
}

/**
* Checks if checkpoint should be processed
*
* @param requestCheckpoint received checkpoint that is checked for processing
* @return true if checkpoint should be processed
* Checks if this target shard should start a round of segment replication.
* @return - True if the shard is able to perform segment replication.
*/
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
if (state().equals(IndexShardState.STARTED) == false) {
logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state()));
public boolean isSegmentReplicationAllowed() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the name isSegmentReplicationAllowed does not sound like it is meant for the target/replica. How about isSegRepSyncAllowed or isSegRepAllowedOnReplica?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is not invoked only for replicas; even after the primary is recovered, it will be invoked from IndicesClusterStateService#forceSegmentReplication.

if (indexSettings.isSegRepEnabled() == false) {
logger.warn("Attempting to perform segment replication when it is not enabled on the index");
return false;
}
if (getReplicationTracker().isPrimaryMode()) {
logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints.");
logger.warn("Shard is in primary mode and cannot perform segment replication as a replica.");
return false;
}
if (this.routingEntry().primary()) {
logger.warn("Ignoring new replication checkpoint - primary shard cannot receive any checkpoints.");
logger.warn("Shard is marked as primary and cannot perform segment replication as a replica");
return false;
}
if (state().equals(IndexShardState.STARTED) == false
&& (state() == IndexShardState.POST_RECOVERY && shardRouting.state() == ShardRoutingState.INITIALIZING) == false) {
logger.warn(
() -> new ParameterizedMessage(
"Shard is not started or recovering {} {} and cannot perform segment replication as a replica",
state(),
shardRouting.state()
)
);
return false;
}
if (getReplicationEngine().isEmpty()) {
logger.warn(
() -> new ParameterizedMessage(
"Shard does not have the correct engine type to perform segment replication {}.",
getEngine().getClass()
)
);
return false;
}
return true;
}

/**
* Checks if checkpoint should be processed
*
* @param requestCheckpoint received checkpoint that is checked for processing
* @return true if checkpoint should be processed
*/
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
if (isSegmentReplicationAllowed() == false) {
return false;
}
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RoutingNode;
Expand Down Expand Up @@ -811,11 +810,7 @@ private void forceSegmentReplication(
StepListener<Void> forceSegRepListener
) {
IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id());
if (indexShard != null
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
&& indexShard.indexSettings().isSegRepEnabled()
&& shardRouting.primary() == false
&& shardRouting.state() == ShardRoutingState.INITIALIZING
&& indexShard.state() == IndexShardState.POST_RECOVERY) {
if (indexShard != null && indexShard.isSegmentReplicationAllowed()) {
segmentReplicationTargetService.startReplication(
ReplicationCheckpoint.empty(shardRouting.shardId()),
indexShard,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.DocIdSeqNoAndSource;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.engine.NRTReplicationEngine;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.mapper.MapperService;
Expand Down Expand Up @@ -97,6 +98,12 @@ public void testReplicationCheckpointNotNullForSegRep() throws IOException {
closeShards(indexShard);
}

/**
 * Verifies that {@code isSegmentReplicationAllowed()} returns false for a shard
 * created with an {@link InternalEngineFactory}.
 *
 * @throws IOException if creating or closing the shard fails
 */
public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException {
    final IndexShard indexShard = newShard(false, settings, new InternalEngineFactory());
    try {
        assertFalse(indexShard.isSegmentReplicationAllowed());
    } finally {
        // Close the shard even when the assertion throws, so it is not left open on failure.
        closeShards(indexShard);
    }
}

public void testSegmentReplication_Index_Update_Delete() throws Exception {
String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}";
try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) {
Expand Down