diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java index da2c6e8c1b0ee..21184380d54a9 100644 --- a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -249,7 +249,22 @@ protected S3Repository createRepository( ClusterService clusterService, RecoverySettings recoverySettings ) { - return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, null, false) { + return new S3Repository( + metadata, + registry, + service, + clusterService, + recoverySettings, + null, + null, + null, + null, + null, + false, + null, + null, + null + ) { @Override public BlobStore blobStore() { diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/GenericStatsMetricPublisher.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/GenericStatsMetricPublisher.java new file mode 100644 index 0000000000000..136fd68223354 --- /dev/null +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/GenericStatsMetricPublisher.java @@ -0,0 +1,90 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.s3; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Generic stats of repository-s3 plugin. 
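+ * Tracks transfer queue utilization and acquired permit counts so they can be surfaced through the repository stats and extended stats APIs.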
+ */ +public class GenericStatsMetricPublisher { + + private final AtomicLong normalPriorityQSize = new AtomicLong(); + private final AtomicInteger normalPriorityPermits = new AtomicInteger(); + private final AtomicLong lowPriorityQSize = new AtomicLong(); + private final AtomicInteger lowPriorityPermits = new AtomicInteger(); + private final long normalPriorityQCapacity; + private final int maxNormalPriorityPermits; + private final long lowPriorityQCapacity; + private final int maxLowPriorityPermits; + + public GenericStatsMetricPublisher( + long normalPriorityQCapacity, + int maxNormalPriorityPermits, + long lowPriorityQCapacity, + int maxLowPriorityPermits + ) { + this.normalPriorityQCapacity = normalPriorityQCapacity; + this.maxNormalPriorityPermits = maxNormalPriorityPermits; + this.lowPriorityQCapacity = lowPriorityQCapacity; + this.maxLowPriorityPermits = maxLowPriorityPermits; + } + + public void updateNormalPriorityQSize(long qSize) { + normalPriorityQSize.addAndGet(qSize); + } + + public void updateLowPriorityQSize(long qSize) { + lowPriorityQSize.addAndGet(qSize); + } + + public void updateNormalPermits(boolean increment) { + if (increment) { + normalPriorityPermits.incrementAndGet(); + } else { + normalPriorityPermits.decrementAndGet(); + } + } + + public void updateLowPermits(boolean increment) { + if (increment) { + lowPriorityPermits.incrementAndGet(); + } else { + lowPriorityPermits.decrementAndGet(); + } + } + + public long getNormalPriorityQSize() { + return normalPriorityQSize.get(); + } + + public int getAcquiredNormalPriorityPermits() { + return normalPriorityPermits.get(); + } + + public long getLowPriorityQSize() { + return lowPriorityQSize.get(); + } + + public int getAcquiredLowPriorityPermits() { + return lowPriorityPermits.get(); + } + + Map stats() { + final Map results = new HashMap<>(); + results.put("NormalPriorityQUtilization", (normalPriorityQSize.get() * 100) / normalPriorityQCapacity); + results.put("LowPriorityQUtilization", (lowPriorityQSize.get() * 100) / lowPriorityQCapacity); + results.put("NormalPriorityPermitsUtilization", (normalPriorityPermits.get() * 100L) / maxNormalPriorityPermits); + results.put("LowPriorityPermitsUtilization", (lowPriorityPermits.get() * 100L) / maxLowPriorityPermits); + return results; + } +} diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 14829a066ca3a..acf0c5e83a17b 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -90,6 +90,7 @@ import org.opensearch.core.common.Strings; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.repositories.s3.async.SizeBasedBlockingQ; import org.opensearch.repositories.s3.async.UploadRequest; import org.opensearch.repositories.s3.utils.HttpRangeUtils; @@ -218,7 +219,14 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp writeContext.getMetadata() ); try { - if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) { + // If file size is greater than the queue capacity than SizeBasedBlockingQ will always reject the upload. + // Therefore, redirecting it to slow client. 
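+ // The condition below holds when the file cannot fit within the corresponding queue's capacity, so the upload takes the synchronous redirect path instead of being queued.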
+ if ((uploadRequest.getWritePriority() == WritePriority.LOW + && blobStore.getLowPrioritySizeBasedBlockingQ().isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false) + || (uploadRequest.getWritePriority() != WritePriority.HIGH + && uploadRequest.getWritePriority() != WritePriority.URGENT + && blobStore.getNormalPrioritySizeBasedBlockingQ() + .isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false)) { StreamContext streamContext = SocketAccess.doPrivileged( () -> writeContext.getStreamProvider(uploadRequest.getContentLength()) ); @@ -258,16 +266,30 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp } else { s3AsyncClient = amazonS3Reference.get().client(); } - CompletableFuture completableFuture = blobStore.getAsyncTransferManager() - .uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher()); - completableFuture.whenComplete((response, throwable) -> { - if (throwable == null) { - completionListener.onResponse(response); - } else { - Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable; - completionListener.onFailure(ex); - } - }); + + if (writeContext.getWritePriority() == WritePriority.URGENT + || writeContext.getWritePriority() == WritePriority.HIGH + || blobStore.isPermitBackedTransferEnabled() == false) { + createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener); + } else if (writeContext.getWritePriority() == WritePriority.LOW) { + blobStore.getLowPrioritySizeBasedBlockingQ() + .produce( + new SizeBasedBlockingQ.Item( + writeContext.getFileSize(), + () -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener) + ) + ); + } else if (writeContext.getWritePriority() == WritePriority.NORMAL) { + blobStore.getNormalPrioritySizeBasedBlockingQ() + .produce( + new SizeBasedBlockingQ.Item( + writeContext.getFileSize(), + () -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener) + ) + ); + } else { + throw new IllegalStateException("Cannot perform upload for other priority types."); + } } } catch (Exception e) { logger.info("exception error from blob container for file {}", writeContext.getFileName()); @@ -275,6 +297,24 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp } } + private CompletableFuture createFileCompletableFuture( + S3AsyncClient s3AsyncClient, + UploadRequest uploadRequest, + StreamContext streamContext, + ActionListener completionListener + ) { + CompletableFuture completableFuture = blobStore.getAsyncTransferManager() + .uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher()); + return completableFuture.whenComplete((response, throwable) -> { + if (throwable == null) { + completionListener.onResponse(response); + } else { + Exception ex = throwable instanceof Error ? 
new Exception(throwable) : (Exception) throwable; + completionListener.onFailure(ex); + } + }); + } + @ExperimentalApi @Override public void readBlobAsync(String blobName, ActionListener listener) { diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java index fc70fbb0db00e..de815f9202f44 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java @@ -45,6 +45,7 @@ import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.repositories.s3.async.AsyncExecutorContainer; import org.opensearch.repositories.s3.async.AsyncTransferManager; +import org.opensearch.repositories.s3.async.SizeBasedBlockingQ; import java.io.IOException; import java.util.Collections; @@ -56,6 +57,7 @@ import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING; import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE; import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING; +import static org.opensearch.repositories.s3.S3Repository.PERMIT_BACKED_TRANSFER_ENABLED; import static org.opensearch.repositories.s3.S3Repository.REDIRECT_LARGE_S3_UPLOAD; import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING; import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING; @@ -77,6 +79,8 @@ class S3BlobStore implements BlobStore { private volatile boolean uploadRetryEnabled; + private volatile boolean permitBackedTransferEnabled; + private volatile boolean serverSideEncryption; private volatile ObjectCannedACL cannedACL; @@ -94,6 +98,9 @@ class S3BlobStore implements BlobStore { private final AsyncExecutorContainer priorityExecutorBuilder; private final AsyncExecutorContainer normalExecutorBuilder; private final boolean multipartUploadEnabled; + private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ; + private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; + private final GenericStatsMetricPublisher genericStatsMetricPublisher; S3BlobStore( S3Service service, @@ -109,7 +116,10 @@ class S3BlobStore implements BlobStore { AsyncTransferManager asyncTransferManager, AsyncExecutorContainer urgentExecutorBuilder, AsyncExecutorContainer priorityExecutorBuilder, - AsyncExecutorContainer normalExecutorBuilder + AsyncExecutorContainer normalExecutorBuilder, + SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ, + SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ, + GenericStatsMetricPublisher genericStatsMetricPublisher ) { this.service = service; this.s3AsyncService = s3AsyncService; @@ -128,6 +138,10 @@ class S3BlobStore implements BlobStore { // Settings to initialize blobstore with. 
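+ // permitBackedTransferEnabled is also re-read in reload() below when repository settings change.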
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings()); this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings()); + this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ; + this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ; + this.genericStatsMetricPublisher = genericStatsMetricPublisher; + this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings()); } @Override @@ -141,6 +155,7 @@ public void reload(RepositoryMetadata repositoryMetadata) { this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings()); this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings()); this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings()); + this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings()); } @Override @@ -168,6 +183,10 @@ public boolean isUploadRetryEnabled() { return uploadRetryEnabled; } + public boolean isPermitBackedTransferEnabled() { + return permitBackedTransferEnabled; + } + public String bucket() { return bucket; } @@ -184,6 +203,14 @@ public int getBulkDeletesSize() { return bulkDeletesSize; } + public SizeBasedBlockingQ getNormalPrioritySizeBasedBlockingQ() { + return normalPrioritySizeBasedBlockingQ; + } + + public SizeBasedBlockingQ getLowPrioritySizeBasedBlockingQ() { + return lowPrioritySizeBasedBlockingQ; + } + @Override public BlobContainer blobContainer(BlobPath path) { return new S3BlobContainer(path, this); @@ -201,7 +228,9 @@ public void close() throws IOException { @Override public Map stats() { - return statsMetricPublisher.getStats().toMap(); + Map stats = statsMetricPublisher.getStats().toMap(); + stats.putAll(genericStatsMetricPublisher.stats()); + return stats; } @Override @@ -211,6 +240,7 @@ public Map> extendedStats() { } Map> extendedStats = new HashMap<>(); statsMetricPublisher.getExtendedStats().forEach((k, v) -> extendedStats.put(k, v.toMap())); + extendedStats.put(Metric.GENERIC_STATS, genericStatsMetricPublisher.stats()); return extendedStats; } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java index f7772a57c9afd..01b75c0b915f2 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java @@ -49,6 +49,7 @@ import org.opensearch.common.settings.SecureSetting; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; import org.opensearch.core.common.settings.SecureString; @@ -63,6 +64,7 @@ import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository; import org.opensearch.repositories.s3.async.AsyncExecutorContainer; import org.opensearch.repositories.s3.async.AsyncTransferManager; +import org.opensearch.repositories.s3.async.SizeBasedBlockingQ; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInfo; import org.opensearch.threadpool.Scheduler; @@ -156,6 +158,15 @@ class S3Repository extends MeteredBlobStoreRepository { Setting.Property.NodeScope ); + /** + * Whether normal and low priority transfers should be gated behind transfer permits.
+ */ + static final Setting PERMIT_BACKED_TRANSFER_ENABLED = Setting.boolSetting( + "permit_backed_transfer_enabled", + true, + Setting.Property.NodeScope + ); + /** * Whether retry on uploads are enabled. This setting wraps inputstream with buffered stream to enable retries. */ @@ -193,6 +204,37 @@ class S3Repository extends MeteredBlobStoreRepository { true, Setting.Property.NodeScope ); + /** + * Percentage of total available permits to be available for priority transfers. + */ + public static Setting S3_PRIORITY_PERMIT_ALLOCATION_PERCENT = Setting.intSetting( + "s3_priority_permit_alloc_perc", + 70, + 21, + 80, + Setting.Property.NodeScope + ); + + /** + * Duration in minutes to wait for a permit in case no permit is available. + */ + public static Setting S3_PERMIT_WAIT_DURATION_MIN = Setting.intSetting( + "s3_permit_wait_duration_min", + 5, + 1, + 10, + Setting.Property.NodeScope + ); + + /** + * Number of transfer queue consumers + */ + public static Setting S3_TRANSFER_QUEUE_CONSUMERS = new Setting<>( + "s3_transfer_queue_consumers", + (s) -> Integer.toString(Math.max(5, OpenSearchExecutors.allocatedProcessors(s) * 2)), + (s) -> Setting.parseInt(s, 5, "s3_transfer_queue_consumers"), + Setting.Property.NodeScope + ); /** * Big files can be broken down into chunks during snapshotting if needed. Defaults to 1g. @@ -252,6 +294,9 @@ class S3Repository extends MeteredBlobStoreRepository { private final AsyncExecutorContainer priorityExecutorBuilder; private final AsyncExecutorContainer normalExecutorBuilder; private final Path pluginConfigPath; + private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ; + private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; + private final GenericStatsMetricPublisher genericStatsMetricPublisher; private volatile int bulkDeletesSize; @@ -267,7 +312,10 @@ class S3Repository extends MeteredBlobStoreRepository { final AsyncExecutorContainer priorityExecutorBuilder, final AsyncExecutorContainer normalExecutorBuilder, final S3AsyncService s3AsyncService, - final boolean multipartUploadEnabled + final boolean multipartUploadEnabled, + final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ, + final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ, + final GenericStatsMetricPublisher genericStatsMetricPublisher ) { this( metadata, @@ -281,7 +329,10 @@ class S3Repository extends MeteredBlobStoreRepository { normalExecutorBuilder, s3AsyncService, multipartUploadEnabled, - Path.of("") + Path.of(""), + normalPrioritySizeBasedBlockingQ, + lowPrioritySizeBasedBlockingQ, + genericStatsMetricPublisher ); } @@ -300,7 +351,10 @@ class S3Repository extends MeteredBlobStoreRepository { final AsyncExecutorContainer normalExecutorBuilder, final S3AsyncService s3AsyncService, final boolean multipartUploadEnabled, - Path pluginConfigPath + Path pluginConfigPath, + final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ, + final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ, + final GenericStatsMetricPublisher genericStatsMetricPublisher ) { super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata)); this.service = service; @@ -311,6 +365,9 @@ class S3Repository extends MeteredBlobStoreRepository { this.urgentExecutorBuilder = urgentExecutorBuilder; this.priorityExecutorBuilder = priorityExecutorBuilder; this.normalExecutorBuilder = normalExecutorBuilder; + this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ; + this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ; + 
this.genericStatsMetricPublisher = genericStatsMetricPublisher; validateRepositoryMetadata(metadata); readRepositoryMetadata(); @@ -373,7 +430,10 @@ protected S3BlobStore createBlobStore() { asyncUploadUtils, urgentExecutorBuilder, priorityExecutorBuilder, - normalExecutorBuilder + normalExecutorBuilder, + normalPrioritySizeBasedBlockingQ, + lowPrioritySizeBasedBlockingQ, + genericStatsMetricPublisher ); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java index e7d2a4d024e60..110d91bfbd822 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java @@ -41,6 +41,9 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; @@ -53,6 +56,8 @@ import org.opensearch.repositories.s3.async.AsyncExecutorContainer; import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup; import org.opensearch.repositories.s3.async.AsyncTransferManager; +import org.opensearch.repositories.s3.async.SizeBasedBlockingQ; +import org.opensearch.repositories.s3.async.TransferSemaphoresHolder; import org.opensearch.script.ScriptService; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; @@ -69,6 +74,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; /** @@ -82,6 +89,8 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo private static final String PRIORITY_STREAM_READER = "priority_stream_reader"; private static final String FUTURE_COMPLETION = "future_completion"; private static final String STREAM_READER = "stream_reader"; + private static final String LOW_TRANSFER_QUEUE_CONSUMER = "low_transfer_queue_consumer"; + private static final String NORMAL_TRANSFER_QUEUE_CONSUMER = "normal_transfer_queue_consumer"; protected final S3Service service; private final S3AsyncService s3AsyncService; @@ -91,6 +100,12 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo private AsyncExecutorContainer urgentExecutorBuilder; private AsyncExecutorContainer priorityExecutorBuilder; private AsyncExecutorContainer normalExecutorBuilder; + private ExecutorService lowTransferQConsumerService; + private ExecutorService normalTransferQConsumerService; + private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ; + private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; + private TransferSemaphoresHolder transferSemaphoresHolder; + private GenericStatsMetricPublisher genericStatsMetricPublisher; public S3RepositoryPlugin(final Settings settings, final Path configPath) { this(settings, configPath, new S3Service(configPath), new S3AsyncService(configPath)); @@ -120,9 +135,36 @@ public List> getExecutorBuilders(Settings settings) { TimeValue.timeValueMinutes(5) ) ); + 
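+ // Dedicated fixed thread pools whose workers drain the low and normal priority transfer queues.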
executorBuilders.add( + new FixedExecutorBuilder( + settings, + LOW_TRANSFER_QUEUE_CONSUMER, + lowPriorityTransferQConsumers(settings), + 10, + "thread_pool." + LOW_TRANSFER_QUEUE_CONSUMER + ) + ); + executorBuilders.add( + new FixedExecutorBuilder( + settings, + NORMAL_TRANSFER_QUEUE_CONSUMER, + normalPriorityTransferQConsumers(settings), + 10, + "thread_pool." + NORMAL_TRANSFER_QUEUE_CONSUMER + ) + ); return executorBuilders; } + private int lowPriorityTransferQConsumers(Settings settings) { + double lowPriorityAllocation = ((double) (100 - S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT.get(settings))) / 100; + return Math.max(2, (int) (lowPriorityAllocation * S3Repository.S3_TRANSFER_QUEUE_CONSUMERS.get(settings))); + } + + private int normalPriorityTransferQConsumers(Settings settings) { + return S3Repository.S3_TRANSFER_QUEUE_CONSUMERS.get(settings); + } + static int halfNumberOfProcessors(int numberOfProcessors) { return (numberOfProcessors + 1) / 2; } @@ -189,7 +231,67 @@ public Collection createComponents( threadPool.executor(STREAM_READER), new AsyncTransferEventLoopGroup(normalEventLoopThreads) ); - return Collections.emptyList(); + + this.lowTransferQConsumerService = threadPool.executor(LOW_TRANSFER_QUEUE_CONSUMER); + this.normalTransferQConsumerService = threadPool.executor(NORMAL_TRANSFER_QUEUE_CONSUMER); + + // High number of permit allocation because each op acquiring permit performs disk IO, computation and network IO. + int availablePermits = Math.max(allocatedProcessors(clusterService.getSettings()) * 4, 10); + double priorityPermitAllocation = ((double) S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT.get(clusterService.getSettings())) + / 100; + int normalPriorityPermits = (int) (priorityPermitAllocation * availablePermits); + int lowPriorityPermits = availablePermits - normalPriorityPermits; + + int normalPriorityConsumers = normalPriorityTransferQConsumers(clusterService.getSettings()); + int lowPriorityConsumers = lowPriorityTransferQConsumers(clusterService.getSettings()); + + ByteSizeValue normalPriorityQCapacity = new ByteSizeValue(normalPriorityConsumers * 10L, ByteSizeUnit.GB); + ByteSizeValue lowPriorityQCapacity = new ByteSizeValue(lowPriorityConsumers * 20L, ByteSizeUnit.GB); + + this.genericStatsMetricPublisher = new GenericStatsMetricPublisher( + normalPriorityQCapacity.getBytes(), + normalPriorityPermits, + lowPriorityQCapacity.getBytes(), + lowPriorityPermits + ); + + this.normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( + normalPriorityQCapacity, + normalTransferQConsumerService, + normalPriorityConsumers, + genericStatsMetricPublisher, + SizeBasedBlockingQ.QueueEventType.NORMAL + ); + + LowPrioritySizeBasedBlockingQ lowPrioritySizeBasedBlockingQ = new LowPrioritySizeBasedBlockingQ( + lowPriorityQCapacity, + lowTransferQConsumerService, + lowPriorityConsumers, + genericStatsMetricPublisher + ); + this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ; + this.transferSemaphoresHolder = new TransferSemaphoresHolder( + normalPriorityPermits, + lowPriorityPermits, + S3Repository.S3_PERMIT_WAIT_DURATION_MIN.get(clusterService.getSettings()), + TimeUnit.MINUTES, + genericStatsMetricPublisher + ); + + return CollectionUtils.arrayAsArrayList(this.normalPrioritySizeBasedBlockingQ, lowPrioritySizeBasedBlockingQ); + } + + // New class because in core, components are injected via guice only by instance creation due to which + // same binding types fail. 
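+ // Subclassing gives the low priority queue its own concrete type so both queues can be bound and injected separately.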
+ private static final class LowPrioritySizeBasedBlockingQ extends SizeBasedBlockingQ { + public LowPrioritySizeBasedBlockingQ( + ByteSizeValue capacity, + ExecutorService executorService, + int consumers, + GenericStatsMetricPublisher genericStatsMetricPublisher + ) { + super(capacity, executorService, consumers, genericStatsMetricPublisher, QueueEventType.LOW); + } } // proxy method for testing @@ -204,7 +306,8 @@ protected S3Repository createRepository( S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.get(clusterService.getSettings()).getBytes(), normalExecutorBuilder.getStreamReader(), priorityExecutorBuilder.getStreamReader(), - urgentExecutorBuilder.getStreamReader() + urgentExecutorBuilder.getStreamReader(), + transferSemaphoresHolder ); return new S3Repository( metadata, @@ -218,7 +321,10 @@ protected S3Repository createRepository( normalExecutorBuilder, s3AsyncService, S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(clusterService.getSettings()), - configPath + configPath, + normalPrioritySizeBasedBlockingQ, + lowPrioritySizeBasedBlockingQ, + genericStatsMetricPublisher ); } @@ -263,7 +369,9 @@ public List> getSettings() { S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING, S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING, S3Repository.REDIRECT_LARGE_S3_UPLOAD, - S3Repository.UPLOAD_RETRY_ENABLED + S3Repository.UPLOAD_RETRY_ENABLED, + S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT, + S3Repository.PERMIT_BACKED_TRANSFER_ENABLED ); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3TransferRejectedException.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3TransferRejectedException.java new file mode 100644 index 0000000000000..c9fa93ea0f5c3 --- /dev/null +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3TransferRejectedException.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.s3; + +import org.opensearch.OpenSearchException; + +/** + * Thrown when transfer event is rejected due to breach in event queue size. 
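+ * Also thrown when a transfer permit cannot be acquired within the configured wait duration.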
+ */ +public class S3TransferRejectedException extends OpenSearchException { + public S3TransferRejectedException(String msg) { + super(msg); + } +} diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java index b4c4ed0ecaa75..4c95a0ffc5ec3 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java @@ -23,6 +23,7 @@ import org.opensearch.common.StreamContext; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.io.InputStreamContainer; +import org.opensearch.repositories.s3.S3TransferRejectedException; import org.opensearch.repositories.s3.SocketAccess; import org.opensearch.repositories.s3.StatsMetricPublisher; import org.opensearch.repositories.s3.io.CheckedContainer; @@ -34,6 +35,8 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReferenceArray; /** @@ -41,7 +44,7 @@ */ public class AsyncPartsHandler { - private static Logger log = LogManager.getLogger(AsyncPartsHandler.class); + private static final Logger log = LogManager.getLogger(AsyncPartsHandler.class); /** * Uploads parts of the upload multipart request* @@ -55,9 +58,10 @@ public class AsyncPartsHandler { * @param completedParts Reference of completed parts * @param inputStreamContainers Checksum containers * @param statsMetricPublisher sdk metric publisher + * @param maxRetryablePartSize Max content size which can be used for retries in buffered streams. 
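+ * @param transferSemaphoresHolder Semaphores gating each part upload; a permit is acquired per part and released when the part completes.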
* @return list of completable futures - * @throws IOException thrown in case of an IO error */ + @SuppressWarnings({ "rawtypes", "unchecked" }) public static List> uploadParts( S3AsyncClient s3AsyncClient, ExecutorService executorService, @@ -69,35 +73,52 @@ public static List> uploadParts( AtomicReferenceArray completedParts, AtomicReferenceArray inputStreamContainers, StatsMetricPublisher statsMetricPublisher, - boolean uploadRetryEnabled - ) throws IOException { + boolean uploadRetryEnabled, + TransferSemaphoresHolder transferSemaphoresHolder, + long maxRetryablePartSize + ) throws InterruptedException { List> futures = new ArrayList<>(); + TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext(); for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) { - InputStreamContainer inputStreamContainer = streamContext.provideStream(partIdx); - inputStreamContainers.set(partIdx, new CheckedContainer(inputStreamContainer.getContentLength())); - UploadPartRequest.Builder uploadPartRequestBuilder = UploadPartRequest.builder() - .bucket(uploadRequest.getBucket()) - .partNumber(partIdx + 1) - .key(uploadRequest.getKey()) - .uploadId(uploadId) - .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector)) - .contentLength(inputStreamContainer.getContentLength()); - if (uploadRequest.doRemoteDataIntegrityCheck()) { - uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32); - } - uploadPart( - s3AsyncClient, - executorService, - priorityExecutorService, - urgentExecutorService, - completedParts, - inputStreamContainers, - futures, - uploadPartRequestBuilder.build(), - inputStreamContainer, - uploadRequest, - uploadRetryEnabled + Semaphore semaphore = maybeAcquireSemaphore( + transferSemaphoresHolder, + requestContext, + uploadRequest.getWritePriority(), + uploadRequest.getKey() ); + try { + InputStreamContainer inputStreamContainer = streamContext.provideStream(partIdx); + inputStreamContainers.set(partIdx, new CheckedContainer(inputStreamContainer.getContentLength())); + UploadPartRequest.Builder uploadPartRequestBuilder = UploadPartRequest.builder() + .bucket(uploadRequest.getBucket()) + .partNumber(partIdx + 1) + .key(uploadRequest.getKey()) + .uploadId(uploadId) + .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector)) + .contentLength(inputStreamContainer.getContentLength()); + if (uploadRequest.doRemoteDataIntegrityCheck()) { + uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32); + } + uploadPart( + s3AsyncClient, + executorService, + priorityExecutorService, + urgentExecutorService, + completedParts, + inputStreamContainers, + futures, + uploadPartRequestBuilder.build(), + inputStreamContainer, + uploadRequest, + uploadRetryEnabled, + maxRetryablePartSize, + semaphore + ); + } catch (Exception ex) { + if (semaphore != null) { + semaphore.release(); + } + } } return futures; @@ -137,14 +158,54 @@ public static InputStream maybeRetryInputStream( InputStream inputStream, WritePriority writePriority, boolean uploadRetryEnabled, - long contentLength + long contentLength, + long maxRetryablePartSize ) { - if (uploadRetryEnabled == true && (writePriority == WritePriority.HIGH || writePriority == WritePriority.URGENT)) { - return new BufferedInputStream(inputStream, (int) (contentLength + 1)); + // Since we are backing uploads with limited permits, it is ok to use buffered stream. 
Maximum in-memory buffer + // would be (max permits * maxRetryablePartSize) excluding urgent + if (uploadRetryEnabled == true + && (contentLength <= maxRetryablePartSize || writePriority == WritePriority.HIGH || writePriority == WritePriority.URGENT)) { + return new UploadTrackedBufferedInputStream(inputStream, (int) (contentLength + 1)); } return inputStream; } + public static Semaphore maybeAcquireSemaphore( + TransferSemaphoresHolder transferSemaphoresHolder, + TransferSemaphoresHolder.RequestContext requestContext, + WritePriority writePriority, + String file + ) throws InterruptedException { + final TransferSemaphoresHolder.TypeSemaphore semaphore; + if (writePriority != WritePriority.HIGH && writePriority != WritePriority.URGENT) { + semaphore = transferSemaphoresHolder.acquirePermit(writePriority, requestContext); + if (semaphore == null) { + throw new S3TransferRejectedException("Permit not available for transfer of file " + file); + } + } else { + semaphore = null; + } + + return semaphore; + } + + /** + * Overridden stream to identify upload streams among all buffered stream instances for triaging. + */ + static class UploadTrackedBufferedInputStream extends BufferedInputStream { + AtomicBoolean closed = new AtomicBoolean(); + + public UploadTrackedBufferedInputStream(InputStream in, int length) { + super(in, length); + } + + @Override + public void close() throws IOException { + super.close(); + closed.set(true); + } + } + private static void uploadPart( S3AsyncClient s3AsyncClient, ExecutorService executorService, @@ -156,8 +217,11 @@ private static void uploadPart( UploadPartRequest uploadPartRequest, InputStreamContainer inputStreamContainer, UploadRequest uploadRequest, - boolean uploadRetryEnabled + boolean uploadRetryEnabled, + long maxRetryablePartSize, + Semaphore semaphore ) { + Integer partNumber = uploadPartRequest.partNumber(); ExecutorService streamReadExecutor; @@ -173,7 +237,8 @@ private static void uploadPart( inputStreamContainer.getInputStream(), uploadRequest.getWritePriority(), uploadRetryEnabled, - uploadPartRequest.contentLength() + uploadPartRequest.contentLength(), + maxRetryablePartSize ); CompletableFuture uploadPartResponseFuture = SocketAccess.doPrivileged( () -> s3AsyncClient.uploadPart( @@ -183,6 +248,10 @@ private static void uploadPart( ); CompletableFuture convertFuture = uploadPartResponseFuture.whenComplete((resp, throwable) -> { + if (semaphore != null) { + semaphore.release(); + } + try { inputStream.close(); } catch (IOException ex) { diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java index 80538059d17b8..0f9bf3be77d73 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java @@ -21,6 +21,7 @@ import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CompletableFutureUtils; @@ -48,6 +49,7 @@ import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.function.BiFunction; import java.util.function.Supplier; @@ -64,6 +66,10 @@ public final class AsyncTransferManager { private final ExecutorService priorityExecutorService; private final ExecutorService urgentExecutorService; private final long minimumPartSize; + private final long maxRetryablePartSize; + + @SuppressWarnings("rawtypes") + private final TransferSemaphoresHolder transferSemaphoresHolder; /** * The max number of parts on S3 side is 10,000 @@ -74,19 +80,22 @@ public final class AsyncTransferManager { * Construct a new object of AsyncTransferManager * * @param minimumPartSize The minimum part size for parallel multipart uploads - * @param executorService The stream reader {@link ExecutorService} for normal priority uploads - * @param priorityExecutorService The stream read {@link ExecutorService} for high priority uploads */ + @SuppressWarnings("rawtypes") public AsyncTransferManager( long minimumPartSize, ExecutorService executorService, ExecutorService priorityExecutorService, - ExecutorService urgentExecutorService + ExecutorService urgentExecutorService, + TransferSemaphoresHolder transferSemaphoresHolder ) { this.executorService = executorService; this.priorityExecutorService = priorityExecutorService; this.minimumPartSize = minimumPartSize; + // 10% buffer to allow additional metadata size in content such as encryption. + this.maxRetryablePartSize = (long) (minimumPartSize + 0.1 * minimumPartSize); this.urgentExecutorService = urgentExecutorService; + this.transferSemaphoresHolder = transferSemaphoresHolder; } /** @@ -108,7 +117,21 @@ public CompletableFuture uploadObject( try { if (streamContext.getNumberOfParts() == 1) { log.debug(() -> "Starting the upload as a single upload part request"); - uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext.provideStream(0), returnFuture, statsMetricPublisher); + TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext(); + Semaphore semaphore = AsyncPartsHandler.maybeAcquireSemaphore( + transferSemaphoresHolder, + requestContext, + uploadRequest.getWritePriority(), + uploadRequest.getKey() + ); + try { + uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext, returnFuture, statsMetricPublisher, semaphore); + } catch (Exception ex) { + if (semaphore != null) { + semaphore.release(); + } + throw ex; + } } else { log.debug(() -> "Starting the upload as multipart upload request"); uploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, statsMetricPublisher); @@ -146,21 +169,19 @@ private void uploadInParts( // Ensure cancellations are forwarded to the createMultipartUploadFuture future CompletableFutureUtils.forwardExceptionTo(returnFuture, createMultipartUploadFuture); - createMultipartUploadFuture.whenComplete((createMultipartUploadResponse, throwable) -> { - if (throwable != null) { - handleException(returnFuture, () -> "Failed to initiate multipart upload", throwable); - } else { - log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId()); - doUploadInParts( - s3AsyncClient, - uploadRequest, - streamContext, - returnFuture, - createMultipartUploadResponse.uploadId(), - statsMetricPublisher - ); - } - }); + String uploadId; + try { + // Block main thread here so that upload of parts doesn't get executed in future 
completion thread. + // We should never execute latent operation like acquisition of permit in future completion pool. + CreateMultipartUploadResponse createMultipartUploadResponse = createMultipartUploadFuture.get(); + uploadId = createMultipartUploadResponse.uploadId(); + log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId()); + } catch (Exception ex) { + handleException(returnFuture, () -> "Failed to initiate multipart upload", ex); + return; + } + + doUploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, uploadId, statsMetricPublisher); } private void doUploadInParts( @@ -189,7 +210,9 @@ private void doUploadInParts( completedParts, inputStreamContainers, statsMetricPublisher, - uploadRequest.isUploadRetryEnabled() + uploadRequest.isUploadRetryEnabled(), + transferSemaphoresHolder, + maxRetryablePartSize ); } catch (Exception ex) { try { @@ -320,12 +343,14 @@ public long calculateOptimalPartSize(long contentLengthOfSource, WritePriority w return (long) Math.max(optimalPartSize, minimumPartSize); } + @SuppressWarnings("unchecked") private void uploadInOneChunk( S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, - InputStreamContainer inputStreamContainer, + StreamContext streamContext, CompletableFuture returnFuture, - StatsMetricPublisher statsMetricPublisher + StatsMetricPublisher statsMetricPublisher, + Semaphore semaphore ) { PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder() .bucket(uploadRequest.getBucket()) @@ -340,6 +365,7 @@ private void uploadInOneChunk( putObjectRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32); putObjectRequestBuilder.checksumCRC32(base64StringFromLong(uploadRequest.getExpectedChecksum())); } + PutObjectRequest putObjectRequest = putObjectRequestBuilder.build(); ExecutorService streamReadExecutor; if (uploadRequest.getWritePriority() == WritePriority.URGENT) { streamReadExecutor = urgentExecutorService; @@ -349,25 +375,33 @@ private void uploadInOneChunk( streamReadExecutor = executorService; } - InputStream inputStream = AsyncPartsHandler.maybeRetryInputStream( - inputStreamContainer.getInputStream(), - uploadRequest.getWritePriority(), - uploadRequest.isUploadRetryEnabled(), - uploadRequest.getContentLength() - ); - CompletableFuture putObjectFuture = SocketAccess.doPrivileged( - () -> s3AsyncClient.putObject( - putObjectRequestBuilder.build(), - AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor) - ).handle((resp, throwable) -> { - try { - inputStream.close(); - } catch (IOException e) { - log.error( - () -> new ParameterizedMessage("Failed to close stream while uploading single file {}.", uploadRequest.getKey()), - e - ); - } + CompletableFuture putObjectFuture = SocketAccess.doPrivileged(() -> { + InputStream inputStream = null; + CompletableFuture putObjectRespFuture; + try { + InputStreamContainer inputStreamContainer = streamContext.provideStream(0); + inputStream = AsyncPartsHandler.maybeRetryInputStream( + inputStreamContainer.getInputStream(), + uploadRequest.getWritePriority(), + uploadRequest.isUploadRetryEnabled(), + uploadRequest.getContentLength(), + maxRetryablePartSize + ); + AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromInputStream( + inputStream, + inputStreamContainer.getContentLength(), + streamReadExecutor + ); + putObjectRespFuture = s3AsyncClient.putObject(putObjectRequest, asyncRequestBody); + } catch (Exception e) { + releaseResourcesSafely(semaphore, 
inputStream, uploadRequest.getKey()); + return CompletableFuture.failedFuture(e); + } + + InputStream finalInputStream = inputStream; + return putObjectRespFuture.handle((resp, throwable) -> { + releaseResourcesSafely(semaphore, finalInputStream, uploadRequest.getKey()); + if (throwable != null) { Throwable unwrappedThrowable = ExceptionsHelper.unwrap(throwable, S3Exception.class); if (unwrappedThrowable != null) { @@ -395,13 +429,27 @@ private void uploadInOneChunk( } return null; - }) - ); + }); + }); CompletableFutureUtils.forwardExceptionTo(returnFuture, putObjectFuture); CompletableFutureUtils.forwardResultTo(putObjectFuture, returnFuture); } + private void releaseResourcesSafely(Semaphore semaphore, InputStream inputStream, String file) { + if (semaphore != null) { + semaphore.release(); + } + + if (inputStream != null) { + try { + inputStream.close(); + } catch (IOException e) { + log.error(() -> new ParameterizedMessage("Failed to close stream while uploading single file {}.", file), e); + } + } + } + private void deleteUploadedObject(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest) { DeleteObjectRequest deleteObjectRequest = DeleteObjectRequest.builder() .bucket(uploadRequest.getBucket()) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQ.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQ.java new file mode 100644 index 0000000000000..170c80f5d4db6 --- /dev/null +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQ.java @@ -0,0 +1,230 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.s3.async; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.repositories.s3.GenericStatsMetricPublisher; +import org.opensearch.repositories.s3.S3TransferRejectedException; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Queue implementation to accept events based on their storage attribute. If size of queue is breached, then transfer + * event is rejected. + */ +public class SizeBasedBlockingQ extends AbstractLifecycleComponent { + private static final Logger log = LogManager.getLogger(SizeBasedBlockingQ.class); + + protected final LinkedBlockingQueue queue; + protected final Lock lock; + protected final Condition notEmpty; + + protected final AtomicLong currentSize; + protected final ByteSizeValue capacity; + protected final AtomicBoolean closed; + protected final ExecutorService executorService; + protected final int consumers; + private final GenericStatsMetricPublisher genericStatsMetricPublisher; + private final QueueEventType queueEventType; + + /** + * Constructor to create sized based blocking queue. 
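+ * @param capacity maximum cumulative size, in bytes, of queued items before new transfers are rejected
+ * @param consumers number of consumer threads that drain the queue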
+ */ + public SizeBasedBlockingQ( + ByteSizeValue capacity, + ExecutorService executorService, + int consumers, + GenericStatsMetricPublisher genericStatsMetricPublisher, + QueueEventType queueEventType + ) { + this.queue = new LinkedBlockingQueue<>(); + this.lock = new ReentrantLock(); + this.notEmpty = lock.newCondition(); + this.currentSize = new AtomicLong(); + this.capacity = capacity; + this.closed = new AtomicBoolean(); + this.executorService = executorService; + this.consumers = consumers; + this.genericStatsMetricPublisher = genericStatsMetricPublisher; + this.queueEventType = queueEventType; + } + + public enum QueueEventType { + NORMAL, + LOW; + } + + @Override + protected void doStart() { + for (int worker = 0; worker < consumers; worker++) { + Thread consumer = new Consumer(queue, currentSize, lock, notEmpty, closed, genericStatsMetricPublisher, queueEventType); + executorService.submit(consumer); + } + } + + /** + * Add an item to the queue + */ + public void produce(Item item) throws InterruptedException { + if (item == null || item.size <= 0) { + throw new IllegalStateException("Invalid item input to produce."); + } + log.debug(() -> "Transfer queue event received of size: " + item.size + ". Current queue utilisation: " + currentSize.get()); + + if (currentSize.get() + item.size >= capacity.getBytes()) { + throw new S3TransferRejectedException("S3 Transfer queue capacity reached"); + } + + final Lock lock = this.lock; + final AtomicLong currentSize = this.currentSize; + lock.lock(); + try { + if (currentSize.get() + item.size >= capacity.getBytes()) { + throw new S3TransferRejectedException("S3 Transfer queue capacity reached"); + } + if (closed.get()) { + throw new AlreadyClosedException("Transfer queue is already closed."); + } + queue.put(item); + currentSize.addAndGet(item.size); + notEmpty.signalAll(); + updateStats(item.size, queueEventType, genericStatsMetricPublisher); + } finally { + lock.unlock(); + } + + } + + private static void updateStats(long itemSize, QueueEventType queueEventType, GenericStatsMetricPublisher genericStatsMetricPublisher) { + if (queueEventType == QueueEventType.NORMAL) { + genericStatsMetricPublisher.updateNormalPriorityQSize(itemSize); + } else if (queueEventType == QueueEventType.LOW) { + genericStatsMetricPublisher.updateLowPriorityQSize(itemSize); + } + } + + public int getSize() { + return queue.size(); + } + + public boolean isMaxCapacityBelowContentLength(long contentLength) { + return contentLength < capacity.getBytes(); + } + + protected static class Consumer extends Thread { + private final LinkedBlockingQueue queue; + private final Lock lock; + private final Condition notEmpty; + private final AtomicLong currentSize; + private final AtomicBoolean closed; + private final GenericStatsMetricPublisher genericStatsMetricPublisher; + private final QueueEventType queueEventType; + + public Consumer( + LinkedBlockingQueue queue, + AtomicLong currentSize, + Lock lock, + Condition notEmpty, + AtomicBoolean closed, + GenericStatsMetricPublisher genericStatsMetricPublisher, + QueueEventType queueEventType + ) { + this.queue = queue; + this.lock = lock; + this.notEmpty = notEmpty; + this.currentSize = currentSize; + this.closed = closed; + this.genericStatsMetricPublisher = genericStatsMetricPublisher; + this.queueEventType = queueEventType; + } + + @Override + public void run() { + while (true) { + try { + consume(); + } catch (AlreadyClosedException ex) { + return; + } catch (Exception ex) { + log.error("Failed to consume transfer event", 
ex); + } + } + } + + private void consume() throws InterruptedException { + final Lock lock = this.lock; + final AtomicLong currentSize = this.currentSize; + lock.lock(); + Item item; + try { + if (closed.get()) { + throw new AlreadyClosedException("transfer queue closed"); + } + while (currentSize.get() == 0) { + notEmpty.await(); + if (closed.get()) { + throw new AlreadyClosedException("transfer queue closed"); + } + } + + item = queue.take(); + currentSize.addAndGet(-item.size); + updateStats(-item.size, queueEventType, genericStatsMetricPublisher); + } finally { + lock.unlock(); + } + + try { + item.consumable.run(); + } catch (Exception ex) { + log.error("Exception on executing item consumable", ex); + } + } + + } + + public static class Item { + private final long size; + private final Runnable consumable; + + public Item(long size, Runnable consumable) { + this.size = size; + this.consumable = consumable; + } + } + + @Override + protected void doStop() { + doClose(); + } + + @Override + protected void doClose() { + lock.lock(); + try { + if (closed.get() == true) { + return; + } + closed.set(true); + notEmpty.signalAll(); + } finally { + lock.unlock(); + } + } +} diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolder.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolder.java new file mode 100644 index 0000000000000..7dccedb8d5278 --- /dev/null +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolder.java @@ -0,0 +1,186 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.s3.async; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.repositories.s3.GenericStatsMetricPublisher; + +import java.util.Objects; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * Transfer semaphore holder for controlled transfer of data to remote. + */ +public class TransferSemaphoresHolder { + private static final Logger log = LogManager.getLogger(TransferSemaphoresHolder.class); + // For tests + protected TypeSemaphore lowPrioritySemaphore; + protected TypeSemaphore normalPrioritySemaphore; + private final int normalPriorityPermits; + private final int lowPriorityPermits; + private final int acquireWaitDuration; + private final TimeUnit acquireWaitDurationUnit; + + /** + * Constructor to create semaphores holder. 
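+ * @param normalPriorityPermits number of permits available to normal priority transfers
+ * @param lowPriorityPermits number of permits available to low priority transfers
+ * @param acquireWaitDuration how long acquirePermit waits when no permit is immediately available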
+ */ + public TransferSemaphoresHolder( + int normalPriorityPermits, + int lowPriorityPermits, + int acquireWaitDuration, + TimeUnit timeUnit, + GenericStatsMetricPublisher genericStatsPublisher + ) { + + this.normalPriorityPermits = normalPriorityPermits; + this.lowPriorityPermits = lowPriorityPermits; + this.normalPrioritySemaphore = new TypeSemaphore( + normalPriorityPermits, + TypeSemaphore.PermitType.NORMAL, + genericStatsPublisher::updateNormalPermits + ); + this.lowPrioritySemaphore = new TypeSemaphore( + lowPriorityPermits, + TypeSemaphore.PermitType.LOW, + genericStatsPublisher::updateLowPermits + ); + this.acquireWaitDuration = acquireWaitDuration; + this.acquireWaitDurationUnit = timeUnit; + } + + /** + * Overridden semaphore to identify transfer semaphores among all other semaphores for triaging. + */ + public static class TypeSemaphore extends Semaphore { + private final PermitType permitType; + private final Consumer permitChangeConsumer; + + public enum PermitType { + NORMAL, + LOW; + } + + public TypeSemaphore(int permits, PermitType permitType, Consumer permitChangeConsumer) { + super(permits); + this.permitType = permitType; + this.permitChangeConsumer = permitChangeConsumer; + } + + public PermitType getType() { + return permitType; + } + + @Override + public boolean tryAcquire() { + boolean acquired = super.tryAcquire(); + if (acquired) { + permitChangeConsumer.accept(true); + } + return acquired; + } + + @Override + public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { + boolean acquired = super.tryAcquire(timeout, unit); + if (acquired) { + permitChangeConsumer.accept(true); + } + return acquired; + } + + @Override + public void release() { + super.release(); + permitChangeConsumer.accept(false); + } + } + + /** + * For multiple part requests of a single file, request context object will be set with the decision if low + * priority permits can also be utilized in high priority transfers of parts of the file. If high priority get fully + * consumed then low priority permits will be acquired for transfer. + * + * If a low priority transfer request comes in and a high priority transfer is in progress then till current + * high priority transfer finishes, low priority transfer may have to compete. This is an acceptable side effect + * because low priority transfers are generally heavy and it is ok to have slow progress in the beginning. + * + */ + public static class RequestContext { + + private final boolean lowPriorityPermitsConsumable; + + private RequestContext(boolean lowPriorityPermitsConsumable) { + this.lowPriorityPermitsConsumable = lowPriorityPermitsConsumable; + } + + } + + public RequestContext createRequestContext() { + return new RequestContext(this.lowPrioritySemaphore.availablePermits() == lowPriorityPermits); + } + + /** + * Acquire permit based on the availability and based on the transfer priority. + * A high priority event can acquire a low priority semaphore if all low permits are available. + * A low priority event can acquire a high priority semaphore if at least 40% of high permits are available. We + * reserve this bandwidth to ensure that high priority events never wait for permits in case of ongoing low priority + * transfers. + */ + public TypeSemaphore acquirePermit(WritePriority writePriority, RequestContext requestContext) throws InterruptedException { + log.debug( + () -> "Acquire permit request for transfer type: " + + writePriority + + ". 
+                + normalPrioritySemaphore.availablePermits()
+                + " and low priority permits: "
+                + lowPrioritySemaphore.availablePermits()
+        );
+        // Try acquiring a low-priority or a normal-priority permit immediately, if available.
+        // Otherwise, wait for a low-priority permit.
+        if (Objects.requireNonNull(writePriority) == WritePriority.LOW) {
+            if (lowPrioritySemaphore.tryAcquire()) {
+                return lowPrioritySemaphore;
+            } else if (normalPrioritySemaphore.availablePermits() > 0.4 * normalPriorityPermits && normalPrioritySemaphore.tryAcquire()) {
+                return normalPrioritySemaphore;
+            } else if (lowPrioritySemaphore.tryAcquire(acquireWaitDuration, acquireWaitDurationUnit)) {
+                return lowPrioritySemaphore;
+            }
+            return null;
+        }
+
+        // Try acquiring a normal-priority or a low-priority permit immediately, if available.
+        // Otherwise, wait for a normal-priority permit.
+        if (normalPrioritySemaphore.tryAcquire()) {
+            return normalPrioritySemaphore;
+        } else if (requestContext.lowPriorityPermitsConsumable && lowPrioritySemaphore.tryAcquire()) {
+            return lowPrioritySemaphore;
+        } else if (normalPrioritySemaphore.tryAcquire(acquireWaitDuration, acquireWaitDurationUnit)) {
+            return normalPrioritySemaphore;
+        }
+        return null;
+    }
+
+    /**
+     * Used in tests.
+     */
+    public int getNormalPriorityPermits() {
+        return normalPriorityPermits;
+    }
+
+    /**
+     * Used in tests.
+     */
+    public int getLowPriorityPermits() {
+        return lowPriorityPermits;
+    }
+}
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java
index b944a72225d36..79b58ff215c54 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java
@@ -26,8 +26,8 @@ public class UploadRequest {
     private final CheckedConsumer<Boolean, IOException> uploadFinalizer;
     private final boolean doRemoteDataIntegrityCheck;
     private final Long expectedChecksum;
-    private boolean uploadRetryEnabled;
     private final Map<String, String> metadata;
+    private final boolean uploadRetryEnabled;
 
     /**
      * Construct a new UploadRequest object
diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/RepositoryCredentialsTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/RepositoryCredentialsTests.java
index f84d953baae8e..573a4f3f51a41 100644
--- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/RepositoryCredentialsTests.java
+++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/RepositoryCredentialsTests.java
@@ -303,7 +303,22 @@ protected S3Repository createRepository(
         ClusterService clusterService,
         RecoverySettings recoverySettings
     ) {
-        return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, null, false) {
+        return new S3Repository(
+            metadata,
+            registry,
+            service,
+            clusterService,
+            recoverySettings,
+            null,
+            null,
+            null,
+            null,
+            null,
+            false,
+            null,
+            null,
+            null
+        ) {
             @Override
             protected void assertSnapshotOrGenericThread() {
                 // eliminate thread name check as we create repo manually on test/main threads
diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java
index 4173f8b66387f..9b413ac81d766 100644
--- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java
+++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java
@@ -47,7 +47,10 @@
 import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
 import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup;
 import org.opensearch.repositories.s3.async.AsyncTransferManager;
+import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
+import org.opensearch.repositories.s3.async.TransferSemaphoresHolder;
 import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.threadpool.Scheduler;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -65,6 +68,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -91,8 +95,13 @@ public class S3BlobContainerMockClientTests extends OpenSearchTestCase implement
 
     private MockS3AsyncService asyncService;
     private ExecutorService futureCompletionService;
     private ExecutorService streamReaderService;
+    private ExecutorService remoteTransferRetry;
+    private ExecutorService transferQueueConsumerService;
+    private ScheduledExecutorService scheduler;
     private AsyncTransferEventLoopGroup transferNIOGroup;
     private S3BlobContainer blobContainer;
+    private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
+    private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
 
     static class MockS3AsyncService extends S3AsyncService {
@@ -364,7 +373,27 @@ public void setUp() throws Exception {
         asyncService = new MockS3AsyncService(configPath(), 1000);
         futureCompletionService = Executors.newSingleThreadExecutor();
         streamReaderService = Executors.newSingleThreadExecutor();
+        remoteTransferRetry = Executors.newFixedThreadPool(20);
+        transferQueueConsumerService = Executors.newFixedThreadPool(20);
+        scheduler = new Scheduler.SafeScheduledThreadPoolExecutor(1);
         transferNIOGroup = new AsyncTransferEventLoopGroup(1);
+        GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
+        normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ(
+            new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 10L, ByteSizeUnit.GB),
+            transferQueueConsumerService,
+            10,
+            genericStatsMetricPublisher,
+            SizeBasedBlockingQ.QueueEventType.NORMAL
+        );
+        lowPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ(
+            new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 20L, ByteSizeUnit.GB),
+            transferQueueConsumerService,
+            5,
+            genericStatsMetricPublisher,
+            SizeBasedBlockingQ.QueueEventType.LOW
+        );
+        normalPrioritySizeBasedBlockingQ.start();
+        lowPrioritySizeBasedBlockingQ.start();
         blobContainer = createBlobContainer();
         super.setUp();
     }
@@ -373,6 +402,14 @@
 
     @After
     public void tearDown() throws Exception {
         IOUtils.close(asyncService);
+        futureCompletionService.shutdown();
+        streamReaderService.shutdown();
+        remoteTransferRetry.shutdown();
+        transferQueueConsumerService.shutdown();
+        normalPrioritySizeBasedBlockingQ.close();
+        lowPrioritySizeBasedBlockingQ.close();
+        scheduler.shutdown();
+        transferNIOGroup.close();
         super.tearDown();
     }
 
@@ -394,7 +431,7 @@ private S3BlobStore createBlobStore() {
             streamReaderService,
             transferNIOGroup
         );
-
+        GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
         return new S3BlobStore(
             null,
             asyncService,
@@ -410,11 +447,21 @@ private S3BlobStore createBlobStore() {
                 S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(),
                 asyncExecutorContainer.getStreamReader(),
                 asyncExecutorContainer.getStreamReader(),
-                asyncExecutorContainer.getStreamReader()
+                asyncExecutorContainer.getStreamReader(),
+                new TransferSemaphoresHolder(
+                    3,
+                    Math.max(Runtime.getRuntime().availableProcessors() * 5, 10),
+                    5,
+                    TimeUnit.MINUTES,
+                    genericStatsMetricPublisher
+                )
             ),
             asyncExecutorContainer,
             asyncExecutorContainer,
-            asyncExecutorContainer
+            asyncExecutorContainer,
+            normalPrioritySizeBasedBlockingQ,
+            lowPrioritySizeBasedBlockingQ,
+            genericStatsMetricPublisher
         );
     }
@@ -574,19 +621,32 @@ private int calculateNumberOfParts(long contentLength, long partSize) {
         return (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1);
     }
 
-    public void testFailureWhenLargeFileRedirected() throws IOException, ExecutionException, InterruptedException {
-        testLargeFilesRedirectedToSlowSyncClient(true);
+    public void testFailureWhenLargeFileRedirected() throws IOException, InterruptedException {
+        testLargeFilesRedirectedToSlowSyncClient(true, WritePriority.LOW);
+        testLargeFilesRedirectedToSlowSyncClient(true, WritePriority.NORMAL);
     }
 
-    public void testLargeFileRedirected() throws IOException, ExecutionException, InterruptedException {
-        testLargeFilesRedirectedToSlowSyncClient(false);
+    public void testLargeFileRedirected() throws IOException, InterruptedException {
+        testLargeFilesRedirectedToSlowSyncClient(false, WritePriority.LOW);
+        testLargeFilesRedirectedToSlowSyncClient(false, WritePriority.NORMAL);
     }
 
-    private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) throws IOException, InterruptedException {
-        final ByteSizeValue partSize = new ByteSizeValue(1024, ByteSizeUnit.MB);
-
+    private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, WritePriority writePriority) throws IOException,
+        InterruptedException {
+        ByteSizeValue capacity = new ByteSizeValue(1, ByteSizeUnit.GB);
         int numberOfParts = 20;
-        final long lastPartSize = new ByteSizeValue(20, ByteSizeUnit.MB).getBytes();
+        final ByteSizeValue partSize = new ByteSizeValue(capacity.getBytes() / numberOfParts + 1, ByteSizeUnit.BYTES);
+
+        GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
+        SizeBasedBlockingQ sizeBasedBlockingQ = new SizeBasedBlockingQ(
+            capacity,
+            transferQueueConsumerService,
+            10,
+            genericStatsMetricPublisher,
+            SizeBasedBlockingQ.QueueEventType.NORMAL
+        );
+
+        final long lastPartSize = new ByteSizeValue(200, ByteSizeUnit.MB).getBytes();
         final long blobSize = ((numberOfParts - 1) * partSize.getBytes()) + lastPartSize;
         CountDownLatch countDownLatch = new CountDownLatch(1);
         AtomicReference<Exception> exceptionRef = new AtomicReference<>();
@@ -609,6 +669,9 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t
         when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
         when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize);
+        when(blobStore.getLowPrioritySizeBasedBlockingQ()).thenReturn(sizeBasedBlockingQ);
+        when(blobStore.getNormalPrioritySizeBasedBlockingQ()).thenReturn(sizeBasedBlockingQ);
+
         final boolean serverSideEncryption = randomBoolean();
         when(blobStore.serverSideEncryption()).thenReturn(serverSideEncryption);
@@ -658,7 +721,7 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t
             .streamContextSupplier(streamContextSupplier)
             .fileSize(blobSize)
             .failIfAlreadyExists(false)
-            .writePriority(WritePriority.HIGH)
+            .writePriority(writePriority)
             .uploadFinalizer(Assert::assertTrue)
             .doRemoteDataIntegrityCheck(false)
             .metadata(new HashMap<>())
@@ -693,5 +756,4 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t
             }
         });
     }
-
 }
diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java
index 10578090da75c..96ef28d24c14f 100644
--- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java
+++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java
@@ -67,6 +67,8 @@
 import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
 import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup;
 import org.opensearch.repositories.s3.async.AsyncTransferManager;
+import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
+import org.opensearch.repositories.s3.async.TransferSemaphoresHolder;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -87,6 +89,8 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -114,7 +118,12 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
     private S3AsyncService asyncService;
     private ExecutorService futureCompletionService;
     private ExecutorService streamReaderService;
+    private ExecutorService remoteTransferRetry;
+    private ExecutorService transferQueueConsumerService;
+    private ScheduledExecutorService scheduler;
     private AsyncTransferEventLoopGroup transferNIOGroup;
+    private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
+    private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
 
     @Before
     public void setUp() throws Exception {
@@ -125,7 +134,26 @@ public void setUp() throws Exception {
         futureCompletionService = Executors.newSingleThreadExecutor();
         streamReaderService = Executors.newSingleThreadExecutor();
         transferNIOGroup = new AsyncTransferEventLoopGroup(1);
-
+        remoteTransferRetry = Executors.newFixedThreadPool(20);
+        transferQueueConsumerService = Executors.newFixedThreadPool(2);
+        scheduler = new ScheduledThreadPoolExecutor(1);
+        GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
+        normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ(
+            new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 5L, ByteSizeUnit.GB),
+            transferQueueConsumerService,
+            2,
+            genericStatsMetricPublisher,
+            SizeBasedBlockingQ.QueueEventType.NORMAL
+        );
+        lowPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ(
+            new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 5L, ByteSizeUnit.GB),
+            transferQueueConsumerService,
+            2,
+            genericStatsMetricPublisher,
+            SizeBasedBlockingQ.QueueEventType.LOW
+        );
+        normalPrioritySizeBasedBlockingQ.start();
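+        // Both queues must be started before the blob container is created; tearDown() closes them.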
+        lowPrioritySizeBasedBlockingQ.start();
         // needed by S3AsyncService
         SocketAccess.doPrivileged(() -> System.setProperty("opensearch.path.conf", configPath().toString()));
         super.setUp();
     }
@@ -137,6 +165,11 @@ public void tearDown() throws Exception {
         streamReaderService.shutdown();
         futureCompletionService.shutdown();
+        remoteTransferRetry.shutdown();
+        transferQueueConsumerService.shutdown();
+        scheduler.shutdown();
+        normalPrioritySizeBasedBlockingQ.close();
+        lowPrioritySizeBasedBlockingQ.close();
         IOUtils.close(transferNIOGroup);
 
         if (previousOpenSearchPathConf != null) {
@@ -205,7 +238,7 @@ protected AsyncMultiStreamBlobContainer createBlobContainer(
             streamReaderService,
             transferNIOGroup
         );
-
+        GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
         return new S3BlobContainer(
             BlobPath.cleanPath(),
             new S3BlobStore(
@@ -223,11 +256,21 @@ protected AsyncMultiStreamBlobContainer createBlobContainer(
                 S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(),
                 asyncExecutorContainer.getStreamReader(),
                 asyncExecutorContainer.getStreamReader(),
-                asyncExecutorContainer.getStreamReader()
+                asyncExecutorContainer.getStreamReader(),
+                new TransferSemaphoresHolder(
+                    3,
+                    Math.max(Runtime.getRuntime().availableProcessors() * 5, 10),
+                    5,
+                    TimeUnit.MINUTES,
+                    genericStatsMetricPublisher
+                )
             ),
             asyncExecutorContainer,
             asyncExecutorContainer,
-            asyncExecutorContainer
+            asyncExecutorContainer,
+            normalPrioritySizeBasedBlockingQ,
+            lowPrioritySizeBasedBlockingQ,
+            genericStatsMetricPublisher
         )
     ) {
         @Override
diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java
index 6fec535ae6301..f8e9903bb3577 100644
--- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java
+++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java
@@ -169,7 +169,11 @@ private S3Repository createS3Repo(RepositoryMetadata metadata) {
             null,
             null,
             null,
-            false
+            false,
+            null,
+            null,
+            null,
+            null
         ) {
             @Override
             protected void assertSnapshotOrGenericThread() {
diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java
index 04d1819bef02b..89add3cdbfc60 100644
--- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java
+++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java
@@ -33,6 +33,7 @@
 import org.opensearch.common.io.InputStreamContainer;
 import org.opensearch.core.common.unit.ByteSizeUnit;
 import org.opensearch.repositories.blobstore.ZeroInputStream;
+import org.opensearch.repositories.s3.GenericStatsMetricPublisher;
 import org.opensearch.repositories.s3.StatsMetricPublisher;
 import org.opensearch.test.OpenSearchTestCase;
 import org.junit.Before;
@@ -46,6 +47,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -63,11 +65,19 @@ public class AsyncTransferManagerTests extends OpenSearchTestCase {
     @Before
     public void setUp() throws Exception {
         s3AsyncClient = mock(S3AsyncClient.class);
+        GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
         asyncTransferManager = new AsyncTransferManager(
             ByteSizeUnit.MB.toBytes(5),
             Executors.newSingleThreadExecutor(),
             Executors.newSingleThreadExecutor(),
-            Executors.newSingleThreadExecutor()
+            Executors.newSingleThreadExecutor(),
+            new TransferSemaphoresHolder(
+                3,
+                Math.max(Runtime.getRuntime().availableProcessors() * 5, 10),
+                5,
+                TimeUnit.MINUTES,
+                genericStatsMetricPublisher
+            )
         );
         super.setUp();
     }
diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQTests.java
new file mode 100644
index 0000000000000..5be4037407d23
--- /dev/null
+++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQTests.java
@@ -0,0 +1,102 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.repositories.s3.async;
+
+import org.opensearch.core.common.unit.ByteSizeUnit;
+import org.opensearch.core.common.unit.ByteSizeValue;
+import org.opensearch.repositories.s3.GenericStatsMetricPublisher;
+import org.opensearch.repositories.s3.S3TransferRejectedException;
+import org.opensearch.test.OpenSearchTestCase;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SizeBasedBlockingQTests extends OpenSearchTestCase {
+    private ExecutorService consumerService;
+    private ExecutorService producerService;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        this.consumerService = Executors.newFixedThreadPool(10);
+        this.producerService = Executors.newFixedThreadPool(100);
+        super.setUp();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        consumerService.shutdown();
+        producerService.shutdown();
+        super.tearDown();
+    }
+
+    public void testProducerConsumerOfBulkItems() throws InterruptedException {
+        GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
+        SizeBasedBlockingQ.QueueEventType queueEventType = randomBoolean()
+            ? SizeBasedBlockingQ.QueueEventType.NORMAL
+            : SizeBasedBlockingQ.QueueEventType.LOW;
+        SizeBasedBlockingQ sizeBasedBlockingQ = new SizeBasedBlockingQ(
+            new ByteSizeValue(ByteSizeUnit.BYTES.toBytes(10)),
+            consumerService,
+            10,
+            genericStatsMetricPublisher,
+            queueEventType
+        );
+        sizeBasedBlockingQ.start();
+        int numOfItems = randomIntBetween(100, 1000);
+        CountDownLatch latch = new CountDownLatch(numOfItems);
+        AtomicBoolean unknownError = new AtomicBoolean();
+        for (int i = 0; i < numOfItems; i++) {
+            final int idx = i;
+            producerService.submit(() -> {
+                boolean throwException = randomBoolean();
+
+                SizeBasedBlockingQ.Item item = new TestItemToStr(randomIntBetween(1, 5), () -> {
+                    latch.countDown();
+                    if (throwException) {
+                        throw new RuntimeException("throwing random exception");
+                    }
+                }, idx);
+
+                try {
+                    sizeBasedBlockingQ.produce(item);
+                } catch (InterruptedException e) {
+                    latch.countDown();
+                    unknownError.set(true);
+                    throw new RuntimeException(e);
+                } catch (S3TransferRejectedException ex) {
+                    latch.countDown();
+                }
+            });
+        }
+        latch.await();
+        sizeBasedBlockingQ.close();
+        assertFalse(unknownError.get());
+        assertEquals(0L, genericStatsMetricPublisher.getNormalPriorityQSize());
+        assertEquals(0L, genericStatsMetricPublisher.getLowPriorityQSize());
+    }
+
+    static class TestItemToStr extends SizeBasedBlockingQ.Item {
+        private final int id;
+
+        public TestItemToStr(long size, Runnable consumable, int id) {
+            super(size, consumable);
+            this.id = id;
+        }
+
+        @Override
+        public String toString() {
+            return String.valueOf(id);
+        }
+    }
+}
diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolderTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolderTests.java
new file mode 100644
index 0000000000000..236f02c5eb1f7
--- /dev/null
+++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolderTests.java
@@ -0,0 +1,276 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.repositories.s3.async;
+
+import org.opensearch.common.blobstore.stream.write.WritePriority;
+import org.opensearch.repositories.s3.GenericStatsMetricPublisher;
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.mockito.Mockito;
+
+import static org.opensearch.repositories.s3.async.TransferSemaphoresHolder.TypeSemaphore.PermitType;
+
+public class TransferSemaphoresHolderTests extends OpenSearchTestCase {
+
+    public void testAllocation() {
+        int availablePermits = randomIntBetween(5, 20);
+        double priorityAllocation = randomDoubleBetween(0.1, 0.9, true);
+        int normalPermits = (int) (availablePermits * priorityAllocation);
+        int lowPermits = availablePermits - normalPermits;
+        GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
+        TransferSemaphoresHolder transferSemaphoresHolder = new TransferSemaphoresHolder(
+            normalPermits,
+            lowPermits,
+            1,
+            TimeUnit.NANOSECONDS,
+            genericStatsPublisher
+        );
+        assertEquals(normalPermits, transferSemaphoresHolder.getNormalPriorityPermits());
+        assertEquals(lowPermits, transferSemaphoresHolder.getLowPriorityPermits());
+        assertEquals(0, genericStatsPublisher.getAcquiredNormalPriorityPermits());
+        assertEquals(0, genericStatsPublisher.getAcquiredLowPriorityPermits());
+    }
+
+    public void testLowPriorityEventPermitAcquisition() throws InterruptedException {
+        int availablePermits = randomIntBetween(5, 50);
+        double priorityAllocation = randomDoubleBetween(0.1, 0.9, true);
+        int normalPermits = (int) (availablePermits * priorityAllocation);
+        int lowPermits = availablePermits - normalPermits;
+        GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
+        TransferSemaphoresHolder transferSemaphoresHolder = new TransferSemaphoresHolder(
+            normalPermits,
+            lowPermits,
+            1,
+            TimeUnit.NANOSECONDS,
+            genericStatsPublisher
+        );
+
+        List<TransferSemaphoresHolder.TypeSemaphore> semaphores = new ArrayList<>();
+        int normalPermitsEligibleForLowEvents = normalPermits - (int) (normalPermits * 0.4);
+
+        int lowAcquisitionsExpected = (normalPermitsEligibleForLowEvents + lowPermits);
+        for (int i = 0; i < lowAcquisitionsExpected; i++) {
+            TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext();
+            TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit(
+                WritePriority.LOW,
+                requestContext
+            );
+            semaphores.add(acquiredSemaphore);
+            if (i >= lowPermits) {
+                assertEquals(PermitType.NORMAL, acquiredSemaphore.getType());
+            } else {
+                assertEquals(PermitType.LOW, acquiredSemaphore.getType());
+            }
+        }
+
+        for (int i = 0; i < normalPermits - normalPermitsEligibleForLowEvents; i++) {
+            TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext();
+            TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit(
+                WritePriority.NORMAL,
+                requestContext
+            );
+            assertEquals(PermitType.NORMAL, acquiredSemaphore.getType());
+            semaphores.add(acquiredSemaphore);
+        }
+
+        TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext();
+        TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit(
+            WritePriority.LOW,
+            requestContext
+        );
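+        // Every low-priority permit and all normal-priority permits eligible for low events are already
+        // held, and the 1 ns timed acquire expires immediately, so no permit can be returned here.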
+        assertNull(acquiredSemaphore);
+
+        assertEquals(availablePermits, semaphores.size());
+        semaphores.forEach(Semaphore::release);
+        assertEquals(normalPermits, transferSemaphoresHolder.getNormalPriorityPermits());
+        assertEquals(lowPermits, transferSemaphoresHolder.getLowPriorityPermits());
+        assertEquals(0, genericStatsPublisher.getAcquiredNormalPriorityPermits());
+        assertEquals(0, genericStatsPublisher.getAcquiredLowPriorityPermits());
+    }
+
+    public void testNormalPermitEventAcquisition() throws InterruptedException {
+        int availablePermits = randomIntBetween(5, 50);
+        double priorityAllocation = randomDoubleBetween(0.1, 0.9, true);
+        int normalPermits = (int) (availablePermits * priorityAllocation);
+        int lowPermits = availablePermits - normalPermits;
+        GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
+        TransferSemaphoresHolder transferSemaphoresHolder = new TransferSemaphoresHolder(
+            normalPermits,
+            lowPermits,
+            1,
+            TimeUnit.NANOSECONDS,
+            genericStatsPublisher
+        );
+
+        List<TransferSemaphoresHolder.TypeSemaphore> semaphores = new ArrayList<>();
+        List<TransferSemaphoresHolder.TypeSemaphore> lowSemaphores = new ArrayList<>();
+        int normalAcquisitionsExpected = normalPermits + lowPermits;
+        TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext();
+        for (int i = 0; i < normalAcquisitionsExpected; i++) {
+            TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit(
+                WritePriority.NORMAL,
+                requestContext
+            );
+            semaphores.add(acquiredSemaphore);
+            if (i >= normalPermits) {
+                assertEquals(PermitType.LOW, acquiredSemaphore.getType());
+                lowSemaphores.add(acquiredSemaphore);
+            } else {
+                assertEquals(PermitType.NORMAL, acquiredSemaphore.getType());
+            }
+        }
+        assertEquals(availablePermits, semaphores.size());
+
+        int lowAcquired = lowPermits;
+
+        Semaphore removedLowSemaphore = lowSemaphores.remove(0);
+        removedLowSemaphore.release();
+        semaphores.remove(removedLowSemaphore);
+
+        requestContext = transferSemaphoresHolder.createRequestContext();
+        TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit(
+            WritePriority.LOW,
+            requestContext
+        );
+        semaphores.add(acquiredSemaphore);
+        lowSemaphores.add(acquiredSemaphore);
+        while (lowAcquired > 1) {
+            requestContext = transferSemaphoresHolder.createRequestContext();
+            acquiredSemaphore = transferSemaphoresHolder.acquirePermit(WritePriority.NORMAL, requestContext);
+            assertNull(acquiredSemaphore);
+            lowAcquired--;
+        }
+
+        semaphores.forEach(Semaphore::release);
+        assertEquals(normalPermits, transferSemaphoresHolder.getNormalPriorityPermits());
+        assertEquals(lowPermits, transferSemaphoresHolder.getLowPriorityPermits());
+        assertEquals(0, genericStatsPublisher.getAcquiredNormalPriorityPermits());
+        assertEquals(0, genericStatsPublisher.getAcquiredLowPriorityPermits());
+    }
+
+    private static class TestTransferSemaphoresHolder extends TransferSemaphoresHolder {
+        AtomicInteger normalWaitCount = new AtomicInteger();
+        AtomicInteger lowWaitCount = new AtomicInteger();
+
+        /**
+         * Constructor to create the test semaphores holder.
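+         * The spied semaphores below fail every timed {@code tryAcquire(long, TimeUnit)} and count the
+         * attempt instead, so tests can assert that a caller fell through to the timed wait without
+         * actually blocking for the configured duration.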
+         */
+        public TestTransferSemaphoresHolder(
+            int normalPermits,
+            int lowPermits,
+            int acquireWaitDuration,
+            TimeUnit timeUnit,
+            GenericStatsMetricPublisher genericStatsMetricPublisher
+        ) throws InterruptedException {
+            super(normalPermits, lowPermits, acquireWaitDuration, timeUnit, genericStatsMetricPublisher);
+            TypeSemaphore executingNormalSemaphore = normalPrioritySemaphore;
+            TypeSemaphore executingLowSemaphore = lowPrioritySemaphore;
+
+            this.normalPrioritySemaphore = Mockito.spy(normalPrioritySemaphore);
+            this.lowPrioritySemaphore = Mockito.spy(lowPrioritySemaphore);
+            Mockito.doAnswer(invocation -> {
+                normalWaitCount.incrementAndGet();
+                return false;
+            }).when(normalPrioritySemaphore).tryAcquire(Mockito.anyLong(), Mockito.any(TimeUnit.class));
+            Mockito.doAnswer(invocation -> executingNormalSemaphore.availablePermits()).when(normalPrioritySemaphore).availablePermits();
+            Mockito.doAnswer(invocation -> executingNormalSemaphore.tryAcquire()).when(normalPrioritySemaphore).tryAcquire();
+
+            Mockito.doAnswer(invocation -> {
+                lowWaitCount.incrementAndGet();
+                return false;
+            }).when(lowPrioritySemaphore).tryAcquire(Mockito.anyLong(), Mockito.any(TimeUnit.class));
+            Mockito.doAnswer(invocation -> executingLowSemaphore.availablePermits()).when(lowPrioritySemaphore).availablePermits();
+            Mockito.doAnswer(invocation -> executingLowSemaphore.tryAcquire()).when(lowPrioritySemaphore).tryAcquire();
+        }
+    }
+
+    public void testNormalSemaphoreAcquiredWait() throws InterruptedException {
+        int availablePermits = randomIntBetween(10, 50);
+        double priorityAllocation = randomDoubleBetween(0.1, 0.9, true);
+        int normalPermits = (int) (availablePermits * priorityAllocation);
+        GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
+        TestTransferSemaphoresHolder transferSemaphoresHolder = new TestTransferSemaphoresHolder(
+            normalPermits,
+            availablePermits - normalPermits,
+            5,
+            TimeUnit.MINUTES,
+            genericStatsPublisher
+        );
+
+        TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext();
+        TransferSemaphoresHolder.TypeSemaphore lowSemaphore = transferSemaphoresHolder.acquirePermit(WritePriority.LOW, requestContext);
+        assertEquals(PermitType.LOW, lowSemaphore.getType());
+        for (int i = 0; i < normalPermits; i++) {
+            requestContext = transferSemaphoresHolder.createRequestContext();
+            TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit(
+                WritePriority.NORMAL,
+                requestContext
+            );
+            assertEquals(PermitType.NORMAL, acquiredSemaphore.getType());
+        }
+
+        TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit(
+            WritePriority.NORMAL,
+            requestContext
+        );
+        assertNull(acquiredSemaphore);
+        assertEquals(1, transferSemaphoresHolder.normalWaitCount.get());
+        assertEquals(0, transferSemaphoresHolder.lowWaitCount.get());
+    }
+
+    public void testLowSemaphoreAcquiredWait() throws InterruptedException {
+        int availablePermits = randomIntBetween(10, 50);
+        double priorityAllocation = randomDoubleBetween(0.1, 0.9, true);
+        int normalPermits = (int) (availablePermits * priorityAllocation);
+        int lowPermits = availablePermits - normalPermits;
+        GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
+        TestTransferSemaphoresHolder transferSemaphoresHolder = new TestTransferSemaphoresHolder(
+            normalPermits,
+            lowPermits,
+            5,
+            TimeUnit.MINUTES,
+            genericStatsPublisher
+        );
+
+        TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext();
+        int normalPermitsEligibleForLowEvents = normalPermits - (int) (normalPermits * 0.4);
+        for (int i = 0; i < normalPermitsEligibleForLowEvents; i++) {
+            TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit(
+                WritePriority.NORMAL,
+                requestContext
+            );
+            assertEquals(PermitType.NORMAL, acquiredSemaphore.getType());
+        }
+
+        for (int i = 0; i < lowPermits; i++) {
+            requestContext = transferSemaphoresHolder.createRequestContext();
+            TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit(
+                WritePriority.LOW,
+                requestContext
+            );
+            assertEquals(PermitType.LOW, acquiredSemaphore.getType());
+        }
+
+        TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit(
+            WritePriority.LOW,
+            requestContext
+        );
+        assertNull(acquiredSemaphore);
+        assertEquals(1, transferSemaphoresHolder.lowWaitCount.get());
+        assertEquals(0, transferSemaphoresHolder.normalWaitCount.get());
+    }
+
+}
diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java b/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java
index 0f6646d37f950..8ce8ec8e01abe 100644
--- a/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java
+++ b/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java
@@ -75,6 +75,7 @@ default void reload(RepositoryMetadata repositoryMetadata) {}
      * Metrics for BlobStore interactions
      */
     enum Metric {
+        GENERIC_STATS("generic_stats"),
         REQUEST_SUCCESS("request_success_total"),
         REQUEST_FAILURE("request_failures_total"),
         REQUEST_LATENCY("request_time_in_millis"),
diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java
index 3f341c878c3c7..4e8db0a3a8c69 100644
--- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java
+++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java
@@ -14,7 +14,12 @@
  * @opensearch.internal
  */
 public enum WritePriority {
+    // Used for segment transfers during refresh, flush, or merges
     NORMAL,
+    // Used for transfers of translog and checkpoint (ckp) files
     HIGH,
-    URGENT
+    // Used for transfer of remote cluster state
+    URGENT,
+    // All other background transfers, such as those during snapshot recovery or recovery from the local store or index
+    LOW
 }
diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
index 351aec6e3af6c..bfb841307af49 100644
--- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
+++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
@@ -437,10 +437,14 @@ private void uploadNewSegments(
                 batchUploadListener.onFailure(ex);
             });
             statsListener.beforeUpload(src);
-            remoteDirectory.copyFrom(storeDirectory, src, IOContext.DEFAULT, aggregatedListener);
+            remoteDirectory.copyFrom(storeDirectory, src, IOContext.DEFAULT, aggregatedListener, isLowPriorityUpload());
         }
     }
 
+    private boolean isLowPriorityUpload() {
+        return isLocalOrSnapshotRecovery();
+    }
+
     /**
      * Whether to upload a file or not depending on whether file is in excluded list or has been already uploaded.
      *
diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java
index 345583bbbd1be..ab76150f8f83d 100644
--- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java
+++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java
@@ -29,6 +29,7 @@
 import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
 import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream;
 import org.opensearch.core.action.ActionListener;
+import org.opensearch.core.common.unit.ByteSizeUnit;
 import org.opensearch.index.store.exception.ChecksumCombinationException;
 
 import java.io.FileNotFoundException;
@@ -323,11 +324,12 @@ public boolean copyFrom(
         String remoteFileName,
         IOContext context,
         Runnable postUploadRunner,
-        ActionListener<Void> listener
+        ActionListener<Void> listener,
+        boolean lowPriorityUpload
     ) {
         if (blobContainer instanceof AsyncMultiStreamBlobContainer) {
             try {
-                uploadBlob(from, src, remoteFileName, context, postUploadRunner, listener);
+                uploadBlob(from, src, remoteFileName, context, postUploadRunner, listener, lowPriorityUpload);
             } catch (Exception e) {
                 listener.onFailure(e);
             }
@@ -342,7 +344,8 @@ private void uploadBlob(
         String remoteFileName,
         IOContext ioContext,
         Runnable postUploadRunner,
-        ActionListener<Void> listener
+        ActionListener<Void> listener,
+        boolean lowPriorityUpload
     ) throws Exception {
         long expectedChecksum = calculateChecksumOfChecksum(from, src);
         long contentLength;
@@ -353,12 +356,14 @@ private void uploadBlob(
         if (getBlobContainer() instanceof AsyncMultiStreamBlobContainer) {
             remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) getBlobContainer()).remoteIntegrityCheckSupported();
         }
+        // Uploads larger than 15GB are always sent over the low-priority path.
+        lowPriorityUpload = lowPriorityUpload || contentLength > ByteSizeUnit.GB.toBytes(15);
         RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
             src,
             remoteFileName,
             contentLength,
             true,
-            WritePriority.NORMAL,
+            lowPriorityUpload ? WritePriority.LOW : WritePriority.NORMAL,
             (size, position) -> uploadRateLimiter.apply(new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position)),
             expectedChecksum,
             remoteIntegrityEnabled
diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
index ec1163fe91b6c..8c0ecb4cc783a 100644
--- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
+++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
@@ -453,7 +453,8 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
      * @param context IOContext to be used to open IndexInput of file during remote upload
      * @param listener Listener to handle upload callback events
+     * @param lowPriorityUpload whether the upload should be handled on the low-priority path
      */
-    public void copyFrom(Directory from, String src, IOContext context, ActionListener<Void> listener) {
+    public void copyFrom(Directory from, String src, IOContext context, ActionListener<Void> listener, boolean lowPriorityUpload) {
         try {
             final String remoteFileName = getNewRemoteSegmentFilename(src);
             boolean uploaded = remoteDataDirectory.copyFrom(from, src, remoteFileName, context, () -> {
@@ -462,7 +462,7 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen
                 } catch (IOException e) {
                     throw new RuntimeException("Exception in segment postUpload for file " + src, e);
                 }
-            }, listener);
+            }, listener, lowPriorityUpload);
             if (uploaded == false) {
                 copyFrom(from, src, src, context);
                 listener.onResponse(null);
diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java
index 9e38e1749d434..ee81369725e6f 100644
--- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java
+++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java
@@ -104,7 +104,8 @@ public void onResponse(Void t) {
                 public void onFailure(Exception e) {
                     fail("Listener responded with exception" + e);
                 }
-            }
+            },
+            false
         );
         assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
         assertTrue(postUploadInvoked.get());
@@ -141,7 +142,8 @@ public void onResponse(Void t) {
                 public void onFailure(Exception e) {
                     countDownLatch.countDown();
                 }
-            }
+            },
+            false
         );
         assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
         assertFalse(postUploadInvoked.get());
diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
index b1e2028d761f0..567199cf64cd8 100644
--- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
+++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
@@ -639,7 +639,7 @@ public void onResponse(Void unused) {
             @Override
             public void onFailure(Exception e) {}
         };
-        remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener);
+        remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener, false);
         assertTrue(latch.await(5000, TimeUnit.SECONDS));
         assertTrue(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename));
         storeDirectory.close();
@@ -683,7 +683,7 @@ public void onFailure(Exception e) {
                 latch.countDown();
             }
         };
-        remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener);
+        remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener, false);
         assertTrue(latch.await(5000, TimeUnit.SECONDS));
         assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename));
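
A minimal usage sketch of the permit flow added above. The permit counts, wait duration, and stats capacities here are illustrative assumptions, not values taken from this change:

import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.repositories.s3.GenericStatsMetricPublisher;
import org.opensearch.repositories.s3.async.TransferSemaphoresHolder;

import java.util.concurrent.TimeUnit;

public class PermitUsageSketch {
    public static void main(String[] args) throws InterruptedException {
        // Illustrative capacities for the stats publisher (queue capacities and max permits).
        GenericStatsMetricPublisher stats = new GenericStatsMetricPublisher(10_000L, 10, 10_000L, 10);
        // 5 normal-priority and 5 low-priority permits; timed acquires wait up to 5 minutes.
        TransferSemaphoresHolder holder = new TransferSemaphoresHolder(5, 5, 5, TimeUnit.MINUTES, stats);

        // One RequestContext per file: it snapshots whether low-priority permits
        // may also serve the normal-priority parts of this file.
        TransferSemaphoresHolder.RequestContext context = holder.createRequestContext();
        TransferSemaphoresHolder.TypeSemaphore permit = holder.acquirePermit(WritePriority.NORMAL, context);
        if (permit == null) {
            return; // timed out waiting for a permit; the caller fails or retries the part
        }
        try {
            // ... transfer one part of the file ...
        } finally {
            // Always release the exact semaphore instance that acquirePermit returned.
            permit.release();
        }
    }
}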