Skip to content

Commit

Permalink
c
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Nov 27, 2023
1 parent eaae0e8 commit 327f4fd
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,59 @@ impl<'a> PredicatePushDown<'a> {
let input = inputs[inputs.len() - 1];
let input_schema = lp_arena.get(input).schema(lp_arena);

if let Some(local_predicates) = try_partial_pushdown_with_projections(
input_schema.as_ref(),
&exprs,
&mut acc_predicates,
expr_arena,
)? {
let alp = lp_arena.take(input);
let alp = self.push_down(alp, acc_predicates, lp_arena, expr_arena)?;
lp_arena.replace(input, alp);
let (pushdown_eligibility, alias_rename_map) =
pushdown_eligibility(input_schema.as_ref(), &exprs, &acc_predicates, expr_arena)?;

let local_predicates = match pushdown_eligibility {
PushdownEligibility::FullPushdown => vec![],
PushdownEligibility::PartialPushdown(allowed) => {
let mut out = Vec::<Node>::with_capacity(allowed.len());
for key in allowed {
out.push(acc_predicates.remove(&key).unwrap());
}
out
},
PushdownEligibility::NoPushdown => {
return self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)
},
};

if !alias_rename_map.is_empty() {
for (_, node) in acc_predicates.iter_mut() {
let mut needs_rename = false;

for (_, ae) in (&*expr_arena).iter(*node) {
if let AExpr::Column(name) = ae {
needs_rename |= alias_rename_map.contains_key(name);

if needs_rename {
break;
}
}
}

let lp = lp.with_exprs_and_input(exprs, inputs);
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
} else {
self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)
if needs_rename {
let mut new_expr = node_to_expr(*node, expr_arena);
new_expr.mutate().apply(|e| {
if let Expr::Column(name) = e {
if let Some(rename_to) = alias_rename_map.get(name) {
*name = rename_to.clone();
};
};
true
});
let predicate = to_aexpr(new_expr, expr_arena);
*node = predicate;
}
}
}

let alp = lp_arena.take(input);
let alp = self.push_down(alp, acc_predicates, lp_arena, expr_arena)?;
lp_arena.replace(input, alp);

