From 334c5972caf8f176f2f1eadce88e532d5d3473be Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 26 Jan 2023 20:46:52 -0700 Subject: [PATCH] Fix flaky SegmentReplicationITs. (#6015) * Fix flaky SegmentReplicationITs. This change fixes flakiness with segment replication ITs. It does this by updating the wait condition used to ensure replicas are up to date to wait until a searched docCount is reached instead of output of the Segments API that can change if there are concurrent refreshes. It also does this by updating the method used to assert segment stats to wait until the assertion holds true rather than at a point in time. This method is also updated to assert store metadata directly over API output. Signed-off-by: Marc Handalian * Fix error message to print expected and actual doc counts. Signed-off-by: Marc Handalian * PR feedback. Signed-off-by: Marc Handalian * spotless. Signed-off-by: Marc Handalian Signed-off-by: Marc Handalian --- .../replication/SegmentReplicationIT.java | 325 ++++++------------ .../opensearch/index/shard/IndexShard.java | 13 + .../replication/SegmentReplicationTarget.java | 16 +- .../SegmentReplicationTargetTests.java | 8 +- 4 files changed, 133 insertions(+), 229 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index a1be794bf1191..5fefe067e3b62 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -11,16 +11,14 @@ import com.carrotsearch.randomizedtesting.RandomizedTest; import org.opensearch.OpenSearchCorruptionException; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; -import org.opensearch.action.admin.indices.segments.IndexShardSegments; -import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; -import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; -import org.opensearch.action.admin.indices.segments.ShardSegments; +import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; @@ -32,8 +30,9 @@ import org.opensearch.index.Index; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; -import org.opensearch.index.engine.Segment; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.replication.common.ReplicationType; @@ -44,7 +43,6 @@ import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; -import java.io.IOException; import java.util.Collection; import java.util.Arrays; import java.util.List; @@ -53,9 +51,9 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import java.util.stream.Collectors; +import static java.util.Arrays.asList; import static org.hamcrest.Matchers.equalTo; import static org.opensearch.index.query.QueryBuilders.matchQuery; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -71,7 +69,7 @@ public class SegmentReplicationIT extends OpenSearchIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class); + return asList(MockTransportService.TestPlugin.class); } @Override @@ -110,11 +108,9 @@ public void ingestDocs(int docCount) throws Exception { indexer.start(docCount); waitForDocs(docCount, indexer); refresh(INDEX_NAME); - waitForReplicaUpdate(); } } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testPrimaryStopped_ReplicaPromoted() throws Exception { final String primary = internalCluster().startNode(); createIndex(INDEX_NAME); @@ -125,9 +121,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); - assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + waitForSearchableDocs(1, primary, replica); // index another doc but don't refresh, we will ensure this is searchable once replica is promoted. client().prepareIndex(INDEX_NAME).setId("2").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); @@ -139,6 +133,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica); assertNotNull(replicaShardRouting); assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary()); + refresh(INDEX_NAME); assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); // assert we can index into the new primary. @@ -150,13 +145,10 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { ensureGreen(INDEX_NAME); client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); - assertHitCount(client(nodeC).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 4); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 4); - assertSegmentStats(REPLICA_COUNT); + waitForSearchableDocs(4, nodeC, replica); + verifyStoreContent(); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testRestartPrimary() throws Exception { final String primary = internalCluster().startNode(); createIndex(INDEX_NAME); @@ -170,8 +162,7 @@ public void testRestartPrimary() throws Exception { client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); - assertDocCounts(initialDocCount, replica, primary); + waitForSearchableDocs(initialDocCount, replica, primary); internalCluster().restartNode(primary); ensureGreen(INDEX_NAME); @@ -179,13 +170,10 @@ public void testRestartPrimary() throws Exception { assertEquals(getNodeContainingPrimaryShard().getName(), replica); flushAndRefresh(INDEX_NAME); - waitForReplicaUpdate(); - - assertDocCounts(initialDocCount, replica, primary); - assertSegmentStats(REPLICA_COUNT); + waitForSearchableDocs(initialDocCount, replica, primary); + verifyStoreContent(); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testCancelPrimaryAllocation() throws Exception { // this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica. final String primary = internalCluster().startNode(); @@ -199,10 +187,9 @@ public void testCancelPrimaryAllocation() throws Exception { client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); - assertDocCounts(initialDocCount, replica, primary); + waitForSearchableDocs(initialDocCount, replica, primary); - final IndexShard indexShard = getIndexShard(primary); + final IndexShard indexShard = getIndexShard(primary, INDEX_NAME); client().admin() .cluster() .prepareReroute() @@ -214,15 +201,13 @@ public void testCancelPrimaryAllocation() throws Exception { assertEquals(getNodeContainingPrimaryShard().getName(), replica); flushAndRefresh(INDEX_NAME); - waitForReplicaUpdate(); - - assertDocCounts(initialDocCount, replica, primary); - assertSegmentStats(REPLICA_COUNT); + waitForSearchableDocs(initialDocCount, replica, primary); + verifyStoreContent(); } /** * This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery. - * + *

* TODO: Ignoring this test as its flaky and needs separate fix */ @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") @@ -234,6 +219,7 @@ public void testAddNewReplicaFailure() throws Exception { prepareCreate( INDEX_NAME, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ).get(); logger.info("--> index 10 docs"); @@ -292,7 +278,6 @@ public void testAddNewReplicaFailure() throws Exception { assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME))); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { final String nodeA = internalCluster().startNode(); final String nodeB = internalCluster().startNode(); @@ -314,10 +299,7 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); refresh(INDEX_NAME); - waitForReplicaUpdate(); - - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + waitForSearchableDocs(initialDocCount, nodeA, nodeB); final int additionalDocCount = scaledRandomIntBetween(0, 200); final int expectedHitCount = initialDocCount + additionalDocCount; @@ -325,12 +307,10 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { waitForDocs(expectedHitCount, indexer); flushAndRefresh(INDEX_NAME); - waitForReplicaUpdate(); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + waitForSearchableDocs(expectedHitCount, nodeA, nodeB); ensureGreen(INDEX_NAME); - assertSegmentStats(REPLICA_COUNT); + verifyStoreContent(); } } @@ -355,12 +335,8 @@ public void testIndexReopenClose() throws Exception { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); flush(INDEX_NAME); - waitForReplicaUpdate(); + waitForSearchableDocs(initialDocCount, primary, replica); } - - assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - logger.info("--> Closing the index "); client().admin().indices().prepareClose(INDEX_NAME).get(); @@ -368,8 +344,8 @@ public void testIndexReopenClose() throws Exception { client().admin().indices().prepareOpen(INDEX_NAME).get(); ensureGreen(INDEX_NAME); - assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + waitForSearchableDocs(initialDocCount, primary, replica); + verifyStoreContent(); } public void testMultipleShards() throws Exception { @@ -400,10 +376,7 @@ public void testMultipleShards() throws Exception { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); refresh(INDEX_NAME); - waitForReplicaUpdate(); - - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + waitForSearchableDocs(initialDocCount, nodeA, nodeB); final int additionalDocCount = scaledRandomIntBetween(0, 200); final int expectedHitCount = initialDocCount + additionalDocCount; @@ -411,12 +384,10 @@ public void testMultipleShards() throws Exception { waitForDocs(expectedHitCount, indexer); flushAndRefresh(INDEX_NAME); - waitForReplicaUpdate(); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + waitForSearchableDocs(expectedHitCount, nodeA, nodeB); ensureGreen(INDEX_NAME); - assertSegmentStats(REPLICA_COUNT); + verifyStoreContent(); } } @@ -444,24 +415,17 @@ public void testReplicationAfterForceMerge() throws Exception { waitForDocs(initialDocCount, indexer); flush(INDEX_NAME); - waitForReplicaUpdate(); - // wait a short amount of time to give replication a chance to complete. - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + waitForSearchableDocs(initialDocCount, nodeA, nodeB); // Index a second set of docs so we can merge into one segment. indexer.start(additionalDocCount); waitForDocs(expectedHitCount, indexer); + waitForSearchableDocs(expectedHitCount, nodeA, nodeB); // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk. client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); - - ensureGreen(INDEX_NAME); - assertSegmentStats(REPLICA_COUNT); + verifyStoreContent(); } } @@ -476,7 +440,7 @@ public void testCancellation() throws Exception { SegmentReplicationSourceService.class, primaryNode ); - final IndexShard primaryShard = getIndexShard(primaryNode); + final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME); CountDownLatch latch = new CountDownLatch(1); @@ -557,10 +521,10 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); + waitForSearchableDocs(3, primaryNode, replicaNode); assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); - assertSegmentStats(REPLICA_COUNT); + verifyStoreContent(); } public void testDeleteOperations() throws Exception { @@ -584,20 +548,13 @@ public void testDeleteOperations() throws Exception { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); refresh(INDEX_NAME); - waitForReplicaUpdate(); - - // wait a short amount of time to give replication a chance to complete. - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + waitForSearchableDocs(initialDocCount, nodeA, nodeB); final int additionalDocCount = scaledRandomIntBetween(0, 200); final int expectedHitCount = initialDocCount + additionalDocCount; indexer.start(additionalDocCount); waitForDocs(expectedHitCount, indexer); - waitForReplicaUpdate(); - - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + waitForSearchableDocs(expectedHitCount, nodeA, nodeB); ensureGreen(INDEX_NAME); @@ -606,32 +563,18 @@ public void testDeleteOperations() throws Exception { client(nodeA).prepareDelete(INDEX_NAME, id).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); - assertBusy(() -> { - final long nodeA_Count = client(nodeA).prepareSearch(INDEX_NAME) - .setSize(0) - .setPreference("_only_local") - .get() - .getHits() - .getTotalHits().value; - assertEquals(expectedHitCount - 1, nodeA_Count); - final long nodeB_Count = client(nodeB).prepareSearch(INDEX_NAME) - .setSize(0) - .setPreference("_only_local") - .get() - .getHits() - .getTotalHits().value; - assertEquals(expectedHitCount - 1, nodeB_Count); - }, 5, TimeUnit.SECONDS); + waitForSearchableDocs(expectedHitCount - 1, nodeA, nodeB); + verifyStoreContent(); } } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testUpdateOperations() throws Exception { - final String primary = internalCluster().startNode(); + internalCluster().startClusterManagerOnlyNode(); + final String primary = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME); ensureYellow(INDEX_NAME); - final String replica = internalCluster().startNode(); + final String replica = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); final int initialDocCount = scaledRandomIntBetween(0, 200); try ( @@ -648,20 +591,13 @@ public void testUpdateOperations() throws Exception { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); refresh(INDEX_NAME); - waitForReplicaUpdate(); - - // wait a short amount of time to give replication a chance to complete. - assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + waitForSearchableDocs(initialDocCount, asList(primary, replica)); final int additionalDocCount = scaledRandomIntBetween(0, 200); final int expectedHitCount = initialDocCount + additionalDocCount; indexer.start(additionalDocCount); waitForDocs(expectedHitCount, indexer); - waitForReplicaUpdate(); - - assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + waitForSearchableDocs(expectedHitCount, asList(primary, replica)); Set ids = indexer.getIds(); String id = ids.toArray()[0].toString(); @@ -673,69 +609,24 @@ public void testUpdateOperations() throws Exception { assertEquals(2, updateResponse.getVersion()); refresh(INDEX_NAME); - waitForReplicaUpdate(); + verifyStoreContent(); assertSearchHits(client(primary).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); assertSearchHits(client(replica).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); - - } - } - - private void assertSegmentStats(int numberOfReplicas) throws IOException { - final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet(); - - List segmentsByIndex = getShardSegments(indicesSegmentResponse); - - // There will be an entry in the list for each index. - for (ShardSegments[] replicationGroupSegments : segmentsByIndex) { - - // Separate Primary & replica shards ShardSegments. - final Map> segmentListMap = segmentsByShardType(replicationGroupSegments); - final List primaryShardSegmentsList = segmentListMap.get(true); - final List replicaShardSegments = segmentListMap.get(false); - - assertEquals("There should only be one primary in the replicationGroup", primaryShardSegmentsList.size(), 1); - final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); - final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); - - assertEquals( - "There should be a ShardSegment entry for each replica in the replicationGroup", - numberOfReplicas, - replicaShardSegments.size() - ); - - for (ShardSegments shardSegment : replicaShardSegments) { - final Map latestReplicaSegments = getLatestSegments(shardSegment); - for (Segment replicaSegment : latestReplicaSegments.values()) { - final Segment primarySegment = latestPrimarySegments.get(replicaSegment.getName()); - assertEquals(replicaSegment.getGeneration(), primarySegment.getGeneration()); - assertEquals(replicaSegment.getNumDocs(), primarySegment.getNumDocs()); - assertEquals(replicaSegment.getDeletedDocs(), primarySegment.getDeletedDocs()); - assertEquals(replicaSegment.getSize(), primarySegment.getSize()); - } - - // Fetch the IndexShard for this replica and try and build its SegmentInfos from the previous commit point. - // This ensures the previous commit point is not wiped. - final ShardRouting replicaShardRouting = shardSegment.getShardRouting(); - ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState(); - final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId()); - IndexShard indexShard = getIndexShard(replicaNode.getName()); - // calls to readCommit will fail if a valid commit point and all its segments are not in the store. - indexShard.store().readLastCommittedSegmentsInfo(); - } } } public void testDropPrimaryDuringReplication() throws Exception { + final int replica_count = 6; final Settings settings = Settings.builder() .put(indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replica_count) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build(); final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); final String primaryNode = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME, settings); - internalCluster().startDataOnlyNodes(6); + final List dataNodes = internalCluster().startDataOnlyNodes(6); ensureGreen(INDEX_NAME); int initialDocCount = scaledRandomIntBetween(100, 200); @@ -758,85 +649,93 @@ public void testDropPrimaryDuringReplication() throws Exception { ensureYellow(INDEX_NAME); // start another replica. - internalCluster().startDataOnlyNode(); + dataNodes.add(internalCluster().startDataOnlyNode()); ensureGreen(INDEX_NAME); // index another doc and refresh - without this the new replica won't catch up. - client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get(); + String docId = String.valueOf(initialDocCount + 1); + client().prepareIndex(INDEX_NAME).setId(docId).setSource("foo", "bar").get(); flushAndRefresh(INDEX_NAME); - waitForReplicaUpdate(); - assertSegmentStats(6); + waitForSearchableDocs(initialDocCount + 1, dataNodes); + verifyStoreContent(); } } /** - * Waits until the replica is caught up to the latest primary segments gen. - * @throws Exception if assertion fails + * Waits until all given nodes have at least the expected docCount. + * + * @param docCount - Expected Doc count. + * @param nodes - List of node names. */ - private void waitForReplicaUpdate() throws Exception { + private void waitForSearchableDocs(long docCount, List nodes) throws Exception { // wait until the replica has the latest segment generation. assertBusy(() -> { - final IndicesSegmentResponse indicesSegmentResponse = client().admin() - .indices() - .segments(new IndicesSegmentsRequest()) - .actionGet(); - List segmentsByIndex = getShardSegments(indicesSegmentResponse); - for (ShardSegments[] replicationGroupSegments : segmentsByIndex) { - final Map> segmentListMap = segmentsByShardType(replicationGroupSegments); - final List primaryShardSegmentsList = segmentListMap.get(true); - final List replicaShardSegments = segmentListMap.get(false); - // if we don't have any segments yet, proceed. - final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); - logger.debug("Primary Segments: {}", primaryShardSegments.getSegments()); - if (primaryShardSegments.getSegments().isEmpty() == false && replicaShardSegments != null) { - final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); - final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get(); - for (ShardSegments shardSegments : replicaShardSegments) { - logger.debug("Replica {} Segments: {}", shardSegments.getShardRouting(), shardSegments.getSegments()); - final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments() - .stream() - .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen); - assertTrue(isReplicaCaughtUpToPrimary); - } + for (String node : nodes) { + final SearchResponse response = client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(); + final long hits = response.getHits().getTotalHits().value; + if (hits < docCount) { + fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits); } } - }); + }, 1, TimeUnit.MINUTES); } - private IndexShard getIndexShard(String node) { - final Index index = resolveIndex(INDEX_NAME); - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); - IndexService indexService = indicesService.indexServiceSafe(index); - final Optional shardId = indexService.shardIds().stream().findFirst(); - return indexService.getShard(shardId.get()); + private void waitForSearchableDocs(long docCount, String... nodes) throws Exception { + waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList())); } - private List getShardSegments(IndicesSegmentResponse indicesSegmentResponse) { - return indicesSegmentResponse.getIndices() - .values() - .stream() // get list of IndexSegments - .flatMap(is -> is.getShards().values().stream()) // Map to shard replication group - .map(IndexShardSegments::getShards) // get list of segments across replication group - .collect(Collectors.toList()); + private void verifyStoreContent() throws Exception { + assertBusy(() -> { + final ClusterState clusterState = getClusterState(); + for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { + for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) { + final ShardRouting primaryRouting = shardRoutingTable.primaryShard(); + final String indexName = primaryRouting.getIndexName(); + final List replicaRouting = shardRoutingTable.replicaShards(); + final IndexShard primaryShard = getIndexShard(clusterState, primaryRouting, indexName); + final Map primarySegmentMetadata = primaryShard.getSegmentMetadataMap(); + for (ShardRouting replica : replicaRouting) { + IndexShard replicaShard = getIndexShard(clusterState, replica, indexName); + final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff( + primarySegmentMetadata, + replicaShard.getSegmentMetadataMap() + ); + if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) { + fail( + "Expected no missing or different segments between primary and replica but diff was missing: " + + recoveryDiff.missing + + " Different: " + + recoveryDiff.different + + " Primary Replication Checkpoint : " + + primaryShard.getLatestReplicationCheckpoint() + + " Replica Replication Checkpoint: " + + replicaShard.getLatestReplicationCheckpoint() + ); + } + // calls to readCommit will fail if a valid commit point and all its segments are not in the store. + replicaShard.store().readLastCommittedSegmentsInfo(); + } + } + } + }, 1, TimeUnit.MINUTES); } - private Map getLatestSegments(ShardSegments segments) { - final Optional generation = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare); - final Long latestPrimaryGen = generation.get(); - return segments.getSegments() - .stream() - .filter(s -> s.getGeneration() == latestPrimaryGen) - .collect(Collectors.toMap(Segment::getName, Function.identity())); + private IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) { + return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), indexName); } - private Map> segmentsByShardType(ShardSegments[] replicationGroupSegments) { - return Arrays.stream(replicationGroupSegments).collect(Collectors.groupingBy(s -> s.getShardRouting().primary())); + private IndexShard getIndexShard(String node, String indexName) { + final Index index = resolveIndex(indexName); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + IndexService indexService = indicesService.indexServiceSafe(index); + final Optional shardId = indexService.shardIds().stream().findFirst(); + return indexService.getShard(shardId.get()); } @Nullable private ShardRouting getShardRoutingForNodeName(String nodeName) { - final ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); + final ClusterState state = getClusterState(); for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(INDEX_NAME)) { for (ShardRouting shardRouting : shardRoutingTable.activeShards()) { final String nodeId = shardRouting.currentNodeId(); @@ -855,8 +754,12 @@ private void assertDocCounts(int expectedDocCount, String... nodeNames) { } } + private ClusterState getClusterState() { + return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); + } + private DiscoveryNode getNodeContainingPrimaryShard() { - final ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); + final ClusterState state = getClusterState(); final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard(); return state.nodes().resolveNode(primaryShard.currentNodeId()); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index b3b9209f2032e..9efe0ea0f5320 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1558,6 +1558,19 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { } } + /** + * Fetch a map of StoreFileMetadata for each segment from the latest SegmentInfos. + * This is used to compute diffs for segment replication. + * + * @return - Map of Segment Filename to its {@link StoreFileMetadata} + * @throws IOException - When there is an error loading metadata from the store. + */ + public Map getSegmentMetadataMap() throws IOException { + try (final GatedCloseable snapshot = getSegmentInfosSnapshot()) { + return store.getSegmentMetadataMap(snapshot.get()); + } + } + /** * Fails the shard and marks the shard store as corrupted if * e is caused by index corruption diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 1a73b1ca4fb4b..308fe73d09862 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -22,7 +22,6 @@ import org.opensearch.action.StepListener; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.util.CancellableThreads; import org.opensearch.index.shard.IndexShard; @@ -38,8 +37,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Collections; -import java.util.Map; /** * Represents the target of a replication event. @@ -173,8 +170,8 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener getMetadataMap() throws IOException { - if (indexShard.getSegmentInfosSnapshot() == null) { - return Collections.emptyMap(); - } - try (final GatedCloseable snapshot = indexShard.getSegmentInfosSnapshot()) { - return store.getSegmentMetadataMap(snapshot.get()); - } - } - @Override protected void onCancel(String reason) { cancellableThreads.cancel(reason); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 002f745fabda2..6970a8a53b9e6 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -363,8 +363,8 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class ); - segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener)); - when(segrepTarget.getMetadataMap()).thenReturn(SI_SNAPSHOT_DIFFERENT); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + when(spyIndexShard.getSegmentMetadataMap()).thenReturn(SI_SNAPSHOT_DIFFERENT); segrepTarget.startReplication(new ActionListener() { @Override public void onResponse(Void replicationResponse) { @@ -415,8 +415,8 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener.class ); - segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener)); - when(segrepTarget.getMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap()); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + when(spyIndexShard.getSegmentMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap()); segrepTarget.startReplication(new ActionListener() { @Override public void onResponse(Void replicationResponse) {