Skip to content

Commit

Permalink
refactor: simplify expr checking in predicate push down
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Nov 7, 2023
1 parent 450fbab commit cb06b1b
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 171 deletions.
2 changes: 1 addition & 1 deletion crates/polars-plan/src/logical_plan/aexpr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl AExpr {
| Take { .. }
| Nth(_)
=> true,
| Alias(_, _)
Alias(_, _)
| Explode(_)
| Column(_)
| Literal(_)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,33 +115,31 @@ pub(super) fn process_join(
let mut filter_left = false;
let mut filter_right = false;

// predicate should not have an aggregation or window function as that would
// be influenced by join
#[allow(clippy::suspicious_else_formatting)]
if !predicate_is_pushdown_boundary(predicate, expr_arena) {
if check_input_node(predicate, &schema_left, expr_arena) && !block_pushdown_left {
insert_and_combine_predicate(&mut pushdown_left, predicate, expr_arena);
filter_left = true;
}
assert_aexpr_allows_predicate_pushdown(predicate, expr_arena);

// if the predicate is in the left hand side
// the right hand side should be renamed with the suffix.
// in that case we should not push down as the user wants to filter on `x`
// not on `x_rhs`.
if !filter_left
&& check_input_node(predicate, &schema_right, expr_arena)
&& !block_pushdown_right
// However, if we push down to the left and all predicate columns are also
// join columns, we also push down right
|| filter_left
&& all_pred_cols_in_left_on(predicate, expr_arena, &left_on)
// TODO: Restricting to Inner and Left Join is probably too conservative
&& matches!(&options.args.how, JoinType::Inner | JoinType::Left)
{
insert_and_combine_predicate(&mut pushdown_right, predicate, expr_arena);
filter_right = true;
}
if check_input_node(predicate, &schema_left, expr_arena) && !block_pushdown_left {
insert_and_combine_predicate(&mut pushdown_left, predicate, expr_arena);
filter_left = true;
}

// if the predicate is in the left hand side
// the right hand side should be renamed with the suffix.
// in that case we should not push down as the user wants to filter on `x`
// not on `x_rhs`.
if !filter_left
&& check_input_node(predicate, &schema_right, expr_arena)
&& !block_pushdown_right
// However, if we push down to the left and all predicate columns are also
// join columns, we also push down right
|| filter_left
&& all_pred_cols_in_left_on(predicate, expr_arena, &left_on)
// TODO: Restricting to Inner and Left Join is probably too conservative
&& matches!(&options.args.how, JoinType::Inner | JoinType::Left)
{
insert_and_combine_predicate(&mut pushdown_right, predicate, expr_arena);
filter_right = true;
}

match (filter_left, filter_right, &options.args.how) {
// if not pushed down on one of the tables we have to do it locally.
(false, false, _) |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ impl<'a> PredicatePushDown<'a> {
let exprs = lp.get_exprs();

if has_projections {
// we should not pass these projections
// This checks the exprs in the projections at this level.
if exprs
.iter()
.any(|e_n| projection_is_definite_pushdown_boundary(*e_n, expr_arena))
.any(|e_n| aexpr_blocks_predicate_pushdown(*e_n, expr_arena))
{
return self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena);
}
Expand Down Expand Up @@ -211,12 +211,13 @@ impl<'a> PredicatePushDown<'a> {
// filter(y > 1) --> filter(x == min(x)) & filter(y > 2)
// pushdown of filter(y > 2) is correctly stopped at the boundary
//
// Performing this step here should guarantee that acc_predicates
// in all other contexts do not contain a mix of boundary and
// non-boundary predicates.
// Assuming all predicates originate from the `Selection` node
// at the beginning of optimization, applying this step here
// guarantees that boundary predicates will not appear in other
// contexts. Note boundary projections are handled elsewhere.
let local_predicates = if acc_predicates
.values()
.any(|node| predicate_is_pushdown_boundary(*node, expr_arena))
.any(|node| aexpr_blocks_predicate_pushdown(*node, expr_arena))
{
let local_predicates = acc_predicates.values().copied().collect::<Vec<_>>();
acc_predicates.clear();
Expand Down Expand Up @@ -260,13 +261,29 @@ impl<'a> PredicatePushDown<'a> {
file_options: options,
output_schema
} => {
let mut local_predicates = partition_by_full_context(&mut acc_predicates, expr_arena);
if let Some(ref row_count) = options.row_count{
let row_count_predicates = transfer_to_local_by_name(expr_arena, &mut acc_predicates, |name| {
name.as_ref() == row_count.name
});
local_predicates.extend_from_slice(&row_count_predicates);
for node in acc_predicates.values() {
assert_aexpr_allows_predicate_pushdown(*node, expr_arena);
}

let local_predicates = match &scan_type {
#[cfg(feature = "parquet")]
FileScan::Parquet { .. } => vec![],
#[cfg(feature = "ipc")]
FileScan::Ipc { .. } => vec![],
_ => {
// Disallow row-count pushdown of other scans as they may
// not update the row counts properly before applying the
// predicate (e.g. FileScan::Csv doesn't).
if let Some(ref row_count) = options.row_count {
let row_count_predicates = transfer_to_local_by_name(expr_arena, &mut acc_predicates, |name| {
name.as_ref() == row_count.name
});
row_count_predicates
} else {
vec![]
}
}
};
let predicate = predicate_at_scan(acc_predicates, predicate, expr_arena);

if let (true, Some(predicate)) = (file_info.hive_parts.is_some(), predicate) {
Expand Down
Loading

0 comments on commit cb06b1b

Please sign in to comment.