Skip to content

Commit

Permalink
Use DefaultCredentialsChain AWS authentication in remote_storage (#8440)
Browse files Browse the repository at this point in the history
PR #8299 has switched the storage scrubber to use
`DefaultCredentialsChain`. Now we do this for `remote_storage`, as it
allows us to use `remote_storage` from inside kubernetes. Most of the
diff is due to `GenericRemoteStorage::from_config` becoming `async fn`.
  • Loading branch information
arpad-m authored Jul 19, 2024
1 parent 3d582b2 commit 4e547e6
Show file tree
Hide file tree
Showing 23 changed files with 220 additions and 157 deletions.
4 changes: 2 additions & 2 deletions libs/remote_storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}

impl GenericRemoteStorage {
pub fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
let timeout = storage_config.timeout;
Ok(match &storage_config.storage {
RemoteStorageKind::LocalFs { local_path: path } => {
Expand All @@ -458,7 +458,7 @@ impl GenericRemoteStorage {
std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "<none>".into());
info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}', profile: {profile}, access_key_id: {access_key_id}",
s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
Self::AwsS3(Arc::new(S3Bucket::new(s3_config, timeout)?))
Self::AwsS3(Arc::new(S3Bucket::new(s3_config, timeout).await?))
}
RemoteStorageKind::AzureContainer(azure_config) => {
let storage_account = azure_config
Expand Down
66 changes: 24 additions & 42 deletions libs/remote_storage/src/s3_bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,10 @@ use std::{

use anyhow::{anyhow, Context as _};
use aws_config::{
environment::credentials::EnvironmentVariableCredentialsProvider,
imds::credentials::ImdsCredentialsProvider,
meta::credentials::CredentialsProviderChain,
profile::ProfileFileCredentialsProvider,
provider_config::ProviderConfig,
default_provider::credentials::DefaultCredentialsChain,
retry::{RetryConfigBuilder, RetryMode},
web_identity_token::WebIdentityTokenCredentialsProvider,
BehaviorVersion,
};
use aws_credential_types::provider::SharedCredentialsProvider;
use aws_sdk_s3::{
config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
error::SdkError,
Expand Down Expand Up @@ -76,40 +70,27 @@ struct GetObjectRequest {
}
impl S3Bucket {
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
pub fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
tracing::debug!(
"Creating s3 remote storage for S3 bucket {}",
remote_storage_config.bucket_name
);

let region = Some(Region::new(remote_storage_config.bucket_region.clone()));

let provider_conf = ProviderConfig::without_region().with_region(region.clone());

let credentials_provider = {
// uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
CredentialsProviderChain::first_try(
"env",
EnvironmentVariableCredentialsProvider::new(),
)
// uses "AWS_PROFILE" / `aws sso login --profile <profile>`
.or_else(
"profile-sso",
ProfileFileCredentialsProvider::builder()
.configure(&provider_conf)
.build(),
)
// uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
// needed to access remote extensions bucket
.or_else(
"token",
WebIdentityTokenCredentialsProvider::builder()
.configure(&provider_conf)
.build(),
)
// uses imds v2
.or_else("imds", ImdsCredentialsProvider::builder().build())
};
let region = Region::new(remote_storage_config.bucket_region.clone());
let region_opt = Some(region.clone());

// https://docs.aws.amazon.com/sdkref/latest/guide/standardized-credentials.html
// https://docs.rs/aws-config/latest/aws_config/default_provider/credentials/struct.DefaultCredentialsChain.html
// Incomplete list of auth methods used by this:
// * "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
// * "AWS_PROFILE" / `aws sso login --profile <profile>`
// * "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
// * http (ECS/EKS) container credentials
// * imds v2
let credentials_provider = DefaultCredentialsChain::builder()
.region(region)
.build()
.await;

// AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
Expand All @@ -118,9 +99,9 @@ impl S3Bucket {
#[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
BehaviorVersion::v2023_11_09(),
)
.region(region)
.region(region_opt)
.identity_cache(IdentityCache::lazy().build())
.credentials_provider(SharedCredentialsProvider::new(credentials_provider))
.credentials_provider(credentials_provider)
.sleep_impl(SharedAsyncSleep::from(sleep_impl));

let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
Expand Down Expand Up @@ -1041,8 +1022,8 @@ mod tests {

use crate::{RemotePath, S3Bucket, S3Config};

#[test]
fn relative_path() {
#[tokio::test]
async fn relative_path() {
let all_paths = ["", "some/path", "some/path/"];
let all_paths: Vec<RemotePath> = all_paths
.iter()
Expand Down Expand Up @@ -1085,8 +1066,9 @@ mod tests {
max_keys_per_list_response: Some(5),
upload_storage_class: None,
};
let storage =
S3Bucket::new(&config, std::time::Duration::ZERO).expect("remote storage init");
let storage = S3Bucket::new(&config, std::time::Duration::ZERO)
.await
.expect("remote storage init");
for (test_path_idx, test_path) in all_paths.iter().enumerate() {
let result = storage.relative_path_to_s3_object(test_path);
let expected = expected_outputs[prefix_idx][test_path_idx];
Expand Down
7 changes: 5 additions & 2 deletions libs/remote_storage/tests/test_real_azure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct EnabledAzure {
impl EnabledAzure {
async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
let client = create_azure_client(max_keys_in_list_response)
.await
.context("Azure client creation")
.expect("Azure client creation failed");

Expand Down Expand Up @@ -187,7 +188,7 @@ impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
}
}

fn create_azure_client(
async fn create_azure_client(
max_keys_per_list_response: Option<i32>,
) -> anyhow::Result<Arc<GenericRemoteStorage>> {
use rand::Rng;
Expand Down Expand Up @@ -221,6 +222,8 @@ fn create_azure_client(
timeout: Duration::from_secs(120),
};
Ok(Arc::new(
GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
GenericRemoteStorage::from_config(&remote_storage_config)
.await
.context("remote storage init")?,
))
}
7 changes: 5 additions & 2 deletions libs/remote_storage/tests/test_real_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ struct EnabledS3 {
impl EnabledS3 {
async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
let client = create_s3_client(max_keys_in_list_response)
.await
.context("S3 client creation")
.expect("S3 client creation failed");

Expand Down Expand Up @@ -352,7 +353,7 @@ impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
}
}

fn create_s3_client(
async fn create_s3_client(
max_keys_per_list_response: Option<i32>,
) -> anyhow::Result<Arc<GenericRemoteStorage>> {
use rand::Rng;
Expand Down Expand Up @@ -385,7 +386,9 @@ fn create_s3_client(
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
};
Ok(Arc::new(
GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
GenericRemoteStorage::from_config(&remote_storage_config)
.await
.context("remote storage init")?,
))
}

Expand Down
2 changes: 1 addition & 1 deletion pageserver/ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async fn main() -> anyhow::Result<()> {
.get("remote_storage")
.expect("need remote_storage");
let config = RemoteStorageConfig::from_toml(toml_item)?;
let storage = remote_storage::GenericRemoteStorage::from_config(&config);
let storage = remote_storage::GenericRemoteStorage::from_config(&config).await;
let cancel = CancellationToken::new();
storage
.unwrap()
Expand Down
6 changes: 3 additions & 3 deletions pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ fn start_pageserver(
let shutdown_pageserver = tokio_util::sync::CancellationToken::new();

// Set up remote storage client
let remote_storage = create_remote_storage_client(conf)?;
let remote_storage = BACKGROUND_RUNTIME.block_on(create_remote_storage_client(conf))?;

// Set up deletion queue
let (deletion_queue, deletion_workers) = DeletionQueue::new(
Expand Down Expand Up @@ -701,7 +701,7 @@ fn start_pageserver(
}
}

fn create_remote_storage_client(
async fn create_remote_storage_client(
conf: &'static PageServerConf,
) -> anyhow::Result<GenericRemoteStorage> {
let config = if let Some(config) = &conf.remote_storage_config {
Expand All @@ -711,7 +711,7 @@ fn create_remote_storage_client(
};

// Create the client
let mut remote_storage = GenericRemoteStorage::from_config(config)?;
let mut remote_storage = GenericRemoteStorage::from_config(config).await?;

// If `test_remote_failures` is non-zero, wrap the client with a
// wrapper that simulates failures.
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/consumption_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub async fn collect_metrics(
.expect("Failed to create http client with timeout");

let bucket_client = if let Some(bucket_config) = metric_collection_bucket {
match GenericRemoteStorage::from_config(bucket_config) {
match GenericRemoteStorage::from_config(bucket_config).await {
Ok(client) => Some(client),
Err(e) => {
// Non-fatal error: if we were given an invalid config, we will proceed
Expand Down
20 changes: 14 additions & 6 deletions pageserver/src/deletion_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -828,9 +828,9 @@ mod test {
}
}

fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
async fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}")));
let harness = TenantHarness::create(test_name)?;
let harness = TenantHarness::create(test_name).await?;

// We do not load() the harness: we only need its config and remote_storage

Expand All @@ -844,7 +844,9 @@ mod test {
},
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
};
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
let storage = GenericRemoteStorage::from_config(&storage_config)
.await
.unwrap();

let mock_control_plane = MockControlPlane::new();

Expand Down Expand Up @@ -922,7 +924,9 @@ mod test {
#[tokio::test]
async fn deletion_queue_smoke() -> anyhow::Result<()> {
// Basic test that the deletion queue processes the deletions we pass into it
let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
let ctx = setup("deletion_queue_smoke")
.await
.expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;

Expand Down Expand Up @@ -992,7 +996,9 @@ mod test {

#[tokio::test]
async fn deletion_queue_validation() -> anyhow::Result<()> {
let ctx = setup("deletion_queue_validation").expect("Failed test setup");
let ctx = setup("deletion_queue_validation")
.await
.expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;

Expand Down Expand Up @@ -1051,7 +1057,9 @@ mod test {
#[tokio::test]
async fn deletion_queue_recovery() -> anyhow::Result<()> {
// Basic test that the deletion queue processes the deletions we pass into it
let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
let mut ctx = setup("deletion_queue_recovery")
.await
.expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;

Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2031,7 +2031,7 @@ mod tests {
#[tokio::test]
async fn aux_files_round_trip() -> anyhow::Result<()> {
let name = "aux_files_round_trip";
let harness = TenantHarness::create(name)?;
let harness = TenantHarness::create(name).await?;

pub const TIMELINE_ID: TimelineId =
TimelineId::from_array(hex!("11223344556677881122334455667788"));
Expand Down
Loading

1 comment on commit 4e547e6

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No tests were run or test report is not available

Test coverage report is not available

The comment gets automatically updated with the latest test results
4e547e6 at 2024-07-19T19:42:27.922Z :recycle:

Please sign in to comment.