Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add experimental AWS_PROFILE support (#2178) #2891

Merged
merged 3 commits into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ jobs:
run: cargo clippy -p object_store -- -D warnings
- name: Run clippy with aws feature
run: cargo clippy -p object_store --features aws -- -D warnings
- name: Run clippy with aws_profile feature
run: cargo clippy -p object_store --features aws_profile -- -D warnings
- name: Run clippy with gcp feature
run: cargo clippy -p object_store --features gcp -- -D warnings
- name: Run clippy with azure feature
Expand Down
7 changes: 7 additions & 0 deletions object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,19 @@ reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"
ring = { version = "0.16", default-features = false, features = ["std"], optional = true }
rustls-pemfile = { version = "1.0", default-features = false, optional = true }

# AWS Profile support
aws-types = { version = "0.49", optional = true }
aws-config = { version = "0.49", optional = true }

[features]
cloud = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
azure = ["cloud"]
gcp = ["cloud", "rustls-pemfile"]
aws = ["cloud"]

# Experimental support for AWS_PROFILE
aws_profile = ["aws", "aws-config", "aws-types"]

[dev-dependencies] # In alphabetical order
dotenv = "0.15.0"
tempfile = "3.1.0"
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ pub struct S3Config {
pub endpoint: String,
pub bucket: String,
pub bucket_endpoint: String,
pub credentials: CredentialProvider,
pub credentials: Box<dyn CredentialProvider>,
pub retry_config: RetryConfig,
pub allow_http: bool,
}
Expand Down
152 changes: 105 additions & 47 deletions object_store/src/aws/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::util::hmac_sha256;
use crate::{Result, RetryConfig};
use bytes::Buf;
use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
use futures::TryFutureExt;
use percent_encoding::utf8_percent_encode;
use reqwest::header::{HeaderMap, HeaderValue};
Expand Down Expand Up @@ -289,21 +290,8 @@ fn canonicalize_headers(header_map: &HeaderMap) -> (String, String) {
}

/// Provides credentials for use when signing requests
#[derive(Debug)]
pub enum CredentialProvider {
Static(StaticCredentialProvider),
Instance(InstanceCredentialProvider),
WebIdentity(WebIdentityProvider),
}

impl CredentialProvider {
pub async fn get_credential(&self) -> Result<Arc<AwsCredential>> {
match self {
Self::Static(s) => Ok(Arc::clone(&s.credential)),
Self::Instance(c) => c.get_credential().await,
Self::WebIdentity(c) => c.get_credential().await,
}
}
pub trait CredentialProvider: std::fmt::Debug + Send + Sync {
fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>>;
}

/// A static set of credentials
Expand All @@ -312,6 +300,12 @@ pub struct StaticCredentialProvider {
pub credential: Arc<AwsCredential>,
}

impl CredentialProvider for StaticCredentialProvider {
fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
Box::pin(futures::future::ready(Ok(Arc::clone(&self.credential))))
}
}

/// Credentials sourced from the instance metadata service
///
/// <https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html>
Expand All @@ -324,22 +318,20 @@ pub struct InstanceCredentialProvider {
pub metadata_endpoint: String,
}

impl InstanceCredentialProvider {
async fn get_credential(&self) -> Result<Arc<AwsCredential>> {
self.cache
.get_or_insert_with(|| {
instance_creds(
&self.client,
&self.retry_config,
&self.metadata_endpoint,
self.imdsv1_fallback,
)
.map_err(|source| crate::Error::Generic {
store: "S3",
source,
})
impl CredentialProvider for InstanceCredentialProvider {
fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
Box::pin(self.cache.get_or_insert_with(|| {
instance_creds(
&self.client,
&self.retry_config,
&self.metadata_endpoint,
self.imdsv1_fallback,
)
.map_err(|source| crate::Error::Generic {
store: "S3",
source,
})
.await
}))
}
}

Expand All @@ -357,24 +349,22 @@ pub struct WebIdentityProvider {
pub retry_config: RetryConfig,
}

impl WebIdentityProvider {
async fn get_credential(&self) -> Result<Arc<AwsCredential>> {
self.cache
.get_or_insert_with(|| {
web_identity(
&self.client,
&self.retry_config,
&self.token,
&self.role_arn,
&self.session_name,
&self.endpoint,
)
.map_err(|source| crate::Error::Generic {
store: "S3",
source,
})
impl CredentialProvider for WebIdentityProvider {
fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
Box::pin(self.cache.get_or_insert_with(|| {
web_identity(
&self.client,
&self.retry_config,
&self.token,
&self.role_arn,
&self.session_name,
&self.endpoint,
)
.map_err(|source| crate::Error::Generic {
store: "S3",
source,
})
.await
}))
}
}

Expand Down Expand Up @@ -520,6 +510,74 @@ async fn web_identity(
})
}

#[cfg(feature = "aws_profile")]
mod profile {
use super::*;
use aws_config::profile::ProfileFileCredentialsProvider;
use aws_config::provider_config::ProviderConfig;
use aws_types::credentials::ProvideCredentials;
use aws_types::region::Region;
use std::time::SystemTime;

#[derive(Debug)]
pub struct ProfileProvider {
cache: TokenCache<Arc<AwsCredential>>,
credentials: ProfileFileCredentialsProvider,
}

impl ProfileProvider {
pub fn new(name: String, region: String) -> Self {
let config = ProviderConfig::default().with_region(Some(Region::new(region)));

Self {
cache: Default::default(),
credentials: ProfileFileCredentialsProvider::builder()
.configure(&config)
.profile_name(name)
.build(),
}
}
}

impl CredentialProvider for ProfileProvider {
fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
Box::pin(self.cache.get_or_insert_with(move || async move {
let c =
self.credentials
.provide_credentials()
.await
.map_err(|source| crate::Error::Generic {
store: "S3",
source: Box::new(source),
})?;

let t_now = SystemTime::now();
let expiry = match c.expiry().and_then(|e| e.duration_since(t_now).ok()) {
Some(ttl) => Instant::now() + ttl,
None => {
return Err(crate::Error::Generic {
store: "S3",
source: "Invalid expiry".into(),
})
}
};

Ok(TemporaryToken {
token: Arc::new(AwsCredential {
key_id: c.access_key_id().to_string(),
secret_key: c.secret_access_key().to_string(),
token: c.session_token().map(ToString::to_string),
}),
expiry,
})
}))
}
}
}

#[cfg(feature = "aws_profile")]
pub use profile::ProfileProvider;

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading