clientsSettings) {
+ // shut down all unused clients
+ // others will shut down on their respective release
+ releaseCachedClients();
+ this.staticClientSettings = MapBuilder.newMapBuilder(clientsSettings).immutableMap();
+ derivedClientSettings = emptyMap();
+ assert this.staticClientSettings.containsKey("default") : "always at least have 'default'";
+ // clients are built lazily by {@link client}
+ }
+
+ /**
+ * Attempts to retrieve a client by its repository metadata and settings from the cache.
+ * If the client does not exist it will be created.
+ */
+ public AmazonAsyncS3Reference client(
+ RepositoryMetadata repositoryMetadata,
+ AsyncExecutorContainer priorityExecutorBuilder,
+ AsyncExecutorContainer normalExecutorBuilder
+ ) {
+ final S3ClientSettings clientSettings = settings(repositoryMetadata);
+ {
+ final AmazonAsyncS3Reference clientReference = clientsCache.get(clientSettings);
+ if (clientReference != null && clientReference.tryIncRef()) {
+ return clientReference;
+ }
+ }
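+ // cache miss or the cached client was already released; re-check and build under lock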
+ synchronized (this) {
+ final AmazonAsyncS3Reference existing = clientsCache.get(clientSettings);
+ if (existing != null && existing.tryIncRef()) {
+ return existing;
+ }
+ final AmazonAsyncS3Reference clientReference = new AmazonAsyncS3Reference(
+ buildClient(clientSettings, priorityExecutorBuilder, normalExecutorBuilder)
+ );
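+ // the new reference is retained by the cache; the incRef below accounts for the reference handed to the caller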
+ clientReference.incRef();
+ clientsCache = MapBuilder.newMapBuilder(clientsCache).put(clientSettings, clientReference).immutableMap();
+ return clientReference;
+ }
+ }
+
+ /**
+ * Either fetches {@link S3ClientSettings} for a given {@link RepositoryMetadata} from cached settings or creates them
+ * by overriding static client settings from {@link #staticClientSettings} with settings found in the repository metadata.
+ * @param repositoryMetadata Repository Metadata
+ * @return S3ClientSettings
+ */
+ S3ClientSettings settings(RepositoryMetadata repositoryMetadata) {
+ final Settings settings = repositoryMetadata.settings();
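+ // fast path: reuse client settings already derived for these repository settings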
+ {
+ final S3ClientSettings existing = derivedClientSettings.get(settings);
+ if (existing != null) {
+ return existing;
+ }
+ }
+ final String clientName = S3Repository.CLIENT_NAME.get(settings);
+ final S3ClientSettings staticSettings = staticClientSettings.get(clientName);
+ if (staticSettings != null) {
+ synchronized (this) {
+ final S3ClientSettings existing = derivedClientSettings.get(settings);
+ if (existing != null) {
+ return existing;
+ }
+ final S3ClientSettings newSettings = staticSettings.refine(settings);
+ derivedClientSettings = MapBuilder.newMapBuilder(derivedClientSettings).put(settings, newSettings).immutableMap();
+ return newSettings;
+ }
+ }
+ throw new IllegalArgumentException(
+ "Unknown s3 client name ["
+ + clientName
+ + "]. Existing client configs: "
+ + Strings.collectionToDelimitedString(staticClientSettings.keySet(), ",")
+ );
+ }
+
+ // proxy for testing
+ synchronized AmazonAsyncS3WithCredentials buildClient(
+ final S3ClientSettings clientSettings,
+ AsyncExecutorContainer priorityExecutorBuilder,
+ AsyncExecutorContainer normalExecutorBuilder
+ ) {
+ setDefaultAwsProfilePath();
+ final S3AsyncClientBuilder builder = S3AsyncClient.builder();
+ builder.overrideConfiguration(buildOverrideConfiguration(clientSettings));
+ final AwsCredentialsProvider credentials = buildCredentials(logger, clientSettings);
+ builder.credentialsProvider(credentials);
+
+ String endpoint = Strings.hasLength(clientSettings.endpoint) ? clientSettings.endpoint : DEFAULT_S3_ENDPOINT;
+ if ((endpoint.startsWith("http://") || endpoint.startsWith("https://")) == false) {
+ // Manually add the scheme to the endpoint to work around https://github.com/aws/aws-sdk-java/issues/2274
+ endpoint = clientSettings.protocol.toString() + "://" + endpoint;
+ }
+ logger.debug("using endpoint [{}] and region [{}]", endpoint, clientSettings.region);
+
+ // If the endpoint configuration isn't set on the builder then the default behaviour is to try
+ // and work out what region we are in and use an appropriate endpoint - see AwsClientBuilder#setRegion.
+ // In contrast, directly-constructed clients use s3.amazonaws.com unless otherwise instructed. We currently
+ // use a directly-constructed client and need to keep that behaviour to avoid a breaking change,
+ // so to move to the builder we must set the endpoint explicitly.
+ //
+ // We do this because directly constructing the client is deprecated (was already deprecated in 1.1.223 too)
+ // so this change removes that usage of a deprecated API.
+ builder.endpointOverride(URI.create(endpoint));
+ builder.region(Region.of(clientSettings.region));
+ if (clientSettings.pathStyleAccess) {
+ builder.forcePathStyle(true);
+ }
+
+ builder.httpClient(buildHttpClient(clientSettings, priorityExecutorBuilder.getAsyncTransferEventLoopGroup()));
+ builder.asyncConfiguration(
+ ClientAsyncConfiguration.builder()
+ .advancedOption(
+ SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
+ priorityExecutorBuilder.getFutureCompletionExecutor()
+ )
+ .build()
+ );
+ final S3AsyncClient priorityClient = SocketAccess.doPrivileged(builder::build);
+
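+ // reuse the same builder for the normal-priority client, swapping in the normal executor's event loop group and future-completion executor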
+ builder.httpClient(buildHttpClient(clientSettings, normalExecutorBuilder.getAsyncTransferEventLoopGroup()));
+ builder.asyncConfiguration(
+ ClientAsyncConfiguration.builder()
+ .advancedOption(
+ SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
+ normalExecutorBuilder.getFutureCompletionExecutor()
+ )
+ .build()
+ );
+ final S3AsyncClient client = SocketAccess.doPrivileged(builder::build);
+
+ return AmazonAsyncS3WithCredentials.create(client, priorityClient, credentials);
+ }
+
+ static ClientOverrideConfiguration buildOverrideConfiguration(final S3ClientSettings clientSettings) {
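+ // retry behaviour and the per-attempt timeout are taken from the per-client settings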
+ return ClientOverrideConfiguration.builder()
+ .retryPolicy(
+ RetryPolicy.builder()
+ .numRetries(clientSettings.maxRetries)
+ .throttlingBackoffStrategy(
+ clientSettings.throttleRetries ? BackoffStrategy.defaultThrottlingStrategy() : BackoffStrategy.none()
+ )
+ .build()
+ )
+ .apiCallAttemptTimeout(Duration.ofMillis(clientSettings.requestTimeoutMillis))
+ .build();
+ }
+
+ // pkg private for tests
+ static SdkAsyncHttpClient buildHttpClient(S3ClientSettings clientSettings, AsyncTransferEventLoopGroup asyncTransferEventLoopGroup) {
+ // the response metadata cache is only there for diagnostic purposes,
+ // but can force objects from every response into the old generation.
+ NettyNioAsyncHttpClient.Builder clientBuilder = NettyNioAsyncHttpClient.builder();
+
+ if (clientSettings.proxySettings.getType() != ProxySettings.ProxyType.DIRECT) {
+ ProxyConfiguration.Builder proxyConfiguration = ProxyConfiguration.builder();
+ proxyConfiguration.scheme(clientSettings.proxySettings.getType().toProtocol().toString());
+ proxyConfiguration.host(clientSettings.proxySettings.getHostName());
+ proxyConfiguration.port(clientSettings.proxySettings.getPort());
+ proxyConfiguration.username(clientSettings.proxySettings.getUsername());
+ proxyConfiguration.password(clientSettings.proxySettings.getPassword());
+ clientBuilder.proxyConfiguration(proxyConfiguration.build());
+ }
+
+ // TODO: add max retry and UseThrottleRetry. Replace values with settings and put these in default settings
+ clientBuilder.connectionTimeout(Duration.ofMillis(clientSettings.connectionTimeoutMillis));
+ clientBuilder.connectionAcquisitionTimeout(Duration.ofMillis(clientSettings.connectionAcquisitionTimeoutMillis));
+ clientBuilder.maxPendingConnectionAcquires(10_000);
+ clientBuilder.maxConcurrency(clientSettings.maxConnections);
+ clientBuilder.eventLoopGroup(SdkEventLoopGroup.create(asyncTransferEventLoopGroup.getEventLoopGroup()));
+ clientBuilder.tcpKeepAlive(true);
+
+ return clientBuilder.build();
+ }
+
+ // pkg private for tests
+ static AwsCredentialsProvider buildCredentials(Logger logger, S3ClientSettings clientSettings) {
+ final AwsCredentials basicCredentials = clientSettings.credentials;
+ final IrsaCredentials irsaCredentials = buildFromEnvironment(clientSettings.irsaCredentials);
+
+ // If IAM Roles for Service Accounts (IRSA) credentials are configured, start with them first
+ if (irsaCredentials != null) {
+ logger.debug("Using IRSA credentials");
+
+ final Region region = Region.of(clientSettings.region);
+ StsClient stsClient = SocketAccess.doPrivileged(() -> {
+ StsClientBuilder builder = StsClient.builder().region(region);
+
+ final String stsEndpoint = System.getProperty(STS_ENDPOINT_OVERRIDE_SYSTEM_PROPERTY);
+ if (stsEndpoint != null) {
+ builder = builder.endpointOverride(URI.create(stsEndpoint));
+ }
+
+ if (basicCredentials != null) {
+ builder = builder.credentialsProvider(StaticCredentialsProvider.create(basicCredentials));
+ } else {
+ builder = builder.credentialsProvider(DefaultCredentialsProvider.create());
+ }
+
+ return builder.build();
+ });
+
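+ // without a web identity token file, assume the role directly via STS; otherwise use the web identity token file flow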
+ if (irsaCredentials.getIdentityTokenFile() == null) {
+ final StsAssumeRoleCredentialsProvider.Builder stsCredentialsProviderBuilder = StsAssumeRoleCredentialsProvider.builder()
+ .stsClient(stsClient)
+ .refreshRequest(
+ AssumeRoleRequest.builder()
+ .roleArn(irsaCredentials.getRoleArn())
+ .roleSessionName(irsaCredentials.getRoleSessionName())
+ .build()
+ );
+
+ final StsAssumeRoleCredentialsProvider stsCredentialsProvider = SocketAccess.doPrivileged(
+ stsCredentialsProviderBuilder::build
+ );
+
+ return new PrivilegedSTSAssumeRoleSessionCredentialsProvider<>(stsClient, stsCredentialsProvider);
+ } else {
+ final StsWebIdentityTokenFileCredentialsProvider.Builder stsCredentialsProviderBuilder =
+ StsWebIdentityTokenFileCredentialsProvider.builder()
+ .stsClient(stsClient)
+ .roleArn(irsaCredentials.getRoleArn())
+ .roleSessionName(irsaCredentials.getRoleSessionName())
+ .webIdentityTokenFile(Path.of(irsaCredentials.getIdentityTokenFile()));
+
+ final StsWebIdentityTokenFileCredentialsProvider stsCredentialsProvider = SocketAccess.doPrivileged(
+ stsCredentialsProviderBuilder::build
+ );
+
+ return new PrivilegedSTSAssumeRoleSessionCredentialsProvider<>(stsClient, stsCredentialsProvider);
+ }
+ } else if (basicCredentials != null) {
+ logger.debug("Using basic key/secret credentials");
+ return StaticCredentialsProvider.create(basicCredentials);
+ } else {
+ logger.debug("Using instance profile credentials");
+ return new PrivilegedInstanceProfileCredentialsProvider();
+ }
+ }
+
+ // The AWS v2 SDK tries to load a default profile from the home path, which is restricted. Hence, we set
+ // these to other valid paths.
+ @SuppressForbidden(reason = "Need to provide this override to v2 SDK so that path does not default to home path")
+ private static void setDefaultAwsProfilePath() {
+ if (ProfileFileSystemSetting.AWS_SHARED_CREDENTIALS_FILE.getStringValue().isEmpty()) {
+ System.setProperty(ProfileFileSystemSetting.AWS_SHARED_CREDENTIALS_FILE.property(), System.getProperty("opensearch.path.conf"));
+ }
+ if (ProfileFileSystemSetting.AWS_CONFIG_FILE.getStringValue().isEmpty()) {
+ System.setProperty(ProfileFileSystemSetting.AWS_CONFIG_FILE.property(), System.getProperty("opensearch.path.conf"));
+ }
+ }
+
+ private static IrsaCredentials buildFromEnvironment(IrsaCredentials defaults) {
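+ // fill in any IRSA fields that were not configured from the standard AWS environment variables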
+ if (defaults == null) {
+ return null;
+ }
+
+ String webIdentityTokenFile = defaults.getIdentityTokenFile();
+ if (webIdentityTokenFile == null) {
+ webIdentityTokenFile = System.getenv(SdkSystemSetting.AWS_WEB_IDENTITY_TOKEN_FILE.environmentVariable());
+ }
+
+ String roleArn = defaults.getRoleArn();
+ if (roleArn == null) {
+ roleArn = System.getenv(SdkSystemSetting.AWS_ROLE_ARN.environmentVariable());
+ }
+
+ String roleSessionName = defaults.getRoleSessionName();
+ if (roleSessionName == null) {
+ roleSessionName = System.getenv(SdkSystemSetting.AWS_ROLE_SESSION_NAME.environmentVariable());
+ }
+
+ return new IrsaCredentials(webIdentityTokenFile, roleArn, roleSessionName);
+ }
+
+ private synchronized void releaseCachedClients() {
+ // the clients will shut down once they are no longer used
+ for (final AmazonAsyncS3Reference clientReference : clientsCache.values()) {
+ clientReference.decRef();
+ }
+
+ // clear previously cached clients; they will be built lazily
+ clientsCache = emptyMap();
+ derivedClientSettings = emptyMap();
+ }
+
+ static class PrivilegedInstanceProfileCredentialsProvider implements AwsCredentialsProvider {
+ private final AwsCredentialsProvider credentials;
+
+ private PrivilegedInstanceProfileCredentialsProvider() {
+ this.credentials = initializeProvider();
+ }
+
+ private AwsCredentialsProvider initializeProvider() {
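+ // prefer container credentials (e.g. ECS/EKS) when the container credential URI settings are present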
+ if (SdkSystemSetting.AWS_CONTAINER_CREDENTIALS_RELATIVE_URI.getStringValue().isPresent()
+ || SdkSystemSetting.AWS_CONTAINER_CREDENTIALS_FULL_URI.getStringValue().isPresent()) {
+
+ return ContainerCredentialsProvider.builder().asyncCredentialUpdateEnabled(true).build();
+ }
+ // InstanceProfileCredentialsProvider as last item of chain
+ return InstanceProfileCredentialsProvider.builder().asyncCredentialUpdateEnabled(true).build();
+ }
+
+ @Override
+ public AwsCredentials resolveCredentials() {
+ return SocketAccess.doPrivileged(credentials::resolveCredentials);
+ }
+ }
+
+ static class PrivilegedSTSAssumeRoleSessionCredentialsProvider<P extends AwsCredentialsProvider & SdkAutoCloseable>
+ implements
+ AwsCredentialsProvider,
+ Closeable {
+ private final P credentials;
+ private final StsClient stsClient;
+
+ private PrivilegedSTSAssumeRoleSessionCredentialsProvider(@Nullable final StsClient stsClient, final P credentials) {
+ this.stsClient = stsClient;
+ this.credentials = credentials;
+ }
+
+ @Override
+ public void close() throws IOException {
+ SocketAccess.doPrivilegedIOException(() -> {
+ credentials.close();
+ if (stsClient != null) {
+ stsClient.close();
+ }
+ return null;
+ });
+ }
+
+ @Override
+ public AwsCredentials resolveCredentials() {
+ return SocketAccess.doPrivileged(credentials::resolveCredentials);
+ }
+ }
+
+ @Override
+ public void close() {
+ releaseCachedClients();
+ }
+}
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
index 49ebce77a59ad..81a902a6992d8 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
@@ -39,11 +39,15 @@
import org.opensearch.action.ActionListener;
import org.opensearch.common.Nullable;
import org.opensearch.common.SetOnce;
+import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
+import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
+import org.opensearch.common.blobstore.stream.write.WriteContext;
+import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
import org.opensearch.common.blobstore.support.PlainBlobMetadata;
import org.opensearch.common.collect.Tuple;
@@ -72,6 +76,8 @@
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
import org.opensearch.core.common.Strings;
+import org.opensearch.repositories.s3.async.UploadRequest;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -82,6 +88,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -90,12 +97,13 @@
import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
import static org.opensearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART;
-class S3BlobContainer extends AbstractBlobContainer {
+class S3BlobContainer extends AbstractBlobContainer implements VerifyingMultiStreamBlobContainer {
private static final Logger logger = LogManager.getLogger(S3BlobContainer.class);
/**
* Maximum number of deletes in a {@link DeleteObjectsRequest}.
+ *
* @see S3 Documentation.
*/
private static final int MAX_BULK_DELETES = 1000;
@@ -166,6 +174,42 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
});
}
+ @Override
+ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException {
+ UploadRequest uploadRequest = new UploadRequest(
+ blobStore.bucket(),
+ buildKey(writeContext.getFileName()),
+ writeContext.getFileSize(),
+ writeContext.getWritePriority(),
+ writeContext.getUploadFinalizer(),
+ writeContext.doRemoteDataIntegrityCheck(),
+ writeContext.getExpectedChecksum()
+ );
+ try {
+ long partSize = blobStore.getAsyncTransferManager().calculateOptimalPartSize(writeContext.getFileSize());
+ StreamContext streamContext = SocketAccess.doPrivileged(() -> writeContext.getStreamProvider(partSize));
+ try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) {
+
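+ // HIGH priority writes go through the priority client; everything else uses the normal client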
+ S3AsyncClient s3AsyncClient = writeContext.getWritePriority() == WritePriority.HIGH
+ ? amazonS3Reference.get().priorityClient()
+ : amazonS3Reference.get().client();
+ CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
+ .uploadObject(s3AsyncClient, uploadRequest, streamContext);
+ completableFuture.whenComplete((response, throwable) -> {
+ if (throwable == null) {
+ completionListener.onResponse(response);
+ } else {
+ Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
+ completionListener.onFailure(ex);
+ }
+ });
+ }
+ } catch (Exception e) {
+ logger.info("exception error from blob container for file {}", writeContext.getFileName());
+ throw new IOException(e);
+ }
+ }
+
// package private for testing
long getLargeBlobThresholdInBytes() {
return blobStore.bufferSizeInBytes();
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java
index 6a9be2df2bf72..30040e182cbc9 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java
@@ -42,6 +42,8 @@
import org.opensearch.common.unit.ByteSizeValue;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.StorageClass;
+import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
+import org.opensearch.repositories.s3.async.AsyncTransferManager;
import java.io.IOException;
import java.util.Locale;
@@ -53,6 +55,8 @@ class S3BlobStore implements BlobStore {
private final S3Service service;
+ private final S3AsyncService s3AsyncService;
+
private final String bucket;
private final ByteSizeValue bufferSize;
@@ -67,22 +71,41 @@ class S3BlobStore implements BlobStore {
private final StatsMetricPublisher statsMetricPublisher = new StatsMetricPublisher();
+ private final AsyncTransferManager asyncTransferManager;
+ private final AsyncExecutorContainer priorityExecutorBuilder;
+ private final AsyncExecutorContainer normalExecutorBuilder;
+ private final boolean multipartUploadEnabled;
+
S3BlobStore(
S3Service service,
+ S3AsyncService s3AsyncService,
+ boolean multipartUploadEnabled,
String bucket,
boolean serverSideEncryption,
ByteSizeValue bufferSize,
String cannedACL,
String storageClass,
- RepositoryMetadata repositoryMetadata
+ RepositoryMetadata repositoryMetadata,
+ AsyncTransferManager asyncTransferManager,
+ AsyncExecutorContainer priorityExecutorBuilder,
+ AsyncExecutorContainer normalExecutorBuilder
) {
this.service = service;
+ this.s3AsyncService = s3AsyncService;
+ this.multipartUploadEnabled = multipartUploadEnabled;
this.bucket = bucket;
this.serverSideEncryption = serverSideEncryption;
this.bufferSize = bufferSize;
this.cannedACL = initCannedACL(cannedACL);
this.storageClass = initStorageClass(storageClass);
this.repositoryMetadata = repositoryMetadata;
+ this.asyncTransferManager = asyncTransferManager;
+ this.normalExecutorBuilder = normalExecutorBuilder;
+ this.priorityExecutorBuilder = priorityExecutorBuilder;
+ }
+
+ public boolean isMultipartUploadEnabled() {
+ return multipartUploadEnabled;
}
@Override
@@ -94,6 +117,10 @@ public AmazonS3Reference clientReference() {
return service.client(repositoryMetadata);
}
+ public AmazonAsyncS3Reference asyncClientReference() {
+ return s3AsyncService.client(repositoryMetadata, priorityExecutorBuilder, normalExecutorBuilder);
+ }
+
int getMaxRetries() {
return service.settings(repositoryMetadata).maxRetries;
}
@@ -117,7 +144,12 @@ public BlobContainer blobContainer(BlobPath path) {
@Override
public void close() throws IOException {
- this.service.close();
+ if (service != null) {
+ this.service.close();
+ }
+ if (s3AsyncService != null) {
+ this.s3AsyncService.close();
+ }
}
@Override
@@ -170,4 +202,8 @@ public static ObjectCannedACL initCannedACL(String cannedACL) {
throw new BlobStoreException("cannedACL is not valid: [" + cannedACL + "]");
}
+
+ public AsyncTransferManager getAsyncTransferManager() {
+ return asyncTransferManager;
+ }
}
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3ClientSettings.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3ClientSettings.java
index 5f6be6ac01e76..8097629ed0773 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3ClientSettings.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3ClientSettings.java
@@ -37,7 +37,7 @@
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.settings.SecureSetting;
-import org.opensearch.common.settings.SecureString;
+import org.opensearch.core.common.settings.SecureString;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
@@ -172,6 +172,48 @@ final class S3ClientSettings {
key -> Setting.timeSetting(key, TimeValue.timeValueMillis(50_000), Property.NodeScope)
);
+ /** The request timeout for connecting to s3. */
+ static final Setting.AffixSetting<TimeValue> REQUEST_TIMEOUT_SETTING = Setting.affixKeySetting(
+ PREFIX,
+ "request_timeout",
+ key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(2), Property.NodeScope)
+ );
+
+ /** The connection timeout for connecting to s3. */
+ static final Setting.AffixSetting<TimeValue> CONNECTION_TIMEOUT_SETTING = Setting.affixKeySetting(
+ PREFIX,
+ "connection_timeout",
+ key -> Setting.timeSetting(key, TimeValue.timeValueSeconds(10), Property.NodeScope)
+ );
+
+ /** The connection TTL for connecting to s3. */
+ static final Setting.AffixSetting<TimeValue> CONNECTION_TTL_SETTING = Setting.affixKeySetting(
+ PREFIX,
+ "connection_ttl",
+ key -> Setting.timeSetting(key, TimeValue.timeValueMillis(5000), Property.NodeScope)
+ );
+
+ /** The maximum connections to s3. */
+ static final Setting.AffixSetting<Integer> MAX_CONNECTIONS_SETTING = Setting.affixKeySetting(
+ PREFIX,
+ "max_connections",
+ key -> Setting.intSetting(key, 100, Property.NodeScope)
+ );
+
+ /** Connection acquisition timeout for new connections to S3. */
+ static final Setting.AffixSetting<TimeValue> CONNECTION_ACQUISITION_TIMEOUT = Setting.affixKeySetting(
+ PREFIX,
+ "connection_acquisition_timeout",
+ key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(2), Property.NodeScope)
+ );
+
+ /** The maximum pending connections to S3. */
+ static final Setting.AffixSetting<Integer> MAX_PENDING_CONNECTION_ACQUIRES = Setting.affixKeySetting(
+ PREFIX,
+ "max_pending_connection_acquires",
+ key -> Setting.intSetting(key, 10_000, Property.NodeScope)
+ );
+
/** The number of retries to use when an s3 request fails. */
+ static final Setting.AffixSetting<Integer> MAX_RETRIES_SETTING = Setting.affixKeySetting(
PREFIX,
@@ -232,6 +274,21 @@ final class S3ClientSettings {
/** The read timeout for the s3 client. */
final int readTimeoutMillis;
+ /** The request timeout for the s3 client */
+ final int requestTimeoutMillis;
+
+ /** The connection timeout for the s3 client */
+ final int connectionTimeoutMillis;
+
+ /** The connection TTL for the s3 client */
+ final int connectionTTLMillis;
+
+ /** The max number of connections for the s3 client */
+ final int maxConnections;
+
+ /** The connection acquisition timeout for the s3 async client */
+ final int connectionAcquisitionTimeoutMillis;
+
/** The number of retries to use for the s3 client. */
final int maxRetries;
@@ -256,6 +313,11 @@ private S3ClientSettings(
String endpoint,
Protocol protocol,
int readTimeoutMillis,
+ int requestTimeoutMillis,
+ int connectionTimeoutMillis,
+ int connectionTTLMillis,
+ int maxConnections,
+ int connectionAcquisitionTimeoutMillis,
int maxRetries,
boolean throttleRetries,
boolean pathStyleAccess,
@@ -269,6 +331,11 @@ private S3ClientSettings(
this.endpoint = endpoint;
this.protocol = protocol;
this.readTimeoutMillis = readTimeoutMillis;
+ this.requestTimeoutMillis = requestTimeoutMillis;
+ this.connectionTimeoutMillis = connectionTimeoutMillis;
+ this.connectionTTLMillis = connectionTTLMillis;
+ this.maxConnections = maxConnections;
+ this.connectionAcquisitionTimeoutMillis = connectionAcquisitionTimeoutMillis;
this.maxRetries = maxRetries;
this.throttleRetries = throttleRetries;
this.pathStyleAccess = pathStyleAccess;
@@ -300,6 +367,24 @@ S3ClientSettings refine(Settings repositorySettings) {
final int newReadTimeoutMillis = Math.toIntExact(
getRepoSettingOrDefault(READ_TIMEOUT_SETTING, normalizedSettings, TimeValue.timeValueMillis(readTimeoutMillis)).millis()
);
+ final int newRequestTimeoutMillis = Math.toIntExact(
+ getRepoSettingOrDefault(REQUEST_TIMEOUT_SETTING, normalizedSettings, TimeValue.timeValueMillis(requestTimeoutMillis)).millis()
+ );
+ final int newConnectionTimeoutMillis = Math.toIntExact(
+ getRepoSettingOrDefault(CONNECTION_TIMEOUT_SETTING, normalizedSettings, TimeValue.timeValueMillis(connectionTimeoutMillis))
+ .millis()
+ );
+ final int newConnectionTTLMillis = Math.toIntExact(
+ getRepoSettingOrDefault(CONNECTION_TTL_SETTING, normalizedSettings, TimeValue.timeValueMillis(connectionTTLMillis)).millis()
+ );
+ final int newConnectionAcquisitionTimeoutMillis = Math.toIntExact(
+ getRepoSettingOrDefault(
+ CONNECTION_ACQUISITION_TIMEOUT,
+ normalizedSettings,
+ TimeValue.timeValueMillis(connectionAcquisitionTimeoutMillis)
+ ).millis()
+ );
+ final int newMaxConnections = Math.toIntExact(getRepoSettingOrDefault(MAX_CONNECTIONS_SETTING, normalizedSettings, maxConnections));
final int newMaxRetries = getRepoSettingOrDefault(MAX_RETRIES_SETTING, normalizedSettings, maxRetries);
final boolean newThrottleRetries = getRepoSettingOrDefault(USE_THROTTLE_RETRIES_SETTING, normalizedSettings, throttleRetries);
final boolean newPathStyleAccess = getRepoSettingOrDefault(USE_PATH_STYLE_ACCESS, normalizedSettings, pathStyleAccess);
@@ -321,6 +406,11 @@ S3ClientSettings refine(Settings repositorySettings) {
&& Objects.equals(proxySettings.getHostName(), newProxyHost)
&& proxySettings.getPort() == newProxyPort
&& newReadTimeoutMillis == readTimeoutMillis
+ && newRequestTimeoutMillis == requestTimeoutMillis
+ && newConnectionTimeoutMillis == connectionTimeoutMillis
+ && newConnectionTTLMillis == connectionTTLMillis
+ && newMaxConnections == maxConnections
+ && newConnectionAcquisitionTimeoutMillis == connectionAcquisitionTimeoutMillis
&& maxRetries == newMaxRetries
&& newThrottleRetries == throttleRetries
&& Objects.equals(credentials, newCredentials)
@@ -338,6 +428,11 @@ S3ClientSettings refine(Settings repositorySettings) {
newEndpoint,
newProtocol,
newReadTimeoutMillis,
+ newRequestTimeoutMillis,
+ newConnectionTimeoutMillis,
+ newConnectionTTLMillis,
+ newMaxConnections,
+ newConnectionAcquisitionTimeoutMillis,
newMaxRetries,
newThrottleRetries,
newPathStyleAccess,
@@ -463,6 +558,11 @@ static S3ClientSettings getClientSettings(final Settings settings, final String
getConfigValue(settings, clientName, ENDPOINT_SETTING),
awsProtocol,
Math.toIntExact(getConfigValue(settings, clientName, READ_TIMEOUT_SETTING).millis()),
+ Math.toIntExact(getConfigValue(settings, clientName, REQUEST_TIMEOUT_SETTING).millis()),
+ Math.toIntExact(getConfigValue(settings, clientName, CONNECTION_TIMEOUT_SETTING).millis()),
+ Math.toIntExact(getConfigValue(settings, clientName, CONNECTION_TTL_SETTING).millis()),
+ Math.toIntExact(getConfigValue(settings, clientName, MAX_CONNECTIONS_SETTING)),
+ Math.toIntExact(getConfigValue(settings, clientName, CONNECTION_ACQUISITION_TIMEOUT).millis()),
getConfigValue(settings, clientName, MAX_RETRIES_SETTING),
getConfigValue(settings, clientName, USE_THROTTLE_RETRIES_SETTING),
getConfigValue(settings, clientName, USE_PATH_STYLE_ACCESS),
@@ -532,6 +632,11 @@ public boolean equals(final Object o) {
}
final S3ClientSettings that = (S3ClientSettings) o;
return readTimeoutMillis == that.readTimeoutMillis
+ && requestTimeoutMillis == that.requestTimeoutMillis
+ && connectionTimeoutMillis == that.connectionTimeoutMillis
+ && connectionTTLMillis == that.connectionTTLMillis
+ && maxConnections == that.maxConnections
+ && connectionAcquisitionTimeoutMillis == that.connectionAcquisitionTimeoutMillis
&& maxRetries == that.maxRetries
&& throttleRetries == that.throttleRetries
&& Objects.equals(credentials, that.credentials)
@@ -552,6 +657,11 @@ public int hashCode() {
protocol,
proxySettings,
readTimeoutMillis,
+ requestTimeoutMillis,
+ connectionTimeoutMillis,
+ connectionTTLMillis,
+ maxConnections,
+ connectionAcquisitionTimeoutMillis,
maxRetries,
throttleRetries,
disableChunkedEncoding,
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java
index 954b79035429f..d42bfc0be7e4f 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java
@@ -34,7 +34,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.ClusterState;
@@ -45,7 +44,7 @@
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.settings.SecureSetting;
-import org.opensearch.common.settings.SecureString;
+import org.opensearch.core.common.settings.SecureString;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
@@ -57,6 +56,8 @@
import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.ShardGenerations;
import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository;
+import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
+import org.opensearch.repositories.s3.async.AsyncTransferManager;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.threadpool.Scheduler;
@@ -103,6 +104,11 @@ class S3Repository extends MeteredBlobStoreRepository {
ByteSizeUnit.BYTES
);
+ private static final ByteSizeValue DEFAULT_MULTIPART_UPLOAD_MINIMUM_PART_SIZE = new ByteSizeValue(
+ ByteSizeUnit.MB.toBytes(16),
+ ByteSizeUnit.BYTES
+ );
+
+ static final Setting<String> BUCKET_SETTING = Setting.simpleString("bucket");
/**
@@ -146,6 +152,26 @@ class S3Repository extends MeteredBlobStoreRepository {
MAX_PART_SIZE_USING_MULTIPART
);
+ /**
+ * Minimum part size for parallel multipart uploads
+ */
+ static final Setting<ByteSizeValue> PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING = Setting.byteSizeSetting(
+ "parallel_multipart_upload.minimum_part_size",
+ DEFAULT_MULTIPART_UPLOAD_MINIMUM_PART_SIZE,
+ MIN_PART_SIZE_USING_MULTIPART,
+ MAX_PART_SIZE_USING_MULTIPART,
+ Setting.Property.NodeScope
+ );
+
+ /**
+ * This setting controls whether parallel multipart uploads will be used when calling S3 or not
+ */
+ public static Setting<Boolean> PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING = Setting.boolSetting(
+ "parallel_multipart_upload.enabled",
+ true,
+ Setting.Property.NodeScope
+ );
+
/**
* Big files can be broken down into chunks during snapshotting if needed. Defaults to 1g.
*/
@@ -193,6 +219,12 @@ class S3Repository extends MeteredBlobStoreRepository {
private final RepositoryMetadata repositoryMetadata;
+ private final AsyncTransferManager asyncUploadUtils;
+ private final S3AsyncService s3AsyncService;
+ private final boolean multipartUploadEnabled;
+ private final AsyncExecutorContainer priorityExecutorBuilder;
+ private final AsyncExecutorContainer normalExecutorBuilder;
+
/**
* Constructs an s3 backed repository
*/
@@ -201,7 +233,12 @@ class S3Repository extends MeteredBlobStoreRepository {
final NamedXContentRegistry namedXContentRegistry,
final S3Service service,
final ClusterService clusterService,
- final RecoverySettings recoverySettings
+ final RecoverySettings recoverySettings,
+ final AsyncTransferManager asyncUploadUtils,
+ final AsyncExecutorContainer priorityExecutorBuilder,
+ final AsyncExecutorContainer normalExecutorBuilder,
+ final S3AsyncService s3AsyncService,
+ final boolean multipartUploadEnabled
) {
super(
metadata,
@@ -212,8 +249,13 @@ class S3Repository extends MeteredBlobStoreRepository {
buildLocation(metadata)
);
this.service = service;
+ this.s3AsyncService = s3AsyncService;
+ this.multipartUploadEnabled = multipartUploadEnabled;
this.repositoryMetadata = metadata;
+ this.asyncUploadUtils = asyncUploadUtils;
+ this.priorityExecutorBuilder = priorityExecutorBuilder;
+ this.normalExecutorBuilder = normalExecutorBuilder;
// Parse and validate the user's S3 Storage Class setting
this.bucket = BUCKET_SETTING.get(metadata.settings());
@@ -314,7 +356,20 @@ public void deleteSnapshots(
@Override
protected S3BlobStore createBlobStore() {
- return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass, repositoryMetadata);
+ return new S3BlobStore(
+ service,
+ s3AsyncService,
+ multipartUploadEnabled,
+ bucket,
+ serverSideEncryption,
+ bufferSize,
+ cannedACL,
+ storageClass,
+ repositoryMetadata,
+ asyncUploadUtils,
+ priorityExecutorBuilder,
+ normalExecutorBuilder
+ );
}
// only use for testing
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java
index 828bf85fd7889..30f792346f9be 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java
@@ -32,44 +32,131 @@
package org.opensearch.repositories.s3;
+import org.opensearch.client.Client;
+import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
+import org.opensearch.env.NodeEnvironment;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.ReloadablePlugin;
import org.opensearch.plugins.RepositoryPlugin;
+import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
+import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
+import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup;
+import org.opensearch.repositories.s3.async.AsyncTransferManager;
+import org.opensearch.script.ScriptService;
+import org.opensearch.threadpool.ExecutorBuilder;
+import org.opensearch.threadpool.FixedExecutorBuilder;
+import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.watcher.ResourceWatcherService;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.function.Supplier;
/**
* A plugin to add a repository type that writes to and from the AWS S3.
*/
public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin {
+ private static final String PRIORITY_FUTURE_COMPLETION = "priority_future_completion";
+ private static final String PRIORITY_STREAM_READER = "priority_stream_reader";
+ private static final String FUTURE_COMPLETION = "future_completion";
+ private static final String STREAM_READER = "stream_reader";
protected final S3Service service;
+ private final S3AsyncService s3AsyncService;
+
private final Path configPath;
+ private AsyncExecutorContainer priorityExecutorBuilder;
+ private AsyncExecutorContainer normalExecutorBuilder;
+
public S3RepositoryPlugin(final Settings settings, final Path configPath) {
- this(settings, configPath, new S3Service(configPath));
+ this(settings, configPath, new S3Service(configPath), new S3AsyncService(configPath));
+ }
+
+ @Override
+ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
+ List<ExecutorBuilder<?>> executorBuilders = new ArrayList<>();
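+ // dedicated fixed pools for future completion and stream reading, in priority and normal variants, sized from allocated processors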
+ executorBuilders.add(
+ new FixedExecutorBuilder(settings, PRIORITY_FUTURE_COMPLETION, priorityPoolCount(settings), 10_000, PRIORITY_FUTURE_COMPLETION)
+ );
+ executorBuilders.add(
+ new FixedExecutorBuilder(settings, PRIORITY_STREAM_READER, priorityPoolCount(settings), 10_000, PRIORITY_STREAM_READER)
+ );
+ executorBuilders.add(new FixedExecutorBuilder(settings, FUTURE_COMPLETION, normalPoolCount(settings), 10_000, FUTURE_COMPLETION));
+ executorBuilders.add(new FixedExecutorBuilder(settings, STREAM_READER, normalPoolCount(settings), 10_000, STREAM_READER));
+ return executorBuilders;
}
- S3RepositoryPlugin(final Settings settings, final Path configPath, final S3Service service) {
+ S3RepositoryPlugin(final Settings settings, final Path configPath, final S3Service service, final S3AsyncService s3AsyncService) {
this.service = Objects.requireNonNull(service, "S3 service must not be null");
this.configPath = configPath;
// eagerly load client settings so that secure settings are read
- final Map<String, S3ClientSettings> clientsSettings = S3ClientSettings.load(settings, configPath);
+ Map<String, S3ClientSettings> clientsSettings = S3ClientSettings.load(settings, configPath);
+ this.s3AsyncService = Objects.requireNonNull(s3AsyncService, "S3AsyncService must not be null");
this.service.refreshAndClearCache(clientsSettings);
+ this.s3AsyncService.refreshAndClearCache(clientsSettings);
+ }
+
+ private static int boundedBy(int value, int min, int max) {
+ return Math.min(max, Math.max(min, value));
+ }
+
+ private static int allocatedProcessors(Settings settings) {
+ return OpenSearchExecutors.allocatedProcessors(settings);
+ }
+
+ private static int priorityPoolCount(Settings settings) {
+ return boundedBy((allocatedProcessors(settings) + 1) / 2, 2, 4);
+ }
+
+ private static int normalPoolCount(Settings settings) {
+ return boundedBy((allocatedProcessors(settings) + 7) / 8, 1, 2);
+ }
+
+ @Override
+ public Collection