From 2503aa9dd884428396fbe089b53c4ed65b6d055c Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Fri, 6 Jan 2023 16:52:34 +0100 Subject: [PATCH] fix(rust, python): don't run cse cache_states if no projections found --- .../logical_plan/optimizer/cache_states.rs | 2 +- .../src/logical_plan/optimizer/mod.rs | 7 ++- .../optimizer/projection_pushdown/generic.rs | 2 +- .../optimizer/projection_pushdown/groupby.rs | 2 +- .../optimizer/projection_pushdown/hstack.rs | 2 +- .../optimizer/projection_pushdown/joins.rs | 2 +- .../optimizer/projection_pushdown/melt.rs | 2 +- .../optimizer/projection_pushdown/mod.rs | 45 +++++++++++-------- .../projection_pushdown/projection.rs | 2 +- py-polars/tests/unit/test_cse.py | 27 +++++++++++ 10 files changed, 66 insertions(+), 27 deletions(-) diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/cache_states.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/cache_states.rs index 9422149ed6e5..19260bb17362 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/cache_states.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/cache_states.rs @@ -146,7 +146,7 @@ pub(super) fn set_cache_states( // and finally remove that last projection and stitch the subplan // back to the cache node again if !cache_schema_and_children.is_empty() { - let pd = projection_pushdown::ProjectionPushDown {}; + let mut pd = ProjectionPushDown::new(); for (_cache_id, (children, columns)) in cache_schema_and_children { if !columns.is_empty() { let projection = columns diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/mod.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/mod.rs index 68f3d3d4369a..3d036c71313a 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/mod.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/mod.rs @@ -96,11 +96,14 @@ pub fn optimize( // should be run before predicate pushdown if projection_pushdown { - let projection_pushdown_opt = ProjectionPushDown {}; + let mut projection_pushdown_opt = ProjectionPushDown::new(); let alp = lp_arena.take(lp_top); let alp = projection_pushdown_opt.optimize(alp, lp_arena, expr_arena)?; lp_arena.replace(lp_top, alp); - cache_states::set_cache_states(lp_top, lp_arena, expr_arena, scratch, cse_changed); + + if projection_pushdown_opt.changed { + cache_states::set_cache_states(lp_top, lp_arena, expr_arena, scratch, cse_changed); + } } if predicate_pushdown { diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/generic.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/generic.rs index 609c9c976a08..468cf78f51c3 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/generic.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/generic.rs @@ -2,7 +2,7 @@ use super::*; #[allow(clippy::too_many_arguments)] pub(super) fn process_generic( - proj_pd: &ProjectionPushDown, + proj_pd: &mut ProjectionPushDown, lp: ALogicalPlan, acc_projections: Vec, projected_names: PlHashSet>, diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/groupby.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/groupby.rs index daae5aa7423e..e5875e1e9beb 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/groupby.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/groupby.rs @@ -2,7 +2,7 @@ use super::*; #[allow(clippy::too_many_arguments)] pub(super) fn process_groupby( - proj_pd: &ProjectionPushDown, + proj_pd: &mut ProjectionPushDown, input: Node, keys: Vec, aggs: Vec, diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/hstack.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/hstack.rs index dcfb4865c88a..e2a41be62e9e 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/hstack.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/hstack.rs @@ -2,7 +2,7 @@ use super::*; #[allow(clippy::too_many_arguments)] pub(super) fn process_hstack( - proj_pd: &ProjectionPushDown, + proj_pd: &mut ProjectionPushDown, input: Node, mut exprs: Vec, mut acc_projections: Vec, diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/joins.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/joins.rs index c3ee0dd803fb..bed43e8af064 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/joins.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/joins.rs @@ -41,7 +41,7 @@ fn add_nodes_to_accumulated_state( #[allow(clippy::too_many_arguments)] pub(super) fn process_join( - proj_pd: &ProjectionPushDown, + proj_pd: &mut ProjectionPushDown, input_left: Node, input_right: Node, left_on: Vec, diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/melt.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/melt.rs index 2472ae63405d..4ad80da73e4b 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/melt.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/melt.rs @@ -2,7 +2,7 @@ use super::*; #[allow(clippy::too_many_arguments)] pub(super) fn process_melt( - proj_pd: &ProjectionPushDown, + proj_pd: &mut ProjectionPushDown, input: Node, args: Arc, schema: SchemaRef, diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs index 08ceb46e072a..23baeb172c49 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs @@ -139,12 +139,18 @@ fn update_scan_schema( Ok(new_schema) } -pub struct ProjectionPushDown {} +pub struct ProjectionPushDown { + pub(crate) changed: bool, +} impl ProjectionPushDown { + pub(super) fn new() -> Self { + Self { changed: false } + } + /// Projection will be done at this node, but we continue optimization fn no_pushdown_restart_opt( - &self, + &mut self, lp: ALogicalPlan, acc_projections: Vec, projections_seen: usize, @@ -177,7 +183,7 @@ impl ProjectionPushDown { } fn finish_node( - &self, + &mut self, local_projections: Vec, builder: ALogicalPlanBuilder, ) -> ALogicalPlan { @@ -190,7 +196,7 @@ impl ProjectionPushDown { #[allow(clippy::too_many_arguments)] fn join_push_down( - &self, + &mut self, schema_left: &Schema, schema_right: &Schema, proj: Node, @@ -224,7 +230,7 @@ impl ProjectionPushDown { /// This pushes down current node and assigns the result to this node. fn pushdown_and_assign( - &self, + &mut self, input: Node, acc_projections: Vec, names: PlHashSet>, @@ -251,7 +257,7 @@ impl ProjectionPushDown { /// /// The local projections are return and still have to be applied fn pushdown_and_assign_check_schema( - &self, + &mut self, input: Node, acc_projections: Vec, projections_seen: usize, @@ -290,7 +296,7 @@ impl ProjectionPushDown { /// * `expr_arena` - The local memory arena for the expressions. /// fn push_down( - &self, + &mut self, logical_plan: ALogicalPlan, mut acc_projections: Vec, mut projected_names: PlHashSet>, @@ -301,16 +307,19 @@ impl ProjectionPushDown { use ALogicalPlan::*; match logical_plan { - Projection { expr, input, .. } => process_projection( - self, - input, - expr, - acc_projections, - projected_names, - projections_seen, - lp_arena, - expr_arena, - ), + Projection { expr, input, .. } => { + self.changed = true; + process_projection( + self, + input, + expr, + acc_projections, + projected_names, + projections_seen, + lp_arena, + expr_arena, + ) + } LocalProjection { expr, input, .. } => { self.pushdown_and_assign( input, @@ -804,7 +813,7 @@ impl ProjectionPushDown { } pub fn optimize( - &self, + &mut self, logical_plan: ALogicalPlan, lp_arena: &mut Arena, expr_arena: &mut Arena, diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/projection.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/projection.rs index 7a3f359adf8c..6d1f6c477ed0 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/projection.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/projection.rs @@ -2,7 +2,7 @@ use super::*; #[allow(clippy::too_many_arguments)] pub(super) fn process_projection( - proj_pd: &ProjectionPushDown, + proj_pd: &mut ProjectionPushDown, input: Node, exprs: Vec, mut acc_projections: Vec, diff --git a/py-polars/tests/unit/test_cse.py b/py-polars/tests/unit/test_cse.py index b19476254c4f..71099df35551 100644 --- a/py-polars/tests/unit/test_cse.py +++ b/py-polars/tests/unit/test_cse.py @@ -1,4 +1,5 @@ import re +from datetime import date import polars as pl @@ -31,3 +32,29 @@ def test_union_duplicates() -> None: ) == 10 ) + + +def test_cse_schema_6081() -> None: + df = pl.DataFrame( + data=[ + [date(2022, 12, 12), 1, 1], + [date(2022, 12, 12), 1, 2], + [date(2022, 12, 13), 5, 2], + ], + columns=["date", "id", "value"], + orient="row", + ).lazy() + + min_value_by_group = df.groupby(["date", "id"]).agg( + pl.col("value").min().alias("min_value") + ) + + result = df.join(min_value_by_group, on=["date", "id"], how="left") + assert result.collect( + common_subplan_elimination=True, projection_pushdown=True + ).to_dict(False) == { + "date": [date(2022, 12, 12), date(2022, 12, 12), date(2022, 12, 13)], + "id": [1, 1, 5], + "value": [1, 2, 2], + "min_value": [1, 1, 2], + }