Skip to content

Commit

Permalink
improve push down filter aggregate (databendlabs#13071)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dousir9 authored Sep 29, 2023
1 parent c228a20 commit b17e271
Show file tree
Hide file tree
Showing 24 changed files with 426 additions and 159 deletions.
1 change: 0 additions & 1 deletion src/query/service/tests/it/sql/planner/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ fn test_format() {
}
.into(),
],
is_having: false,
}
.into(),
),
Expand Down
1 change: 0 additions & 1 deletion src/query/sql/src/planner/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ impl Binder {

let filter = Filter {
predicates: vec![scalar.clone()],
is_having: false,
};
debug_assert_eq!(table_expr.plan.rel_op(), RelOp::Scan);
let mut scan = match &*table_expr.plan {
Expand Down
5 changes: 1 addition & 4 deletions src/query/sql/src/planner/binder/having.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,7 @@ impl Binder {

let predicates = split_conjunctions(&scalar);

let filter = Filter {
predicates,
is_having: true,
};
let filter = Filter { predicates };

Ok(SExpr::create_unary(
Arc::new(filter.into()),
Expand Down
2 changes: 0 additions & 2 deletions src/query/sql/src/planner/binder/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ impl Binder {
Arc::new(
Filter {
predicates: left_push_down,
is_having: false,
}
.into(),
),
Expand All @@ -275,7 +274,6 @@ impl Binder {
Arc::new(
Filter {
predicates: right_push_down,
is_having: false,
}
.into(),
),
Expand Down
1 change: 0 additions & 1 deletion src/query/sql/src/planner/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ impl Binder {

let filter_plan = Filter {
predicates: split_conjunctions(&scalar),
is_having: false,
};
let new_expr = SExpr::create_unary(Arc::new(filter_plan.into()), Arc::new(child));
bind_context.set_expr_context(last_expr_context);
Expand Down
5 changes: 1 addition & 4 deletions src/query/sql/src/planner/format/display_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,7 @@ fn format_delete(delete: &DeletePlan) -> Result<String> {
if let Some(selection) = &delete.selection {
predicates.push(selection.clone());
}
let filter = RelOperator::Filter(Filter {
predicates,
is_having: false,
});
let filter = RelOperator::Filter(Filter { predicates });
SExpr::create_unary(Arc::new(filter), Arc::new(scan_expr))
};
let res = s_expr.to_format_tree(&delete.metadata).format_pretty()?;
Expand Down
8 changes: 1 addition & 7 deletions src/query/sql/src/planner/optimizer/heuristic/decorrelate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ impl SubqueryRewriter {
Arc::new(
Filter {
predicates: left_filters,
is_having: false,
}
.into(),
),
Expand All @@ -262,7 +261,6 @@ impl SubqueryRewriter {
Arc::new(
Filter {
predicates: right_filters,
is_having: false,
}
.into(),
),
Expand Down Expand Up @@ -577,11 +575,7 @@ impl SubqueryRewriter {
predicates.push(self.flatten_scalar(predicate, correlated_columns)?);
}

let filter_plan = Filter {
predicates,
is_having: filter.is_having,
}
.into();
let filter_plan = Filter { predicates }.into();
Ok(SExpr::create_unary(
Arc::new(filter_plan),
Arc::new(flatten_plan),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,6 @@ impl SubqueryRewriter {
};
let filter = Filter {
predicates: vec![compare.into()],
is_having: false,
};

// Filter: COUNT(*) = 1 or COUNT(*) != 1
Expand Down
6 changes: 1 addition & 5 deletions src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ impl DPhpy {
if !op.non_equi_conditions.is_empty() {
let filter = Filter {
predicates: op.non_equi_conditions.clone(),
is_having: false,
};
self.filters.insert(filter);
}
Expand Down Expand Up @@ -584,10 +583,7 @@ impl DPhpy {
predicates.extend(filter.clone().predicates.iter().cloned())
}
new_s_expr = SExpr::create_unary(
Arc::new(RelOperator::Filter(Filter {
predicates,
is_having: false,
})),
Arc::new(RelOperator::Filter(Filter { predicates })),
Arc::new(new_s_expr),
);
new_s_expr = self.push_down_filter(&new_s_expr)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ pub fn try_derive_predicates(
Arc::new(
Filter {
predicates: left_push_down,
is_having: false,
}
.into(),
),
Expand All @@ -97,7 +96,6 @@ pub fn try_derive_predicates(
Arc::new(
Filter {
predicates: right_push_down,
is_having: false,
}
.into(),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,7 @@ impl Rule for RuleEliminateFilter {
if predicates.is_empty() {
state.add_result(s_expr.child(0)?.clone());
} else if origin_predicates.len() != predicates.len() {
let filter = Filter {
predicates,
is_having: eval_scalar.is_having,
};
let filter = Filter { predicates };
state.add_result(SExpr::create_unary(
Arc::new(filter.into()),
Arc::new(s_expr.child(0)?.clone()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,6 @@ impl Rule for RuleInferFilter {
Arc::new(
Filter {
predicates: new_predicates,
is_having: filter.is_having,
}
.into(),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,7 @@ impl Rule for RuleMergeFilter {
.into_iter()
.chain(down_filter.predicates.into_iter())
.collect();
let merged = Filter {
predicates,
is_having: false,
};
let merged = Filter { predicates };

let new_expr = SExpr::create_unary(
Arc::new(merged.into()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ impl Rule for RuleNormalizeDisjunctiveFilter {
Arc::new(
Filter {
predicates: split_predicates,
is_having: filter.is_having,
}
.into(),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,36 @@ use std::sync::Arc;

use crate::optimizer::rule::Rule;
use crate::optimizer::rule::TransformResult;
use crate::optimizer::RelExpr;
use crate::optimizer::RuleID;
use crate::optimizer::SExpr;
use crate::plans::Aggregate;
use crate::plans::AggregateMode;
use crate::plans::Filter;
use crate::plans::PatternPlan;
use crate::plans::RelOp;
use crate::plans::RelOp::Pattern;
use crate::plans::RelOperator;

/// Heuristic optimizer runs in a bottom-up recursion fashion. If we match a plan like
/// Filter-Aggregate-* and push down filter to Filter(Optional)-Aggregate-Filter-*, this will not
/// work. RuleSplitAggregate will be applied first, since it's bottom up, then this rule, which
cause the plan to be like Filter(Optional)-Aggregate-Filter-Aggregate-*, which makes no sense.
/// Hence we match 2 bundled Aggregate Ops:
/// Input: Filter
/// \
/// Aggregate(Final or Partial)
/// \
/// *
///
/// Input: Filter
/// Output:
/// (1) Aggregate(Final or Partial)
/// \
/// Aggregate(Final)
/// \
/// Aggregate(Partial)
/// Filter
/// \
/// *
///
/// Output: Filter(Optional)
/// (2)
/// Filter(remaining)
/// \
/// Aggregate(Final)
/// Aggregate(Final or Partial)
/// \
/// Aggregate(Partial)
/// Filter(pushed down)
/// \
/// Filter
/// \
/// *
/// *
pub struct RulePushDownFilterAggregate {
id: RuleID,
patterns: Vec<SExpr>,
Expand All @@ -72,17 +69,9 @@ impl RulePushDownFilterAggregate {
}
.into(),
),
Arc::new(SExpr::create_unary(
Arc::new(
PatternPlan {
plan_type: RelOp::Aggregate,
}
.into(),
),
Arc::new(SExpr::create_leaf(Arc::new(
PatternPlan { plan_type: Pattern }.into(),
))),
)),
Arc::new(SExpr::create_leaf(Arc::new(
PatternPlan { plan_type: Pattern }.into(),
))),
)),
)],
}
Expand All @@ -96,68 +85,54 @@ impl Rule for RulePushDownFilterAggregate {

fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> common_exception::Result<()> {
let filter: Filter = s_expr.plan().clone().try_into()?;
if filter.is_having {
let agg_parent = s_expr.child(0)?;
let agg_parent_plan: Aggregate = agg_parent.plan().clone().try_into()?;
let agg_child = agg_parent.child(0)?;
let agg_child_plan: Aggregate = agg_child.plan().clone().try_into()?;
if agg_parent_plan.mode == AggregateMode::Final
&& agg_child_plan.mode == AggregateMode::Partial
let aggregate_expr = s_expr.child(0)?;
let aggregate: Aggregate = aggregate_expr.plan().clone().try_into()?;
let aggregate_child_prop =
RelExpr::with_s_expr(aggregate_expr).derive_relational_prop_child(0)?;
let aggregate_group_columns = aggregate.group_columns()?;
let mut pushed_down_predicates = vec![];
let mut remaining_predicates = vec![];
for predicate in filter.predicates.into_iter() {
let predicate_used_columns = predicate.used_columns();
if predicate_used_columns.is_subset(&aggregate_child_prop.output_columns)
&& predicate_used_columns.is_subset(&aggregate_group_columns)
{
let mut push_predicates = vec![];
let mut remaining_predicates = vec![];
for predicate in filter.predicates {
let used_columns = predicate.used_columns();
let mut pushable = true;
for col in used_columns {
if !agg_parent_plan.group_columns()?.contains(&col) {
pushable = false;
break;
}
}
if pushable {
push_predicates.push(predicate);
} else {
remaining_predicates.push(predicate);
}
}
let mut result: SExpr;
// No change since nothing can be pushed down.
if push_predicates.is_empty() {
result = s_expr.clone();
} else {
let filter_push_down_expr = SExpr::create_unary(
Arc::new(RelOperator::Filter(Filter {
predicates: push_predicates,
is_having: false,
})),
Arc::new(agg_child.child(0)?.clone()),
);
let agg_with_filter_push_down_expr = SExpr::create_unary(
Arc::new(RelOperator::Aggregate(agg_parent_plan)),
pushed_down_predicates.push(predicate);
} else {
remaining_predicates.push(predicate)
}
}
if !pushed_down_predicates.is_empty() {
let pushed_down_filter = Filter {
predicates: pushed_down_predicates,
};
let mut result = if remaining_predicates.is_empty() {
SExpr::create_unary(
Arc::new(aggregate.into()),
Arc::new(SExpr::create_unary(
Arc::new(pushed_down_filter.into()),
Arc::new(aggregate_expr.child(0)?.clone()),
)),
)
} else {
let remaining_filter = Filter {
predicates: remaining_predicates,
};
SExpr::create_unary(
Arc::new(remaining_filter.into()),
Arc::new(SExpr::create_unary(
Arc::new(aggregate.into()),
Arc::new(SExpr::create_unary(
Arc::new(RelOperator::Aggregate(agg_child_plan)),
Arc::new(filter_push_down_expr),
Arc::new(pushed_down_filter.into()),
Arc::new(aggregate_expr.child(0)?.clone()),
)),
);
// All filters are pushed down.
if remaining_predicates.is_empty() {
result = agg_with_filter_push_down_expr;
} else {
// Partial filter can be pushed down.
result = SExpr::create_unary(
Arc::new(RelOperator::Filter(Filter {
predicates: remaining_predicates,
is_having: true,
})),
Arc::new(agg_with_filter_push_down_expr),
);
}
}
result.set_applied_rule(&self.id);
state.add_result(result);
}
)),
)
};
result.set_applied_rule(&self.id);
state.add_result(result);
}

Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;

use crate::optimizer::rule::Rule;
Expand Down Expand Up @@ -91,10 +90,7 @@ impl RulePushDownFilterEvalScalar {
return Ok(item.scalar.clone());
}
}
Err(ErrorCode::UnknownColumn(format!(
"Cannot find column to replace `{}`(#{})",
column.column.column_name, column.column.index
)))
Ok(predicate.clone())
}
ScalarExpr::WindowFunction(window) => {
let func = match &window.func {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ pub fn try_push_down_filter_join(
Arc::new(
Filter {
predicates: original_predicates,
is_having: false,
}
.into(),
),
Expand Down
Loading

0 comments on commit b17e271

Please sign in to comment.