diff --git a/common/datavalues/benches/data_type.rs b/common/datavalues/benches/data_type.rs index a92876bc9c5c..6ccfb323dadc 100644 --- a/common/datavalues/benches/data_type.rs +++ b/common/datavalues/benches/data_type.rs @@ -62,11 +62,11 @@ fn add_benchmark(c: &mut Criterion) { }); } -fn data_type_ptr_create(ty: &DataTypePtr, values: &Vec) -> Result { +fn data_type_ptr_create(ty: &DataTypePtr, values: &[DataValue]) -> Result { ty.create_column(values) } -fn data_type_enum_create(ty: &DataTypeImpl, values: &Vec) -> Result { +fn data_type_enum_create(ty: &DataTypeImpl, values: &[DataValue]) -> Result { ty.create_column(values) } diff --git a/query/src/interpreters/interpreter_select_v2.rs b/query/src/interpreters/interpreter_select_v2.rs index 72fd121a7156..c000bfe50c10 100644 --- a/query/src/interpreters/interpreter_select_v2.rs +++ b/query/src/interpreters/interpreter_select_v2.rs @@ -32,9 +32,11 @@ pub struct SelectInterpreterV2 { } impl SelectInterpreterV2 { - #[allow(dead_code)] - pub fn try_create(ctx: Arc, query: String) -> Result { - Ok(Arc::new(SelectInterpreterV2 { ctx, query })) + pub fn try_create(ctx: Arc, query: &str) -> Result { + Ok(Arc::new(SelectInterpreterV2 { + ctx, + query: query.to_string(), + })) } } @@ -56,4 +58,12 @@ impl Interpreter for SelectInterpreterV2 { let executor_stream = Box::pin(ProcessorExecutorStream::create(executor)?); Ok(Box::pin(self.ctx.try_create_abortable(executor_stream)?)) } + + async fn start(&self) -> Result<()> { + Ok(()) + } + + async fn finish(&self) -> Result<()> { + Ok(()) + } } diff --git a/query/src/interpreters/mod.rs b/query/src/interpreters/mod.rs index 42bc327084cd..5b4a17c52cdc 100644 --- a/query/src/interpreters/mod.rs +++ b/query/src/interpreters/mod.rs @@ -95,6 +95,7 @@ pub use interpreter_role_drop::DropRoleInterpreter; pub use interpreter_role_grant::GrantRoleInterpreter; pub use interpreter_role_revoke::RevokeRoleInterpreter; pub use interpreter_select::SelectInterpreter; +pub use 
interpreter_select_v2::SelectInterpreterV2; pub use interpreter_setting::SettingInterpreter; pub use interpreter_show_databases::ShowDatabasesInterpreter; pub use interpreter_show_functions::ShowFunctionsInterpreter; diff --git a/query/src/servers/mysql/mysql_interactive_worker.rs b/query/src/servers/mysql/mysql_interactive_worker.rs index 74e6e45429c5..ee2fcba191ae 100644 --- a/query/src/servers/mysql/mysql_interactive_worker.rs +++ b/query/src/servers/mysql/mysql_interactive_worker.rs @@ -35,7 +35,9 @@ use opensrv_mysql::StatementMetaWriter; use rand::RngCore; use tokio_stream::StreamExt; +use crate::interpreters::Interpreter; use crate::interpreters::InterpreterFactory; +use crate::interpreters::SelectInterpreterV2; use crate::servers::mysql::writers::DFInitResultWriter; use crate::servers::mysql::writers::DFQueryResultWriter; use crate::servers::mysql::MySQLFederated; @@ -273,15 +275,42 @@ impl InteractiveWorkerBase { tracing::info!("Normal query: {}", query); let context = self.session.create_query_context().await?; context.attach_query_str(query); + let (plan, hints) = PlanParser::parse_with_hint(query, context.clone()).await; + if let (Some(hint_error_code), Err(error_code)) = ( + hints + .iter() + .find(|v| v.error_code.is_some()) + .and_then(|x| x.error_code), + &plan, + ) { + // Pre-check if parsing error can be ignored + if hint_error_code == error_code.code() { + return Ok((vec![DataBlock::empty()], String::from(""))); + } + } + + let plan = plan?; + let settings = context.get_settings(); + + let interpreter: Arc = + if settings.get_enable_new_processor_framework()? != 0 + && settings.get_enable_planner_v2()? != 0 + && matches!(plan, PlanNode::Select(..)) + { + // New planner is enabled, and the statement is ensured to be `SELECT` statement. + SelectInterpreterV2::try_create(context.clone(), query)? + } else { + InterpreterFactory::get(context.clone(), plan)? 
+ }; match hints .iter() .find(|v| v.error_code.is_some()) .and_then(|x| x.error_code) { - None => Self::exec_query(plan, &context).await, - Some(hint_error_code) => match Self::exec_query(plan, &context).await { + None => Self::exec_query(interpreter, &context).await, + Some(hint_error_code) => match Self::exec_query(interpreter, &context).await { Ok(_) => Err(ErrorCode::UnexpectedError(format!( "Expected server error code: {} but got: Ok.", hint_error_code @@ -303,13 +332,12 @@ impl InteractiveWorkerBase { } } - #[tracing::instrument(level = "debug", skip(plan, context))] + #[tracing::instrument(level = "debug", skip(interpreter, context))] async fn exec_query( - plan: Result, + interpreter: Arc, context: &Arc, ) -> Result<(Vec, String)> { let instant = Instant::now(); - let interpreter = InterpreterFactory::get(context.clone(), plan?)?; let query_result = context.try_spawn( async move { diff --git a/query/src/sessions/session_settings.rs b/query/src/sessions/session_settings.rs index 04e85048a45c..5c855070e9eb 100644 --- a/query/src/sessions/session_settings.rs +++ b/query/src/sessions/session_settings.rs @@ -113,37 +113,39 @@ impl Settings { level: ScopeLevel::Session, desc: "Enable new processor framework if value != 0, default value: 1", }, - + // enable_planner_v2 + SettingValue { + default_value: DataValue::UInt64(0), + user_setting: UserSetting::create("enable_planner_v2", DataValue::UInt64(0)), + level: ScopeLevel::Session, + desc: "Enable planner v2 by setting this variable to 1, default value: 0", + }, SettingValue { default_value: DataValue::String("\n".as_bytes().to_vec()), user_setting: UserSetting::create("record_delimiter", DataValue::String("\n".as_bytes().to_vec())), level: ScopeLevel::Session, desc: "Format record_delimiter, default value: \n", }, - SettingValue { - default_value:DataValue::String(",".as_bytes().to_vec()), + default_value: DataValue::String(",".as_bytes().to_vec()), user_setting: UserSetting::create("field_delimiter", 
DataValue::String(",".as_bytes().to_vec())), level: ScopeLevel::Session, desc: "Format field delimiter, default value: ,", }, - SettingValue { default_value: DataValue::UInt64(1), user_setting: UserSetting::create("empty_as_default", DataValue::UInt64(1)), level: ScopeLevel::Session, desc: "Format empty_as_default, default value: 1", }, - SettingValue { default_value: DataValue::UInt64(0), user_setting: UserSetting::create("skip_header", DataValue::UInt64(0)), level: ScopeLevel::Session, desc: "Whether to skip the input header, default value: 0", }, - SettingValue { - default_value:DataValue::String("UTC".as_bytes().to_vec()), + default_value: DataValue::String("UTC".as_bytes().to_vec()), user_setting: UserSetting::create("timezone", DataValue::String("UTC".as_bytes().to_vec())), level: ScopeLevel::Session, desc: "Timezone, default value: UTC,", @@ -216,6 +218,11 @@ impl Settings { self.try_get_u64(key) } + pub fn get_enable_planner_v2(&self) -> Result { + static KEY: &str = "enable_planner_v2"; + self.try_get_u64(KEY) + } + pub fn get_field_delimiter(&self) -> Result> { let key = "field_delimiter"; self.check_and_get_setting_value(key) diff --git a/query/src/sql/exec/data_schema_builder.rs b/query/src/sql/exec/data_schema_builder.rs index 8da6b2e5b4bd..efdf26dd1510 100644 --- a/query/src/sql/exec/data_schema_builder.rs +++ b/query/src/sql/exec/data_schema_builder.rs @@ -22,6 +22,7 @@ use common_exception::Result; use crate::sql::exec::util::format_field_name; use crate::sql::plans::PhysicalScan; use crate::sql::plans::ProjectPlan; +use crate::sql::IndexType; use crate::sql::Metadata; pub struct DataSchemaBuilder<'a> { @@ -70,4 +71,21 @@ impl<'a> DataSchemaBuilder<'a> { Ok(Arc::new(DataSchema::new(fields))) } + + pub fn build_canonical_schema(&self, columns: &[IndexType]) -> DataSchemaRef { + let mut fields: Vec = vec![]; + for index in columns { + let column_entry = self.metadata.column(*index); + let field_name = column_entry.name.clone(); + let field = if 
column_entry.nullable { + DataField::new_nullable(field_name.as_str(), column_entry.data_type.clone()) + } else { + DataField::new(field_name.as_str(), column_entry.data_type.clone()) + }; + + fields.push(field); + } + + Arc::new(DataSchema::new(fields)) + } } diff --git a/query/src/sql/exec/mod.rs b/query/src/sql/exec/mod.rs index 85df79ee97e0..338e6c750044 100644 --- a/query/src/sql/exec/mod.rs +++ b/query/src/sql/exec/mod.rs @@ -18,9 +18,12 @@ mod util; use std::sync::Arc; +use common_datavalues::DataField; +use common_datavalues::DataSchema; use common_datavalues::DataSchemaRef; use common_exception::ErrorCode; use common_exception::Result; +use common_planners::Expression; pub use util::decode_field_name; pub use util::format_field_name; @@ -35,21 +38,29 @@ use crate::sql::plans::PhysicalScan; use crate::sql::plans::PlanType; use crate::sql::plans::ProjectPlan; use crate::sql::plans::Scalar; +use crate::sql::IndexType; use crate::sql::Metadata; /// Helper to build a `Pipeline` from `SExpr` pub struct PipelineBuilder { ctx: Arc, metadata: Metadata, + result_columns: Vec<(IndexType, String)>, expression: SExpr, pipeline: NewPipeline, } impl PipelineBuilder { - pub fn new(ctx: Arc, metadata: Metadata, expression: SExpr) -> Self { + pub fn new( + ctx: Arc, + result_columns: Vec<(IndexType, String)>, + metadata: Metadata, + expression: SExpr, + ) -> Self { PipelineBuilder { ctx, metadata, + result_columns, expression, pipeline: NewPipeline::create(), } @@ -57,10 +68,50 @@ impl PipelineBuilder { pub fn spawn(mut self) -> Result { let expr = self.expression.clone(); - self.build_pipeline(&expr)?; + let schema = self.build_pipeline(&expr)?; + self.align_data_schema(schema)?; + let settings = self.ctx.get_settings(); + self.pipeline + .set_max_threads(settings.get_max_threads()? 
as usize); Ok(self.pipeline) } + fn align_data_schema(&mut self, input_schema: DataSchemaRef) -> Result<()> { + let mut projections = Vec::with_capacity(self.result_columns.len()); + let mut output_fields = Vec::with_capacity(self.result_columns.len()); + for (index, name) in self.result_columns.iter() { + let column_entry = self.metadata.column(*index); + let field_name = &column_entry.name; + projections.push(Expression::Alias( + name.clone(), + Box::new(Expression::Column(format_field_name( + field_name.as_str(), + *index, + ))), + )); + let field = if column_entry.nullable { + DataField::new_nullable(name.as_str(), column_entry.data_type.clone()) + } else { + DataField::new(name.as_str(), column_entry.data_type.clone()) + }; + output_fields.push(field); + } + let output_schema = Arc::new(DataSchema::new(output_fields)); + + self.pipeline + .add_transform(|transform_input_port, transform_output_port| { + ProjectionTransform::try_create( + transform_input_port, + transform_output_port, + input_schema.clone(), + output_schema.clone(), + projections.clone(), + self.ctx.clone(), + ) + })?; + Ok(()) + } + fn build_pipeline(&mut self, expression: &SExpr) -> Result { if !check_physical(expression) { return Err(ErrorCode::LogicalError("Invalid physical plan")); @@ -75,14 +126,18 @@ impl PipelineBuilder { } PlanType::Project => { let project = plan.as_any().downcast_ref::().unwrap(); - self.build_project(project, &expression.children()[0]) + let input_schema = self.build_pipeline(&expression.children()[0])?; + self.build_project(project, input_schema) } _ => Err(ErrorCode::LogicalError("Invalid physical plan")), } } - fn build_project(&mut self, project: &ProjectPlan, child: &SExpr) -> Result { - let input_schema = self.build_pipeline(child)?; + fn build_project( + &mut self, + project: &ProjectPlan, + input_schema: DataSchemaRef, + ) -> Result { let schema_builder = DataSchemaBuilder::new(&self.metadata); let output_schema = schema_builder.build_project(project, 
input_schema.clone())?; @@ -113,8 +168,35 @@ impl PipelineBuilder { let plan = table_entry.source.clone(); let table = self.ctx.build_table_from_source_plan(&plan)?; + self.ctx.try_set_partitions(plan.parts.clone())?; table.read2(self.ctx.clone(), &plan, &mut self.pipeline)?; + let columns: Vec = scan.columns.iter().cloned().collect(); + let projections: Vec = columns + .iter() + .map(|index| { + let column_entry = self.metadata.column(*index); + Expression::Alias( + format_field_name(column_entry.name.as_str(), column_entry.column_index), + Box::new(Expression::Column(column_entry.name.clone())), + ) + }) + .collect(); let schema_builder = DataSchemaBuilder::new(&self.metadata); - schema_builder.build_physical_scan(scan) + let input_schema = schema_builder.build_canonical_schema(&columns); + let output_schema = schema_builder.build_physical_scan(scan)?; + + self.pipeline + .add_transform(|transform_input_port, transform_output_port| { + ProjectionTransform::try_create( + transform_input_port, + transform_output_port, + input_schema.clone(), + output_schema.clone(), + projections.clone(), + self.ctx.clone(), + ) + })?; + + Ok(output_schema) } } diff --git a/query/src/sql/optimizer/heuristic/mod.rs b/query/src/sql/optimizer/heuristic/mod.rs index 205629aa6cf6..f7136cd3ac42 100644 --- a/query/src/sql/optimizer/heuristic/mod.rs +++ b/query/src/sql/optimizer/heuristic/mod.rs @@ -43,10 +43,12 @@ impl HeuristicOptimizer { } fn optimize_expression(&self, s_expr: &SExpr) -> Result { - let mut result = s_expr.clone(); + let mut optimized_children = Vec::with_capacity(s_expr.arity()); for expr in s_expr.children() { - result = self.apply_transform_rules(expr, &self.rules)?; + optimized_children.push(self.optimize_expression(expr)?); } + let optimized_expr = SExpr::create(s_expr.plan().clone(), optimized_children, None); + let result = self.apply_transform_rules(&optimized_expr, &self.rules)?; Ok(result) } diff --git a/query/src/sql/optimizer/optimize_context.rs 
b/query/src/sql/optimizer/optimize_context.rs index 6c2f10043faa..b44420371baa 100644 --- a/query/src/sql/optimizer/optimize_context.rs +++ b/query/src/sql/optimizer/optimize_context.rs @@ -39,7 +39,7 @@ impl OptimizeContext { .all_column_bindings() .iter() .map(|col| NamedColumn { - index: col.index.unwrap(), + index: col.index, name: col.column_name.clone(), }) .collect(); diff --git a/query/src/sql/optimizer/s_expr.rs b/query/src/sql/optimizer/s_expr.rs index 9d776c7d93fc..d2121df1baf4 100644 --- a/query/src/sql/optimizer/s_expr.rs +++ b/query/src/sql/optimizer/s_expr.rs @@ -73,7 +73,7 @@ impl SExpr { pub fn match_pattern(&self, pattern: &SExpr) -> bool { if pattern.plan.plan_type() != PlanType::Pattern { // Pattern is plan - if self.plan.plan_type() == pattern.plan.plan_type() { + if self.plan.plan_type() != pattern.plan.plan_type() { return false; } diff --git a/query/src/sql/planner/binder/bind_context.rs b/query/src/sql/planner/binder/bind_context.rs index 22ecfdad529c..f0466a643e00 100644 --- a/query/src/sql/planner/binder/bind_context.rs +++ b/query/src/sql/planner/binder/bind_context.rs @@ -28,7 +28,7 @@ pub struct ColumnBinding { // Column name of this `ColumnBinding` in current context pub column_name: String, // Column index of ColumnBinding - pub index: Option, + pub index: IndexType, pub data_type: DataTypePtr, pub nullable: bool, @@ -134,4 +134,17 @@ impl BindContext { Ok(result.remove(0)) } } + + /// Get result columns of current context in order. + /// For example, a query `SELECT b, a AS b FROM t` has `[(index_of(b), "b"), (index_of(a), "b")]` as + /// its result columns. + /// + /// This method is used to retrieve the physical representation of result set of + /// a query. 
+ pub fn result_columns(&self) -> Vec<(IndexType, String)> { + self.columns + .iter() + .map(|col| (col.index, col.column_name.clone())) + .collect() + } } diff --git a/query/src/sql/planner/binder/project.rs b/query/src/sql/planner/binder/project.rs index f3d36e4b1251..fb4a8c7a1e94 100644 --- a/query/src/sql/planner/binder/project.rs +++ b/query/src/sql/planner/binder/project.rs @@ -27,6 +27,7 @@ use crate::sql::planner::binder::Binder; use crate::sql::planner::binder::ColumnBinding; use crate::sql::plans::ProjectItem; use crate::sql::plans::ProjectPlan; +use crate::sql::plans::Scalar; impl Binder { /// Try to build a `ProjectPlan` to satisfy `output_context`. @@ -38,7 +39,7 @@ impl Binder { if let Some(expr) = &column_binding.scalar { projections.push(ProjectItem { expr: expr.clone(), - index: column_binding.index.unwrap(), + index: column_binding.index, }); } } @@ -70,6 +71,7 @@ impl Binder { /// For scalar expressions and aggregate expressions, we will register new columns for /// them in `Metadata`. And notice that, the semantic of aggregate expressions won't be checked /// in this function. + #[allow(unreachable_patterns)] pub(super) fn normalize_select_list( &mut self, select_list: &[SelectTarget], @@ -115,15 +117,45 @@ impl Binder { None => get_expr_display_string(expr), }; - self.metadata - .add_column(expr_name.clone(), data_type.clone(), nullable, None); - let column_binding = ColumnBinding { - table_name: None, - column_name: expr_name, - index: None, - data_type, - nullable, - scalar: Some(bound_expr), + // If expr is a ColumnRef, then it's a pass-through column. There is no need to + // generate a new ColumnEntry for it. 
+ let column_binding = match bound_expr.as_any().downcast_ref::().unwrap() + { + Scalar::ColumnRef { + index, + data_type, + nullable, + } => { + let table_name = self + .metadata + .column(*index) + .table_index + .map(|idx| self.metadata.table(idx).name.clone()); + ColumnBinding { + table_name, + column_name: expr_name, + index: *index, + data_type: data_type.clone(), + nullable: *nullable, + scalar: None, + } + } + _ => { + let index = self.metadata.add_column( + expr_name.clone(), + data_type.clone(), + nullable, + None, + ); + ColumnBinding { + table_name: None, + column_name: expr_name, + index, + data_type, + nullable, + scalar: Some(bound_expr), + } + } }; output_context.add_column_binding(column_binding); } diff --git a/query/src/sql/planner/binder/scalar.rs b/query/src/sql/planner/binder/scalar.rs index acebabbe5137..77e84ce741d8 100644 --- a/query/src/sql/planner/binder/scalar.rs +++ b/query/src/sql/planner/binder/scalar.rs @@ -20,6 +20,7 @@ use common_datavalues::DataTypePtr; use common_exception::Result; use crate::sql::planner::binder::BindContext; +use crate::sql::plans::Scalar; /// Helper for binding scalar expression with `BindContext`. pub struct ScalarBinder; @@ -34,9 +35,13 @@ impl ScalarBinder { Expr::ColumnRef { table, column, .. 
} => { let table_name: Option = table.clone().map(|ident| ident.name); let column_name = column.name.clone(); - let _column_binding = bind_context.resolve_column(table_name, column_name)?; + let column_binding = bind_context.resolve_column(table_name, column_name)?; - todo!() + Ok(Arc::new(Scalar::ColumnRef { + index: column_binding.index, + data_type: column_binding.data_type.clone(), + nullable: column_binding.nullable, + })) } _ => todo!(), } diff --git a/query/src/sql/planner/binder/select.rs b/query/src/sql/planner/binder/select.rs index 771657cea823..800184eff1c8 100644 --- a/query/src/sql/planner/binder/select.rs +++ b/query/src/sql/planner/binder/select.rs @@ -112,7 +112,7 @@ impl Binder { let column_binding = ColumnBinding { table_name: Some(table.name.clone()), column_name: column.name.clone(), - index: Some(column.column_index), + index: column.column_index, data_type: column.data_type.clone(), nullable: column.nullable, scalar: None, diff --git a/query/src/sql/planner/mod.rs b/query/src/sql/planner/mod.rs index 416ce304a4da..cd9c6ef1e17e 100644 --- a/query/src/sql/planner/mod.rs +++ b/query/src/sql/planner/mod.rs @@ -60,7 +60,13 @@ impl Planner { let optimized_expr = optimize(bind_result.s_expr().clone(), optimize_context)?; // Step 4: build executable Pipeline with SExpr - let pb = PipelineBuilder::new(self.context.clone(), bind_result.metadata, optimized_expr); + let result_columns = bind_result.bind_context.result_columns(); + let pb = PipelineBuilder::new( + self.context.clone(), + result_columns, + bind_result.metadata, + optimized_expr, + ); let pipeline = pb.spawn()?; Ok(pipeline) diff --git a/query/tests/it/storages/system/settings_table.rs b/query/tests/it/storages/system/settings_table.rs index f712fb1167d2..701ca82e155a 100644 --- a/query/tests/it/storages/system/settings_table.rs +++ b/query/tests/it/storages/system/settings_table.rs @@ -36,6 +36,7 @@ async fn test_settings_table() -> Result<()> { "| | | | | | |", "| empty_as_default | 1 | 1 
| SESSION | Format empty_as_default, default value: 1 | UInt64 |", "| enable_new_processor_framework | 1 | 1 | SESSION | Enable new processor framework if value != 0, default value: 1 | UInt64 |", + "| enable_planner_v2 | 0 | 0 | SESSION | Enable planner v2 by setting this variable to 1, default value: 0 | UInt64 |", "| field_delimiter | , | , | SESSION | Format field delimiter, default value: , | String |", "| flight_client_timeout | 60 | 60 | SESSION | Max duration the flight client request is allowed to take in seconds. By default, it is 60 seconds | UInt64 |", "| max_block_size | 10000 | 10000 | SESSION | Maximum block size for reading | UInt64 |", diff --git a/tests/suites/0_stateless/06_show/06_0003_show_settings.result b/tests/suites/0_stateless/06_show/06_0003_show_settings.result index 15f6e498d723..14fad02ba1da 100644 --- a/tests/suites/0_stateless/06_show/06_0003_show_settings.result +++ b/tests/suites/0_stateless/06_show/06_0003_show_settings.result @@ -1,5 +1,6 @@ empty_as_default 1 1 SESSION Format empty_as_default, default value: 1 UInt64 enable_new_processor_framework 1 1 SESSION Enable new processor framework if value != 0, default value: 1 UInt64 +enable_planner_v2 0 0 SESSION Enable planner v2 by setting this variable to 1, default value: 0 UInt64 field_delimiter , , SESSION Format field delimiter, default value: , String flight_client_timeout 60 60 SESSION Max duration the flight client request is allowed to take in seconds. By default, it is 60 seconds UInt64 max_block_size 10000 10000 SESSION Maximum block size for reading UInt64