Skip to content

Commit

Permalink
Add changelog entry, address review comments, add failover test
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 committed Sep 16, 2022
1 parent 6fda602 commit 988d007
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 25 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- 2.3.0 release notes ([#4457](https://github.com/opensearch-project/OpenSearch/pull/4457))
- Added missing javadocs for `:distribution:tools` modules ([#4483](https://github.com/opensearch-project/OpenSearch/pull/4483))
- Add BWC version 2.3.1 ([#4513](https://github.com/opensearch-project/OpenSearch/pull/4513))
- [Segment Replication] Add snapshot and restore tests for segment replication feature ([#3993](https://github.com/opensearch-project/OpenSearch/pull/3993))

### Dependencies
- Bumps `reactive-streams` from 1.0.3 to 1.0.4
Expand Down Expand Up @@ -91,4 +92,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)


[Unreleased]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...HEAD
[2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...2.x
[2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand All @@ -44,7 +48,7 @@ public class SegmentReplicationSnapshotIT extends AbstractSnapshotIntegTestCase

@BeforeClass
public static void assumeFeatureFlag() {
assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
// assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
}

public Settings segRepEnableIndexSettings() {
Expand All @@ -62,6 +66,14 @@ public Settings.Builder getShardSettings() {
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT);
}

public Settings restoreIndexSegRepSettings() {
return Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
}

public Settings restoreIndexDocRepSettings() {
return Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build();
}

@Override
protected boolean addMockInternalEngine() {
return false;
Expand All @@ -70,7 +82,7 @@ protected boolean addMockInternalEngine() {
public void ingestData(int docCount, String indexName) throws Exception {
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
indexName,
"_doc",
client(),
-1,
Expand All @@ -85,14 +97,20 @@ public void ingestData(int docCount, String indexName) throws Exception {
}
}

public void startClusterWithSettings(Settings indexSettings) throws Exception {
// Create 2 node cluster
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
// Start cluster with provided settings and return the node names as list
public List<String> startClusterWithSettings(Settings indexSettings, int replicaCount) throws Exception {
// Start primary
final String primaryNode = internalCluster().startNode();
List<String> nodeNames = new ArrayList<>();
nodeNames.add(primaryNode);
for(int i=0; i<replicaCount; i++) {
nodeNames.add(internalCluster().startNode());
}
createIndex(INDEX_NAME, indexSettings);
ensureGreen(INDEX_NAME);
// Ingest data
ingestData(DOC_COUNT, INDEX_NAME);
return nodeNames;
}

public void createSnapshot() {
Expand Down Expand Up @@ -129,7 +147,8 @@ public RestoreSnapshotResponse restoreSnapshotWithSettings(Settings indexSetting
}

public void testRestoreOnSegRep() throws Exception {
startClusterWithSettings(segRepEnableIndexSettings());
// Start cluster with one primary and one replica node
startClusterWithSettings(segRepEnableIndexSettings(), 1);
createSnapshot();
// Delete index
client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME));
Expand All @@ -152,12 +171,11 @@ public void testRestoreOnSegRep() throws Exception {
assertHitCount(resp, DOC_COUNT);
}

public void testRestoreOnSegRepDuringIngestion() throws Exception {
startClusterWithSettings(segRepEnableIndexSettings());
public void testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion() throws Exception {
startClusterWithSettings(segRepEnableIndexSettings(), 1);
createSnapshot();
// Delete index
client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME));
Thread.sleep(5000);
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME));

logger.info("Restore from snapshot");
Expand All @@ -174,18 +192,17 @@ public void testRestoreOnSegRepDuringIngestion() throws Exception {
.get();
assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT");
SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
assertHitCount(resp, DOC_COUNT);
assertHitCount(resp, DOC_COUNT + 5000);
}

public void testSnapshotOnDocRep_RestoreOnSegRep() throws Exception {
startClusterWithSettings(docRepEnableIndexSettings());
startClusterWithSettings(docRepEnableIndexSettings(), 1);
createSnapshot();
// Delete index
client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME));
Thread.sleep(5000);
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());

logger.info("Restore from snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(segRepEnableIndexSettings());
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexSegRepSettings());

// Assertions
assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
Expand All @@ -202,14 +219,14 @@ public void testSnapshotOnDocRep_RestoreOnSegRep() throws Exception {
}

public void testSnapshotOnSegRep_RestoreOnDocRep() throws Exception {
startClusterWithSettings(segRepEnableIndexSettings());
// Start a cluster with one primary and one replica
startClusterWithSettings(segRepEnableIndexSettings(), 1);
createSnapshot();
// Delete index
client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME));
Thread.sleep(5000);
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());

logger.info("Restore from snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(docRepEnableIndexSettings());
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexDocRepSettings());

// Assertions
assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
Expand All @@ -225,14 +242,13 @@ public void testSnapshotOnSegRep_RestoreOnDocRep() throws Exception {
}

public void testSnapshotOnDocRep_RestoreOnDocRep() throws Exception {
startClusterWithSettings(docRepEnableIndexSettings());
startClusterWithSettings(docRepEnableIndexSettings(), 1);
createSnapshot();
// Delete index
client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME));
Thread.sleep(5000);
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());

logger.info("Restore from snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(docRepEnableIndexSettings());
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexDocRepSettings());

// Assertions
assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
Expand All @@ -247,4 +263,31 @@ public void testSnapshotOnDocRep_RestoreOnDocRep() throws Exception {
SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
assertHitCount(resp, DOC_COUNT);
}

public void testRestoreOnReplicaNode() throws Exception {
List<String> nodeNames = startClusterWithSettings(segRepEnableIndexSettings(), 1);
final String primaryNode = nodeNames.get(0);
createSnapshot();
// Delete index
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME));

// stop the primary node so that restoration happens on replica node
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));

RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null);

// Assertions
assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
logger.info("Ensure cluster is green");
internalCluster().startNode();
ensureGreen(RESTORED_INDEX_NAME);
GetSettingsResponse settingsResponse = client().admin()
.indices()
.getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME))
.get();
assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT");
SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
assertHitCount(resp, DOC_COUNT);
}
}

0 comments on commit 988d007

Please sign in to comment.