Reduce amount of docs ingested for SegRep ITs
This change reduces the number of docs we ingest inside SegmentReplicationIT. These tests
often ingested anywhere from 1 to 200 docs where that was not required; many need only a
few docs so that segments are created.
This speeds up the tests when they are run against remote store through SegmentReplicationUsingRemoteStore.
Also adds a @TestLogging annotation on the replication package for known flaky tests.

Signed-off-by: Marc Handalian <handalm@amazon.com>
mch2 committed Oct 12, 2023
1 parent 6c02261 commit 7ded839
Showing 1 changed file with 40 additions and 49 deletions.
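
Before the diff, a minimal sketch of the pattern this commit applies, assuming the helpers the test class inherits from OpenSearchIntegTestCase (createIndex, refresh, scaledRandomIntBetween, waitForSearchableDocs) and its INDEX_NAME constant; the method name here is hypothetical:

@TestLogging(reason = "Getting trace logs from replication package", value = "org.opensearch.indices.replication:TRACE")
public void testFewDocsAreEnough() throws Exception {
    final String primary = internalCluster().startDataOnlyNode();
    createIndex(INDEX_NAME);
    final String replica = internalCluster().startDataOnlyNode();
    ensureGreen(INDEX_NAME);

    // A small bounded count replaces ranges like scaledRandomIntBetween(1, 200);
    // a few docs are enough for the primary to build segments to replicate.
    final int docCount = scaledRandomIntBetween(1, 10);
    for (int i = 0; i < docCount; i++) {
        client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
    }
    refresh(INDEX_NAME); // publishes new segments, triggering a replication round
    waitForSearchableDocs(docCount, primary, replica);
}

A refresh after a handful of writes is all that is needed to produce segments for a replication round, so the old 100-200 doc ranges added runtime without adding coverage; the savings compound when SegmentReplicationUsingRemoteStore reruns the suite against remote store.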
@@ -84,6 +84,7 @@
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
+import org.opensearch.test.junit.annotations.TestLogging;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportService;
@@ -240,7 +241,7 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
createIndex(INDEX_NAME, settings);
ensureGreen(INDEX_NAME);

-final int initialDocCount = scaledRandomIntBetween(0, 200);
+final int initialDocCount = scaledRandomIntBetween(0, 10);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
@@ -257,7 +258,7 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
refresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount, nodeA, nodeB);

-final int additionalDocCount = scaledRandomIntBetween(0, 200);
+final int additionalDocCount = scaledRandomIntBetween(0, 10);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);
Expand All @@ -276,7 +277,7 @@ public void testIndexReopenClose() throws Exception {
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

-final int initialDocCount = scaledRandomIntBetween(100, 200);
+final int initialDocCount = scaledRandomIntBetween(1, 10);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
@@ -311,7 +312,7 @@ public void testScrollWithConcurrentIndexAndSearch() throws Exception {
ensureGreen(INDEX_NAME);
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
final List<ActionFuture<SearchResponse>> pendingSearchResponse = new ArrayList<>();
-final int searchCount = randomIntBetween(10, 20);
+final int searchCount = randomIntBetween(1, 2);
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());

for (int i = 0; i < searchCount; i++) {
@@ -356,6 +357,7 @@ public void testScrollWithConcurrentIndexAndSearch() throws Exception {
waitForSearchableDocs(INDEX_NAME, 2 * searchCount, List.of(primary, replica));
}

+@TestLogging(reason = "Getting trace logs from replication package", value = "org.opensearch.indices.replication:TRACE")
public void testMultipleShards() throws Exception {
Settings indexSettings = Settings.builder()
.put(super.indexSettings())
@@ -369,7 +371,7 @@ public void testMultipleShards() throws Exception {
createIndex(INDEX_NAME, indexSettings);
ensureGreen(INDEX_NAME);

-final int initialDocCount = scaledRandomIntBetween(1, 200);
+final int initialDocCount = scaledRandomIntBetween(1, 10);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
@@ -386,7 +388,7 @@ public void testMultipleShards() throws Exception {
refresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount, nodeA, nodeB);

-final int additionalDocCount = scaledRandomIntBetween(0, 200);
+final int additionalDocCount = scaledRandomIntBetween(0, 10);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);
@@ -405,8 +407,8 @@ public void testReplicationAfterForceMerge() throws Exception {
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

-final int initialDocCount = scaledRandomIntBetween(0, 200);
-final int additionalDocCount = scaledRandomIntBetween(0, 200);
+final int initialDocCount = scaledRandomIntBetween(0, 10);
+final int additionalDocCount = scaledRandomIntBetween(0, 10);
final int expectedHitCount = initialDocCount + additionalDocCount;
try (
BackgroundIndexer indexer = new BackgroundIndexer(
@@ -511,7 +513,7 @@ public void testNodeDropWithOngoingReplication() throws Exception {
connection.sendRequest(requestId, action, request, options);
}
);
-final int docCount = scaledRandomIntBetween(10, 200);
+final int docCount = scaledRandomIntBetween(1, 10);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
@@ -572,7 +574,7 @@ public void testCancellation() throws Exception {
}
);

-final int docCount = scaledRandomIntBetween(0, 200);
+final int docCount = scaledRandomIntBetween(0, 10);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
@@ -632,13 +634,14 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
verifyStoreContent();
}

+@TestLogging(reason = "Getting trace logs from replication package", value = "org.opensearch.indices.replication:TRACE")
public void testDeleteOperations() throws Exception {
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();

createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
-final int initialDocCount = scaledRandomIntBetween(1, 20);
+final int initialDocCount = scaledRandomIntBetween(1, 5);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
@@ -655,7 +658,7 @@ public void testDeleteOperations() throws Exception {
refresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount, nodeA, nodeB);

-final int additionalDocCount = scaledRandomIntBetween(0, 20);
+final int additionalDocCount = scaledRandomIntBetween(0, 2);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);
@@ -684,14 +687,14 @@ public void testReplicationPostDeleteAndForceMerge() throws Exception {
createIndex(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
-final int initialDocCount = scaledRandomIntBetween(10, 200);
+final int initialDocCount = scaledRandomIntBetween(1, 10);
for (int i = 0; i < initialDocCount; i++) {
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
}
refresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount, primary, replica);

-final int deletedDocCount = randomIntBetween(10, initialDocCount);
+final int deletedDocCount = randomIntBetween(1, initialDocCount);
for (int i = 0; i < deletedDocCount; i++) {
client(primary).prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
}
@@ -712,7 +715,7 @@ public void testReplicationPostDeleteAndForceMerge() throws Exception {
);

// add some docs to the xlog and drop primary.
-final int additionalDocs = randomIntBetween(1, 50);
+final int additionalDocs = randomIntBetween(1, 5);
for (int i = initialDocCount; i < initialDocCount + additionalDocs; i++) {
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
}
@@ -743,7 +746,7 @@ public void testUpdateOperations() throws Exception {
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

-final int initialDocCount = scaledRandomIntBetween(0, 200);
+final int initialDocCount = scaledRandomIntBetween(1, 5);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
@@ -760,7 +763,7 @@ public void testUpdateOperations() throws Exception {
refresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount, asList(primary, replica));

-final int additionalDocCount = scaledRandomIntBetween(0, 200);
+final int additionalDocCount = scaledRandomIntBetween(0, 5);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);
@@ -795,7 +798,7 @@ public void testDropPrimaryDuringReplication() throws Exception {
final List<String> dataNodes = internalCluster().startDataOnlyNodes(6);
ensureGreen(INDEX_NAME);

-int initialDocCount = scaledRandomIntBetween(100, 200);
+int initialDocCount = scaledRandomIntBetween(5, 10);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
@@ -829,6 +832,7 @@ public void testDropPrimaryDuringReplication() throws Exception {
}
}

+@TestLogging(reason = "Getting trace logs from replication package", value = "org.opensearch.indices.replication:TRACE")
public void testReplicaHasDiffFilesThanPrimary() throws Exception {
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
@@ -840,7 +844,7 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
IndexWriterConfig iwc = newIndexWriterConfig().setOpenMode(IndexWriterConfig.OpenMode.APPEND);

// create a doc to index
-int numDocs = 2 + random().nextInt(100);
+int numDocs = 2 + random().nextInt(10);

List<Document> docs = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
@@ -869,7 +873,7 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
replicaShard.finalizeReplication(segmentInfos);
ensureYellow(INDEX_NAME);

-final int docCount = scaledRandomIntBetween(10, 200);
+final int docCount = scaledRandomIntBetween(10, 20);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
// Refresh, this should trigger round of segment replication
@@ -889,7 +893,7 @@ public void testPressureServiceStats() throws Exception {
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

-int initialDocCount = scaledRandomIntBetween(100, 200);
+int initialDocCount = scaledRandomIntBetween(10, 20);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
@@ -995,8 +999,8 @@ public void testScrollCreatedOnReplica() throws Exception {
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

-// index 100 docs
-for (int i = 0; i < 100; i++) {
+// index 10 docs
+for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
@@ -1032,7 +1036,7 @@ public void testScrollCreatedOnReplica() throws Exception {
// force call flush
flush(INDEX_NAME);

-for (int i = 3; i < 50; i++) {
+for (int i = 3; i < 5; i++) {
client().prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
refresh(INDEX_NAME);
if (randomBoolean()) {
@@ -1070,7 +1074,7 @@ public void testScrollCreatedOnReplica() throws Exception {

currentFiles = List.of(replicaShard.store().directory().listAll());
assertFalse("Files should be cleaned up post scroll clear request", currentFiles.containsAll(snapshottedSegments));
-assertEquals(100, scrollHits);
+assertEquals(10, scrollHits);
}

/**
Expand Down Expand Up @@ -1215,19 +1219,8 @@ public void testPitCreatedOnReplica() throws Exception {
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
-client().prepareIndex(INDEX_NAME)
-.setId("1")
-.setSource("foo", randomInt())
-.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
-.get();
-refresh(INDEX_NAME);
-
-client().prepareIndex(INDEX_NAME)
-.setId("2")
-.setSource("foo", randomInt())
-.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
-.get();
-for (int i = 3; i < 100; i++) {
+for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource("foo", randomInt())
@@ -1277,7 +1270,7 @@ public void testPitCreatedOnReplica() throws Exception {
}

flush(INDEX_NAME);
-for (int i = 101; i < 200; i++) {
+for (int i = 11; i < 20; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource("foo", randomInt())
@@ -1832,7 +1825,7 @@ public void testSendCorruptBytesToReplica() throws Exception {
}
}
);
-for (int i = 0; i < 100; i++) {
+for (int i = 0; i < 5; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
@@ -1846,7 +1839,7 @@ public void testSendCorruptBytesToReplica() throws Exception {
waitForNewPeerRecovery(replicaNode, originalRecoveryTime);
// reset checkIndex to ensure our original shard doesn't throw
resetCheckIndexStatus();
-waitForSearchableDocs(100, primaryNode, replicaNode);
+waitForSearchableDocs(5, primaryNode, replicaNode);
}

public void testWipeSegmentBetweenSyncs() throws Exception {
Expand All @@ -1865,21 +1858,19 @@ public void testWipeSegmentBetweenSyncs() throws Exception {
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

-for (int i = 0; i < 10; i++) {
-client().prepareIndex(INDEX_NAME)
-.setId(String.valueOf(i))
-.setSource(jsonBuilder().startObject().field("field", i).endObject())
-.get();
-}
+client().prepareIndex(INDEX_NAME)
+.setId(String.valueOf(0))
+.setSource(jsonBuilder().startObject().field("field", 0).endObject())
+.get();
refresh(INDEX_NAME);
ensureGreen(INDEX_NAME);
final long originalRecoveryTime = getRecoveryStopTime(replicaNode);

final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
-waitForSearchableDocs(INDEX_NAME, 10, List.of(replicaNode));
+waitForSearchableDocs(INDEX_NAME, 1, List.of(replicaNode));
indexShard.store().directory().deleteFile("_0.si");

-for (int i = 11; i < 21; i++) {
+for (int i = 1; i < 5; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
@@ -1888,7 +1879,7 @@ public void testWipeSegmentBetweenSyncs() throws Exception {
refresh(INDEX_NAME);
waitForNewPeerRecovery(replicaNode, originalRecoveryTime);
resetCheckIndexStatus();
-waitForSearchableDocs(20, primaryNode, replicaNode);
+waitForSearchableDocs(5, primaryNode, replicaNode);
}

private void waitForNewPeerRecovery(String replicaNode, long originalRecoveryTime) throws Exception {