Skip to content

Commit

Permalink
Merge pull request #5410 from ygf11/dev
Browse files Browse the repository at this point in the history
Feature: Support DISTINCT in new planner
  • Loading branch information
leiysky authored May 18, 2022
2 parents f9acfea + ab2b2a4 commit 2e78aa1
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 1 deletion.
10 changes: 9 additions & 1 deletion query/src/sql/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod data_schema_builder;
mod expression_builder;
mod util;

use std::ops::Not;
use std::sync::Arc;

use common_datavalues::DataField;
Expand Down Expand Up @@ -339,7 +340,7 @@ impl PipelineBuilder {
group_expressions.push(expr);
}

if !find_aggregate_exprs(&group_expressions).is_empty() {
if !aggregate.from_distinct && !find_aggregate_exprs(&group_expressions).is_empty() {
return Err(ErrorCode::SyntaxException(
"Group by clause cannot contain aggregate functions",
));
Expand All @@ -351,6 +352,7 @@ impl PipelineBuilder {
let pre_input_schema = input_schema.clone();
let input_schema =
schema_builder.build_group_by(input_schema, group_expressions.as_slice())?;

if !input_schema.eq(&pre_input_schema) {
pipeline.add_transform(|transform_input_port, transform_output_port| {
ExpressionTransform::try_create(
Expand All @@ -364,6 +366,12 @@ impl PipelineBuilder {
})?;
}

// Since transform has been added, making group expressions as column expr is safe.
group_expressions
.iter_mut()
.filter(|expr| matches!(expr, Expression::Column(_)).not())
.for_each(|expr| *expr = Expression::Column(expr.column_name()));

// Process aggregation function with non-column expression, such as sum(3)
let pre_input_schema = input_schema.clone();
let res =
Expand Down
1 change: 1 addition & 0 deletions query/src/sql/planner/binder/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ impl<'a> Binder {
let aggregate_plan = AggregatePlan {
group_items: group_expr,
aggregate_functions: agg_info.aggregate_functions.clone(),
from_distinct: false,
};
let new_expr = SExpr::create_unary(aggregate_plan.into(), child);
Ok((new_expr, output_context))
Expand Down
46 changes: 46 additions & 0 deletions query/src/sql/planner/binder/distinct.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_exception::Result;

use crate::sql::binder::Binder;
use crate::sql::optimizer::SExpr;
use crate::sql::plans::AggregatePlan;
use crate::sql::plans::BoundColumnRef;
use crate::sql::plans::Scalar;
use crate::sql::BindContext;

impl<'a> Binder {
pub(super) fn bind_distinct(&self, bind_context: &BindContext, child: SExpr) -> Result<SExpr> {
// Like aggregate, we just use scalar directly.
let group_items: Vec<Scalar> = bind_context
.all_column_bindings()
.iter()
.map(|v| {
v.scalar
.clone()
.map(|scalar| *scalar)
.unwrap_or_else(|| Scalar::BoundColumnRef(BoundColumnRef { column: v.clone() }))
})
.collect();

let distinct_plan = AggregatePlan {
group_items,
aggregate_functions: vec![],
from_distinct: true,
};

Ok(SExpr::create_unary(distinct_plan.into(), child))
}
}
1 change: 1 addition & 0 deletions query/src/sql/planner/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::storages::Table;

mod aggregate;
mod bind_context;
mod distinct;
mod join;
mod limit;
mod project;
Expand Down
4 changes: 4 additions & 0 deletions query/src/sql/planner/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ impl<'a> Binder {
s_expr = self.bind_where(&from_context, expr, s_expr, true).await?;
}

if stmt.distinct {
s_expr = self.bind_distinct(&output_context, s_expr)?;
}

s_expr = self.bind_projection(&output_context, s_expr)?;

if !order_by.is_empty() {
Expand Down
2 changes: 2 additions & 0 deletions query/src/sql/planner/plans/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub struct AggregatePlan {
pub group_items: Vec<Scalar>,
// aggregate scalar expressions, such as: sum(col1), count(*);
pub aggregate_functions: Vec<Scalar>,
// True if the plan is generated from distinct, else the plan is a normal aggregate;
pub from_distinct: bool,
}

impl BasePlan for AggregatePlan {
Expand Down
10 changes: 10 additions & 0 deletions tests/suites/0_stateless/20+_others/20_0001_planner_v2.result
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,16 @@ new_planner
0 8 5
====Context Function====
default
==== Distinct =====
0
1
2
1
0
1
2
3
4
===Inner Join with Using===
4 3 4
6 5 6
Expand Down
7 changes: 7 additions & 0 deletions tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ select '====Context Function====';
use default;
select database();

-- distinct
select '==== Distinct =====';
SELECT DISTINCT * FROM numbers(3) ORDER BY number;
SELECT DISTINCT 1 FROM numbers(3);
SELECT DISTINCT (number %3) as c FROM numbers(1000) ORDER BY c;
SELECT DISTINCT count(number %3) as c FROM numbers(10) group by number % 3 ORDER BY c;

-- Inner join with using
select '===Inner Join with Using===';
drop table if exists t1;
Expand Down

0 comments on commit 2e78aa1

Please sign in to comment.