Skip to content

Commit

Permalink
[Remote Store] Permit backed futures to prevent timeouts during uploa…
Browse files Browse the repository at this point in the history
…d bursts (opensearch-project#12159)

Signed-off-by: vikasvb90 <vikasvb@amazon.com>
  • Loading branch information
vikasvb90 authored May 13, 2024
1 parent 83997fd commit c328c18
Show file tree
Hide file tree
Showing 26 changed files with 1,555 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,22 @@ protected S3Repository createRepository(
ClusterService clusterService,
RecoverySettings recoverySettings
) {
return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, null, false) {
return new S3Repository(
metadata,
registry,
service,
clusterService,
recoverySettings,
null,
null,
null,
null,
null,
false,
null,
null,
null
) {

@Override
public BlobStore blobStore() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.s3;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* Generic stats of repository-s3 plugin.
*/
public class GenericStatsMetricPublisher {

private final AtomicLong normalPriorityQSize = new AtomicLong();
private final AtomicInteger normalPriorityPermits = new AtomicInteger();
private final AtomicLong lowPriorityQSize = new AtomicLong();
private final AtomicInteger lowPriorityPermits = new AtomicInteger();
private final long normalPriorityQCapacity;
private final int maxNormalPriorityPermits;
private final long lowPriorityQCapacity;
private final int maxLowPriorityPermits;

public GenericStatsMetricPublisher(
long normalPriorityQCapacity,
int maxNormalPriorityPermits,
long lowPriorityQCapacity,
int maxLowPriorityPermits
) {
this.normalPriorityQCapacity = normalPriorityQCapacity;
this.maxNormalPriorityPermits = maxNormalPriorityPermits;
this.lowPriorityQCapacity = lowPriorityQCapacity;
this.maxLowPriorityPermits = maxLowPriorityPermits;
}

public void updateNormalPriorityQSize(long qSize) {
normalPriorityQSize.addAndGet(qSize);
}

public void updateLowPriorityQSize(long qSize) {
lowPriorityQSize.addAndGet(qSize);
}

public void updateNormalPermits(boolean increment) {
if (increment) {
normalPriorityPermits.incrementAndGet();
} else {
normalPriorityPermits.decrementAndGet();
}
}

public void updateLowPermits(boolean increment) {
if (increment) {
lowPriorityPermits.incrementAndGet();
} else {
lowPriorityPermits.decrementAndGet();
}
}

public long getNormalPriorityQSize() {
return normalPriorityQSize.get();
}

public int getAcquiredNormalPriorityPermits() {
return normalPriorityPermits.get();
}

public long getLowPriorityQSize() {
return lowPriorityQSize.get();
}

public int getAcquiredLowPriorityPermits() {
return lowPriorityPermits.get();
}

Map<String, Long> stats() {
final Map<String, Long> results = new HashMap<>();
results.put("NormalPriorityQUtilization", (normalPriorityQSize.get() * 100) / normalPriorityQCapacity);
results.put("LowPriorityQUtilization", (lowPriorityQSize.get() * 100) / lowPriorityQCapacity);
results.put("NormalPriorityPermitsUtilization", (normalPriorityPermits.get() * 100L) / maxNormalPriorityPermits);
results.put("LowPriorityPermitsUtilization", (lowPriorityPermits.get() * 100L) / maxLowPriorityPermits);
return results;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
import org.opensearch.repositories.s3.async.UploadRequest;
import org.opensearch.repositories.s3.utils.HttpRangeUtils;

Expand Down Expand Up @@ -218,7 +219,14 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
writeContext.getMetadata()
);
try {
if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) {
// If file size is greater than the queue capacity than SizeBasedBlockingQ will always reject the upload.
// Therefore, redirecting it to slow client.
if ((uploadRequest.getWritePriority() == WritePriority.LOW
&& blobStore.getLowPrioritySizeBasedBlockingQ().isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false)
|| (uploadRequest.getWritePriority() != WritePriority.HIGH
&& uploadRequest.getWritePriority() != WritePriority.URGENT
&& blobStore.getNormalPrioritySizeBasedBlockingQ()
.isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false)) {
StreamContext streamContext = SocketAccess.doPrivileged(
() -> writeContext.getStreamProvider(uploadRequest.getContentLength())
);
Expand Down Expand Up @@ -258,23 +266,55 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
} else {
s3AsyncClient = amazonS3Reference.get().client();
}
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
completableFuture.whenComplete((response, throwable) -> {
if (throwable == null) {
completionListener.onResponse(response);
} else {
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
completionListener.onFailure(ex);
}
});

if (writeContext.getWritePriority() == WritePriority.URGENT
|| writeContext.getWritePriority() == WritePriority.HIGH
|| blobStore.isPermitBackedTransferEnabled() == false) {
createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener);
} else if (writeContext.getWritePriority() == WritePriority.LOW) {
blobStore.getLowPrioritySizeBasedBlockingQ()
.produce(
new SizeBasedBlockingQ.Item(
writeContext.getFileSize(),
() -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener)
)
);
} else if (writeContext.getWritePriority() == WritePriority.NORMAL) {
blobStore.getNormalPrioritySizeBasedBlockingQ()
.produce(
new SizeBasedBlockingQ.Item(
writeContext.getFileSize(),
() -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener)
)
);
} else {
throw new IllegalStateException("Cannot perform upload for other priority types.");
}
}
} catch (Exception e) {
logger.info("exception error from blob container for file {}", writeContext.getFileName());
throw new IOException(e);
}
}

