Skip to content

Commit

Permalink
perf: Ensure only nodes that are not changed are cached in collapse o…
Browse files Browse the repository at this point in the history
…ptimizer (pola-rs#17791)
  • Loading branch information
ritchie46 authored Jul 22, 2024
1 parent 9df5929 commit 6f8b478
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 11 deletions.
10 changes: 7 additions & 3 deletions crates/polars-plan/src/plans/optimizer/collapse_and_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::*;
///
/// The schema reported after this optimization is also
pub(super) struct SimpleProjectionAndCollapse {
/// keep track of nodes that are already processed when they
/// Keep track of nodes that are already processed when they
/// can be expensive. Schema materialization can be for instance.
processed: BTreeSet<Node>,
eager: bool,
Expand All @@ -39,12 +39,14 @@ impl OptimizationRule for SimpleProjectionAndCollapse {

match lp {
Select { input, expr, .. } => {
if !matches!(lp_arena.get(*input), ExtContext { .. }) && self.processed.insert(node)
if !matches!(lp_arena.get(*input), ExtContext { .. })
&& !self.processed.contains(&node)
{
// First check if we can apply the optimization before we allocate.
if !expr.iter().all(|e| {
matches!(expr_arena.get(e.node()), AExpr::Column(_)) && !e.has_alias()
}) {
self.processed.insert(node);
return None;
}

Expand All @@ -59,6 +61,7 @@ impl OptimizationRule for SimpleProjectionAndCollapse {

Some(alp)
} else {
self.processed.insert(node);
None
}
},
Expand All @@ -73,7 +76,7 @@ impl OptimizationRule for SimpleProjectionAndCollapse {
}),
// Cleanup projections set in projection pushdown just above caches
// they are not needed.
cache_lp @ Cache { .. } if self.processed.insert(node) => {
cache_lp @ Cache { .. } if self.processed.contains(&node) => {
let cache_schema = cache_lp.schema(lp_arena);
if cache_schema.len() == columns.len()
&& cache_schema.iter_names().zip(columns.iter_names()).all(
Expand All @@ -92,6 +95,7 @@ impl OptimizationRule for SimpleProjectionAndCollapse {
if *input_schema.as_ref() == *columns {
Some(other.clone())
} else {
self.processed.insert(node);
None
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ pub(super) fn process_asof_join(
expr_arena,
)?;

Ok(resolve_join_suffixes(
resolve_join_suffixes(
input_left,
input_right,
left_on,
Expand All @@ -190,7 +190,7 @@ pub(super) fn process_asof_join(
lp_arena,
expr_arena,
&local_projection,
))
)
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -365,7 +365,7 @@ pub(super) fn process_join(
expr_arena,
)?;

Ok(resolve_join_suffixes(
resolve_join_suffixes(
input_left,
input_right,
left_on,
Expand All @@ -374,7 +374,7 @@ pub(super) fn process_join(
lp_arena,
expr_arena,
&local_projection,
))
)
}

fn process_projection(
Expand Down Expand Up @@ -469,13 +469,14 @@ fn resolve_join_suffixes(
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
local_projection: &[ColumnNode],
) -> IR {
) -> PolarsResult<IR> {
let suffix = options.args.suffix();
let alp = IRBuilder::new(input_left, expr_arena, lp_arena)
.join(input_right, left_on, right_on, options.clone())
.build();
let schema_after_join = alp.schema(lp_arena);

let mut all_columns = true;
let projections = local_projection
.iter()
.map(|proj| {
Expand All @@ -484,14 +485,20 @@ fn resolve_join_suffixes(
let downstream_name = &name.as_ref()[..name.len() - suffix.len()];
let col = AExpr::Column(ColumnName::from(downstream_name));
let node = expr_arena.add(col);
all_columns = false;
ExprIR::new(node, OutputName::Alias(name.clone()))
} else {
ExprIR::new(proj.0, OutputName::ColumnLhs(name.clone()))
}
})
.collect::<Vec<_>>();

IRBuilder::from_lp(alp, expr_arena, lp_arena)
.project(projections, Default::default())
.build()
let builder = IRBuilder::from_lp(alp, expr_arena, lp_arena);
Ok(if all_columns {
builder
.project_simple(projections.iter().map(|e| e.output_name()))?
.build()
} else {
builder.project(projections, Default::default()).build()
})
}
36 changes: 36 additions & 0 deletions py-polars/tests/unit/test_projections.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,3 +529,39 @@ def test_projection_literal_no_alias_17739() -> None:
assert df.select(pl.lit(False)).select("literal").collect().to_dict(
as_series=False
) == {"literal": [False]}


def test_projections_collapse_17781() -> None:
frame1 = pl.LazyFrame(
{
"index": [0],
"data1": [0],
"data2": [0],
}
)
frame2 = pl.LazyFrame(
{
"index": [0],
"label1": [True],
"label2": [False],
"label3": [False],
},
schema=[
("index", pl.Int64),
("label1", pl.Boolean),
("label2", pl.Boolean),
("label3", pl.Boolean),
],
)
cols = ["index", "data1", "label1", "label2"]

lf = None
for lfj in [frame1, frame2]:
use_columns = [c for c in cols if c in lfj.collect_schema().names()]
lfj = lfj.select(use_columns)
lfj = lfj.select(use_columns)
if lf is None:
lf = lfj
else:
lf = lf.join(lfj, on="index", how="left")
assert "SELECT " not in lf.explain() # type: ignore[union-attr]

0 comments on commit 6f8b478

Please sign in to comment.