Merge pull request #4989 from leiysky/switch-new-planner
feat(planner): Add switch to enable new planner
mergify[bot] authored Apr 21, 2022
2 parents e54bef9 + ad2c6d5 commit a9c5b6a
Showing 17 changed files with 248 additions and 42 deletions.
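The heart of the change: `SELECT` statements are routed through the new planner only when both session settings are enabled. A condensed sketch of the gate, assembled from the MySQL worker diff below (`settings`, `plan`, `context`, and `query` come from the surrounding handler and are elided here):

    // Condensed from the worker diff below; not a standalone program.
    let use_new_planner = settings.get_enable_new_processor_framework()? != 0
        && settings.get_enable_planner_v2()? != 0
        && matches!(plan, PlanNode::Select(..));
    let interpreter: Arc<dyn Interpreter> = if use_new_planner {
        SelectInterpreterV2::try_create(context.clone(), query)?
    } else {
        InterpreterFactory::get(context.clone(), plan)?
    };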
4 changes: 2 additions & 2 deletions common/datavalues/benches/data_type.rs
@@ -62,11 +62,11 @@ fn add_benchmark(c: &mut Criterion) {
         });
 }
 
-fn data_type_ptr_create(ty: &DataTypePtr, values: &Vec<DataValue>) -> Result<ColumnRef> {
+fn data_type_ptr_create(ty: &DataTypePtr, values: &[DataValue]) -> Result<ColumnRef> {
     ty.create_column(values)
 }
 
-fn data_type_enum_create(ty: &DataTypeImpl, values: &Vec<DataValue>) -> Result<ColumnRef> {
+fn data_type_enum_create(ty: &DataTypeImpl, values: &[DataValue]) -> Result<ColumnRef> {
     ty.create_column(values)
 }

16 changes: 13 additions & 3 deletions query/src/interpreters/interpreter_select_v2.rs
@@ -32,9 +32,11 @@ pub struct SelectInterpreterV2 {
 }
 
 impl SelectInterpreterV2 {
-    #[allow(dead_code)]
-    pub fn try_create(ctx: Arc<QueryContext>, query: String) -> Result<InterpreterPtr> {
-        Ok(Arc::new(SelectInterpreterV2 { ctx, query }))
+    pub fn try_create(ctx: Arc<QueryContext>, query: &str) -> Result<InterpreterPtr> {
+        Ok(Arc::new(SelectInterpreterV2 {
+            ctx,
+            query: query.to_string(),
+        }))
     }
 }

@@ -56,4 +58,12 @@ impl Interpreter for SelectInterpreterV2 {
         let executor_stream = Box::pin(ProcessorExecutorStream::create(executor)?);
         Ok(Box::pin(self.ctx.try_create_abortable(executor_stream)?))
     }
+
+    async fn start(&self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn finish(&self) -> Result<()> {
+        Ok(())
+    }
 }
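Taking `&str` instead of `String` spares callers an upfront allocation; the interpreter clones the query internally. A minimal usage sketch, assuming `ctx` is the session's `Arc<QueryContext>`:

    // Minimal sketch: build the v2 interpreter straight from a borrowed query.
    let interpreter: Arc<dyn Interpreter> =
        SelectInterpreterV2::try_create(ctx.clone(), "SELECT 1")?;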
1 change: 1 addition & 0 deletions query/src/interpreters/mod.rs
@@ -95,6 +95,7 @@ pub use interpreter_role_drop::DropRoleInterpreter;
 pub use interpreter_role_grant::GrantRoleInterpreter;
 pub use interpreter_role_revoke::RevokeRoleInterpreter;
 pub use interpreter_select::SelectInterpreter;
+pub use interpreter_select_v2::SelectInterpreterV2;
 pub use interpreter_setting::SettingInterpreter;
 pub use interpreter_show_databases::ShowDatabasesInterpreter;
 pub use interpreter_show_functions::ShowFunctionsInterpreter;
38 changes: 33 additions & 5 deletions query/src/servers/mysql/mysql_interactive_worker.rs
@@ -35,7 +35,9 @@ use opensrv_mysql::StatementMetaWriter;
 use rand::RngCore;
 use tokio_stream::StreamExt;
 
+use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterFactory;
+use crate::interpreters::SelectInterpreterV2;
 use crate::servers::mysql::writers::DFInitResultWriter;
 use crate::servers::mysql::writers::DFQueryResultWriter;
 use crate::servers::mysql::MySQLFederated;
@@ -273,15 +275,42 @@ impl<W: std::io::Write> InteractiveWorkerBase<W> {
         tracing::info!("Normal query: {}", query);
         let context = self.session.create_query_context().await?;
         context.attach_query_str(query);
+
         let (plan, hints) = PlanParser::parse_with_hint(query, context.clone()).await;
+        if let (Some(hint_error_code), Err(error_code)) = (
+            hints
+                .iter()
+                .find(|v| v.error_code.is_some())
+                .and_then(|x| x.error_code),
+            &plan,
+        ) {
+            // Pre-check if parsing error can be ignored
+            if hint_error_code == error_code.code() {
+                return Ok((vec![DataBlock::empty()], String::from("")));
+            }
+        }
+
+        let plan = plan?;
+        let settings = context.get_settings();
+
+        let interpreter: Arc<dyn Interpreter> =
+            if settings.get_enable_new_processor_framework()? != 0
+                && settings.get_enable_planner_v2()? != 0
+                && matches!(plan, PlanNode::Select(..))
+            {
+                // New planner is enabled, and the statement is ensured to be `SELECT` statement.
+                SelectInterpreterV2::try_create(context.clone(), query)?
+            } else {
+                InterpreterFactory::get(context.clone(), plan)?
+            };
 
         match hints
             .iter()
             .find(|v| v.error_code.is_some())
             .and_then(|x| x.error_code)
         {
-            None => Self::exec_query(plan, &context).await,
-            Some(hint_error_code) => match Self::exec_query(plan, &context).await {
+            None => Self::exec_query(interpreter, &context).await,
+            Some(hint_error_code) => match Self::exec_query(interpreter, &context).await {
                 Ok(_) => Err(ErrorCode::UnexpectedError(format!(
                     "Expected server error code: {} but got: Ok.",
                     hint_error_code
@@ -303,13 +332,12 @@ impl<W: std::io::Write> InteractiveWorkerBase<W> {
         }
     }
 
-    #[tracing::instrument(level = "debug", skip(plan, context))]
+    #[tracing::instrument(level = "debug", skip(interpreter, context))]
     async fn exec_query(
-        plan: Result<PlanNode>,
+        interpreter: Arc<dyn Interpreter>,
         context: &Arc<QueryContext>,
     ) -> Result<(Vec<DataBlock>, String)> {
         let instant = Instant::now();
-        let interpreter = InterpreterFactory::get(context.clone(), plan?)?;
 
         let query_result = context.try_spawn(
             async move {
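With the factory call hoisted out of `exec_query`, the function now just runs whatever interpreter the caller built. A condensed sketch of the new call shape (names as in the diff, surrounding function elided):

    // The caller picks the interpreter (v2 or factory-built); exec_query
    // executes it and returns the result blocks plus extra info.
    let interpreter: Arc<dyn Interpreter> = InterpreterFactory::get(context.clone(), plan)?;
    let (blocks, extra_info) = Self::exec_query(interpreter, &context).await?;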
21 changes: 14 additions & 7 deletions query/src/sessions/session_settings.rs
@@ -113,37 +113,39 @@ impl Settings {
             level: ScopeLevel::Session,
             desc: "Enable new processor framework if value != 0, default value: 1",
         },
-
+        // enable_planner_v2
+        SettingValue {
+            default_value: DataValue::UInt64(0),
+            user_setting: UserSetting::create("enable_planner_v2", DataValue::UInt64(0)),
+            level: ScopeLevel::Session,
+            desc: "Enable planner v2 by setting this variable to 1, default value: 0",
+        },
         SettingValue {
             default_value: DataValue::String("\n".as_bytes().to_vec()),
             user_setting: UserSetting::create("record_delimiter", DataValue::String("\n".as_bytes().to_vec())),
             level: ScopeLevel::Session,
             desc: "Format record_delimiter, default value: \n",
         },
-
         SettingValue {
-            default_value:DataValue::String(",".as_bytes().to_vec()),
+            default_value: DataValue::String(",".as_bytes().to_vec()),
             user_setting: UserSetting::create("field_delimiter", DataValue::String(",".as_bytes().to_vec())),
             level: ScopeLevel::Session,
             desc: "Format field delimiter, default value: ,",
         },
-
         SettingValue {
             default_value: DataValue::UInt64(1),
             user_setting: UserSetting::create("empty_as_default", DataValue::UInt64(1)),
             level: ScopeLevel::Session,
             desc: "Format empty_as_default, default value: 1",
         },
-
         SettingValue {
             default_value: DataValue::UInt64(0),
             user_setting: UserSetting::create("skip_header", DataValue::UInt64(0)),
             level: ScopeLevel::Session,
             desc: "Whether to skip the input header, default value: 0",
         },
-
         SettingValue {
-            default_value:DataValue::String("UTC".as_bytes().to_vec()),
+            default_value: DataValue::String("UTC".as_bytes().to_vec()),
             user_setting: UserSetting::create("timezone", DataValue::String("UTC".as_bytes().to_vec())),
             level: ScopeLevel::Session,
             desc: "Timezone, default value: UTC,",
@@ -216,6 +218,11 @@ impl Settings {
         self.try_get_u64(key)
     }
 
+    pub fn get_enable_planner_v2(&self) -> Result<u64> {
+        static KEY: &str = "enable_planner_v2";
+        self.try_get_u64(KEY)
+    }
+
     pub fn get_field_delimiter(&self) -> Result<Vec<u8>> {
         let key = "field_delimiter";
         self.check_and_get_setting_value(key)
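The new getter follows the pattern of the existing accessors; a short sketch of reading the switch, mirroring the check in the MySQL worker (assumes the usual `get_settings()` accessor on the query context):

    // The switch is an integer setting; any non-zero value enables it.
    let settings = context.get_settings();
    let planner_v2_enabled = settings.get_enable_planner_v2()? != 0;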
18 changes: 18 additions & 0 deletions query/src/sql/exec/data_schema_builder.rs
@@ -22,6 +22,7 @@ use common_exception::Result;
 use crate::sql::exec::util::format_field_name;
 use crate::sql::plans::PhysicalScan;
 use crate::sql::plans::ProjectPlan;
+use crate::sql::IndexType;
 use crate::sql::Metadata;
 
 pub struct DataSchemaBuilder<'a> {
@@ -70,4 +71,21 @@ impl<'a> DataSchemaBuilder<'a> {
 
         Ok(Arc::new(DataSchema::new(fields)))
     }
+
+    pub fn build_canonical_schema(&self, columns: &[IndexType]) -> DataSchemaRef {
+        let mut fields: Vec<DataField> = vec![];
+        for index in columns {
+            let column_entry = self.metadata.column(*index);
+            let field_name = column_entry.name.clone();
+            let field = if column_entry.nullable {
+                DataField::new_nullable(field_name.as_str(), column_entry.data_type.clone())
+            } else {
+                DataField::new(field_name.as_str(), column_entry.data_type.clone())
+            };
+
+            fields.push(field);
+        }
+
+        Arc::new(DataSchema::new(fields))
+    }
 }
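A hedged usage sketch of the new method, mirroring how `PipelineBuilder::build_physical_scan` in the next file feeds it a scan's column indices (`metadata` and `scan` are assumed to be in scope):

    // The canonical schema keeps the original column names and nullability,
    // before the scan output is re-aliased to internal formatted field names.
    let columns: Vec<IndexType> = scan.columns.iter().cloned().collect();
    let input_schema = DataSchemaBuilder::new(&metadata).build_canonical_schema(&columns);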
94 changes: 88 additions & 6 deletions query/src/sql/exec/mod.rs
@@ -18,9 +18,12 @@ mod util;
 
 use std::sync::Arc;
 
+use common_datavalues::DataField;
+use common_datavalues::DataSchema;
 use common_datavalues::DataSchemaRef;
 use common_exception::ErrorCode;
 use common_exception::Result;
+use common_planners::Expression;
 pub use util::decode_field_name;
 pub use util::format_field_name;
 
@@ -35,32 +38,80 @@ use crate::sql::plans::PhysicalScan;
 use crate::sql::plans::PlanType;
 use crate::sql::plans::ProjectPlan;
 use crate::sql::plans::Scalar;
+use crate::sql::IndexType;
 use crate::sql::Metadata;
 
 /// Helper to build a `Pipeline` from `SExpr`
 pub struct PipelineBuilder {
     ctx: Arc<QueryContext>,
     metadata: Metadata,
+    result_columns: Vec<(IndexType, String)>,
     expression: SExpr,
     pipeline: NewPipeline,
 }
 
 impl PipelineBuilder {
-    pub fn new(ctx: Arc<QueryContext>, metadata: Metadata, expression: SExpr) -> Self {
+    pub fn new(
+        ctx: Arc<QueryContext>,
+        result_columns: Vec<(IndexType, String)>,
+        metadata: Metadata,
+        expression: SExpr,
+    ) -> Self {
         PipelineBuilder {
             ctx,
             metadata,
+            result_columns,
             expression,
             pipeline: NewPipeline::create(),
         }
     }
 
     pub fn spawn(mut self) -> Result<NewPipeline> {
         let expr = self.expression.clone();
-        self.build_pipeline(&expr)?;
+        let schema = self.build_pipeline(&expr)?;
+        self.align_data_schema(schema)?;
         let settings = self.ctx.get_settings();
         self.pipeline
             .set_max_threads(settings.get_max_threads()? as usize);
         Ok(self.pipeline)
     }
 
+    fn align_data_schema(&mut self, input_schema: DataSchemaRef) -> Result<()> {
+        let mut projections = Vec::with_capacity(self.result_columns.len());
+        let mut output_fields = Vec::with_capacity(self.result_columns.len());
+        for (index, name) in self.result_columns.iter() {
+            let column_entry = self.metadata.column(*index);
+            let field_name = &column_entry.name;
+            projections.push(Expression::Alias(
+                name.clone(),
+                Box::new(Expression::Column(format_field_name(
+                    field_name.as_str(),
+                    *index,
+                ))),
+            ));
+            let field = if column_entry.nullable {
+                DataField::new_nullable(name.as_str(), column_entry.data_type.clone())
+            } else {
+                DataField::new(name.as_str(), column_entry.data_type.clone())
+            };
+            output_fields.push(field);
+        }
+        let output_schema = Arc::new(DataSchema::new(output_fields));
+
+        self.pipeline
+            .add_transform(|transform_input_port, transform_output_port| {
+                ProjectionTransform::try_create(
+                    transform_input_port,
+                    transform_output_port,
+                    input_schema.clone(),
+                    output_schema.clone(),
+                    projections.clone(),
+                    self.ctx.clone(),
+                )
+            })?;
+        Ok(())
+    }
+
     fn build_pipeline(&mut self, expression: &SExpr) -> Result<DataSchemaRef> {
         if !check_physical(expression) {
             return Err(ErrorCode::LogicalError("Invalid physical plan"));
@@ -75,14 +126,18 @@ impl PipelineBuilder {
             }
             PlanType::Project => {
                 let project = plan.as_any().downcast_ref::<ProjectPlan>().unwrap();
-                self.build_project(project, &expression.children()[0])
+                let input_schema = self.build_pipeline(&expression.children()[0])?;
+                self.build_project(project, input_schema)
             }
             _ => Err(ErrorCode::LogicalError("Invalid physical plan")),
         }
     }
 
-    fn build_project(&mut self, project: &ProjectPlan, child: &SExpr) -> Result<DataSchemaRef> {
-        let input_schema = self.build_pipeline(child)?;
+    fn build_project(
+        &mut self,
+        project: &ProjectPlan,
+        input_schema: DataSchemaRef,
+    ) -> Result<DataSchemaRef> {
         let schema_builder = DataSchemaBuilder::new(&self.metadata);
         let output_schema = schema_builder.build_project(project, input_schema.clone())?;
 
@@ -113,8 +168,35 @@ impl PipelineBuilder {
         let plan = table_entry.source.clone();
 
         let table = self.ctx.build_table_from_source_plan(&plan)?;
+        self.ctx.try_set_partitions(plan.parts.clone())?;
         table.read2(self.ctx.clone(), &plan, &mut self.pipeline)?;
+        let columns: Vec<IndexType> = scan.columns.iter().cloned().collect();
+        let projections: Vec<Expression> = columns
+            .iter()
+            .map(|index| {
+                let column_entry = self.metadata.column(*index);
+                Expression::Alias(
+                    format_field_name(column_entry.name.as_str(), column_entry.column_index),
+                    Box::new(Expression::Column(column_entry.name.clone())),
+                )
+            })
+            .collect();
         let schema_builder = DataSchemaBuilder::new(&self.metadata);
-        schema_builder.build_physical_scan(scan)
+        let input_schema = schema_builder.build_canonical_schema(&columns);
+        let output_schema = schema_builder.build_physical_scan(scan)?;
+
+        self.pipeline
+            .add_transform(|transform_input_port, transform_output_port| {
+                ProjectionTransform::try_create(
+                    transform_input_port,
+                    transform_output_port,
+                    input_schema.clone(),
+                    output_schema.clone(),
+                    projections.clone(),
+                    self.ctx.clone(),
+                )
+            })?;
+
+        Ok(output_schema)
     }
 }
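The two `ProjectionTransform`s added here bracket the plan with renames: `build_physical_scan` aliases source columns to internal formatted names, and `align_data_schema` aliases them back to the user-facing result names. An illustration for a hypothetical column `a` at index 0 (the exact encoding produced by `format_field_name` lives in util.rs and is not shown in this diff):

    // Illustration only: alias the internal field name back to a result name.
    let expr = Expression::Alias(
        "a".to_string(),                                          // user-visible name
        Box::new(Expression::Column(format_field_name("a", 0))), // internal name
    );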
6 changes: 4 additions & 2 deletions query/src/sql/optimizer/heuristic/mod.rs
@@ -43,10 +43,12 @@ impl HeuristicOptimizer {
     }
 
     fn optimize_expression(&self, s_expr: &SExpr) -> Result<SExpr> {
-        let mut result = s_expr.clone();
+        let mut optimized_children = Vec::with_capacity(s_expr.arity());
         for expr in s_expr.children() {
-            result = self.apply_transform_rules(expr, &self.rules)?;
+            optimized_children.push(self.optimize_expression(expr)?);
         }
+        let optimized_expr = SExpr::create(s_expr.plan().clone(), optimized_children, None);
+        let result = self.apply_transform_rules(&optimized_expr, &self.rules)?;
 
         Ok(result)
     }
2 changes: 1 addition & 1 deletion query/src/sql/optimizer/optimize_context.rs
@@ -39,7 +39,7 @@ impl OptimizeContext {
             .all_column_bindings()
             .iter()
             .map(|col| NamedColumn {
-                index: col.index.unwrap(),
+                index: col.index,
                 name: col.column_name.clone(),
             })
             .collect();
2 changes: 1 addition & 1 deletion query/src/sql/optimizer/s_expr.rs
@@ -73,7 +73,7 @@ impl SExpr {
     pub fn match_pattern(&self, pattern: &SExpr) -> bool {
         if pattern.plan.plan_type() != PlanType::Pattern {
             // Pattern is plan
-            if self.plan.plan_type() == pattern.plan.plan_type() {
+            if self.plan.plan_type() != pattern.plan.plan_type() {
                 return false;
             }
 
(Diffs for the remaining 7 of the 17 changed files did not load and are not shown.)
