Skip to content

Commit

Permalink
Merge pull request #6051 from leiysky/decorrelation
Browse files Browse the repository at this point in the history
feature(optimizer): Decorrelate `EXISTS` subquery
  • Loading branch information
BohuTANG authored Jun 19, 2022
2 parents cf3fbf6 + 5c6e3f9 commit c6e3530
Show file tree
Hide file tree
Showing 21 changed files with 490 additions and 170 deletions.
2 changes: 2 additions & 0 deletions common/ast/src/ast/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ pub enum Expr<'a> {
/// `EXISTS` expression
Exists {
span: &'a [Token<'a>],
/// Indicate if this is a `NOT EXISTS`
not: bool,
subquery: Box<Query<'a>>,
},
/// Scalar subquery, which will only return a single row with a single column.
Expand Down
15 changes: 10 additions & 5 deletions common/ast/src/parser/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ pub enum ExprElement<'a> {
/// `EXISTS` expression
Exists {
subquery: Query<'a>,
not: bool,
},
/// Scalar subquery, which will only return a single row with a single column.
Subquery {
Expand Down Expand Up @@ -421,8 +422,9 @@ impl<'a, I: Iterator<Item = WithSpan<'a, ExprElement<'a>>>> PrattParser<I> for E
results,
else_result,
},
ExprElement::Exists { subquery } => Expr::Exists {
ExprElement::Exists { subquery, not } => Expr::Exists {
span: elem.span.0,
not,
subquery: Box::new(subquery),
},
ExprElement::Subquery { subquery } => Expr::Subquery {
Expand Down Expand Up @@ -761,8 +763,11 @@ pub fn expr_element(i: Input) -> IResult<WithSpan<ExprElement>> {
},
);
let exists = map(
rule! { EXISTS ~ ^"(" ~ ^#query ~ ^")" },
|(_, _, subquery, _)| ExprElement::Exists { subquery },
rule! { NOT? ~ EXISTS ~ ^"(" ~ ^#query ~ ^")" },
|(opt_not, _, _, subquery, _)| ExprElement::Exists {
subquery,
not: opt_not.is_some(),
},
);
let subquery = map(
rule! {
Expand Down Expand Up @@ -848,10 +853,11 @@ pub fn expr_element(i: Input) -> IResult<WithSpan<ExprElement>> {
|(_, _, expr1, _, expr2, _)| ExprElement::IfNull { expr1, expr2 },
);
let (rest, (span, elem)) = consumed(alt((
rule! (
rule!(
#is_null : "`... IS [NOT] NULL`"
| #in_list : "`[NOT] IN (<expr>, ...)`"
| #in_subquery : "`[NOT] IN (SELECT ...)`"
| #exists : "`[NOT] EXISTS (SELECT ...)`"
| #between : "`[NOT] BETWEEN ... AND ...`"
| #binary_op : "<operator>"
| #unary_op : "<operator>"
Expand All @@ -875,7 +881,6 @@ pub fn expr_element(i: Input) -> IResult<WithSpan<ExprElement>> {
| #function_call : "<function>"
| #literal : "<literal>"
| #case : "`CASE ... END`"
| #exists : "`EXISTS (SELECT ...)`"
| #subquery : "`(SELECT ...)`"
| #group
| #column_ref : "<column>"
Expand Down
181 changes: 181 additions & 0 deletions query/src/sql/optimizer/heuristic/decorrelate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_datavalues::type_coercion::merge_types;
use common_exception::Result;

use crate::sql::binder::wrap_cast_if_needed;
use crate::sql::binder::JoinCondition;
use crate::sql::optimizer::heuristic::subquery_rewriter::SubqueryRewriter;
use crate::sql::optimizer::RelExpr;
use crate::sql::optimizer::SExpr;
use crate::sql::plans::Filter;
use crate::sql::plans::JoinType;
use crate::sql::plans::LogicalInnerJoin;
use crate::sql::plans::PatternPlan;
use crate::sql::plans::RelOp;
use crate::sql::plans::SubqueryExpr;
use crate::sql::plans::SubqueryType;
use crate::sql::MetadataRef;
use crate::sql::ScalarExpr;

/// Rewrite the subqueries embedded in `s_expr` so that they can be decorrelated.
///
/// The work is delegated to [`SubqueryRewriter`]: subqueries are first hoisted out of the
/// `Scalar`s that contain them, and each hoisted operator is turned into a `CrossApply`
/// (when the subquery is correlated) or a `CrossJoin` (when it is uncorrelated).
///
/// Once every subquery has been hoisted, decorrelation is attempted by pushing the
/// `CrossApply` down. For details, see the paper
/// `Orthogonal Optimization of Subqueries and Aggregation`, published by the
/// Microsoft SQL Server team.
///
/// # Errors
///
/// Returns whatever error the rewriter reports.
pub fn decorrelate_subquery(metadata: MetadataRef, s_expr: SExpr) -> Result<SExpr> {
    let rewritten = SubqueryRewriter::new(metadata).rewrite(&s_expr)?;
    Ok(rewritten)
}

// Try to decorrelate a `CrossApply` into `SemiJoin` or `AntiJoin`.
// We only do simple decorrelation here, the scheme is:
// 1. If the subquery is correlated, we will try to decorrelate it into `SemiJoin`
pub fn try_decorrelate_subquery(input: &SExpr, subquery: &SubqueryExpr) -> Result<Option<SExpr>> {
if subquery.outer_columns.is_empty() {
return Ok(None);
}

// TODO(leiysky): this is the canonical plan generated by Binder, we should find a proper
// way to address such a pattern.
//
// Project
// \
// EvalScalar
// \
// Filter
// \
// Get
let pattern = SExpr::create_unary(
PatternPlan {
plan_type: RelOp::Project,
}
.into(),
SExpr::create_unary(
PatternPlan {
plan_type: RelOp::EvalScalar,
}
.into(),
SExpr::create_unary(
PatternPlan {
plan_type: RelOp::Filter,
}
.into(),
SExpr::create_leaf(
PatternPlan {
plan_type: RelOp::LogicalGet,
}
.into(),
),
),
),
);

if !subquery.subquery.match_pattern(&pattern) {
return Ok(None);
}

let filter_tree = subquery
.subquery // Project
.child(0)? // EvalScalar
.child(0)?; // Filter
let filter_expr = RelExpr::with_s_expr(filter_tree);
let filter: Filter = subquery
.subquery // Project
.child(0)? // EvalScalar
.child(0)? // Filter
.plan()
.clone()
.try_into()?;
let filter_prop = filter_expr.derive_relational_prop()?;
let filter_child_prop = filter_expr.derive_relational_prop_child(0)?;

let input_expr = RelExpr::with_s_expr(input);
let input_prop = input_expr.derive_relational_prop()?;

// First, we will check if all the outer columns are in the filter.
if !filter_child_prop.outer_columns.is_empty() {
return Ok(None);
}

// Second, we will check if the filter only contains equi-predicates.
// This is not necessary, but it is a good heuristic for most cases.
let mut left_conditions = vec![];
let mut right_conditions = vec![];
let mut extra_predicates = vec![];
for pred in filter.predicates.iter() {
let join_condition = JoinCondition::new(pred, &input_prop, &filter_prop);
match join_condition {
JoinCondition::Other(_) => {
// We don't allow to evaluate non-equi predicate in hash join for now.
return Ok(None);
}

JoinCondition::Left(_) | JoinCondition::Right(_) => {
extra_predicates.push(pred.clone());
}

JoinCondition::Both { left, right } => {
let join_type = merge_types(&left.data_type(), &right.data_type())?;
let left = wrap_cast_if_needed(left.clone(), &join_type);
let right = wrap_cast_if_needed(right.clone(), &join_type);
left_conditions.push(left);
right_conditions.push(right);
}
}
}

let semi_join = LogicalInnerJoin {
left_conditions,
right_conditions,
join_type: match &subquery.typ {
SubqueryType::Any | SubqueryType::All | SubqueryType::Scalar => {
return Ok(None);
}
SubqueryType::Exists => JoinType::Semi,
SubqueryType::NotExists => JoinType::Anti,
},
};

// Rewrite plan to semi-join.
let left_child = input.clone();
// Remove `Filter` from subquery.
let right_child = SExpr::create_unary(
subquery.subquery.plan().clone(),
SExpr::create_unary(
subquery.subquery.child(0)?.plan().clone(),
SExpr::create_leaf(filter_tree.child(0)?.plan().clone()),
),
);
let mut result = SExpr::create_binary(semi_join.into(), left_child, right_child);

if !extra_predicates.is_empty() {
result = SExpr::create_unary(
Filter {
predicates: extra_predicates,
is_having: false,
}
.into(),
result,
);
}

Ok(Some(result))
}
33 changes: 29 additions & 4 deletions query/src/sql/optimizer/heuristic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod decorrelate;
mod implement;
mod rule_list;
mod subquery_rewriter;

use std::sync::Arc;

use common_exception::Result;
use lazy_static::lazy_static;

use super::rule::RuleID;
use crate::sessions::QueryContext;
use crate::sql::optimizer::heuristic::decorrelate::decorrelate_subquery;
use crate::sql::optimizer::heuristic::implement::HeuristicImplementor;
pub use crate::sql::optimizer::heuristic::rule_list::RuleList;
use crate::sql::optimizer::rule::TransformState;
use crate::sql::optimizer::SExpr;
use crate::sql::MetadataRef;

lazy_static! {
pub static ref DEFAULT_REWRITE_RULES: Vec<RuleID> = vec![
Expand All @@ -44,19 +51,37 @@ lazy_static! {
/// State of the heuristic optimizer: the rule list and implementor it was built with,
/// plus the query context and metadata supplied by the caller of `new`.
pub struct HeuristicOptimizer {
// Rule list supplied to `new`.
// NOTE(review): presumably the rewrite rules applied during optimization; confirm.
rules: RuleList,
// Created with `HeuristicImplementor::new()`.
// NOTE(review): presumably used by `implement_expression`; confirm.
implementor: HeuristicImplementor,

// Query context stored by `new`. The leading underscore suggests it is not read yet.
_ctx: Arc<QueryContext>,
// Metadata handed to `decorrelate_subquery` during `pre_optimize`.
metadata: MetadataRef,
}

impl HeuristicOptimizer {
pub fn new(rules: RuleList) -> Self {
pub fn new(ctx: Arc<QueryContext>, metadata: MetadataRef, rules: RuleList) -> Self {
HeuristicOptimizer {
rules,
implementor: HeuristicImplementor::new(),

_ctx: ctx,
metadata,
}
}

pub fn optimize(&mut self, expression: SExpr) -> Result<SExpr> {
let optimized = self.optimize_expression(&expression)?;
let result = self.implement_expression(&optimized)?;
fn pre_optimize(&mut self, s_expr: SExpr) -> Result<SExpr> {
let result = decorrelate_subquery(self.metadata.clone(), s_expr)?;

Ok(result)
}

fn post_optimize(&mut self, s_expr: SExpr) -> Result<SExpr> {
Ok(s_expr)
}

pub fn optimize(&mut self, s_expr: SExpr) -> Result<SExpr> {
let pre_optimized = self.pre_optimize(s_expr)?;
let optimized = self.optimize_expression(&pre_optimized)?;
let post_optimized = self.post_optimize(optimized)?;
let result = self.implement_expression(&post_optimized)?;
Ok(result)
}

Expand Down
Loading

0 comments on commit c6e3530

Please sign in to comment.