Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(planner): Support select operator in new planner framework #5059

Merged
merged 3 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions query/src/sql/exec/expression_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::sql::exec::util::format_field_name;
use crate::sql::plans::Scalar;
use crate::sql::IndexType;
use crate::sql::Metadata;
use crate::sql::ScalarExprRef;

pub struct ExpressionBuilder<'a> {
metadata: &'a Metadata,
Expand All @@ -32,6 +33,9 @@ impl<'a> ExpressionBuilder<'a> {
pub fn build(&self, scalar: &Scalar) -> Result<Expression> {
match scalar {
Scalar::ColumnRef { index, .. } => self.build_column_ref(*index),
Scalar::Equal { left, right } => {
self.build_binary_operator(left.clone(), right.clone(), "=".to_string())
}
}
}

Expand All @@ -42,4 +46,19 @@ impl<'a> ExpressionBuilder<'a> {
index,
)))
}

pub fn build_binary_operator(
&self,
left: ScalarExprRef,
right: ScalarExprRef,
op: String,
) -> Result<Expression> {
let left_child = self.build(left.as_any().downcast_ref::<Scalar>().unwrap())?;
let right_child = self.build(right.as_any().downcast_ref::<Scalar>().unwrap())?;
Ok(Expression::BinaryExpression {
left: Box::new(left_child),
op,
right: Box::new(right_child),
})
}
}
29 changes: 29 additions & 0 deletions query/src/sql/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ pub use util::decode_field_name;
pub use util::format_field_name;

use crate::pipelines::new::processors::ProjectionTransform;
use crate::pipelines::new::processors::TransformFilter;
use crate::pipelines::new::NewPipeline;
use crate::sessions::QueryContext;
use crate::sql::exec::data_schema_builder::DataSchemaBuilder;
use crate::sql::exec::expression_builder::ExpressionBuilder;
use crate::sql::exec::util::check_physical;
use crate::sql::optimizer::SExpr;
use crate::sql::plans::FilterPlan;
use crate::sql::plans::PhysicalScan;
use crate::sql::plans::PlanType;
use crate::sql::plans::ProjectPlan;
Expand Down Expand Up @@ -129,6 +131,11 @@ impl PipelineBuilder {
let input_schema = self.build_pipeline(&expression.children()[0])?;
self.build_project(project, input_schema)
}
PlanType::Filter => {
let filter = plan.as_any().downcast_ref::<FilterPlan>().unwrap();
let input_schema = self.build_pipeline(&expression.children()[0])?;
self.build_filter(filter, input_schema)
}
_ => Err(ErrorCode::LogicalError("Invalid physical plan")),
}
}
Expand Down Expand Up @@ -163,6 +170,28 @@ impl PipelineBuilder {
Ok(output_schema)
}

fn build_filter(
&mut self,
filter: &FilterPlan,
input_schema: DataSchemaRef,
) -> Result<DataSchemaRef> {
let output_schema = input_schema.clone();
let eb = ExpressionBuilder::create(&self.metadata);
let scalar = filter.predicate.as_any().downcast_ref::<Scalar>().unwrap();
let pred = eb.build(scalar)?;
self.pipeline
.add_transform(|transform_input_port, transform_output_port| {
TransformFilter::try_create(
input_schema.clone(),
pred.clone(),
transform_input_port,
transform_output_port,
self.ctx.clone(),
)
})?;
Ok(output_schema)
}

fn build_physical_scan(&mut self, scan: &PhysicalScan) -> Result<DataSchemaRef> {
let table_entry = self.metadata.table(scan.table_index);
let plan = table_entry.source.clone();
Expand Down
30 changes: 29 additions & 1 deletion query/src/sql/planner/binder/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
use std::any::Any;
use std::sync::Arc;

use common_ast::ast::BinaryOperator;
use common_ast::ast::Expr;
use common_datavalues::DataTypePtr;
use common_exception::ErrorCode;
use common_exception::Result;

use crate::sql::planner::binder::BindContext;
Expand All @@ -43,7 +45,33 @@ impl ScalarBinder {
nullable: column_binding.nullable,
}))
}
_ => todo!(),
Expr::BinaryOp { op, left, right } => {
self.bind_binary_op(op, left.as_ref(), right.as_ref(), bind_context)
}
_ => Err(ErrorCode::UnImplement(format!(
"Unsupported expr: {:?}",
expr
))),
}
}

