From d6790c8b7b9377b54d614be86bd5ff2c916ee6af Mon Sep 17 00:00:00 2001 From: xudong963 Date: Tue, 17 May 2022 15:08:49 +0800 Subject: [PATCH 1/4] feat(planner): implement using() in join --- query/src/sql/planner/binder/bind_context.rs | 5 + query/src/sql/planner/binder/join.rs | 148 +++++++++++++----- query/src/sql/planner/binder/project.rs | 5 + query/src/sql/planner/binder/select.rs | 1 + .../20+_others/20_0001_planner_v2.result | 7 + .../20+_others/20_0001_planner_v2.sql | 14 ++ 6 files changed, 141 insertions(+), 39 deletions(-) diff --git a/query/src/sql/planner/binder/bind_context.rs b/query/src/sql/planner/binder/bind_context.rs index deb7b365b06b..f932774f1b9b 100644 --- a/query/src/sql/planner/binder/bind_context.rs +++ b/query/src/sql/planner/binder/bind_context.rs @@ -44,6 +44,11 @@ pub struct ColumnBinding { /// Another example is aggregation. In a `GROUP BY` context, aggregate funtions /// will be extracted and be added to `BindContext` as a `ColumnBinding`. pub scalar: Option>, + + /// Consider the sql: `select * from t join t1 using(a)`. + /// The result should only contain one `a` column. + /// So we need mark `t.a` or `t1.a` duplicated. + pub duplicated: bool, } /// `BindContext` stores all the free variables in a query and tracks the context of binding procedure. diff --git a/query/src/sql/planner/binder/join.rs b/query/src/sql/planner/binder/join.rs index 2376349d6901..c89c7991eefd 100644 --- a/query/src/sql/planner/binder/join.rs +++ b/query/src/sql/planner/binder/join.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use async_recursion::async_recursion; use common_ast::ast::Expr; +use common_ast::ast::Identifier; use common_ast::ast::Join; use common_ast::ast::JoinCondition; use common_ast::ast::JoinOperator; @@ -31,6 +32,7 @@ use crate::sql::optimizer::ColumnSet; use crate::sql::optimizer::SExpr; use crate::sql::planner::binder::scalar::ScalarBinder; use crate::sql::planner::binder::Binder; +use crate::sql::plans::BoundColumnRef; use crate::sql::plans::FilterPlan; use crate::sql::plans::LogicalInnerJoin; use crate::sql::plans::Scalar; @@ -63,11 +65,11 @@ impl<'a> Binder { let mut left_join_conditions: Vec = vec![]; let mut right_join_conditions: Vec = vec![]; let mut other_conditions: Vec = vec![]; - let join_condition_resolver = JoinConditionResolver::new( + let mut join_condition_resolver = JoinConditionResolver::new( self.ctx.clone(), &left_context, &right_context, - &bind_context, + &mut bind_context, &join.condition, ); join_condition_resolver @@ -163,7 +165,7 @@ struct JoinConditionResolver<'a> { left_context: &'a BindContext, right_context: &'a BindContext, - join_context: &'a BindContext, + join_context: &'a mut BindContext, join_condition: &'a JoinCondition<'a>, } @@ -172,7 +174,7 @@ impl<'a> JoinConditionResolver<'a> { ctx: Arc, left_context: &'a BindContext, right_context: &'a BindContext, - join_context: &'a BindContext, + join_context: &'a mut BindContext, join_condition: &'a JoinCondition<'a>, ) -> Self { Self { @@ -185,7 +187,7 @@ impl<'a> JoinConditionResolver<'a> { } pub async fn resolve( - &self, + &mut self, left_join_conditions: &mut Vec, right_join_conditions: &mut Vec, other_join_conditions: &mut Vec, @@ -200,8 +202,9 @@ impl<'a> JoinConditionResolver<'a> { ) .await?; } - JoinCondition::Using(_) => { - return Err(ErrorCode::UnImplement("USING clause is not supported yet. Please specify join condition with ON clause.")); + JoinCondition::Using(identifiers) => { + self.resolve_using(identifiers, left_join_conditions, right_join_conditions) + .await?; } JoinCondition::Natural => { return Err(ErrorCode::UnImplement("NATURAL JOIN is not supported yet. Please specify join condition with ON clause.")); @@ -252,44 +255,111 @@ impl<'a> JoinConditionResolver<'a> { // // Only equi-predicate can be exploited by common join algorithms(e.g. sort-merge join, hash join). // For the predicates that aren't equi-predicate, we will lift them as a `Filter` operator. - if let Some((mut left, mut right)) = split_equivalent_predicate(predicate) { - let left_used_columns = left.used_columns(); - let right_used_columns = right.used_columns(); - let left_columns: ColumnSet = self.left_context.all_column_bindings().iter().fold( - ColumnSet::new(), - |mut acc, v| { + if let Some((left, right)) = split_equivalent_predicate(predicate) { + self.add_conditions(left, right, left_join_conditions, right_join_conditions)?; + } else { + other_join_conditions.push(predicate.clone()); + } + Ok(()) + } + + async fn resolve_using( + &mut self, + identifiers: &[Identifier<'_>], + left_join_conditions: &mut Vec, + right_join_conditions: &mut Vec, + ) -> Result<()> { + for join_key in identifiers.iter() { + let join_key_name = join_key.name.as_str(); + let mut left_scalars = vec![]; + for col_binding in self.left_context.columns.iter() { + if col_binding.column_name == join_key_name { + left_scalars.push(Scalar::BoundColumnRef(BoundColumnRef { + column: col_binding.clone(), + })); + } + } + if left_scalars.is_empty() { + return Err(ErrorCode::SemanticError(format!( + "column {} specified in USING clause does not exist in left table", + join_key_name + ))); + } + assert_eq!(left_scalars.len(), 1); + let mut right_scalars = vec![]; + for col_binding in self.right_context.columns.iter() { + if col_binding.column_name == join_key_name { + right_scalars.push(Scalar::BoundColumnRef(BoundColumnRef { + column: col_binding.clone(), + })); + } + } + if right_scalars.is_empty() { + return Err(ErrorCode::SemanticError(format!( + "column {} specified in USING clause does not exist in right table", + join_key_name + ))); + } + assert_eq!(right_scalars.len(), 1); + for col_binding in self.join_context.columns.iter_mut() { + if col_binding.column_name == join_key_name { + col_binding.duplicated = true; + break; + } + } + self.add_conditions( + left_scalars[0].clone(), + right_scalars[0].clone(), + left_join_conditions, + right_join_conditions, + )?; + } + Ok(()) + } + + fn add_conditions( + &self, + mut left: Scalar, + mut right: Scalar, + left_join_conditions: &mut Vec, + right_join_conditions: &mut Vec, + ) -> Result<()> { + let left_used_columns = left.used_columns(); + let right_used_columns = right.used_columns(); + let left_columns: ColumnSet = + self.left_context + .all_column_bindings() + .iter() + .fold(ColumnSet::new(), |mut acc, v| { acc.insert(v.index); acc - }, - ); - let right_columns: ColumnSet = self.right_context.all_column_bindings().iter().fold( - ColumnSet::new(), - |mut acc, v| { + }); + let right_columns: ColumnSet = + self.right_context + .all_column_bindings() + .iter() + .fold(ColumnSet::new(), |mut acc, v| { acc.insert(v.index); acc - }, - ); + }); - // Bump types of left conditions and right conditions - let left_type = left.data_type(); - let right_type = right.data_type(); - let least_super_type = merge_types(&left_type, &right_type)?; - left = wrap_cast_if_needed(left, &least_super_type); - right = wrap_cast_if_needed(right, &least_super_type); + // Bump types of left conditions and right conditions + let left_type = left.data_type(); + let right_type = right.data_type(); + let least_super_type = merge_types(&left_type, &right_type)?; + left = wrap_cast_if_needed(left, &least_super_type); + right = wrap_cast_if_needed(right, &least_super_type); - if left_used_columns.is_subset(&left_columns) - && right_used_columns.is_subset(&right_columns) - { - left_join_conditions.push(left); - right_join_conditions.push(right); - } else if left_used_columns.is_subset(&right_columns) - && right_used_columns.is_subset(&left_columns) - { - left_join_conditions.push(right); - right_join_conditions.push(left); - } - } else { - other_join_conditions.push(predicate.clone()); + if left_used_columns.is_subset(&left_columns) + && right_used_columns.is_subset(&right_columns) + { + left_join_conditions.push(left); + right_join_conditions.push(right); + } else if left_used_columns.is_subset(&right_columns) + && right_used_columns.is_subset(&left_columns) + { + left_join_conditions.push(right); + right_join_conditions.push(left); } Ok(()) } diff --git a/query/src/sql/planner/binder/project.rs b/query/src/sql/planner/binder/project.rs index 6addac1784d6..80afedb88c78 100644 --- a/query/src/sql/planner/binder/project.rs +++ b/query/src/sql/planner/binder/project.rs @@ -94,6 +94,9 @@ impl<'a> Binder { // Expands wildcard star, for example we have a table `t(a INT, b INT)`: // The query `SELECT * FROM t` will be expanded into `SELECT t.a, t.b FROM t` for column_binding in input_context.all_column_bindings() { + if column_binding.duplicated { + continue; + } output_context.add_column_binding(column_binding.clone()); } } @@ -121,6 +124,7 @@ impl<'a> Binder { index: column_ref.column.index, data_type, scalar: Some(Box::new(bound_expr.clone())), + duplicated: false, }, _ => { let index = self.metadata.add_column( @@ -136,6 +140,7 @@ impl<'a> Binder { index, data_type, scalar: Some(Box::new(bound_expr.clone())), + duplicated: false, } } }; diff --git a/query/src/sql/planner/binder/select.rs b/query/src/sql/planner/binder/select.rs index f9e1147014e4..5e402a40f38e 100644 --- a/query/src/sql/planner/binder/select.rs +++ b/query/src/sql/planner/binder/select.rs @@ -284,6 +284,7 @@ impl<'a> Binder { index: column.column_index, data_type: column.data_type.clone(), scalar: None, + duplicated: false, }; bind_context.add_column_binding(column_binding); } diff --git a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result index c928a8ba5810..d7d3fcd028e8 100644 --- a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result +++ b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result @@ -243,3 +243,10 @@ new_planner 0 8 5 ====Context Function==== default +===Inner Join with Using=== +4 3 4 +6 5 6 +3 +5 +4 +6 diff --git a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql index 8519e7778c8d..dfd7357e587b 100644 --- a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql +++ b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql @@ -175,4 +175,18 @@ select '====Context Function===='; use default; select database(); +-- Inner join with using +select '===Inner Join with Using==='; +drop table if exists t1; +create table t1(a int, b int); +insert into t1 values(7, 8), (3, 4), (5, 6); +drop table if exists t2; +create table t2(a int, d int); +insert into t2 values(1, 2), (3, 4), (5, 6); +select * from t1 join t2 using(a); +select t1.a from t1 join t2 using(a); +select t2.d from t1 join t2 using(a); +drop table t1; +drop table t2; + set enable_planner_v2 = 0; From 01b3abf74cbdd69a81ced952e82d613240724d91 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Tue, 17 May 2022 16:15:48 +0800 Subject: [PATCH 2/4] feat(planner): implement natural in join --- query/src/sql/planner/binder/join.rs | 33 +++++++++++++++---- .../20+_others/20_0001_planner_v2.result | 2 ++ .../20+_others/20_0001_planner_v2.sql | 1 + 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/query/src/sql/planner/binder/join.rs b/query/src/sql/planner/binder/join.rs index c89c7991eefd..ec980ecdd9f6 100644 --- a/query/src/sql/planner/binder/join.rs +++ b/query/src/sql/planner/binder/join.rs @@ -16,7 +16,6 @@ use std::sync::Arc; use async_recursion::async_recursion; use common_ast::ast::Expr; -use common_ast::ast::Identifier; use common_ast::ast::Join; use common_ast::ast::JoinCondition; use common_ast::ast::JoinOperator; @@ -203,11 +202,22 @@ impl<'a> JoinConditionResolver<'a> { .await?; } JoinCondition::Using(identifiers) => { - self.resolve_using(identifiers, left_join_conditions, right_join_conditions) + let using_columns = identifiers + .iter() + .map(|ident| ident.name.clone()) + .collect::>(); + self.resolve_using(using_columns, left_join_conditions, right_join_conditions) .await?; } JoinCondition::Natural => { - return Err(ErrorCode::UnImplement("NATURAL JOIN is not supported yet. Please specify join condition with ON clause.")); + // NATURAL is a shorthand form of USING: it forms a USING list consisting of all column names that appear in both input tables + // As with USING, these columns appear only once in the output table + // Todo(xudong963) If there are no common column names, NATURAL JOIN behaves like JOIN ... ON TRUE, producing a cross-product join. + let mut using_columns = vec![]; + // Find common columns in both input tables + self.find_using_columns(&mut using_columns)?; + self.resolve_using(using_columns, left_join_conditions, right_join_conditions) + .await? } JoinCondition::None => { return Err(ErrorCode::UnImplement("JOIN without condition is not supported yet. Please specify join condition with ON clause.")); @@ -265,12 +275,12 @@ impl<'a> JoinConditionResolver<'a> { async fn resolve_using( &mut self, - identifiers: &[Identifier<'_>], + using_columns: Vec, left_join_conditions: &mut Vec, right_join_conditions: &mut Vec, ) -> Result<()> { - for join_key in identifiers.iter() { - let join_key_name = join_key.name.as_str(); + for join_key in using_columns.iter() { + let join_key_name = join_key.as_str(); let mut left_scalars = vec![]; for col_binding in self.left_context.columns.iter() { if col_binding.column_name == join_key_name { @@ -363,4 +373,15 @@ impl<'a> JoinConditionResolver<'a> { } Ok(()) } + + fn find_using_columns(&self, using_columns: &mut Vec) -> Result<()> { + for left_column in self.left_context.all_column_bindings().iter() { + for right_column in self.right_context.all_column_bindings().iter() { + if left_column.column_name == right_column.column_name { + using_columns.push(left_column.column_name.clone()); + } + } + } + Ok(()) + } } diff --git a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result index d7d3fcd028e8..cec298be8067 100644 --- a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result +++ b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result @@ -250,3 +250,5 @@ default 5 4 6 +4 3 4 +6 5 6 diff --git a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql index dfd7357e587b..999a5d416fcb 100644 --- a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql +++ b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql @@ -186,6 +186,7 @@ insert into t2 values(1, 2), (3, 4), (5, 6); select * from t1 join t2 using(a); select t1.a from t1 join t2 using(a); select t2.d from t1 join t2 using(a); +select * from t1 natural join t2; drop table t1; drop table t2; From ac52d595914d5a896ed29d43d821df72897d0d57 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Tue, 17 May 2022 19:45:09 +0800 Subject: [PATCH 3/4] refine variable name --- query/src/sql/planner/binder/bind_context.rs | 2 +- query/src/sql/planner/binder/join.rs | 2 +- query/src/sql/planner/binder/project.rs | 6 +++--- query/src/sql/planner/binder/select.rs | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/query/src/sql/planner/binder/bind_context.rs b/query/src/sql/planner/binder/bind_context.rs index f932774f1b9b..3b3cee4438e2 100644 --- a/query/src/sql/planner/binder/bind_context.rs +++ b/query/src/sql/planner/binder/bind_context.rs @@ -48,7 +48,7 @@ pub struct ColumnBinding { /// Consider the sql: `select * from t join t1 using(a)`. /// The result should only contain one `a` column. /// So we need mark `t.a` or `t1.a` duplicated. - pub duplicated: bool, + pub visible_in_unqualified_wildcard: bool, } /// `BindContext` stores all the free variables in a query and tracks the context of binding procedure. diff --git a/query/src/sql/planner/binder/join.rs b/query/src/sql/planner/binder/join.rs index ec980ecdd9f6..9eac6b394b09 100644 --- a/query/src/sql/planner/binder/join.rs +++ b/query/src/sql/planner/binder/join.rs @@ -313,7 +313,7 @@ impl<'a> JoinConditionResolver<'a> { assert_eq!(right_scalars.len(), 1); for col_binding in self.join_context.columns.iter_mut() { if col_binding.column_name == join_key_name { - col_binding.duplicated = true; + col_binding.visible_in_unqualified_wildcard = false; break; } } diff --git a/query/src/sql/planner/binder/project.rs b/query/src/sql/planner/binder/project.rs index 80afedb88c78..ebf6d853e689 100644 --- a/query/src/sql/planner/binder/project.rs +++ b/query/src/sql/planner/binder/project.rs @@ -94,7 +94,7 @@ impl<'a> Binder { // Expands wildcard star, for example we have a table `t(a INT, b INT)`: // The query `SELECT * FROM t` will be expanded into `SELECT t.a, t.b FROM t` for column_binding in input_context.all_column_bindings() { - if column_binding.duplicated { + if !column_binding.visible_in_unqualified_wildcard { continue; } output_context.add_column_binding(column_binding.clone()); @@ -124,7 +124,7 @@ impl<'a> Binder { index: column_ref.column.index, data_type, scalar: Some(Box::new(bound_expr.clone())), - duplicated: false, + visible_in_unqualified_wildcard: true, }, _ => { let index = self.metadata.add_column( @@ -140,7 +140,7 @@ impl<'a> Binder { index, data_type, scalar: Some(Box::new(bound_expr.clone())), - duplicated: false, + visible_in_unqualified_wildcard: true, } } }; diff --git a/query/src/sql/planner/binder/select.rs b/query/src/sql/planner/binder/select.rs index 5e402a40f38e..c123c6d89f64 100644 --- a/query/src/sql/planner/binder/select.rs +++ b/query/src/sql/planner/binder/select.rs @@ -284,7 +284,7 @@ impl<'a> Binder { index: column.column_index, data_type: column.data_type.clone(), scalar: None, - duplicated: false, + visible_in_unqualified_wildcard: true, }; bind_context.add_column_binding(column_binding); } From 75d93f8bcfd055629c5bf02ff25183b05200382b Mon Sep 17 00:00:00 2001 From: xudong963 Date: Tue, 17 May 2022 19:46:42 +0800 Subject: [PATCH 4/4] fix annotation --- query/src/sql/planner/binder/bind_context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/query/src/sql/planner/binder/bind_context.rs b/query/src/sql/planner/binder/bind_context.rs index 3b3cee4438e2..f75765c10943 100644 --- a/query/src/sql/planner/binder/bind_context.rs +++ b/query/src/sql/planner/binder/bind_context.rs @@ -47,7 +47,7 @@ pub struct ColumnBinding { /// Consider the sql: `select * from t join t1 using(a)`. /// The result should only contain one `a` column. - /// So we need mark `t.a` or `t1.a` duplicated. + /// So we need make `t.a` or `t1.a` invisible in unqualified wildcard. pub visible_in_unqualified_wildcard: bool, }