fix(rust, python): don't count hstack as projection in cse
ritchie46 committed Nov 3, 2022
1 parent 6669a84 commit 198a6be
Showing 3 changed files with 83 additions and 20 deletions.
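For orientation before the diff: common subplan elimination (CSE) caches subplans that occur more than once and, per cache node, collects the union of columns projected above the cache so the cached subplan can be pruned. An hstack (with_columns) parent was being counted as such a projection even though it only adds columns, which could prune columns the query still needed. A minimal Python sketch of the affected pattern, condensed from the new py-polars regression test at the bottom of this commit (the issue number 5405 is inferred from that test's name):

import polars as pl

right = pl.DataFrame({"A": [1, 2], "B": [3, 4], "D": [5, 6]}).lazy()
left = pl.DataFrame({"C": [3, 4]}).lazy().join(right.select("A"), how="cross")
out = left.join(right.rename({"B": "C"}), on=["A", "C"], how="left")

# with this fix, column "D" is no longer dropped from the CSE'd plan
print(out.collect(common_subplan_elimination=True))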
@@ -13,9 +13,9 @@ fn get_upper_projections(
     let parent = lp_arena.get(parent);
 
     use ALogicalPlan::*;
-    // during projection pushdown all accumulated p
+    // during projection pushdown all accumulated
     match parent {
-        Projection { expr, .. } | HStack { exprs: expr, .. } => {
+        Projection { expr, .. } => {
             let mut out = Vec::with_capacity(expr.len());
             for node in expr {
                 out.extend(aexpr_to_leaf_names_iter(*node, expr_arena));
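The arm removed above is the core of the fix: get_upper_projections should only report column names for a true Projection parent. An HStack (with_columns) extends the schema rather than restricting it, so its expressions understate which columns must survive. A small illustration of the semantic difference in Python (a sketch; assertions assume current polars behaviour):

import polars as pl

df = pl.DataFrame({"a": [1], "b": [2]})

# select is a projection: the output schema is exactly the listed columns
assert df.select("a").columns == ["a"]

# with_columns is an hstack: it appends (or replaces) columns, keeping the rest
assert df.with_columns(pl.col("a").alias("c")).columns == ["a", "b", "c"]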
@@ -39,8 +39,9 @@ pub(super) fn set_cache_states(
     scratch.clear();
 
     // per cache id holds:
-    // a Vec: with (parent, child) pairs
-    // a Set: with the union of column names.
+    // a Vec: with children of the node
+    // a Set: with the union of projected column names.
+    // a Set: with the union of hstack column names.
     let mut cache_schema_and_children = BTreeMap::new();
 
     let mut stack = Vec::with_capacity(4);
@@ -49,23 +50,23 @@ pub(super) fn set_cache_states(
     // first traversal
     // collect the union of columns per cache id.
     // and find the cache parents
-    while let Some((node, mut cache_id, mut parent, mut previous_cache)) = stack.pop() {
-        let lp = lp_arena.get(node);
+    while let Some((current_node, mut cache_id, mut parent, mut previous_cache)) = stack.pop() {
+        let lp = lp_arena.get(current_node);
         lp.copy_inputs(scratch);
 
         use ALogicalPlan::*;
         match lp {
             // don't allow parallelism as caches need eachothers work
             // also self-referencing plans can deadlock on the files they lock
             Join { options, .. } if has_caches && options.allow_parallel => {
-                if let Join { options, .. } = lp_arena.get_mut(node) {
+                if let Join { options, .. } = lp_arena.get_mut(current_node) {
                     options.allow_parallel = false;
                 }
             }
             // don't allow parallelism as caches need eachothers work
             // also self-referencing plans can deadlock on the files they lock
             Union { options, .. } if has_caches && options.parallel => {
-                if let Union { options, .. } = lp_arena.get_mut(node) {
+                if let Union { options, .. } = lp_arena.get_mut(current_node) {
                     options.parallel = false;
                 }
             }
@@ -81,23 +82,40 @@ pub(super) fn set_cache_states(
                     // we never want to naively take parents, as a join or aggregate for instance
                     // change the schema
 
-                    let entry = cache_schema_and_children.entry(*id).or_insert_with(|| {
-                        (
-                            Vec::new(),
-                            PlIndexSet::with_capacity_and_hasher(0, Default::default()),
-                        )
-                    });
-                    entry.0.push(*input);
+                    let (children, union_names) =
+                        cache_schema_and_children.entry(*id).or_insert_with(|| {
+                            (
+                                Vec::new(),
+                                PlIndexSet::with_capacity_and_hasher(0, Default::default()),
+                            )
+                        });
+                    children.push(*input);
 
                     if let Some(names) = get_upper_projections(parent_node, lp_arena, expr_arena) {
-                        entry.1.extend(names);
+                        union_names.extend(names);
                     }
-                    // There was no projection and we must take
+                    // a previous cache has projections, but projection pushdown has not ran further.
+                    // so we take the projections of the cache above that are available in the schema
+                    // of this node
+                    // TODO! maybe we should restart projection pushdown on encountering cache nodes?
+                    else if let Some(previous_cache) = previous_cache {
+                        let schema = lp.schema(lp_arena);
+                        let (_, union_names_previous) =
+                            cache_schema_and_children.get_mut(&previous_cache).unwrap();
+                        // we cannot directly extend as we borrow as mut
+                        #[allow(clippy::needless_collect)]
+                        let tmp: Vec<_> = union_names_previous
+                            .iter()
+                            .filter(|name| schema.get(name).is_some())
+                            .cloned()
+                            .collect();
+                        union_names_previous.extend(tmp.into_iter())
+                    }
+                    // There was no explicit projection and we must take
                     // all columns
                     else {
                         let schema = lp.schema(lp_arena);
-                        entry
-                            .1
+                        union_names
                             .extend(schema.iter_names().map(|name| Arc::from(name.as_str())));
                     }
                 }
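Roughly, per cache id the optimizer now records the cache's children plus the union of column names projected above it, falling back to the full schema when there is no upper projection. A loose Python analogue of that bookkeeping (names and shapes assumed; the arena details and the previous-cache fallback branch are elided):

from collections import defaultdict

# cache_id -> (children, union of projected column names),
# mirroring the (Vec, PlIndexSet) tuple in the Rust code
cache_schema_and_children = defaultdict(lambda: ([], set()))

def record_cache_child(cache_id, child, upper_projections, full_schema):
    # analogue of entry(*id).or_insert_with(..) followed by children.push(*input)
    children, union_names = cache_schema_and_children[cache_id]
    children.append(child)
    if upper_projections is not None:
        # a true Projection above the cache restricts the needed columns
        union_names.update(upper_projections)
    else:
        # no upper projection (e.g. the parent was an hstack): keep every column
        union_names.update(full_schema)

record_cache_child(0, "scan_a", ["A"], ["A", "B", "D"])
record_cache_child(0, "scan_b", None, ["A", "B", "D"])
assert cache_schema_and_children[0][1] == {"A", "B", "D"}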
@@ -106,7 +124,7 @@ pub(super) fn set_cache_states(
             _ => {}
         }
 
-        parent = Some(node);
+        parent = Some(current_node);
         for n in scratch.iter() {
             stack.push((*n, cache_id, parent, previous_cache))
         }
30 changes: 30 additions & 0 deletions polars/polars-lazy/src/tests/queries.rs
@@ -2049,3 +2049,33 @@ fn test_partitioned_gb_ternary() -> PolarsResult<()> {
 
     Ok(())
 }
+
+#[test]
+#[cfg(feature = "cross_join")]
+fn test_cse_columns_projections() -> PolarsResult<()> {
+    let right = df![
+        "A" => [1, 2],
+        "B" => [3, 4],
+        "D" => [5, 6]
+    ]?
+    .lazy();
+
+    let left = df![
+        "C" => [3, 4],
+    ]?
+    .lazy();
+
+    let left = left.cross_join(right.clone().select([col("A")]));
+    let q = left.join(
+        right.rename(["B"], ["C"]),
+        [col("A"), col("C")],
+        [col("A"), col("C")],
+        JoinType::Left,
+    );
+
+    let out = q.collect()?;
+
+    assert_eq!(out.get_column_names(), &["C", "A", "D"]);
+
+    Ok(())
+}
15 changes: 15 additions & 0 deletions py-polars/tests/unit/test_cse.py
@@ -0,0 +1,15 @@
+import polars as pl
+
+
+def test_cse_rename_cross_join_5405() -> None:
+    right = pl.DataFrame({"A": [1, 2], "B": [3, 4], "D": [5, 6]}).lazy()
+
+    left = pl.DataFrame({"C": [3, 4]}).lazy().join(right.select("A"), how="cross")
+
+    out = left.join(right.rename({"B": "C"}), on=["A", "C"], how="left")
+
+    assert out.collect(common_subplan_elimination=True).to_dict(False) == {
+        "C": [3, 3, 4, 4],
+        "A": [1, 2, 1, 2],
+        "D": [5, None, None, 6],
+    }
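A possible follow-up assertion inside the same test (a sketch, assuming row order is stable for this left join): with the fix, collecting with and without CSE should agree.

assert out.collect(common_subplan_elimination=False).to_dict(False) == {
    "C": [3, 3, 4, 4],
    "A": [1, 2, 1, 2],
    "D": [5, None, None, 6],
}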
