diff --git a/polars/polars-lazy/src/physical_plan/executors/executor.rs b/polars/polars-lazy/src/physical_plan/executors/executor.rs
index 75524fc49cdb5..5a13cd03ed159 100644
--- a/polars/polars-lazy/src/physical_plan/executors/executor.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/executor.rs
@@ -7,7 +7,7 @@ use super::*;
 ///
 /// Executors have other executors as input. By having a tree of executors we can execute the
 /// physical plan until the last executor is evaluated.
-pub trait Executor: Send + Sync {
+pub trait Executor: Send {
     fn execute(&mut self, cache: &mut ExecutionState) -> PolarsResult<DataFrame>;
 }
diff --git a/polars/polars-lazy/src/physical_plan/executors/groupby_partitioned.rs b/polars/polars-lazy/src/physical_plan/executors/groupby_partitioned.rs
index f2ddfb0e4d55e..0822d13d3c2cf 100644
--- a/polars/polars-lazy/src/physical_plan/executors/groupby_partitioned.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/groupby_partitioned.rs
@@ -34,10 +34,18 @@ impl PartitionGroupByExec {
     }
 
     fn keys(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Vec<Series>> {
-        self.keys.iter().map(|s| s.evaluate(df, state)).collect()
+        compute_keys(&self.keys, df, state)
     }
 }
 
+fn compute_keys(
+    keys: &[Arc<dyn PhysicalExpr>],
+    df: &DataFrame,
+    state: &ExecutionState,
+) -> PolarsResult<Vec<Series>> {
+    keys.iter().map(|s| s.evaluate(df, state)).collect()
+}
+
 fn run_partitions(
     df: &mut DataFrame,
     exec: &PartitionGroupByExec,
@@ -50,11 +58,12 @@ fn run_partitions(
     // split on several threads. Than the final result we apply the same groupby again.
     let dfs = split_df(df, n_threads)?;
 
+    let phys_aggs = &exec.phys_aggs;
+    let keys = &exec.keys;
     POOL.install(|| {
         dfs.into_par_iter()
             .map(|df| {
-                let keys = exec.keys(&df, state)?;
-                let phys_aggs = &exec.phys_aggs;
+                let keys = compute_keys(keys, &df, state)?;
                 let gb = df.groupby_with_series(keys, false, maintain_order)?;
                 let groups = gb.get_groups();