Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(planner): support using and natural for join #5423

Merged
merged 4 commits into from
May 17, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions query/src/sql/planner/binder/bind_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ pub struct ColumnBinding {
/// Another example is aggregation. In a `GROUP BY` context, aggregate functions
/// will be extracted and be added to `BindContext` as a `ColumnBinding`.
pub scalar: Option<Box<Scalar>>,

/// Consider the sql: `select * from t join t1 using(a)`.
/// The result should only contain one `a` column.
/// So we need to mark either `t.a` or `t1.a` as duplicated.
pub duplicated: bool,
xudong963 marked this conversation as resolved.
Show resolved Hide resolved
}

/// `BindContext` stores all the free variables in a query and tracks the context of binding procedure.
Expand Down
169 changes: 130 additions & 39 deletions query/src/sql/planner/binder/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::sql::optimizer::ColumnSet;
use crate::sql::optimizer::SExpr;
use crate::sql::planner::binder::scalar::ScalarBinder;
use crate::sql::planner::binder::Binder;
use crate::sql::plans::BoundColumnRef;
use crate::sql::plans::FilterPlan;
use crate::sql::plans::LogicalInnerJoin;
use crate::sql::plans::Scalar;
Expand Down Expand Up @@ -63,11 +64,11 @@ impl<'a> Binder {
let mut left_join_conditions: Vec<Scalar> = vec![];
let mut right_join_conditions: Vec<Scalar> = vec![];
let mut other_conditions: Vec<Scalar> = vec![];
let join_condition_resolver = JoinConditionResolver::new(
let mut join_condition_resolver = JoinConditionResolver::new(
self.ctx.clone(),
&left_context,
&right_context,
&bind_context,
&mut bind_context,
&join.condition,
);
join_condition_resolver
Expand Down Expand Up @@ -163,7 +164,7 @@ struct JoinConditionResolver<'a> {

left_context: &'a BindContext,
right_context: &'a BindContext,
join_context: &'a BindContext,
join_context: &'a mut BindContext,
join_condition: &'a JoinCondition<'a>,
}

