diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8f997f892c28c..39ff4c20e7292 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -51,6 +51,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
 - [Segment Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test ([#4414](https://github.com/opensearch-project/OpenSearch/pull/4414))
 - [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386))
 - [Segment Replication] Fix NoSuchFileExceptions with segment replication when computing primary metadata snapshots ([#4366](https://github.com/opensearch-project/OpenSearch/pull/4366))
+- [Segment Replication] Fix timeout issue by calculating time needed to process getSegmentFiles ([#4434](https://github.com/opensearch-project/OpenSearch/pull/4434))
 
 ### Security
 
diff --git a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java
index aa0b5416dd0ff..e093007408eae 100644
--- a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java
+++ b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java
@@ -13,11 +13,13 @@
 import org.opensearch.action.ActionListener;
 import org.opensearch.cluster.node.DiscoveryNode;
 import org.opensearch.common.io.stream.Writeable;
+import org.opensearch.common.unit.TimeValue;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.indices.recovery.RecoverySettings;
 import org.opensearch.indices.recovery.RetryableTransportClient;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
+import org.opensearch.transport.TransportRequestOptions;
 import org.opensearch.transport.TransportService;
 
 import java.util.List;
@@ -78,6 +80,17 @@ public void getSegmentFiles(
     ) {
         final Writeable.Reader<GetSegmentFilesResponse> reader = GetSegmentFilesResponse::new;
         final ActionListener<GetSegmentFilesResponse> responseListener = ActionListener.map(listener, r -> r);
+        // Few of the below assumptions and calculations are added for experimental release of segment replication feature in 2.3
+        // version. These can change in upcoming releases.
+
+        // Storing the size of files to fetch in bytes.
+        final long sizeOfSegmentFiles = filesToFetch.stream().mapToLong(file -> file.length()).sum();
+
+        // Maximum size of files to fetch (segment files) in bytes, that can be processed in 1 minute for a m5.xlarge machine.
+        long baseSegmentFilesSize = 100000000;
+
+        // Formula for calculating time needed to process a replication event's files to fetch process
+        final long timeToGetSegmentFiles = 1 + (sizeOfSegmentFiles / baseSegmentFilesSize);
         final GetSegmentFilesRequest request = new GetSegmentFilesRequest(
             replicationId,
             targetAllocationId,
@@ -85,7 +98,10 @@ public void getSegmentFiles(
             filesToFetch,
             checkpoint
         );
-        transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, responseListener, reader);
+        final TransportRequestOptions options = TransportRequestOptions.builder()
+            .withTimeout(TimeValue.timeValueMinutes(timeToGetSegmentFiles))
+            .build();
+        transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, options, responseListener, reader);
     }
 
     @Override
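
For context, here is a minimal standalone sketch of the timeout formula the diff introduces. It is not part of the PR: the class and method names (`SegmentFileTimeoutSketch`, `timeoutMinutes`) and the example sizes are hypothetical, and the 100 MB-per-minute baseline is simply the figure assumed in the diff's comments for an m5.xlarge machine.

```java
import java.util.List;

public class SegmentFileTimeoutSketch {

    // Baseline bytes assumed processable per minute (100 MB, per the diff's comment).
    private static final long BASE_SEGMENT_FILES_SIZE_BYTES = 100_000_000L;

    /**
     * Mirrors the formula in the diff: timeout (minutes) = 1 + totalBytes / 100 MB,
     * so even an empty or tiny transfer gets at least a one-minute timeout, and one
     * extra minute is granted per 100 MB of segment data to fetch.
     */
    static long timeoutMinutes(List<Long> fileSizesInBytes) {
        long totalBytes = fileSizesInBytes.stream().mapToLong(Long::longValue).sum();
        return 1 + (totalBytes / BASE_SEGMENT_FILES_SIZE_BYTES);
    }

    public static void main(String[] args) {
        // 150 MB + 100 MB = 250 MB of segment files -> 1 + (250_000_000 / 100_000_000) = 3 minutes.
        System.out.println(timeoutMinutes(List.of(150_000_000L, 100_000_000L)));
    }
}
```

In the actual change, the computed value is wrapped in `TimeValue.timeValueMinutes(...)` and passed via `TransportRequestOptions` to `executeRetryableAction`, so the GET_SEGMENT_FILES timeout scales with the amount of segment data being fetched instead of using a fixed action timeout.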