Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Standardise credentials API (#4223) (#4163) #4225

Merged
merged 4 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
// under the License.

use crate::aws::checksum::Checksum;
use crate::aws::credential::{AwsCredential, CredentialExt, CredentialProvider};
use crate::aws::{STORE, STRICT_PATH_ENCODE_SET};
use crate::aws::credential::{AwsCredential, CredentialExt};
use crate::aws::{AwsCredentialProvider, STORE, STRICT_PATH_ENCODE_SET};
use crate::client::list::ListResponse;
use crate::client::pagination::stream_paginated;
use crate::client::retry::RetryExt;
Expand Down Expand Up @@ -135,7 +135,7 @@ pub struct S3Config {
pub endpoint: String,
pub bucket: String,
pub bucket_endpoint: String,
pub credentials: Box<dyn CredentialProvider>,
pub credentials: AwsCredentialProvider,
pub retry_config: RetryConfig,
pub client_options: ClientOptions,
pub sign_payload: bool,
Expand Down
91 changes: 40 additions & 51 deletions object_store/src/aws/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
use crate::aws::{STORE, STRICT_ENCODE_SET};
use crate::client::retry::RetryExt;
use crate::client::token::{TemporaryToken, TokenCache};
use crate::client::TokenProvider;
use crate::util::hmac_sha256;
use crate::{Result, RetryConfig};
use async_trait::async_trait;
use bytes::Buf;
use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
use futures::TryFutureExt;
use percent_encoding::utf8_percent_encode;
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::{Client, Method, Request, RequestBuilder, StatusCode};
Expand All @@ -41,10 +41,14 @@ static EMPTY_SHA256_HASH: &str =
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
static UNSIGNED_PAYLOAD_LITERAL: &str = "UNSIGNED-PAYLOAD";

#[derive(Debug)]
/// A set of AWS security credentials
#[derive(Debug, Eq, PartialEq)]
pub struct AwsCredential {
/// AWS_ACCESS_KEY_ID
pub key_id: String,
/// AWS_SECRET_ACCESS_KEY
pub secret_key: String,
/// AWS_SESSION_TOKEN
pub token: Option<String>,
}

Expand Down Expand Up @@ -291,49 +295,31 @@ fn canonicalize_headers(header_map: &HeaderMap) -> (String, String) {
(signed_headers, canonical_headers)
}

/// Provides credentials for use when signing requests
pub trait CredentialProvider: std::fmt::Debug + Send + Sync {
fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>>;
}

/// A static set of credentials
#[derive(Debug)]
pub struct StaticCredentialProvider {
pub credential: Arc<AwsCredential>,
}

impl CredentialProvider for StaticCredentialProvider {
fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
Box::pin(futures::future::ready(Ok(Arc::clone(&self.credential))))
}
}

/// Credentials sourced from the instance metadata service
///
/// <https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html>
#[derive(Debug)]
pub struct InstanceCredentialProvider {
pub cache: TokenCache<Arc<AwsCredential>>,
pub client: Client,
pub retry_config: RetryConfig,
pub imdsv1_fallback: bool,
pub metadata_endpoint: String,
}

impl CredentialProvider for InstanceCredentialProvider {
fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
Box::pin(self.cache.get_or_insert_with(|| {
instance_creds(
&self.client,
&self.retry_config,
&self.metadata_endpoint,
self.imdsv1_fallback,
)
#[async_trait]
impl TokenProvider for InstanceCredentialProvider {
type Credential = AwsCredential;

async fn fetch_token(
&self,
client: &Client,
retry: &RetryConfig,
) -> Result<TemporaryToken<Arc<AwsCredential>>> {
instance_creds(client, retry, &self.metadata_endpoint, self.imdsv1_fallback)
.await
.map_err(|source| crate::Error::Generic {
store: STORE,
source,
})
}))
}
}

Expand All @@ -342,31 +328,34 @@ impl CredentialProvider for InstanceCredentialProvider {
/// <https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-technical-overview.html>
#[derive(Debug)]
pub struct WebIdentityProvider {
pub cache: TokenCache<Arc<AwsCredential>>,
pub token_path: String,
pub role_arn: String,
pub session_name: String,
pub endpoint: String,
pub client: Client,
pub retry_config: RetryConfig,
}

impl CredentialProvider for WebIdentityProvider {
fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
Box::pin(self.cache.get_or_insert_with(|| {
web_identity(
&self.client,
&self.retry_config,
&self.token_path,
&self.role_arn,
&self.session_name,
&self.endpoint,
)
.map_err(|source| crate::Error::Generic {
store: STORE,
source,
})
}))
#[async_trait]
impl TokenProvider for WebIdentityProvider {
type Credential = AwsCredential;

async fn fetch_token(
&self,
client: &Client,
retry: &RetryConfig,
) -> Result<TemporaryToken<Arc<AwsCredential>>> {
web_identity(
client,
retry,
&self.token_path,
&self.role_arn,
&self.session_name,
&self.endpoint,
)
.await
.map_err(|source| crate::Error::Generic {
store: STORE,
source,
})
}
}

Expand Down
60 changes: 32 additions & 28 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ use url::Url;
pub use crate::aws::checksum::Checksum;
use crate::aws::client::{S3Client, S3Config};
use crate::aws::credential::{
AwsCredential, CredentialProvider, InstanceCredentialProvider,
StaticCredentialProvider, WebIdentityProvider,
AwsCredential, InstanceCredentialProvider, WebIdentityProvider,
};
use crate::client::header::header_meta;
use crate::client::ClientConfigKey;
use crate::client::{
ClientConfigKey, CredentialProvider, StaticCredentialProvider,
TokenCredentialProvider,
};
use crate::config::ConfigValue;
use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart};
use crate::{
Expand Down Expand Up @@ -83,6 +85,8 @@ const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.rem

const STORE: &str = "S3";

type AwsCredentialProvider = Arc<dyn CredentialProvider<Credential = AwsCredential>>;

/// Default metadata endpoint
static METADATA_ENDPOINT: &str = "http://169.254.169.254";

Expand Down Expand Up @@ -1001,13 +1005,12 @@ impl AmazonS3Builder {
let credentials = match (self.access_key_id, self.secret_access_key, self.token) {
(Some(key_id), Some(secret_key), token) => {
info!("Using Static credential provider");
Box::new(StaticCredentialProvider {
credential: Arc::new(AwsCredential {
key_id,
secret_key,
token,
}),
}) as _
let credential = AwsCredential {
key_id,
secret_key,
token,
};
Arc::new(StaticCredentialProvider::new(credential)) as _
}
(None, Some(_), _) => return Err(Error::MissingAccessKeyId.into()),
(Some(_), None, _) => return Err(Error::MissingSecretAccessKey.into()),
Expand All @@ -1031,15 +1034,18 @@ impl AmazonS3Builder {
.with_allow_http(false)
.client()?;

Box::new(WebIdentityProvider {
cache: Default::default(),
let token = WebIdentityProvider {
token_path,
session_name,
role_arn,
endpoint,
};

Arc::new(TokenCredentialProvider::new(
token,
client,
retry_config: self.retry_config.clone(),
}) as _
self.retry_config.clone(),
)) as _
}
_ => match self.profile {
Some(profile) => {
Expand All @@ -1049,19 +1055,20 @@ impl AmazonS3Builder {
None => {
info!("Using Instance credential provider");

// The instance metadata endpoint is access over HTTP
let client_options =
self.client_options.clone().with_allow_http(true);

Box::new(InstanceCredentialProvider {
let token = InstanceCredentialProvider {
cache: Default::default(),
client: client_options.client()?,
retry_config: self.retry_config.clone(),
imdsv1_fallback: self.imdsv1_fallback.get()?,
metadata_endpoint: self
.metadata_endpoint
.unwrap_or_else(|| METADATA_ENDPOINT.into()),
}) as _
};

Arc::new(TokenCredentialProvider::new(
token,
// The instance metadata endpoint is access over HTTP
self.client_options.clone().with_allow_http(true).client()?,
self.retry_config.clone(),
)) as _
}
},
},
Expand Down Expand Up @@ -1114,11 +1121,8 @@ fn profile_region(profile: String) -> Option<String> {
}

#[cfg(feature = "aws_profile")]
fn profile_credentials(
profile: String,
region: String,
) -> Result<Box<dyn CredentialProvider>> {
Ok(Box::new(profile::ProfileProvider::new(
fn profile_credentials(profile: String, region: String) -> Result<AwsCredentialProvider> {
Ok(Arc::new(profile::ProfileProvider::new(
profile,
Some(region),
)))
Expand All @@ -1133,7 +1137,7 @@ fn profile_region(_profile: String) -> Option<String> {
fn profile_credentials(
_profile: String,
_region: String,
) -> Result<Box<dyn CredentialProvider>> {
) -> Result<AwsCredentialProvider> {
Err(Error::MissingProfileFeature.into())
}

Expand Down
71 changes: 38 additions & 33 deletions object_store/src/aws/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@

#![cfg(feature = "aws_profile")]

use async_trait::async_trait;
use aws_config::meta::region::ProvideRegion;
use aws_config::profile::profile_file::ProfileFiles;
use aws_config::profile::ProfileFileCredentialsProvider;
use aws_config::profile::ProfileFileRegionProvider;
use aws_config::provider_config::ProviderConfig;
use aws_credential_types::provider::ProvideCredentials;
use aws_types::region::Region;
use futures::future::BoxFuture;
use std::sync::Arc;
use std::time::Instant;
use std::time::SystemTime;

use crate::aws::credential::CredentialProvider;
use crate::aws::AwsCredential;
use crate::client::token::{TemporaryToken, TokenCache};
use crate::client::CredentialProvider;
use crate::Result;

#[cfg(test)]
Expand Down Expand Up @@ -91,38 +91,43 @@ impl ProfileProvider {
}
}

#[async_trait]
impl CredentialProvider for ProfileProvider {
fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
Box::pin(self.cache.get_or_insert_with(move || async move {
let region = self.region.clone().map(Region::new);

let config = ProviderConfig::default().with_region(region);

let credentials = ProfileFileCredentialsProvider::builder()
.configure(&config)
.profile_name(&self.name)
.build();

let c = credentials.provide_credentials().await.map_err(|source| {
crate::Error::Generic {
store: "S3",
source: Box::new(source),
}
})?;
let t_now = SystemTime::now();
let expiry = c
.expiry()
.and_then(|e| e.duration_since(t_now).ok())
.map(|ttl| Instant::now() + ttl);

Ok(TemporaryToken {
token: Arc::new(AwsCredential {
key_id: c.access_key_id().to_string(),
secret_key: c.secret_access_key().to_string(),
token: c.session_token().map(ToString::to_string),
}),
expiry,
type Credential = AwsCredential;

async fn get_credential(&self) -> Result<Arc<AwsCredential>> {
self.cache
.get_or_insert_with(move || async move {
let region = self.region.clone().map(Region::new);

let config = ProviderConfig::default().with_region(region);

let credentials = ProfileFileCredentialsProvider::builder()
.configure(&config)
.profile_name(&self.name)
.build();

let c = credentials.provide_credentials().await.map_err(|source| {
crate::Error::Generic {
store: "S3",
source: Box::new(source),
}
})?;
let t_now = SystemTime::now();
let expiry = c
.expiry()
.and_then(|e| e.duration_since(t_now).ok())
.map(|ttl| Instant::now() + ttl);

Ok(TemporaryToken {
token: Arc::new(AwsCredential {
key_id: c.access_key_id().to_string(),
secret_key: c.secret_access_key().to_string(),
token: c.session_token().map(ToString::to_string),
}),
expiry,
})
})
}))
.await
}
}
Loading