From 66f2d8e7545be94e1b09c51847fdedc0ab1688ef Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sat, 8 Oct 2022 16:13:15 +0200 Subject: [PATCH] refactor(rust): relax sync requirement on Executor trait impls (#5142) --- .../src/physical_plan/executors/executor.rs | 2 +- .../executors/groupby_partitioned.rs | 15 ++++++++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/polars/polars-lazy/src/physical_plan/executors/executor.rs b/polars/polars-lazy/src/physical_plan/executors/executor.rs index 75524fc49cdb5..5a13cd03ed159 100644 --- a/polars/polars-lazy/src/physical_plan/executors/executor.rs +++ b/polars/polars-lazy/src/physical_plan/executors/executor.rs @@ -7,7 +7,7 @@ use super::*; /// /// Executors have other executors as input. By having a tree of executors we can execute the /// physical plan until the last executor is evaluated. -pub trait Executor: Send + Sync { +pub trait Executor: Send { fn execute(&mut self, cache: &mut ExecutionState) -> PolarsResult; } diff --git a/polars/polars-lazy/src/physical_plan/executors/groupby_partitioned.rs b/polars/polars-lazy/src/physical_plan/executors/groupby_partitioned.rs index f2ddfb0e4d55e..0822d13d3c2cf 100644 --- a/polars/polars-lazy/src/physical_plan/executors/groupby_partitioned.rs +++ b/polars/polars-lazy/src/physical_plan/executors/groupby_partitioned.rs @@ -34,10 +34,18 @@ impl PartitionGroupByExec { } fn keys(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult> { - self.keys.iter().map(|s| s.evaluate(df, state)).collect() + compute_keys(&self.keys, df, state) } } +fn compute_keys( + keys: &[Arc], + df: &DataFrame, + state: &ExecutionState, +) -> PolarsResult> { + keys.iter().map(|s| s.evaluate(df, state)).collect() +} + fn run_partitions( df: &mut DataFrame, exec: &PartitionGroupByExec, @@ -50,11 +58,12 @@ fn run_partitions( // split on several threads. Than the final result we apply the same groupby again. let dfs = split_df(df, n_threads)?; + let phys_aggs = &exec.phys_aggs; + let keys = &exec.keys; POOL.install(|| { dfs.into_par_iter() .map(|df| { - let keys = exec.keys(&df, state)?; - let phys_aggs = &exec.phys_aggs; + let keys = compute_keys(keys, &df, state)?; let gb = df.groupby_with_series(keys, false, maintain_order)?; let groups = gb.get_groups();