Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(optimizer): Decorrelate EXISTS subquery #6051

Merged
merged 3 commits into from
Jun 19, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/ast/src/ast/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ pub enum Expr<'a> {
/// `EXISTS` expression
Exists {
span: &'a [Token<'a>],
/// Indicate if this is a `NOT EXISTS`
not: bool,
subquery: Box<Query<'a>>,
},
/// Scalar subquery, which will only return a single row with a single column.
Expand Down
15 changes: 10 additions & 5 deletions common/ast/src/parser/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ pub enum ExprElement<'a> {
/// `EXISTS` expression
Exists {
subquery: Query<'a>,
not: bool,
},
/// Scalar subquery, which will only return a single row with a single column.
Subquery {
Expand Down Expand Up @@ -421,8 +422,9 @@ impl<'a, I: Iterator<Item = WithSpan<'a, ExprElement<'a>>>> PrattParser<I> for E
results,
else_result,
},
ExprElement::Exists { subquery } => Expr::Exists {
ExprElement::Exists { subquery, not } => Expr::Exists {
span: elem.span.0,
not,
subquery: Box::new(subquery),
},
ExprElement::Subquery { subquery } => Expr::Subquery {
Expand Down Expand Up @@ -761,8 +763,11 @@ pub fn expr_element(i: Input) -> IResult<WithSpan<ExprElement>> {
},
);
let exists = map(
rule! { EXISTS ~ ^"(" ~ ^#query ~ ^")" },
|(_, _, subquery, _)| ExprElement::Exists { subquery },
rule! { NOT? ~ EXISTS ~ ^"(" ~ ^#query ~ ^")" },
|(opt_not, _, _, subquery, _)| ExprElement::Exists {
subquery,
not: opt_not.is_some(),
},
);
let subquery = map(
rule! {
Expand Down Expand Up @@ -848,10 +853,11 @@ pub fn expr_element(i: Input) -> IResult<WithSpan<ExprElement>> {
|(_, _, expr1, _, expr2, _)| ExprElement::IfNull { expr1, expr2 },
);
let (rest, (span, elem)) = consumed(alt((
rule! (
rule!(
#is_null : "`... IS [NOT] NULL`"
| #in_list : "`[NOT] IN (<expr>, ...)`"
| #in_subquery : "`[NOT] IN (SELECT ...)`"
| #exists : "`[NOT] EXISTS (SELECT ...)`"
| #between : "`[NOT] BETWEEN ... AND ...`"
| #binary_op : "<operator>"
| #unary_op : "<operator>"
Expand All @@ -875,7 +881,6 @@ pub fn expr_element(i: Input) -> IResult<WithSpan<ExprElement>> {
| #function_call : "<function>"
| #literal : "<literal>"
| #case : "`CASE ... END`"
| #exists : "`EXISTS (SELECT ...)`"
| #subquery : "`(SELECT ...)`"
| #group
| #column_ref : "<column>"
Expand Down
181 changes: 181 additions & 0 deletions query/src/sql/optimizer/heuristic/decorrelate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_datavalues::type_coercion::merge_types;
use common_exception::Result;

use crate::sql::binder::wrap_cast_if_needed;
use crate::sql::binder::JoinCondition;
use crate::sql::optimizer::heuristic::subquery_rewriter::SubqueryRewriter;
use crate::sql::optimizer::RelExpr;
use crate::sql::optimizer::SExpr;
use crate::sql::plans::Filter;
use crate::sql::plans::JoinType;
use crate::sql::plans::LogicalInnerJoin;
use crate::sql::plans::PatternPlan;
use crate::sql::plans::RelOp;
use crate::sql::plans::SubqueryExpr;
use crate::sql::plans::SubqueryType;
use crate::sql::MetadataRef;
use crate::sql::ScalarExpr;

/// Entry point for decorrelating the subqueries contained in `s_expr`.
///
/// The rewrite itself is delegated to [`SubqueryRewriter`]. The intended scheme is:
///
/// 1. Hoist every subquery out of the `Scalar`s that contain it. A hoisted subquery becomes a
///    `CrossApply` when it is correlated, or a `CrossJoin` when it is uncorrelated.
/// 2. Once all subqueries are hoisted, try to decorrelate them by pushing the `CrossApply`
///    down the plan.
///
/// For the theory behind this, see the paper *Orthogonal Optimization of Subqueries and
/// Aggregation*, published by the Microsoft SQL Server team.
///
/// Any error reported by the rewriter is propagated to the caller unchanged.
pub fn decorrelate_subquery(metadata: MetadataRef, s_expr: SExpr) -> Result<SExpr> {
    let rewritten = SubqueryRewriter::new(metadata).rewrite(&s_expr)?;
    Ok(rewritten)
}

// Try to decorrelate a `CrossApply` into `SemiJoin` or `AntiJoin`.
// We only do simple decorrelation here, the scheme is:
// 1. If the subquery is correlated, we will try to decorrelate it into `SemiJoin`
pub fn try_decorrelate_subquery(input: &SExpr, subquery: &SubqueryExpr) -> Result<Option<SExpr>> {
if subquery.outer_columns.is_empty() {
return Ok(None);
}

// TODO(leiysky): this is the canonical plan generated by Binder, we should find a proper
// way to address such a pattern.
//
// Project
// \
// EvalScalar
// \
// Filter
// \
// Get
let pattern = SExpr::create_unary(
PatternPlan {
plan_type: RelOp::Project,
}
.into(),
SExpr::create_unary(
PatternPlan {
plan_type: RelOp::EvalScalar,
}
.into(),
SExpr::create_unary(
PatternPlan {
plan_type: RelOp::Filter,
}
.into(),
SExpr::create_leaf(
PatternPlan {
plan_type: RelOp::LogicalGet,
}
.into(),
),
),
),
);

if !subquery.subquery.match_pattern(&pattern) {
return Ok(None);
}

let filter_tree = subquery
.subquery // Project
.child(0)? // EvalScalar
.child(0)?; // Filter
let filter_expr = RelExpr::with_s_expr(filter_tree);
let filter: Filter = subquery
.subquery // Project
.child(0)? // EvalScalar
.child(0)? // Filter
.plan()
.clone()
.try_into()?;
let filter_prop = filter_expr.derive_relational_prop()?;
let filter_child_prop = filter_expr.derive_relational_prop_child(0)?;

let input_expr = RelExpr::with_s_expr(input);
let input_prop = input_expr.derive_relational_prop()?;

// First, we will check if all the outer columns are in the filter.
if !filter_child_prop.outer_columns.is_empty() {
return Ok(None);
}

// Second, we will check if the filter only contains equi-predicates.
// This is not necessary, but it is a good heuristic for most cases.
let mut left_conditions = vec![];
let mut right_conditions = vec![];
let mut extra_predicates = vec![];
for pred in filter.predicates.iter() {
let join_condition = JoinCondition::new(pred, &input_prop, &filter_prop);
match join_condition {
JoinCondition::Other(_) => {
// We don't allow to evaluate non-equi predicate in hash join for now.
return Ok(None);
}

JoinCondition::Left(_) | JoinCondition::Right(_) => {
extra_predicates.push(pred.clone());
}

JoinCondition::Both { left, right } => {
let join_type = merge_types(&left.data_type(), &right.data_type())?;
let left = wrap_cast_if_needed(left.clone(), &join_type);
let right = wrap_cast_if_needed(right.clone(), &join_type);
left_conditions.push(left);
right_conditions.push(right);
}
}
}

let semi_join = LogicalInnerJoin {
left_conditions,
right_conditions,
join_type: match &subquery.typ {
SubqueryType::Any | SubqueryType::All | SubqueryType::Scalar => {
return Ok(None);
}
SubqueryType::Exists => JoinType::Semi,
SubqueryType::NotExists => JoinType::Anti,
},
};

// Rewrite plan to semi-join.
let left_child = input.clone();
// Remove `Filter` from subquery.
let right_child = SExpr::create_unary(
subquery.subquery.plan().clone(),
SExpr::create_unary(
subquery.subquery.child(0)?.plan().clone(),
SExpr::create_leaf(filter_tree.child(0)?.plan().clone()),
),
);
let mut result = SExpr::create_binary(semi_join.into(), left_child, right_child);

if !extra_predicates.is_empty() {
result = SExpr::create_unary(
Filter {
predicates: extra_predicates,
is_having: false,
}
.into(),
result,
);
}

Ok(Some(result))
}
33 changes: 29 additions & 4 deletions query/src/sql/optimizer/heuristic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod decorrelate;
mod implement;
mod rule_list;
mod subquery_rewriter;

use std::sync::Arc;

use common_exception::Result;
use lazy_static::lazy_static;

use super::rule::RuleID;
use crate::sessions::QueryContext;
use crate::sql::optimizer::heuristic::decorrelate::decorrelate_subquery;
use crate::sql::optimizer::heuristic::implement::HeuristicImplementor;
pub use crate::sql::optimizer::heuristic::rule_list::RuleList;
use crate::sql::optimizer::rule::TransformState;
use crate::sql::optimizer::SExpr;
use crate::sql::MetadataRef;

lazy_static! {
pub static ref DEFAULT_REWRITE_RULES: Vec<RuleID> = vec![
Expand All @@ -44,19 +51,37 @@ lazy_static! {
pub struct HeuristicOptimizer {
rules: RuleList,
implementor: HeuristicImplementor,

_ctx: Arc<QueryContext>,
metadata: MetadataRef,
}

impl HeuristicOptimizer {
pub fn new(rules: RuleList) -> Self {
pub fn new(ctx: Arc<QueryContext>, metadata: MetadataRef, rules: RuleList) -> Self {
HeuristicOptimizer {
rules,
implementor: HeuristicImplementor::new(),

_ctx: ctx,
metadata,
}
}

pub fn optimize(&mut self, expression: SExpr) -> Result<SExpr> {
let optimized = self.optimize_expression(&expression)?;
let result = self.implement_expression(&optimized)?;
fn pre_optimize(&mut self, s_expr: SExpr) -> Result<SExpr> {
let result = decorrelate_subquery(self.metadata.clone(), s_expr)?;

Ok(result)
}

fn post_optimize(&mut self, s_expr: SExpr) -> Result<SExpr> {
Ok(s_expr)
}

pub fn optimize(&mut self, s_expr: SExpr) -> Result<SExpr> {
let pre_optimized = self.pre_optimize(s_expr)?;
let optimized = self.optimize_expression(&pre_optimized)?;
let post_optimized = self.post_optimize(optimized)?;
let result = self.implement_expression(&post_optimized)?;
Ok(result)
}

Expand Down
Loading