From cbec6475aa60ceee8764c1b64bf0342a58b0077a Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Thu, 11 Jul 2024 19:37:27 +1000 Subject: [PATCH 1/4] c --- crates/polars-io/src/cloud/glob.rs | 7 ---- crates/polars-io/src/cloud/options.rs | 54 +++++++++++++++++---------- 2 files changed, 34 insertions(+), 27 deletions(-) diff --git a/crates/polars-io/src/cloud/glob.rs b/crates/polars-io/src/cloud/glob.rs index dc202c5459d3..08bf4664a707 100644 --- a/crates/polars-io/src/cloud/glob.rs +++ b/crates/polars-io/src/cloud/glob.rs @@ -90,13 +90,6 @@ impl CloudLocation { let (bucket, key) = if is_local { ("".into(), parsed.path()) } else { - if parsed.scheme().starts_with("http") { - return Ok(CloudLocation { - scheme: parsed.scheme().into(), - ..Default::default() - }); - } - let key = parsed.path(); let bucket = parsed .host() diff --git a/crates/polars-io/src/cloud/options.rs b/crates/polars-io/src/cloud/options.rs index 59266282113c..8743dee0c2a0 100644 --- a/crates/polars-io/src/cloud/options.rs +++ b/crates/polars-io/src/cloud/options.rs @@ -241,26 +241,40 @@ impl CloudOptions { } } - read_config( - &mut builder, - &[( - Path::new("~/.aws/config"), - &[("region = (.*)\n", AmazonS3ConfigKey::Region)], - )], - ); - read_config( - &mut builder, - &[( - Path::new("~/.aws/credentials"), - &[ - ("aws_access_key_id = (.*)\n", AmazonS3ConfigKey::AccessKeyId), - ( - "aws_secret_access_key = (.*)\n", - AmazonS3ConfigKey::SecretAccessKey, - ), - ], - )], - ); + if builder + .get_config_value(&AmazonS3ConfigKey::Region) + .is_none() + { + read_config( + &mut builder, + &[( + Path::new("~/.aws/config"), + &[("region = (.*)\n", AmazonS3ConfigKey::Region)], + )], + ); + } + + if builder + .get_config_value(&AmazonS3ConfigKey::AccessKeyId) + .is_none() + || builder + .get_config_value(&AmazonS3ConfigKey::SecretAccessKey) + .is_none() + { + read_config( + &mut builder, + &[( + Path::new("~/.aws/credentials"), + &[ + ("aws_access_key_id = (.*)\n", AmazonS3ConfigKey::AccessKeyId), + ( + "aws_secret_access_key = (.*)\n", + AmazonS3ConfigKey::SecretAccessKey, + ), + ], + )], + ); + } if builder .get_config_value(&AmazonS3ConfigKey::DefaultRegion) From 4b5f4cd4f3e3b248e672fcf98a1be8f1a7365b32 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Thu, 11 Jul 2024 20:09:24 +1000 Subject: [PATCH 2/4] c --- crates/polars-io/src/cloud/glob.rs | 7 ++++ crates/polars-io/src/cloud/options.rs | 58 ++++++++++----------------- crates/polars-io/src/utils/path.rs | 7 ++-- 3 files changed, 32 insertions(+), 40 deletions(-) diff --git a/crates/polars-io/src/cloud/glob.rs b/crates/polars-io/src/cloud/glob.rs index 08bf4664a707..dc202c5459d3 100644 --- a/crates/polars-io/src/cloud/glob.rs +++ b/crates/polars-io/src/cloud/glob.rs @@ -90,6 +90,13 @@ impl CloudLocation { let (bucket, key) = if is_local { ("".into(), parsed.path()) } else { + if parsed.scheme().starts_with("http") { + return Ok(CloudLocation { + scheme: parsed.scheme().into(), + ..Default::default() + }); + } + let key = parsed.path(); let bucket = parsed .host() diff --git a/crates/polars-io/src/cloud/options.rs b/crates/polars-io/src/cloud/options.rs index 8743dee0c2a0..57d29ee71de0 100644 --- a/crates/polars-io/src/cloud/options.rs +++ b/crates/polars-io/src/cloud/options.rs @@ -200,14 +200,12 @@ fn read_config( let content = std::str::from_utf8(buf.as_ref()).ok()?; for (pattern, key) in keys.iter() { - let local = std::mem::take(builder); - if builder.get_config_value(key).is_none() { let reg = Regex::new(pattern).unwrap(); let cap = reg.captures(content)?; let m = cap.get(1)?; let parsed = m.as_str(); - *builder = local.with_config(*key, parsed) + *builder = std::mem::take(builder).with_config(*key, parsed); } } } @@ -241,40 +239,26 @@ impl CloudOptions { } } - if builder - .get_config_value(&AmazonS3ConfigKey::Region) - .is_none() - { - read_config( - &mut builder, - &[( - Path::new("~/.aws/config"), - &[("region = (.*)\n", AmazonS3ConfigKey::Region)], - )], - ); - } - - if builder - .get_config_value(&AmazonS3ConfigKey::AccessKeyId) - .is_none() - || builder - .get_config_value(&AmazonS3ConfigKey::SecretAccessKey) - .is_none() - { - read_config( - &mut builder, - &[( - Path::new("~/.aws/credentials"), - &[ - ("aws_access_key_id = (.*)\n", AmazonS3ConfigKey::AccessKeyId), - ( - "aws_secret_access_key = (.*)\n", - AmazonS3ConfigKey::SecretAccessKey, - ), - ], - )], - ); - } + read_config( + &mut builder, + &[( + Path::new("~/.aws/config"), + &[("region = (.*)\n", AmazonS3ConfigKey::Region)], + )], + ); + read_config( + &mut builder, + &[( + Path::new("~/.aws/credentials"), + &[ + ("aws_access_key_id = (.*)\n", AmazonS3ConfigKey::AccessKeyId), + ( + "aws_secret_access_key = (.*)\n", + AmazonS3ConfigKey::SecretAccessKey, + ), + ], + )], + ); if builder .get_config_value(&AmazonS3ConfigKey::DefaultRegion) diff --git a/crates/polars-io/src/utils/path.rs b/crates/polars-io/src/utils/path.rs index 91bc74ed2700..050982446fc2 100644 --- a/crates/polars-io/src/utils/path.rs +++ b/crates/polars-io/src/utils/path.rs @@ -137,9 +137,10 @@ pub fn expand_paths( let prefix = object_path_from_string(cloud_location.prefix.clone())?; - let out = if !path.ends_with("/") - && cloud_location.expansion.is_none() - && store.head(&prefix).await.is_ok() + let out = if cloud_location.scheme.starts_with("http") + || (!path.ends_with("/") + && cloud_location.expansion.is_none() + && store.head(&prefix).await.is_ok()) { ( 0, From 95b9ab8a9fb8869e0177f9680715a290187ddcf7 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Thu, 11 Jul 2024 21:14:32 +1000 Subject: [PATCH 3/4] c --- crates/polars-io/src/file_cache/utils.rs | 29 ++++++++++++++----- crates/polars-io/src/utils/path.rs | 11 ++++--- .../polars-plan/src/plans/conversion/scans.rs | 3 +- 3 files changed, 31 insertions(+), 12 deletions(-) diff --git a/crates/polars-io/src/file_cache/utils.rs b/crates/polars-io/src/file_cache/utils.rs index b239c2792e54..eb99296d4e28 100644 --- a/crates/polars-io/src/file_cache/utils.rs +++ b/crates/polars-io/src/file_cache/utils.rs @@ -52,8 +52,8 @@ pub(super) fn update_last_accessed(file: &std::fs::File) { } } -pub fn init_entries_from_uri_list]>>( - uri_list: A, +pub fn init_entries_from_uri_list( + uri_list: &[Arc], cloud_options: Option<&CloudOptions>, ) -> PolarsResult>> { let uri_list = uri_list.as_ref(); @@ -69,13 +69,27 @@ pub fn init_entries_from_uri_list]>>( .unwrap_or_else(get_env_file_cache_ttl); if is_cloud_url(first_uri) { - let (_, object_store) = pl_async::get_runtime() - .block_on_potential_spawn(build_object_store(first_uri, cloud_options))?; - let object_store = PolarsObjectStore::new(object_store); + let object_stores = pl_async::get_runtime().block_on_potential_spawn(async { + futures::future::try_join_all( + (0..if first_uri.starts_with("http") { + // Object stores for http are tied to the path. + uri_list.len() + } else { + 1 + }) + .map(|i| async move { + let (_, object_store) = + build_object_store(&uri_list[i], cloud_options).await?; + PolarsResult::Ok(PolarsObjectStore::new(object_store)) + }), + ) + .await + })?; uri_list .iter() - .map(|uri| { + .enumerate() + .map(|(i, uri)| { FILE_CACHE.init_entry( uri.clone(), || { @@ -88,7 +102,8 @@ pub fn init_entries_from_uri_list]>>( object_path_from_string(prefix)? }; - let object_store = object_store.clone(); + let object_store = + object_stores[std::cmp::min(i, object_stores.len())].clone(); let uri = uri.clone(); Ok(Arc::new(CloudFileFetcher { diff --git a/crates/polars-io/src/utils/path.rs b/crates/polars-io/src/utils/path.rs index 050982446fc2..84e02022bd85 100644 --- a/crates/polars-io/src/utils/path.rs +++ b/crates/polars-io/src/utils/path.rs @@ -132,15 +132,18 @@ pub fn expand_paths( cloud_options: Option<&CloudOptions>| -> PolarsResult<(usize, Vec)> { crate::pl_async::get_runtime().block_on_potential_spawn(async { + if path.starts_with("http") { + return Ok((0, vec![PathBuf::from(path)])); + } + let (cloud_location, store) = crate::cloud::build_object_store(path, cloud_options).await?; let prefix = object_path_from_string(cloud_location.prefix.clone())?; - let out = if cloud_location.scheme.starts_with("http") - || (!path.ends_with("/") - && cloud_location.expansion.is_none() - && store.head(&prefix).await.is_ok()) + let out = if !path.ends_with("/") + && cloud_location.expansion.is_none() + && store.head(&prefix).await.is_ok() { ( 0, diff --git a/crates/polars-plan/src/plans/conversion/scans.rs b/crates/polars-plan/src/plans/conversion/scans.rs index 68e17d278898..5908d085fd15 100644 --- a/crates/polars-plan/src/plans/conversion/scans.rs +++ b/crates/polars-plan/src/plans/conversion/scans.rs @@ -156,7 +156,8 @@ pub(super) fn csv_file_info( paths .iter() .map(|path| Arc::from(path.to_str().unwrap())) - .collect::>(), + .collect::>() + .as_slice(), cloud_options, )?) } else { From 75a004da82f88f248b3f22c78e436f05dff0453a Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Thu, 11 Jul 2024 21:18:40 +1000 Subject: [PATCH 4/4] c --- crates/polars-io/src/file_cache/utils.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/polars-io/src/file_cache/utils.rs b/crates/polars-io/src/file_cache/utils.rs index eb99296d4e28..fec85a0a5128 100644 --- a/crates/polars-io/src/file_cache/utils.rs +++ b/crates/polars-io/src/file_cache/utils.rs @@ -56,8 +56,6 @@ pub fn init_entries_from_uri_list( uri_list: &[Arc], cloud_options: Option<&CloudOptions>, ) -> PolarsResult>> { - let uri_list = uri_list.as_ref(); - if uri_list.is_empty() { return Ok(Default::default()); }