Skip to content

Commit

Permalink
fix: rolling nested groups deadlock (#13835)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Jan 19, 2024
1 parent e97db2a commit 713ec01
Showing 1 changed file with 6 additions and 15 deletions.
21 changes: 6 additions & 15 deletions crates/polars-lazy/src/physical_plan/expressions/rolling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,26 @@ impl PhysicalExpr for RollingExpr {
fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Series> {
let groups_key = format!("{:?}", &self.options);

let mut groups_map = state.group_tuples.read().unwrap();
let groups_map = state.group_tuples.read().unwrap();
// Groups must be set by expression runner.
let groups = groups_map.get(&groups_key);

let mut groups_map_write;

// There can be multiple rolling expressions in a single expr.
// E.g. `min().rolling() + max().rolling()`
// So if we hit that we will compute them here.
let groups = match groups {
Some(groups) => groups,
Some(groups) => Cow::Borrowed(groups),
None => {
drop(groups_map);
// We cannot cache those as mutexes under rayon can deadlock.
// TODO! precompute all groups up front.
let (_time_key, _keys, groups) = df.group_by_rolling(vec![], &self.options)?;
groups_map_write = state.group_tuples.write().unwrap();
groups_map_write.entry_ref(&groups_key).or_insert(groups);

drop(groups_map_write);

// Get a reference to the read guard so that other threads
// can continue
groups_map = state.group_tuples.read().unwrap();
groups_map.get(&groups_key).expect("impl error")
Cow::Owned(groups)
},
};

let mut out = self
.phys_function
.evaluate_on_groups(df, groups, state)?
.evaluate_on_groups(df, &groups, state)?
.finalize();
polars_ensure!(out.len() == groups.len(), agg_len = out.len(), groups.len());
if let Some(name) = &self.out_name {
Expand Down

0 comments on commit 713ec01

Please sign in to comment.