diff --git a/object_store/src/aws/builder.rs b/object_store/src/aws/builder.rs index 542f17af3053..9a296bc58537 100644 --- a/object_store/src/aws/builder.rs +++ b/object_store/src/aws/builder.rs @@ -17,7 +17,7 @@ use crate::aws::client::{S3Client, S3Config}; use crate::aws::credential::{ - InstanceCredentialProvider, TaskCredentialProvider, WebIdentityProvider, + InstanceCredentialProvider, SessionProvider, TaskCredentialProvider, WebIdentityProvider, }; use crate::aws::{ AmazonS3, AwsCredential, AwsCredentialProvider, Checksum, S3ConditionalPut, S3CopyIfNotExists, @@ -31,6 +31,7 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; use tracing::info; use url::Url; @@ -77,6 +78,9 @@ enum Error { source: reqwest::Error, }, + #[snafu(display("Invalid Zone suffix for bucket '{bucket}'"))] + ZoneSuffix { bucket: String }, + #[snafu(display("Failed to parse the region for bucket '{}'", bucket))] RegionParse { bucket: String }, } @@ -134,6 +138,8 @@ pub struct AmazonS3Builder { imdsv1_fallback: ConfigValue, /// When set to true, virtual hosted style request has to be used virtual_hosted_style_request: ConfigValue, + /// When set to true, S3 express is used + s3_express: ConfigValue, /// When set to true, unsigned payload option has to be used unsigned_payload: ConfigValue, /// Checksum algorithm which has to be used for object integrity check during upload @@ -307,6 +313,13 @@ pub enum AmazonS3ConfigKey { /// - `disable_tagging` DisableTagging, + /// Enable Support for S3 Express One Zone + /// + /// Supported keys: + /// - `aws_s3_express` + /// - `s3_express` + S3Express, + /// Client options Client(ClientConfigKey), } @@ -322,6 +335,7 @@ impl AsRef for AmazonS3ConfigKey { Self::Token => "aws_session_token", Self::ImdsV1Fallback => "aws_imdsv1_fallback", Self::VirtualHostedStyleRequest => "aws_virtual_hosted_style_request", + Self::S3Express => "aws_s3_express", Self::DefaultRegion => "aws_default_region", Self::MetadataEndpoint => "aws_metadata_endpoint", Self::UnsignedPayload => "aws_unsigned_payload", @@ -351,6 +365,7 @@ impl FromStr for AmazonS3ConfigKey { "aws_virtual_hosted_style_request" | "virtual_hosted_style_request" => { Ok(Self::VirtualHostedStyleRequest) } + "aws_s3_express" | "s3_express" => Ok(Self::S3Express), "aws_imdsv1_fallback" | "imdsv1_fallback" => Ok(Self::ImdsV1Fallback), "aws_metadata_endpoint" | "metadata_endpoint" => Ok(Self::MetadataEndpoint), "aws_unsigned_payload" | "unsigned_payload" => Ok(Self::UnsignedPayload), @@ -448,6 +463,7 @@ impl AmazonS3Builder { AmazonS3ConfigKey::VirtualHostedStyleRequest => { self.virtual_hosted_style_request.parse(value) } + AmazonS3ConfigKey::S3Express => self.s3_express.parse(value), AmazonS3ConfigKey::DefaultRegion => { self.region = self.region.or_else(|| Some(value.into())) } @@ -497,6 +513,7 @@ impl AmazonS3Builder { AmazonS3ConfigKey::VirtualHostedStyleRequest => { Some(self.virtual_hosted_style_request.to_string()) } + AmazonS3ConfigKey::S3Express => Some(self.s3_express.to_string()), AmazonS3ConfigKey::MetadataEndpoint => self.metadata_endpoint.clone(), AmazonS3ConfigKey::UnsignedPayload => Some(self.unsigned_payload.to_string()), AmazonS3ConfigKey::Checksum => { @@ -619,7 +636,8 @@ impl AmazonS3Builder { } /// Sets if virtual hosted style request has to be used. - /// If `virtual_hosted_style_request` is : + /// + /// If `virtual_hosted_style_request` is: /// * false (default): Path style request is used /// * true: Virtual hosted style request is used /// @@ -632,6 +650,12 @@ impl AmazonS3Builder { self } + /// Configure this as an S3 Express One Zone Bucket + pub fn with_s3_express(mut self, s3_express: bool) -> Self { + self.s3_express = s3_express.into(); + self + } + /// Set the retry configuration pub fn with_retry(mut self, retry_config: RetryConfig) -> Self { self.retry_config = retry_config; @@ -823,18 +847,39 @@ impl AmazonS3Builder { )) as _ }; - // If `endpoint` is provided then its assumed to be consistent with - // `virtual_hosted_style_request`. i.e. if `virtual_hosted_style_request` is true then - // `endpoint` should have bucket name included. - let bucket_endpoint = if self.virtual_hosted_style_request.get()? { - self.endpoint - .clone() - .unwrap_or_else(|| format!("https://{bucket}.s3.{region}.amazonaws.com")) - } else { - match &self.endpoint { - None => format!("https://s3.{region}.amazonaws.com/{bucket}"), - Some(endpoint) => format!("{endpoint}/{bucket}"), + let (session_provider, zonal_endpoint) = match self.s3_express.get()? { + true => { + let zone = parse_bucket_az(&bucket).context(ZoneSuffixSnafu { bucket: &bucket })?; + + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-express-Regions-and-Zones.html + let endpoint = format!("https://{bucket}.s3express-{zone}.{region}.amazonaws.com"); + + let session = Arc::new( + TokenCredentialProvider::new( + SessionProvider { + endpoint: endpoint.clone(), + region: region.clone(), + credentials: Arc::clone(&credentials), + }, + self.client_options.client()?, + self.retry_config.clone(), + ) + .with_min_ttl(Duration::from_secs(60)), // Credentials only valid for 5 minutes + ); + (Some(session as _), Some(endpoint)) } + false => (None, None), + }; + + // If `endpoint` is provided it's assumed to be consistent with `virtual_hosted_style_request` or `s3_express`. + // For example, if `virtual_hosted_style_request` is true then `endpoint` should have bucket name included. + let virtual_hosted = self.virtual_hosted_style_request.get()?; + let bucket_endpoint = match (&self.endpoint, zonal_endpoint, virtual_hosted) { + (Some(endpoint), _, true) => endpoint.clone(), + (Some(endpoint), _, false) => format!("{endpoint}/{bucket}"), + (None, Some(endpoint), _) => endpoint, + (None, None, true) => format!("https://{bucket}.s3.{region}.amazonaws.com"), + (None, None, false) => format!("https://s3.{region}.amazonaws.com/{bucket}"), }; let config = S3Config { @@ -843,6 +888,7 @@ impl AmazonS3Builder { bucket, bucket_endpoint, credentials, + session_provider, retry_config: self.retry_config, client_options: self.client_options, sign_payload: !self.unsigned_payload.get()?, @@ -859,6 +905,13 @@ impl AmazonS3Builder { } } +/// Extracts the AZ from a S3 Express One Zone bucket name +/// +/// +fn parse_bucket_az(bucket: &str) -> Option<&str> { + Some(bucket.strip_suffix("--x-s3")?.rsplit_once("--")?.1) +} + #[cfg(test)] mod tests { use super::*; @@ -1088,4 +1141,18 @@ mod tests { "Generic Config error: \"md5\" is not a valid checksum algorithm" ); } + + #[test] + fn test_parse_bucket_az() { + let cases = [ + ("bucket-base-name--usw2-az1--x-s3", Some("usw2-az1")), + ("bucket-base--name--azid--x-s3", Some("azid")), + ("bucket-base-name", None), + ("bucket-base-name--x-s3", None), + ]; + + for (bucket, expected) in cases { + assert_eq!(parse_bucket_az(bucket), expected) + } + } } diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 45d97ead6d3e..e06a0ce1846b 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -18,7 +18,8 @@ use crate::aws::checksum::Checksum; use crate::aws::credential::{AwsCredential, CredentialExt}; use crate::aws::{ - AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET, + AwsAuthorizer, AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists, STORE, + STRICT_PATH_ENCODE_SET, }; use crate::client::get::GetClient; use crate::client::header::{get_etag, HeaderConfig}; @@ -171,6 +172,7 @@ pub struct S3Config { pub bucket: String, pub bucket_endpoint: String, pub credentials: AwsCredentialProvider, + pub session_provider: Option, pub retry_config: RetryConfig, pub client_options: ClientOptions, pub sign_payload: bool, @@ -186,12 +188,54 @@ impl S3Config { format!("{}/{}", self.bucket_endpoint, encode_path(path)) } + async fn get_session_credential(&self) -> Result> { + let credential = match self.skip_signature { + false => { + let provider = self.session_provider.as_ref().unwrap_or(&self.credentials); + Some(provider.get_credential().await?) + } + true => None, + }; + + Ok(SessionCredential { + credential, + session_token: self.session_provider.is_some(), + config: self, + }) + } + pub(crate) async fn get_credential(&self) -> Result>> { Ok(match self.skip_signature { false => Some(self.credentials.get_credential().await?), true => None, }) } + + #[inline] + pub(crate) fn is_s3_express(&self) -> bool { + self.session_provider.is_some() + } +} + +struct SessionCredential<'a> { + credential: Option>, + session_token: bool, + config: &'a S3Config, +} + +impl<'a> SessionCredential<'a> { + fn authorizer(&self) -> Option> { + let mut authorizer = + AwsAuthorizer::new(self.credential.as_deref()?, "s3", &self.config.region) + .with_sign_payload(self.config.sign_payload); + + if self.session_token { + let token = HeaderName::from_static("x-amz-s3session-token"); + authorizer = authorizer.with_token_header(token) + } + + Some(authorizer) + } } #[derive(Debug, Snafu)] @@ -219,6 +263,7 @@ pub(crate) struct Request<'a> { config: &'a S3Config, builder: RequestBuilder, payload_sha256: Option>, + use_session_creds: bool, } impl<'a> Request<'a> { @@ -237,16 +282,18 @@ impl<'a> Request<'a> { } pub async fn send(self) -> Result { - let credential = self.config.get_credential().await?; + let credential = match self.use_session_creds { + true => self.config.get_session_credential().await?, + false => SessionCredential { + credential: self.config.get_credential().await?, + session_token: false, + config: self.config, + }, + }; + let path = self.path.as_ref(); self.builder - .with_aws_sigv4( - credential.as_deref(), - &self.config.region, - "s3", - self.config.sign_payload, - self.payload_sha256.as_deref(), - ) + .with_aws_sigv4(credential.authorizer(), self.payload_sha256.as_deref()) .send_retry(&self.config.retry_config) .await .context(RetrySnafu { path }) @@ -300,6 +347,7 @@ impl S3Client { builder, payload_sha256, config: &self.config, + use_session_creds: true, } } @@ -309,19 +357,13 @@ impl S3Client { path: &Path, query: &T, ) -> Result<()> { - let credential = self.config.get_credential().await?; + let credential = self.config.get_session_credential().await?; let url = self.config.path_url(path); self.client .request(Method::DELETE, url) .query(query) - .with_aws_sigv4( - credential.as_deref(), - &self.config.region, - "s3", - self.config.sign_payload, - None, - ) + .with_aws_sigv4(credential.authorizer(), None) .send_retry(&self.config.retry_config) .await .map_err(|e| e.error(STORE, path.to_string()))?; @@ -341,7 +383,7 @@ impl S3Client { return Ok(Vec::new()); } - let credential = self.config.get_credential().await?; + let credential = self.config.get_session_credential().await?; let url = format!("{}?delete", self.config.bucket_endpoint); let mut buffer = Vec::new(); @@ -399,13 +441,7 @@ impl S3Client { let response = builder .header(CONTENT_TYPE, "application/xml") .body(body) - .with_aws_sigv4( - credential.as_deref(), - &self.config.region, - "s3", - self.config.sign_payload, - payload_sha256.as_deref(), - ) + .with_aws_sigv4(credential.authorizer(), payload_sha256.as_deref()) .send_retry(&self.config.retry_config) .await .context(DeleteObjectsRequestSnafu {})? @@ -452,23 +488,18 @@ impl S3Client { path: from, config: &self.config, payload_sha256: None, + use_session_creds: false, } } pub async fn create_multipart(&self, location: &Path) -> Result { - let credential = self.config.get_credential().await?; + let credential = self.config.get_session_credential().await?; let url = format!("{}?uploads=", self.config.path_url(location),); let response = self .client .request(Method::POST, url) - .with_aws_sigv4( - credential.as_deref(), - &self.config.region, - "s3", - self.config.sign_payload, - None, - ) + .with_aws_sigv4(credential.authorizer(), None) .send_retry(&self.config.retry_config) .await .context(CreateMultipartRequestSnafu)? @@ -510,7 +541,7 @@ impl S3Client { let request = CompleteMultipartUpload::from(parts); let body = quick_xml::se::to_string(&request).unwrap(); - let credential = self.config.get_credential().await?; + let credential = self.config.get_session_credential().await?; let url = self.config.path_url(location); let response = self @@ -518,13 +549,7 @@ impl S3Client { .request(Method::POST, url) .query(&[("uploadId", upload_id)]) .body(body) - .with_aws_sigv4( - credential.as_deref(), - &self.config.region, - "s3", - self.config.sign_payload, - None, - ) + .with_aws_sigv4(credential.authorizer(), None) .send_retry(&self.config.retry_config) .await .context(CompleteMultipartRequestSnafu)?; @@ -547,18 +572,12 @@ impl S3Client { #[cfg(test)] pub async fn get_object_tagging(&self, path: &Path) -> Result { - let credential = self.config.get_credential().await?; + let credential = self.config.get_session_credential().await?; let url = format!("{}?tagging", self.config.path_url(path)); let response = self .client .request(Method::GET, url) - .with_aws_sigv4( - credential.as_deref(), - &self.config.region, - "s3", - self.config.sign_payload, - None, - ) + .with_aws_sigv4(credential.authorizer(), None) .send_retry(&self.config.retry_config) .await .map_err(|e| e.error(STORE, path.to_string()))?; @@ -578,7 +597,7 @@ impl GetClient for S3Client { /// Make an S3 GET request async fn get_request(&self, path: &Path, options: GetOptions) -> Result { - let credential = self.config.get_credential().await?; + let credential = self.config.get_session_credential().await?; let url = self.config.path_url(path); let method = match options.head { true => Method::HEAD, @@ -593,13 +612,7 @@ impl GetClient for S3Client { let response = builder .with_get_options(options) - .with_aws_sigv4( - credential.as_deref(), - &self.config.region, - "s3", - self.config.sign_payload, - None, - ) + .with_aws_sigv4(credential.authorizer(), None) .send_retry(&self.config.retry_config) .await .map_err(|e| e.error(STORE, path.to_string()))?; @@ -618,7 +631,7 @@ impl ListClient for S3Client { token: Option<&str>, offset: Option<&str>, ) -> Result<(ListResult, Option)> { - let credential = self.config.get_credential().await?; + let credential = self.config.get_session_credential().await?; let url = self.config.bucket_endpoint.clone(); let mut query = Vec::with_capacity(4); @@ -645,13 +658,7 @@ impl ListClient for S3Client { .client .request(Method::GET, &url) .query(&query) - .with_aws_sigv4( - credential.as_deref(), - &self.config.region, - "s3", - self.config.sign_payload, - None, - ) + .with_aws_sigv4(credential.authorizer(), None) .send_retry(&self.config.retry_config) .await .context(ListRequestSnafu)? diff --git a/object_store/src/aws/credential.rs b/object_store/src/aws/credential.rs index d290da838d78..f8614f4f563c 100644 --- a/object_store/src/aws/credential.rs +++ b/object_store/src/aws/credential.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::aws::{STORE, STRICT_ENCODE_SET, STRICT_PATH_ENCODE_SET}; +use crate::aws::{AwsCredentialProvider, STORE, STRICT_ENCODE_SET, STRICT_PATH_ENCODE_SET}; use crate::client::retry::RetryExt; use crate::client::token::{TemporaryToken, TokenCache}; use crate::client::TokenProvider; @@ -24,16 +24,40 @@ use crate::{CredentialProvider, Result, RetryConfig}; use async_trait::async_trait; use bytes::Buf; use chrono::{DateTime, Utc}; +use hyper::header::HeaderName; use percent_encoding::utf8_percent_encode; -use reqwest::header::{HeaderMap, HeaderValue}; +use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION}; use reqwest::{Client, Method, Request, RequestBuilder, StatusCode}; use serde::Deserialize; +use snafu::{ResultExt, Snafu}; use std::collections::BTreeMap; use std::sync::Arc; use std::time::{Duration, Instant}; use tracing::warn; use url::Url; +#[derive(Debug, Snafu)] +#[allow(clippy::enum_variant_names)] +enum Error { + #[snafu(display("Error performing CreateSession request: {source}"))] + CreateSessionRequest { source: crate::client::retry::Error }, + + #[snafu(display("Error getting CreateSession response: {source}"))] + CreateSessionResponse { source: reqwest::Error }, + + #[snafu(display("Invalid CreateSessionOutput response: {source}"))] + CreateSessionOutput { source: quick_xml::DeError }, +} + +impl From for crate::Error { + fn from(value: Error) -> Self { + Self::Generic { + store: STORE, + source: Box::new(value), + } + } +} + type StdError = Box; /// SHA256 hash of empty string @@ -75,13 +99,13 @@ pub struct AwsAuthorizer<'a> { credential: &'a AwsCredential, service: &'a str, region: &'a str, + token_header: Option, sign_payload: bool, } -const DATE_HEADER: &str = "x-amz-date"; -const HASH_HEADER: &str = "x-amz-content-sha256"; -const TOKEN_HEADER: &str = "x-amz-security-token"; -const AUTH_HEADER: &str = "authorization"; +static DATE_HEADER: HeaderName = HeaderName::from_static("x-amz-date"); +static HASH_HEADER: HeaderName = HeaderName::from_static("x-amz-content-sha256"); +static TOKEN_HEADER: HeaderName = HeaderName::from_static("x-amz-security-token"); const ALGORITHM: &str = "AWS4-HMAC-SHA256"; impl<'a> AwsAuthorizer<'a> { @@ -93,6 +117,7 @@ impl<'a> AwsAuthorizer<'a> { region, date: None, sign_payload: true, + token_header: None, } } @@ -103,6 +128,12 @@ impl<'a> AwsAuthorizer<'a> { self } + /// Overrides the header name for security tokens, defaults to `x-amz-security-token` + pub(crate) fn with_token_header(mut self, header: HeaderName) -> Self { + self.token_header = Some(header); + self + } + /// Authorize `request` with an optional pre-calculated SHA256 digest by attaching /// the relevant [AWS SigV4] headers /// @@ -119,7 +150,8 @@ impl<'a> AwsAuthorizer<'a> { pub fn authorize(&self, request: &mut Request, pre_calculated_digest: Option<&[u8]>) { if let Some(ref token) = self.credential.token { let token_val = HeaderValue::from_str(token).unwrap(); - request.headers_mut().insert(TOKEN_HEADER, token_val); + let header = self.token_header.as_ref().unwrap_or(&TOKEN_HEADER); + request.headers_mut().insert(header, token_val); } let host = &request.url()[url::Position::BeforeHost..url::Position::AfterPort]; @@ -129,7 +161,7 @@ impl<'a> AwsAuthorizer<'a> { let date = self.date.unwrap_or_else(Utc::now); let date_str = date.format("%Y%m%dT%H%M%SZ").to_string(); let date_val = HeaderValue::from_str(&date_str).unwrap(); - request.headers_mut().insert(DATE_HEADER, date_val); + request.headers_mut().insert(&DATE_HEADER, date_val); let digest = match self.sign_payload { false => UNSIGNED_PAYLOAD.to_string(), @@ -146,7 +178,7 @@ impl<'a> AwsAuthorizer<'a> { }; let header_digest = HeaderValue::from_str(&digest).unwrap(); - request.headers_mut().insert(HASH_HEADER, header_digest); + request.headers_mut().insert(&HASH_HEADER, header_digest); let (signed_headers, canonical_headers) = canonicalize_headers(request.headers()); @@ -174,7 +206,9 @@ impl<'a> AwsAuthorizer<'a> { ); let authorization_val = HeaderValue::from_str(&authorisation).unwrap(); - request.headers_mut().insert(AUTH_HEADER, authorization_val); + request + .headers_mut() + .insert(&AUTHORIZATION, authorization_val); } pub(crate) fn sign(&self, method: Method, url: &mut Url, expires_in: Duration) { @@ -284,10 +318,7 @@ pub trait CredentialExt { /// Sign a request fn with_aws_sigv4( self, - credential: Option<&AwsCredential>, - region: &str, - service: &str, - sign_payload: bool, + authorizer: Option>, payload_sha256: Option<&[u8]>, ) -> Self; } @@ -295,20 +326,14 @@ pub trait CredentialExt { impl CredentialExt for RequestBuilder { fn with_aws_sigv4( self, - credential: Option<&AwsCredential>, - region: &str, - service: &str, - sign_payload: bool, + authorizer: Option>, payload_sha256: Option<&[u8]>, ) -> Self { - match credential { - Some(credential) => { + match authorizer { + Some(authorizer) => { let (client, request) = self.build_split(); let mut request = request.expect("request valid"); - - AwsAuthorizer::new(credential, service, region) - .with_sign_payload(sign_payload) - .authorize(&mut request, payload_sha256); + authorizer.authorize(&mut request, payload_sha256); Self::from_parts(client, request) } @@ -555,20 +580,20 @@ struct AssumeRoleResponse { #[derive(Debug, Deserialize)] #[serde(rename_all = "PascalCase")] struct AssumeRoleResult { - credentials: AssumeRoleCredentials, + credentials: SessionCredentials, } #[derive(Debug, Deserialize)] #[serde(rename_all = "PascalCase")] -struct AssumeRoleCredentials { +struct SessionCredentials { session_token: String, secret_access_key: String, access_key_id: String, expiration: DateTime, } -impl From for AwsCredential { - fn from(s: AssumeRoleCredentials) -> Self { +impl From for AwsCredential { + fn from(s: SessionCredentials) -> Self { Self { key_id: s.access_key_id, secret_key: s.secret_access_key, @@ -659,6 +684,56 @@ async fn task_credential( }) } +/// A session provider as used by S3 Express One Zone +/// +/// +#[derive(Debug)] +pub struct SessionProvider { + pub endpoint: String, + pub region: String, + pub credentials: AwsCredentialProvider, +} + +#[async_trait] +impl TokenProvider for SessionProvider { + type Credential = AwsCredential; + + async fn fetch_token( + &self, + client: &Client, + retry: &RetryConfig, + ) -> Result>> { + let creds = self.credentials.get_credential().await?; + let authorizer = AwsAuthorizer::new(&creds, "s3", &self.region); + + let bytes = client + .get(format!("{}?session", self.endpoint)) + .with_aws_sigv4(Some(authorizer), None) + .send_retry(retry) + .await + .context(CreateSessionRequestSnafu)? + .bytes() + .await + .context(CreateSessionResponseSnafu)?; + + let resp: CreateSessionOutput = + quick_xml::de::from_reader(bytes.reader()).context(CreateSessionOutputSnafu)?; + + let creds = resp.credentials; + Ok(TemporaryToken { + token: Arc::new(creds.into()), + // Credentials last 5 minutes - https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateSession.html + expiry: Some(Instant::now() + Duration::from_secs(5 * 60)), + }) + } +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +struct CreateSessionOutput { + credentials: SessionCredentials, +} + #[cfg(test)] mod tests { use super::*; @@ -700,10 +775,11 @@ mod tests { service: "ec2", region: "us-east-1", sign_payload: true, + token_header: None, }; signer.authorize(&mut request, None); - assert_eq!(request.headers().get(AUTH_HEADER).unwrap(), "AWS4-HMAC-SHA256 Credential=AKIAIOSFODNN7EXAMPLE/20220806/us-east-1/ec2/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=a3c787a7ed37f7fdfbfd2d7056a3d7c9d85e6d52a2bfbec73793c0be6e7862d4") + assert_eq!(request.headers().get(&AUTHORIZATION).unwrap(), "AWS4-HMAC-SHA256 Credential=AKIAIOSFODNN7EXAMPLE/20220806/us-east-1/ec2/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=a3c787a7ed37f7fdfbfd2d7056a3d7c9d85e6d52a2bfbec73793c0be6e7862d4") } #[test] @@ -737,11 +813,12 @@ mod tests { credential: &credential, service: "ec2", region: "us-east-1", + token_header: None, sign_payload: false, }; authorizer.authorize(&mut request, None); - assert_eq!(request.headers().get(AUTH_HEADER).unwrap(), "AWS4-HMAC-SHA256 Credential=AKIAIOSFODNN7EXAMPLE/20220806/us-east-1/ec2/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=653c3d8ea261fd826207df58bc2bb69fbb5003e9eb3c0ef06e4a51f2a81d8699"); + assert_eq!(request.headers().get(&AUTHORIZATION).unwrap(), "AWS4-HMAC-SHA256 Credential=AKIAIOSFODNN7EXAMPLE/20220806/us-east-1/ec2/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=653c3d8ea261fd826207df58bc2bb69fbb5003e9eb3c0ef06e4a51f2a81d8699"); } #[test] @@ -762,6 +839,7 @@ mod tests { credential: &credential, service: "s3", region: "us-east-1", + token_header: None, sign_payload: false, }; @@ -813,11 +891,12 @@ mod tests { credential: &credential, service: "s3", region: "us-east-1", + token_header: None, sign_payload: true, }; authorizer.authorize(&mut request, None); - assert_eq!(request.headers().get(AUTH_HEADER).unwrap(), "AWS4-HMAC-SHA256 Credential=H20ABqCkLZID4rLe/20220809/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=9ebf2f92872066c99ac94e573b4e1b80f4dbb8a32b1e8e23178318746e7d1b4d") + assert_eq!(request.headers().get(&AUTHORIZATION).unwrap(), "AWS4-HMAC-SHA256 Credential=H20ABqCkLZID4rLe/20220809/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=9ebf2f92872066c99ac94e573b4e1b80f4dbb8a32b1e8e23178318746e7d1b4d") } #[tokio::test] diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs index f12a42137856..4331ae2a340a 100644 --- a/object_store/src/aws/dynamo.rs +++ b/object_store/src/aws/dynamo.rs @@ -29,7 +29,7 @@ use serde::{Deserialize, Serialize, Serializer}; use crate::aws::client::S3Client; use crate::aws::credential::CredentialExt; -use crate::aws::AwsCredential; +use crate::aws::{AwsAuthorizer, AwsCredential}; use crate::client::get::GetClientExt; use crate::client::retry::Error as RetryError; use crate::client::retry::RetryExt; @@ -365,6 +365,7 @@ impl DynamoCommit { req: R, ) -> Result { let region = &s3.config.region; + let authorizer = cred.map(|x| AwsAuthorizer::new(x, "dynamodb", region)); let builder = match &s3.config.endpoint { Some(e) => s3.client.post(e), @@ -378,7 +379,7 @@ impl DynamoCommit { .timeout(Duration::from_millis(self.timeout)) .json(&req) .header("X-Amz-Target", target) - .with_aws_sigv4(cred, region, "dynamodb", true, None) + .with_aws_sigv4(authorizer, None) .send_retry(&s3.config.retry_config) .await } diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index d167c78e4c8c..4e8852475c34 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -269,6 +269,16 @@ impl ObjectStore for AmazonS3 { prefix: Option<&Path>, offset: &Path, ) -> BoxStream<'_, Result> { + if self.client.config.is_s3_express() { + let offset = offset.clone(); + // S3 Express does not support start-after + return self + .client + .list(prefix) + .try_filter(move |f| futures::future::ready(f.location > offset)) + .boxed(); + } + self.client.list_with_offset(prefix, offset) } @@ -388,11 +398,15 @@ mod tests { multipart(&integration, &integration).await; signing(&integration).await; - tagging(&integration, !config.disable_tagging, |p| { - let client = Arc::clone(&integration.client); - async move { client.get_object_tagging(&p).await } - }) - .await; + // Object tagging is not supported by S3 Express One Zone + if config.session_provider.is_none() { + tagging(&integration, !config.disable_tagging, |p| { + let client = Arc::clone(&integration.client); + async move { client.get_object_tagging(&p).await } + }) + .await; + } + if test_not_exists { copy_if_not_exists(&integration).await; } diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index 4a78927d0988..252e9fdcadf5 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -679,6 +679,13 @@ mod cloud { cache: Default::default(), } } + + /// Override the minimum remaining TTL for a cached token to be used + #[cfg(feature = "aws")] + pub fn with_min_ttl(mut self, min_ttl: Duration) -> Self { + self.cache = self.cache.with_min_ttl(min_ttl); + self + } } #[async_trait] diff --git a/object_store/src/client/token.rs b/object_store/src/client/token.rs index 7e48d351d9a3..7a3c8079449b 100644 --- a/object_store/src/client/token.rs +++ b/object_store/src/client/token.rs @@ -16,7 +16,7 @@ // under the License. use std::future::Future; -use std::time::Instant; +use std::time::{Duration, Instant}; use tokio::sync::Mutex; /// A temporary authentication token with an associated expiry @@ -34,17 +34,25 @@ pub struct TemporaryToken { #[derive(Debug)] pub struct TokenCache { cache: Mutex>>, + min_ttl: Duration, } impl Default for TokenCache { fn default() -> Self { Self { cache: Default::default(), + min_ttl: Duration::from_secs(300), } } } impl TokenCache { + /// Override the minimum remaining TTL for a cached token to be used + #[cfg(feature = "aws")] + pub fn with_min_ttl(self, min_ttl: Duration) -> Self { + Self { min_ttl, ..self } + } + pub async fn get_or_insert_with(&self, f: F) -> Result where F: FnOnce() -> Fut + Send, @@ -55,13 +63,7 @@ impl TokenCache { if let Some(cached) = locked.as_ref() { match cached.expiry { - Some(ttl) - if ttl - .checked_duration_since(now) - .unwrap_or_default() - .as_secs() - > 300 => - { + Some(ttl) if ttl.checked_duration_since(now).unwrap_or_default() > self.min_ttl => { return Ok(cached.token.clone()); } None => return Ok(cached.token.clone()),