From 82e12bc50e868196b6dffe8539e0e27efeb340cf Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sun, 15 May 2022 13:22:49 +0200 Subject: [PATCH 1/4] more flexible azure backend config --- rust/src/storage/azure/mod.rs | 246 ++++++++++++++++++++++++++-------- rust/src/storage/mod.rs | 37 +++-- rust/src/storage/s3/mod.rs | 49 ++----- 3 files changed, 231 insertions(+), 101 deletions(-) diff --git a/rust/src/storage/azure/mod.rs b/rust/src/storage/azure/mod.rs index 2999761f8b..21c1c185a0 100644 --- a/rust/src/storage/azure/mod.rs +++ b/rust/src/storage/azure/mod.rs @@ -2,16 +2,18 @@ //! //! This module is gated behind the "azure" feature. //! -use super::{parse_uri, ObjectMeta, StorageBackend, StorageError, UriError}; +use super::{parse_uri, str_option, ObjectMeta, StorageBackend, StorageError, UriError}; use azure_core::auth::TokenCredential; use azure_core::ClientOptions; +use azure_identity::token_credentials::{ + AutoRefreshingTokenCredential, ClientSecretCredential, TokenCredentialOptions, +}; use azure_storage::storage_shared_key_credential::StorageSharedKeyCredential; use azure_storage_datalake::prelude::*; use futures::stream::Stream; use futures::StreamExt; use log::debug; use std::collections::HashMap; -use std::env; use std::error::Error; use std::fmt::Debug; use std::sync::Arc; @@ -20,10 +22,142 @@ use tokio::sync::mpsc::{self, Sender}; use tokio_stream::wrappers::ReceiverStream; use tokio_util::task::LocalPoolHandle; -///The ADLS Gen2 Access Key -pub const AZURE_STORAGE_ACCOUNT_KEY: &str = "AZURE_STORAGE_ACCOUNT_KEY"; -///The name of storage account -pub const AZURE_STORAGE_ACCOUNT_NAME: &str = "AZURE_STORAGE_ACCOUNT_NAME"; +/// Options for configuring Azure storage backend +pub mod azure_storage_options { + ///The ADLS Gen2 Access Key + pub const AZURE_STORAGE_ACCOUNT_KEY: &str = "AZURE_STORAGE_ACCOUNT_KEY"; + ///The name of storage account + pub const AZURE_STORAGE_ACCOUNT_NAME: &str = "AZURE_STORAGE_ACCOUNT_NAME"; + /// Connection string for connecting to azure storage account + pub const AZURE_STORAGE_CONNECTION_STRING: &str = "AZURE_STORAGE_CONNECTION_STRING"; + /// Service principal id + pub const AZURE_CLIENT_ID: &str = "AZURE_CLIENT_ID"; + /// Service principal secret + pub const AZURE_CLIENT_SECRET: &str = "AZURE_CLIENT_SECRET"; + /// ID for Azure (AAD) tenant where service principal is registered. + pub const AZURE_TENANT_ID: &str = "AZURE_TENANT_ID"; +} + +/// Options used to configure the AdlsGen2Backend. +/// +/// Available options are described in [azure_storage_options]. +#[derive(Clone, Debug, PartialEq)] +pub struct AzureStorageOptions { + account_key: Option, + account_name: Option, + // connection_string: Option, + client_id: Option, + client_secret: Option, + tenant_id: Option, +} + +impl AzureStorageOptions { + /// Creates an empty instance of AzureStorageOptions + pub fn new() -> Self { + Self { + account_key: None, + account_name: None, + client_id: None, + client_secret: None, + tenant_id: None, + } + } + + /// Creates an instance of AzureStorageOptions from the given HashMap. + pub fn from_map(options: HashMap) -> Self { + Self { + account_key: str_option(&options, azure_storage_options::AZURE_STORAGE_ACCOUNT_KEY), + account_name: str_option(&options, azure_storage_options::AZURE_STORAGE_ACCOUNT_NAME), + // connection_string: str_option( + // &options, + // azure_storage_options::AZURE_STORAGE_CONNECTION_STRING, + // ), + client_id: str_option(&options, azure_storage_options::AZURE_CLIENT_ID), + client_secret: str_option(&options, azure_storage_options::AZURE_CLIENT_SECRET), + tenant_id: str_option(&options, azure_storage_options::AZURE_TENANT_ID), + } + } + + /// set account name + pub fn with_account_name(&mut self, account_name: impl Into) -> &mut Self { + self.account_name = Some(account_name.into()); + self + } + + /// set account key + pub fn with_account_key(&mut self, account_key: impl Into) -> &mut Self { + self.account_key = Some(account_key.into()); + self + } + + /// set client id + pub fn with_client_id(&mut self, client_id: impl Into) -> &mut Self { + self.client_id = Some(client_id.into()); + self + } + + /// set client secret + pub fn with_client_secret(&mut self, client_secret: impl Into) -> &mut Self { + self.client_secret = Some(client_secret.into()); + self + } + + /// set tenant id + pub fn with_tenant_id(&mut self, tenant_id: impl Into) -> &mut Self { + self.tenant_id = Some(tenant_id.into()); + self + } +} + +impl Default for AzureStorageOptions { + /// Creates an instance of AzureStorageOptions from environment variables. + fn default() -> AzureStorageOptions { + Self::from_map(HashMap::new()) + } +} + +impl TryInto for AzureStorageOptions { + type Error = StorageError; + + fn try_into(self) -> Result { + let account_name = self.account_name.ok_or_else(|| { + StorageError::AzureConfig("account name must be provided".to_string()) + })?; + + if let Some(account_key) = self.account_key { + let key = StorageSharedKeyCredential::new(account_name, account_key); + return Ok(DataLakeClient::new_with_shared_key( + key, + None, + ClientOptions::default(), + )); + } + + let client_id = self.client_id.ok_or_else(|| { + StorageError::AzureConfig("account key or client config must be provided".to_string()) + })?; + let client_secret = self.client_secret.ok_or_else(|| { + StorageError::AzureConfig("account key or client config must be provided".to_string()) + })?; + let tenant_id = self.tenant_id.ok_or_else(|| { + StorageError::AzureConfig("account key or client config must be provided".to_string()) + })?; + + let client_credential = Arc::new(ClientSecretCredential::new( + tenant_id, + client_id, + client_secret, + TokenCredentialOptions::default(), + )); + + Ok(DataLakeClient::new_with_token_credential( + Arc::new(AutoRefreshingTokenCredential::new(client_credential)), + account_name, + None, + ClientOptions::default(), + )) + } +} /// An object on an Azure Data Lake Storage Gen2 account. #[derive(Debug, PartialEq)] @@ -69,43 +203,30 @@ impl AdlsGen2Backend { /// /// See `new_with_token_credential` for alternative authentication methods. /// - pub fn new(file_system_name: &str) -> Result { - let mut map: HashMap = HashMap::new(); - - let storage_account_name = env::var(AZURE_STORAGE_ACCOUNT_NAME).map_err(|_| { - StorageError::AzureConfig("AZURE_STORAGE_ACCOUNT_NAME must be set".to_string()) - })?; - - let storage_account_key = env::var(AZURE_STORAGE_ACCOUNT_KEY).map_err(|_| { - StorageError::AzureConfig("AZURE_STORAGE_ACCOUNT_KEY must be set".to_string()) - })?; - - map.insert(AZURE_STORAGE_ACCOUNT_NAME.to_string(), storage_account_name); - map.insert(AZURE_STORAGE_ACCOUNT_KEY.to_string(), storage_account_key); - - Self::from_map(file_system_name, map) + pub fn new(file_system_name: impl Into + Clone) -> Result { + Self::new_from_options(file_system_name, AzureStorageOptions::default()) } /// Create a new [`AdlsGen2Backend`] using a [`TokenCredential`] /// See [`azure_identity::token_credentials`] for various implementations pub fn new_with_token_credential( - storage_account_name: &str, - file_system_name: &str, + storage_account_name: impl Into, + file_system_name: impl Into + Clone, token_credential: Arc, ) -> Result { + let storage_account_name: String = storage_account_name.into(); let data_lake_client = DataLakeClient::new_with_token_credential( - token_credential.clone(), - storage_account_name, + token_credential, + storage_account_name.clone(), None, ClientOptions::default(), ); - let file_system_client = - data_lake_client.into_file_system_client(file_system_name.to_owned()); + let file_system_client = data_lake_client.into_file_system_client(file_system_name.clone()); Ok(AdlsGen2Backend { - storage_account_name: storage_account_name.to_string(), - file_system_name: file_system_name.to_string(), + storage_account_name, + file_system_name: file_system_name.into(), file_system_client, local_pool_handle: LocalPoolHandle::new(1), }) @@ -113,30 +234,37 @@ impl AdlsGen2Backend { /// Create a new [`AdlsGen2Backend`] using shared key authentication pub fn new_with_shared_key( - storage_account_name: &str, - file_system_name: &str, - storage_account_key: &str, + storage_account_name: impl Into, + file_system_name: impl Into + Clone, + storage_account_key: impl Into, ) -> Result { - let key = StorageSharedKeyCredential::new( - storage_account_name.to_owned(), - storage_account_key.to_owned(), - ); - - let data_lake_client = - DataLakeClient::new_with_shared_key(key, None, ClientOptions::default()); + let mut options = AzureStorageOptions::new(); + let options = options + .with_account_name(storage_account_name) + .with_account_key(storage_account_key); - let file_system_client = - data_lake_client.into_file_system_client(file_system_name.to_owned()); + Self::new_from_options(file_system_name, options.clone()) + } - Ok(AdlsGen2Backend { - storage_account_name: storage_account_name.to_string(), - file_system_name: file_system_name.to_string(), - file_system_client, - local_pool_handle: LocalPoolHandle::new(1), - }) + /// Create a new [`AdlsGen2Backend`] using shared key authentication + pub fn new_with_client( + storage_account_name: impl Into, + file_system_name: impl Into + Clone, + client_id: impl Into, + client_secret: impl Into, + tenant_id: impl Into, + ) -> Result { + let mut options = AzureStorageOptions::new(); + let options = options + .with_account_name(storage_account_name) + .with_client_id(client_id) + .with_client_secret(client_secret) + .with_tenant_id(tenant_id); + + Self::new_from_options(file_system_name, options.clone()) } - /// Create a new [`AdlsGen2Backend`] from a map + /// Create a new [`AdlsGen2Backend`] from AzureStorageOptions /// /// Currently only shared shared authentication works with this method. /// For each authentication method, the following keys are required @@ -145,19 +273,23 @@ impl AdlsGen2Backend { /// `AZURE_STORAGE_ACCOUNT_NAME` /// `AZURE_STORAGE_ACCOUNT_KEY` /// - pub fn from_map( - file_system_name: &str, - map: HashMap, + pub fn new_from_options( + file_system_name: impl Into + Clone, + options: AzureStorageOptions, ) -> Result { - let storage_account_name = map.get(AZURE_STORAGE_ACCOUNT_NAME).ok_or_else(|| { - StorageError::AzureConfig("AZURE_STORAGE_ACCOUNT_NAME must be set".to_string()) + let storage_account_name = options.account_name.clone().ok_or_else(|| { + StorageError::AzureConfig("account name must be provided".to_string()) })?; - let storage_account_key = map.get(AZURE_STORAGE_ACCOUNT_KEY).ok_or_else(|| { - StorageError::AzureConfig("AZURE_STORAGE_ACCOUNT_KEY must be set".to_string()) - })?; + let data_lake_client: DataLakeClient = options.try_into()?; + let file_system_client = data_lake_client.into_file_system_client(file_system_name.clone()); - Self::new_with_shared_key(storage_account_name, file_system_name, storage_account_key) + Ok(AdlsGen2Backend { + storage_account_name, + file_system_name: file_system_name.into(), + file_system_client, + local_pool_handle: LocalPoolHandle::new(1), + }) } fn validate_container<'a>(&self, obj: &AdlsGen2Object<'a>) -> Result<(), StorageError> { diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs index 0b3ed2ae43..b1ccb26939 100644 --- a/rust/src/storage/mod.rs +++ b/rust/src/storage/mod.rs @@ -5,15 +5,13 @@ use std::pin::Pin; use chrono::{DateTime, Utc}; use futures::Stream; +use std::collections::HashMap; #[cfg(feature = "azure")] use azure_core::{Error as AzureError, HttpError as AzureHttpError}; #[cfg(feature = "azure")] use std::error::Error; -#[cfg(any(feature = "s3", feature = "s3-rustls"))] -use self::s3::S3StorageOptions; - #[cfg(feature = "azure")] pub mod azure; pub mod file; @@ -592,26 +590,47 @@ pub fn get_backend_for_uri(uri: &str) -> Result, Storage /// Returns a StorageBackend appropriate for the protocol and configured with the given options /// Options must be passed as a hashmap. Hashmap keys correspond to env variables that are used if options are not set. /// -/// Currently, S3 is the only backend that accepts options. +/// Currently, S3 and Azure are the only backends that accept options. /// Options may be passed in the HashMap or set as environment variables. /// /// [S3StorageOptions] describes the available options for the S3 backend. /// [s3::dynamodb_lock::DynamoDbLockClient] describes additional options for the atomic rename client. +/// +/// [AzureStorageOptions] describes the available options for the Azure backend. pub fn get_backend_for_uri_with_options( uri: &str, - // NOTE: prefixing options with "_" to avoid deny warnings error since usage is conditional on s3 and the only usage is with s3 so far - _options: std::collections::HashMap, + #[allow(unused)] options: HashMap, ) -> Result, StorageError> { match parse_uri(uri)? { #[cfg(any(feature = "s3", feature = "s3-rustls"))] Uri::S3Object(_) => Ok(Box::new(s3::S3StorageBackend::new_from_options( - S3StorageOptions::from_map(_options), + s3::S3StorageOptions::from_map(options), )?)), #[cfg(feature = "azure")] - Uri::AdlsGen2Object(obj) => Ok(Box::new(azure::AdlsGen2Backend::from_map( + Uri::AdlsGen2Object(obj) => Ok(Box::new(azure::AdlsGen2Backend::new_from_options( obj.file_system, - _options, + azure::AzureStorageOptions::from_map(options), )?)), _ => get_backend_for_uri(uri), } } + +#[allow(unused)] +pub(crate) fn str_or_default(map: &HashMap, key: &str, default: String) -> String { + map.get(key) + .map(|v| v.to_owned()) + .unwrap_or_else(|| std::env::var(key).unwrap_or(default)) +} + +#[allow(unused)] +pub(crate) fn str_option(map: &HashMap, key: &str) -> Option { + map.get(key) + .map_or_else(|| std::env::var(key).ok(), |v| Some(v.to_owned())) +} + +#[allow(unused)] +pub(crate) fn u64_or_default(map: &HashMap, key: &str, default: u64) -> u64 { + str_option(map, key) + .and_then(|v| v.parse().ok()) + .unwrap_or(default) +} diff --git a/rust/src/storage/s3/mod.rs b/rust/src/storage/s3/mod.rs index ac423f8555..12524fc26d 100644 --- a/rust/src/storage/s3/mod.rs +++ b/rust/src/storage/s3/mod.rs @@ -17,7 +17,9 @@ use rusoto_s3::{ use rusoto_sts::{StsAssumeRoleSessionCredentialsProvider, StsClient, WebIdentityProvider}; use tokio::io::AsyncReadExt; -use super::{parse_uri, ObjectMeta, StorageBackend, StorageError}; +use super::{ + parse_uri, str_option, str_or_default, u64_or_default, ObjectMeta, StorageBackend, StorageError, +}; use rusoto_core::credential::{ AwsCredentials, CredentialsError, DefaultCredentialsProvider, ProvideAwsCredentials, }; @@ -234,10 +236,10 @@ impl S3StorageOptions { .map(|(k, v)| (k.to_owned(), v.to_owned())) .collect(); - let endpoint_url = Self::str_option(&options, s3_storage_options::AWS_ENDPOINT_URL); + let endpoint_url = str_option(&options, s3_storage_options::AWS_ENDPOINT_URL); let region = if let Some(endpoint_url) = endpoint_url.as_ref() { Region::Custom { - name: Self::str_or_default( + name: str_or_default( &options, s3_storage_options::AWS_REGION, "custom".to_string(), @@ -257,18 +259,18 @@ impl S3StorageOptions { Self::ensure_env_var(&options, s3_storage_options::AWS_ROLE_ARN); Self::ensure_env_var(&options, s3_storage_options::AWS_ROLE_SESSION_NAME); - let s3_pool_idle_timeout = Self::u64_or_default( + let s3_pool_idle_timeout = u64_or_default( &options, s3_storage_options::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, 15, ); - let sts_pool_idle_timeout = Self::u64_or_default( + let sts_pool_idle_timeout = u64_or_default( &options, s3_storage_options::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, 10, ); - let s3_get_internal_server_error_retries = Self::u64_or_default( + let s3_get_internal_server_error_retries = u64_or_default( &options, s3_storage_options::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, 10, @@ -277,18 +279,12 @@ impl S3StorageOptions { Self { _endpoint_url: endpoint_url, region, - aws_access_key_id: Self::str_option(&options, s3_storage_options::AWS_ACCESS_KEY_ID), - aws_secret_access_key: Self::str_option( - &options, - s3_storage_options::AWS_SECRET_ACCESS_KEY, - ), - aws_session_token: Self::str_option(&options, s3_storage_options::AWS_SESSION_TOKEN), - locking_provider: Self::str_option( - &options, - s3_storage_options::AWS_S3_LOCKING_PROVIDER, - ), - assume_role_arn: Self::str_option(&options, s3_storage_options::AWS_S3_ASSUME_ROLE_ARN), - assume_role_session_name: Self::str_option( + aws_access_key_id: str_option(&options, s3_storage_options::AWS_ACCESS_KEY_ID), + aws_secret_access_key: str_option(&options, s3_storage_options::AWS_SECRET_ACCESS_KEY), + aws_session_token: str_option(&options, s3_storage_options::AWS_SESSION_TOKEN), + locking_provider: str_option(&options, s3_storage_options::AWS_S3_LOCKING_PROVIDER), + assume_role_arn: str_option(&options, s3_storage_options::AWS_S3_ASSUME_ROLE_ARN), + assume_role_session_name: str_option( &options, s3_storage_options::AWS_S3_ROLE_SESSION_NAME, ), @@ -301,23 +297,6 @@ impl S3StorageOptions { } } - fn str_or_default(map: &HashMap, key: &str, default: String) -> String { - map.get(key) - .map(|v| v.to_owned()) - .unwrap_or_else(|| std::env::var(key).unwrap_or(default)) - } - - fn str_option(map: &HashMap, key: &str) -> Option { - map.get(key) - .map_or_else(|| std::env::var(key).ok(), |v| Some(v.to_owned())) - } - - fn u64_or_default(map: &HashMap, key: &str, default: u64) -> u64 { - Self::str_option(map, key) - .and_then(|v| v.parse().ok()) - .unwrap_or(default) - } - fn ensure_env_var(map: &HashMap, key: &str) { if let Some(val) = Self::str_option(map, key) { std::env::set_var(key, val); From 86c1c1cf281dfacc40f73ea627970229b8894dd9 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sun, 15 May 2022 13:44:28 +0200 Subject: [PATCH 2/4] update docs --- Cargo.lock | 84 ++++++++++++++++++++++++++++------- rust/src/storage/azure/mod.rs | 39 +++++++++++----- 2 files changed, 94 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index df6adc8849..c956e6ae0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -102,6 +102,15 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-lock" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e97a171d191782fba31bb902b14ad94e24a68145032b7eedf871ab0bc0d077b6" +dependencies = [ + "event-listener", +] + [[package]] name = "async-stream" version = "0.3.3" @@ -165,9 +174,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "azure_core" -version = "0.1.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c61455ab776eedabfc7e166dda27c6c6bc2a882c043c35817501f1bd7440158" +checksum = "ca4393afee90ad13c987a2cbfeb5bbb0b9fb3c86585e42ed3ed151babaa93da1" dependencies = [ "async-trait", "base64", @@ -179,6 +188,7 @@ dependencies = [ "http", "log", "oauth2", + "pin-project", "rand 0.8.5", "reqwest", "rustc_version 0.4.0", @@ -187,18 +197,20 @@ dependencies = [ "serde_json", "thiserror", "url", - "uuid 0.8.2", + "uuid 1.0.0", ] [[package]] name = "azure_identity" -version = "0.1.1" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebda98657980528a8f0f0f7cc85c88c7dabc160e026bf258d06e54b77b698b08" +checksum = "f9a931af53bade449760620b429cad695a72bf07e7864d0e39e11fa442ee0458" dependencies = [ + "async-lock", "async-timer", "async-trait", "azure_core", + "base64", "chrono", "futures", "log", @@ -208,13 +220,14 @@ dependencies = [ "serde_json", "thiserror", "url", + "uuid 1.0.0", ] [[package]] name = "azure_storage" -version = "0.1.0" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22c413e8459badf86c9e6e0c84f5894609663bcc8fa5eb1e49bfb985273dac58" +checksum = "7a9f2aee687da9817f7b332e1e01dda51cd9f7a0a68a5abcfec7c4c494a65546" dependencies = [ "RustyXML", "async-trait", @@ -223,24 +236,25 @@ dependencies = [ "bytes", "chrono", "futures", + "hmac 0.12.1", "http", "log", "once_cell", - "ring", "serde", "serde-xml-rs", "serde_derive", "serde_json", + "sha2 0.10.2", "thiserror", "url", - "uuid 0.8.2", + "uuid 1.0.0", ] [[package]] name = "azure_storage_blobs" -version = "0.1.0" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a70ec6fab8a2cae5d774098267870c0f3fbef1cb63cac12afab38b8c17cc8d97" +checksum = "d17982127c4a34736a60656ddbd05b1714420686b6e6304145ee3b4501395e75" dependencies = [ "RustyXML", "azure_core", @@ -258,14 +272,14 @@ dependencies = [ "serde_json", "thiserror", "url", - "uuid 0.8.2", + "uuid 1.0.0", ] [[package]] name = "azure_storage_datalake" -version = "0.1.1" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2785185a4cde21bf775e71d7fd40e43fb54ab831b8b2758abf0e4b53d6086968" +checksum = "a78936621558980aa1e5d861690999f8cdc89a6203e21d04ae45c36e9e454930" dependencies = [ "async-trait", "azure_core", @@ -276,13 +290,12 @@ dependencies = [ "futures", "http", "log", - "ring", "serde", "serde-xml-rs", "serde_derive", "serde_json", "url", - "uuid 0.8.2", + "uuid 1.0.0", ] [[package]] @@ -763,6 +776,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tokio-util 0.7.1", "utime", "uuid 1.0.0", ] @@ -935,6 +949,12 @@ dependencies = [ "str-buf", ] +[[package]] +name = "event-listener" +version = "2.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71" + [[package]] name = "fastrand" version = "1.7.0" @@ -1212,6 +1232,15 @@ dependencies = [ "digest 0.9.0", ] +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest 0.10.3", +] + [[package]] name = "http" version = "0.2.6" @@ -1972,6 +2001,26 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pin-project" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.8" @@ -2402,7 +2451,7 @@ dependencies = [ "bytes", "futures", "hex", - "hmac", + "hmac 0.10.1", "http", "hyper", "log", @@ -3197,6 +3246,7 @@ dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", "pin-project-lite", "tokio", "tracing", diff --git a/rust/src/storage/azure/mod.rs b/rust/src/storage/azure/mod.rs index 21c1c185a0..c4116b9f61 100644 --- a/rust/src/storage/azure/mod.rs +++ b/rust/src/storage/azure/mod.rs @@ -22,7 +22,8 @@ use tokio::sync::mpsc::{self, Sender}; use tokio_stream::wrappers::ReceiverStream; use tokio_util::task::LocalPoolHandle; -/// Options for configuring Azure storage backend +/// Storage option keys to use when creating [crate::storage::azure::AzureStorageOptions]. +/// The same key should be used whether passing a key in the hashmap or setting it as an environment variable. pub mod azure_storage_options { ///The ADLS Gen2 Access Key pub const AZURE_STORAGE_ACCOUNT_KEY: &str = "AZURE_STORAGE_ACCOUNT_KEY"; @@ -63,7 +64,7 @@ impl AzureStorageOptions { } } - /// Creates an instance of AzureStorageOptions from the given HashMap. + /// Creates an instance of AzureStorageOptions from the given HashMap and environment variables. pub fn from_map(options: HashMap) -> Self { Self { account_key: str_option(&options, azure_storage_options::AZURE_STORAGE_ACCOUNT_KEY), @@ -196,13 +197,23 @@ pub struct AdlsGen2Backend { impl AdlsGen2Backend { /// Create a new [`AdlsGen2Backend`]. /// - /// Shared key authentication is the default and requires the following environment variables + /// This will try to parse configuration options from the environment. /// - /// `AZURE_STORAGE_ACCOUNT_NAME` - /// `AZURE_STORAGE_ACCOUNT_KEY` + /// The variable `AZURE_STORAGE_ACCOUNT_NAME` always has to be set. /// - /// See `new_with_token_credential` for alternative authentication methods. + /// To use shared key authorization, also set: + /// * `AZURE_STORAGE_ACCOUNT_KEY` /// + /// To use a service principal, set: + /// * `AZURE_CLIENT_ID` + /// * `AZURE_CLIENT_SECRET` + /// * `AZURE_TENANT_ID` + /// + /// If both are configured in the environment, shared key authorization will take precedence. + /// + /// See `new_with_token_credential` to pass your own [azure_core::auth::TokenCredential] + /// + /// See `new_from_options` for more fine grained control using [AzureStorageOptions] pub fn new(file_system_name: impl Into + Clone) -> Result { Self::new_from_options(file_system_name, AzureStorageOptions::default()) } @@ -246,7 +257,7 @@ impl AdlsGen2Backend { Self::new_from_options(file_system_name, options.clone()) } - /// Create a new [`AdlsGen2Backend`] using shared key authentication + /// Create a new [`AdlsGen2Backend`] using a service principal pub fn new_with_client( storage_account_name: impl Into, file_system_name: impl Into + Clone, @@ -266,13 +277,17 @@ impl AdlsGen2Backend { /// Create a new [`AdlsGen2Backend`] from AzureStorageOptions /// - /// Currently only shared shared authentication works with this method. - /// For each authentication method, the following keys are required + /// see [azure_storage_options] for the available configuration keys. + /// + /// ```rust,ignore + /// let mut options = AzureStorageOptions::new(); /// - /// ## Shared Key Authentication - /// `AZURE_STORAGE_ACCOUNT_NAME` - /// `AZURE_STORAGE_ACCOUNT_KEY` + /// let options = options + /// .with_account_name("") + /// .with_account_key(""); /// + /// let backend = AdlsGen2Backend::new_from_options("", options.clone()); + /// ``` pub fn new_from_options( file_system_name: impl Into + Clone, options: AzureStorageOptions, From d852f439125c01b0109dc4ccb2418ec155160ade Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sun, 15 May 2022 13:59:47 +0200 Subject: [PATCH 3/4] fix oversight in S3 --- rust/src/storage/s3/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/src/storage/s3/mod.rs b/rust/src/storage/s3/mod.rs index 12524fc26d..5beaa96211 100644 --- a/rust/src/storage/s3/mod.rs +++ b/rust/src/storage/s3/mod.rs @@ -298,7 +298,7 @@ impl S3StorageOptions { } fn ensure_env_var(map: &HashMap, key: &str) { - if let Some(val) = Self::str_option(map, key) { + if let Some(val) = str_option(map, key) { std::env::set_var(key, val); } } From 61e22f581b942c9a2ae087e1a54de26612fc8d04 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Mon, 16 May 2022 13:22:19 +0200 Subject: [PATCH 4/4] have only shared funcs in to level mod --- rust/src/storage/mod.rs | 16 +--------------- rust/src/storage/s3/mod.rs | 24 +++++++++++++++++------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs index b1ccb26939..82dbac5632 100644 --- a/rust/src/storage/mod.rs +++ b/rust/src/storage/mod.rs @@ -615,22 +615,8 @@ pub fn get_backend_for_uri_with_options( } } -#[allow(unused)] -pub(crate) fn str_or_default(map: &HashMap, key: &str, default: String) -> String { - map.get(key) - .map(|v| v.to_owned()) - .unwrap_or_else(|| std::env::var(key).unwrap_or(default)) -} - -#[allow(unused)] +#[cfg(any(feature = "s3", feature = "s3-rustls", feature = "azure"))] pub(crate) fn str_option(map: &HashMap, key: &str) -> Option { map.get(key) .map_or_else(|| std::env::var(key).ok(), |v| Some(v.to_owned())) } - -#[allow(unused)] -pub(crate) fn u64_or_default(map: &HashMap, key: &str, default: u64) -> u64 { - str_option(map, key) - .and_then(|v| v.parse().ok()) - .unwrap_or(default) -} diff --git a/rust/src/storage/s3/mod.rs b/rust/src/storage/s3/mod.rs index 5beaa96211..39a7df9d69 100644 --- a/rust/src/storage/s3/mod.rs +++ b/rust/src/storage/s3/mod.rs @@ -17,9 +17,7 @@ use rusoto_s3::{ use rusoto_sts::{StsAssumeRoleSessionCredentialsProvider, StsClient, WebIdentityProvider}; use tokio::io::AsyncReadExt; -use super::{ - parse_uri, str_option, str_or_default, u64_or_default, ObjectMeta, StorageBackend, StorageError, -}; +use super::{parse_uri, str_option, ObjectMeta, StorageBackend, StorageError}; use rusoto_core::credential::{ AwsCredentials, CredentialsError, DefaultCredentialsProvider, ProvideAwsCredentials, }; @@ -239,7 +237,7 @@ impl S3StorageOptions { let endpoint_url = str_option(&options, s3_storage_options::AWS_ENDPOINT_URL); let region = if let Some(endpoint_url) = endpoint_url.as_ref() { Region::Custom { - name: str_or_default( + name: Self::str_or_default( &options, s3_storage_options::AWS_REGION, "custom".to_string(), @@ -259,18 +257,18 @@ impl S3StorageOptions { Self::ensure_env_var(&options, s3_storage_options::AWS_ROLE_ARN); Self::ensure_env_var(&options, s3_storage_options::AWS_ROLE_SESSION_NAME); - let s3_pool_idle_timeout = u64_or_default( + let s3_pool_idle_timeout = Self::u64_or_default( &options, s3_storage_options::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, 15, ); - let sts_pool_idle_timeout = u64_or_default( + let sts_pool_idle_timeout = Self::u64_or_default( &options, s3_storage_options::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, 10, ); - let s3_get_internal_server_error_retries = u64_or_default( + let s3_get_internal_server_error_retries = Self::u64_or_default( &options, s3_storage_options::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, 10, @@ -297,6 +295,18 @@ impl S3StorageOptions { } } + fn str_or_default(map: &HashMap, key: &str, default: String) -> String { + map.get(key) + .map(|v| v.to_owned()) + .unwrap_or_else(|| std::env::var(key).unwrap_or(default)) + } + + fn u64_or_default(map: &HashMap, key: &str, default: u64) -> u64 { + str_option(map, key) + .and_then(|v| v.parse().ok()) + .unwrap_or(default) + } + fn ensure_env_var(map: &HashMap, key: &str) { if let Some(val) = str_option(map, key) { std::env::set_var(key, val);