diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index d440c03a0e65..3381c4296f05 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -443,7 +443,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> { } impl GenericRemoteStorage { - pub fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> { + pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> { let timeout = storage_config.timeout; Ok(match &storage_config.storage { RemoteStorageKind::LocalFs { local_path: path } => { @@ -458,7 +458,7 @@ impl GenericRemoteStorage { std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "".into()); info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}', profile: {profile}, access_key_id: {access_key_id}", s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint); - Self::AwsS3(Arc::new(S3Bucket::new(s3_config, timeout)?)) + Self::AwsS3(Arc::new(S3Bucket::new(s3_config, timeout).await?)) } RemoteStorageKind::AzureContainer(azure_config) => { let storage_account = azure_config diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index ef1bd2c04730..b65d8b7e9e7a 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -16,16 +16,10 @@ use std::{ use anyhow::{anyhow, Context as _}; use aws_config::{ - environment::credentials::EnvironmentVariableCredentialsProvider, - imds::credentials::ImdsCredentialsProvider, - meta::credentials::CredentialsProviderChain, - profile::ProfileFileCredentialsProvider, - provider_config::ProviderConfig, + default_provider::credentials::DefaultCredentialsChain, retry::{RetryConfigBuilder, RetryMode}, - web_identity_token::WebIdentityTokenCredentialsProvider, BehaviorVersion, }; -use aws_credential_types::provider::SharedCredentialsProvider; use aws_sdk_s3::{ config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep}, error::SdkError, @@ -76,40 +70,27 @@ struct GetObjectRequest { } impl S3Bucket { /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided. 
- pub fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> { + pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> { tracing::debug!( "Creating s3 remote storage for S3 bucket {}", remote_storage_config.bucket_name ); - let region = Some(Region::new(remote_storage_config.bucket_region.clone())); - - let provider_conf = ProviderConfig::without_region().with_region(region.clone()); - - let credentials_provider = { - // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY" - CredentialsProviderChain::first_try( - "env", - EnvironmentVariableCredentialsProvider::new(), - ) - // uses "AWS_PROFILE" / `aws sso login --profile <profile_name>` - .or_else( - "profile-sso", - ProfileFileCredentialsProvider::builder() - .configure(&provider_conf) - .build(), - ) - // uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME" - // needed to access remote extensions bucket - .or_else( - "token", - WebIdentityTokenCredentialsProvider::builder() - .configure(&provider_conf) - .build(), - ) - // uses imds v2 - .or_else("imds", ImdsCredentialsProvider::builder().build()) - }; + let region = Region::new(remote_storage_config.bucket_region.clone()); + let region_opt = Some(region.clone()); + + // https://docs.aws.amazon.com/sdkref/latest/guide/standardized-credentials.html + // https://docs.rs/aws-config/latest/aws_config/default_provider/credentials/struct.DefaultCredentialsChain.html + // Incomplete list of auth methods used by this: + // * "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY" + // * "AWS_PROFILE" / `aws sso login --profile <profile_name>` + // * "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME" + // * http (ECS/EKS) container credentials + // * imds v2 + let credentials_provider = DefaultCredentialsChain::builder() + .region(region) + .build() + .await; // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new()); @@ -118,9 +99,9 @@ impl S3Bucket { #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */ BehaviorVersion::v2023_11_09(), ) - .region(region) + .region(region_opt) .identity_cache(IdentityCache::lazy().build()) - .credentials_provider(SharedCredentialsProvider::new(credentials_provider)) + .credentials_provider(credentials_provider) .sleep_impl(SharedAsyncSleep::from(sleep_impl)); let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| { @@ -1041,8 +1022,8 @@ mod tests { use crate::{RemotePath, S3Bucket, S3Config}; - #[test] - fn relative_path() { + #[tokio::test] + async fn relative_path() { let all_paths = ["", "some/path", "some/path/"]; let all_paths: Vec<RemotePath> = all_paths .iter() .map( @@ -1085,8 +1066,9 @@ mod tests { max_keys_per_list_response: Some(5), upload_storage_class: None, }; - let storage = - S3Bucket::new(&config, std::time::Duration::ZERO).expect("remote storage init"); + let storage = S3Bucket::new(&config, std::time::Duration::ZERO) + .await + .expect("remote storage init"); for (test_path_idx, test_path) in all_paths.iter().enumerate() { let result = storage.relative_path_to_s3_object(test_path); let expected = expected_outputs[prefix_idx][test_path_idx]; diff --git a/libs/remote_storage/tests/test_real_azure.rs b/libs/remote_storage/tests/test_real_azure.rs index 23628dfebecc..3a20649490ba 100644 --- a/libs/remote_storage/tests/test_real_azure.rs +++ b/libs/remote_storage/tests/test_real_azure.rs @@ -31,6 +31,7 @@ struct EnabledAzure { impl EnabledAzure { async fn setup(max_keys_in_list_response: 
Option<i32>) -> Self { let client = create_azure_client(max_keys_in_list_response) + .await .context("Azure client creation") .expect("Azure client creation failed"); @@ -187,7 +188,7 @@ impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs { } } -fn create_azure_client( +async fn create_azure_client( max_keys_per_list_response: Option<i32>, ) -> anyhow::Result<Arc<GenericRemoteStorage>> { use rand::Rng; @@ -221,6 +222,8 @@ fn create_azure_client( timeout: Duration::from_secs(120), }; Ok(Arc::new( - GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?, + GenericRemoteStorage::from_config(&remote_storage_config) + .await + .context("remote storage init")?, )) } diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs index a273abe867e1..342bc6da0bac 100644 --- a/libs/remote_storage/tests/test_real_s3.rs +++ b/libs/remote_storage/tests/test_real_s3.rs @@ -197,6 +197,7 @@ struct EnabledS3 { impl EnabledS3 { async fn setup(max_keys_in_list_response: Option<i32>) -> Self { let client = create_s3_client(max_keys_in_list_response) + .await .context("S3 client creation") .expect("S3 client creation failed"); @@ -352,7 +353,7 @@ impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs { } } -fn create_s3_client( +async fn create_s3_client( max_keys_per_list_response: Option<i32>, ) -> anyhow::Result<Arc<GenericRemoteStorage>> { use rand::Rng; @@ -385,7 +386,9 @@ fn create_s3_client( timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, }; Ok(Arc::new( - GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?, + GenericRemoteStorage::from_config(&remote_storage_config) + .await + .context("remote storage init")?, )) } diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs index ea09a011e5cf..3fabf629875e 100644 --- a/pageserver/ctl/src/main.rs +++ b/pageserver/ctl/src/main.rs @@ -179,7 +179,7 @@ async fn main() -> anyhow::Result<()> { .get("remote_storage") .expect("need remote_storage"); let config = RemoteStorageConfig::from_toml(toml_item)?; - let storage = remote_storage::GenericRemoteStorage::from_config(&config); + let storage = remote_storage::GenericRemoteStorage::from_config(&config).await; let cancel = CancellationToken::new(); storage .unwrap() diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index fceddfb7575c..ec1ceb54ce0f 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -385,7 +385,7 @@ fn start_pageserver( let shutdown_pageserver = tokio_util::sync::CancellationToken::new(); // Set up remote storage client - let remote_storage = create_remote_storage_client(conf)?; + let remote_storage = BACKGROUND_RUNTIME.block_on(create_remote_storage_client(conf))?; // Set up deletion queue let (deletion_queue, deletion_workers) = DeletionQueue::new( @@ -701,7 +701,7 @@ fn start_pageserver( } } -fn create_remote_storage_client( +async fn create_remote_storage_client( conf: &'static PageServerConf, ) -> anyhow::Result<GenericRemoteStorage> { let config = if let Some(config) = &conf.remote_storage_config { @@ -711,7 +711,7 @@ fn create_remote_storage_client( }; // Create the client - let mut remote_storage = GenericRemoteStorage::from_config(config)?; + let mut remote_storage = GenericRemoteStorage::from_config(config).await?; // If `test_remote_failures` is non-zero, wrap the client with a // wrapper that simulates failures. 
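The pageserver hunks above are the one place where the now-async constructor meets a synchronous caller: `start_pageserver` is a plain `fn`, so it drives `create_remote_storage_client` to completion with `BACKGROUND_RUNTIME.block_on(...)`. Below is a minimal sketch of that calling pattern, assuming only tokio and anyhow as dependencies; `make_client` is a hypothetical stand-in for `GenericRemoteStorage::from_config` and is not part of the patch.

use tokio::runtime::Runtime;

// Hypothetical stand-in for the now-async client constructor.
async fn make_client(bucket: &str) -> anyhow::Result<String> {
    // The real constructor awaits DefaultCredentialsChain::builder().build() here.
    Ok(format!("client for bucket {bucket}"))
}

fn main() -> anyhow::Result<()> {
    // Synchronous startup path: block on the async constructor, mirroring
    // BACKGROUND_RUNTIME.block_on(create_remote_storage_client(conf)) above.
    let runtime = Runtime::new()?;
    let client = runtime.block_on(make_client("my-bucket"))?;
    println!("{client}");
    Ok(())
}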
diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index 6861adad2c24..9104da60729c 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -96,7 +96,7 @@ pub async fn collect_metrics( .expect("Failed to create http client with timeout"); let bucket_client = if let Some(bucket_config) = metric_collection_bucket { - match GenericRemoteStorage::from_config(bucket_config) { + match GenericRemoteStorage::from_config(bucket_config).await { Ok(client) => Some(client), Err(e) => { // Non-fatal error: if we were given an invalid config, we will proceed diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 3e48552ace44..22f7d5b8242d 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -828,9 +828,9 @@ mod test { } } - fn setup(test_name: &str) -> anyhow::Result { + async fn setup(test_name: &str) -> anyhow::Result { let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}"))); - let harness = TenantHarness::create(test_name)?; + let harness = TenantHarness::create(test_name).await?; // We do not load() the harness: we only need its config and remote_storage @@ -844,7 +844,9 @@ mod test { }, timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, }; - let storage = GenericRemoteStorage::from_config(&storage_config).unwrap(); + let storage = GenericRemoteStorage::from_config(&storage_config) + .await + .unwrap(); let mock_control_plane = MockControlPlane::new(); @@ -922,7 +924,9 @@ mod test { #[tokio::test] async fn deletion_queue_smoke() -> anyhow::Result<()> { // Basic test that the deletion queue processes the deletions we pass into it - let ctx = setup("deletion_queue_smoke").expect("Failed test setup"); + let ctx = setup("deletion_queue_smoke") + .await + .expect("Failed test setup"); let client = ctx.deletion_queue.new_client(); client.recover(HashMap::new())?; @@ -992,7 +996,9 @@ mod test { #[tokio::test] async fn deletion_queue_validation() -> anyhow::Result<()> { - let ctx = setup("deletion_queue_validation").expect("Failed test setup"); + let ctx = setup("deletion_queue_validation") + .await + .expect("Failed test setup"); let client = ctx.deletion_queue.new_client(); client.recover(HashMap::new())?; @@ -1051,7 +1057,9 @@ mod test { #[tokio::test] async fn deletion_queue_recovery() -> anyhow::Result<()> { // Basic test that the deletion queue processes the deletions we pass into it - let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup"); + let mut ctx = setup("deletion_queue_recovery") + .await + .expect("Failed test setup"); let client = ctx.deletion_queue.new_client(); client.recover(HashMap::new())?; diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index a821b824d0c3..3bbd084ab498 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -2031,7 +2031,7 @@ mod tests { #[tokio::test] async fn aux_files_round_trip() -> anyhow::Result<()> { let name = "aux_files_round_trip"; - let harness = TenantHarness::create(name)?; + let harness = TenantHarness::create(name).await?; pub const TIMELINE_ID: TimelineId = TimelineId::from_array(hex!("11223344556677881122334455667788")); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 637051413f16..7384049523e2 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3788,7 +3788,7 @@ pub(crate) mod harness { } impl TenantHarness { - pub fn create_custom( + pub async fn 
create_custom( test_name: &'static str, tenant_conf: TenantConf, tenant_id: TenantId, @@ -3824,7 +3824,7 @@ pub(crate) mod harness { }, timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, }; - let remote_storage = GenericRemoteStorage::from_config(&config).unwrap(); + let remote_storage = GenericRemoteStorage::from_config(&config).await.unwrap(); let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone())); Ok(Self { @@ -3839,7 +3839,7 @@ pub(crate) mod harness { }) } - pub fn create(test_name: &'static str) -> anyhow::Result<Self> { + pub async fn create(test_name: &'static str) -> anyhow::Result<Self> { // Disable automatic GC and compaction to make the unit tests more deterministic. // The tests perform them manually if needed. let tenant_conf = TenantConf { @@ -3856,6 +3856,7 @@ pub(crate) mod harness { shard, Generation::new(0xdeadbeef), ) + .await } pub fn span(&self) -> tracing::Span { @@ -3992,7 +3993,7 @@ mod tests { #[tokio::test] async fn test_basic() -> anyhow::Result<()> { - let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await; + let (tenant, ctx) = TenantHarness::create("test_basic").await?.load().await; let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx) .await?; @@ -4039,7 +4040,8 @@ mod tests { #[tokio::test] async fn no_duplicate_timelines() -> anyhow::Result<()> { - let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")? + let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines") + .await? .load() .await; let _ = tenant @@ -4071,7 +4073,7 @@ mod tests { async fn test_branch() -> anyhow::Result<()> { use std::str::from_utf8; - let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await; + let (tenant, ctx) = TenantHarness::create("test_branch").await?.load().await; let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) .await?; @@ -4193,7 +4195,8 @@ mod tests { #[tokio::test] async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> { let (tenant, ctx) = - TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")? + TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data") + .await? .load() .await; let tline = tenant @@ -4240,7 +4243,8 @@ mod tests { #[tokio::test] async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> { let (tenant, ctx) = - TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")? + TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn") + .await? .load() .await; @@ -4295,7 +4299,8 @@ mod tests { #[tokio::test] async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> { let (tenant, ctx) = - TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")? + TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline") + .await? .load() .await; let tline = tenant @@ -4352,7 +4357,8 @@ mod tests { #[tokio::test] async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> { let (tenant, ctx) = - TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")? + TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child") + .await? .load() .await; let tline = tenant @@ -4382,10 +4388,10 @@ mod tests { } #[tokio::test] async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> { - let (tenant, ctx) = - TenantHarness::create("test_parent_keeps_data_forever_after_branching")? 
- .load() - .await; + let (tenant, ctx) = TenantHarness::create("test_parent_keeps_data_forever_after_branching") + .await? + .load() + .await; let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) .await?; @@ -4423,7 +4429,7 @@ mod tests { #[tokio::test] async fn timeline_load() -> anyhow::Result<()> { const TEST_NAME: &str = "timeline_load"; - let harness = TenantHarness::create(TEST_NAME)?; + let harness = TenantHarness::create(TEST_NAME).await?; { let (tenant, ctx) = harness.load().await; let tline = tenant @@ -4450,7 +4456,7 @@ mod tests { #[tokio::test] async fn timeline_load_with_ancestor() -> anyhow::Result<()> { const TEST_NAME: &str = "timeline_load_with_ancestor"; - let harness = TenantHarness::create(TEST_NAME)?; + let harness = TenantHarness::create(TEST_NAME).await?; // create two timelines { let (tenant, ctx) = harness.load().await; @@ -4498,7 +4504,10 @@ mod tests { #[tokio::test] async fn delta_layer_dumping() -> anyhow::Result<()> { use storage_layer::AsLayerDesc; - let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await; + let (tenant, ctx) = TenantHarness::create("test_layer_dumping") + .await? + .load() + .await; let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) .await?; @@ -4525,7 +4534,7 @@ mod tests { #[tokio::test] async fn test_images() -> anyhow::Result<()> { - let (tenant, ctx) = TenantHarness::create("test_images")?.load().await; + let (tenant, ctx) = TenantHarness::create("test_images").await?.load().await; let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx) .await?; @@ -4696,7 +4705,7 @@ mod tests { // #[tokio::test] async fn test_bulk_insert() -> anyhow::Result<()> { - let harness = TenantHarness::create("test_bulk_insert")?; + let harness = TenantHarness::create("test_bulk_insert").await?; let (tenant, ctx) = harness.load().await; let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx) @@ -4727,7 +4736,7 @@ mod tests { // so the search can stop at the first delta layer and doesn't traverse any deeper. 
#[tokio::test] async fn test_get_vectored() -> anyhow::Result<()> { - let harness = TenantHarness::create("test_get_vectored")?; + let harness = TenantHarness::create("test_get_vectored").await?; let (tenant, ctx) = harness.load().await; let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx) @@ -4805,7 +4814,7 @@ mod tests { #[tokio::test] async fn test_get_vectored_aux_files() -> anyhow::Result<()> { - let harness = TenantHarness::create("test_get_vectored_aux_files")?; + let harness = TenantHarness::create("test_get_vectored_aux_files").await?; let (tenant, ctx) = harness.load().await; let tline = tenant @@ -4891,7 +4900,8 @@ mod tests { TenantId::generate(), ShardIdentity::unsharded(), Generation::new(0xdeadbeef), - )?; + ) + .await?; let (tenant, ctx) = harness.load().await; let mut current_key = Key::from_hex("010000000033333333444444445500000000").unwrap(); @@ -5034,7 +5044,7 @@ mod tests { // ``` #[tokio::test] async fn test_get_vectored_ancestor_descent() -> anyhow::Result<()> { - let harness = TenantHarness::create("test_get_vectored_on_lsn_axis")?; + let harness = TenantHarness::create("test_get_vectored_on_lsn_axis").await?; let (tenant, ctx) = harness.load().await; let start_key = Key::from_hex("010000000033333333444444445500000000").unwrap(); @@ -5183,7 +5193,7 @@ mod tests { name: &'static str, compaction_algorithm: CompactionAlgorithm, ) -> anyhow::Result<()> { - let mut harness = TenantHarness::create(name)?; + let mut harness = TenantHarness::create(name).await?; harness.tenant_conf.compaction_algorithm = CompactionAlgorithmSettings { kind: compaction_algorithm, }; @@ -5267,7 +5277,8 @@ mod tests { #[tokio::test] async fn test_traverse_branches() -> anyhow::Result<()> { - let (tenant, ctx) = TenantHarness::create("test_traverse_branches")? + let (tenant, ctx) = TenantHarness::create("test_traverse_branches") + .await? .load() .await; let mut tline = tenant @@ -5357,7 +5368,8 @@ mod tests { #[tokio::test] async fn test_traverse_ancestors() -> anyhow::Result<()> { - let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")? + let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors") + .await? .load() .await; let mut tline = tenant @@ -5423,7 +5435,8 @@ mod tests { #[tokio::test] async fn test_write_at_initdb_lsn_takes_optimization_code_path() -> anyhow::Result<()> { - let (tenant, ctx) = TenantHarness::create("test_empty_test_timeline_is_usable")? + let (tenant, ctx) = TenantHarness::create("test_empty_test_timeline_is_usable") + .await? 
.load() .await; @@ -5492,7 +5505,7 @@ mod tests { #[tokio::test] async fn test_create_guard_crash() -> anyhow::Result<()> { let name = "test_create_guard_crash"; - let harness = TenantHarness::create(name)?; + let harness = TenantHarness::create(name).await?; { let (tenant, ctx) = harness.load().await; let tline = tenant @@ -5545,7 +5558,7 @@ mod tests { name: &'static str, compaction_algorithm: CompactionAlgorithm, ) -> anyhow::Result<()> { - let mut harness = TenantHarness::create(name)?; + let mut harness = TenantHarness::create(name).await?; harness.tenant_conf.compaction_algorithm = CompactionAlgorithmSettings { kind: compaction_algorithm, }; @@ -5569,7 +5582,7 @@ mod tests { #[tokio::test] async fn test_metadata_scan() -> anyhow::Result<()> { - let harness = TenantHarness::create("test_metadata_scan")?; + let harness = TenantHarness::create("test_metadata_scan").await?; let (tenant, ctx) = harness.load().await; let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) @@ -5688,7 +5701,7 @@ mod tests { #[tokio::test] async fn test_metadata_compaction_trigger() -> anyhow::Result<()> { - let harness = TenantHarness::create("test_metadata_compaction_trigger")?; + let harness = TenantHarness::create("test_metadata_compaction_trigger").await?; let (tenant, ctx) = harness.load().await; let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) @@ -5747,7 +5760,9 @@ mod tests { #[tokio::test] async fn test_branch_copies_dirty_aux_file_flag() { - let harness = TenantHarness::create("test_branch_copies_dirty_aux_file_flag").unwrap(); + let harness = TenantHarness::create("test_branch_copies_dirty_aux_file_flag") + .await + .unwrap(); // the default aux file policy to switch is v1 if not set by the admins assert_eq!( @@ -5849,7 +5864,9 @@ mod tests { #[tokio::test] async fn aux_file_policy_switch() { - let mut harness = TenantHarness::create("aux_file_policy_switch").unwrap(); + let mut harness = TenantHarness::create("aux_file_policy_switch") + .await + .unwrap(); harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::CrossValidation; // set to cross-validation mode let (tenant, ctx) = harness.load().await; @@ -6023,7 +6040,9 @@ mod tests { #[tokio::test] async fn aux_file_policy_force_switch() { - let mut harness = TenantHarness::create("aux_file_policy_force_switch").unwrap(); + let mut harness = TenantHarness::create("aux_file_policy_force_switch") + .await + .unwrap(); harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V1; let (tenant, ctx) = harness.load().await; @@ -6084,7 +6103,9 @@ mod tests { #[tokio::test] async fn aux_file_policy_auto_detect() { - let mut harness = TenantHarness::create("aux_file_policy_auto_detect").unwrap(); + let mut harness = TenantHarness::create("aux_file_policy_auto_detect") + .await + .unwrap(); harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V2; // set to cross-validation mode let (tenant, ctx) = harness.load().await; @@ -6147,7 +6168,7 @@ mod tests { #[tokio::test] async fn test_metadata_image_creation() -> anyhow::Result<()> { - let harness = TenantHarness::create("test_metadata_image_creation")?; + let harness = TenantHarness::create("test_metadata_image_creation").await?; let (tenant, ctx) = harness.load().await; let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) @@ -6246,7 +6267,7 @@ mod tests { #[tokio::test] async fn test_vectored_missing_data_key_reads() -> anyhow::Result<()> { - let harness = 
TenantHarness::create("test_vectored_missing_data_key_reads")?; + let harness = TenantHarness::create("test_vectored_missing_data_key_reads").await?; let (tenant, ctx) = harness.load().await; let base_key = Key::from_hex("000000000033333333444444445500000000").unwrap(); @@ -6318,7 +6339,7 @@ mod tests { #[tokio::test] async fn test_vectored_missing_metadata_key_reads() -> anyhow::Result<()> { - let harness = TenantHarness::create("test_vectored_missing_metadata_key_reads")?; + let harness = TenantHarness::create("test_vectored_missing_metadata_key_reads").await?; let (tenant, ctx) = harness.load().await; let base_key = Key::from_hex("620000000033333333444444445500000000").unwrap(); @@ -6410,7 +6431,7 @@ mod tests { #[tokio::test] async fn test_metadata_tombstone_reads() -> anyhow::Result<()> { - let harness = TenantHarness::create("test_metadata_tombstone_reads")?; + let harness = TenantHarness::create("test_metadata_tombstone_reads").await?; let (tenant, ctx) = harness.load().await; let key0 = Key::from_hex("620000000033333333444444445500000000").unwrap(); let key1 = Key::from_hex("620000000033333333444444445500000001").unwrap(); @@ -6490,7 +6511,9 @@ mod tests { #[tokio::test] async fn test_metadata_tombstone_image_creation() { - let harness = TenantHarness::create("test_metadata_tombstone_image_creation").unwrap(); + let harness = TenantHarness::create("test_metadata_tombstone_image_creation") + .await + .unwrap(); let (tenant, ctx) = harness.load().await; let key0 = Key::from_hex("620000000033333333444444445500000000").unwrap(); @@ -6562,8 +6585,9 @@ mod tests { #[tokio::test] async fn test_metadata_tombstone_empty_image_creation() { - let harness = - TenantHarness::create("test_metadata_tombstone_empty_image_creation").unwrap(); + let harness = TenantHarness::create("test_metadata_tombstone_empty_image_creation") + .await + .unwrap(); let (tenant, ctx) = harness.load().await; let key1 = Key::from_hex("620000000033333333444444445500000001").unwrap(); @@ -6626,7 +6650,7 @@ mod tests { #[tokio::test] async fn test_simple_bottom_most_compaction_images() -> anyhow::Result<()> { - let harness = TenantHarness::create("test_simple_bottom_most_compaction_images")?; + let harness = TenantHarness::create("test_simple_bottom_most_compaction_images").await?; let (tenant, ctx) = harness.load().await; fn get_key(id: u32) -> Key { @@ -6834,7 +6858,7 @@ mod tests { #[tokio::test] async fn test_neon_test_record() -> anyhow::Result<()> { - let harness = TenantHarness::create("test_neon_test_record")?; + let harness = TenantHarness::create("test_neon_test_record").await?; let (tenant, ctx) = harness.load().await; fn get_key(id: u32) -> Key { @@ -6915,7 +6939,7 @@ mod tests { #[tokio::test] async fn test_lsn_lease() -> anyhow::Result<()> { - let (tenant, ctx) = TenantHarness::create("test_lsn_lease")?.load().await; + let (tenant, ctx) = TenantHarness::create("test_lsn_lease").await?.load().await; let key = Key::from_hex("010000000033333333444444445500000000").unwrap(); let end_lsn = Lsn(0x100); @@ -7004,7 +7028,7 @@ mod tests { #[tokio::test] async fn test_simple_bottom_most_compaction_deltas() -> anyhow::Result<()> { - let harness = TenantHarness::create("test_simple_bottom_most_compaction_deltas")?; + let harness = TenantHarness::create("test_simple_bottom_most_compaction_deltas").await?; let (tenant, ctx) = harness.load().await; fn get_key(id: u32) -> Key { diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index b0159e22bfc0..49126086772b 100644 --- a/pageserver/src/tenant/mgr.rs 
+++ b/pageserver/src/tenant/mgr.rs @@ -2698,7 +2698,9 @@ mod tests { // Test that if an InProgress tenant is in the map during shutdown, the shutdown will gracefully // wait for it to complete before proceeding. - let h = TenantHarness::create("shutdown_awaits_in_progress_tenant").unwrap(); + let h = TenantHarness::create("shutdown_awaits_in_progress_tenant") + .await + .unwrap(); let (t, _ctx) = h.load().await; // harness loads it to active, which is forced and nothing is running on the tenant diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 66b759c8e0d8..bb42fbeebf78 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -2128,7 +2128,7 @@ mod tests { impl TestSetup { async fn new(test_name: &str) -> anyhow::Result<Self> { let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}"))); - let harness = TenantHarness::create(test_name)?; + let harness = TenantHarness::create(test_name).await?; let (tenant, ctx) = harness.load().await; let timeline = tenant diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index c34923320aee..512e9e86fac1 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -1934,7 +1934,7 @@ pub(crate) mod test { #[tokio::test] async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> { - let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read")?; + let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read").await?; let (tenant, ctx) = harness.load().await; let timeline_id = TimelineId::generate(); @@ -2034,7 +2034,9 @@ pub(crate) mod test { use crate::walrecord::NeonWalRecord; use bytes::Bytes; - let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke").unwrap(); + let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke") + .await + .unwrap(); let (tenant, ctx) = h.load().await; let ctx = &ctx; let timeline = tenant @@ -2312,7 +2314,7 @@ pub(crate) mod test { #[tokio::test] async fn delta_layer_iterator() { - let harness = TenantHarness::create("delta_layer_iterator").unwrap(); + let harness = TenantHarness::create("delta_layer_iterator").await.unwrap(); let (tenant, ctx) = harness.load().await; let tline = tenant diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 45b47bb62b0c..19e4e9e2e9ca 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -1111,6 +1111,7 @@ mod test { ShardIdentity::unsharded(), get_next_gen(), ) + .await .unwrap(); let (tenant, ctx) = harness.load().await; let timeline = tenant @@ -1177,6 +1178,7 @@ mod test { // But here, all we care about is that the gen number is unique. 
get_next_gen(), ) + .await .unwrap(); let (tenant, ctx) = harness.load().await; let timeline = tenant @@ -1308,7 +1310,7 @@ mod test { #[tokio::test] async fn image_layer_iterator() { - let harness = TenantHarness::create("image_layer_iterator").unwrap(); + let harness = TenantHarness::create("image_layer_iterator").await.unwrap(); let (tenant, ctx) = harness.load().await; let tline = tenant diff --git a/pageserver/src/tenant/storage_layer/layer/tests.rs b/pageserver/src/tenant/storage_layer/layer/tests.rs index 3a7aca7a6cc4..8a3737f8a760 100644 --- a/pageserver/src/tenant/storage_layer/layer/tests.rs +++ b/pageserver/src/tenant/storage_layer/layer/tests.rs @@ -22,7 +22,7 @@ const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_s async fn smoke_test() { let handle = tokio::runtime::Handle::current(); - let h = TenantHarness::create("smoke_test").unwrap(); + let h = TenantHarness::create("smoke_test").await.unwrap(); let span = h.span(); let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1)); let (tenant, _) = h.load().await; @@ -176,7 +176,9 @@ async fn evict_and_wait_on_wanted_deleted() { // this is the runtime on which Layer spawns the blocking tasks on let handle = tokio::runtime::Handle::current(); - let h = TenantHarness::create("evict_and_wait_on_wanted_deleted").unwrap(); + let h = TenantHarness::create("evict_and_wait_on_wanted_deleted") + .await + .unwrap(); utils::logging::replace_panic_hook_with_tracing_panic_hook().forget(); let (tenant, ctx) = h.load().await; @@ -258,7 +260,9 @@ fn read_wins_pending_eviction() { rt.block_on(async move { // this is the runtime on which Layer spawns the blocking tasks on let handle = tokio::runtime::Handle::current(); - let h = TenantHarness::create("read_wins_pending_eviction").unwrap(); + let h = TenantHarness::create("read_wins_pending_eviction") + .await + .unwrap(); let (tenant, ctx) = h.load().await; let span = h.span(); let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1)); @@ -390,7 +394,7 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) { rt.block_on(async move { // this is the runtime on which Layer spawns the blocking tasks on let handle = tokio::runtime::Handle::current(); - let h = TenantHarness::create(name).unwrap(); + let h = TenantHarness::create(name).await.unwrap(); let (tenant, ctx) = h.load().await; let span = h.span(); let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1)); @@ -559,8 +563,9 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) { #[tokio::test(start_paused = true)] async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() { let handle = tokio::runtime::Handle::current(); - let h = - TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction").unwrap(); + let h = TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction") + .await + .unwrap(); let (tenant, ctx) = h.load().await; let timeline = tenant @@ -636,7 +641,9 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() { #[tokio::test(start_paused = true)] async fn evict_and_wait_does_not_wait_for_download() { // let handle = tokio::runtime::Handle::current(); - let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download").unwrap(); + let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download") + .await + .unwrap(); let (tenant, ctx) = h.load().await; let span = h.span(); let 
download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1)); @@ -733,7 +740,9 @@ async fn eviction_cancellation_on_drop() { // this is the runtime on which Layer spawns the blocking tasks on let handle = tokio::runtime::Handle::current(); - let h = TenantHarness::create("eviction_cancellation_on_drop").unwrap(); + let h = TenantHarness::create("eviction_cancellation_on_drop") + .await + .unwrap(); utils::logging::replace_panic_hook_with_tracing_panic_hook().forget(); let (tenant, ctx) = h.load().await; diff --git a/pageserver/src/tenant/storage_layer/merge_iterator.rs b/pageserver/src/tenant/storage_layer/merge_iterator.rs index 6f59b2fd7765..eb4a1f28a11c 100644 --- a/pageserver/src/tenant/storage_layer/merge_iterator.rs +++ b/pageserver/src/tenant/storage_layer/merge_iterator.rs @@ -293,7 +293,9 @@ mod tests { use crate::repository::Value; use bytes::Bytes; - let harness = TenantHarness::create("merge_iterator_merge_in_between").unwrap(); + let harness = TenantHarness::create("merge_iterator_merge_in_between") + .await + .unwrap(); let (tenant, ctx) = harness.load().await; let tline = tenant @@ -356,7 +358,9 @@ mod tests { use crate::repository::Value; use bytes::Bytes; - let harness = TenantHarness::create("merge_iterator_delta_merge").unwrap(); + let harness = TenantHarness::create("merge_iterator_delta_merge") + .await + .unwrap(); let (tenant, ctx) = harness.load().await; let tline = tenant @@ -430,7 +434,9 @@ mod tests { use crate::repository::Value; use bytes::Bytes; - let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge").unwrap(); + let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge") + .await + .unwrap(); let (tenant, ctx) = harness.load().await; let tline = tenant diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 3d3d3ac34de1..19b13969811c 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -6046,8 +6046,9 @@ mod tests { #[tokio::test] async fn two_layer_eviction_attempts_at_the_same_time() { - let harness = - TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap(); + let harness = TenantHarness::create("two_layer_eviction_attempts_at_the_same_time") + .await + .unwrap(); let (tenant, ctx) = harness.load().await; let timeline = tenant diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 1d2ffec08fb5..de50f217d80e 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -1118,7 +1118,7 @@ mod tests { #[tokio::test] async fn no_connection_no_candidate() -> anyhow::Result<()> { - let harness = TenantHarness::create("no_connection_no_candidate")?; + let harness = TenantHarness::create("no_connection_no_candidate").await?; let mut state = dummy_state(&harness).await; let now = Utc::now().naive_utc(); @@ -1151,7 +1151,7 @@ mod tests { #[tokio::test] async fn connection_no_candidate() -> anyhow::Result<()> { - let harness = TenantHarness::create("connection_no_candidate")?; + let harness = TenantHarness::create("connection_no_candidate").await?; let mut state = dummy_state(&harness).await; let now = Utc::now().naive_utc(); @@ -1216,7 +1216,7 @@ mod tests { #[tokio::test] async fn no_connection_candidate() -> anyhow::Result<()> { - let harness = TenantHarness::create("no_connection_candidate")?; + let harness = 
TenantHarness::create("no_connection_candidate").await?; let mut state = dummy_state(&harness).await; let now = Utc::now().naive_utc(); @@ -1279,7 +1279,7 @@ mod tests { #[tokio::test] async fn candidate_with_many_connection_failures() -> anyhow::Result<()> { - let harness = TenantHarness::create("candidate_with_many_connection_failures")?; + let harness = TenantHarness::create("candidate_with_many_connection_failures").await?; let mut state = dummy_state(&harness).await; let now = Utc::now().naive_utc(); @@ -1319,7 +1319,7 @@ mod tests { #[tokio::test] async fn lsn_wal_over_threshold_current_candidate() -> anyhow::Result<()> { - let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate")?; + let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate").await?; let mut state = dummy_state(&harness).await; let current_lsn = Lsn(100_000).align(); let now = Utc::now().naive_utc(); @@ -1385,7 +1385,8 @@ mod tests { #[tokio::test] async fn timeout_connection_threshold_current_candidate() -> anyhow::Result<()> { - let harness = TenantHarness::create("timeout_connection_threshold_current_candidate")?; + let harness = + TenantHarness::create("timeout_connection_threshold_current_candidate").await?; let mut state = dummy_state(&harness).await; let current_lsn = Lsn(100_000).align(); let now = Utc::now().naive_utc(); @@ -1448,7 +1449,7 @@ mod tests { #[tokio::test] async fn timeout_wal_over_threshold_current_candidate() -> anyhow::Result<()> { - let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate")?; + let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate").await?; let mut state = dummy_state(&harness).await; let current_lsn = Lsn(100_000).align(); let new_lsn = Lsn(100_100).align(); @@ -1550,7 +1551,7 @@ mod tests { // and pageserver should prefer to connect to it. let test_az = Some("test_az".to_owned()); - let harness = TenantHarness::create("switch_to_same_availability_zone")?; + let harness = TenantHarness::create("switch_to_same_availability_zone").await?; let mut state = dummy_state(&harness).await; state.conf.availability_zone.clone_from(&test_az); let current_lsn = Lsn(100_000).align(); diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 07c90385e654..dff3a8f52da4 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -1754,7 +1754,7 @@ mod tests { #[tokio::test] async fn test_relsize() -> Result<()> { - let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await; + let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await; let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx) .await?; @@ -1975,7 +1975,10 @@ mod tests { // and then created it again within the same layer. #[tokio::test] async fn test_drop_extend() -> Result<()> { - let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await; + let (tenant, ctx) = TenantHarness::create("test_drop_extend") + .await? + .load() + .await; let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx) .await?; @@ -2046,7 +2049,10 @@ mod tests { // and then extended it again within the same layer. #[tokio::test] async fn test_truncate_extend() -> Result<()> { - let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await; + let (tenant, ctx) = TenantHarness::create("test_truncate_extend") + .await? 
+ .load() + .await; let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx) .await?; @@ -2188,7 +2194,7 @@ mod tests { /// split into multiple 1 GB segments in Postgres. #[tokio::test] async fn test_large_rel() -> Result<()> { - let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await; + let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await; let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx) .await?; @@ -2296,7 +2302,7 @@ mod tests { let startpoint = Lsn::from_hex("14AEC08").unwrap(); let _endpoint = Lsn::from_hex("1FFFF98").unwrap(); - let harness = TenantHarness::create("test_ingest_real_wal").unwrap(); + let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap(); let (tenant, ctx) = harness.load().await; let remote_initdb_path = diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index cfc1f8e89e3f..543a45827400 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -181,8 +181,9 @@ pub async fn worker( let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx)); let rx = rx.map(RequestData::from); - let storage = - GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?; + let storage = GenericRemoteStorage::from_config(&remote_storage_config) + .await + .context("remote storage init")?; let properties = WriterProperties::builder() .set_data_page_size_limit(config.parquet_upload_page_size) @@ -217,6 +218,7 @@ pub async fn worker( let storage_disconnect = GenericRemoteStorage::from_config(&disconnect_events_storage_config) + .await .context("remote storage for disconnect events init")?; let parquet_config_disconnect = parquet_config.clone(); tokio::try_join!( @@ -545,7 +547,9 @@ mod tests { }, timeout: std::time::Duration::from_secs(120), }; - let storage = GenericRemoteStorage::from_config(&remote_storage_config).unwrap(); + let storage = GenericRemoteStorage::from_config(&remote_storage_config) + .await + .unwrap(); worker_inner(storage, rx, config).await.unwrap(); diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs index 56ed2145dc25..a8735fe0bbda 100644 --- a/proxy/src/usage_metrics.rs +++ b/proxy/src/usage_metrics.rs @@ -357,11 +357,15 @@ pub async fn task_backup( info!("metrics backup has shut down"); } // Even if the remote storage is not configured, we still want to clear the metrics. 
- let storage = backup_config - .remote_storage_config - .as_ref() - .map(|config| GenericRemoteStorage::from_config(config).context("remote storage init")) - .transpose()?; + let storage = if let Some(config) = backup_config.remote_storage_config.as_ref() { + Some( + GenericRemoteStorage::from_config(config) + .await + .context("remote storage init")?, + ) + } else { + None + }; let mut ticker = tokio::time::interval(backup_config.interval); let mut prev = Utc::now(); let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned(); diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 9eb6546d6bae..2365fd05871f 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -418,7 +418,7 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { let timeline_collector = safekeeper::metrics::TimelineCollector::new(); metrics::register_internal(Box::new(timeline_collector))?; - wal_backup::init_remote_storage(&conf); + wal_backup::init_remote_storage(&conf).await; // Keep handles to main tasks to die if any of them disappears. let mut tasks_handles: FuturesUnordered> = diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 5a590689c374..7ecee178f3b4 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -22,7 +22,7 @@ use tokio::fs::File; use tokio::select; use tokio::sync::mpsc::{self, Receiver, Sender}; -use tokio::sync::watch; +use tokio::sync::{watch, OnceCell}; use tokio::time::sleep; use tracing::*; @@ -33,8 +33,6 @@ use crate::timeline::{PeerInfo, WalResidentTimeline}; use crate::timeline_manager::{Manager, StateSnapshot}; use crate::{SafeKeeperConf, WAL_BACKUP_RUNTIME}; -use once_cell::sync::OnceCell; const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10; const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000; @@ -167,7 +165,7 @@ fn determine_offloader( } } -static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new(); +static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::const_new(); // Storage must be configured and initialized when this is called. fn get_configured_remote_storage() -> &'static GenericRemoteStorage { @@ -178,14 +176,22 @@ fn get_configured_remote_storage() -> &'static GenericRemoteStorage { .unwrap() } -pub fn init_remote_storage(conf: &SafeKeeperConf) { +pub async fn init_remote_storage(conf: &SafeKeeperConf) { // TODO: refactor REMOTE_STORAGE to avoid using global variables, and provide // dependencies to all tasks instead. - REMOTE_STORAGE.get_or_init(|| { - conf.remote_storage - .as_ref() - .map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage")) - }); + REMOTE_STORAGE + .get_or_init(|| async { + if let Some(conf) = conf.remote_storage.as_ref() { + Some( + GenericRemoteStorage::from_config(conf) + .await + .expect("failed to create remote storage"), + ) + } else { + None + } + }) + .await; } struct WalBackupTask {
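A closing note on the safekeeper change: swapping `once_cell::sync::OnceCell` for `tokio::sync::OnceCell` is what lets `init_remote_storage` await inside one-time initialization, since the tokio cell's `get_or_init` takes a closure returning a future. A minimal sketch of that pattern, assuming tokio as a dependency; `load_storage` is a hypothetical stand-in for the `from_config` call.

use tokio::sync::OnceCell;

static REMOTE: OnceCell<Option<String>> = OnceCell::const_new();

// Hypothetical stand-in for GenericRemoteStorage::from_config.
async fn load_storage(configured: bool) -> Option<String> {
    configured.then(|| "initialized storage".to_string())
}

#[tokio::main]
async fn main() {
    // The first caller runs the async initializer; later callers reuse the cached value.
    REMOTE.get_or_init(|| load_storage(true)).await;
    assert!(REMOTE.get().unwrap().is_some());
}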