Skip to content

Commit

Permalink
Support S3 Express One Zone (#5268)
Browse files Browse the repository at this point in the history
* Support S3 Express One Zone

* Fix endpoint

* Update object_store/src/aws/builder.rs

* Fix credential caching

* Review feedback

* Clippy
  • Loading branch information
tustvold authored Jan 5, 2024
1 parent 2f5dcdf commit f7101ec
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 124 deletions.
93 changes: 80 additions & 13 deletions object_store/src/aws/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::aws::client::{S3Client, S3Config};
use crate::aws::credential::{
InstanceCredentialProvider, TaskCredentialProvider, WebIdentityProvider,
InstanceCredentialProvider, SessionProvider, TaskCredentialProvider, WebIdentityProvider,
};
use crate::aws::{
AmazonS3, AwsCredential, AwsCredentialProvider, Checksum, S3ConditionalPut, S3CopyIfNotExists,
Expand All @@ -31,6 +31,7 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tracing::info;
use url::Url;

Expand Down Expand Up @@ -77,6 +78,9 @@ enum Error {
source: reqwest::Error,
},

#[snafu(display("Invalid Zone suffix for bucket '{bucket}'"))]
ZoneSuffix { bucket: String },

#[snafu(display("Failed to parse the region for bucket '{}'", bucket))]
RegionParse { bucket: String },
}
Expand Down Expand Up @@ -134,6 +138,8 @@ pub struct AmazonS3Builder {
imdsv1_fallback: ConfigValue<bool>,
/// When set to true, virtual hosted style request has to be used
virtual_hosted_style_request: ConfigValue<bool>,
/// When set to true, S3 express is used
s3_express: ConfigValue<bool>,
/// When set to true, unsigned payload option has to be used
unsigned_payload: ConfigValue<bool>,
/// Checksum algorithm which has to be used for object integrity check during upload
Expand Down Expand Up @@ -307,6 +313,13 @@ pub enum AmazonS3ConfigKey {
/// - `disable_tagging`
DisableTagging,

/// Enable Support for S3 Express One Zone
///
/// Supported keys:
/// - `aws_s3_express`
/// - `s3_express`
S3Express,

/// Client options
Client(ClientConfigKey),
}
Expand All @@ -322,6 +335,7 @@ impl AsRef<str> for AmazonS3ConfigKey {
Self::Token => "aws_session_token",
Self::ImdsV1Fallback => "aws_imdsv1_fallback",
Self::VirtualHostedStyleRequest => "aws_virtual_hosted_style_request",
Self::S3Express => "aws_s3_express",
Self::DefaultRegion => "aws_default_region",
Self::MetadataEndpoint => "aws_metadata_endpoint",
Self::UnsignedPayload => "aws_unsigned_payload",
Expand Down Expand Up @@ -351,6 +365,7 @@ impl FromStr for AmazonS3ConfigKey {
"aws_virtual_hosted_style_request" | "virtual_hosted_style_request" => {
Ok(Self::VirtualHostedStyleRequest)
}
"aws_s3_express" | "s3_express" => Ok(Self::S3Express),
"aws_imdsv1_fallback" | "imdsv1_fallback" => Ok(Self::ImdsV1Fallback),
"aws_metadata_endpoint" | "metadata_endpoint" => Ok(Self::MetadataEndpoint),
"aws_unsigned_payload" | "unsigned_payload" => Ok(Self::UnsignedPayload),
Expand Down Expand Up @@ -448,6 +463,7 @@ impl AmazonS3Builder {
AmazonS3ConfigKey::VirtualHostedStyleRequest => {
self.virtual_hosted_style_request.parse(value)
}
AmazonS3ConfigKey::S3Express => self.s3_express.parse(value),
AmazonS3ConfigKey::DefaultRegion => {
self.region = self.region.or_else(|| Some(value.into()))
}
Expand Down Expand Up @@ -497,6 +513,7 @@ impl AmazonS3Builder {
AmazonS3ConfigKey::VirtualHostedStyleRequest => {
Some(self.virtual_hosted_style_request.to_string())
}
AmazonS3ConfigKey::S3Express => Some(self.s3_express.to_string()),
AmazonS3ConfigKey::MetadataEndpoint => self.metadata_endpoint.clone(),
AmazonS3ConfigKey::UnsignedPayload => Some(self.unsigned_payload.to_string()),
AmazonS3ConfigKey::Checksum => {
Expand Down Expand Up @@ -619,7 +636,8 @@ impl AmazonS3Builder {
}

/// Sets if virtual hosted style request has to be used.
/// If `virtual_hosted_style_request` is :
///
/// If `virtual_hosted_style_request` is:
/// * false (default): Path style request is used
/// * true: Virtual hosted style request is used
///
Expand All @@ -632,6 +650,12 @@ impl AmazonS3Builder {
self
}

/// Configure this as an S3 Express One Zone Bucket
pub fn with_s3_express(mut self, s3_express: bool) -> Self {
self.s3_express = s3_express.into();
self
}

/// Set the retry configuration
pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
self.retry_config = retry_config;
Expand Down Expand Up @@ -823,18 +847,39 @@ impl AmazonS3Builder {
)) as _
};

// If `endpoint` is provided then its assumed to be consistent with
// `virtual_hosted_style_request`. i.e. if `virtual_hosted_style_request` is true then
// `endpoint` should have bucket name included.
let bucket_endpoint = if self.virtual_hosted_style_request.get()? {
self.endpoint
.clone()
.unwrap_or_else(|| format!("https://{bucket}.s3.{region}.amazonaws.com"))
} else {
match &self.endpoint {
None => format!("https://s3.{region}.amazonaws.com/{bucket}"),
Some(endpoint) => format!("{endpoint}/{bucket}"),
let (session_provider, zonal_endpoint) = match self.s3_express.get()? {
true => {
let zone = parse_bucket_az(&bucket).context(ZoneSuffixSnafu { bucket: &bucket })?;

// https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-express-Regions-and-Zones.html
let endpoint = format!("https://{bucket}.s3express-{zone}.{region}.amazonaws.com");

let session = Arc::new(
TokenCredentialProvider::new(
SessionProvider {
endpoint: endpoint.clone(),
region: region.clone(),
credentials: Arc::clone(&credentials),
},
self.client_options.client()?,
self.retry_config.clone(),
)
.with_min_ttl(Duration::from_secs(60)), // Credentials only valid for 5 minutes
);
(Some(session as _), Some(endpoint))
}
false => (None, None),
};

// If `endpoint` is provided it's assumed to be consistent with `virtual_hosted_style_request` or `s3_express`.
// For example, if `virtual_hosted_style_request` is true then `endpoint` should have bucket name included.
let virtual_hosted = self.virtual_hosted_style_request.get()?;
let bucket_endpoint = match (&self.endpoint, zonal_endpoint, virtual_hosted) {
(Some(endpoint), _, true) => endpoint.clone(),
(Some(endpoint), _, false) => format!("{endpoint}/{bucket}"),
(None, Some(endpoint), _) => endpoint,
(None, None, true) => format!("https://{bucket}.s3.{region}.amazonaws.com"),
(None, None, false) => format!("https://s3.{region}.amazonaws.com/{bucket}"),
};

let config = S3Config {
Expand All @@ -843,6 +888,7 @@ impl AmazonS3Builder {
bucket,
bucket_endpoint,
credentials,
session_provider,
retry_config: self.retry_config,
client_options: self.client_options,
sign_payload: !self.unsigned_payload.get()?,
Expand All @@ -859,6 +905,13 @@ impl AmazonS3Builder {
}
}

/// Extracts the AZ from a S3 Express One Zone bucket name
///
/// <https://docs.aws.amazon.com/AmazonS3/latest/userguide/directory-bucket-naming-rules.html>
fn parse_bucket_az(bucket: &str) -> Option<&str> {
Some(bucket.strip_suffix("--x-s3")?.rsplit_once("--")?.1)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -1088,4 +1141,18 @@ mod tests {
"Generic Config error: \"md5\" is not a valid checksum algorithm"
);
}

#[test]
fn test_parse_bucket_az() {
let cases = [
("bucket-base-name--usw2-az1--x-s3", Some("usw2-az1")),
("bucket-base--name--azid--x-s3", Some("azid")),
("bucket-base-name", None),
("bucket-base-name--x-s3", None),
];

for (bucket, expected) in cases {
assert_eq!(parse_bucket_az(bucket), expected)
}
}
}
Loading

0 comments on commit f7101ec

Please sign in to comment.