diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java new file mode 100644 index 0000000000000..5f7433126db57 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.Nullable; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.Index; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static java.util.Arrays.asList; +import static org.opensearch.test.OpenSearchIntegTestCase.client; +import static org.opensearch.test.OpenSearchTestCase.assertBusy; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase { + + protected static final String INDEX_NAME = "test-idx-1"; + protected static final int SHARD_COUNT = 1; + protected static final int REPLICA_COUNT = 1; + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build(); + } + + @Override + protected Collection> nodePlugins() { + return asList(MockTransportService.TestPlugin.class); + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + } + + @Nullable + protected ShardRouting getShardRoutingForNodeName(String nodeName) { + final ClusterState state = getClusterState(); + for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(INDEX_NAME)) { + for (ShardRouting shardRouting : shardRoutingTable.activeShards()) { + final String nodeId = shardRouting.currentNodeId(); + final DiscoveryNode discoveryNode = state.nodes().resolveNode(nodeId); + if (discoveryNode.getName().equals(nodeName)) { + return shardRouting; + } + } + } + return null; + } + + protected void assertDocCounts(int expectedDocCount, String... nodeNames) { + for (String node : nodeNames) { + assertHitCount(client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedDocCount); + } + } + + protected ClusterState getClusterState() { + return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); + } + + protected DiscoveryNode getNodeContainingPrimaryShard() { + final ClusterState state = getClusterState(); + final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard(); + return state.nodes().resolveNode(primaryShard.currentNodeId()); + } + + /** + * Waits until all given nodes have at least the expected docCount. + * + * @param docCount - Expected Doc count. + * @param nodes - List of node names. + */ + protected void waitForSearchableDocs(long docCount, List nodes) throws Exception { + // wait until the replica has the latest segment generation. + assertBusy(() -> { + for (String node : nodes) { + final SearchResponse response = client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(); + final long hits = response.getHits().getTotalHits().value; + if (hits < docCount) { + fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits); + } + } + }, 1, TimeUnit.MINUTES); + } + + protected void waitForSearchableDocs(long docCount, String... nodes) throws Exception { + waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList())); + } + + protected void verifyStoreContent() throws Exception { + assertBusy(() -> { + final ClusterState clusterState = getClusterState(); + for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { + for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) { + final ShardRouting primaryRouting = shardRoutingTable.primaryShard(); + final String indexName = primaryRouting.getIndexName(); + final List replicaRouting = shardRoutingTable.replicaShards(); + final IndexShard primaryShard = getIndexShard(clusterState, primaryRouting, indexName); + final Map primarySegmentMetadata = primaryShard.getSegmentMetadataMap(); + for (ShardRouting replica : replicaRouting) { + IndexShard replicaShard = getIndexShard(clusterState, replica, indexName); + final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff( + primarySegmentMetadata, + replicaShard.getSegmentMetadataMap() + ); + if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) { + fail( + "Expected no missing or different segments between primary and replica but diff was missing: " + + recoveryDiff.missing + + " Different: " + + recoveryDiff.different + + " Primary Replication Checkpoint : " + + primaryShard.getLatestReplicationCheckpoint() + + " Replica Replication Checkpoint: " + + replicaShard.getLatestReplicationCheckpoint() + ); + } + // calls to readCommit will fail if a valid commit point and all its segments are not in the store. + replicaShard.store().readLastCommittedSegmentsInfo(); + } + } + } + }, 1, TimeUnit.MINUTES); + } + + private IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) { + return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), indexName); + } + + protected IndexShard getIndexShard(String node, String indexName) { + final Index index = resolveIndex(indexName); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + IndexService indexService = indicesService.indexServiceSafe(index); + final Optional shardId = indexService.shardIds().stream().findFirst(); + return indexService.getShard(shardId.get()); + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index dc634fb10e387..0101379321932 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -11,51 +11,31 @@ import com.carrotsearch.randomizedtesting.RandomizedTest; import org.opensearch.OpenSearchCorruptionException; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; -import org.opensearch.action.admin.indices.segments.IndexShardSegments; -import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; -import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; -import org.opensearch.action.admin.indices.segments.ShardSegments; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; -import org.opensearch.common.Nullable; import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.FeatureFlags; -import org.opensearch.index.Index; import org.opensearch.index.IndexModule; -import org.opensearch.index.IndexService; -import org.opensearch.index.engine.Segment; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.plugins.Plugin; import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; -import java.io.IOException; -import java.util.Collection; -import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.Optional; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.stream.Collectors; +import static java.util.Arrays.asList; import static org.hamcrest.Matchers.equalTo; import static org.opensearch.index.query.QueryBuilders.matchQuery; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -63,70 +43,19 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class SegmentReplicationIT extends OpenSearchIntegTestCase { - - protected static final String INDEX_NAME = "test-idx-1"; - protected static final int SHARD_COUNT = 1; - protected static final int REPLICA_COUNT = 1; - - @Override - protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class); - } - - @Override - public Settings indexSettings() { - return Settings.builder() - .put(super.indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) - .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .build(); - } - - @Override - protected boolean addMockInternalEngine() { - return false; - } - - @Override - protected Settings featureFlagSettings() { - return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build(); - } - - public void ingestDocs(int docCount) throws Exception { - try ( - BackgroundIndexer indexer = new BackgroundIndexer( - INDEX_NAME, - "_doc", - client(), - -1, - RandomizedTest.scaledRandomIntBetween(2, 5), - false, - random() - ) - ) { - indexer.start(docCount); - waitForDocs(docCount, indexer); - refresh(INDEX_NAME); - waitForReplicaUpdate(); - } - } - +public class SegmentReplicationIT extends SegmentReplicationBaseIT { + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testPrimaryStopped_ReplicaPromoted() throws Exception { - final String primary = internalCluster().startNode(featureFlagSettings()); + final String primary = internalCluster().startNode(); createIndex(INDEX_NAME); ensureYellowAndNoInitializingShards(INDEX_NAME); - final String replica = internalCluster().startNode(featureFlagSettings()); + final String replica = internalCluster().startNode(); ensureGreen(INDEX_NAME); client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); - assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + waitForSearchableDocs(1, primary, replica); // index another doc but don't refresh, we will ensure this is searchable once replica is promoted. client().prepareIndex(INDEX_NAME).setId("2").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); @@ -138,6 +67,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica); assertNotNull(replicaShardRouting); assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary()); + refresh(INDEX_NAME); assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); // assert we can index into the new primary. @@ -145,21 +75,19 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); // start another node, index another doc and replicate. - String nodeC = internalCluster().startNode(featureFlagSettings()); + String nodeC = internalCluster().startNode(); ensureGreen(INDEX_NAME); client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); - assertHitCount(client(nodeC).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 4); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 4); - assertSegmentStats(REPLICA_COUNT); + waitForSearchableDocs(4, nodeC, replica); + verifyStoreContent(); } public void testRestartPrimary() throws Exception { - final String primary = internalCluster().startNode(featureFlagSettings()); + final String primary = internalCluster().startNode(); createIndex(INDEX_NAME); ensureYellowAndNoInitializingShards(INDEX_NAME); - final String replica = internalCluster().startNode(featureFlagSettings()); + final String replica = internalCluster().startNode(); ensureGreen(INDEX_NAME); assertEquals(getNodeContainingPrimaryShard().getName(), primary); @@ -168,8 +96,7 @@ public void testRestartPrimary() throws Exception { client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); - assertDocCounts(initialDocCount, replica, primary); + waitForSearchableDocs(initialDocCount, replica, primary); internalCluster().restartNode(primary); ensureGreen(INDEX_NAME); @@ -177,18 +104,16 @@ public void testRestartPrimary() throws Exception { assertEquals(getNodeContainingPrimaryShard().getName(), replica); flushAndRefresh(INDEX_NAME); - waitForReplicaUpdate(); - - assertDocCounts(initialDocCount, replica, primary); - assertSegmentStats(REPLICA_COUNT); + waitForSearchableDocs(initialDocCount, replica, primary); + verifyStoreContent(); } public void testCancelPrimaryAllocation() throws Exception { // this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica. - final String primary = internalCluster().startNode(featureFlagSettings()); + final String primary = internalCluster().startNode(); createIndex(INDEX_NAME); ensureYellowAndNoInitializingShards(INDEX_NAME); - final String replica = internalCluster().startNode(featureFlagSettings()); + final String replica = internalCluster().startNode(); ensureGreen(INDEX_NAME); final int initialDocCount = 1; @@ -196,10 +121,9 @@ public void testCancelPrimaryAllocation() throws Exception { client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); - assertDocCounts(initialDocCount, replica, primary); + waitForSearchableDocs(initialDocCount, replica, primary); - final IndexShard indexShard = getIndexShard(primary); + final IndexShard indexShard = getIndexShard(primary, INDEX_NAME); client().admin() .cluster() .prepareReroute() @@ -211,26 +135,25 @@ public void testCancelPrimaryAllocation() throws Exception { assertEquals(getNodeContainingPrimaryShard().getName(), replica); flushAndRefresh(INDEX_NAME); - waitForReplicaUpdate(); - - assertDocCounts(initialDocCount, replica, primary); - assertSegmentStats(REPLICA_COUNT); + waitForSearchableDocs(initialDocCount, replica, primary); + verifyStoreContent(); } /** * This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery. - * + *

* TODO: Ignoring this test as its flaky and needs separate fix */ @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testAddNewReplicaFailure() throws Exception { logger.info("--> starting [Primary Node] ..."); - final String primaryNode = internalCluster().startNode(featureFlagSettings()); + final String primaryNode = internalCluster().startNode(); logger.info("--> creating test index ..."); prepareCreate( INDEX_NAME, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ).get(); logger.info("--> index 10 docs"); @@ -248,7 +171,7 @@ public void testAddNewReplicaFailure() throws Exception { assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L)); logger.info("--> start empty node to add replica shard"); - final String replicaNode = internalCluster().startNode(featureFlagSettings()); + final String replicaNode = internalCluster().startNode(); // Mock transport service to add behaviour of throwing corruption exception during segment replication process. MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( @@ -289,10 +212,9 @@ public void testAddNewReplicaFailure() throws Exception { assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME))); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { - final String nodeA = internalCluster().startNode(featureFlagSettings()); - final String nodeB = internalCluster().startNode(featureFlagSettings()); + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); @@ -311,10 +233,7 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); refresh(INDEX_NAME); - waitForReplicaUpdate(); - - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + waitForSearchableDocs(initialDocCount, nodeA, nodeB); final int additionalDocCount = scaledRandomIntBetween(0, 200); final int expectedHitCount = initialDocCount + additionalDocCount; @@ -322,18 +241,16 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { waitForDocs(expectedHitCount, indexer); flushAndRefresh(INDEX_NAME); - waitForReplicaUpdate(); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + waitForSearchableDocs(expectedHitCount, nodeA, nodeB); ensureGreen(INDEX_NAME); - assertSegmentStats(REPLICA_COUNT); + verifyStoreContent(); } } public void testIndexReopenClose() throws Exception { - final String primary = internalCluster().startNode(featureFlagSettings()); - final String replica = internalCluster().startNode(featureFlagSettings()); + final String primary = internalCluster().startNode(); + final String replica = internalCluster().startNode(); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); @@ -352,12 +269,8 @@ public void testIndexReopenClose() throws Exception { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); flush(INDEX_NAME); - waitForReplicaUpdate(); + waitForSearchableDocs(initialDocCount, primary, replica); } - - assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - logger.info("--> Closing the index "); client().admin().indices().prepareClose(INDEX_NAME).get(); @@ -365,8 +278,8 @@ public void testIndexReopenClose() throws Exception { client().admin().indices().prepareOpen(INDEX_NAME).get(); ensureGreen(INDEX_NAME); - assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + waitForSearchableDocs(initialDocCount, primary, replica); + verifyStoreContent(); } public void testMultipleShards() throws Exception { @@ -377,8 +290,8 @@ public void testMultipleShards() throws Exception { .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build(); - final String nodeA = internalCluster().startNode(featureFlagSettings()); - final String nodeB = internalCluster().startNode(featureFlagSettings()); + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); createIndex(INDEX_NAME, indexSettings); ensureGreen(INDEX_NAME); @@ -397,10 +310,7 @@ public void testMultipleShards() throws Exception { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); refresh(INDEX_NAME); - waitForReplicaUpdate(); - - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + waitForSearchableDocs(initialDocCount, nodeA, nodeB); final int additionalDocCount = scaledRandomIntBetween(0, 200); final int expectedHitCount = initialDocCount + additionalDocCount; @@ -408,18 +318,16 @@ public void testMultipleShards() throws Exception { waitForDocs(expectedHitCount, indexer); flushAndRefresh(INDEX_NAME); - waitForReplicaUpdate(); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + waitForSearchableDocs(expectedHitCount, nodeA, nodeB); ensureGreen(INDEX_NAME); - assertSegmentStats(REPLICA_COUNT); + verifyStoreContent(); } } public void testReplicationAfterForceMerge() throws Exception { - final String nodeA = internalCluster().startNode(featureFlagSettings()); - final String nodeB = internalCluster().startNode(featureFlagSettings()); + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); @@ -441,39 +349,32 @@ public void testReplicationAfterForceMerge() throws Exception { waitForDocs(initialDocCount, indexer); flush(INDEX_NAME); - waitForReplicaUpdate(); - // wait a short amount of time to give replication a chance to complete. - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + waitForSearchableDocs(initialDocCount, nodeA, nodeB); // Index a second set of docs so we can merge into one segment. indexer.start(additionalDocCount); waitForDocs(expectedHitCount, indexer); + waitForSearchableDocs(expectedHitCount, nodeA, nodeB); // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk. client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); - - ensureGreen(INDEX_NAME); - assertSegmentStats(REPLICA_COUNT); + verifyStoreContent(); } } public void testCancellation() throws Exception { - final String primaryNode = internalCluster().startNode(featureFlagSettings()); + final String primaryNode = internalCluster().startNode(); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()); ensureYellow(INDEX_NAME); - final String replicaNode = internalCluster().startNode(featureFlagSettings()); + final String replicaNode = internalCluster().startNode(); final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance( SegmentReplicationSourceService.class, primaryNode ); - final IndexShard primaryShard = getIndexShard(primaryNode); + final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME); CountDownLatch latch = new CountDownLatch(1); @@ -521,9 +422,8 @@ public void testCancellation() throws Exception { assertDocCounts(docCount, primaryNode); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { - final String primaryNode = internalCluster().startNode(featureFlagSettings()); + final String primaryNode = internalCluster().startNode(); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); ensureGreen(INDEX_NAME); @@ -546,7 +446,7 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { .prepareUpdateSettings(INDEX_NAME) .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) ); - final String replicaNode = internalCluster().startNode(featureFlagSettings()); + final String replicaNode = internalCluster().startNode(); ensureGreen(INDEX_NAME); assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); @@ -554,15 +454,15 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); + waitForSearchableDocs(3, primaryNode, replicaNode); assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); - assertSegmentStats(REPLICA_COUNT); + verifyStoreContent(); } public void testDeleteOperations() throws Exception { - final String nodeA = internalCluster().startNode(featureFlagSettings()); - final String nodeB = internalCluster().startNode(featureFlagSettings()); + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); @@ -581,20 +481,13 @@ public void testDeleteOperations() throws Exception { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); refresh(INDEX_NAME); - waitForReplicaUpdate(); - - // wait a short amount of time to give replication a chance to complete. - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + waitForSearchableDocs(initialDocCount, nodeA, nodeB); final int additionalDocCount = scaledRandomIntBetween(0, 200); final int expectedHitCount = initialDocCount + additionalDocCount; indexer.start(additionalDocCount); waitForDocs(expectedHitCount, indexer); - waitForReplicaUpdate(); - - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + waitForSearchableDocs(expectedHitCount, nodeA, nodeB); ensureGreen(INDEX_NAME); @@ -603,31 +496,18 @@ public void testDeleteOperations() throws Exception { client(nodeA).prepareDelete(INDEX_NAME, id).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); - assertBusy(() -> { - final long nodeA_Count = client(nodeA).prepareSearch(INDEX_NAME) - .setSize(0) - .setPreference("_only_local") - .get() - .getHits() - .getTotalHits().value; - assertEquals(expectedHitCount - 1, nodeA_Count); - final long nodeB_Count = client(nodeB).prepareSearch(INDEX_NAME) - .setSize(0) - .setPreference("_only_local") - .get() - .getHits() - .getTotalHits().value; - assertEquals(expectedHitCount - 1, nodeB_Count); - }, 5, TimeUnit.SECONDS); + waitForSearchableDocs(expectedHitCount - 1, nodeA, nodeB); + verifyStoreContent(); } } public void testUpdateOperations() throws Exception { - final String primary = internalCluster().startNode(featureFlagSettings()); + internalCluster().startClusterManagerOnlyNode(); + final String primary = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME); ensureYellow(INDEX_NAME); - final String replica = internalCluster().startNode(featureFlagSettings()); + final String replica = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); final int initialDocCount = scaledRandomIntBetween(0, 200); try ( @@ -644,20 +524,13 @@ public void testUpdateOperations() throws Exception { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); refresh(INDEX_NAME); - waitForReplicaUpdate(); - - // wait a short amount of time to give replication a chance to complete. - assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + waitForSearchableDocs(initialDocCount, asList(primary, replica)); final int additionalDocCount = scaledRandomIntBetween(0, 200); final int expectedHitCount = initialDocCount + additionalDocCount; indexer.start(additionalDocCount); waitForDocs(expectedHitCount, indexer); - waitForReplicaUpdate(); - - assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + waitForSearchableDocs(expectedHitCount, asList(primary, replica)); Set ids = indexer.getIds(); String id = ids.toArray()[0].toString(); @@ -669,69 +542,24 @@ public void testUpdateOperations() throws Exception { assertEquals(2, updateResponse.getVersion()); refresh(INDEX_NAME); - waitForReplicaUpdate(); + verifyStoreContent(); assertSearchHits(client(primary).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); assertSearchHits(client(replica).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); - - } - } - - private void assertSegmentStats(int numberOfReplicas) throws IOException { - final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet(); - - List segmentsByIndex = getShardSegments(indicesSegmentResponse); - - // There will be an entry in the list for each index. - for (ShardSegments[] replicationGroupSegments : segmentsByIndex) { - - // Separate Primary & replica shards ShardSegments. - final Map> segmentListMap = segmentsByShardType(replicationGroupSegments); - final List primaryShardSegmentsList = segmentListMap.get(true); - final List replicaShardSegments = segmentListMap.get(false); - - assertEquals("There should only be one primary in the replicationGroup", primaryShardSegmentsList.size(), 1); - final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); - final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); - - assertEquals( - "There should be a ShardSegment entry for each replica in the replicationGroup", - numberOfReplicas, - replicaShardSegments.size() - ); - - for (ShardSegments shardSegment : replicaShardSegments) { - final Map latestReplicaSegments = getLatestSegments(shardSegment); - for (Segment replicaSegment : latestReplicaSegments.values()) { - final Segment primarySegment = latestPrimarySegments.get(replicaSegment.getName()); - assertEquals(replicaSegment.getGeneration(), primarySegment.getGeneration()); - assertEquals(replicaSegment.getNumDocs(), primarySegment.getNumDocs()); - assertEquals(replicaSegment.getDeletedDocs(), primarySegment.getDeletedDocs()); - assertEquals(replicaSegment.getSize(), primarySegment.getSize()); - } - - // Fetch the IndexShard for this replica and try and build its SegmentInfos from the previous commit point. - // This ensures the previous commit point is not wiped. - final ShardRouting replicaShardRouting = shardSegment.getShardRouting(); - ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState(); - final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId()); - IndexShard indexShard = getIndexShard(replicaNode.getName()); - // calls to readCommit will fail if a valid commit point and all its segments are not in the store. - indexShard.store().readLastCommittedSegmentsInfo(); - } } } public void testDropPrimaryDuringReplication() throws Exception { + final int replica_count = 6; final Settings settings = Settings.builder() .put(indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replica_count) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build(); - final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(featureFlagSettings()); - final String primaryNode = internalCluster().startDataOnlyNode(featureFlagSettings()); + final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); + final String primaryNode = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME, settings); - internalCluster().startDataOnlyNodes(6, featureFlagSettings()); + final List dataNodes = internalCluster().startDataOnlyNodes(6); ensureGreen(INDEX_NAME); int initialDocCount = scaledRandomIntBetween(100, 200); @@ -754,106 +582,16 @@ public void testDropPrimaryDuringReplication() throws Exception { ensureYellow(INDEX_NAME); // start another replica. - internalCluster().startDataOnlyNode(featureFlagSettings()); + dataNodes.add(internalCluster().startDataOnlyNode()); ensureGreen(INDEX_NAME); // index another doc and refresh - without this the new replica won't catch up. - client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get(); + String docId = String.valueOf(initialDocCount + 1); + client().prepareIndex(INDEX_NAME).setId(docId).setSource("foo", "bar").get(); flushAndRefresh(INDEX_NAME); - waitForReplicaUpdate(); - assertSegmentStats(6); + waitForSearchableDocs(initialDocCount + 1, dataNodes); + verifyStoreContent(); } } - - /** - * Waits until the replica is caught up to the latest primary segments gen. - * @throws Exception if assertion fails - */ - private void waitForReplicaUpdate() throws Exception { - // wait until the replica has the latest segment generation. - assertBusy(() -> { - final IndicesSegmentResponse indicesSegmentResponse = client().admin() - .indices() - .segments(new IndicesSegmentsRequest()) - .actionGet(); - List segmentsByIndex = getShardSegments(indicesSegmentResponse); - for (ShardSegments[] replicationGroupSegments : segmentsByIndex) { - final Map> segmentListMap = segmentsByShardType(replicationGroupSegments); - final List primaryShardSegmentsList = segmentListMap.get(true); - final List replicaShardSegments = segmentListMap.get(false); - // if we don't have any segments yet, proceed. - final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); - logger.debug("Primary Segments: {}", primaryShardSegments.getSegments()); - if (primaryShardSegments.getSegments().isEmpty() == false && replicaShardSegments != null) { - final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); - final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get(); - for (ShardSegments shardSegments : replicaShardSegments) { - logger.debug("Replica {} Segments: {}", shardSegments.getShardRouting(), shardSegments.getSegments()); - final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments() - .stream() - .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen); - assertTrue(isReplicaCaughtUpToPrimary); - } - } - } - }); - } - - private IndexShard getIndexShard(String node) { - final Index index = resolveIndex(INDEX_NAME); - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); - IndexService indexService = indicesService.indexServiceSafe(index); - final Optional shardId = indexService.shardIds().stream().findFirst(); - return indexService.getShard(shardId.get()); - } - - private List getShardSegments(IndicesSegmentResponse indicesSegmentResponse) { - return indicesSegmentResponse.getIndices() - .values() - .stream() // get list of IndexSegments - .flatMap(is -> is.getShards().values().stream()) // Map to shard replication group - .map(IndexShardSegments::getShards) // get list of segments across replication group - .collect(Collectors.toList()); - } - - private Map getLatestSegments(ShardSegments segments) { - final Optional generation = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare); - final Long latestPrimaryGen = generation.get(); - return segments.getSegments() - .stream() - .filter(s -> s.getGeneration() == latestPrimaryGen) - .collect(Collectors.toMap(Segment::getName, Function.identity())); - } - - private Map> segmentsByShardType(ShardSegments[] replicationGroupSegments) { - return Arrays.stream(replicationGroupSegments).collect(Collectors.groupingBy(s -> s.getShardRouting().primary())); - } - - @Nullable - private ShardRouting getShardRoutingForNodeName(String nodeName) { - final ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); - for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(INDEX_NAME)) { - for (ShardRouting shardRouting : shardRoutingTable.activeShards()) { - final String nodeId = shardRouting.currentNodeId(); - final DiscoveryNode discoveryNode = state.nodes().resolveNode(nodeId); - if (discoveryNode.getName().equals(nodeName)) { - return shardRouting; - } - } - } - return null; - } - - private void assertDocCounts(int expectedDocCount, String... nodeNames) { - for (String node : nodeNames) { - assertHitCount(client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedDocCount); - } - } - - private DiscoveryNode getNodeContainingPrimaryShard() { - final ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); - final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard(); - return state.nodes().resolveNode(primaryShard.currentNodeId()); - } } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index bc07ef502dc79..5b0948dace75d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -29,25 +29,25 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; - /** * This test class verifies primary shard relocation with segment replication as replication strategy. */ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class SegmentReplicationRelocationIT extends SegmentReplicationIT { +public class SegmentReplicationRelocationIT extends SegmentReplicationBaseIT { private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES); - private void createIndex() { + private void createIndex(int replicaCount) { prepareCreate( INDEX_NAME, Settings.builder() .put("index.number_of_shards", 1) .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put("index.number_of_replicas", 1) + .put("index.number_of_replicas", replicaCount) + .put("index.refresh_interval", -1) ).get(); } @@ -56,19 +56,24 @@ private void createIndex() { * relocation and after relocation documents are indexed and documents are verified */ public void testPrimaryRelocation() throws Exception { - final String oldPrimary = internalCluster().startNode(featureFlagSettings()); - createIndex(); - final String replica = internalCluster().startNode(featureFlagSettings()); + final String oldPrimary = internalCluster().startNode(); + createIndex(1); + final String replica = internalCluster().startNode(); ensureGreen(INDEX_NAME); - final int initialDocCount = scaledRandomIntBetween(0, 200); - ingestDocs(initialDocCount); - - logger.info("--> verifying count {}", initialDocCount); - assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + final int initialDocCount = scaledRandomIntBetween(100, 1000); + final List> pendingIndexResponses = new ArrayList<>(); + for (int i = 0; i < initialDocCount; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() + ); + } logger.info("--> start another node"); - final String newPrimary = internalCluster().startNode(featureFlagSettings()); + final String newPrimary = internalCluster().startNode(); ClusterHealthResponse clusterHealthResponse = client().admin() .cluster() .prepareHealth() @@ -97,28 +102,28 @@ public void testPrimaryRelocation() throws Exception { logger.info("--> get the state, verify shard 1 primary moved from node1 to node2"); ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState(); - - logger.info("--> state {}", state); - assertEquals( state.getRoutingNodes().node(state.nodes().resolveNode(newPrimary).getId()).iterator().next().state(), ShardRoutingState.STARTED ); - final int finalDocCount = initialDocCount; - ingestDocs(finalDocCount); - refresh(INDEX_NAME); - - logger.info("--> verifying count again {}", initialDocCount + finalDocCount); - client().admin().indices().prepareRefresh().execute().actionGet(); - assertHitCount( - client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), - initialDocCount + finalDocCount - ); - assertHitCount( - client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), - initialDocCount + finalDocCount - ); + for (int i = initialDocCount; i < 2 * initialDocCount; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() + ); + } + assertBusy(() -> { + client().admin().indices().prepareRefresh().execute().actionGet(); + assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); + }, 1, TimeUnit.MINUTES); + flushAndRefresh(INDEX_NAME); + logger.info("--> verify count again {}", 2 * initialDocCount); + waitForSearchableDocs(2 * initialDocCount, newPrimary, replica); + verifyStoreContent(); } /** @@ -127,19 +132,24 @@ public void testPrimaryRelocation() throws Exception { * replicas. */ public void testPrimaryRelocationWithSegRepFailure() throws Exception { - final String oldPrimary = internalCluster().startNode(featureFlagSettings()); - createIndex(); - final String replica = internalCluster().startNode(featureFlagSettings()); + final String oldPrimary = internalCluster().startNode(); + createIndex(1); + final String replica = internalCluster().startNode(); ensureGreen(INDEX_NAME); - final int initialDocCount = scaledRandomIntBetween(1, 100); - ingestDocs(initialDocCount); - - logger.info("--> verifying count {}", initialDocCount); - assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + final int initialDocCount = scaledRandomIntBetween(100, 1000); + final List> pendingIndexResponses = new ArrayList<>(); + for (int i = 0; i < initialDocCount; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() + ); + } logger.info("--> start another node"); - final String newPrimary = internalCluster().startNode(featureFlagSettings()); + final String newPrimary = internalCluster().startNode(); ClusterHealthResponse clusterHealthResponse = client().admin() .cluster() .prepareHealth() @@ -181,20 +191,24 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { .actionGet(); assertEquals(clusterHealthResponse.isTimedOut(), false); - final int finalDocCount = initialDocCount; - ingestDocs(finalDocCount); - refresh(INDEX_NAME); + for (int i = initialDocCount; i < 2 * initialDocCount; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() + ); + } logger.info("Verify older primary is still refreshing replica nodes"); - client().admin().indices().prepareRefresh().execute().actionGet(); - assertHitCount( - client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), - initialDocCount + finalDocCount - ); - assertHitCount( - client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), - initialDocCount + finalDocCount - ); + assertBusy(() -> { + client().admin().indices().prepareRefresh().execute().actionGet(); + assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); + }, 1, TimeUnit.MINUTES); + flushAndRefresh(INDEX_NAME); + waitForSearchableDocs(2 * initialDocCount, oldPrimary, replica); + verifyStoreContent(); } /** @@ -202,17 +216,11 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { * */ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception { - final String primary = internalCluster().startNode(featureFlagSettings()); - prepareCreate( - INDEX_NAME, - Settings.builder() - .put("index.number_of_shards", 1) - .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put("index.number_of_replicas", 0) - .put("index.refresh_interval", -1) - ).get(); - + final String primary = internalCluster().startNode(); + createIndex(1); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + final int totalDocCount = 1000; for (int i = 0; i < 10; i++) { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); } @@ -231,12 +239,12 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E ); } - final String replica = internalCluster().startNode(featureFlagSettings()); + final String newPrimary = internalCluster().startNode(); ClusterHealthResponse clusterHealthResponse = client().admin() .cluster() .prepareHealth() .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("2") + .setWaitForNodes("3") .execute() .actionGet(); assertEquals(clusterHealthResponse.isTimedOut(), false); @@ -245,9 +253,9 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E ActionFuture relocationListener = client().admin() .cluster() .prepareReroute() - .add(new MoveAllocationCommand(INDEX_NAME, 0, primary, replica)) + .add(new MoveAllocationCommand(INDEX_NAME, 0, primary, newPrimary)) .execute(); - for (int i = 20; i < 120; i++) { + for (int i = 20; i < totalDocCount; i++) { pendingIndexResponses.add( client().prepareIndex(INDEX_NAME) .setId(Integer.toString(i)) @@ -272,6 +280,118 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E client().admin().indices().prepareRefresh().execute().actionGet(); assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); }, 1, TimeUnit.MINUTES); - assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 120L); + flushAndRefresh(INDEX_NAME); + waitForSearchableDocs(totalDocCount, newPrimary, replica); + verifyStoreContent(); + } + + /** + * This test verifies delayed operations during primary handoff are replayed and searchable. It does so by halting + * segment replication which is performed while holding primary indexing permits which results in queuing of + * operations during handoff. The test verifies all docs ingested are searchable on new primary. + * + */ + public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { + final String primary = internalCluster().startNode(); + createIndex(1); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + final int totalDocCount = 2000; + + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + logger.info("--> flush to have segments on disk"); + client().admin().indices().prepareFlush().execute().actionGet(); + + logger.info("--> index more docs so there are ops in the transaction log"); + final List> pendingIndexResponses = new ArrayList<>(); + for (int i = 10; i < 20; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() + ); + } + final String newPrimary = internalCluster().startNode(); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("3") + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + ensureGreen(INDEX_NAME); + + // Get mock transport service from newPrimary, halt recovery during segment replication (during handoff) to allow indexing in + // parallel. + MockTransportService mockTargetTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + newPrimary + )); + CountDownLatch blockSegRepLatch = new CountDownLatch(1); + CountDownLatch waitForIndexingLatch = new CountDownLatch(1); + mockTargetTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, primary), + (connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES)) { + blockSegRepLatch.countDown(); + try { + waitForIndexingLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + connection.sendRequest(requestId, action, request, options); + } + ); + Thread indexingThread = new Thread(() -> { + // Wait for relocation to halt at SegRep. Ingest docs at that point. + try { + blockSegRepLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + for (int i = 20; i < totalDocCount; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute() + ); + } + waitForIndexingLatch.countDown(); + }); + + logger.info("--> relocate the shard from primary to newPrimary"); + ActionFuture relocationListener = client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, primary, newPrimary)) + .execute(); + + // This thread first waits for recovery to halt during segment replication. After which it ingests data to ensure + // documents are queued. + indexingThread.start(); + indexingThread.join(); + relocationListener.actionGet(); + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + logger.info("--> verifying count"); + assertBusy(() -> { + client().admin().indices().prepareRefresh().execute().actionGet(); + assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); + }, 1, TimeUnit.MINUTES); + flushAndRefresh(INDEX_NAME); + waitForSearchableDocs(totalDocCount, replica, newPrimary); + verifyStoreContent(); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 5a5356282681e..a19b89f64ad73 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -62,7 +62,6 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; -import org.opensearch.action.StepListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.opensearch.action.admin.indices.upgrade.post.UpgradeRequest; @@ -751,7 +750,6 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta * * @param performSegRep a {@link Runnable} that is executed after operations are blocked * @param consumer a {@link Runnable} that is executed after performSegRep - * @param listener ActionListener * @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation * @throws IllegalStateException if the relocation target is no longer part of the replication group * @throws InterruptedException if blocking operations is interrupted @@ -759,8 +757,7 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta public void relocated( final String targetAllocationId, final Consumer consumer, - final Consumer performSegRep, - final ActionListener listener + final Runnable performSegRep ) throws IllegalIndexShardStateException, IllegalStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) { @@ -770,32 +767,28 @@ public void relocated( assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED : "in-flight operations in progress while moving shard state to relocated"; - final StepListener segRepSyncListener = new StepListener<>(); - performSegRep.accept(segRepSyncListener); - segRepSyncListener.whenComplete(r -> { - /* - * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a - * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations. - */ - verifyRelocatingState(); - final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff(targetAllocationId); + performSegRep.run(); + /* + * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a + * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations. + */ + verifyRelocatingState(); + final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff(targetAllocationId); + try { + consumer.accept(primaryContext); + synchronized (mutex) { + verifyRelocatingState(); + replicationTracker.completeRelocationHandoff(); // make changes to primaryMode and relocated flag only under + // mutex + } + } catch (final Exception e) { try { - consumer.accept(primaryContext); - synchronized (mutex) { - verifyRelocatingState(); - replicationTracker.completeRelocationHandoff(); // make changes to primaryMode and relocated flag only under - // mutex - } - } catch (final Exception e) { - try { - replicationTracker.abortRelocationHandoff(); - } catch (final Exception inner) { - e.addSuppressed(inner); - } - throw e; + replicationTracker.abortRelocationHandoff(); + } catch (final Exception inner) { + e.addSuppressed(inner); } - listener.onResponse(null); - }, listener::onFailure); + throw e; + } }); } catch (TimeoutException e) { logger.warn("timed out waiting for relocation hand-off to complete"); diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index e2f5ec76f2bd1..c8c23cec6fd94 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -820,34 +820,24 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis logger ); - final StepListener handoffListener = new StepListener<>(); if (request.isPrimaryRelocation()) { logger.trace("performing relocation hand-off"); - final Consumer forceSegRepConsumer = shard.indexSettings().isSegRepEnabled() + final Runnable forceSegRepRunnable = shard.indexSettings().isSegRepEnabled() ? recoveryTarget::forceSegmentFileSync - : res -> res.onResponse(null); + : () -> {}; // TODO: make relocated async // this acquires all IndexShard operation permits and will thus delay new recoveries until it is done cancellableThreads.execute( - () -> shard.relocated( - request.targetAllocationId(), - recoveryTarget::handoffPrimaryContext, - forceSegRepConsumer, - handoffListener - ) + () -> shard.relocated(request.targetAllocationId(), recoveryTarget::handoffPrimaryContext, forceSegRepRunnable) ); /* * if the recovery process fails after disabling primary mode on the source shard, both relocation source and * target are failed (see {@link IndexShard#updateRoutingEntry}). */ - } else { - handoffListener.onResponse(null); } - handoffListener.whenComplete(res -> { - stopWatch.stop(); - logger.trace("finalizing recovery took [{}]", stopWatch.totalTime()); - listener.onResponse(null); - }, listener::onFailure); + stopWatch.stop(); + logger.info("finalizing recovery took [{}]", stopWatch.totalTime()); + listener.onResponse(null); }, listener::onFailure); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index f0918813da62b..7dc78e4f622fe 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -264,7 +264,7 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) { + public void forceSegmentFileSync() { throw new UnsupportedOperationException("Method not supported on target!"); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java index ef0d4abc44c7d..b43253c32d844 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java @@ -58,9 +58,8 @@ public interface RecoveryTargetHandler extends FileChunkWriter { * * This function is used to force a sync target primary node with source (old primary). This is to avoid segment files * conflict with replicas when target is promoted as primary. - * @param listener segment replication event listener */ - void forceSegmentFileSync(ActionListener listener); + void forceSegmentFileSync(); /** * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, updates diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java index 5f638103a021c..a8dd083ba838e 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -192,16 +192,17 @@ public void indexTranslogOperations( * * This function is used to force a sync target primary node with source (old primary). This is to avoid segment files * conflict with replicas when target is promoted as primary. - * @param listener segment replication event listener */ @Override - public void forceSegmentFileSync(ActionListener listener) { - final String action = SegmentReplicationTargetService.Actions.FORCE_SYNC; + public void forceSegmentFileSync() { final long requestSeqNo = requestSeqNoGenerator.getAndIncrement(); - final ForceSyncRequest request = new ForceSyncRequest(requestSeqNo, recoveryId, shardId); - final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; - final ActionListener responseListener = ActionListener.map(listener, r -> null); - retryableTransportClient.executeRetryableAction(action, request, responseListener, reader); + transportService.submitRequest( + targetNode, + SegmentReplicationTargetService.Actions.FORCE_SYNC, + new ForceSyncRequest(requestSeqNo, recoveryId, shardId), + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(), + EmptyTransportResponseHandler.INSTANCE_SAME + ).txGet(); } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index b63b84a5c1eab..750e7629783e7 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -22,7 +22,6 @@ import org.opensearch.core.internal.io.IOUtils; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; -import org.opensearch.indices.RunUnderPrimaryPermit; import org.opensearch.indices.recovery.DelayRecoveryException; import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.MultiChunkTransfer; @@ -138,23 +137,12 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene ); }; - RunUnderPrimaryPermit.run(() -> { - final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); - ShardRouting targetShardRouting = routingTable.getByAllocationId(request.getTargetAllocationId()); - if (targetShardRouting == null) { - logger.debug( - "delaying replication of {} as it is not listed as assigned to target node {}", - shard.shardId(), - targetNode - ); - throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); - } - }, - shard.shardId() + " validating recovery target [" + request.getTargetAllocationId() + "] registered ", - shard, - cancellableThreads, - logger - ); + final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); + ShardRouting targetShardRouting = routingTable.getByAllocationId(request.getTargetAllocationId()); + if (targetShardRouting == null) { + logger.debug("delaying replication of {} as it is not listed as assigned to target node {}", shard.shardId(), targetNode); + throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); + } final StepListener sendFileStep = new StepListener<>(); Set storeFiles = new HashSet<>(Arrays.asList(shard.store().directory().listAll())); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 5e979b934ebec..0abbcd1595bb1 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -50,7 +50,6 @@ import org.apache.lucene.util.Constants; import org.junit.Assert; import org.opensearch.Assertions; -import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionListener; @@ -136,6 +135,7 @@ import org.opensearch.index.translog.TestTranslog; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogStats; +import org.opensearch.index.translog.listener.TranslogEventListener; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.breaker.NoneCircuitBreakerService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -938,12 +938,6 @@ public void onResponse(final Releasable releasable) { closeShards(indexShard); } - private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { - PlainActionFuture fut = new PlainActionFuture<>(); - indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.WRITE, ""); - return fut.get(); - } - private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); @@ -996,22 +990,7 @@ public void testOperationPermitOnReplicaShards() throws Exception { AllocationId.newRelocation(routing.allocationId()) ); IndexShardTestCase.updateRoutingEntry(indexShard, routing); - indexShard.relocated( - routing.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - assertTrue(indexShard.isRelocatedPrimary()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } - ); + indexShard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); engineClosed = false; break; } @@ -1233,6 +1212,66 @@ public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception closeShards(replica); } + public void testGetChangesSnapshotThrowsAssertForSegRep() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 0); + final ShardRouting shardRouting = TestShardRouting.newShardRouting( + shardId, + randomAlphaOfLength(8), + true, + ShardRoutingState.INITIALIZING, + RecoverySource.EmptyStoreRecoverySource.INSTANCE + ); + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT.toString()) + .build(); + final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1); + final AtomicBoolean synced = new AtomicBoolean(); + final IndexShard primaryShard = newShard( + shardRouting, + indexMetadata.build(), + null, + new InternalEngineFactory(), + () -> synced.set(true), + RetentionLeaseSyncer.EMPTY, + null + ); + expectThrows(AssertionError.class, () -> primaryShard.getHistoryOperationsFromTranslog(0, 1)); + closeShard(primaryShard, false); + } + + public void testGetChangesSnapshotThrowsAssertForRemoteStore() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 0); + final ShardRouting shardRouting = TestShardRouting.newShardRouting( + shardId, + randomAlphaOfLength(8), + true, + ShardRoutingState.INITIALIZING, + RecoverySource.EmptyStoreRecoverySource.INSTANCE + ); + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .build(); + final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1); + final AtomicBoolean synced = new AtomicBoolean(); + final IndexShard primaryShard = newShard( + shardRouting, + indexMetadata.build(), + null, + new InternalEngineFactory(), + () -> synced.set(true), + RetentionLeaseSyncer.EMPTY, + null + ); + expectThrows(AssertionError.class, () -> primaryShard.getHistoryOperationsFromTranslog(0, 1)); + closeShard(primaryShard, false); + } + public void testGlobalCheckpointSync() throws IOException { // create the primary shard with a callback that sets a boolean when the global checkpoint sync is invoked final ShardId shardId = new ShardId("index", "_na_", 0); @@ -1964,22 +2003,7 @@ public void testLockingBeforeAndAfterRelocated() throws Exception { Thread recoveryThread = new Thread(() -> { latch.countDown(); try { - shard.relocated( - routing.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - assertTrue(shard.isRelocatedPrimary()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } - ); + shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -2014,18 +2038,7 @@ public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception { shard.relocated( routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> relocationStarted.countDown(), - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - assertTrue(shard.isRelocatedPrimary()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } + () -> {} ); } catch (InterruptedException e) { throw new RuntimeException(e); @@ -2114,26 +2127,11 @@ public void run() { AtomicBoolean relocated = new AtomicBoolean(); final Thread recoveryThread = new Thread(() -> { try { - shard.relocated( - routing.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - relocated.set(true); - } - - @Override - public void onFailure(Exception e) { - relocated.set(false); - fail(e.toString()); - } - } - ); + shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); } catch (InterruptedException e) { throw new RuntimeException(e); } + relocated.set(true); }); // ensure we wait for all primary operation locks to be acquired allPrimaryOperationLocksAcquired.await(); @@ -2164,48 +2162,25 @@ public void testRelocatedShardCanNotBeRevived() throws IOException, InterruptedE final ShardRouting originalRouting = shard.routingEntry(); final ShardRouting routing = ShardRoutingHelper.relocate(originalRouting, "other_node"); IndexShardTestCase.updateRoutingEntry(shard, routing); - shard.relocated( - routing.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - assertTrue(shard.isRelocatedPrimary()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } - ); + shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); expectThrows(IllegalIndexShardStateException.class, () -> IndexShardTestCase.updateRoutingEntry(shard, originalRouting)); closeShards(shard); } - public void testRelocatedSegRepConsumerError() throws IOException, InterruptedException { + public void testRelocatedSegRepError() throws IOException, InterruptedException { final IndexShard shard = newStartedShard(true); final ShardRouting originalRouting = shard.routingEntry(); final ShardRouting routing = ShardRoutingHelper.relocate(originalRouting, "other_node"); IndexShardTestCase.updateRoutingEntry(shard, routing); - shard.relocated( - routing.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onFailure(new ReplicationFailedException("Segment replication failed")), - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - fail("Expected failure"); - } - - @Override - public void onFailure(Exception e) { - assertTrue(ExceptionsHelper.unwrapCause(e) instanceof ReplicationFailedException); - assertEquals(e.getMessage(), "Segment replication failed"); - } - } + ReplicationFailedException segRepException = expectThrows( + ReplicationFailedException.class, + () -> shard.relocated( + routing.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + () -> { throw new ReplicationFailedException("Segment replication failed"); } + ) ); + assertTrue(segRepException.getMessage().equals("Segment replication failed")); closeShards(shard); } @@ -2215,21 +2190,9 @@ public void testShardCanNotBeMarkedAsRelocatedIfRelocationCancelled() throws IOE final ShardRouting relocationRouting = ShardRoutingHelper.relocate(originalRouting, "other_node"); IndexShardTestCase.updateRoutingEntry(shard, relocationRouting); IndexShardTestCase.updateRoutingEntry(shard, originalRouting); - shard.relocated( - relocationRouting.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - fail("IllegalIndexShardStateException expected!"); - } - - @Override - public void onFailure(Exception e) { - assertTrue(ExceptionsHelper.unwrapCause(e) instanceof IllegalIndexShardStateException); - } - } + expectThrows( + IllegalIndexShardStateException.class, + () -> shard.relocated(relocationRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}) ); closeShards(shard); } @@ -2244,28 +2207,13 @@ public void testRelocatedShardCanNotBeRevivedConcurrently() throws IOException, Thread relocationThread = new Thread(new AbstractRunnable() { @Override public void onFailure(Exception e) { - fail(e.toString()); + relocationException.set(e); } @Override protected void doRun() throws Exception { cyclicBarrier.await(); - shard.relocated( - relocationRouting.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - assertTrue(shard.isRelocatedPrimary()); - } - - @Override - public void onFailure(Exception e) { - relocationException.set(e); - } - } - ); + shard.relocated(relocationRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); } }); relocationThread.start(); @@ -2313,48 +2261,22 @@ public void testRelocateMissingTarget() throws Exception { final ShardRouting toNode2 = ShardRoutingHelper.relocate(original, "node_2"); IndexShardTestCase.updateRoutingEntry(shard, toNode2); final AtomicBoolean relocated = new AtomicBoolean(); - shard.relocated( - toNode1.getTargetRelocatingShard().allocationId().getId(), - ctx -> relocated.set(true), - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - fail("Expected IllegalStateException!"); - } - @Override - public void onFailure(Exception e) { - assertTrue(ExceptionsHelper.unwrapCause(e) instanceof IllegalStateException); - assertThat( - e.getMessage(), - equalTo( - "relocation target [" - + toNode1.getTargetRelocatingShard().allocationId().getId() - + "] is no longer part of the replication group" - ) - ); - } - } + final IllegalStateException error = expectThrows( + IllegalStateException.class, + () -> shard.relocated(toNode1.getTargetRelocatingShard().allocationId().getId(), ctx -> relocated.set(true), () -> {}) + ); + assertThat( + error.getMessage(), + equalTo( + "relocation target [" + + toNode1.getTargetRelocatingShard().allocationId().getId() + + "] is no longer part of the replication group" + ) ); assertFalse(relocated.get()); - shard.relocated( - toNode2.getTargetRelocatingShard().allocationId().getId(), - ctx -> relocated.set(true), - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - assertTrue(relocated.get()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } - ); + shard.relocated(toNode2.getTargetRelocatingShard().allocationId().getId(), ctx -> relocated.set(true), () -> {}); assertTrue(relocated.get()); closeShards(shard); } @@ -2375,7 +2297,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { long primaryTerm = shard.getOperationPrimaryTerm(); shard.advanceMaxSeqNoOfUpdatesOrDeletes(1); // manually advance msu for this delete shard.applyDeleteOperationOnReplica(1, primaryTerm, 2, "id"); - shard.getEngine().rollTranslogGeneration(); // isolate the delete in it's own generation + shard.getEngine().translogManager().rollTranslogGeneration(); // isolate the delete in it's own generation shard.applyIndexOperationOnReplica( 0, primaryTerm, @@ -2423,7 +2345,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { replayedOps = 3; } else { if (randomBoolean()) { - shard.getEngine().rollTranslogGeneration(); + shard.getEngine().translogManager().rollTranslogGeneration(); } translogOps = 5; replayedOps = 5; @@ -2696,7 +2618,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { ); flushShard(shard); assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1")); - shard.getEngine().rollTranslogGeneration(); + shard.getEngine().translogManager().rollTranslogGeneration(); shard.markSeqNoAsNoop(1, primaryTerm, "test"); shard.applyIndexOperationOnReplica( 2, @@ -2747,22 +2669,7 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc assertThat(shard.state(), equalTo(IndexShardState.STARTED)); ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node"); IndexShardTestCase.updateRoutingEntry(shard, inRecoveryRouting); - shard.relocated( - inRecoveryRouting.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - assertTrue(shard.isRelocatedPrimary()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } - ); + shard.relocated(inRecoveryRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); assertTrue(shard.isRelocatedPrimary()); try { IndexShardTestCase.updateRoutingEntry(shard, origRouting); @@ -2772,6 +2679,40 @@ public void onFailure(Exception e) { closeShards(shard); } + public void testRelocatedForRemoteTranslogBackedIndexWithAsyncDurability() throws IOException { + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, "seg-test") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "txlog-test") + .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC) + .build(); + final IndexShard indexShard = newStartedShard(true, settings, new NRTReplicationEngineFactory()); + ShardRouting routing = indexShard.routingEntry(); + routing = newShardRouting( + routing.shardId(), + routing.currentNodeId(), + "otherNode", + true, + ShardRoutingState.RELOCATING, + AllocationId.newRelocation(routing.allocationId()) + ); + IndexShardTestCase.updateRoutingEntry(indexShard, routing); + assertTrue(indexShard.isSyncNeeded()); + try { + indexShard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertTrue(indexShard.isRelocatedPrimary()); + assertFalse(indexShard.isSyncNeeded()); + assertFalse(indexShard.getReplicationTracker().isPrimaryMode()); + closeShards(indexShard); + } + public void testRestoreShard() throws IOException { final IndexShard source = newStartedShard(true); IndexShard target = newStartedShard(true); @@ -4400,19 +4341,17 @@ public void testResetEngine() throws Exception { public void testCloseShardWhileResettingEngine() throws Exception { CountDownLatch readyToCloseLatch = new CountDownLatch(1); CountDownLatch closeDoneLatch = new CountDownLatch(1); - IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) { + IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config, new TranslogEventListener() { @Override - public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) - throws IOException { + public void onBeginTranslogRecovery() { readyToCloseLatch.countDown(); try { closeDoneLatch.await(); } catch (InterruptedException e) { throw new AssertionError(e); } - return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo); } - }); + })); Thread closeShardThread = new Thread(() -> { try { @@ -4459,20 +4398,17 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover public void testSnapshotWhileResettingEngine() throws Exception { CountDownLatch readyToSnapshotLatch = new CountDownLatch(1); CountDownLatch snapshotDoneLatch = new CountDownLatch(1); - IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) { + IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config, new TranslogEventListener() { @Override - public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) - throws IOException { - InternalEngine engine = super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo); + public void onAfterTranslogRecovery() { readyToSnapshotLatch.countDown(); try { snapshotDoneLatch.await(); } catch (InterruptedException e) { throw new AssertionError(e); } - return engine; } - }); + })); indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint())); final long globalCheckpoint = randomLongBetween(shard.getLastKnownGlobalCheckpoint(), shard.getLocalCheckpoint()); @@ -4839,30 +4775,29 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { throw new AssertionError(e); } }; - EngineConfig configWithWarmer = new EngineConfig( - config.getShardId(), - config.getThreadPool(), - config.getIndexSettings(), - warmer, - config.getStore(), - config.getMergePolicy(), - config.getAnalyzer(), - config.getSimilarity(), - new CodecService(null, logger), - config.getEventListener(), - config.getQueryCache(), - config.getQueryCachingPolicy(), - config.getTranslogConfig(), - config.getFlushMergesAfter(), - config.getExternalRefreshListener(), - config.getInternalRefreshListener(), - config.getIndexSort(), - config.getCircuitBreakerService(), - config.getGlobalCheckpointSupplier(), - config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), - config.getTombstoneDocSupplier() - ); + EngineConfig configWithWarmer = new EngineConfig.Builder().shardId(config.getShardId()) + .threadPool(config.getThreadPool()) + .indexSettings(config.getIndexSettings()) + .warmer(warmer) + .store(config.getStore()) + .mergePolicy(config.getMergePolicy()) + .analyzer(config.getAnalyzer()) + .similarity(config.getSimilarity()) + .codecService(new CodecService(null, logger)) + .eventListener(config.getEventListener()) + .queryCache(config.getQueryCache()) + .queryCachingPolicy(config.getQueryCachingPolicy()) + .translogConfig(config.getTranslogConfig()) + .flushMergesAfter(config.getFlushMergesAfter()) + .externalRefreshListener(config.getExternalRefreshListener()) + .internalRefreshListener(config.getInternalRefreshListener()) + .indexSort(config.getIndexSort()) + .circuitBreakerService(config.getCircuitBreakerService()) + .globalCheckpointSupplier(config.getGlobalCheckpointSupplier()) + .retentionLeasesSupplier(config.retentionLeasesSupplier()) + .primaryTermSupplier(config.getPrimaryTermSupplier()) + .tombstoneDocSupplier(config.getTombstoneDocSupplier()) + .build(); return new InternalEngine(configWithWarmer); }); Thread recoveryThread = new Thread(() -> expectThrows(AlreadyClosedException.class, () -> recoverShardFromStore(shard))); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index c46f97b5ec785..13c1f05f1e60c 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -15,8 +15,10 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingHelper; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.ClusterSettings; @@ -53,17 +55,21 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import static java.util.Arrays.asList; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -300,15 +306,12 @@ public void testPrimaryRelocation() throws Exception { ); updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata()); - BiFunction, ActionListener, List> replicatePrimaryFunction = ( - shardList, - listener) -> { + Function, List> replicatePrimaryFunction = (shardList) -> { try { assert shardList.size() >= 2; final IndexShard primary = shardList.get(0); - return replicateSegments(primary, shardList.subList(1, shardList.size()), listener); + return replicateSegments(primary, shardList.subList(1, shardList.size())); } catch (IOException | InterruptedException e) { - listener.onFailure(e); throw new RuntimeException(e); } }; @@ -340,11 +343,12 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { ); updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata()); - BiFunction, ActionListener, List> replicatePrimaryFunction = ( - shardList, - listener) -> { - listener.onFailure(new IOException("Expected failure")); - return null; + Function, List> replicatePrimaryFunction = (shardList) -> { + try { + throw new IOException("Expected failure"); + } catch (IOException e) { + throw new RuntimeException(e); + } }; Exception e = expectThrows( Exception.class, @@ -359,7 +363,7 @@ public void onDone(ReplicationState state) { @Override public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { - assertThat(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), equalTo("Expected failure")); + assertEquals(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), "Expected failure"); } }), true, @@ -367,10 +371,119 @@ public void onFailure(ReplicationState state, ReplicationFailedException e, bool replicatePrimaryFunction ) ); - assertThat(e, hasToString(containsString("Expected failure"))); closeShards(primarySource, primaryTarget); } + // Todo: Remove this test when there is a better mechanism to test a functionality passing in different replication + // strategy. + public void testLockingBeforeAndAfterRelocated() throws Exception { + final IndexShard shard = newStartedShard(true, settings); + final ShardRouting routing = ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"); + IndexShardTestCase.updateRoutingEntry(shard, routing); + CountDownLatch latch = new CountDownLatch(1); + Thread recoveryThread = new Thread(() -> { + latch.countDown(); + try { + shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + try (Releasable ignored = acquirePrimaryOperationPermitBlockingly(shard)) { + // start finalization of recovery + recoveryThread.start(); + latch.await(); + // recovery can only be finalized after we release the current primaryOperationLock + assertFalse(shard.isRelocatedPrimary()); + } + // recovery can be now finalized + recoveryThread.join(); + assertTrue(shard.isRelocatedPrimary()); + final ExecutionException e = expectThrows(ExecutionException.class, () -> acquirePrimaryOperationPermitBlockingly(shard)); + assertThat(e.getCause(), instanceOf(ShardNotInPrimaryModeException.class)); + assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode"))); + + closeShards(shard); + } + + // Todo: Remove this test when there is a better mechanism to test a functionality passing in different replication + // strategy. + public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception { + final IndexShard shard = newStartedShard(true, settings); + final ShardRouting routing = ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"); + IndexShardTestCase.updateRoutingEntry(shard, routing); + final CountDownLatch startRecovery = new CountDownLatch(1); + final CountDownLatch relocationStarted = new CountDownLatch(1); + Thread recoveryThread = new Thread(() -> { + try { + startRecovery.await(); + shard.relocated( + routing.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> relocationStarted.countDown(), + () -> {} + ); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + recoveryThread.start(); + + final int numberOfAcquisitions = randomIntBetween(1, 10); + final List assertions = new ArrayList<>(numberOfAcquisitions); + final int recoveryIndex = randomIntBetween(0, numberOfAcquisitions - 1); + + for (int i = 0; i < numberOfAcquisitions; i++) { + final PlainActionFuture onLockAcquired; + if (i < recoveryIndex) { + final AtomicBoolean invoked = new AtomicBoolean(); + onLockAcquired = new PlainActionFuture() { + + @Override + public void onResponse(Releasable releasable) { + invoked.set(true); + releasable.close(); + super.onResponse(releasable); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(); + } + + }; + assertions.add(() -> assertTrue(invoked.get())); + } else if (recoveryIndex == i) { + startRecovery.countDown(); + relocationStarted.await(); + onLockAcquired = new PlainActionFuture<>(); + assertions.add(() -> { + final ExecutionException e = expectThrows(ExecutionException.class, () -> onLockAcquired.get(30, TimeUnit.SECONDS)); + assertThat(e.getCause(), instanceOf(ShardNotInPrimaryModeException.class)); + assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode"))); + }); + } else { + onLockAcquired = new PlainActionFuture<>(); + assertions.add(() -> { + final ExecutionException e = expectThrows(ExecutionException.class, () -> onLockAcquired.get(30, TimeUnit.SECONDS)); + assertThat(e.getCause(), instanceOf(ShardNotInPrimaryModeException.class)); + assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode"))); + }); + } + + shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.WRITE, "i_" + i); + } + + for (final Runnable assertion : assertions) { + assertion.run(); + } + + recoveryThread.join(); + + closeShards(shard); + } + public void testReplicaReceivesLowerGeneration() throws Exception { // when a replica gets incoming segments that are lower than what it currently has on disk. diff --git a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java index 0f5bba4f0c332..47ae48eba0692 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java @@ -1117,7 +1117,7 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) {} @Override - public void forceSegmentFileSync(ActionListener listener) {} + public void forceSegmentFileSync() {} @Override public void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryContext) {} diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index ad19473380063..6b0f50d80fba2 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -545,7 +545,7 @@ public void recoverReplica( markAsRecovering, inSyncIds, routingTable, - (a, b) -> null + (a) -> null ); OpenSearchIndexLevelReplicationTestCase.this.startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable); computeReplicationTargets(); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 341598ffa7b88..179502f633ef6 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -64,6 +64,7 @@ import org.opensearch.common.blobstore.fs.FsBlobStore; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.lucene.uid.Versions; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -144,11 +145,13 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; import static org.hamcrest.Matchers.contains; @@ -244,6 +247,12 @@ protected Store createStore(ShardId shardId, IndexSettings indexSettings, Direct return new Store(shardId, indexSettings, directory, new DummyShardLock(shardId)); } + protected Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { + PlainActionFuture fut = new PlainActionFuture<>(); + indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.WRITE, ""); + return fut.get(); + } + /** * Creates a new initializing shard. The shard will have its own unique data path. * @@ -809,7 +818,7 @@ protected DiscoveryNode getFakeDiscoNode(String id) { } protected void recoverReplica(IndexShard replica, IndexShard primary, boolean startReplica) throws IOException { - recoverReplica(replica, primary, startReplica, (a, b) -> null); + recoverReplica(replica, primary, startReplica, (a) -> null); } /** recovers a replica from the given primary **/ @@ -817,7 +826,7 @@ protected void recoverReplica( IndexShard replica, IndexShard primary, boolean startReplica, - BiFunction, ActionListener, List> replicatePrimaryFunction + Function, List> replicatePrimaryFunction ) throws IOException { recoverReplica( replica, @@ -836,7 +845,7 @@ protected void recoverReplica( final boolean markAsRecovering, final boolean markAsStarted ) throws IOException { - recoverReplica(replica, primary, targetSupplier, markAsRecovering, markAsStarted, (a, b) -> null); + recoverReplica(replica, primary, targetSupplier, markAsRecovering, markAsStarted, (a) -> null); } /** recovers a replica from the given primary **/ @@ -846,7 +855,7 @@ protected void recoverReplica( final BiFunction targetSupplier, final boolean markAsRecovering, final boolean markAsStarted, - final BiFunction, ActionListener, List> replicatePrimaryFunction + final Function, List> replicatePrimaryFunction ) throws IOException { IndexShardRoutingTable.Builder newRoutingTable = new IndexShardRoutingTable.Builder(replica.shardId()); newRoutingTable.addShard(primary.routingEntry()); @@ -879,7 +888,7 @@ protected final void recoverUnstartedReplica( final boolean markAsRecovering, final Set inSyncIds, final IndexShardRoutingTable routingTable, - final BiFunction, ActionListener, List> replicatePrimaryFunction + final Function, List> replicatePrimaryFunction ) throws IOException { final DiscoveryNode pNode = getFakeDiscoNode(primary.routingEntry().currentNodeId()); final DiscoveryNode rNode = getFakeDiscoNode(replica.routingEntry().currentNodeId()); @@ -1290,11 +1299,8 @@ public void getSegmentFiles( * @param replicaShards - Replicas that will be updated. * @return {@link List} List of target components orchestrating replication. */ - public final List replicateSegments( - IndexShard primaryShard, - List replicaShards, - ActionListener... listeners - ) throws IOException, InterruptedException { + public final List replicateSegments(IndexShard primaryShard, List replicaShards) + throws IOException, InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(replicaShards.size()); Map primaryMetadata; try (final GatedCloseable segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) { @@ -1317,13 +1323,7 @@ public void onReplicationDone(SegmentReplicationState state) { assertTrue(recoveryDiff.missing.isEmpty()); assertTrue(recoveryDiff.different.isEmpty()); assertEquals(recoveryDiff.identical.size(), primaryMetadata.size()); - for (ActionListener listener : listeners) { - listener.onResponse(null); - } } catch (Exception e) { - for (ActionListener listener : listeners) { - listener.onFailure(e); - } throw ExceptionsHelper.convertToRuntime(e); } finally { countDownLatch.countDown(); diff --git a/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java b/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java index 45b11c95b4102..fa8b3c9e3a2c3 100644 --- a/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java +++ b/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java @@ -46,7 +46,7 @@ import java.util.List; import java.util.concurrent.Executor; -import java.util.function.BiFunction; +import java.util.function.Function; /** * Wraps a {@link RecoveryTarget} to make all remote calls to be executed asynchronously using the provided {@code executor}. @@ -59,14 +59,14 @@ public class AsyncRecoveryTarget implements RecoveryTargetHandler { private final IndexShard replica; - private final BiFunction, ActionListener, List> replicatePrimaryFunction; + private final Function, List> replicatePrimaryFunction; public AsyncRecoveryTarget(RecoveryTargetHandler target, Executor executor) { this.executor = executor; this.target = target; this.primary = null; this.replica = null; - this.replicatePrimaryFunction = (a, b) -> null; + this.replicatePrimaryFunction = (a) -> null; } public AsyncRecoveryTarget( @@ -74,7 +74,7 @@ public AsyncRecoveryTarget( Executor executor, IndexShard primary, IndexShard replica, - BiFunction, ActionListener, List> replicatePrimaryFunction + Function, List> replicatePrimaryFunction ) { this.executor = executor; this.target = target; @@ -89,8 +89,8 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) { - executor.execute(() -> this.replicatePrimaryFunction.apply(List.of(primary, replica), listener)); + public void forceSegmentFileSync() { + this.replicatePrimaryFunction.apply(List.of(primary, replica)); } @Override