let lp = lp.with_exprs_and_input(exprs, inputs);
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
} else {
let mut local_predicates = Vec::with_capacity(acc_predicates.len());

Expand Down Expand Up @@ -215,16 +253,21 @@ impl<'a> PredicatePushDown<'a> {
// at the beginning of optimization, applying this step here
// guarantees that boundary predicates will not appear in other
// contexts. Note boundary projections are handled elsewhere.
let local_predicates = if let Some(out) = try_partial_pushdown(
lp.schema(lp_arena).as_ref(),
&mut acc_predicates,
expr_arena,
)? {
out
} else {
let out = acc_predicates.values().copied().collect();
acc_predicates.clear();
out

let local_predicates = match pushdown_eligibility(lp.schema(lp_arena).as_ref(), &vec![], &acc_predicates, expr_arena)?.0 {
PushdownEligibility::FullPushdown => vec![],
PushdownEligibility::PartialPushdown(allowed) => {
let mut out = Vec::<Node>::with_capacity(allowed.len());
for key in allowed {
out.push(acc_predicates.remove(&key).unwrap());
}
out
},
PushdownEligibility::NoPushdown => {
let out = acc_predicates.values().copied().collect();
acc_predicates.clear();
out
},
};

let alp = lp_arena.take(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,14 @@ fn get_maybe_aliased_projection_to_input_name_map(

/// Verdict returned by `pushdown_eligibility` (paired with an alias -> column
/// rename map) describing how the accumulated predicates may be handled at
/// the current plan node.
pub enum PushdownEligibility {
/// Callers keep no local predicates (`vec![]`). Produced when there is no
/// window expression and no projection nodes, or when every key in
/// `acc_predicates` passed the per-predicate check.
FullPushdown,
// NOTE(review): the next two lines declare the same variant name twice. They
// look like the removed (`Vec<Node>`) and added (`Vec<Arc<str>>`) sides of a
// diff hunk whose +/- markers were lost; only one can exist in compiling code.
// The visible callers use the payload as keys into `acc_predicates`, which
// matches the `Vec<Arc<str>>` form.
//
// The payload is the list of `acc_predicates` keys for which the
// per-predicate check yielded `can_pushdown == true`. Callers `remove` each of
// these keys from `acc_predicates` and collect the nodes as local predicates.
// NOTE(review): that means the "allowed" predicates end up applied locally
// rather than pushed down -- confirm the intended polarity.
PartialPushdown(Vec<Node>),
PartialPushdown(Vec<Arc<str>>),
/// Produced when `process_node` rejects a projection or predicate, or when no
/// predicate passed the per-predicate check. Callers either restart
/// optimization without pushdown or drain all of `acc_predicates` into the
/// local predicates.
NoPushdown,
}

pub fn pushdown_eligibility<'a, I>(
pub fn pushdown_eligibility(
input_schema: &Schema,
projection_nodes: &Vec<Node>,
predicate_nodes: &Vec<Node>,
acc_predicates: &PlHashMap<Arc<str>, Node>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<(PushdownEligibility, PlHashMap<Arc<str>, Arc<str>>)> {
let mut ae_nodes_stack = Vec::<Node>::with_capacity(4);
Expand All @@ -265,7 +265,9 @@ pub fn pushdown_eligibility<'a, I>(
// Important: Names inserted into any data structure by the `process_node`
// closure below are all non-aliased.
// The closure returns false if pushdown cannot be performed.
let mut process_node = |ae_nodes_stack: &mut Vec<Node>| {
let process_node = |ae_nodes_stack: &mut Vec<Node>,
has_window: &mut bool,
common_window_inputs: &mut PlHashSet<Arc<str>>| {
debug_assert_eq!(ae_nodes_stack.len(), 1);

while let Some(node) = ae_nodes_stack.pop() {
Expand Down Expand Up @@ -305,12 +307,12 @@ pub fn pushdown_eligibility<'a, I>(
}
}

if !has_window {
if !*has_window {
for name in partition_by_names.into_iter() {
common_window_inputs.insert(name);
}

has_window = true;
*has_window = true;
} else {
common_window_inputs.retain(|k| partition_by_names.contains(k))
}
Expand Down Expand Up @@ -339,7 +341,7 @@ pub fn pushdown_eligibility<'a, I>(
get_maybe_aliased_projection_to_input_name_map(*node, expr_arena)
{
if alias != column_name {
alias_to_col_map.insert(alias, column_name);
alias_to_col_map.insert(alias.clone(), column_name.clone());
col_to_alias_map.insert(column_name, alias);
}
continue;
Expand All @@ -356,7 +358,11 @@ pub fn pushdown_eligibility<'a, I>(
debug_assert!(ae_nodes_stack.is_empty());
ae_nodes_stack.push(*node);

if !process_node(&mut ae_nodes_stack) {
if !process_node(
&mut ae_nodes_stack,
&mut has_window,
&mut common_window_inputs,
) {
return Ok((PushdownEligibility::NoPushdown, alias_to_col_map));
}
}
Expand Down Expand Up @@ -385,17 +391,26 @@ pub fn pushdown_eligibility<'a, I>(
common_window_inputs = new;
}

for node in predicate_nodes.iter() {
for node in acc_predicates.values() {
debug_assert!(ae_nodes_stack.is_empty());
ae_nodes_stack.push(*node);

if !process_node(&mut ae_nodes_stack) {
if !process_node(
&mut ae_nodes_stack,
&mut has_window,
&mut common_window_inputs,
) {
return Ok((PushdownEligibility::NoPushdown, alias_to_col_map));
}
}

// Should have returned early.
debug_assert!(!common_window_inputs.is_empty() || !has_window);

if !has_window && projection_nodes.is_empty() {
return Ok((PushdownEligibility::FullPushdown, alias_to_col_map));
}

// Note: has_window is constant.
let can_use_column = |col: &Arc<str>| {
if has_window {
Expand All @@ -405,10 +420,9 @@ pub fn pushdown_eligibility<'a, I>(
}
};

let allowed_predicates = predicate_nodes
let allowed_predicates = acc_predicates
.iter()
.copied()
.filter(|&node| {
.filter_map(|(&ref key, &node)| {
debug_assert!(ae_nodes_stack.is_empty());

ae_nodes_stack.push(node);
Expand All @@ -431,17 +445,24 @@ pub fn pushdown_eligibility<'a, I>(
}

ae_nodes_stack.clear();
can_pushdown

if can_pushdown {
Some(key.clone())
} else {
None
}
})
.collect::<Vec<_>>();

if allowed_predicates.is_empty() {
Ok((PushdownEligibility::NoPushdown, alias_to_col_map))
} else {
Ok((
match allowed_predicates.len() {
0 => Ok((PushdownEligibility::NoPushdown, alias_to_col_map)),
len if len == acc_predicates.len() => {
Ok((PushdownEligibility::FullPushdown, alias_to_col_map))
},
_ => Ok((
PushdownEligibility::PartialPushdown(allowed_predicates),
alias_to_col_map,
))
)),
}
}

Expand Down

0 comments on commit 327f4fd

Please sign in to comment.