diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java
new file mode 100644
index 0000000000000..5f7433126db57
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java
@@ -0,0 +1,180 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.indices.replication;
+
+import org.opensearch.action.search.SearchResponse;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.routing.IndexRoutingTable;
+import org.opensearch.cluster.routing.IndexShardRoutingTable;
+import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.common.Nullable;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.FeatureFlags;
+import org.opensearch.index.Index;
+import org.opensearch.index.IndexModule;
+import org.opensearch.index.IndexService;
+import org.opensearch.index.shard.IndexShard;
+import org.opensearch.index.store.Store;
+import org.opensearch.index.store.StoreFileMetadata;
+import org.opensearch.indices.IndicesService;
+import org.opensearch.indices.replication.common.ReplicationType;
+import org.opensearch.plugins.Plugin;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.opensearch.test.transport.MockTransportService;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.util.Arrays.asList;
+import static org.opensearch.test.OpenSearchIntegTestCase.client;
+import static org.opensearch.test.OpenSearchTestCase.assertBusy;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
+
+public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase {
+
+    protected static final String INDEX_NAME = "test-idx-1";
+    protected static final int SHARD_COUNT = 1;
+    protected static final int REPLICA_COUNT = 1;
+
+    @Override
+    protected Settings featureFlagSettings() {
+        return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return asList(MockTransportService.TestPlugin.class);
+    }
+
+    @Override
+    protected boolean addMockInternalEngine() {
+        return false;
+    }
+
+    @Override
+    public Settings indexSettings() {
+        return Settings.builder()
+            .put(super.indexSettings())
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
+            .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
+            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
+            .build();
+    }
+
+    @Nullable
+    protected ShardRouting getShardRoutingForNodeName(String nodeName) {
+        final ClusterState state = getClusterState();
+        for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(INDEX_NAME)) {
+            for (ShardRouting shardRouting : shardRoutingTable.activeShards()) {
+                final String nodeId = shardRouting.currentNodeId();
+                final DiscoveryNode discoveryNode = state.nodes().resolveNode(nodeId);
+                if (discoveryNode.getName().equals(nodeName)) {
+                    return shardRouting;
+                }
+            }
+        }
+        return null;
+    }
+
+    protected void assertDocCounts(int expectedDocCount, String... nodeNames) {
+        for (String node : nodeNames) {
+            assertHitCount(client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedDocCount);
+        }
+    }
+
+    protected ClusterState getClusterState() {
+        return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
+    }
+
+    protected DiscoveryNode getNodeContainingPrimaryShard() {
+        final ClusterState state = getClusterState();
+        final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard();
+        return state.nodes().resolveNode(primaryShard.currentNodeId());
+    }
+
+    /**
+     * Waits until all given nodes have at least the expected docCount.
+     *
+     * @param docCount - Expected Doc count.
+     * @param nodes - List of node names.
+     */
+    protected void waitForSearchableDocs(long docCount, List<String> nodes) throws Exception {
+        // wait until the replica has the latest segment generation.
+        assertBusy(() -> {
+            for (String node : nodes) {
+                final SearchResponse response = client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get();
+                final long hits = response.getHits().getTotalHits().value;
+                if (hits < docCount) {
+                    fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits);
+                }
+            }
+        }, 1, TimeUnit.MINUTES);
+    }
+
+    protected void waitForSearchableDocs(long docCount, String... nodes) throws Exception {
+        waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList()));
+    }
+
+    protected void verifyStoreContent() throws Exception {
+        assertBusy(() -> {
+            final ClusterState clusterState = getClusterState();
+            for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
+                for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
+                    final ShardRouting primaryRouting = shardRoutingTable.primaryShard();
+                    final String indexName = primaryRouting.getIndexName();
+                    final List<ShardRouting> replicaRouting = shardRoutingTable.replicaShards();
+                    final IndexShard primaryShard = getIndexShard(clusterState, primaryRouting, indexName);
+                    final Map<String, StoreFileMetadata> primarySegmentMetadata = primaryShard.getSegmentMetadataMap();
+                    for (ShardRouting replica : replicaRouting) {
+                        IndexShard replicaShard = getIndexShard(clusterState, replica, indexName);
+                        final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(
+                            primarySegmentMetadata,
+                            replicaShard.getSegmentMetadataMap()
+                        );
+                        if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) {
+                            fail(
+                                "Expected no missing or different segments between primary and replica but diff was missing: "
+                                    + recoveryDiff.missing
+                                    + " Different: "
+                                    + recoveryDiff.different
+                                    + " Primary Replication Checkpoint : "
+                                    + primaryShard.getLatestReplicationCheckpoint()
+                                    + " Replica Replication Checkpoint: "
+                                    + replicaShard.getLatestReplicationCheckpoint()
+                            );
+                        }
+                        // calls to readCommit will fail if a valid commit point and all its segments are not in the store.
+                        replicaShard.store().readLastCommittedSegmentsInfo();
+                    }
+                }
+            }
+        }, 1, TimeUnit.MINUTES);
+    }
+
+    private IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) {
+        return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), indexName);
+    }
+
+    protected IndexShard getIndexShard(String node, String indexName) {
+        final Index index = resolveIndex(indexName);
+        IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
+        IndexService indexService = indicesService.indexServiceSafe(index);
+        final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
+        return indexService.getShard(shardId.get());
+    }
+
+}
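
A note on usage: a concrete integration test built on this new base class only needs to start nodes, index, and call the shared helpers. A minimal sketch (the test class name and doc count here are illustrative, not part of this change):

public class ExampleSegmentReplicationIT extends SegmentReplicationBaseIT {
    public void testReplicaCatchesUpToPrimary() throws Exception {
        final String primary = internalCluster().startNode();
        createIndex(INDEX_NAME);                       // picks up indexSettings() above: 1 shard, 1 replica, SEGMENT replication
        final String replica = internalCluster().startNode();
        ensureGreen(INDEX_NAME);
        for (int i = 0; i < 10; i++) {
            client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).get();
        }
        refresh(INDEX_NAME);
        waitForSearchableDocs(10, primary, replica);   // inherited helper: polls each node with _only_local
        verifyStoreContent();                          // inherited helper: segment files must match across copies
    }
}
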
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
index 043a5850aef05..0101379321932
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -11,47 +11,29 @@
 import com.carrotsearch.randomizedtesting.RandomizedTest;
 import org.opensearch.OpenSearchCorruptionException;
 import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.opensearch.action.search.SearchResponse;
 import org.opensearch.action.support.WriteRequest;
 import org.opensearch.action.update.UpdateResponse;
 import org.opensearch.client.Requests;
-import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.metadata.IndexMetadata;
-import org.opensearch.cluster.node.DiscoveryNode;
-import org.opensearch.cluster.routing.IndexRoutingTable;
-import org.opensearch.cluster.routing.IndexShardRoutingTable;
 import org.opensearch.cluster.routing.ShardRouting;
 import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
-import org.opensearch.common.Nullable;
 import org.opensearch.common.Priority;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.TimeValue;
-import org.opensearch.common.util.FeatureFlags;
-import org.opensearch.index.Index;
 import org.opensearch.index.IndexModule;
-import org.opensearch.index.IndexService;
 import org.opensearch.index.shard.IndexShard;
-import org.opensearch.index.store.Store;
-import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.indices.IndicesService;
 import org.opensearch.indices.recovery.FileChunkRequest;
 import org.opensearch.indices.replication.common.ReplicationType;
-import org.opensearch.plugins.Plugin;
 import org.opensearch.test.BackgroundIndexer;
 import org.opensearch.test.InternalTestCluster;
 import org.opensearch.test.OpenSearchIntegTestCase;
 import org.opensearch.test.transport.MockTransportService;
 import org.opensearch.transport.TransportService;
 
-import java.util.Collection;
-import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
 import static org.hamcrest.Matchers.equalTo;
@@ -61,56 +43,8 @@
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits;
 
 @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
-public class SegmentReplicationIT extends OpenSearchIntegTestCase {
-
-    protected static final String INDEX_NAME = "test-idx-1";
-    protected static final int SHARD_COUNT = 1;
-    protected static final int REPLICA_COUNT = 1;
-
-    @Override
-    protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return asList(MockTransportService.TestPlugin.class);
-    }
-
-    @Override
-    public Settings indexSettings() {
-        return Settings.builder()
-            .put(super.indexSettings())
-            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
-            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
-            .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
-            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
-            .build();
-    }
-
-    @Override
-    protected boolean addMockInternalEngine() {
-        return false;
-    }
-
-    @Override
-    protected Settings featureFlagSettings() {
-        return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
-    }
-
-    public void ingestDocs(int docCount) throws Exception {
-        try (
-            BackgroundIndexer indexer = new BackgroundIndexer(
-                INDEX_NAME,
-                "_doc",
-                client(),
-                -1,
-                RandomizedTest.scaledRandomIntBetween(2, 5),
-                false,
-                random()
-            )
-        ) {
-            indexer.start(docCount);
-            waitForDocs(docCount, indexer);
-            refresh(INDEX_NAME);
-        }
-    }
-
+public class SegmentReplicationIT extends SegmentReplicationBaseIT {
+
+    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
     public void testPrimaryStopped_ReplicaPromoted() throws Exception {
         final String primary = internalCluster().startNode();
         createIndex(INDEX_NAME);
@@ -660,106 +594,4 @@ public void testDropPrimaryDuringReplication() throws Exception {
             verifyStoreContent();
         }
     }
-
-    /**
-     * Waits until all given nodes have at least the expected docCount.
-     *
-     * @param docCount - Expected Doc count.
-     * @param nodes - List of node names.
-     */
-    private void waitForSearchableDocs(long docCount, List<String> nodes) throws Exception {
-        // wait until the replica has the latest segment generation.
-        assertBusy(() -> {
-            for (String node : nodes) {
-                final SearchResponse response = client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get();
-                final long hits = response.getHits().getTotalHits().value;
-                if (hits < docCount) {
-                    fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits);
-                }
-            }
-        }, 1, TimeUnit.MINUTES);
-    }
-
-    private void waitForSearchableDocs(long docCount, String... nodes) throws Exception {
-        waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList()));
-    }
-
-    private void verifyStoreContent() throws Exception {
-        assertBusy(() -> {
-            final ClusterState clusterState = getClusterState();
-            for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
-                for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
-                    final ShardRouting primaryRouting = shardRoutingTable.primaryShard();
-                    final String indexName = primaryRouting.getIndexName();
-                    final List<ShardRouting> replicaRouting = shardRoutingTable.replicaShards();
-                    final IndexShard primaryShard = getIndexShard(clusterState, primaryRouting, indexName);
-                    final Map<String, StoreFileMetadata> primarySegmentMetadata = primaryShard.getSegmentMetadataMap();
-                    for (ShardRouting replica : replicaRouting) {
-                        IndexShard replicaShard = getIndexShard(clusterState, replica, indexName);
-                        final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(
-                            primarySegmentMetadata,
-                            replicaShard.getSegmentMetadataMap()
-                        );
-                        if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) {
-                            fail(
-                                "Expected no missing or different segments between primary and replica but diff was missing: "
-                                    + recoveryDiff.missing
-                                    + " Different: "
-                                    + recoveryDiff.different
-                                    + " Primary Replication Checkpoint : "
-                                    + primaryShard.getLatestReplicationCheckpoint()
-                                    + " Replica Replication Checkpoint: "
-                                    + replicaShard.getLatestReplicationCheckpoint()
-                            );
-                        }
-                        // calls to readCommit will fail if a valid commit point and all its segments are not in the store.
-                        replicaShard.store().readLastCommittedSegmentsInfo();
-                    }
-                }
-            }
-        }, 1, TimeUnit.MINUTES);
-    }
-
-    private IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) {
-        return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), indexName);
-    }
-
-    private IndexShard getIndexShard(String node, String indexName) {
-        final Index index = resolveIndex(indexName);
-        IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
-        IndexService indexService = indicesService.indexServiceSafe(index);
-        final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
-        return indexService.getShard(shardId.get());
-    }
-
-    @Nullable
-    private ShardRouting getShardRoutingForNodeName(String nodeName) {
-        final ClusterState state = getClusterState();
-        for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(INDEX_NAME)) {
-            for (ShardRouting shardRouting : shardRoutingTable.activeShards()) {
-                final String nodeId = shardRouting.currentNodeId();
-                final DiscoveryNode discoveryNode = state.nodes().resolveNode(nodeId);
-                if (discoveryNode.getName().equals(nodeName)) {
-                    return shardRouting;
-                }
-            }
-        }
-        return null;
-    }
-
-    private void assertDocCounts(int expectedDocCount, String... nodeNames) {
-        for (String node : nodeNames) {
-            assertHitCount(client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedDocCount);
-        }
-    }
-
-    private ClusterState getClusterState() {
-        return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
-    }
-
-    private DiscoveryNode getNodeContainingPrimaryShard() {
-        final ClusterState state = getClusterState();
-        final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard();
-        return state.nodes().resolveNode(primaryShard.currentNodeId());
-    }
 }
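
The rewritten relocation tests below drop BackgroundIndexer in favor of explicit async index requests with RefreshPolicy.WAIT_UNTIL, collecting the futures and asserting completion later. Factored out, the repeated loop would look roughly like this (a hypothetical helper, not part of the diff):

    // Index docs [from, to) asynchronously; each future completes only once the doc is searchable.
    private List<ActionFuture<IndexResponse>> indexDocsWaitUntil(int from, int to) {
        final List<ActionFuture<IndexResponse>> pending = new ArrayList<>();
        for (int i = from; i < to; i++) {
            pending.add(
                client().prepareIndex(INDEX_NAME)
                    .setId(Integer.toString(i))
                    .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
                    .setSource("field", "value" + i)
                    .execute()
            );
        }
        return pending;
    }
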
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java
index fb8b6a7150b9a..5b0948dace75d
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java
@@ -29,25 +29,25 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
-
 /**
  * This test class verifies primary shard relocation with segment replication as replication strategy.
  */
 @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
-public class SegmentReplicationRelocationIT extends SegmentReplicationIT {
+public class SegmentReplicationRelocationIT extends SegmentReplicationBaseIT {
     private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES);
 
-    private void createIndex() {
+    private void createIndex(int replicaCount) {
         prepareCreate(
             INDEX_NAME,
             Settings.builder()
                 .put("index.number_of_shards", 1)
                 .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
                 .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
-                .put("index.number_of_replicas", 1)
+                .put("index.number_of_replicas", replicaCount)
+                .put("index.refresh_interval", -1)
         ).get();
     }
 
@@ -55,18 +55,22 @@ private void createIndex() {
     /**
     * This test verifies the happy path when the primary shard is relocated to a newly added (target) node in the cluster. Before
     * relocation and after relocation, documents are indexed and then verified.
     */
-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
     public void testPrimaryRelocation() throws Exception {
         final String oldPrimary = internalCluster().startNode();
-        createIndex();
+        createIndex(1);
         final String replica = internalCluster().startNode();
         ensureGreen(INDEX_NAME);
-        final int initialDocCount = scaledRandomIntBetween(0, 200);
-        ingestDocs(initialDocCount);
-
-        logger.info("--> verifying count {}", initialDocCount);
-        assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-        assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+        final int initialDocCount = scaledRandomIntBetween(100, 1000);
+        final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
+        for (int i = 0; i < initialDocCount; i++) {
+            pendingIndexResponses.add(
+                client().prepareIndex(INDEX_NAME)
+                    .setId(Integer.toString(i))
+                    .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
+                    .setSource("field", "value" + i)
+                    .execute()
+            );
+        }
 
         logger.info("--> start another node");
         final String newPrimary = internalCluster().startNode();
@@ -98,28 +102,28 @@ public void testPrimaryRelocation() throws Exception {
 
         logger.info("--> get the state, verify shard 1 primary moved from node1 to node2");
         ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
-
-        logger.info("--> state {}", state);
-
         assertEquals(
             state.getRoutingNodes().node(state.nodes().resolveNode(newPrimary).getId()).iterator().next().state(),
             ShardRoutingState.STARTED
         );
 
-        final int finalDocCount = initialDocCount;
-        ingestDocs(finalDocCount);
-        refresh(INDEX_NAME);
-
-        logger.info("--> verifying count again {}", initialDocCount + finalDocCount);
-        client().admin().indices().prepareRefresh().execute().actionGet();
-        assertHitCount(
-            client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
-            initialDocCount + finalDocCount
-        );
-        assertHitCount(
-            client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
-            initialDocCount + finalDocCount
-        );
+        for (int i = initialDocCount; i < 2 * initialDocCount; i++) {
+            pendingIndexResponses.add(
+                client().prepareIndex(INDEX_NAME)
+                    .setId(Integer.toString(i))
+                    .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
+                    .setSource("field", "value" + i)
+                    .execute()
+            );
+        }
+        assertBusy(() -> {
+            client().admin().indices().prepareRefresh().execute().actionGet();
+            assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
+        }, 1, TimeUnit.MINUTES);
+        flushAndRefresh(INDEX_NAME);
+        logger.info("--> verify count again {}", 2 * initialDocCount);
+        waitForSearchableDocs(2 * initialDocCount, newPrimary, replica);
+        verifyStoreContent();
     }
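
For reference, the relocation itself is driven by a cluster reroute; distilled from the tests in this file, the move-and-wait sequence is (`from`/`to` are placeholder node names):

    ActionFuture<ClusterRerouteResponse> relocationListener = client().admin()
        .cluster()
        .prepareReroute()
        .add(new MoveAllocationCommand(INDEX_NAME, 0, from, to))   // move shard 0 of the index
        .execute();
    relocationListener.actionGet();
    ClusterHealthResponse health = client().admin()
        .cluster()
        .prepareHealth()
        .setWaitForEvents(Priority.LANGUID)
        .setWaitForNoRelocatingShards(true)
        .setTimeout(ACCEPTABLE_RELOCATION_TIME)
        .execute()
        .actionGet();
    assertEquals(health.isTimedOut(), false);
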
 
     /**
@@ -127,18 +131,22 @@ public void testPrimaryRelocation() throws Exception {
      * failure, more documents are ingested and verified on replica, which confirms the older primary is still refreshing the
      * replicas.
      */
-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
     public void testPrimaryRelocationWithSegRepFailure() throws Exception {
         final String oldPrimary = internalCluster().startNode();
-        createIndex();
+        createIndex(1);
         final String replica = internalCluster().startNode();
         ensureGreen(INDEX_NAME);
-        final int initialDocCount = scaledRandomIntBetween(1, 100);
-        ingestDocs(initialDocCount);
-
-        logger.info("--> verifying count {}", initialDocCount);
-        assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-        assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+        final int initialDocCount = scaledRandomIntBetween(100, 1000);
+        final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
+        for (int i = 0; i < initialDocCount; i++) {
+            pendingIndexResponses.add(
+                client().prepareIndex(INDEX_NAME)
+                    .setId(Integer.toString(i))
+                    .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
+                    .setSource("field", "value" + i)
+                    .execute()
+            );
+        }
 
         logger.info("--> start another node");
         final String newPrimary = internalCluster().startNode();
@@ -183,20 +191,24 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
             .actionGet();
         assertEquals(clusterHealthResponse.isTimedOut(), false);
 
-        final int finalDocCount = initialDocCount;
-        ingestDocs(finalDocCount);
-        refresh(INDEX_NAME);
+        for (int i = initialDocCount; i < 2 * initialDocCount; i++) {
+            pendingIndexResponses.add(
+                client().prepareIndex(INDEX_NAME)
+                    .setId(Integer.toString(i))
+                    .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
+                    .setSource("field", "value" + i)
+                    .execute()
+            );
+        }
 
         logger.info("Verify older primary is still refreshing replica nodes");
-        client().admin().indices().prepareRefresh().execute().actionGet();
-        assertHitCount(
-            client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
-            initialDocCount + finalDocCount
-        );
-        assertHitCount(
-            client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
-            initialDocCount + finalDocCount
-        );
+        assertBusy(() -> {
+            client().admin().indices().prepareRefresh().execute().actionGet();
+            assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
+        }, 1, TimeUnit.MINUTES);
+        flushAndRefresh(INDEX_NAME);
+        waitForSearchableDocs(2 * initialDocCount, oldPrimary, replica);
+        verifyStoreContent();
     }
 
     /**
@@ -205,16 +217,10 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
      */
     public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception {
         final String primary = internalCluster().startNode();
-        prepareCreate(
-            INDEX_NAME,
-            Settings.builder()
-                .put("index.number_of_shards", 1)
-                .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
-                .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
-                .put("index.number_of_replicas", 0)
-                .put("index.refresh_interval", -1)
-        ).get();
-
+        createIndex(1);
+        final String replica = internalCluster().startNode();
+        ensureGreen(INDEX_NAME);
+        final int totalDocCount = 1000;
         for (int i = 0; i < 10; i++) {
             client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
         }
@@ -233,12 +239,12 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E
             );
         }
 
-        final String replica = internalCluster().startNode();
+        final String newPrimary = internalCluster().startNode();
         ClusterHealthResponse clusterHealthResponse = client().admin()
             .cluster()
             .prepareHealth()
             .setWaitForEvents(Priority.LANGUID)
-            .setWaitForNodes("2")
+            .setWaitForNodes("3")
             .execute()
             .actionGet();
         assertEquals(clusterHealthResponse.isTimedOut(), false);
@@ -247,9 +253,9 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E
         ActionFuture<ClusterRerouteResponse> relocationListener = client().admin()
             .cluster()
             .prepareReroute()
-            .add(new MoveAllocationCommand(INDEX_NAME, 0, primary, replica))
+            .add(new MoveAllocationCommand(INDEX_NAME, 0, primary, newPrimary))
             .execute();
-        for (int i = 20; i < 120; i++) {
+        for (int i = 20; i < totalDocCount; i++) {
             pendingIndexResponses.add(
                 client().prepareIndex(INDEX_NAME)
                     .setId(Integer.toString(i))
@@ -274,6 +280,118 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E
             client().admin().indices().prepareRefresh().execute().actionGet();
             assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
         }, 1, TimeUnit.MINUTES);
-        assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 120L);
+        flushAndRefresh(INDEX_NAME);
+        waitForSearchableDocs(totalDocCount, newPrimary, replica);
+        verifyStoreContent();
+    }
+
+    /**
+     * This test verifies that operations delayed during primary handoff are replayed and searchable. It does so by
+     * halting segment replication, which is performed while holding primary indexing permits, so that operations
+     * queue up during handoff. The test verifies that all ingested docs are searchable on the new primary.
+     */
+    public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception {
+        final String primary = internalCluster().startNode();
+        createIndex(1);
+        final String replica = internalCluster().startNode();
+        ensureGreen(INDEX_NAME);
+        final int totalDocCount = 2000;
+
+        for (int i = 0; i < 10; i++) {
+            client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
+        }
+        logger.info("--> flush to have segments on disk");
+        client().admin().indices().prepareFlush().execute().actionGet();
+
+        logger.info("--> index more docs so there are ops in the transaction log");
+        final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
+        for (int i = 10; i < 20; i++) {
+            pendingIndexResponses.add(
+                client().prepareIndex(INDEX_NAME)
+                    .setId(Integer.toString(i))
+                    .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
+                    .setSource("field", "value" + i)
+                    .execute()
+            );
+        }
+        final String newPrimary = internalCluster().startNode();
+        ClusterHealthResponse clusterHealthResponse = client().admin()
+            .cluster()
+            .prepareHealth()
+            .setWaitForEvents(Priority.LANGUID)
+            .setWaitForNodes("3")
+            .execute()
+            .actionGet();
+        assertEquals(clusterHealthResponse.isTimedOut(), false);
+        ensureGreen(INDEX_NAME);
+
+        // Get the mock transport service from newPrimary and halt recovery during segment replication (during handoff)
+        // to allow indexing in parallel.
+        MockTransportService mockTargetTransportService = ((MockTransportService) internalCluster().getInstance(
+            TransportService.class,
+            newPrimary
+        ));
+        CountDownLatch blockSegRepLatch = new CountDownLatch(1);
+        CountDownLatch waitForIndexingLatch = new CountDownLatch(1);
+        mockTargetTransportService.addSendBehavior(
+            internalCluster().getInstance(TransportService.class, primary),
+            (connection, requestId, action, request, options) -> {
+                if (action.equals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES)) {
+                    blockSegRepLatch.countDown();
+                    try {
+                        waitForIndexingLatch.await();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+                connection.sendRequest(requestId, action, request, options);
+            }
+        );
+        Thread indexingThread = new Thread(() -> {
+            // Wait for relocation to halt at SegRep. Ingest docs at that point.
+            try {
+                blockSegRepLatch.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            for (int i = 20; i < totalDocCount; i++) {
+                pendingIndexResponses.add(
+                    client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute()
+                );
+            }
+            waitForIndexingLatch.countDown();
+        });
+
+        logger.info("--> relocate the shard from primary to newPrimary");
+        ActionFuture<ClusterRerouteResponse> relocationListener = client().admin()
+            .cluster()
+            .prepareReroute()
+            .add(new MoveAllocationCommand(INDEX_NAME, 0, primary, newPrimary))
+            .execute();
+
+        // This thread first waits for recovery to halt during segment replication. After which it ingests data to ensure
+        // documents are queued.
+        indexingThread.start();
+        indexingThread.join();
+        relocationListener.actionGet();
+        clusterHealthResponse = client().admin()
+            .cluster()
+            .prepareHealth()
+            .setWaitForEvents(Priority.LANGUID)
+            .setWaitForNoRelocatingShards(true)
+            .setTimeout(ACCEPTABLE_RELOCATION_TIME)
+            .execute()
+            .actionGet();
+        assertEquals(clusterHealthResponse.isTimedOut(), false);
+
+        logger.info("--> verifying count");
+        assertBusy(() -> {
+            client().admin().indices().prepareRefresh().execute().actionGet();
+            assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
+        }, 1, TimeUnit.MINUTES);
+        flushAndRefresh(INDEX_NAME);
+        waitForSearchableDocs(totalDocCount, replica, newPrimary);
+        verifyStoreContent();
+    }
 }
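
The queued-operations test above parks the GET_SEGMENT_FILES round trip on a latch via MockTransportService. Generalized into a helper, the blocking technique looks roughly like this (the helper name is hypothetical):

    // Block `actionName` sent from `sender` to `receiver` until `release` is counted down;
    // `reached` signals that the action arrived at the block.
    private void blockActionUntilReleased(MockTransportService sender, TransportService receiver,
                                          String actionName, CountDownLatch reached, CountDownLatch release) {
        sender.addSendBehavior(receiver, (connection, requestId, action, request, options) -> {
            if (actionName.equals(action)) {
                reached.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            connection.sendRequest(requestId, action, request, options);
        });
    }
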
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index 37f92471d0cde..431519658edd1
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -61,7 +61,6 @@
 import org.opensearch.OpenSearchException;
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.ActionRunnable;
-import org.opensearch.action.StepListener;
 import org.opensearch.action.admin.indices.flush.FlushRequest;
 import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.opensearch.action.admin.indices.upgrade.post.UpgradeRequest;
@@ -754,7 +753,6 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
      *
      * @param performSegRep a {@link Runnable} that is executed after operations are blocked
      * @param consumer a {@link Runnable} that is executed after performSegRep
-     * @param listener ActionListener
      * @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation
      * @throws IllegalStateException if the relocation target is no longer part of the replication group
      * @throws InterruptedException if blocking operations is interrupted
@@ -762,8 +760,7 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
      */
     public void relocated(
         final String targetAllocationId,
         final Consumer<ReplicationTracker.PrimaryContext> consumer,
-        final Consumer<ActionListener<Void>> performSegRep,
-        final ActionListener<Void> listener
+        final Runnable performSegRep
     ) throws IllegalIndexShardStateException, IllegalStateException, InterruptedException {
         assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
         try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
@@ -781,32 +778,28 @@ public void relocated(
                 assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
                     : "in-flight operations in progress while moving shard state to relocated";
 
-                final StepListener<Void> segRepSyncListener = new StepListener<>();
-                performSegRep.accept(segRepSyncListener);
-                segRepSyncListener.whenComplete(r -> {
-                    /*
-                     * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
-                     * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
-                     */
-                    verifyRelocatingState();
-                    final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff(targetAllocationId);
+                performSegRep.run();
+                /*
+                 * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
+                 * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
+                 */
+                verifyRelocatingState();
+                final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff(targetAllocationId);
+                try {
+                    consumer.accept(primaryContext);
+                    synchronized (mutex) {
+                        verifyRelocatingState();
+                        replicationTracker.completeRelocationHandoff(); // make changes to primaryMode and relocated flag only under
+                                                                        // mutex
+                    }
+                } catch (final Exception e) {
                     try {
-                        consumer.accept(primaryContext);
-                        synchronized (mutex) {
-                            verifyRelocatingState();
-                            replicationTracker.completeRelocationHandoff(); // make changes to primaryMode and relocated flag only under
-                                                                            // mutex
-                        }
-                    } catch (final Exception e) {
-                        try {
-                            replicationTracker.abortRelocationHandoff();
-                        } catch (final Exception inner) {
-                            e.addSuppressed(inner);
-                        }
-                        throw e;
+                        replicationTracker.abortRelocationHandoff();
+                    } catch (final Exception inner) {
+                        e.addSuppressed(inner);
                     }
-                    listener.onResponse(null);
-                }, listener::onFailure);
+                    throw e;
+                }
             });
         } catch (TimeoutException e) {
             logger.warn("timed out waiting for relocation hand-off to complete");
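
With this change relocated(...) is fully synchronous: the segment-replication sync runs as a plain Runnable while operations are blocked, and any exception it throws aborts the handoff. Condensed from the RecoverySourceHandler change below, a caller now looks like:

    final Runnable forceSegRep = shard.indexSettings().isSegRepEnabled()
        ? recoveryTarget::forceSegmentFileSync   // blocks until the target has the latest segments
        : () -> {};                              // document replication: nothing to sync
    shard.relocated(request.targetAllocationId(), recoveryTarget::handoffPrimaryContext, forceSegRep);
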
diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java
index 276821dfb09b4..4f3e68aba16f3
--- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java
@@ -818,34 +818,24 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
                 logger
             );
 
-            final StepListener<Void> handoffListener = new StepListener<>();
             if (request.isPrimaryRelocation()) {
                 logger.trace("performing relocation hand-off");
-                final Consumer<ActionListener<Void>> forceSegRepConsumer = shard.indexSettings().isSegRepEnabled()
+                final Runnable forceSegRepRunnable = shard.indexSettings().isSegRepEnabled()
                     ? recoveryTarget::forceSegmentFileSync
-                    : res -> res.onResponse(null);
+                    : () -> {};
                 // TODO: make relocated async
                 // this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
                 cancellableThreads.execute(
-                    () -> shard.relocated(
-                        request.targetAllocationId(),
-                        recoveryTarget::handoffPrimaryContext,
-                        forceSegRepConsumer,
-                        handoffListener
-                    )
+                    () -> shard.relocated(request.targetAllocationId(), recoveryTarget::handoffPrimaryContext, forceSegRepRunnable)
                 );
                 /*
                  * if the recovery process fails after disabling primary mode on the source shard, both relocation source and
                  * target are failed (see {@link IndexShard#updateRoutingEntry}).
                  */
-            } else {
-                handoffListener.onResponse(null);
             }
-            handoffListener.whenComplete(res -> {
-                stopWatch.stop();
-                logger.trace("finalizing recovery took [{}]", stopWatch.totalTime());
-                listener.onResponse(null);
-            }, listener::onFailure);
+            stopWatch.stop();
+            logger.info("finalizing recovery took [{}]", stopWatch.totalTime());
+            listener.onResponse(null);
         }, listener::onFailure);
     }
diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java
index c8cc5c4409e6b..7d63b4a67a694
--- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java
@@ -217,7 +217,7 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<V
 
     @Override
-    public void forceSegmentFileSync(ActionListener<Void> listener) {
+    public void forceSegmentFileSync() {
         throw new UnsupportedOperationException("Method not supported on target!");
     }
diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java
index ef0d4abc44c7d..b43253c32d844
--- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java
@@ -58,9 +58,8 @@ public interface RecoveryTargetHandler extends FileChunkWriter {
      *
      * This function is used to force-sync the target primary node with the source (old primary). This is to avoid segment file
      * conflicts with replicas when the target is promoted as primary.
-     * @param listener segment replication event listener
      */
-    void forceSegmentFileSync(ActionListener<Void> listener);
+    void forceSegmentFileSync();
 
     /**
      * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, updates
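
RemoteRecoveryTargetHandler (next hunk) satisfies the new blocking contract by calling txGet() on the transport future. If a listener-based API had to be adapted to the Runnable contract instead, the usual OpenSearch idiom is to block on a PlainActionFuture; a sketch, where asyncForceSegmentFileSync is a hypothetical listener-style variant:

    final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
    asyncForceSegmentFileSync(future);                              // hypothetical async variant
    future.actionGet(recoverySettings.internalActionLongTimeout()); // block, rethrowing any failure
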
- * @param listener segment replication event listener */ @Override - public void forceSegmentFileSync(ActionListener listener) { - final String action = SegmentReplicationTargetService.Actions.FORCE_SYNC; + public void forceSegmentFileSync() { final long requestSeqNo = requestSeqNoGenerator.getAndIncrement(); - final ForceSyncRequest request = new ForceSyncRequest(requestSeqNo, recoveryId, shardId); - final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; - final ActionListener responseListener = ActionListener.map(listener, r -> null); - retryableTransportClient.executeRetryableAction(action, request, responseListener, reader); + transportService.submitRequest( + targetNode, + SegmentReplicationTargetService.Actions.FORCE_SYNC, + new ForceSyncRequest(requestSeqNo, recoveryId, shardId), + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(), + EmptyTransportResponseHandler.INSTANCE_SAME + ).txGet(); } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index b63b84a5c1eab..750e7629783e7 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -22,7 +22,6 @@ import org.opensearch.core.internal.io.IOUtils; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; -import org.opensearch.indices.RunUnderPrimaryPermit; import org.opensearch.indices.recovery.DelayRecoveryException; import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.MultiChunkTransfer; @@ -138,23 +137,12 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene ); }; - RunUnderPrimaryPermit.run(() -> { - final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); - ShardRouting targetShardRouting = routingTable.getByAllocationId(request.getTargetAllocationId()); - if (targetShardRouting == null) { - logger.debug( - "delaying replication of {} as it is not listed as assigned to target node {}", - shard.shardId(), - targetNode - ); - throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); - } - }, - shard.shardId() + " validating recovery target [" + request.getTargetAllocationId() + "] registered ", - shard, - cancellableThreads, - logger - ); + final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); + ShardRouting targetShardRouting = routingTable.getByAllocationId(request.getTargetAllocationId()); + if (targetShardRouting == null) { + logger.debug("delaying replication of {} as it is not listed as assigned to target node {}", shard.shardId(), targetNode); + throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); + } final StepListener sendFileStep = new StepListener<>(); Set storeFiles = new HashSet<>(Arrays.asList(shard.store().directory().listAll())); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index e7df266ca7133..0abbcd1595bb1 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ 
b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -50,7 +50,6 @@ import org.apache.lucene.util.Constants; import org.junit.Assert; import org.opensearch.Assertions; -import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionListener; @@ -939,12 +938,6 @@ public void onResponse(final Releasable releasable) { closeShards(indexShard); } - private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { - PlainActionFuture fut = new PlainActionFuture<>(); - indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.WRITE, ""); - return fut.get(); - } - private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); @@ -997,22 +990,7 @@ public void testOperationPermitOnReplicaShards() throws Exception { AllocationId.newRelocation(routing.allocationId()) ); IndexShardTestCase.updateRoutingEntry(indexShard, routing); - indexShard.relocated( - routing.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - assertTrue(indexShard.isRelocatedPrimary()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } - ); + indexShard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); engineClosed = false; break; } @@ -2025,22 +2003,7 @@ public void testLockingBeforeAndAfterRelocated() throws Exception { Thread recoveryThread = new Thread(() -> { latch.countDown(); try { - shard.relocated( - routing.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - assertTrue(shard.isRelocatedPrimary()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } - ); + shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -2075,18 +2038,7 @@ public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception { shard.relocated( routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> relocationStarted.countDown(), - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - assertTrue(shard.isRelocatedPrimary()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } + () -> {} ); } catch (InterruptedException e) { throw new RuntimeException(e); @@ -2175,26 +2127,11 @@ public void run() { AtomicBoolean relocated = new AtomicBoolean(); final Thread recoveryThread = new Thread(() -> { try { - shard.relocated( - routing.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - relocated.set(true); - } - - @Override - public void onFailure(Exception e) { - relocated.set(false); - fail(e.toString()); - } - } - ); + shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); } catch (InterruptedException e) { throw new RuntimeException(e); } + 
relocated.set(true); }); // ensure we wait for all primary operation locks to be acquired allPrimaryOperationLocksAcquired.await(); @@ -2225,48 +2162,25 @@ public void testRelocatedShardCanNotBeRevived() throws IOException, InterruptedE final ShardRouting originalRouting = shard.routingEntry(); final ShardRouting routing = ShardRoutingHelper.relocate(originalRouting, "other_node"); IndexShardTestCase.updateRoutingEntry(shard, routing); - shard.relocated( - routing.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - assertTrue(shard.isRelocatedPrimary()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } - ); + shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); expectThrows(IllegalIndexShardStateException.class, () -> IndexShardTestCase.updateRoutingEntry(shard, originalRouting)); closeShards(shard); } - public void testRelocatedSegRepConsumerError() throws IOException, InterruptedException { + public void testRelocatedSegRepError() throws IOException, InterruptedException { final IndexShard shard = newStartedShard(true); final ShardRouting originalRouting = shard.routingEntry(); final ShardRouting routing = ShardRoutingHelper.relocate(originalRouting, "other_node"); IndexShardTestCase.updateRoutingEntry(shard, routing); - shard.relocated( - routing.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onFailure(new ReplicationFailedException("Segment replication failed")), - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - fail("Expected failure"); - } - - @Override - public void onFailure(Exception e) { - assertTrue(ExceptionsHelper.unwrapCause(e) instanceof ReplicationFailedException); - assertEquals(e.getMessage(), "Segment replication failed"); - } - } + ReplicationFailedException segRepException = expectThrows( + ReplicationFailedException.class, + () -> shard.relocated( + routing.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + () -> { throw new ReplicationFailedException("Segment replication failed"); } + ) ); + assertTrue(segRepException.getMessage().equals("Segment replication failed")); closeShards(shard); } @@ -2276,21 +2190,9 @@ public void testShardCanNotBeMarkedAsRelocatedIfRelocationCancelled() throws IOE final ShardRouting relocationRouting = ShardRoutingHelper.relocate(originalRouting, "other_node"); IndexShardTestCase.updateRoutingEntry(shard, relocationRouting); IndexShardTestCase.updateRoutingEntry(shard, originalRouting); - shard.relocated( - relocationRouting.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - fail("IllegalIndexShardStateException expected!"); - } - - @Override - public void onFailure(Exception e) { - assertTrue(ExceptionsHelper.unwrapCause(e) instanceof IllegalIndexShardStateException); - } - } + expectThrows( + IllegalIndexShardStateException.class, + () -> shard.relocated(relocationRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}) ); closeShards(shard); } @@ -2305,28 +2207,13 @@ public void testRelocatedShardCanNotBeRevivedConcurrently() throws IOException, Thread relocationThread = new Thread(new AbstractRunnable() { @Override public void onFailure(Exception e) { - 
fail(e.toString()); + relocationException.set(e); } @Override protected void doRun() throws Exception { cyclicBarrier.await(); - shard.relocated( - relocationRouting.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - assertTrue(shard.isRelocatedPrimary()); - } - - @Override - public void onFailure(Exception e) { - relocationException.set(e); - } - } - ); + shard.relocated(relocationRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); } }); relocationThread.start(); @@ -2374,48 +2261,22 @@ public void testRelocateMissingTarget() throws Exception { final ShardRouting toNode2 = ShardRoutingHelper.relocate(original, "node_2"); IndexShardTestCase.updateRoutingEntry(shard, toNode2); final AtomicBoolean relocated = new AtomicBoolean(); - shard.relocated( - toNode1.getTargetRelocatingShard().allocationId().getId(), - ctx -> relocated.set(true), - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - fail("Expected IllegalStateException!"); - } - @Override - public void onFailure(Exception e) { - assertTrue(ExceptionsHelper.unwrapCause(e) instanceof IllegalStateException); - assertThat( - e.getMessage(), - equalTo( - "relocation target [" - + toNode1.getTargetRelocatingShard().allocationId().getId() - + "] is no longer part of the replication group" - ) - ); - } - } + final IllegalStateException error = expectThrows( + IllegalStateException.class, + () -> shard.relocated(toNode1.getTargetRelocatingShard().allocationId().getId(), ctx -> relocated.set(true), () -> {}) + ); + assertThat( + error.getMessage(), + equalTo( + "relocation target [" + + toNode1.getTargetRelocatingShard().allocationId().getId() + + "] is no longer part of the replication group" + ) ); assertFalse(relocated.get()); - shard.relocated( - toNode2.getTargetRelocatingShard().allocationId().getId(), - ctx -> relocated.set(true), - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - assertTrue(relocated.get()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } - ); + shard.relocated(toNode2.getTargetRelocatingShard().allocationId().getId(), ctx -> relocated.set(true), () -> {}); assertTrue(relocated.get()); closeShards(shard); } @@ -2808,22 +2669,7 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc assertThat(shard.state(), equalTo(IndexShardState.STARTED)); ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node"); IndexShardTestCase.updateRoutingEntry(shard, inRecoveryRouting); - shard.relocated( - inRecoveryRouting.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - assertTrue(shard.isRelocatedPrimary()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } - ); + shard.relocated(inRecoveryRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); assertTrue(shard.isRelocatedPrimary()); try { IndexShardTestCase.updateRoutingEntry(shard, origRouting); @@ -2857,27 +2703,13 @@ public void testRelocatedForRemoteTranslogBackedIndexWithAsyncDurability() throw IndexShardTestCase.updateRoutingEntry(indexShard, routing); assertTrue(indexShard.isSyncNeeded()); try { - 
indexShard.relocated( - routing.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - assertTrue(indexShard.isRelocatedPrimary()); - assertFalse(indexShard.isSyncNeeded()); - assertFalse(indexShard.getReplicationTracker().isPrimaryMode()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } - ); + indexShard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); } catch (InterruptedException e) { throw new RuntimeException(e); } + assertTrue(indexShard.isRelocatedPrimary()); + assertFalse(indexShard.isSyncNeeded()); + assertFalse(indexShard.getReplicationTracker().isPrimaryMode()); closeShards(indexShard); } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 44771faf36871..54e8682f90d22 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -15,8 +15,10 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingHelper; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.ClusterSettings; @@ -54,17 +56,21 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import static java.util.Arrays.asList; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -307,15 +313,12 @@ public void testPrimaryRelocation() throws Exception { ); updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata()); - BiFunction, ActionListener, List> replicatePrimaryFunction = ( - shardList, - listener) -> { + Function, List> replicatePrimaryFunction = (shardList) -> { try { assert shardList.size() >= 2; final IndexShard primary = shardList.get(0); - return replicateSegments(primary, shardList.subList(1, shardList.size()), listener); + return replicateSegments(primary, shardList.subList(1, shardList.size())); } catch (IOException | InterruptedException e) { - listener.onFailure(e); throw new RuntimeException(e); } }; @@ -347,11 +350,12 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { ); updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata()); - BiFunction, ActionListener, List> replicatePrimaryFunction = ( - shardList, - listener) -> { - 
listener.onFailure(new IOException("Expected failure")); - return null; + Function, List> replicatePrimaryFunction = (shardList) -> { + try { + throw new IOException("Expected failure"); + } catch (IOException e) { + throw new RuntimeException(e); + } }; Exception e = expectThrows( Exception.class, @@ -366,7 +370,7 @@ public void onDone(ReplicationState state) { @Override public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { - assertThat(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), equalTo("Expected failure")); + assertEquals(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), "Expected failure"); } }), true, @@ -374,10 +378,119 @@ public void onFailure(ReplicationState state, ReplicationFailedException e, bool replicatePrimaryFunction ) ); - assertThat(e, hasToString(containsString("Expected failure"))); closeShards(primarySource, primaryTarget); } + // Todo: Remove this test when there is a better mechanism to test a functionality passing in different replication + // strategy. + public void testLockingBeforeAndAfterRelocated() throws Exception { + final IndexShard shard = newStartedShard(true, settings); + final ShardRouting routing = ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"); + IndexShardTestCase.updateRoutingEntry(shard, routing); + CountDownLatch latch = new CountDownLatch(1); + Thread recoveryThread = new Thread(() -> { + latch.countDown(); + try { + shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + try (Releasable ignored = acquirePrimaryOperationPermitBlockingly(shard)) { + // start finalization of recovery + recoveryThread.start(); + latch.await(); + // recovery can only be finalized after we release the current primaryOperationLock + assertFalse(shard.isRelocatedPrimary()); + } + // recovery can be now finalized + recoveryThread.join(); + assertTrue(shard.isRelocatedPrimary()); + final ExecutionException e = expectThrows(ExecutionException.class, () -> acquirePrimaryOperationPermitBlockingly(shard)); + assertThat(e.getCause(), instanceOf(ShardNotInPrimaryModeException.class)); + assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode"))); + + closeShards(shard); + } + + // Todo: Remove this test when there is a better mechanism to test a functionality passing in different replication + // strategy. 
+    public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception {
+        final IndexShard shard = newStartedShard(true, settings);
+        final ShardRouting routing = ShardRoutingHelper.relocate(shard.routingEntry(), "other_node");
+        IndexShardTestCase.updateRoutingEntry(shard, routing);
+        final CountDownLatch startRecovery = new CountDownLatch(1);
+        final CountDownLatch relocationStarted = new CountDownLatch(1);
+        Thread recoveryThread = new Thread(() -> {
+            try {
+                startRecovery.await();
+                shard.relocated(
+                    routing.getTargetRelocatingShard().allocationId().getId(),
+                    primaryContext -> relocationStarted.countDown(),
+                    () -> {}
+                );
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        recoveryThread.start();
+
+        final int numberOfAcquisitions = randomIntBetween(1, 10);
+        final List<Runnable> assertions = new ArrayList<>(numberOfAcquisitions);
+        final int recoveryIndex = randomIntBetween(0, numberOfAcquisitions - 1);
+
+        for (int i = 0; i < numberOfAcquisitions; i++) {
+            final PlainActionFuture<Releasable> onLockAcquired;
+            if (i < recoveryIndex) {
+                final AtomicBoolean invoked = new AtomicBoolean();
+                onLockAcquired = new PlainActionFuture<Releasable>() {
+
+                    @Override
+                    public void onResponse(Releasable releasable) {
+                        invoked.set(true);
+                        releasable.close();
+                        super.onResponse(releasable);
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        throw new AssertionError();
+                    }
+
+                };
+                assertions.add(() -> assertTrue(invoked.get()));
+            } else if (recoveryIndex == i) {
+                startRecovery.countDown();
+                relocationStarted.await();
+                onLockAcquired = new PlainActionFuture<>();
+                assertions.add(() -> {
+                    final ExecutionException e = expectThrows(ExecutionException.class, () -> onLockAcquired.get(30, TimeUnit.SECONDS));
+                    assertThat(e.getCause(), instanceOf(ShardNotInPrimaryModeException.class));
+                    assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode")));
+                });
+            } else {
+                onLockAcquired = new PlainActionFuture<>();
+                assertions.add(() -> {
+                    final ExecutionException e = expectThrows(ExecutionException.class, () -> onLockAcquired.get(30, TimeUnit.SECONDS));
+                    assertThat(e.getCause(), instanceOf(ShardNotInPrimaryModeException.class));
+                    assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode")));
+                });
+            }
+
+            shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.WRITE, "i_" + i);
+        }
+
+        for (final Runnable assertion : assertions) {
+            assertion.run();
+        }
+
+        recoveryThread.join();
+
+        closeShards(shard);
+    }
+
     public void testReplicaReceivesLowerGeneration() throws Exception {
         // when a replica gets incoming segments that are lower than what it currently has on disk.
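
[Editor's note] The hunks above replace the listener-based replication hook (a BiFunction that had to notify an ActionListener) with a plain synchronous Function: failures now propagate by throwing rather than via listener callbacks. A minimal sketch of the new hook's shape, mirroring the test-framework changes later in this patch (replicateSegments, recoverReplica); the surrounding test wiring is illustrative only, not part of the patch:

    // Replicate from the primary (shardList.get(0)) to the remaining shards,
    // rethrowing checked exceptions as RuntimeException since there is no
    // longer an ActionListener to notify on failure.
    Function<List<IndexShard>, List<SegmentReplicationTarget>> hook = (shardList) -> {
        try {
            final IndexShard primary = shardList.get(0);
            return replicateSegments(primary, shardList.subList(1, shardList.size()));
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    };
    recoverReplica(replica, primary, true, hook);
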
diff --git a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java
index ba54d3eb3dba8..307cb854f000a 100644
--- a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java
+++ b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java
@@ -1117,7 +1117,7 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {}
 
             @Override
-            public void forceSegmentFileSync(ActionListener<Void> listener) {}
+            public void forceSegmentFileSync() {}
 
             @Override
             public void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryContext) {}
diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java
index ad19473380063..6b0f50d80fba2 100644
--- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java
@@ -545,7 +545,7 @@ public void recoverReplica(
                 markAsRecovering,
                 inSyncIds,
                 routingTable,
-                (a, b) -> null
+                (a) -> null
             );
             OpenSearchIndexLevelReplicationTestCase.this.startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable);
             computeReplicationTargets();
diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
index 3ae79a8a17879..954e36bb38116 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
@@ -66,6 +66,7 @@
 import org.opensearch.common.blobstore.fs.FsBlobStore;
 import org.opensearch.common.bytes.BytesArray;
 import org.opensearch.common.concurrent.GatedCloseable;
+import org.opensearch.common.lease.Releasable;
 import org.opensearch.common.lucene.uid.Versions;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
@@ -150,11 +151,13 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.contains;
@@ -250,6 +253,12 @@ protected Store createStore(ShardId shardId, IndexSettings indexSettings, Direct
         return new Store(shardId, indexSettings, directory, new DummyShardLock(shardId));
     }
 
+    protected Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException {
+        PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
+        indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.WRITE, "");
+        return fut.get();
+    }
+
     /**
      * Creates a new initializing shard. The shard will have its own unique data path.
      *
@@ -838,7 +847,7 @@ protected DiscoveryNode getFakeDiscoNode(String id) {
     }
 
     protected void recoverReplica(IndexShard replica, IndexShard primary, boolean startReplica) throws IOException {
-        recoverReplica(replica, primary, startReplica, (a, b) -> null);
+        recoverReplica(replica, primary, startReplica, (a) -> null);
     }
 
     /** recovers a replica from the given primary **/
@@ -846,7 +855,7 @@ protected void recoverReplica(
         IndexShard replica,
         IndexShard primary,
         boolean startReplica,
-        BiFunction<List<IndexShard>, ActionListener, List<SegmentReplicationTarget>> replicatePrimaryFunction
+        Function<List<IndexShard>, List<SegmentReplicationTarget>> replicatePrimaryFunction
     ) throws IOException {
         recoverReplica(
             replica,
@@ -865,7 +874,7 @@ protected void recoverReplica(
         final boolean markAsRecovering,
         final boolean markAsStarted
     ) throws IOException {
-        recoverReplica(replica, primary, targetSupplier, markAsRecovering, markAsStarted, (a, b) -> null);
+        recoverReplica(replica, primary, targetSupplier, markAsRecovering, markAsStarted, (a) -> null);
     }
 
     /** recovers a replica from the given primary **/
@@ -875,7 +884,7 @@ protected void recoverReplica(
         final BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
         final boolean markAsRecovering,
         final boolean markAsStarted,
-        final BiFunction<List<IndexShard>, ActionListener, List<SegmentReplicationTarget>> replicatePrimaryFunction
+        final Function<List<IndexShard>, List<SegmentReplicationTarget>> replicatePrimaryFunction
    ) throws IOException {
         IndexShardRoutingTable.Builder newRoutingTable = new IndexShardRoutingTable.Builder(replica.shardId());
         newRoutingTable.addShard(primary.routingEntry());
@@ -908,7 +917,7 @@ protected final void recoverUnstartedReplica(
         final boolean markAsRecovering,
         final Set<String> inSyncIds,
         final IndexShardRoutingTable routingTable,
-        final BiFunction<List<IndexShard>, ActionListener, List<SegmentReplicationTarget>> replicatePrimaryFunction
+        final Function<List<IndexShard>, List<SegmentReplicationTarget>> replicatePrimaryFunction
     ) throws IOException {
         final DiscoveryNode pNode = getFakeDiscoNode(primary.routingEntry().currentNodeId());
         final DiscoveryNode rNode = getFakeDiscoNode(replica.routingEntry().currentNodeId());
@@ -1319,11 +1328,8 @@ public void getSegmentFiles(
      * @param replicaShards - Replicas that will be updated.
      * @return {@link List} List of target components orchestrating replication.
      */
-    public final List<SegmentReplicationTarget> replicateSegments(
-        IndexShard primaryShard,
-        List<IndexShard> replicaShards,
-        ActionListener... listeners
-    ) throws IOException, InterruptedException {
+    public final List<SegmentReplicationTarget> replicateSegments(IndexShard primaryShard, List<IndexShard> replicaShards)
+        throws IOException, InterruptedException {
         final CountDownLatch countDownLatch = new CountDownLatch(replicaShards.size());
         Map<String, StoreFileMetadata> primaryMetadata;
         try (final GatedCloseable<SegmentInfos> segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) {
@@ -1346,13 +1352,7 @@ public void onReplicationDone(SegmentReplicationState state) {
                         assertTrue(recoveryDiff.missing.isEmpty());
                         assertTrue(recoveryDiff.different.isEmpty());
                         assertEquals(recoveryDiff.identical.size(), primaryMetadata.size());
-                        for (ActionListener listener : listeners) {
-                            listener.onResponse(null);
-                        }
                     } catch (Exception e) {
-                        for (ActionListener listener : listeners) {
-                            listener.onFailure(e);
-                        }
                         throw ExceptionsHelper.convertToRuntime(e);
                     } finally {
                         countDownLatch.countDown();
diff --git a/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java b/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java
index 45b11c95b4102..fa8b3c9e3a2c3 100644
--- a/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java
+++ b/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java
@@ -46,7 +46,7 @@
 import java.util.List;
 import java.util.concurrent.Executor;
-import java.util.function.BiFunction;
+import java.util.function.Function;
 
 /**
  * Wraps a {@link RecoveryTarget} to make all remote calls to be executed asynchronously using the provided {@code executor}.
@@ -59,14 +59,14 @@ public class AsyncRecoveryTarget implements RecoveryTargetHandler {
 
     private final IndexShard replica;
 
-    private final BiFunction<List<IndexShard>, ActionListener, List<SegmentReplicationTarget>> replicatePrimaryFunction;
+    private final Function<List<IndexShard>, List<SegmentReplicationTarget>> replicatePrimaryFunction;
 
     public AsyncRecoveryTarget(RecoveryTargetHandler target, Executor executor) {
         this.executor = executor;
         this.target = target;
         this.primary = null;
         this.replica = null;
-        this.replicatePrimaryFunction = (a, b) -> null;
+        this.replicatePrimaryFunction = (a) -> null;
     }
 
     public AsyncRecoveryTarget(
@@ -74,7 +74,7 @@ public AsyncRecoveryTarget(
         Executor executor,
         IndexShard primary,
         IndexShard replica,
-        BiFunction<List<IndexShard>, ActionListener, List<SegmentReplicationTarget>> replicatePrimaryFunction
+        Function<List<IndexShard>, List<SegmentReplicationTarget>> replicatePrimaryFunction
     ) {
         this.executor = executor;
         this.target = target;
@@ -89,8 +89,8 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Vo
 
     @Override
-    public void forceSegmentFileSync(ActionListener<Void> listener) {
-        executor.execute(() -> this.replicatePrimaryFunction.apply(List.of(primary, replica), listener));
+    public void forceSegmentFileSync() {
+        this.replicatePrimaryFunction.apply(List.of(primary, replica));
     }
 
     @Override
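
[Editor's note] With forceSegmentFileSync() reduced to a void inline call, the segment-replication round during relocation hand-off now completes synchronously on the calling thread instead of on the wrapped executor. A sketch of how the blocking permit helper added to IndexShardTestCase interacts with that hand-off (shard setup elided; illustrative only, not part of the patch):

    // While a primary operation permit is held, relocated(...) cannot finalize,
    // so the shard must still report itself as a regular primary.
    try (Releasable permit = acquirePrimaryOperationPermitBlockingly(shard)) {
        assertFalse(shard.isRelocatedPrimary());
    }
    // Once the permit is released, the hand-off (including the synchronous
    // forceSegmentFileSync) can complete and the shard leaves primary mode.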