From 94e3891e03842301ad9f81d05500a34068e3b678 Mon Sep 17 00:00:00 2001 From: leiysky Date: Sat, 18 Jun 2022 18:31:27 +0800 Subject: [PATCH 1/2] decorrelate exists subquery --- common/ast/src/ast/expr.rs | 2 + common/ast/src/parser/expr.rs | 15 +- .../sql/optimizer/heuristic/decorrelate.rs | 181 ++++++++++++++++++ query/src/sql/optimizer/heuristic/mod.rs | 33 +++- .../heuristic/subquery_rewriter.rs} | 175 ++++++++++------- query/src/sql/optimizer/mod.rs | 21 +- .../rule/rewrite/rule_merge_project.rs | 10 +- .../rewrite/rule_push_down_filter_join.rs | 68 +------ query/src/sql/planner/binder/mod.rs | 6 +- query/src/sql/planner/binder/scalar_common.rs | 51 +++++ query/src/sql/planner/mod.rs | 2 +- query/src/sql/planner/semantic/type_check.rs | 15 +- .../tests/it/sql/optimizer/heuristic/join.rs | 2 +- query/tests/it/sql/optimizer/heuristic/mod.rs | 6 +- .../it/sql/optimizer/heuristic/select.rs | 7 +- .../it/sql/optimizer/heuristic/subquery.rs | 16 +- .../optimizer/heuristic/testdata/select.test | 6 + .../heuristic/testdata/subquery.test | 22 ++- .../20+_others/20_0001_planner_v2.result | 5 + .../20+_others/20_0001_planner_v2.sql | 6 + 20 files changed, 485 insertions(+), 164 deletions(-) create mode 100644 query/src/sql/optimizer/heuristic/decorrelate.rs rename query/src/sql/{planner/binder/subquery.rs => optimizer/heuristic/subquery_rewriter.rs} (71%) diff --git a/common/ast/src/ast/expr.rs b/common/ast/src/ast/expr.rs index fc50df4c2c73..f361655bdc3a 100644 --- a/common/ast/src/ast/expr.rs +++ b/common/ast/src/ast/expr.rs @@ -143,6 +143,8 @@ pub enum Expr<'a> { /// `EXISTS` expression Exists { span: &'a [Token<'a>], + /// Indicate if this is a `NOT EXISTS` + not: bool, subquery: Box>, }, /// Scalar subquery, which will only return a single row with a single column. diff --git a/common/ast/src/parser/expr.rs b/common/ast/src/parser/expr.rs index 0d142dcd0ad0..ac16cd467578 100644 --- a/common/ast/src/parser/expr.rs +++ b/common/ast/src/parser/expr.rs @@ -239,6 +239,7 @@ pub enum ExprElement<'a> { /// `EXISTS` expression Exists { subquery: Query<'a>, + not: bool, }, /// Scalar subquery, which will only return a single row with a single column. Subquery { @@ -421,8 +422,9 @@ impl<'a, I: Iterator>>> PrattParser for E results, else_result, }, - ExprElement::Exists { subquery } => Expr::Exists { + ExprElement::Exists { subquery, not } => Expr::Exists { span: elem.span.0, + not, subquery: Box::new(subquery), }, ExprElement::Subquery { subquery } => Expr::Subquery { @@ -761,8 +763,11 @@ pub fn expr_element(i: Input) -> IResult> { }, ); let exists = map( - rule! { EXISTS ~ ^"(" ~ ^#query ~ ^")" }, - |(_, _, subquery, _)| ExprElement::Exists { subquery }, + rule! { NOT? ~ EXISTS ~ ^"(" ~ ^#query ~ ^")" }, + |(opt_not, _, _, subquery, _)| ExprElement::Exists { + subquery, + not: opt_not.is_some(), + }, ); let subquery = map( rule! { @@ -848,10 +853,11 @@ pub fn expr_element(i: Input) -> IResult> { |(_, _, expr1, _, expr2, _)| ExprElement::IfNull { expr1, expr2 }, ); let (rest, (span, elem)) = consumed(alt(( - rule! ( + rule!( #is_null : "`... IS [NOT] NULL`" | #in_list : "`[NOT] IN (, ...)`" | #in_subquery : "`[NOT] IN (SELECT ...)`" + | #exists : "`[NOT] EXISTS (SELECT ...)`" | #between : "`[NOT] BETWEEN ... AND ...`" | #binary_op : "" | #unary_op : "" @@ -875,7 +881,6 @@ pub fn expr_element(i: Input) -> IResult> { | #function_call : "" | #literal : "" | #case : "`CASE ... END`" - | #exists : "`EXISTS (SELECT ...)`" | #subquery : "`(SELECT ...)`" | #group | #column_ref : "" diff --git a/query/src/sql/optimizer/heuristic/decorrelate.rs b/query/src/sql/optimizer/heuristic/decorrelate.rs new file mode 100644 index 000000000000..7f90a7039361 --- /dev/null +++ b/query/src/sql/optimizer/heuristic/decorrelate.rs @@ -0,0 +1,181 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_datavalues::type_coercion::merge_types; +use common_exception::Result; + +use crate::sql::binder::wrap_cast_if_needed; +use crate::sql::binder::JoinCondition; +use crate::sql::optimizer::heuristic::subquery_rewriter::SubqueryRewriter; +use crate::sql::optimizer::RelExpr; +use crate::sql::optimizer::SExpr; +use crate::sql::plans::Filter; +use crate::sql::plans::JoinType; +use crate::sql::plans::LogicalInnerJoin; +use crate::sql::plans::PatternPlan; +use crate::sql::plans::RelOp; +use crate::sql::plans::SubqueryExpr; +use crate::sql::plans::SubqueryType; +use crate::sql::MetadataRef; +use crate::sql::ScalarExpr; + +/// Decorrelate subqueries inside `s_expr`. +/// +/// It will first hoist all the subqueries from `Scalar`s, and transform the hoisted operator +/// into `CrossApply`(if the subquery is correlated) or `CrossJoin`(if the subquery is uncorrelated). +/// +/// After hoisted all the subqueries, we will try to decorrelate the subqueries by pushing the `CrossApply` +/// down. Get more detail by reading the paper `Orthogonal Optimization of Subqueries and Aggregation`, +/// which published by Microsoft SQL Server team. +pub fn decorrelate_subquery(metadata: MetadataRef, s_expr: SExpr) -> Result { + let mut rewriter = SubqueryRewriter::new(metadata); + let hoisted = rewriter.rewrite(&s_expr)?; + + Ok(hoisted) +} + +// Try to decorrelate a `CrossApply` into `SemiJoin` or `AntiJoin`. +// We only do simple decorrelation here, the scheme is: +// 1. If the subquery is correlated, we will try to decorrelate it into `SemiJoin` +pub fn try_decorrelate_subquery(input: &SExpr, subquery: &SubqueryExpr) -> Result> { + if subquery.outer_columns.is_empty() { + return Ok(None); + } + + // TODO(leiysky): this is the canonical plan generated by Binder, we should find a proper + // way to address such a pattern. + // + // Project + // \ + // EvalScalar + // \ + // Filter + // \ + // Get + let pattern = SExpr::create_unary( + PatternPlan { + plan_type: RelOp::Project, + } + .into(), + SExpr::create_unary( + PatternPlan { + plan_type: RelOp::EvalScalar, + } + .into(), + SExpr::create_unary( + PatternPlan { + plan_type: RelOp::Filter, + } + .into(), + SExpr::create_leaf( + PatternPlan { + plan_type: RelOp::LogicalGet, + } + .into(), + ), + ), + ), + ); + + if !subquery.subquery.match_pattern(&pattern) { + return Ok(None); + } + + let filter_tree = subquery + .subquery // Project + .child(0)? // EvalScalar + .child(0)?; // Filter + let filter_expr = RelExpr::with_s_expr(filter_tree); + let filter: Filter = subquery + .subquery // Project + .child(0)? // EvalScalar + .child(0)? // Filter + .plan() + .clone() + .try_into()?; + let filter_prop = filter_expr.derive_relational_prop()?; + let filter_child_prop = filter_expr.derive_relational_prop_child(0)?; + + let input_expr = RelExpr::with_s_expr(input); + let input_prop = input_expr.derive_relational_prop()?; + + // First, we will check if all the outer columns are in the filter. + if !filter_child_prop.outer_columns.is_empty() { + return Ok(None); + } + + // Second, we will check if the filter only contains equi-predicates. + // This is not necessary, but it is a good heuristic for most cases. + let mut left_conditions = vec![]; + let mut right_conditions = vec![]; + let mut extra_predicates = vec![]; + for pred in filter.predicates.iter() { + let join_condition = JoinCondition::new(pred, &input_prop, &filter_prop); + match join_condition { + JoinCondition::Other(_) => { + // We don't allow to evaluate non-equi predicate in hash join for now. + return Ok(None); + } + + JoinCondition::Left(_) | JoinCondition::Right(_) => { + extra_predicates.push(pred.clone()); + } + + JoinCondition::Both { left, right } => { + let join_type = merge_types(&left.data_type(), &right.data_type())?; + let left = wrap_cast_if_needed(left.clone(), &join_type); + let right = wrap_cast_if_needed(right.clone(), &join_type); + left_conditions.push(left); + right_conditions.push(right); + } + } + } + + let semi_join = LogicalInnerJoin { + left_conditions, + right_conditions, + join_type: match &subquery.typ { + SubqueryType::Any | SubqueryType::All | SubqueryType::Scalar => { + return Ok(None); + } + SubqueryType::Exists => JoinType::Semi, + SubqueryType::NotExists => JoinType::Anti, + }, + }; + + // Rewrite plan to semi-join. + let left_child = input.clone(); + // Remove `Filter` from subquery. + let right_child = SExpr::create_unary( + subquery.subquery.plan().clone(), + SExpr::create_unary( + subquery.subquery.child(0)?.plan().clone(), + SExpr::create_leaf(filter_tree.child(0)?.plan().clone()), + ), + ); + let mut result = SExpr::create_binary(semi_join.into(), left_child, right_child); + + if !extra_predicates.is_empty() { + result = SExpr::create_unary( + Filter { + predicates: extra_predicates, + is_having: false, + } + .into(), + result, + ); + } + + Ok(Some(result)) +} diff --git a/query/src/sql/optimizer/heuristic/mod.rs b/query/src/sql/optimizer/heuristic/mod.rs index 181e1d2d136f..9de1be8d63fd 100644 --- a/query/src/sql/optimizer/heuristic/mod.rs +++ b/query/src/sql/optimizer/heuristic/mod.rs @@ -12,17 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod decorrelate; mod implement; mod rule_list; +mod subquery_rewriter; + +use std::sync::Arc; use common_exception::Result; use lazy_static::lazy_static; use super::rule::RuleID; +use crate::sessions::QueryContext; +use crate::sql::optimizer::heuristic::decorrelate::decorrelate_subquery; use crate::sql::optimizer::heuristic::implement::HeuristicImplementor; pub use crate::sql::optimizer::heuristic::rule_list::RuleList; use crate::sql::optimizer::rule::TransformState; use crate::sql::optimizer::SExpr; +use crate::sql::MetadataRef; lazy_static! { pub static ref DEFAULT_REWRITE_RULES: Vec = vec![ @@ -44,19 +51,37 @@ lazy_static! { pub struct HeuristicOptimizer { rules: RuleList, implementor: HeuristicImplementor, + + _ctx: Arc, + metadata: MetadataRef, } impl HeuristicOptimizer { - pub fn new(rules: RuleList) -> Self { + pub fn new(ctx: Arc, metadata: MetadataRef, rules: RuleList) -> Self { HeuristicOptimizer { rules, implementor: HeuristicImplementor::new(), + + _ctx: ctx, + metadata, } } - pub fn optimize(&mut self, expression: SExpr) -> Result { - let optimized = self.optimize_expression(&expression)?; - let result = self.implement_expression(&optimized)?; + fn pre_optimize(&mut self, s_expr: SExpr) -> Result { + let result = decorrelate_subquery(self.metadata.clone(), s_expr)?; + + Ok(result) + } + + fn post_optimize(&mut self, s_expr: SExpr) -> Result { + Ok(s_expr) + } + + pub fn optimize(&mut self, s_expr: SExpr) -> Result { + let pre_optimized = self.pre_optimize(s_expr)?; + let optimized = self.optimize_expression(&pre_optimized)?; + let post_optimized = self.post_optimize(optimized)?; + let result = self.implement_expression(&post_optimized)?; Ok(result) } diff --git a/query/src/sql/planner/binder/subquery.rs b/query/src/sql/optimizer/heuristic/subquery_rewriter.rs similarity index 71% rename from query/src/sql/planner/binder/subquery.rs rename to query/src/sql/optimizer/heuristic/subquery_rewriter.rs index c5b359cef728..a9e8d61e5668 100644 --- a/query/src/sql/planner/binder/subquery.rs +++ b/query/src/sql/optimizer/heuristic/subquery_rewriter.rs @@ -17,9 +17,9 @@ use common_datavalues::DataValue; use common_exception::ErrorCode; use common_exception::Result; use common_functions::aggregates::AggregateFunctionFactory; -use common_functions::scalars::FunctionFactory; use crate::sql::binder::ColumnBinding; +use crate::sql::optimizer::heuristic::decorrelate::try_decorrelate_subquery; use crate::sql::optimizer::ColumnSet; use crate::sql::optimizer::RelExpr; use crate::sql::optimizer::SExpr; @@ -45,7 +45,12 @@ use crate::sql::plans::ScalarItem; use crate::sql::plans::SubqueryExpr; use crate::sql::plans::SubqueryType; use crate::sql::MetadataRef; -use crate::sql::ScalarExpr; + +enum UnnestResult { + Uncorrelated, + Apply, + SimpleJoin, // SemiJoin or AntiJoin +} /// Rewrite subquery into `Apply` operator pub struct SubqueryRewriter { @@ -63,7 +68,7 @@ impl SubqueryRewriter { let mut input = self.rewrite(s_expr.child(0)?)?; for item in plan.items.iter_mut() { - let res = self.try_rewrite_subquery(&item.scalar, &input)?; + let res = self.try_rewrite_subquery(&item.scalar, &input, false)?; input = res.1; item.scalar = res.0; } @@ -74,7 +79,7 @@ impl SubqueryRewriter { let mut input = self.rewrite(s_expr.child(0)?)?; for pred in plan.predicates.iter_mut() { - let res = self.try_rewrite_subquery(pred, &input)?; + let res = self.try_rewrite_subquery(pred, &input, true)?; input = res.1; *pred = res.0; } @@ -85,13 +90,13 @@ impl SubqueryRewriter { let mut input = self.rewrite(s_expr.child(0)?)?; for item in plan.group_items.iter_mut() { - let res = self.try_rewrite_subquery(&item.scalar, &input)?; + let res = self.try_rewrite_subquery(&item.scalar, &input, false)?; input = res.1; item.scalar = res.0; } for item in plan.aggregate_functions.iter_mut() { - let res = self.try_rewrite_subquery(&item.scalar, &input)?; + let res = self.try_rewrite_subquery(&item.scalar, &input, false)?; input = res.1; item.scalar = res.0; } @@ -119,33 +124,54 @@ impl SubqueryRewriter { } } - pub fn build_apply(&mut self, left: &SExpr, subquery: &SubqueryExpr) -> Result { + fn unnest_subquery( + &mut self, + left: &SExpr, + subquery: &SubqueryExpr, + is_conjunctive_predicate: bool, + ) -> Result<(SExpr, UnnestResult)> { match subquery.typ { SubqueryType::Scalar => { let rel_expr = RelExpr::with_s_expr(&subquery.subquery); let prop = rel_expr.derive_relational_prop()?; - let result: RelOperator = if prop.outer_columns.is_empty() { - LogicalInnerJoin { - left_conditions: vec![], - right_conditions: vec![], - join_type: JoinType::Cross, - } - .into() + let (rel_op, result): (RelOperator, _) = if prop.outer_columns.is_empty() { + ( + LogicalInnerJoin { + left_conditions: vec![], + right_conditions: vec![], + join_type: JoinType::Cross, + } + .into(), + UnnestResult::Uncorrelated, + ) } else { - CrossApply { - subquery_output: prop.output_columns, - correlated_columns: subquery.outer_columns.clone(), - } - .into() + ( + CrossApply { + subquery_output: prop.output_columns, + correlated_columns: subquery.outer_columns.clone(), + } + .into(), + UnnestResult::Apply, + ) }; - Ok(SExpr::create_binary( + Ok(( + SExpr::create_binary( + rel_op, + left.clone(), + SExpr::create_unary(Max1Row.into(), subquery.subquery.clone()), + ), result, - left.clone(), - SExpr::create_unary(Max1Row.into(), subquery.subquery.clone()), )) } - SubqueryType::Exists => { + SubqueryType::Exists | SubqueryType::NotExists => { + if is_conjunctive_predicate { + if let Some(result) = try_decorrelate_subquery(left, subquery)? { + return Ok((result, UnnestResult::SimpleJoin)); + } + } + // We will rewrite EXISTS subquery into the form `COUNT(*) > 0`. + // In contrast, NOT EXISTS subquery will be rewritten into `COUNT(*) = 0`. // For example, `EXISTS(SELECT a FROM t WHERE a > 1)` will be rewritten into // `(SELECT COUNT(*) > 0 FROM t WHERE a > 1)` let agg_func = AggregateFunctionFactory::instance().get("count", vec![], vec![])?; @@ -172,14 +198,18 @@ impl SubqueryRewriter { from_distinct: false, }; - // COUNT(*) > 0 + // COUNT(*) > 0 or COUNT(*) = 0 let compare_index = self.metadata.write().add_column( - "exists".to_string(), + "subquery".to_string(), BooleanType::new_impl(), None, ); let compare = ComparisonExpr { - op: ComparisonOp::GT, + op: if subquery.typ == SubqueryType::Exists { + ComparisonOp::GT + } else { + ComparisonOp::Equal + }, left: Box::new( BoundColumnRef { column: ColumnBinding { @@ -213,7 +243,7 @@ impl SubqueryRewriter { }; // Project - // EvalScalar: COUNT(*) > 0 + // EvalScalar: COUNT(*) > 0 or COUNT(*) = 0 // Aggregate: COUNT(*) let rewritten_subquery = SExpr::create_unary( project.into(), @@ -226,25 +256,30 @@ impl SubqueryRewriter { let rel_expr = RelExpr::with_s_expr(&rewritten_subquery); let prop = rel_expr.derive_relational_prop()?; - let result: RelOperator = if prop.outer_columns.is_empty() { - LogicalInnerJoin { - left_conditions: vec![], - right_conditions: vec![], - join_type: JoinType::Cross, - } - .into() + let (rel_op, result): (RelOperator, _) = if prop.outer_columns.is_empty() { + ( + LogicalInnerJoin { + left_conditions: vec![], + right_conditions: vec![], + join_type: JoinType::Cross, + } + .into(), + UnnestResult::Uncorrelated, + ) } else { - CrossApply { - subquery_output: prop.output_columns, - correlated_columns: subquery.outer_columns.clone(), - } - .into() + ( + CrossApply { + subquery_output: prop.output_columns, + correlated_columns: subquery.outer_columns.clone(), + } + .into(), + UnnestResult::Apply, + ) }; - Ok(SExpr::create_binary( + Ok(( + SExpr::create_binary(rel_op, left.clone(), rewritten_subquery), result, - left.clone(), - rewritten_subquery, )) } _ => Err(ErrorCode::LogicalError(format!( @@ -256,38 +291,41 @@ impl SubqueryRewriter { /// Try to extract subquery from a scalar expression. Returns replaced scalar expression /// and the subqueries. - fn try_rewrite_subquery(&mut self, scalar: &Scalar, s_expr: &SExpr) -> Result<(Scalar, SExpr)> { + fn try_rewrite_subquery( + &mut self, + scalar: &Scalar, + s_expr: &SExpr, + is_conjunctive_predicate: bool, + ) -> Result<(Scalar, SExpr)> { match scalar { Scalar::BoundColumnRef(_) => Ok((scalar.clone(), s_expr.clone())), Scalar::ConstantExpr(_) => Ok((scalar.clone(), s_expr.clone())), Scalar::AndExpr(expr) => { - let (left, _result_left) = self.try_rewrite_subquery(&expr.left, s_expr)?; - let (right, _result_right) = self.try_rewrite_subquery(&expr.right, s_expr)?; - let func = FunctionFactory::instance() - .get("and", &[&left.data_type(), &right.data_type()])?; + // Notice that the conjunctions has been flattened in binder, if we encounter + // a AND here, we can't treat it as a conjunction. + let (left, s_expr) = self.try_rewrite_subquery(&expr.left, s_expr, false)?; + let (right, s_expr) = self.try_rewrite_subquery(&expr.right, &s_expr, false)?; Ok(( AndExpr { left: Box::new(left), right: Box::new(right), - return_type: func.return_type(), + return_type: expr.return_type.clone(), } .into(), - s_expr.clone(), + s_expr, )) } Scalar::OrExpr(expr) => { - let (left, s_expr) = self.try_rewrite_subquery(&expr.left, s_expr)?; - let (right, s_expr) = self.try_rewrite_subquery(&expr.right, &s_expr)?; - let func = FunctionFactory::instance() - .get("or", &[&left.data_type(), &right.data_type()])?; + let (left, s_expr) = self.try_rewrite_subquery(&expr.left, s_expr, false)?; + let (right, s_expr) = self.try_rewrite_subquery(&expr.right, &s_expr, false)?; Ok(( OrExpr { left: Box::new(left), right: Box::new(right), - return_type: func.return_type(), + return_type: expr.return_type.clone(), } .into(), s_expr, @@ -295,18 +333,14 @@ impl SubqueryRewriter { } Scalar::ComparisonExpr(expr) => { - let (left, s_expr) = self.try_rewrite_subquery(&expr.left, s_expr)?; - let (right, s_expr) = self.try_rewrite_subquery(&expr.right, &s_expr)?; - let func = FunctionFactory::instance().get(expr.op.to_func_name(), &[ - &left.data_type(), - &right.data_type(), - ])?; + let (left, s_expr) = self.try_rewrite_subquery(&expr.left, s_expr, false)?; + let (right, s_expr) = self.try_rewrite_subquery(&expr.right, &s_expr, false)?; Ok(( ComparisonExpr { op: expr.op.clone(), left: Box::new(left), right: Box::new(right), - return_type: func.return_type(), + return_type: expr.return_type.clone(), } .into(), s_expr, @@ -319,7 +353,7 @@ impl SubqueryRewriter { let mut args = vec![]; let mut s_expr = s_expr.clone(); for arg in func.arguments.iter() { - let res = self.try_rewrite_subquery(arg, &s_expr)?; + let res = self.try_rewrite_subquery(arg, &s_expr, false)?; s_expr = res.1; args.push(res.0); } @@ -336,7 +370,7 @@ impl SubqueryRewriter { } Scalar::CastExpr(cast) => { - let (scalar, s_expr) = self.try_rewrite_subquery(&cast.argument, s_expr)?; + let (scalar, s_expr) = self.try_rewrite_subquery(&cast.argument, s_expr, false)?; Ok(( CastExpr { argument: Box::new(scalar), @@ -353,7 +387,20 @@ impl SubqueryRewriter { let mut subquery = subquery.clone(); subquery.subquery = self.rewrite(&subquery.subquery)?; - let s_expr = self.build_apply(s_expr, &subquery)?; + let (s_expr, result) = + self.unnest_subquery(s_expr, &subquery, is_conjunctive_predicate)?; + + // If we unnest the subquery into a simple join, then we can replace the + // original predicate with a `TRUE` literal to eliminate the conjunction. + if matches!(result, UnnestResult::SimpleJoin) { + return Ok(( + Scalar::ConstantExpr(ConstantExpr { + value: DataValue::Boolean(true), + data_type: BooleanType::new_impl(), + }), + s_expr, + )); + } let rel_expr = RelExpr::with_s_expr(s_expr.child(1)?); let prop = rel_expr.derive_relational_prop()?; diff --git a/query/src/sql/optimizer/mod.rs b/query/src/sql/optimizer/mod.rs index aed0332ba28c..68020fe5c45b 100644 --- a/query/src/sql/optimizer/mod.rs +++ b/query/src/sql/optimizer/mod.rs @@ -23,6 +23,8 @@ mod property; mod rule; mod s_expr; +use std::sync::Arc; + use common_exception::Result; pub use heuristic::HeuristicOptimizer; pub use heuristic::DEFAULT_REWRITE_RULES; @@ -39,25 +41,26 @@ pub use rule::RuleFactory; pub use s_expr::SExpr; use super::plans::Plan; +use crate::sessions::QueryContext; pub use crate::sql::optimizer::heuristic::RuleList; pub use crate::sql::optimizer::rule::RuleID; use crate::sql::optimizer::rule::RuleSet; use crate::sql::MetadataRef; -pub fn optimize(plan: Plan) -> Result { +pub fn optimize(ctx: Arc, plan: Plan) -> Result { match plan { Plan::Query { s_expr, bind_context, metadata, } => Ok(Plan::Query { - s_expr: optimize_query(s_expr, metadata.clone())?, + s_expr: optimize_query(ctx, metadata.clone(), s_expr)?, bind_context, metadata, }), Plan::Explain { kind, plan } => Ok(Plan::Explain { kind, - plan: Box::new(optimize(*plan)?), + plan: Box::new(optimize(ctx, *plan)?), }), // Passthrough statements @@ -65,13 +68,17 @@ pub fn optimize(plan: Plan) -> Result { } } -pub fn optimize_query(expression: SExpr, _metadata: MetadataRef) -> Result { +pub fn optimize_query( + ctx: Arc, + metadata: MetadataRef, + s_expr: SExpr, +) -> Result { let rules = RuleList::create(DEFAULT_REWRITE_RULES.clone())?; - let mut heuristic = HeuristicOptimizer::new(rules); - let s_expr = heuristic.optimize(expression)?; + let mut heuristic = HeuristicOptimizer::new(ctx, metadata, rules); + let optimized = heuristic.optimize(s_expr)?; // TODO: enable cascades optimizer // let mut cascades = CascadesOptimizer::create(ctx); // cascades.optimize(s_expr) - Ok(s_expr) + Ok(optimized) } diff --git a/query/src/sql/optimizer/rule/rewrite/rule_merge_project.rs b/query/src/sql/optimizer/rule/rewrite/rule_merge_project.rs index 86e010ff683d..107b3303b7bd 100644 --- a/query/src/sql/optimizer/rule/rewrite/rule_merge_project.rs +++ b/query/src/sql/optimizer/rule/rewrite/rule_merge_project.rs @@ -66,13 +66,9 @@ impl Rule for RuleMergeProject { fn apply(&self, s_expr: &SExpr, state: &mut TransformState) -> Result<()> { let up_project: Project = s_expr.plan().clone().try_into()?; - let down_project: Project = s_expr.child(0)?.plan().clone().try_into()?; - let columns = up_project - .columns - .union(&down_project.columns) - .cloned() - .collect(); - let merged = Project { columns }; + let merged = Project { + columns: up_project.columns, + }; let new_expr = SExpr::create_unary(merged.into(), s_expr.child(0)?.child(0)?.clone()); state.add_result(new_expr); diff --git a/query/src/sql/optimizer/rule/rewrite/rule_push_down_filter_join.rs b/query/src/sql/optimizer/rule/rewrite/rule_push_down_filter_join.rs index f4aaa8b19e98..b555f113a6df 100644 --- a/query/src/sql/optimizer/rule/rewrite/rule_push_down_filter_join.rs +++ b/query/src/sql/optimizer/rule/rewrite/rule_push_down_filter_join.rs @@ -15,76 +15,20 @@ use common_datavalues::type_coercion::merge_types; use common_exception::Result; -use crate::sql::binder::satisfied_by; use crate::sql::binder::wrap_cast_if_needed; +use crate::sql::binder::JoinCondition; use crate::sql::optimizer::rule::Rule; use crate::sql::optimizer::rule::TransformState; use crate::sql::optimizer::RelExpr; -use crate::sql::optimizer::RelationalProperty; use crate::sql::optimizer::RuleID; use crate::sql::optimizer::SExpr; -use crate::sql::plans::ComparisonExpr; -use crate::sql::plans::ComparisonOp; use crate::sql::plans::Filter; use crate::sql::plans::JoinType; use crate::sql::plans::LogicalInnerJoin; use crate::sql::plans::PatternPlan; use crate::sql::plans::RelOp; -use crate::sql::plans::Scalar; use crate::sql::plans::ScalarExpr; -/// Predicate types to determine how to push down -/// a predicate. -/// Given a query: `SELECT * FROM t(a), t1(b) WHERE a = 1 AND b = 1 AND a = b AND a+b = 1`, -/// the predicate types are: -/// - Left: `a = 1` -/// - Right: `b = 1` -/// - Both: `a = b` -/// - Other: `a+b = 1` -enum Predicate<'a> { - Left(&'a Scalar), - Right(&'a Scalar), - Both { left: &'a Scalar, right: &'a Scalar }, - Other(&'a Scalar), -} - -impl<'a> Predicate<'a> { - pub fn new( - scalar: &'a Scalar, - left_prop: &RelationalProperty, - right_prop: &RelationalProperty, - ) -> Self { - if satisfied_by(scalar, left_prop) { - return Self::Left(scalar); - } - - if satisfied_by(scalar, right_prop) { - return Self::Right(scalar); - } - - if let Scalar::ComparisonExpr(ComparisonExpr { - op: ComparisonOp::Equal, - left, - right, - .. - }) = scalar - { - if satisfied_by(left, left_prop) && satisfied_by(right, right_prop) { - return Self::Both { left, right }; - } - - if satisfied_by(right, left_prop) && satisfied_by(left, right_prop) { - return Self::Both { - left: right, - right: left, - }; - } - } - - Self::Other(scalar) - } -} - pub struct RulePushDownFilterJoin { id: RuleID, pattern: SExpr, @@ -149,19 +93,19 @@ impl Rule for RulePushDownFilterJoin { let mut need_push = false; for predicate in filter.predicates.into_iter() { - let pred = Predicate::new(&predicate, &left_prop, &right_prop); + let pred = JoinCondition::new(&predicate, &left_prop, &right_prop); match pred { - Predicate::Left(_) => { + JoinCondition::Left(_) => { need_push = true; left_push_down.push(predicate); } - Predicate::Right(_) => { + JoinCondition::Right(_) => { need_push = true; right_push_down.push(predicate); } - Predicate::Other(_) => original_predicates.push(predicate), + JoinCondition::Other(_) => original_predicates.push(predicate), - Predicate::Both { left, right } => { + JoinCondition::Both { left, right } => { let left_type = left.data_type(); let right_type = right.data_type(); let join_key_type = merge_types(&left_type, &right_type); diff --git a/query/src/sql/planner/binder/mod.rs b/query/src/sql/planner/binder/mod.rs index 719462956e6e..51afe31b27e3 100644 --- a/query/src/sql/planner/binder/mod.rs +++ b/query/src/sql/planner/binder/mod.rs @@ -29,7 +29,6 @@ use common_planners::DropUserPlan; use common_planners::DropUserStagePlan; pub use scalar_common::*; -use self::subquery::SubqueryRewriter; use super::plans::Plan; use crate::catalogs::CatalogManager; use crate::sessions::QueryContext; @@ -50,7 +49,6 @@ mod scalar_visitor; mod select; mod show; mod sort; -mod subquery; mod table; /// Binder is responsible to transform AST of a query into a canonical logical SExpr. @@ -92,9 +90,7 @@ impl<'a> Binder { ) -> Result { let plan = match stmt { Statement::Query(query) => { - let (mut s_expr, bind_context) = self.bind_query(bind_context, query).await?; - let mut rewriter = SubqueryRewriter::new(self.metadata.clone()); - s_expr = rewriter.rewrite(&s_expr)?; + let (s_expr, bind_context) = self.bind_query(bind_context, query).await?; Plan::Query { s_expr, metadata: self.metadata.clone(), diff --git a/query/src/sql/planner/binder/scalar_common.rs b/query/src/sql/planner/binder/scalar_common.rs index b98cf5c03fc4..0403c25745fa 100644 --- a/query/src/sql/planner/binder/scalar_common.rs +++ b/query/src/sql/planner/binder/scalar_common.rs @@ -98,3 +98,54 @@ pub fn wrap_cast_if_needed(scalar: Scalar, target_type: &DataTypeImpl) -> Scalar pub fn satisfied_by(scalar: &Scalar, prop: &RelationalProperty) -> bool { scalar.used_columns().is_subset(&prop.output_columns) } + +/// Helper to determine join condition type from a scalar expression. +/// Given a query: `SELECT * FROM t(a), t1(b) WHERE a = 1 AND b = 1 AND a = b AND a+b = 1`, +/// the predicate types are: +/// - Left: `a = 1` +/// - Right: `b = 1` +/// - Both: `a = b` +/// - Other: `a+b = 1` +pub enum JoinCondition<'a> { + Left(&'a Scalar), + Right(&'a Scalar), + Both { left: &'a Scalar, right: &'a Scalar }, + Other(&'a Scalar), +} + +impl<'a> JoinCondition<'a> { + pub fn new( + scalar: &'a Scalar, + left_prop: &RelationalProperty, + right_prop: &RelationalProperty, + ) -> Self { + if satisfied_by(scalar, left_prop) { + return Self::Left(scalar); + } + + if satisfied_by(scalar, right_prop) { + return Self::Right(scalar); + } + + if let Scalar::ComparisonExpr(ComparisonExpr { + op: ComparisonOp::Equal, + left, + right, + .. + }) = scalar + { + if satisfied_by(left, left_prop) && satisfied_by(right, right_prop) { + return Self::Both { left, right }; + } + + if satisfied_by(right, left_prop) && satisfied_by(left, right_prop) { + return Self::Both { + left: right, + right: left, + }; + } + } + + Self::Other(scalar) + } +} diff --git a/query/src/sql/planner/mod.rs b/query/src/sql/planner/mod.rs index 9481bb1c097e..f734139e0528 100644 --- a/query/src/sql/planner/mod.rs +++ b/query/src/sql/planner/mod.rs @@ -66,7 +66,7 @@ impl Planner { let plan = binder.bind(&stmts[0]).await?; // Step 3: optimize the SExpr with optimizers, and generate optimized physical SExpr - let optimized_plan = optimize(plan)?; + let optimized_plan = optimize(self.ctx.clone(), plan)?; Ok((optimized_plan, metadata.clone())) } diff --git a/query/src/sql/planner/semantic/type_check.rs b/query/src/sql/planner/semantic/type_check.rs index d72beee10343..8f3b18b71130 100644 --- a/query/src/sql/planner/semantic/type_check.rs +++ b/query/src/sql/planner/semantic/type_check.rs @@ -404,9 +404,18 @@ impl<'a> TypeChecker<'a> { ) } - Expr::Exists { subquery, .. } => { - self.resolve_subquery(SubqueryType::Exists, subquery, true, None) - .await? + Expr::Exists { subquery, not, .. } => { + self.resolve_subquery( + if *not { + SubqueryType::NotExists + } else { + SubqueryType::Exists + }, + subquery, + true, + None, + ) + .await? } Expr::Subquery { subquery, .. } => { diff --git a/query/tests/it/sql/optimizer/heuristic/join.rs b/query/tests/it/sql/optimizer/heuristic/join.rs index a54c107d2c87..d9b68ffc96f0 100644 --- a/query/tests/it/sql/optimizer/heuristic/join.rs +++ b/query/tests/it/sql/optimizer/heuristic/join.rs @@ -22,7 +22,7 @@ use super::Suite; use crate::tests::create_query_context; #[tokio::test] -pub async fn test_optimizer_join() -> Result<()> { +pub async fn test_heuristic_optimizer_join() -> Result<()> { let mut mint = Mint::new("tests/it/sql/optimizer/heuristic/testdata/"); let mut file = mint.new_goldenfile("join.test")?; diff --git a/query/tests/it/sql/optimizer/heuristic/mod.rs b/query/tests/it/sql/optimizer/heuristic/mod.rs index 39e9cf81c14b..668186e63d8c 100644 --- a/query/tests/it/sql/optimizer/heuristic/mod.rs +++ b/query/tests/it/sql/optimizer/heuristic/mod.rs @@ -53,12 +53,16 @@ async fn run_test(ctx: Arc, suite: &Suite) -> Result { ); let plan = binder.bind(&stmts[0]).await?; - let mut heuristic_opt = HeuristicOptimizer::new(RuleList::create(suite.rules.clone())?); let result = match plan { Plan::Query { s_expr, metadata, .. } => { + let mut heuristic_opt = HeuristicOptimizer::new( + ctx.clone(), + metadata.clone(), + RuleList::create(suite.rules.clone())?, + ); let optimized = heuristic_opt.optimize(s_expr)?; optimized.to_format_tree(&metadata).format_indent() } diff --git a/query/tests/it/sql/optimizer/heuristic/select.rs b/query/tests/it/sql/optimizer/heuristic/select.rs index fc0cf566bc75..ccccd53655f6 100644 --- a/query/tests/it/sql/optimizer/heuristic/select.rs +++ b/query/tests/it/sql/optimizer/heuristic/select.rs @@ -22,7 +22,7 @@ use super::Suite; use crate::tests::create_query_context; #[tokio::test] -pub async fn test_optimizer_select() -> Result<()> { +pub async fn test_heuristic_optimizer_select() -> Result<()> { let mut mint = Mint::new("tests/it/sql/optimizer/heuristic/testdata/"); let mut file = mint.new_goldenfile("select.test")?; @@ -74,6 +74,11 @@ pub async fn test_optimizer_select() -> Result<()> { query: "select * from numbers(1) where null".to_string(), rules: DEFAULT_REWRITE_RULES.clone(), }, + Suite { + comment: "".to_string(), + query: "select a from (select number as a, number as b from numbers(1))".to_string(), + rules: DEFAULT_REWRITE_RULES.clone(), + }, ]; run_suites(ctx, &mut file, &suites).await diff --git a/query/tests/it/sql/optimizer/heuristic/subquery.rs b/query/tests/it/sql/optimizer/heuristic/subquery.rs index 67e0609c018a..6c4b9e30be2c 100644 --- a/query/tests/it/sql/optimizer/heuristic/subquery.rs +++ b/query/tests/it/sql/optimizer/heuristic/subquery.rs @@ -21,7 +21,7 @@ use super::Suite; use crate::tests::create_query_context; #[tokio::test] -pub async fn test_optimizer_subquery() -> Result<()> { +pub async fn test_heuristic_optimizer_subquery() -> Result<()> { let mut mint = Mint::new("tests/it/sql/optimizer/heuristic/testdata/"); let mut file = mint.new_goldenfile("subquery.test")?; @@ -45,7 +45,19 @@ pub async fn test_optimizer_subquery() -> Result<()> { query: "select t.number from numbers(1) as t where number = (select * from numbers(1) where number = 0)" .to_string(), rules: DEFAULT_REWRITE_RULES.clone(), - } + }, + Suite { + comment: "# Correlated subquery can be translated to SemiJoin".to_string(), + query: "select t.number from numbers(1) as t where exists (select * from numbers(1) where number = t.number)" + .to_string(), + rules: DEFAULT_REWRITE_RULES.clone(), + }, + Suite { + comment: "# Correlated subquery can be translated to AntiJoin".to_string(), + query: "select t.number from numbers(1) as t where not exists (select * from numbers(1) where number = t.number)" + .to_string(), + rules: DEFAULT_REWRITE_RULES.clone(), + }, ]; run_suites(ctx, &mut file, &suites).await diff --git a/query/tests/it/sql/optimizer/heuristic/testdata/select.test b/query/tests/it/sql/optimizer/heuristic/testdata/select.test index 7d579c5dfccc..c0684fc3f7ec 100644 --- a/query/tests/it/sql/optimizer/heuristic/testdata/select.test +++ b/query/tests/it/sql/optimizer/heuristic/testdata/select.test @@ -63,3 +63,9 @@ Project: [number] Scan: default.system.numbers +select a from (select number as a, number as b from numbers(1)) +---- +Project: [number] + Scan: default.system.numbers + + diff --git a/query/tests/it/sql/optimizer/heuristic/testdata/subquery.test b/query/tests/it/sql/optimizer/heuristic/testdata/subquery.test index 5f0e1f7e4373..3e7644021eb6 100644 --- a/query/tests/it/sql/optimizer/heuristic/testdata/subquery.test +++ b/query/tests/it/sql/optimizer/heuristic/testdata/subquery.test @@ -23,7 +23,7 @@ select t.number from numbers(1) as t where exists (select * from numbers(1) wher Project: [number] CrossJoin Scan: default.system.numbers - Project: [exists] + Project: [subquery] Filter: [subquery_3] EvalScalar: [count(*) > 0] Aggregate: group items: [], aggregate functions: [count(*)] @@ -44,3 +44,23 @@ Project: [number] Scan: default.system.numbers +# Correlated subquery can be translated to SemiJoin +select t.number from numbers(1) as t where exists (select * from numbers(1) where number = t.number) +---- +Project: [number] + HashJoin: SEMI, build keys: [number], probe keys: [number] + Scan: default.system.numbers + Project: [number] + Scan: default.system.numbers + + +# Correlated subquery can be translated to AntiJoin +select t.number from numbers(1) as t where not exists (select * from numbers(1) where number = t.number) +---- +Project: [number] + HashJoin: ANTI, build keys: [number], probe keys: [number] + Scan: default.system.numbers + Project: [number] + Scan: default.system.numbers + + diff --git a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result index fd8a70d03595..61e945b9b9a1 100644 --- a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result +++ b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result @@ -376,3 +376,8 @@ NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL 1 NULL NULL NULL 1 +0 +1 +2 +3 +4 diff --git a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql index ae0c69ad5c99..73ce7ac6920e 100644 --- a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql +++ b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql @@ -325,4 +325,10 @@ select '====NULL===='; create table n( a int null, b int null) ; insert into n select if (number % 3, null, number), if (number % 2, null, number) from numbers(10); select a + b, a and b, a - b, a or b from n; +drop table n; + +-- Subquery SemiJoin and AntiJoin +select * from numbers(5) as t where exists (select * from numbers(3) where number = t.number); +select * from numbers(5) as t where not exists (select * from numbers(3) where number = t.number); + set enable_planner_v2 = 0; From c05b8c8f217380d60e63a0ea669401a9f12b201d Mon Sep 17 00:00:00 2001 From: leiysky Date: Sat, 18 Jun 2022 20:35:05 +0800 Subject: [PATCH 2/2] disable test case --- .../logictest/suites/gen/20+_others/20_0001_planner_v2 | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/logictest/suites/gen/20+_others/20_0001_planner_v2 b/tests/logictest/suites/gen/20+_others/20_0001_planner_v2 index 1f99fb81ff30..fdf5620ed57a 100644 --- a/tests/logictest/suites/gen/20+_others/20_0001_planner_v2 +++ b/tests/logictest/suites/gen/20+_others/20_0001_planner_v2 @@ -1045,12 +1045,12 @@ select '====Correlated Subquery===='; ---- ====Correlated Subquery==== -statement query I -select * from numbers(10) as t where exists (select * from numbers(2) as t1 where t.number = t1.number); +# statement query I +# select * from numbers(10) as t where exists (select * from numbers(2) as t1 where t.number = t1.number); ----- -0 -1 +# ---- +# 0 +# 1 statement query I select (select number from numbers(10) as t1 where t.number = t1.number) from numbers(10) as t order by number;