fn bind_binary_op(
&self,
op: &BinaryOperator,
left_child: &Expr,
right_child: &Expr,
bind_context: &BindContext,
) -> Result<ScalarExprRef> {
let left_scalar = self.bind_expr(left_child, bind_context)?;
let right_scalar = self.bind_expr(right_child, bind_context)?;
match op {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to match the op here, can't we just leave it to Expression?

--- a/query/src/sql/planner/binder/scalar.rs
+++ b/query/src/sql/planner/binder/scalar.rs
@@ -64,16 +64,10 @@ impl ScalarBinder {
     ) -> Result<ScalarExprRef> {
         let left_scalar = self.bind_expr(left_child, bind_context)?;
         let right_scalar = self.bind_expr(right_child, bind_context)?;
-        match op {
-            BinaryOperator::Eq => Ok(Arc::new(Scalar::Equal {
-                left: left_scalar,
-                right: right_scalar,
-            })),
-            _ => Err(ErrorCode::UnImplement(format!(
-                "Unsupported binary operator: {}",
-                op.to_string()
-            ))),
-        }
+        Ok(Arc::new(Scalar::Binary {
+            left: left_scalar,
+            right: right_scalar,
+        }))
     }
 }

Then other ops will be supported

mysql> set enable_planner_v2=1;
No connection. Trying to reconnect...
Connection id:    8
Current database: *** NONE ***

Query OK, 0 rows affected (0.56 sec)
Read 0 rows, 0 B in 0.537 sec., 0 rows/sec., 0 B/sec.

mysql> select * from t;
+------+
| a    |
+------+
|    1 |
|    2 |
+------+
2 rows in set (0.06 sec)
Read 2 rows, 8 B in 0.017 sec., 115.6 rows/sec., 462.41 B/sec.

mysql> select a from t where a <= a;
+------+
| a    |
+------+
|    1 |
|    2 |
+------+
2 rows in set (0.04 sec)
Read 2 rows, 8 B in 0.005 sec., 378.62 rows/sec., 1.51 KB/sec.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some binary operators have useful property, I'd like to make them typed.

More scalar expressions would be supported later.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The useful property will be used in the optimizer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, for instance cardinality estimation.

BinaryOperator::Eq => Ok(Arc::new(Scalar::Equal {
left: left_scalar,
right: right_scalar,
})),
_ => Err(ErrorCode::UnImplement(format!(
"Unsupported binary operator: {op}",
))),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions query/src/sql/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::sync::Arc;

pub use binder::ScalarExpr;
pub use binder::ScalarExprRef;
use common_ast::parser::parse_sql;
use common_ast::parser::tokenize_sql;
use common_exception::ErrorCode;
Expand Down
7 changes: 7 additions & 0 deletions query/src/sql/planner/plans/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

use std::any::Any;

use common_datavalues::BooleanType;
use common_datavalues::DataTypePtr;

use crate::sql::planner::binder::ScalarExpr;
use crate::sql::planner::binder::ScalarExprRef;
use crate::sql::IndexType;

pub enum Scalar {
Expand All @@ -25,6 +27,10 @@ pub enum Scalar {
data_type: DataTypePtr,
nullable: bool,
},
Equal {
left: ScalarExprRef,
right: ScalarExprRef,
},
}

impl ScalarExpr for Scalar {
Expand All @@ -35,6 +41,7 @@ impl ScalarExpr for Scalar {
nullable,
..
} => (data_type.clone(), *nullable),
Scalar::Equal { .. } => (BooleanType::arc(), false),
}
}

Expand Down