Skip to content

Commit

Permalink
feat: restrict more cloud interop to semaphore budget (#14435)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Feb 12, 2024
1 parent 649c33a commit f96773e
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 18 deletions.
19 changes: 12 additions & 7 deletions crates/polars-io/src/cloud/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use smartstring::alias::String as SmartString;
#[cfg(feature = "cloud")]
use url::Url;

#[cfg(feature = "aws")]
use crate::pl_async::with_concurrency_budget;
#[cfg(feature = "aws")]
use crate::utils::resolve_homedir;

Expand Down Expand Up @@ -284,13 +286,16 @@ impl CloudOptions {
builder = builder.with_config(AmazonS3ConfigKey::Region, "us-east-1");
} else {
polars_warn!("'(default_)region' not set; polars will try to get it from bucket\n\nSet the region manually to silence this warning.");
let result = reqwest::Client::builder()
.build()
.unwrap()
.head(format!("https://{bucket}.s3.amazonaws.com"))
.send()
.await
.map_err(to_compute_err)?;
let result = with_concurrency_budget(1, || async {
reqwest::Client::builder()
.build()
.unwrap()
.head(format!("https://{bucket}.s3.amazonaws.com"))
.send()
.await
.map_err(to_compute_err)
})
.await?;
if let Some(region) = result.headers().get("x-amz-bucket-region") {
let region =
std::str::from_utf8(region.as_bytes()).map_err(to_compute_err)?;
Expand Down
28 changes: 17 additions & 11 deletions crates/polars-io/src/parquet/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,17 @@ impl ParquetObjectStore {
if self.length.is_some() {
return Ok(());
}
self.length = Some(
self.store
.head(&self.path)
.await
.map_err(to_compute_err)?
.size as u64,
);
Ok(())
with_concurrency_budget(1, || async {
self.length = Some(
self.store
.head(&self.path)
.await
.map_err(to_compute_err)?
.size as u64,
);
Ok(())
})
.await
}

pub async fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
Expand All @@ -112,9 +115,12 @@ impl ParquetObjectStore {
let length = self.length;
let mut reader = CloudReader::new(length, object_store, path);

parquet2_read::read_metadata_async(&mut reader)
.await
.map_err(to_compute_err)
with_concurrency_budget(1, || async {
parquet2_read::read_metadata_async(&mut reader)
.await
.map_err(to_compute_err)
})
.await
}

/// Fetch and memoize the metadata of the parquet file.
Expand Down

0 comments on commit f96773e

Please sign in to comment.