private CompletableFuture<Void> createFileCompletableFuture(
S3AsyncClient s3AsyncClient,
UploadRequest uploadRequest,
StreamContext streamContext,
ActionListener<Void> completionListener
) {
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
return completableFuture.whenComplete((response, throwable) -> {
if (throwable == null) {
completionListener.onResponse(response);
} else {
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
completionListener.onFailure(ex);
}
});
}

@ExperimentalApi
@Override
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
import org.opensearch.repositories.s3.async.AsyncTransferManager;
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -56,6 +57,7 @@
import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING;
import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING;
import static org.opensearch.repositories.s3.S3Repository.PERMIT_BACKED_TRANSFER_ENABLED;
import static org.opensearch.repositories.s3.S3Repository.REDIRECT_LARGE_S3_UPLOAD;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING;
import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
Expand All @@ -77,6 +79,8 @@ class S3BlobStore implements BlobStore {

private volatile boolean uploadRetryEnabled;

private volatile boolean permitBackedTransferEnabled;

private volatile boolean serverSideEncryption;

private volatile ObjectCannedACL cannedACL;
Expand All @@ -94,6 +98,9 @@ class S3BlobStore implements BlobStore {
private final AsyncExecutorContainer priorityExecutorBuilder;
private final AsyncExecutorContainer normalExecutorBuilder;
private final boolean multipartUploadEnabled;
private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
private final GenericStatsMetricPublisher genericStatsMetricPublisher;

S3BlobStore(
S3Service service,
Expand All @@ -109,7 +116,10 @@ class S3BlobStore implements BlobStore {
AsyncTransferManager asyncTransferManager,
AsyncExecutorContainer urgentExecutorBuilder,
AsyncExecutorContainer priorityExecutorBuilder,
AsyncExecutorContainer normalExecutorBuilder
AsyncExecutorContainer normalExecutorBuilder,
SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ,
SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ,
GenericStatsMetricPublisher genericStatsMetricPublisher
) {
this.service = service;
this.s3AsyncService = s3AsyncService;
Expand All @@ -128,6 +138,10 @@ class S3BlobStore implements BlobStore {
// Settings to initialize blobstore with.
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ;
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
this.genericStatsMetricPublisher = genericStatsMetricPublisher;
this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
}

@Override
Expand All @@ -141,6 +155,7 @@ public void reload(RepositoryMetadata repositoryMetadata) {
this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings());
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
}

@Override
Expand Down Expand Up @@ -168,6 +183,10 @@ public boolean isUploadRetryEnabled() {
return uploadRetryEnabled;
}

public boolean isPermitBackedTransferEnabled() {
return permitBackedTransferEnabled;
}

public String bucket() {
return bucket;
}
Expand All @@ -184,6 +203,14 @@ public int getBulkDeletesSize() {
return bulkDeletesSize;
}

public SizeBasedBlockingQ getNormalPrioritySizeBasedBlockingQ() {
return normalPrioritySizeBasedBlockingQ;
}

public SizeBasedBlockingQ getLowPrioritySizeBasedBlockingQ() {
return lowPrioritySizeBasedBlockingQ;
}

@Override
public BlobContainer blobContainer(BlobPath path) {
return new S3BlobContainer(path, this);
Expand All @@ -201,7 +228,9 @@ public void close() throws IOException {

@Override
public Map<String, Long> stats() {
return statsMetricPublisher.getStats().toMap();
Map<String, Long> stats = statsMetricPublisher.getStats().toMap();
stats.putAll(genericStatsMetricPublisher.stats());
return stats;
}

@Override
Expand All @@ -211,6 +240,7 @@ public Map<Metric, Map<String, Long>> extendedStats() {
}
Map<Metric, Map<String, Long>> extendedStats = new HashMap<>();
statsMetricPublisher.getExtendedStats().forEach((k, v) -> extendedStats.put(k, v.toMap()));
extendedStats.put(Metric.GENERIC_STATS, genericStatsMetricPublisher.stats());
return extendedStats;
}

Expand Down
Loading

0 comments on commit c328c18

Please sign in to comment.