Expand All @@ -172,7 +173,7 @@ impl<'a> JoinConditionResolver<'a> {
ctx: Arc<QueryContext>,
left_context: &'a BindContext,
right_context: &'a BindContext,
join_context: &'a BindContext,
join_context: &'a mut BindContext,
join_condition: &'a JoinCondition<'a>,
) -> Self {
Self {
Expand All @@ -185,7 +186,7 @@ impl<'a> JoinConditionResolver<'a> {
}

pub async fn resolve(
&self,
&mut self,
left_join_conditions: &mut Vec<Scalar>,
right_join_conditions: &mut Vec<Scalar>,
other_join_conditions: &mut Vec<Scalar>,
Expand All @@ -200,11 +201,23 @@ impl<'a> JoinConditionResolver<'a> {
)
.await?;
}
JoinCondition::Using(_) => {
return Err(ErrorCode::UnImplement("USING clause is not supported yet. Please specify join condition with ON clause."));
JoinCondition::Using(identifiers) => {
let using_columns = identifiers
.iter()
.map(|ident| ident.name.clone())
.collect::<Vec<String>>();
self.resolve_using(using_columns, left_join_conditions, right_join_conditions)
.await?;
}
JoinCondition::Natural => {
return Err(ErrorCode::UnImplement("NATURAL JOIN is not supported yet. Please specify join condition with ON clause."));
// NATURAL is shorthand for USING: it forms a USING list consisting of all column names that appear in both input tables.
// As with USING, these columns appear only once in the output table.
// TODO(xudong963): If there are no common column names, NATURAL JOIN behaves like JOIN ... ON TRUE, producing a cross-product join.
let mut using_columns = vec![];
// Find common columns in both input tables
self.find_using_columns(&mut using_columns)?;
self.resolve_using(using_columns, left_join_conditions, right_join_conditions)
.await?
}
JoinCondition::None => {
return Err(ErrorCode::UnImplement("JOIN without condition is not supported yet. Please specify join condition with ON clause."));
Expand Down Expand Up @@ -252,44 +265,122 @@ impl<'a> JoinConditionResolver<'a> {
//
// Only equi-predicate can be exploited by common join algorithms(e.g. sort-merge join, hash join).
// For the predicates that aren't equi-predicate, we will lift them as a `Filter` operator.
if let Some((mut left, mut right)) = split_equivalent_predicate(predicate) {
let left_used_columns = left.used_columns();
let right_used_columns = right.used_columns();
let left_columns: ColumnSet = self.left_context.all_column_bindings().iter().fold(
ColumnSet::new(),
|mut acc, v| {
if let Some((left, right)) = split_equivalent_predicate(predicate) {
self.add_conditions(left, right, left_join_conditions, right_join_conditions)?;
} else {
other_join_conditions.push(predicate.clone());
}
Ok(())
}

async fn resolve_using(
&mut self,
using_columns: Vec<String>,
left_join_conditions: &mut Vec<Scalar>,
right_join_conditions: &mut Vec<Scalar>,
) -> Result<()> {
for join_key in using_columns.iter() {
let join_key_name = join_key.as_str();
let mut left_scalars = vec![];
for col_binding in self.left_context.columns.iter() {
if col_binding.column_name == join_key_name {
left_scalars.push(Scalar::BoundColumnRef(BoundColumnRef {
column: col_binding.clone(),
}));
}
}
if left_scalars.is_empty() {
return Err(ErrorCode::SemanticError(format!(
"column {} specified in USING clause does not exist in left table",
join_key_name
)));
}
assert_eq!(left_scalars.len(), 1);
let mut right_scalars = vec![];
for col_binding in self.right_context.columns.iter() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe better use find

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do it in another PR :)

if col_binding.column_name == join_key_name {
right_scalars.push(Scalar::BoundColumnRef(BoundColumnRef {
column: col_binding.clone(),
}));
}
}
if right_scalars.is_empty() {
return Err(ErrorCode::SemanticError(format!(
"column {} specified in USING clause does not exist in right table",
join_key_name
)));
}
assert_eq!(right_scalars.len(), 1);
for col_binding in self.join_context.columns.iter_mut() {
if col_binding.column_name == join_key_name {
col_binding.duplicated = true;
break;
}
}
self.add_conditions(
left_scalars[0].clone(),
right_scalars[0].clone(),
left_join_conditions,
right_join_conditions,
)?;
}
Ok(())
}

fn add_conditions(
&self,
mut left: Scalar,
mut right: Scalar,
left_join_conditions: &mut Vec<Scalar>,
right_join_conditions: &mut Vec<Scalar>,
) -> Result<()> {
let left_used_columns = left.used_columns();
let right_used_columns = right.used_columns();
let left_columns: ColumnSet =
self.left_context
.all_column_bindings()
.iter()
.fold(ColumnSet::new(), |mut acc, v| {
acc.insert(v.index);
acc
},
);
let right_columns: ColumnSet = self.right_context.all_column_bindings().iter().fold(
ColumnSet::new(),
|mut acc, v| {
});
let right_columns: ColumnSet =
self.right_context
.all_column_bindings()
.iter()
.fold(ColumnSet::new(), |mut acc, v| {
acc.insert(v.index);
acc
},
);
});

// Bump types of left conditions and right conditions
let left_type = left.data_type();
let right_type = right.data_type();
let least_super_type = merge_types(&left_type, &right_type)?;
left = wrap_cast_if_needed(left, &least_super_type);
right = wrap_cast_if_needed(right, &least_super_type);
// Bump types of left conditions and right conditions
let left_type = left.data_type();
let right_type = right.data_type();
let least_super_type = merge_types(&left_type, &right_type)?;
left = wrap_cast_if_needed(left, &least_super_type);
right = wrap_cast_if_needed(right, &least_super_type);

if left_used_columns.is_subset(&left_columns)
&& right_used_columns.is_subset(&right_columns)
{
left_join_conditions.push(left);
right_join_conditions.push(right);
} else if left_used_columns.is_subset(&right_columns)
&& right_used_columns.is_subset(&left_columns)
{
left_join_conditions.push(right);
right_join_conditions.push(left);
if left_used_columns.is_subset(&left_columns)
&& right_used_columns.is_subset(&right_columns)
{
left_join_conditions.push(left);
right_join_conditions.push(right);
} else if left_used_columns.is_subset(&right_columns)
&& right_used_columns.is_subset(&left_columns)
{
left_join_conditions.push(right);
right_join_conditions.push(left);
}
Ok(())
}

fn find_using_columns(&self, using_columns: &mut Vec<String>) -> Result<()> {
for left_column in self.left_context.all_column_bindings().iter() {
for right_column in self.right_context.all_column_bindings().iter() {
if left_column.column_name == right_column.column_name {
using_columns.push(left_column.column_name.clone());
}
}
} else {
other_join_conditions.push(predicate.clone());
}
Ok(())
}
Expand Down
5 changes: 5 additions & 0 deletions query/src/sql/planner/binder/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ impl<'a> Binder {
// Expands wildcard star, for example we have a table `t(a INT, b INT)`:
// The query `SELECT * FROM t` will be expanded into `SELECT t.a, t.b FROM t`
for column_binding in input_context.all_column_bindings() {
if column_binding.duplicated {
continue;
}
output_context.add_column_binding(column_binding.clone());
}
}
Expand Down Expand Up @@ -121,6 +124,7 @@ impl<'a> Binder {
index: column_ref.column.index,
data_type,
scalar: Some(Box::new(bound_expr.clone())),
duplicated: false,
},
_ => {
let index = self.metadata.add_column(
Expand All @@ -136,6 +140,7 @@ impl<'a> Binder {
index,
data_type,
scalar: Some(Box::new(bound_expr.clone())),
duplicated: false,
}
}
};
Expand Down
1 change: 1 addition & 0 deletions query/src/sql/planner/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ impl<'a> Binder {
index: column.column_index,
data_type: column.data_type.clone(),
scalar: None,
duplicated: false,
};
bind_context.add_column_binding(column_binding);
}
Expand Down
9 changes: 9 additions & 0 deletions tests/suites/0_stateless/20+_others/20_0001_planner_v2.result
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,12 @@ new_planner
0 8 5
====Context Function====
default
===Inner Join with Using===
4 3 4
6 5 6
3
5
4
6
4 3 4
6 5 6
15 changes: 15 additions & 0 deletions tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,19 @@ select '====Context Function====';
use default;
select database();

-- Inner join with using
-- Covers `JOIN ... USING` and `NATURAL JOIN`; t1 and t2 share only column `a`.
select '===Inner Join with Using===';
drop table if exists t1;
create table t1(a int, b int);
insert into t1 values(7, 8), (3, 4), (5, 6);
drop table if exists t2;
create table t2(a int, d int);
insert into t2 values(1, 2), (3, 4), (5, 6);
-- Only a = 3 and a = 5 exist in both tables, so each query below returns two rows.
-- The shared column `a` must appear only once in the `*` expansion.
select * from t1 join t2 using(a);
-- Qualified references to either input remain resolvable after USING.
select t1.a from t1 join t2 using(a);
select t2.d from t1 join t2 using(a);
-- NATURAL JOIN joins on the common column `a`, so it matches the USING(a) result above.
select * from t1 natural join t2;
drop table t1;
drop table t2;

set enable_planner_v2 = 0;