From a6bc3f83520dbe6b15ccd807714105a06ec33959 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Thu, 1 Sep 2022 12:08:30 -0700 Subject: [PATCH 1/4] [Segment Replication] Extend FileChunkWriter to allow cancel on retryable transport client Signed-off-by: Suraj Singh --- .../java/org/opensearch/indices/recovery/FileChunkWriter.java | 2 ++ .../indices/replication/OngoingSegmentReplications.java | 2 +- .../indices/replication/SegmentReplicationSourceHandler.java | 4 ++++ .../replication/SegmentReplicationSourceHandlerTests.java | 4 ++++ 4 files changed, 11 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java b/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java index cb43af3b82e09..f1cc7b8dd1d89 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java +++ b/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java @@ -28,4 +28,6 @@ void writeFileChunk( int totalTranslogOps, ActionListener listener ); + + default void cancel() {} } diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 828aa29192fe3..1a97d334df58f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -126,7 +126,7 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener Date: Thu, 1 Sep 2022 12:12:19 -0700 Subject: [PATCH 2/4] Add changelog entry Signed-off-by: Suraj Singh --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 76f134f10c29e..1fbdc8a8f8764 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fixed cancellation of segment replication events ([#4225](https://github.com/opensearch-project/OpenSearch/pull/4225)) - Bugs for dependabot changelog verifier workflow ([#4364](https://github.com/opensearch-project/OpenSearch/pull/4364)) - Fix flaky random test `NRTReplicationEngineTests.testUpdateSegments` ([#4352](https://github.com/opensearch-project/OpenSearch/pull/4352)) +- [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386)) ### Security - CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341)) From afc50309691566f63aa3932aba214bd2a808def6 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Thu, 1 Sep 2022 13:12:39 -0700 Subject: [PATCH 3/4] Address review comments Signed-off-by: Suraj Singh --- .../indices/replication/RemoteSegmentFileChunkWriter.java | 5 +++++ .../indices/replication/SegmentReplicationSourceHandler.java | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java index 05f1c9d757e5c..b3909a3c0f8df 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java @@ -122,4 +122,9 @@ public void writeFileChunk( reader ); } + + @Override + public void cancel() { + retryableTransportClient.cancel(); + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index 7c3bf6f4bc7d9..b63b84a5c1eab 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -189,7 +189,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene } /** - * Cancels the recovery and interrupts all eligible threads. + * Cancels the replication and interrupts all eligible threads. */ public void cancel(String reason) { writer.cancel(); From 45428ad33ce0dbd9d69b2b690e201221a48ec604 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Thu, 1 Sep 2022 18:41:43 -0700 Subject: [PATCH 4/4] Integration test Signed-off-by: Suraj Singh --- .../replication/SegmentReplicationIT.java | 72 ++++++++++++++++++- .../SegmentReplicationSourceHandlerTests.java | 1 - 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index a9b6787d87bdf..16e9d78b17826 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -33,17 +33,23 @@ import org.opensearch.index.engine.Segment; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.plugins.Plugin; import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.Collection; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -65,6 +71,11 @@ public static void assumeFeatureFlag() { assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE))); } + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class); + } + @Override public Settings indexSettings() { return Settings.builder() @@ -318,6 +329,65 @@ public void testReplicationAfterForceMerge() throws Exception { } } + public void testCancellation() throws Exception { + final String primaryNode = internalCluster().startNode(); + createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()); + ensureYellow(INDEX_NAME); + + final String replicaNode = internalCluster().startNode(); + + final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance( + SegmentReplicationSourceService.class, + primaryNode + ); + final IndexShard primaryShard = getIndexShard(primaryNode); + + CountDownLatch latch = new CountDownLatch(1); + + MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + primaryNode + )); + mockTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, replicaNode), + (connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) { + FileChunkRequest req = (FileChunkRequest) request; + logger.debug("file chunk [{}] lastChunk: {}", req, req.lastChunk()); + if (req.name().endsWith("cfs") && req.lastChunk()) { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + connection.sendRequest(requestId, action, request, options); + } + ); + + final int docCount = scaledRandomIntBetween(0, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(docCount); + waitForDocs(docCount, indexer); + + flush(INDEX_NAME); + } + segmentReplicationSourceService.beforeIndexShardClosed(primaryShard.shardId(), primaryShard, indexSettings()); + latch.countDown(); + assertDocCounts(docCount, primaryNode); + } + public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { final String primaryNode = internalCluster().startNode(); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java index b1009a769d806..5f6ec7e505805 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -36,7 +36,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.times; - public class SegmentReplicationSourceHandlerTests extends IndexShardTestCase { private final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);