Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rust, python): don't run cse cache_states if no projections found #6087

Merged
merged 1 commit into from
Jan 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub(super) fn set_cache_states(
// and finally remove that last projection and stitch the subplan
// back to the cache node again
if !cache_schema_and_children.is_empty() {
let pd = projection_pushdown::ProjectionPushDown {};
let mut pd = ProjectionPushDown::new();
for (_cache_id, (children, columns)) in cache_schema_and_children {
if !columns.is_empty() {
let projection = columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,14 @@ pub fn optimize(

// should be run before predicate pushdown
if projection_pushdown {
let projection_pushdown_opt = ProjectionPushDown {};
let mut projection_pushdown_opt = ProjectionPushDown::new();
let alp = lp_arena.take(lp_top);
let alp = projection_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
lp_arena.replace(lp_top, alp);
cache_states::set_cache_states(lp_top, lp_arena, expr_arena, scratch, cse_changed);

if projection_pushdown_opt.changed {
cache_states::set_cache_states(lp_top, lp_arena, expr_arena, scratch, cse_changed);
}
}

if predicate_pushdown {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::*;

#[allow(clippy::too_many_arguments)]
pub(super) fn process_generic(
proj_pd: &ProjectionPushDown,
proj_pd: &mut ProjectionPushDown,
lp: ALogicalPlan,
acc_projections: Vec<Node>,
projected_names: PlHashSet<Arc<str>>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::*;

#[allow(clippy::too_many_arguments)]
pub(super) fn process_groupby(
proj_pd: &ProjectionPushDown,
proj_pd: &mut ProjectionPushDown,
input: Node,
keys: Vec<Node>,
aggs: Vec<Node>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::*;

#[allow(clippy::too_many_arguments)]
pub(super) fn process_hstack(
proj_pd: &ProjectionPushDown,
proj_pd: &mut ProjectionPushDown,
input: Node,
mut exprs: Vec<Node>,
mut acc_projections: Vec<Node>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn add_nodes_to_accumulated_state(

#[allow(clippy::too_many_arguments)]
pub(super) fn process_join(
proj_pd: &ProjectionPushDown,
proj_pd: &mut ProjectionPushDown,
input_left: Node,
input_right: Node,
left_on: Vec<Node>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::*;

#[allow(clippy::too_many_arguments)]
pub(super) fn process_melt(
proj_pd: &ProjectionPushDown,
proj_pd: &mut ProjectionPushDown,
input: Node,
args: Arc<MeltArgs>,
schema: SchemaRef,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,18 @@ fn update_scan_schema(
Ok(new_schema)
}

pub struct ProjectionPushDown {}
pub struct ProjectionPushDown {
pub(crate) changed: bool,
}

impl ProjectionPushDown {
pub(super) fn new() -> Self {
Self { changed: false }
}

/// Projection will be done at this node, but we continue optimization
fn no_pushdown_restart_opt(
&self,
&mut self,
lp: ALogicalPlan,
acc_projections: Vec<Node>,
projections_seen: usize,
Expand Down Expand Up @@ -177,7 +183,7 @@ impl ProjectionPushDown {
}

fn finish_node(
&self,
&mut self,
local_projections: Vec<Node>,
builder: ALogicalPlanBuilder,
) -> ALogicalPlan {
Expand All @@ -190,7 +196,7 @@ impl ProjectionPushDown {

#[allow(clippy::too_many_arguments)]
fn join_push_down(
&self,
&mut self,
schema_left: &Schema,
schema_right: &Schema,
proj: Node,
Expand Down Expand Up @@ -224,7 +230,7 @@ impl ProjectionPushDown {

/// This pushes down current node and assigns the result to this node.
fn pushdown_and_assign(
&self,
&mut self,
input: Node,
acc_projections: Vec<Node>,
names: PlHashSet<Arc<str>>,
Expand All @@ -251,7 +257,7 @@ impl ProjectionPushDown {
///
/// The local projections are return and still have to be applied
fn pushdown_and_assign_check_schema(
&self,
&mut self,
input: Node,
acc_projections: Vec<Node>,
projections_seen: usize,
Expand Down Expand Up @@ -290,7 +296,7 @@ impl ProjectionPushDown {
/// * `expr_arena` - The local memory arena for the expressions.
///
fn push_down(
&self,
&mut self,
logical_plan: ALogicalPlan,
mut acc_projections: Vec<Node>,
mut projected_names: PlHashSet<Arc<str>>,
Expand All @@ -301,16 +307,19 @@ impl ProjectionPushDown {
use ALogicalPlan::*;

match logical_plan {
Projection { expr, input, .. } => process_projection(
self,
input,
expr,
acc_projections,
projected_names,
projections_seen,
lp_arena,
expr_arena,
),
Projection { expr, input, .. } => {
self.changed = true;
process_projection(
self,
input,
expr,
acc_projections,
projected_names,
projections_seen,
lp_arena,
expr_arena,
)
}
LocalProjection { expr, input, .. } => {
self.pushdown_and_assign(
input,
Expand Down Expand Up @@ -804,7 +813,7 @@ impl ProjectionPushDown {
}

pub fn optimize(
&self,
&mut self,
logical_plan: ALogicalPlan,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::*;

#[allow(clippy::too_many_arguments)]
pub(super) fn process_projection(
proj_pd: &ProjectionPushDown,
proj_pd: &mut ProjectionPushDown,
input: Node,
exprs: Vec<Node>,
mut acc_projections: Vec<Node>,
Expand Down
27 changes: 27 additions & 0 deletions py-polars/tests/unit/test_cse.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import re
from datetime import date

import polars as pl

Expand Down Expand Up @@ -31,3 +32,29 @@ def test_union_duplicates() -> None:
)
== 10
)


def test_cse_schema_6081() -> None:
df = pl.DataFrame(
data=[
[date(2022, 12, 12), 1, 1],
[date(2022, 12, 12), 1, 2],
[date(2022, 12, 13), 5, 2],
],
columns=["date", "id", "value"],
orient="row",
).lazy()

min_value_by_group = df.groupby(["date", "id"]).agg(
pl.col("value").min().alias("min_value")
)

result = df.join(min_value_by_group, on=["date", "id"], how="left")
assert result.collect(
common_subplan_elimination=True, projection_pushdown=True
).to_dict(False) == {
"date": [date(2022, 12, 12), date(2022, 12, 12), date(2022, 12, 13)],
"id": [1, 1, 5],
"value": [1, 2, 2],
"min_value": [1, 1, 2],
}