Skip to content

Commit

Permalink
fix: fix projection pushdown for new outer join schema (#13527)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Jan 8, 2024
1 parent ab6a7cc commit ac5c6af
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 15 deletions.
31 changes: 17 additions & 14 deletions crates/polars-plan/src/logical_plan/optimizer/fast_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,30 @@ fn impl_fast_projection(
expr_arena: &Arena<AExpr>,
duplicate_check: bool,
) -> Option<ALogicalPlan> {
// First check if we can apply the optimization before we allocate.
if !expr
.iter()
.all(|node| matches!(expr_arena.get(*node), AExpr::Column(_)))
{
return None;
}

let mut columns = Vec::with_capacity(expr.len());
for node in expr.iter() {
if let AExpr::Column(name) = expr_arena.get(*node) {
columns.push(SmartString::from(name.as_ref()))
} else {
break;
unreachable!();
}
}
if columns.len() == expr.len() {
let lp = ALogicalPlan::MapFunction {
input,
function: FunctionNode::FastProjection {
columns: Arc::from(columns),
duplicate_check,
},
};

Some(lp)
} else {
None
}
let lp = ALogicalPlan::MapFunction {
input,
function: FunctionNode::FastProjection {
columns: Arc::from(columns),
duplicate_check,
},
};
Some(lp)
}

impl OptimizationRule for FastProjectionAndCollapse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,14 +261,27 @@ pub(super) fn process_join(
// In outer joins both columns remain. So `add_local=true` also for the right table
let add_local = matches!(options.args.how, JoinType::Outer { coalesce: false });
for e in &right_on {
add_keys_to_accumulated_state(
// In case of outer joins we also add the columns.
// But before we do that we must check if the column wasn't already added by the lhs.
let add_local = if add_local {
let name = aexpr_to_leaf_name(*e, expr_arena);
!already_added_local_to_local_projected.contains(name.as_ref())
} else {
false
};

let local_name = add_keys_to_accumulated_state(
*e,
&mut pushdown_right,
&mut local_projection,
&mut names_right,
expr_arena,
add_local,
);

if let Some(local_name) = local_name {
already_added_local_to_local_projected.insert(local_name);
}
}

for proj in acc_projections {
Expand Down
8 changes: 8 additions & 0 deletions py-polars/tests/unit/test_projections.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,3 +368,11 @@ def test_schema_outer_join_projection_pd_13287() -> None:
).with_columns(
pl.col("a").fill_null(pl.col("c")),
).select("a").collect().to_dict(as_series=False) == {"a": [2, 3, 1, 1]}


def test_projection_pushdown_outer_join_duplicates() -> None:
df1 = pl.DataFrame({"a": [1, 2, 3], "b": [10, 20, 30]}).lazy()
df2 = pl.DataFrame({"a": [1, 2, 3], "b": [10, 20, 30]}).lazy()
assert (
df1.join(df2, on="a", how="outer").with_columns(c=0).select("a", "c").collect()
).to_dict(as_series=False) == {"a": [1, 2, 3], "c": [0, 0, 0]}

0 comments on commit ac5c6af

Please sign in to comment.