Skip to content

Commit

Permalink
Fix SegmentReplication flaky integ tests (#8134)
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale authored Jun 27, 2023
1 parent 45a934d commit 0c7ba94
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.junit.Before;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
Expand Down Expand Up @@ -95,11 +96,16 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationIT extends SegmentReplicationBaseIT {

@Before
private void setup() {
internalCluster().startClusterManagerOnlyNode();
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
Expand All @@ -125,7 +131,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);

// start another node, index another doc and replicate.
String nodeC = internalCluster().startNode();
String nodeC = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get();
refresh(INDEX_NAME);
Expand All @@ -134,10 +140,10 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
}

public void testRestartPrimary() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

assertEquals(getNodeContainingPrimaryShard().getName(), primary);
Expand All @@ -160,10 +166,10 @@ public void testRestartPrimary() throws Exception {

public void testCancelPrimaryAllocation() throws Exception {
// this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica.
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final int initialDocCount = 1;
Expand All @@ -190,8 +196,8 @@ public void testCancelPrimaryAllocation() throws Exception {
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();
final Settings settings = Settings.builder()
.put(indexSettings())
.put(
Expand Down Expand Up @@ -233,8 +239,8 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
}

public void testIndexReopenClose() throws Exception {
final String primary = internalCluster().startNode();
final String replica = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
final String replica = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -274,8 +280,8 @@ public void testMultipleShards() throws Exception {
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, indexSettings);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -310,8 +316,8 @@ public void testMultipleShards() throws Exception {
}

public void testReplicationAfterForceMerge() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -351,14 +357,13 @@ public void testReplicationAfterForceMerge() throws Exception {
* This test verifies that segment replication does not fail for closed indices
*/
public void testClosedIndices() {
internalCluster().startClusterManagerOnlyNode();
List<String> nodes = new ArrayList<>();
// start 1st node so that it contains the primary
nodes.add(internalCluster().startNode());
nodes.add(internalCluster().startDataOnlyNode());
createIndex(INDEX_NAME, super.indexSettings());
ensureYellowAndNoInitializingShards(INDEX_NAME);
// start 2nd node so that it contains the replica
nodes.add(internalCluster().startNode());
nodes.add(internalCluster().startDataOnlyNode());
ensureGreen(INDEX_NAME);

logger.info("--> Close index");
Expand All @@ -373,8 +378,7 @@ public void testClosedIndices() {
* @throws Exception when issue is encountered
*/
public void testNodeDropWithOngoingReplication() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
Expand All @@ -385,7 +389,7 @@ public void testNodeDropWithOngoingReplication() throws Exception {
.build()
);
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
// Get replica allocation id
Expand Down Expand Up @@ -447,11 +451,11 @@ public void testNodeDropWithOngoingReplication() throws Exception {
}

public void testCancellation() throws Exception {
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureYellow(INDEX_NAME);

final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startDataOnlyNode();

final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance(
SegmentReplicationSourceService.class,
Expand Down Expand Up @@ -506,7 +510,7 @@ public void testCancellation() throws Exception {
}

public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(INDEX_NAME);

Expand All @@ -529,7 +533,7 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
Expand All @@ -544,8 +548,8 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
}

public void testDeleteOperations() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();

createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
Expand Down Expand Up @@ -591,9 +595,9 @@ public void testDeleteOperations() throws Exception {
*/
public void testReplicationPostDeleteAndForceMerge() throws Exception {
assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(10, 200);
for (int i = 0; i < initialDocCount; i++) {
Expand Down Expand Up @@ -648,7 +652,6 @@ public void testReplicationPostDeleteAndForceMerge() throws Exception {
}

public void testUpdateOperations() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellow(INDEX_NAME);
Expand Down Expand Up @@ -702,7 +705,6 @@ public void testDropPrimaryDuringReplication() throws Exception {
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replica_count)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, settings);
final List<String> dataNodes = internalCluster().startDataOnlyNodes(6);
Expand Down Expand Up @@ -742,11 +744,10 @@ public void testDropPrimaryDuringReplication() throws Exception {
}

public void testReplicaHasDiffFilesThanPrimary() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
Expand Down Expand Up @@ -796,9 +797,9 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
}

public void testPressureServiceStats() throws Exception {
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

int initialDocCount = scaledRandomIntBetween(100, 200);
Expand Down Expand Up @@ -848,7 +849,7 @@ public void testPressureServiceStats() throws Exception {
assertEquals(0, replicaNode_service.nodeStats().getShardStats().get(primaryShard.shardId()).getReplicaStats().size());

// start another replica.
String replicaNode_2 = internalCluster().startNode();
String replicaNode_2 = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
String docId = String.valueOf(initialDocCount + 1);
client().prepareIndex(INDEX_NAME).setId(docId).setSource("foo", "bar").get();
Expand Down Expand Up @@ -887,10 +888,10 @@ public void testPressureServiceStats() throws Exception {
public void testScrollCreatedOnReplica() throws Exception {
assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

// index 100 docs
Expand Down Expand Up @@ -981,15 +982,15 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
);

// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
prepareCreate(
INDEX_NAME,
Settings.builder()
// we want to control refreshes
.put("index.refresh_interval", -1)
).get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final int initialDocCount = 10;
Expand Down Expand Up @@ -1104,10 +1105,10 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
}

public void testPitCreatedOnReplica() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME)
.setId("1")
Expand Down Expand Up @@ -1234,13 +1235,13 @@ public void testPitCreatedOnReplica() throws Exception {
*/
public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception {
final List<String> nodes = new ArrayList<>();
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startDataOnlyNode();
nodes.add(primaryNode);
final Settings settings = Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build();
createIndex(INDEX_NAME, settings);
ensureGreen(INDEX_NAME);
// start a replica node, initially will be empty with no shard assignment.
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startDataOnlyNode();
nodes.add(replicaNode);

// index a doc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.remotestore;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -26,7 +25,6 @@
* This makes sure that the constructs/flows that are being tested with Segment Replication, holds true after enabling
* remote store.
*/
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/7643")
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationRemoteStoreIT extends SegmentReplicationIT {

Expand All @@ -49,7 +47,7 @@ protected Settings featureFlagSettings() {
}

@Before
public void setup() {
private void setup() {
internalCluster().startClusterManagerOnlyNode();
Path absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
Expand Down
30 changes: 23 additions & 7 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1579,13 +1579,21 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
if (indexSettings.isSegRepEnabled() == false) {
return null;
}

Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> nullSegmentInfosEmptyCheckpoint = new Tuple<>(
new GatedCloseable<>(null, () -> {}),
ReplicationCheckpoint.empty(shardId, getDefaultCodecName())
);

if (getEngineOrNull() == null) {
return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, getDefaultCodecName()));
return nullSegmentInfosEmptyCheckpoint;
}
// do not close the snapshot - caller will close it.
final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot();
return Optional.ofNullable(snapshot.get()).map(segmentInfos -> {
try {
GatedCloseable<SegmentInfos> snapshot = null;
try {
snapshot = getSegmentInfosSnapshot();
if (snapshot.get() != null) {
SegmentInfos segmentInfos = snapshot.get();
return new Tuple<>(
snapshot,
new ReplicationCheckpoint(
Expand All @@ -1601,10 +1609,18 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
getEngine().config().getCodec().getName()
)
);
} catch (IOException e) {
throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e);
}
}).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, getDefaultCodecName())));
} catch (IOException | AlreadyClosedException e) {
logger.error("Error Fetching SegmentInfos and latest checkpoint", e);
if (snapshot != null) {
try {
snapshot.close();
} catch (IOException ex) {
throw new OpenSearchException("Error Closing SegmentInfos Snapshot", e);
}
}
}
return nullSegmentInfosEmptyCheckpoint;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception {
/**
* Tests retry flow after snapshot and metadata files have been uploaded to remote store in the failed attempt.
* Snapshot and metadata files created in failed attempt should not break retry.
* @throws Exception
*/
public void testRefreshSuccessAfterFailureInFirstAttemptAfterSnapshotAndMetadataUpload() throws Exception {
int succeedOnAttempt = 1;
Expand Down

0 comments on commit 0c7ba94

Please sign in to comment.