From f96773e829f3ea1221bd609845c495a94baef6a5 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 12 Feb 2024 08:56:49 -0800 Subject: [PATCH] feat: restrict more cloud interop to semaphore budget (#14435) --- crates/polars-io/src/cloud/options.rs | 19 +++++++++------ crates/polars-io/src/parquet/async_impl.rs | 28 +++++++++++++--------- 2 files changed, 29 insertions(+), 18 deletions(-) diff --git a/crates/polars-io/src/cloud/options.rs b/crates/polars-io/src/cloud/options.rs index 5338682d477d..1df835d5d3b0 100644 --- a/crates/polars-io/src/cloud/options.rs +++ b/crates/polars-io/src/cloud/options.rs @@ -37,6 +37,8 @@ use smartstring::alias::String as SmartString; #[cfg(feature = "cloud")] use url::Url; +#[cfg(feature = "aws")] +use crate::pl_async::with_concurrency_budget; #[cfg(feature = "aws")] use crate::utils::resolve_homedir; @@ -284,13 +286,16 @@ impl CloudOptions { builder = builder.with_config(AmazonS3ConfigKey::Region, "us-east-1"); } else { polars_warn!("'(default_)region' not set; polars will try to get it from bucket\n\nSet the region manually to silence this warning."); - let result = reqwest::Client::builder() - .build() - .unwrap() - .head(format!("https://{bucket}.s3.amazonaws.com")) - .send() - .await - .map_err(to_compute_err)?; + let result = with_concurrency_budget(1, || async { + reqwest::Client::builder() + .build() + .unwrap() + .head(format!("https://{bucket}.s3.amazonaws.com")) + .send() + .await + .map_err(to_compute_err) + }) + .await?; if let Some(region) = result.headers().get("x-amz-bucket-region") { let region = std::str::from_utf8(region.as_bytes()).map_err(to_compute_err)?; diff --git a/crates/polars-io/src/parquet/async_impl.rs b/crates/polars-io/src/parquet/async_impl.rs index 49bf30d45087..c13e5a3c46f1 100644 --- a/crates/polars-io/src/parquet/async_impl.rs +++ b/crates/polars-io/src/parquet/async_impl.rs @@ -80,14 +80,17 @@ impl ParquetObjectStore { if self.length.is_some() { return Ok(()); } - self.length = Some( - self.store - .head(&self.path) - .await - .map_err(to_compute_err)? - .size as u64, - ); - Ok(()) + with_concurrency_budget(1, || async { + self.length = Some( + self.store + .head(&self.path) + .await + .map_err(to_compute_err)? + .size as u64, + ); + Ok(()) + }) + .await } pub async fn schema(&mut self) -> PolarsResult { @@ -112,9 +115,12 @@ impl ParquetObjectStore { let length = self.length; let mut reader = CloudReader::new(length, object_store, path); - parquet2_read::read_metadata_async(&mut reader) - .await - .map_err(to_compute_err) + with_concurrency_budget(1, || async { + parquet2_read::read_metadata_async(&mut reader) + .await + .map_err(to_compute_err) + }) + .await } /// Fetch and memoize the metadata of the parquet file.