From 0a52f6bd58866270d44f27fd6ab74183f67ba6b5 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Sun, 9 Jul 2023 15:20:59 -0700 Subject: [PATCH 1/7] [Segment Replication] Add logic back to update tracking replication checkpoint on source Signed-off-by: Suraj Singh --- .../OngoingSegmentReplications.java | 6 +----- .../SegmentReplicationSourceHandler.java | 20 +++++++++++++++++++ .../SegmentReplicationTargetService.java | 5 +++++ 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 050a66bedcf5d..f32887175d4f3 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -120,11 +120,7 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener listener) { + // Short circuit when no files to transfer + if (request.getFilesToFetch().isEmpty()) { + // before completion, alert the primary of the replica's state. + this.updateVisibleCheckpointForShard(request.getTargetAllocationId(), copyState.getCheckpoint()); + listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); + return; + } + final ReplicationTimer timer = new ReplicationTimer(); if (isReplicating.compareAndSet(false, true) == false) { throw new OpenSearchException("Replication to {} is already running.", shard.shardId()); @@ -159,6 +169,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene sendFileStep.whenComplete(r -> { try { + this.updateVisibleCheckpointForShard(allocationId, copyState.getCheckpoint()); future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata))); } finally { IOUtils.close(resources); @@ -176,6 +187,15 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene } } + // Replication checkpoint update for remote store indices happens via subsequent UPDATE_VISIBLE_CHECKPOINT transport call . + // For node-node, checkpoint update is done as part of this i.e. GET_SEGMENT_FILES call. + private void updateVisibleCheckpointForShard(String allocationId, ReplicationCheckpoint replicationCheckpoint) { + if (shard.indexSettings().isRemoteStoreEnabled() == false) { + // update visible checkpoint to primary + shard.updateVisibleCheckpointForShard(allocationId, replicationCheckpoint); + } + } + /** * Cancels the replication and interrupts all eligible threads. */ diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index a7e0c0ec887ab..2707ec26f15ab 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -292,6 +292,11 @@ public void onReplicationFailure( } protected void updateVisibleCheckpoint(long replicationId, IndexShard replicaShard) { + // Update replication checkpoint on source via transport call only supported for remote store integration. For node- + // node communication, checkpoint update is piggy-backed to GET_SEGMENT_FILES transport call + if (replicaShard.indexSettings().isRemoteStoreEnabled() == false) { + return; + } ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(replicaShard.shardId()).primaryShard(); final UpdateVisibleCheckpointRequest request = new UpdateVisibleCheckpointRequest( From bd015d143c5ae47cce031c723d0f6c155a70d0ef Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Sun, 9 Jul 2023 15:28:30 -0700 Subject: [PATCH 2/7] Update comment Signed-off-by: Suraj Singh --- .../indices/replication/SegmentReplicationSourceHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index 7417e44a103e5..cb9f67c651d0c 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -187,8 +187,8 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene } } - // Replication checkpoint update for remote store indices happens via subsequent UPDATE_VISIBLE_CHECKPOINT transport call . - // For node-node, checkpoint update is done as part of this i.e. GET_SEGMENT_FILES call. + // Update target replication checkpoint on source for node-node communication. For remote store enabled indices, checkpoint + // update on source is performed via separate UPDATE_VISIBLE_CHECKPOINT transport call private void updateVisibleCheckpointForShard(String allocationId, ReplicationCheckpoint replicationCheckpoint) { if (shard.indexSettings().isRemoteStoreEnabled() == false) { // update visible checkpoint to primary From b84762d9633c1c2bb47274a9528cc9b112178c1e Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Mon, 10 Jul 2023 12:02:17 -0700 Subject: [PATCH 3/7] Address review comments & mute breaking bwc-test Signed-off-by: Suraj Singh --- .../java/org/opensearch/upgrades/IndexingIT.java | 1 + .../SegmentReplicationSourceHandler.java | 13 ++----------- .../SegmentReplicationSourceHandlerTests.java | 7 +++++-- 3 files changed, 8 insertions(+), 13 deletions(-) diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java index 93c0bc96a5183..b60ee09d39048 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java @@ -247,6 +247,7 @@ public void testIndexing() throws IOException, ParseException { * * @throws Exception */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8322") public void testIndexingWithSegRep() throws Exception { if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) { logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index cb9f67c651d0c..8df075f1ea3f9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -112,7 +112,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene // Short circuit when no files to transfer if (request.getFilesToFetch().isEmpty()) { // before completion, alert the primary of the replica's state. - this.updateVisibleCheckpointForShard(request.getTargetAllocationId(), copyState.getCheckpoint()); + shard.updateVisibleCheckpointForShard(request.getTargetAllocationId(), copyState.getCheckpoint()); listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); return; } @@ -169,7 +169,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene sendFileStep.whenComplete(r -> { try { - this.updateVisibleCheckpointForShard(allocationId, copyState.getCheckpoint()); + shard.updateVisibleCheckpointForShard(allocationId, copyState.getCheckpoint()); future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata))); } finally { IOUtils.close(resources); @@ -187,15 +187,6 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene } } - // Update target replication checkpoint on source for node-node communication. For remote store enabled indices, checkpoint - // update on source is performed via separate UPDATE_VISIBLE_CHECKPOINT transport call - private void updateVisibleCheckpointForShard(String allocationId, ReplicationCheckpoint replicationCheckpoint) { - if (shard.indexSettings().isRemoteStoreEnabled() == false) { - // update visible checkpoint to primary - shard.updateVisibleCheckpointForShard(allocationId, replicationCheckpoint); - } - } - /** * Cancels the replication and interrupts all eligible threads. */ diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java index 607f9dd91e35e..b4e9166f377ec 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -196,11 +196,13 @@ public void testReplicationAlreadyRunning() throws IOException { 1 ); + final List expectedFiles = List.of(new StoreFileMetadata("_0.si", 20, "test", Version.CURRENT.luceneVersion)); + final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, replica.routingEntry().allocationId().getId(), replicaDiscoveryNode, - Collections.emptyList(), + expectedFiles, latestReplicationCheckpoint ); @@ -224,11 +226,12 @@ public void testCancelReplication() throws IOException, InterruptedException { 1 ); + final List expectedFiles = List.of(new StoreFileMetadata("_0.si", 20, "test", Version.CURRENT.luceneVersion)); final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, replica.routingEntry().allocationId().getId(), replicaDiscoveryNode, - Collections.emptyList(), + expectedFiles, latestReplicationCheckpoint ); From 940fe8fac6321c2286e1a64e851ed74fbf980107 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Mon, 10 Jul 2023 13:37:27 -0700 Subject: [PATCH 4/7] Spotless check Signed-off-by: Suraj Singh --- .../indices/replication/SegmentReplicationSourceHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index 8df075f1ea3f9..c054c8aa677a9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -25,7 +25,6 @@ import org.opensearch.indices.recovery.DelayRecoveryException; import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.MultiChunkTransfer; -import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.threadpool.ThreadPool; From f0f7bb36ca885079ffbca0a56b57f8a7c58869c7 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Mon, 10 Jul 2023 13:53:58 -0700 Subject: [PATCH 5/7] Stop timer inside try to prevent double stop on timer Signed-off-by: Suraj Singh --- .../indices/replication/SegmentReplicationSourceHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index c054c8aa677a9..110fe9aafbf5f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -170,9 +170,9 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene try { shard.updateVisibleCheckpointForShard(allocationId, copyState.getCheckpoint()); future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata))); + timer.stop(); } finally { IOUtils.close(resources); - timer.stop(); logger.trace( "[replication id {}] Source node completed sending files to target node [{}], timing: {}", request.getReplicationId(), From 23a2f0f12a91850c8b869e662c6f4e55dbfe2550 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Mon, 10 Jul 2023 16:23:47 -0700 Subject: [PATCH 6/7] Update PressureITs to wait for appropriate transport call for replica update Signed-off-by: Suraj Singh --- .../replication/SegmentReplicationBaseIT.java | 12 +++++++++++- .../indices/replication/SegmentReplicationIT.java | 5 ----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index 52fe85b51cebd..9160351aad572 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -19,6 +19,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.settings.Settings; import org.opensearch.common.lease.Releasable; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.Index; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; @@ -197,6 +198,11 @@ protected IndexShard getIndexShard(String node, String indexName) { return indexService.getShard(shardId.get()); } + protected boolean segmentReplicationWithRemoteEnabled() { + return IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexSettings()).booleanValue() + && "true".equalsIgnoreCase(featureFlagSettings().get(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL)); + } + protected Releasable blockReplication(List nodes, CountDownLatch latch) { CountDownLatch pauseReplicationLatch = new CountDownLatch(nodes.size()); for (String node : nodes) { @@ -206,7 +212,11 @@ protected Releasable blockReplication(List nodes, CountDownLatch latch) node )); mockTargetTransportService.addSendBehavior((connection, requestId, action, request, options) -> { - if (action.equals(SegmentReplicationSourceService.Actions.UPDATE_VISIBLE_CHECKPOINT)) { + String actionToWaitFor = SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES; + if (segmentReplicationWithRemoteEnabled()) { + actionToWaitFor = SegmentReplicationSourceService.Actions.UPDATE_VISIBLE_CHECKPOINT; + } + if (action.equals(actionToWaitFor)) { try { latch.countDown(); pauseReplicationLatch.await(); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 873c05843fb56..7978dd0e3f800 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -1324,9 +1324,4 @@ public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception { ensureGreen(INDEX_NAME); waitForSearchableDocs(2, nodes); } - - private boolean segmentReplicationWithRemoteEnabled() { - return IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexSettings()).booleanValue() - && "true".equalsIgnoreCase(featureFlagSettings().get(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL)); - } } From 7c50cb1f7288e335b4065a4b220d6c3008ce6b47 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Mon, 10 Jul 2023 17:06:28 -0700 Subject: [PATCH 7/7] Spotless check Signed-off-by: Suraj Singh --- .../org/opensearch/indices/replication/SegmentReplicationIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 7978dd0e3f800..099c15267c2f7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -49,7 +49,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.lease.Releasable; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexModule; import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.SegmentReplicationPressureService;