diff --git a/crates/polars-lazy/src/physical_plan/expressions/rolling.rs b/crates/polars-lazy/src/physical_plan/expressions/rolling.rs index 6bfe1047a770..ba8b5baaa61c 100644 --- a/crates/polars-lazy/src/physical_plan/expressions/rolling.rs +++ b/crates/polars-lazy/src/physical_plan/expressions/rolling.rs @@ -20,35 +20,26 @@ impl PhysicalExpr for RollingExpr { fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { let groups_key = format!("{:?}", &self.options); - let mut groups_map = state.group_tuples.read().unwrap(); + let groups_map = state.group_tuples.read().unwrap(); // Groups must be set by expression runner. let groups = groups_map.get(&groups_key); - let mut groups_map_write; - // There can be multiple rolling expressions in a single expr. // E.g. `min().rolling() + max().rolling()` // So if we hit that we will compute them here. let groups = match groups { - Some(groups) => groups, + Some(groups) => Cow::Borrowed(groups), None => { - drop(groups_map); + // We cannot cache those as mutexes under rayon can deadlock. + // TODO! precompute all groups up front. let (_time_key, _keys, groups) = df.group_by_rolling(vec![], &self.options)?; - groups_map_write = state.group_tuples.write().unwrap(); - groups_map_write.entry_ref(&groups_key).or_insert(groups); - - drop(groups_map_write); - - // Get a reference to the read guard so that other threads - // can continue - groups_map = state.group_tuples.read().unwrap(); - groups_map.get(&groups_key).expect("impl error") + Cow::Owned(groups) }, }; let mut out = self .phys_function - .evaluate_on_groups(df, groups, state)? + .evaluate_on_groups(df, &groups, state)? .finalize(); polars_ensure!(out.len() == groups.len(), agg_len = out.len(), groups.len()); if let Some(name) = &self.out_name {