diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 65d0fcb147..a807184c47 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -96,6 +96,7 @@ jobs: # https://github.com/rust-lang/cargo/issues/10280 CARGO_NET_GIT_FETCH_WITH_CLI: "true" RUST_BACKTRACE: "1" + RUST_LOG: debug AWS_DEFAULT_REGION: "us-east-1" AWS_ACCESS_KEY_ID: deltalake AWS_SECRET_ACCESS_KEY: weloverust diff --git a/crates/aws/Cargo.toml b/crates/aws/Cargo.toml index 9fe0c05934..c68af0f6c5 100644 --- a/crates/aws/Cargo.toml +++ b/crates/aws/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-aws" -version = "0.2.0" +version = "0.2.1" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/crates/aws/src/constants.rs b/crates/aws/src/constants.rs new file mode 100644 index 0000000000..73d2da1b48 --- /dev/null +++ b/crates/aws/src/constants.rs @@ -0,0 +1,138 @@ +//! Constants used for modifying and configuring various AWS S3 (or similar) connections with +//! delta-rs +//! + +use lazy_static::lazy_static; +use std::time::Duration; + +/// Custom S3 endpoint. +pub const AWS_ENDPOINT_URL: &str = "AWS_ENDPOINT_URL"; +/// Custom DynamoDB endpoint. +/// If DynamoDB endpoint is not supplied, will use S3 endpoint (AWS_ENDPOINT_URL) +/// If it is supplied, this endpoint takes precedence over the global endpoint set in AWS_ENDPOINT_URL for DynamoDB +pub const AWS_ENDPOINT_URL_DYNAMODB: &str = "AWS_ENDPOINT_URL_DYNAMODB"; +/// The AWS region. +pub const AWS_REGION: &str = "AWS_REGION"; +/// The AWS profile. +pub const AWS_PROFILE: &str = "AWS_PROFILE"; +/// The AWS_ACCESS_KEY_ID to use for S3. +pub const AWS_ACCESS_KEY_ID: &str = "AWS_ACCESS_KEY_ID"; +/// The AWS_SECRET_ACCESS_KEY to use for S3. +pub const AWS_SECRET_ACCESS_KEY: &str = "AWS_SECRET_ACCESS_KEY"; +/// The AWS_SESSION_TOKEN to use for S3. 
+pub const AWS_SESSION_TOKEN: &str = "AWS_SESSION_TOKEN"; +/// Uses either "path" (the default) or "virtual", which turns on +/// [virtual host addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html). +pub const AWS_S3_ADDRESSING_STYLE: &str = "AWS_S3_ADDRESSING_STYLE"; +/// Locking provider to use for safe atomic rename. +/// `dynamodb` is currently the only supported locking provider. +/// If not set, safe atomic rename is not available. +pub const AWS_S3_LOCKING_PROVIDER: &str = "AWS_S3_LOCKING_PROVIDER"; +/// The role to assume for S3 writes. +pub const AWS_IAM_ROLE_ARN: &str = "AWS_IAM_ROLE_ARN"; +/// The role to assume. Please use [AWS_IAM_ROLE_ARN] instead +#[deprecated(since = "0.20.0", note = "Please use AWS_IAM_ROLE_ARN instead")] +pub const AWS_S3_ASSUME_ROLE_ARN: &str = "AWS_S3_ASSUME_ROLE_ARN"; +/// The role session name to use when a role is assumed. If not provided a random session name is generated. +pub const AWS_IAM_ROLE_SESSION_NAME: &str = "AWS_IAM_ROLE_SESSION_NAME"; +/// The role session name to use when a role is assumed. If not provided a random session name is generated. +#[deprecated( + since = "0.20.0", + note = "Please use AWS_IAM_ROLE_SESSION_NAME instead" +)] +pub const AWS_S3_ROLE_SESSION_NAME: &str = "AWS_S3_ROLE_SESSION_NAME"; +/// The `pool_idle_timeout` option of aws http client. Has to be lower than 20 seconds, which is +/// default S3 server timeout . +/// However, since rusoto uses hyper as a client, its default timeout is 90 seconds +/// . +/// Hence, the `connection closed before message completed` could occur. +/// To avoid that, the default value of this setting is 15 seconds if it's not set otherwise. +pub const AWS_S3_POOL_IDLE_TIMEOUT_SECONDS: &str = "AWS_S3_POOL_IDLE_TIMEOUT_SECONDS"; +/// The `pool_idle_timeout` for the AWS STS client. See +/// the reasoning in `AWS_S3_POOL_IDLE_TIMEOUT_SECONDS`. 
+pub const AWS_STS_POOL_IDLE_TIMEOUT_SECONDS: &str = "AWS_STS_POOL_IDLE_TIMEOUT_SECONDS"; +/// The number of retries for S3 GET requests failed with 500 Internal Server Error. +pub const AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES: &str = + "AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES"; +/// The web identity token file to use when using a web identity provider. +/// NOTE: web identity related options are set in the environment when +/// creating an instance of [crate::storage::s3::S3StorageOptions]. +/// See also . +pub const AWS_WEB_IDENTITY_TOKEN_FILE: &str = "AWS_WEB_IDENTITY_TOKEN_FILE"; +/// The role name to use for web identity. +/// NOTE: web identity related options are set in the environment when +/// creating an instance of [crate::storage::s3::S3StorageOptions]. +/// See also . +pub const AWS_ROLE_ARN: &str = "AWS_ROLE_ARN"; +/// The role session name to use for web identity. +/// NOTE: web identity related options are set in the environment when +/// creating an instance of [crate::storage::s3::S3StorageOptions]. +/// See also . +pub const AWS_ROLE_SESSION_NAME: &str = "AWS_ROLE_SESSION_NAME"; +/// Allow http connections - mainly useful for integration tests +pub const AWS_ALLOW_HTTP: &str = "AWS_ALLOW_HTTP"; + +/// If set to "true", allows creating commits without concurrent writer protection. +/// Only safe if there is one writer to a given table. +pub const AWS_S3_ALLOW_UNSAFE_RENAME: &str = "AWS_S3_ALLOW_UNSAFE_RENAME"; + +/// If set to "true", disables the imds client +/// Defaults to "true" +pub const AWS_EC2_METADATA_DISABLED: &str = "AWS_EC2_METADATA_DISABLED"; + +/// The timeout in milliseconds for the EC2 metadata endpoint +/// Defaults to 100 +pub const AWS_EC2_METADATA_TIMEOUT: &str = "AWS_EC2_METADATA_TIMEOUT"; + +/// The list of option keys owned by the S3 module. +/// Option keys not contained in this list will be added to the `extra_opts` +/// field of [crate::storage::s3::S3StorageOptions]. 
+pub const S3_OPTS: &[&str] = &[ + AWS_ENDPOINT_URL, + AWS_ENDPOINT_URL_DYNAMODB, + AWS_REGION, + AWS_PROFILE, + AWS_ACCESS_KEY_ID, + AWS_SECRET_ACCESS_KEY, + AWS_SESSION_TOKEN, + AWS_S3_LOCKING_PROVIDER, + AWS_S3_ASSUME_ROLE_ARN, + AWS_S3_ROLE_SESSION_NAME, + AWS_WEB_IDENTITY_TOKEN_FILE, + AWS_ROLE_ARN, + AWS_ROLE_SESSION_NAME, + AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, + AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, + AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, + AWS_EC2_METADATA_DISABLED, + AWS_EC2_METADATA_TIMEOUT, +]; + +pub const DEFAULT_LOCK_TABLE_NAME: &str = "delta_log"; +pub const LOCK_TABLE_KEY_NAME: &str = "DELTA_DYNAMO_TABLE_NAME"; +pub const BILLING_MODE_KEY_NAME: &str = "DELTA_DYNAMO_BILLING_MODE"; +pub const MAX_ELAPSED_REQUEST_TIME_KEY_NAME: &str = "DELTA_DYNAMO_MAX_ELAPSED_REQUEST_TIME"; + +pub const ATTR_TABLE_PATH: &str = "tablePath"; +pub const ATTR_FILE_NAME: &str = "fileName"; +pub const ATTR_TEMP_PATH: &str = "tempPath"; +pub const ATTR_COMPLETE: &str = "complete"; +pub const ATTR_EXPIRE_TIME: &str = "expireTime"; + +pub const STRING_TYPE: &str = "S"; + +pub const KEY_TYPE_HASH: &str = "HASH"; +pub const KEY_TYPE_RANGE: &str = "RANGE"; + +lazy_static! 
{ + pub static ref CONDITION_EXPR_CREATE: String = format!( + "attribute_not_exists({ATTR_TABLE_PATH}) and attribute_not_exists({ATTR_FILE_NAME})" + ); + + pub static ref CONDITION_DELETE_INCOMPLETE: String = format!( + "(complete = :f) or (attribute_not_exists({ATTR_TABLE_PATH}) and attribute_not_exists({ATTR_FILE_NAME}))" + ); +} + +pub const CONDITION_UPDATE_INCOMPLETE: &str = "complete = :f"; +pub const DEFAULT_COMMIT_ENTRY_EXPIRATION_DELAY: Duration = Duration::from_secs(86_400); diff --git a/crates/aws/src/credentials.rs b/crates/aws/src/credentials.rs index 9ddf19b74c..d7bdbb746a 100644 --- a/crates/aws/src/credentials.rs +++ b/crates/aws/src/credentials.rs @@ -1,118 +1,250 @@ -use std::{sync::Arc, time::Duration}; - -use aws_config::{ - ecs::EcsCredentialsProvider, - environment::{EnvironmentVariableCredentialsProvider, EnvironmentVariableRegionProvider}, - imds::credentials::ImdsCredentialsProvider, - meta::{credentials::CredentialsProviderChain, region::RegionProviderChain}, - profile::ProfileFileCredentialsProvider, - provider_config::ProviderConfig, - web_identity_token::WebIdentityTokenCredentialsProvider, +//! Custom AWS credential providers used by delta-rs +//! 
+ +use std::sync::Arc; + +use aws_config::default_provider::credentials::DefaultCredentialsChain; +use aws_config::meta::credentials::CredentialsProviderChain; +use aws_config::sts::AssumeRoleProvider; +use aws_config::SdkConfig; +use aws_credential_types::provider::error::CredentialsError; +use aws_credential_types::provider::{future, ProvideCredentials}; +use aws_credential_types::Credentials; + +use deltalake_core::storage::object_store::aws::{AmazonS3ConfigKey, AwsCredential}; +use deltalake_core::storage::object_store::{ + CredentialProvider, Error as ObjectStoreError, Result as ObjectStoreResult, }; -use aws_credential_types::provider::{self, ProvideCredentials}; -use tracing::Instrument; +use deltalake_core::storage::StorageOptions; +use deltalake_core::DeltaResult; +use tracing::log::*; -const IMDS_PROVIDER_NAME: &str = "Ec2InstanceMetadata"; +use crate::constants; -#[derive(Debug)] -pub struct ConfiguredCredentialChain { - provider_chain: CredentialsProviderChain, +/// An [object_store::CredentialProvider] which handles converting a populated [SdkConfig] +/// into a necessary [AwsCredential] type for configuring [object_store::aws::AmazonS3] +#[derive(Clone, Debug)] +pub(crate) struct AWSForObjectStore { + sdk_config: SdkConfig, } -#[derive(Debug)] -pub struct NoOpCredentials {} +impl AWSForObjectStore { + pub(crate) fn new(sdk_config: SdkConfig) -> Self { + Self { sdk_config } + } +} + +#[async_trait::async_trait] +impl CredentialProvider for AWSForObjectStore { + type Credential = AwsCredential; + + /// Resolve credentials from the credentials provider configured on the [SdkConfig] and + /// convert them into an [AwsCredential] for the object store + async fn get_credential(&self) -> ObjectStoreResult> { + let provider = self + .sdk_config + .credentials_provider() + .ok_or(ObjectStoreError::NotImplemented)?; + let credentials = + provider + .provide_credentials() + .await + .map_err(|e| ObjectStoreError::NotSupported { + source: Box::new(e), + })?; -pub fn 
new_region_provider(disable_imds: bool, imds_timeout: u64) -> RegionProviderChain { - let env_provider = EnvironmentVariableRegionProvider::new(); - let profile_file = aws_config::profile::region::ProfileFileRegionProvider::default(); - if disable_imds { - return RegionProviderChain::first_try(env_provider).or_else(profile_file); + debug!( + "CredentialProvider for Object Store using access key: {}", + credentials.access_key_id() + ); + + Ok(Arc::new(Self::Credential { + key_id: credentials.access_key_id().into(), + secret_key: credentials.secret_access_key().into(), + token: credentials.session_token().map(|o| o.to_string()), + })) } +} - RegionProviderChain::first_try(env_provider) - .or_else(profile_file) - .or_else( - aws_config::imds::region::Builder::default() - .imds_client( - aws_config::imds::Client::builder() - .connect_timeout(Duration::from_millis(imds_timeout)) - .read_timeout(Duration::from_millis(imds_timeout)) - .build(), - ) - .build(), - ) +/// An [object_store::CredentialProvider] which handles retrieving the necessary +/// temporary credentials associated with the assumed role +#[derive(Debug)] +pub(crate) struct AssumeRoleCredentialProvider { + sdk_config: SdkConfig, } -impl ConfiguredCredentialChain { - pub fn new(disable_imds: bool, imds_timeout: u64, conf: &ProviderConfig) -> Self { - let imds_provider = Self::build_imds_provider(conf, disable_imds, imds_timeout); - let env_provider = EnvironmentVariableCredentialsProvider::default(); - let profile_provider = ProfileFileCredentialsProvider::builder() - .configure(conf) - .with_custom_provider(IMDS_PROVIDER_NAME, imds_provider.clone()) - .build(); - let web_identity_token_provider = WebIdentityTokenCredentialsProvider::builder() - .configure(conf) - .build(); - - let ecs_provider = EcsCredentialsProvider::builder().configure(conf).build(); - - let provider_chain = CredentialsProviderChain::first_try("Environment", env_provider) - .or_else("Profile", profile_provider) - 
.or_else("WebIdentityToken", web_identity_token_provider) - .or_else("EcsContainer", ecs_provider) - .or_else(IMDS_PROVIDER_NAME, imds_provider); - - Self { provider_chain } +impl AssumeRoleCredentialProvider { + fn session_name(&self) -> String { + /* + if let Some(_) = str_option(options, s3_constants::AWS_S3_ROLE_SESSION_NAME) { + warn!( + "AWS_S3_ROLE_SESSION_NAME is deprecated please AWS_IAM_ROLE_SESSION_NAME instead!" + ); + } + str_option(options, s3_constants::AWS_IAM_ROLE_SESSION_NAME) + .or(str_option(options, s3_constants::AWS_S3_ROLE_SESSION_NAME)) + .unwrap_or("delta-rs".into()) + */ + todo!() } - async fn credentials(&self) -> provider::Result { - self.provider_chain - .provide_credentials() - .instrument(tracing::debug_span!("provide_credentials", provider = %"default_chain")) - .await + fn iam_role(&self) -> String { + todo!() } +} - fn build_imds_provider( - conf: &ProviderConfig, - disable_imds: bool, - imds_timeout: u64, - ) -> Arc { - if disable_imds { - return Arc::new(NoOpCredentials {}); - } +#[async_trait::async_trait] +impl CredentialProvider for AssumeRoleCredentialProvider { + type Credential = AwsCredential; - let imds_provider = ImdsCredentialsProvider::builder() - .configure(conf) - .imds_client( - aws_config::imds::Client::builder() - .connect_timeout(Duration::from_millis(imds_timeout)) - .read_timeout(Duration::from_millis(imds_timeout)) - .build(), - ) - .build(); - Arc::new(imds_provider) + /// Invoke the underlying [AssumeRoleProvider] to retrieve the temporary credentials associated + /// with the role assumed + async fn get_credential(&self) -> ObjectStoreResult> { + let provider = AssumeRoleProvider::builder(self.iam_role()) + .configure(&self.sdk_config) + .session_name(self.session_name()) + .build() + .await; + let credentials = + provider + .provide_credentials() + .await + .map_err(|e| ObjectStoreError::NotSupported { + source: Box::new(e), + })?; + + Ok(Arc::new(Self::Credential { + key_id: 
credentials.access_key_id().into(), + secret_key: credentials.secret_access_key().into(), + token: credentials.session_token().map(|o| o.to_string()), + })) } } -impl ProvideCredentials for ConfiguredCredentialChain { - fn provide_credentials<'a>( - &'a self, - ) -> aws_credential_types::provider::future::ProvideCredentials<'a> - where - Self: 'a, - { - aws_credential_types::provider::future::ProvideCredentials::new(self.credentials()) +/// Name of the [OptionsCredentialsProvider] for AWS SDK use +const OPTS_PROVIDER: &str = "DeltaStorageOptionsProvider"; + +/// The [OptionsCredentialsProvider] helps users plug specific AWS credentials into their +/// [StorageOptions] in such a way that the AWS SDK code will be properly +/// loaded with those credentials before following the +/// [aws_config::default_provider::credentials::DefaultCredentialsChain] +#[derive(Clone, Debug)] +pub(crate) struct OptionsCredentialsProvider { + options: StorageOptions, +} + +impl OptionsCredentialsProvider { + /// Look at the options configured on the provider and return an appropriate + /// [Credentials] instance for AWS SDK credential resolution + fn credentials(&self) -> aws_credential_types::provider::Result { + debug!("Attempting to pull credentials from `StorageOptions`"); + let access_key = self.options.0.get(constants::AWS_ACCESS_KEY_ID).ok_or( + CredentialsError::not_loaded("access key not in StorageOptions"), + )?; + let secret_key = self.options.0.get(constants::AWS_SECRET_ACCESS_KEY).ok_or( + CredentialsError::not_loaded("secret key not in StorageOptions"), + )?; + let session_token = self.options.0.get(constants::AWS_SESSION_TOKEN).cloned(); + + Ok(Credentials::new( + access_key, + secret_key, + session_token, + None, + OPTS_PROVIDER, + )) } } -impl ProvideCredentials for NoOpCredentials { - fn provide_credentials<'a>(&'a self) -> provider::future::ProvideCredentials<'a> +impl ProvideCredentials for OptionsCredentialsProvider { + fn provide_credentials<'a>(&'a self) -> 
future::ProvideCredentials<'a> where Self: 'a, { - aws_credential_types::provider::future::ProvideCredentials::new(std::future::ready(Err( - provider::error::CredentialsError::not_loaded_no_source(), - ))) + future::ProvideCredentials::ready(self.credentials()) + } +} + +/// Take a set of [StorageOptions] and produce an appropriate AWS SDK [SdkConfig] +/// for use with various AWS SDK APIs, such as in our [crate::logstore::S3DynamoDbLogStore] +pub async fn resolve_credentials(options: StorageOptions) -> DeltaResult { + let options_provider = OptionsCredentialsProvider { options }; + + let default_provider = DefaultCredentialsChain::builder().build().await; + let credentials_provider = + CredentialsProviderChain::first_try("StorageOptions", options_provider) + .or_else("DefaultChain", default_provider); + + Ok(aws_config::from_env() + .credentials_provider(credentials_provider) + .load() + .await) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::constants; + use maplit::hashmap; + use serial_test::serial; + + #[tokio::test] + #[serial] + async fn test_options_credentials_provider() { + let options = StorageOptions(hashmap! { + constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), + constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), + }); + + let config = resolve_credentials(options).await; + assert!(config.is_ok(), "{config:?}"); + let config = config.unwrap(); + + if let Some(provider) = &config.credentials_provider() { + let credentials = provider + .provide_credentials() + .await + .expect("Failed to provide credentials"); + assert_eq!( + "test_id", + credentials.access_key_id(), + "The access key should come from our options! {credentials:?}" + ); + assert_eq!( + "test_secret", + credentials.secret_access_key(), + "The secret should come from our options! 
{credentials:?}" + ); + } else { + panic!("Could not retrieve credentials from the SdkConfig: {config:?}"); + } + } + + #[tokio::test] + #[serial] + async fn test_options_credentials_provider_session_token() { + let options = StorageOptions(hashmap! { + constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), + constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), + constants::AWS_SESSION_TOKEN.to_string() => "test_token".to_string(), + }); + + let config = resolve_credentials(options) + .await + .expect("Failed to resolve_credentials"); + + if let Some(provider) = &config.credentials_provider() { + let credentials = provider + .provide_credentials() + .await + .expect("Failed to provide credentials"); + assert_eq!( + Some("test_token"), + credentials.session_token(), + "The session token should come from our options! {credentials:?}" + ); + } else { + panic!("Could not retrieve credentials from the SdkConfig: {config:?}"); + } } } diff --git a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs index 187462cb12..9834fc8b54 100644 --- a/crates/aws/src/lib.rs +++ b/crates/aws/src/lib.rs @@ -1,5 +1,9 @@ -//! Lock client implementation based on DynamoDb. +//! AWS S3 and similar tooling for delta-rs +//! +//! This module also contains the [S3DynamoDbLogStore] implementation for concurrent writer support +//! with AWS S3 specifically. 
+pub mod constants; mod credentials; pub mod errors; pub mod logstore; @@ -608,42 +612,6 @@ pub enum CreateLockTableResult { TableAlreadyExists, } -pub mod constants { - use std::time::Duration; - - use lazy_static::lazy_static; - - pub const DEFAULT_LOCK_TABLE_NAME: &str = "delta_log"; - pub const LOCK_TABLE_KEY_NAME: &str = "DELTA_DYNAMO_TABLE_NAME"; - pub const BILLING_MODE_KEY_NAME: &str = "DELTA_DYNAMO_BILLING_MODE"; - pub const MAX_ELAPSED_REQUEST_TIME_KEY_NAME: &str = "DELTA_DYNAMO_MAX_ELAPSED_REQUEST_TIME"; - - pub const ATTR_TABLE_PATH: &str = "tablePath"; - pub const ATTR_FILE_NAME: &str = "fileName"; - pub const ATTR_TEMP_PATH: &str = "tempPath"; - pub const ATTR_COMPLETE: &str = "complete"; - pub const ATTR_EXPIRE_TIME: &str = "expireTime"; - - pub const STRING_TYPE: &str = "S"; - - pub const KEY_TYPE_HASH: &str = "HASH"; - pub const KEY_TYPE_RANGE: &str = "RANGE"; - - lazy_static! { - pub static ref CONDITION_EXPR_CREATE: String = format!( - "attribute_not_exists({ATTR_TABLE_PATH}) and attribute_not_exists({ATTR_FILE_NAME})" - ); - - pub static ref CONDITION_DELETE_INCOMPLETE: String = format!( - "(complete = :f) or (attribute_not_exists({ATTR_TABLE_PATH}) and attribute_not_exists({ATTR_FILE_NAME}))" - ); - } - - pub const CONDITION_UPDATE_INCOMPLETE: &str = "complete = :f"; - - pub const DEFAULT_COMMIT_ENTRY_EXPIRATION_DELAY: Duration = Duration::from_secs(86_400); -} - /// Extract a field from an item's attribute value map, producing a descriptive error /// of the various failure cases. fn extract_required_string_field<'a>( diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index 563e2de284..ef2dfc6f9f 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -1,29 +1,29 @@ //! AWS S3 storage backend. 
-use aws_config::default_provider::token::DefaultTokenChain; -use aws_config::meta::region::ProvideRegion; -use aws_config::provider_config::ProviderConfig; use aws_config::{Region, SdkConfig}; use bytes::Bytes; +use deltalake_core::storage::object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey}; use deltalake_core::storage::object_store::{ - aws::AmazonS3ConfigKey, parse_url_opts, GetOptions, GetResult, ListResult, ObjectMeta, - ObjectStore, PutOptions, PutResult, Result as ObjectStoreResult, + parse_url_opts, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + ObjectStoreScheme, PutMultipartOpts, PutOptions, PutPayload, PutResult, + Result as ObjectStoreResult, }; use deltalake_core::storage::{ limit_store_handler, str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions, }; -use deltalake_core::{DeltaResult, ObjectStoreError, Path}; +use deltalake_core::{DeltaResult, DeltaTableError, ObjectStoreError, Path}; use futures::stream::BoxStream; use futures::Future; -use object_store::{MultipartUpload, PutMultipartOpts, PutPayload}; use std::collections::HashMap; use std::fmt::Debug; use std::ops::Range; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use tracing::log::*; use url::Url; +use crate::constants; use crate::errors::DynamoDbConfigError; #[cfg(feature = "native-tls")] use crate::native; @@ -72,14 +72,29 @@ impl ObjectStoreFactory for S3ObjectStoreFactory { storage_options: &StorageOptions, ) -> DeltaResult<(ObjectStoreRef, Path)> { let options = self.with_env_s3(storage_options); - let (inner, prefix) = parse_url_opts( - url, - options.0.iter().filter_map(|(key, value)| { - let s3_key = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()).ok()?; - Some((s3_key, value.clone())) - }), - )?; + let sdk_config = execute_sdk_future(crate::credentials::resolve_credentials( + storage_options.clone(), + ))??; + + let os_credentials = Arc::new(crate::credentials::AWSForObjectStore::new(sdk_config)); + + let 
mut builder = AmazonS3Builder::new().with_url(url.to_string()); + + for (key, value) in options.0.iter() { + if let Ok(key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()) { + builder = builder.with_config(key, value.clone()); + } + } + + let (_scheme, path) = + ObjectStoreScheme::parse(url).map_err(|e| DeltaTableError::GenericError { + source: Box::new(e), + })?; + let prefix = Path::parse(path)?; + let inner = builder.with_credentials(os_credentials).build()?; + let store = aws_storage_handler(limit_store_handler(inner, &options), &options)?; + debug!("Initialized the object store: {store:?}"); Ok((store, prefix)) } @@ -147,29 +162,29 @@ impl PartialEq for S3StorageOptions { impl S3StorageOptions { /// Creates an instance of S3StorageOptions from the given HashMap. pub fn from_map(options: &HashMap) -> DeltaResult { - let extra_opts = options + let extra_opts: HashMap = options .iter() .filter(|(k, _)| !s3_constants::S3_OPTS.contains(&k.as_str())) .map(|(k, v)| (k.to_owned(), v.to_owned())) .collect(); // Copy web identity values provided in options but not the environment into the environment // to get picked up by the `from_k8s_env` call in `get_web_identity_provider`. 
- Self::ensure_env_var(options, s3_constants::AWS_REGION); - Self::ensure_env_var(options, s3_constants::AWS_PROFILE); - Self::ensure_env_var(options, s3_constants::AWS_ACCESS_KEY_ID); - Self::ensure_env_var(options, s3_constants::AWS_SECRET_ACCESS_KEY); - Self::ensure_env_var(options, s3_constants::AWS_SESSION_TOKEN); - Self::ensure_env_var(options, s3_constants::AWS_WEB_IDENTITY_TOKEN_FILE); - Self::ensure_env_var(options, s3_constants::AWS_ROLE_ARN); - Self::ensure_env_var(options, s3_constants::AWS_ROLE_SESSION_NAME); + Self::ensure_env_var(options, constants::AWS_REGION); + Self::ensure_env_var(options, constants::AWS_PROFILE); + Self::ensure_env_var(options, constants::AWS_ACCESS_KEY_ID); + Self::ensure_env_var(options, constants::AWS_SECRET_ACCESS_KEY); + Self::ensure_env_var(options, constants::AWS_SESSION_TOKEN); + Self::ensure_env_var(options, constants::AWS_WEB_IDENTITY_TOKEN_FILE); + Self::ensure_env_var(options, constants::AWS_ROLE_ARN); + Self::ensure_env_var(options, constants::AWS_ROLE_SESSION_NAME); let s3_pool_idle_timeout = - Self::u64_or_default(options, s3_constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, 15); + Self::u64_or_default(options, constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, 15); let sts_pool_idle_timeout = - Self::u64_or_default(options, s3_constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, 10); + Self::u64_or_default(options, constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, 10); let s3_get_internal_server_error_retries = Self::u64_or_default( options, - s3_constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, + constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, 10, ) as usize; @@ -178,74 +193,18 @@ impl S3StorageOptions { .map(|addressing_style| addressing_style == "virtual") .unwrap_or(false); - let allow_unsafe_rename = str_option(options, s3_constants::AWS_S3_ALLOW_UNSAFE_RENAME) + let allow_unsafe_rename = str_option(options, constants::AWS_S3_ALLOW_UNSAFE_RENAME) .map(|val| str_is_truthy(&val)) .unwrap_or(false); - let disable_imds = 
str_option(options, s3_constants::AWS_EC2_METADATA_DISABLED) - .map(|val| str_is_truthy(&val)) - .unwrap_or(true); - let imds_timeout = - Self::u64_or_default(options, s3_constants::AWS_EC2_METADATA_TIMEOUT, 100); - let (loader, provider_config) = - if let Some(endpoint_url) = str_option(options, s3_constants::AWS_ENDPOINT_URL) { - let (region_provider, provider_config) = Self::create_provider_config( - str_option(options, s3_constants::AWS_REGION) - .or_else(|| std::env::var("AWS_DEFAULT_REGION").ok()) - .map_or(Region::from_static("custom"), Region::new), - )?; - let loader = aws_config::from_env() - .endpoint_url(endpoint_url) - .region(region_provider); - (loader, provider_config) - } else { - let (region_provider, provider_config) = Self::create_provider_config( - crate::credentials::new_region_provider(disable_imds, imds_timeout), - )?; - ( - aws_config::from_env().region(region_provider), - provider_config, - ) - }; - - let credentials_provider = crate::credentials::ConfiguredCredentialChain::new( - disable_imds, - imds_timeout, - &provider_config, - ); - - let token_provider: DefaultTokenChain = execute_sdk_future( - DefaultTokenChain::builder() - .region(crate::credentials::new_region_provider( - disable_imds, - imds_timeout, - )) - .build(), - )?; - #[cfg(feature = "native-tls")] - let sdk_config = execute_sdk_future( - loader - .http_client(native::use_native_tls_client( - str_option(options, s3_constants::AWS_ALLOW_HTTP) - .map(|val| str_is_truthy(&val)) - .unwrap_or(false), - )) - .credentials_provider(credentials_provider) - .token_provider(token_provider) - .load(), - )?; - #[cfg(feature = "rustls")] - let sdk_config = execute_sdk_future( - loader - .credentials_provider(credentials_provider) - .token_provider(token_provider) - .load(), - )?; + let storage_options = StorageOptions(options.clone()); + let sdk_config = + execute_sdk_future(crate::credentials::resolve_credentials(storage_options))??; Ok(Self { virtual_hosted_style_request, - 
locking_provider: str_option(options, s3_constants::AWS_S3_LOCKING_PROVIDER), - dynamodb_endpoint: str_option(options, s3_constants::AWS_ENDPOINT_URL_DYNAMODB), + locking_provider: str_option(options, constants::AWS_S3_LOCKING_PROVIDER), + dynamodb_endpoint: str_option(options, constants::AWS_ENDPOINT_URL_DYNAMODB), s3_pool_idle_timeout: Duration::from_secs(s3_pool_idle_timeout), sts_pool_idle_timeout: Duration::from_secs(sts_pool_idle_timeout), s3_get_internal_server_error_retries, @@ -255,24 +214,16 @@ impl S3StorageOptions { }) } + /// Return the configured endpoint URL for S3 operations pub fn endpoint_url(&self) -> Option<&str> { self.sdk_config.endpoint_url() } + /// Return the configured region used for S3 operations pub fn region(&self) -> Option<&Region> { self.sdk_config.region() } - fn create_provider_config( - region_provider: T, - ) -> DeltaResult<(T, ProviderConfig)> { - let region = execute_sdk_future(region_provider.region())?; - Ok(( - region_provider, - ProviderConfig::default().with_region(region), - )) - } - fn u64_or_default(map: &HashMap, key: &str, default: u64) -> u64 { str_option(map, key) .and_then(|v| v.parse().ok()) @@ -281,7 +232,9 @@ impl S3StorageOptions { fn ensure_env_var(map: &HashMap, key: &str) { if let Some(val) = str_option(map, key) { - std::env::set_var(key, val); + unsafe { + std::env::set_var(key, val); + } } } @@ -307,7 +260,7 @@ where cfg = Some(handle.block_on(future)); }); }); - cfg.ok_or(deltalake_core::DeltaTableError::ObjectStore { + cfg.ok_or(DeltaTableError::ObjectStore { source: ObjectStoreError::Generic { store: STORE_NAME, source: Box::new(DynamoDbConfigError::InitializationError), @@ -334,7 +287,11 @@ pub struct S3StorageBackend { impl std::fmt::Display for S3StorageBackend { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "S3StorageBackend") + write!( + f, + "S3StorageBackend {{ allow_unsafe_rename: {}, inner: {} }}", + self.allow_unsafe_rename, self.inner + ) } } @@ -352,7 
+309,11 @@ impl S3StorageBackend { impl std::fmt::Debug for S3StorageBackend { fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - write!(fmt, "S3StorageBackend") + write!( + fmt, + "S3StorageBackend {{ allow_unsafe_rename: {}, inner: {:?} }}", + self.allow_unsafe_rename, self.inner + ) } } @@ -442,100 +403,12 @@ impl ObjectStore for S3StorageBackend { /// Storage option keys to use when creating [crate::storage::s3::S3StorageOptions]. /// The same key should be used whether passing a key in the hashmap or setting it as an environment variable. /// Provided keys may include configuration for the S3 backend and also the optional DynamoDb lock used for atomic rename. +#[deprecated( + since = "0.20.0", + note = "s3_constants has moved up to deltalake_aws::constants::*" +)] pub mod s3_constants { - /// Custom S3 endpoint. - pub const AWS_ENDPOINT_URL: &str = "AWS_ENDPOINT_URL"; - /// Custom DynamoDB endpoint. - /// If DynamoDB endpoint is not supplied, will use S3 endpoint (AWS_ENDPOINT_URL) - /// If it is supplied, this endpoint takes precedence over the global endpoint set in AWS_ENDPOINT_URL for DynamoDB - pub const AWS_ENDPOINT_URL_DYNAMODB: &str = "AWS_ENDPOINT_URL_DYNAMODB"; - /// The AWS region. - pub const AWS_REGION: &str = "AWS_REGION"; - /// The AWS profile. - pub const AWS_PROFILE: &str = "AWS_PROFILE"; - /// The AWS_ACCESS_KEY_ID to use for S3. - pub const AWS_ACCESS_KEY_ID: &str = "AWS_ACCESS_KEY_ID"; - /// The AWS_SECRET_ACCESS_KEY to use for S3. - pub const AWS_SECRET_ACCESS_KEY: &str = "AWS_SECRET_ACCESS_KEY"; - /// The AWS_SESSION_TOKEN to use for S3. - pub const AWS_SESSION_TOKEN: &str = "AWS_SESSION_TOKEN"; - /// Uses either "path" (the default) or "virtual", which turns on - /// [virtual host addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html). - pub const AWS_S3_ADDRESSING_STYLE: &str = "AWS_S3_ADDRESSING_STYLE"; - /// Locking provider to use for safe atomic rename. 
- /// `dynamodb` is currently the only supported locking provider. - /// If not set, safe atomic rename is not available. - pub const AWS_S3_LOCKING_PROVIDER: &str = "AWS_S3_LOCKING_PROVIDER"; - /// The role to assume for S3 writes. - pub const AWS_S3_ASSUME_ROLE_ARN: &str = "AWS_S3_ASSUME_ROLE_ARN"; - /// The role session name to use when a role is assumed. If not provided a random session name is generated. - pub const AWS_S3_ROLE_SESSION_NAME: &str = "AWS_S3_ROLE_SESSION_NAME"; - /// The `pool_idle_timeout` option of aws http client. Has to be lower than 20 seconds, which is - /// default S3 server timeout . - /// However, since rusoto uses hyper as a client, its default timeout is 90 seconds - /// . - /// Hence, the `connection closed before message completed` could occur. - /// To avoid that, the default value of this setting is 15 seconds if it's not set otherwise. - pub const AWS_S3_POOL_IDLE_TIMEOUT_SECONDS: &str = "AWS_S3_POOL_IDLE_TIMEOUT_SECONDS"; - /// The `pool_idle_timeout` for the as3_constants sts client. See - /// the reasoning in `AWS_S3_POOL_IDLE_TIMEOUT_SECONDS`. - pub const AWS_STS_POOL_IDLE_TIMEOUT_SECONDS: &str = "AWS_STS_POOL_IDLE_TIMEOUT_SECONDS"; - /// The number of retries for S3 GET requests failed with 500 Internal Server Error. - pub const AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES: &str = - "AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES"; - /// The web identity token file to use when using a web identity provider. - /// NOTE: web identity related options are set in the environment when - /// creating an instance of [crate::storage::s3::S3StorageOptions]. - /// See also . - pub const AWS_WEB_IDENTITY_TOKEN_FILE: &str = "AWS_WEB_IDENTITY_TOKEN_FILE"; - /// The role name to use for web identity. - /// NOTE: web identity related options are set in the environment when - /// creating an instance of [crate::storage::s3::S3StorageOptions]. - /// See also . 
- pub const AWS_ROLE_ARN: &str = "AWS_ROLE_ARN"; - /// The role session name to use for web identity. - /// NOTE: web identity related options are set in the environment when - /// creating an instance of [crate::storage::s3::S3StorageOptions]. - /// See also . - pub const AWS_ROLE_SESSION_NAME: &str = "AWS_ROLE_SESSION_NAME"; - /// Allow http connections - mainly useful for integration tests - pub const AWS_ALLOW_HTTP: &str = "AWS_ALLOW_HTTP"; - - /// If set to "true", allows creating commits without concurrent writer protection. - /// Only safe if there is one writer to a given table. - pub const AWS_S3_ALLOW_UNSAFE_RENAME: &str = "AWS_S3_ALLOW_UNSAFE_RENAME"; - - /// If set to "true", disables the imds client - /// Defaults to "true" - pub const AWS_EC2_METADATA_DISABLED: &str = "AWS_EC2_METADATA_DISABLED"; - - /// The timeout in milliseconds for the EC2 metadata endpoint - /// Defaults to 100 - pub const AWS_EC2_METADATA_TIMEOUT: &str = "AWS_EC2_METADATA_TIMEOUT"; - - /// The list of option keys owned by the S3 module. - /// Option keys not contained in this list will be added to the `extra_opts` - /// field of [crate::storage::s3::S3StorageOptions]. 
- pub const S3_OPTS: &[&str] = &[ - AWS_ENDPOINT_URL, - AWS_ENDPOINT_URL_DYNAMODB, - AWS_REGION, - AWS_PROFILE, - AWS_ACCESS_KEY_ID, - AWS_SECRET_ACCESS_KEY, - AWS_SESSION_TOKEN, - AWS_S3_LOCKING_PROVIDER, - AWS_S3_ASSUME_ROLE_ARN, - AWS_S3_ROLE_SESSION_NAME, - AWS_WEB_IDENTITY_TOKEN_FILE, - AWS_ROLE_ARN, - AWS_ROLE_SESSION_NAME, - AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, - AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, - AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, - AWS_EC2_METADATA_DISABLED, - AWS_EC2_METADATA_TIMEOUT, - ]; + pub use crate::constants::*; } pub(crate) fn str_option(map: &HashMap, key: &str) -> Option { @@ -552,11 +425,9 @@ pub(crate) fn str_option(map: &HashMap, key: &str) -> Option "http://localhost:1234".to_string(), - s3_constants::AWS_REGION.to_string() => "us-west-2".to_string(), - s3_constants::AWS_PROFILE.to_string() => "default".to_string(), - s3_constants::AWS_S3_ADDRESSING_STYLE.to_string() => "virtual".to_string(), - s3_constants::AWS_S3_LOCKING_PROVIDER.to_string() => "another_locking_provider".to_string(), - s3_constants::AWS_S3_ASSUME_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/another_role".to_string(), - s3_constants::AWS_S3_ROLE_SESSION_NAME.to_string() => "another_session_name".to_string(), - s3_constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string() => "another_token_file".to_string(), - s3_constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "1".to_string(), - s3_constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "2".to_string(), - s3_constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES.to_string() => "3".to_string(), - s3_constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), - s3_constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), + constants::AWS_ENDPOINT_URL.to_string() => "http://localhost:1234".to_string(), + constants::AWS_REGION.to_string() => "us-west-2".to_string(), + constants::AWS_PROFILE.to_string() => "default".to_string(), + constants::AWS_S3_ADDRESSING_STYLE.to_string() => 
"virtual".to_string(), + constants::AWS_S3_LOCKING_PROVIDER.to_string() => "another_locking_provider".to_string(), + constants::AWS_S3_ASSUME_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/another_role".to_string(), + constants::AWS_S3_ROLE_SESSION_NAME.to_string() => "another_session_name".to_string(), + constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string() => "another_token_file".to_string(), + constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "1".to_string(), + constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "2".to_string(), + constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES.to_string() => "3".to_string(), + constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), + constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), }).unwrap(); + // Get a default SdkConfig first, this ensures that if there are environment or profile + // information in the default load of credentials for the test run that it will pass + // the equivalence below + let storage_options = StorageOptions(HashMap::new()); + let sdk_config = + execute_sdk_future(crate::credentials::resolve_credentials(storage_options)) + .expect("Failed to run future") + .expect("Failed to load default SdkConfig") + .to_builder() + .endpoint_url("http://localhost:1234".to_string()) + .region(Region::from_static("us-west-2")) + .build(); + assert_eq!( S3StorageOptions { - sdk_config: SdkConfig::builder() - .endpoint_url("http://localhost:1234".to_string()) - .region(Region::from_static("us-west-2")) - .build(), + sdk_config, virtual_hosted_style_request: true, locking_provider: Some("another_locking_provider".to_string()), dynamodb_endpoint: None, @@ -724,20 +611,20 @@ mod tests { ScopedEnv::run(|| { clear_env_of_aws_keys(); let options = S3StorageOptions::from_map(&hashmap! 
{ - s3_constants::AWS_ENDPOINT_URL.to_string() => "http://localhost:1234".to_string(), - s3_constants::AWS_ENDPOINT_URL_DYNAMODB.to_string() => "http://localhost:2345".to_string(), - s3_constants::AWS_REGION.to_string() => "us-west-2".to_string(), - s3_constants::AWS_PROFILE.to_string() => "default".to_string(), - s3_constants::AWS_S3_ADDRESSING_STYLE.to_string() => "virtual".to_string(), - s3_constants::AWS_S3_LOCKING_PROVIDER.to_string() => "another_locking_provider".to_string(), - s3_constants::AWS_S3_ASSUME_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/another_role".to_string(), - s3_constants::AWS_S3_ROLE_SESSION_NAME.to_string() => "another_session_name".to_string(), - s3_constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string() => "another_token_file".to_string(), - s3_constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "1".to_string(), - s3_constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "2".to_string(), - s3_constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES.to_string() => "3".to_string(), - s3_constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), - s3_constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), + constants::AWS_ENDPOINT_URL.to_string() => "http://localhost:1234".to_string(), + constants::AWS_ENDPOINT_URL_DYNAMODB.to_string() => "http://localhost:2345".to_string(), + constants::AWS_REGION.to_string() => "us-west-2".to_string(), + constants::AWS_PROFILE.to_string() => "default".to_string(), + constants::AWS_S3_ADDRESSING_STYLE.to_string() => "virtual".to_string(), + constants::AWS_S3_LOCKING_PROVIDER.to_string() => "another_locking_provider".to_string(), + constants::AWS_S3_ASSUME_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/another_role".to_string(), + constants::AWS_S3_ROLE_SESSION_NAME.to_string() => "another_session_name".to_string(), + constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string() => "another_token_file".to_string(), + 
constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "1".to_string(), + constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "2".to_string(), + constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES.to_string() => "3".to_string(), + constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), + constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), }).unwrap(); assert_eq!( @@ -767,30 +654,30 @@ mod tests { fn storage_options_mixed_test() { ScopedEnv::run(|| { clear_env_of_aws_keys(); - std::env::set_var(s3_constants::AWS_ENDPOINT_URL, "http://localhost"); + std::env::set_var(constants::AWS_ENDPOINT_URL, "http://localhost"); std::env::set_var( - s3_constants::AWS_ENDPOINT_URL_DYNAMODB, + constants::AWS_ENDPOINT_URL_DYNAMODB, "http://localhost:dynamodb", ); - std::env::set_var(s3_constants::AWS_REGION, "us-west-1"); - std::env::set_var(s3_constants::AWS_PROFILE, "default"); - std::env::set_var(s3_constants::AWS_ACCESS_KEY_ID, "wrong_key_id"); - std::env::set_var(s3_constants::AWS_SECRET_ACCESS_KEY, "wrong_secret_key"); - std::env::set_var(s3_constants::AWS_S3_LOCKING_PROVIDER, "dynamodb"); + std::env::set_var(constants::AWS_REGION, "us-west-1"); + std::env::set_var(constants::AWS_PROFILE, "default"); + std::env::set_var(constants::AWS_ACCESS_KEY_ID, "wrong_key_id"); + std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "wrong_secret_key"); + std::env::set_var(constants::AWS_S3_LOCKING_PROVIDER, "dynamodb"); std::env::set_var( - s3_constants::AWS_S3_ASSUME_ROLE_ARN, + constants::AWS_S3_ASSUME_ROLE_ARN, "arn:aws:iam::123456789012:role/some_role", ); - std::env::set_var(s3_constants::AWS_S3_ROLE_SESSION_NAME, "session_name"); - std::env::set_var(s3_constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file"); + std::env::set_var(constants::AWS_S3_ROLE_SESSION_NAME, "session_name"); + std::env::set_var(constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file"); - std::env::set_var(s3_constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, "1"); - 
std::env::set_var(s3_constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, "2"); - std::env::set_var(s3_constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, "3"); + std::env::set_var(constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, "1"); + std::env::set_var(constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, "2"); + std::env::set_var(constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, "3"); let options = S3StorageOptions::from_map(&hashmap! { - s3_constants::AWS_ACCESS_KEY_ID.to_string() => "test_id_mixed".to_string(), - s3_constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret_mixed".to_string(), - s3_constants::AWS_REGION.to_string() => "us-west-2".to_string(), + constants::AWS_ACCESS_KEY_ID.to_string() => "test_id_mixed".to_string(), + constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret_mixed".to_string(), + constants::AWS_REGION.to_string() => "us-west-2".to_string(), "AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES".to_string() => "3".to_string(), }) .unwrap(); @@ -821,30 +708,27 @@ mod tests { ScopedEnv::run(|| { clear_env_of_aws_keys(); let _options = S3StorageOptions::from_map(&hashmap! 
{ - s3_constants::AWS_REGION.to_string() => "eu-west-1".to_string(), - s3_constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string() => "web_identity_token_file".to_string(), - s3_constants::AWS_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/web_identity_role".to_string(), - s3_constants::AWS_ROLE_SESSION_NAME.to_string() => "web_identity_session_name".to_string(), + constants::AWS_REGION.to_string() => "eu-west-1".to_string(), + constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string() => "web_identity_token_file".to_string(), + constants::AWS_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/web_identity_role".to_string(), + constants::AWS_ROLE_SESSION_NAME.to_string() => "web_identity_session_name".to_string(), }).unwrap(); - assert_eq!( - "eu-west-1", - std::env::var(s3_constants::AWS_REGION).unwrap() - ); + assert_eq!("eu-west-1", std::env::var(constants::AWS_REGION).unwrap()); assert_eq!( "web_identity_token_file", - std::env::var(s3_constants::AWS_WEB_IDENTITY_TOKEN_FILE).unwrap() + std::env::var(constants::AWS_WEB_IDENTITY_TOKEN_FILE).unwrap() ); assert_eq!( "arn:aws:iam::123456789012:role/web_identity_role", - std::env::var(s3_constants::AWS_ROLE_ARN).unwrap() + std::env::var(constants::AWS_ROLE_ARN).unwrap() ); assert_eq!( "web_identity_session_name", - std::env::var(s3_constants::AWS_ROLE_SESSION_NAME).unwrap() + std::env::var(constants::AWS_ROLE_SESSION_NAME).unwrap() ); }); } @@ -856,10 +740,10 @@ mod tests { clear_env_of_aws_keys(); let raw_options = hashmap! 
{}; - std::env::set_var(s3_constants::AWS_ACCESS_KEY_ID, "env_key"); - std::env::set_var(s3_constants::AWS_ENDPOINT_URL, "env_key"); - std::env::set_var(s3_constants::AWS_SECRET_ACCESS_KEY, "env_key"); - std::env::set_var(s3_constants::AWS_REGION, "env_key"); + std::env::set_var(constants::AWS_ACCESS_KEY_ID, "env_key"); + std::env::set_var(constants::AWS_ENDPOINT_URL, "env_key"); + std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "env_key"); + std::env::set_var(constants::AWS_REGION, "env_key"); let combined_options = S3ObjectStoreFactory {}.with_env_s3(&StorageOptions(raw_options)); @@ -921,20 +805,17 @@ mod tests { async fn storage_options_configure_imds(value: Option<&str>) -> Duration { let _options = match value { Some(value) => S3StorageOptions::from_map(&hashmap! { - s3_constants::AWS_REGION.to_string() => "eu-west-1".to_string(), - s3_constants::AWS_EC2_METADATA_DISABLED.to_string() => value.to_string(), + constants::AWS_REGION.to_string() => "eu-west-1".to_string(), + constants::AWS_EC2_METADATA_DISABLED.to_string() => value.to_string(), }) .unwrap(), None => S3StorageOptions::from_map(&hashmap! { - s3_constants::AWS_REGION.to_string() => "eu-west-1".to_string(), + constants::AWS_REGION.to_string() => "eu-west-1".to_string(), }) .unwrap(), }; - assert_eq!( - "eu-west-1", - std::env::var(s3_constants::AWS_REGION).unwrap() - ); + assert_eq!("eu-west-1", std::env::var(constants::AWS_REGION).unwrap()); let provider = _options.sdk_config.credentials_provider().unwrap(); let now = SystemTime::now();