Skip to content

Commit

Permalink
fix: Include cloud creds in cache key (pola-rs#15609)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Apr 12, 2024
1 parent 6d7ddcc commit c068e76
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 9 deletions.
2 changes: 1 addition & 1 deletion crates/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async = [
"polars-error/regex",
"polars-parquet?/async",
]
cloud = ["object_store", "async", "polars-error/object_store", "url"]
cloud = ["object_store", "async", "polars-error/object_store", "url", "serde_json", "serde"]
aws = ["object_store/aws", "cloud", "reqwest"]
azure = ["object_store/azure", "cloud"]
gcp = ["object_store/gcp", "cloud"]
Expand Down
11 changes: 7 additions & 4 deletions crates/polars-io/src/cloud/object_store_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ fn err_missing_feature(feature: &str, scheme: &str) -> BuildResult {

/// Get the key of a url for object store registration.
/// The credential info will be removed
fn url_to_key(url: &Url) -> String {
fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String {
// We include credentials as they can expire, so users will send new credentials for the same url.
let creds = serde_json::to_string(&options).unwrap_or_else(|_| "".into());
format!(
"{}://{}",
"{}://{}<\\creds\\>{}",
url.scheme(),
&url[url::Position::BeforeHost..url::Position::AfterPort],
creds
)
}

Expand All @@ -49,7 +52,7 @@ pub async fn build_object_store(
let parsed = parse_url(url).map_err(to_compute_err)?;
let cloud_location = CloudLocation::from_url(&parsed)?;

let key = url_to_key(&parsed);
let key = url_and_creds_to_key(&parsed, options);
let mut allow_cache = true;

{
Expand Down Expand Up @@ -117,7 +120,7 @@ pub async fn build_object_store(
if allow_cache {
let mut cache = OBJECT_STORE_CACHE.write().await;
// Clear the cache if we surpass a certain amount of buckets.
if cache.len() > 32 {
if cache.len() > 8 {
cache.clear()
}
cache.insert(key, store.clone());
Expand Down
8 changes: 4 additions & 4 deletions crates/polars-plan/src/logical_plan/pyarrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ pub(super) fn predicate_to_pa(
input,
..
} => {
if !matches!(expr_arena.get(input[0]), AExpr::Column(_)) {
if !matches!(expr_arena.get(input.first()?.node()), AExpr::Column(_)) {
None
} else {
let col = predicate_to_pa(*input.first()?, expr_arena, args)?;
let col = predicate_to_pa(input.first()?.node(), expr_arena, args)?;
let left_cmp_op = match closed {
ClosedInterval::Both | ClosedInterval::Left => Operator::Lt,
ClosedInterval::None | ClosedInterval::Right => Operator::LtEq,
Expand All @@ -174,8 +174,8 @@ pub(super) fn predicate_to_pa(
ClosedInterval::None | ClosedInterval::Left => Operator::GtEq,
};

let lower = predicate_to_pa(*input.get(1)?, expr_arena, args)?;
let upper = predicate_to_pa(*input.get(2)?, expr_arena, args)?;
let lower = predicate_to_pa(input.get(1)?.node(), expr_arena, args)?;
let upper = predicate_to_pa(input.get(2)?.node(), expr_arena, args)?;

Some(format!(
"(({col} {left_cmp_op} {lower}) & ({col} {right_cmp_op} {upper}))"
Expand Down

0 comments on commit c068e76

Please sign in to comment.