feat(planner): Add switch to enable new planner #4989

Merged 1 commit on Apr 21, 2022

4 changes: 2 additions & 2 deletions common/datavalues/benches/data_type.rs
@@ -62,11 +62,11 @@ fn add_benchmark(c: &mut Criterion) {
});
}

fn data_type_ptr_create(ty: &DataTypePtr, values: &Vec<DataValue>) -> Result<ColumnRef> {
fn data_type_ptr_create(ty: &DataTypePtr, values: &[DataValue]) -> Result<ColumnRef> {
ty.create_column(values)
}

fn data_type_enum_create(ty: &DataTypeImpl, values: &Vec<DataValue>) -> Result<ColumnRef> {
fn data_type_enum_create(ty: &DataTypeImpl, values: &[DataValue]) -> Result<ColumnRef> {
ty.create_column(values)
}

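The two benchmark helpers above now take `&[DataValue]` instead of `&Vec<DataValue>`, the idiomatic slice form: a slice parameter borrows from a `Vec`, an array, or any other contiguous buffer, so callers are not tied to `Vec`. A minimal stand-alone sketch of the idiom (placeholder types, not the `DataValue`/`ColumnRef` code from this diff):

// Stand-alone illustration of the &Vec<T> -> &[T] parameter change.
fn sum_vec_only(values: &Vec<i64>) -> i64 {
    values.iter().sum()
}

fn sum_slice(values: &[i64]) -> i64 {
    values.iter().sum()
}

fn main() {
    let v = vec![1, 2, 3];
    let a = [4, 5, 6];
    assert_eq!(sum_slice(&v), 6);    // &Vec<i64> coerces to &[i64]
    assert_eq!(sum_slice(&a), 15);   // arrays work too
    assert_eq!(sum_vec_only(&v), 6); // the old form only accepts &Vec<i64>
}
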
16 changes: 13 additions & 3 deletions query/src/interpreters/interpreter_select_v2.rs
@@ -32,9 +32,11 @@ pub struct SelectInterpreterV2 {
}

impl SelectInterpreterV2 {
#[allow(dead_code)]
pub fn try_create(ctx: Arc<QueryContext>, query: String) -> Result<InterpreterPtr> {
Ok(Arc::new(SelectInterpreterV2 { ctx, query }))
pub fn try_create(ctx: Arc<QueryContext>, query: &str) -> Result<InterpreterPtr> {
Ok(Arc::new(SelectInterpreterV2 {
ctx,
query: query.to_string(),
}))
}
}

@@ -56,4 +58,12 @@ impl Interpreter for SelectInterpreterV2 {
let executor_stream = Box::pin(ProcessorExecutorStream::create(executor)?);
Ok(Box::pin(self.ctx.try_create_abortable(executor_stream)?))
}

async fn start(&self) -> Result<()> {
Ok(())
}

async fn finish(&self) -> Result<()> {
Ok(())
}
}
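With the change above, `try_create` borrows the query text and clones it into the struct, so the caller no longer has to hand over an owned `String`. A hypothetical call site (the `ctx` variable is an assumption, not code from this PR):

// Hypothetical usage; assumes `ctx: Arc<QueryContext>` is already in scope.
let interpreter = SelectInterpreterV2::try_create(ctx.clone(), "SELECT 1")?;
// `interpreter` is an InterpreterPtr, i.e. an Arc<dyn Interpreter>, ready to execute.
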
1 change: 1 addition & 0 deletions query/src/interpreters/mod.rs
@@ -95,6 +95,7 @@ pub use interpreter_role_drop::DropRoleInterpreter;
pub use interpreter_role_grant::GrantRoleInterpreter;
pub use interpreter_role_revoke::RevokeRoleInterpreter;
pub use interpreter_select::SelectInterpreter;
pub use interpreter_select_v2::SelectInterpreterV2;
pub use interpreter_setting::SettingInterpreter;
pub use interpreter_show_databases::ShowDatabasesInterpreter;
pub use interpreter_show_functions::ShowFunctionsInterpreter;
38 changes: 33 additions & 5 deletions query/src/servers/mysql/mysql_interactive_worker.rs
@@ -35,7 +35,9 @@ use opensrv_mysql::StatementMetaWriter;
use rand::RngCore;
use tokio_stream::StreamExt;

use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterFactory;
use crate::interpreters::SelectInterpreterV2;
use crate::servers::mysql::writers::DFInitResultWriter;
use crate::servers::mysql::writers::DFQueryResultWriter;
use crate::servers::mysql::MySQLFederated;
@@ -273,15 +275,42 @@ impl<W: std::io::Write> InteractiveWorkerBase<W> {
tracing::info!("Normal query: {}", query);
let context = self.session.create_query_context().await?;
context.attach_query_str(query);

let (plan, hints) = PlanParser::parse_with_hint(query, context.clone()).await;
if let (Some(hint_error_code), Err(error_code)) = (
hints
.iter()
.find(|v| v.error_code.is_some())
.and_then(|x| x.error_code),
&plan,
) {
// Pre-check if parsing error can be ignored
if hint_error_code == error_code.code() {
return Ok((vec![DataBlock::empty()], String::from("")));
}
}

let plan = plan?;
let settings = context.get_settings();

let interpreter: Arc<dyn Interpreter> =
if settings.get_enable_new_processor_framework()? != 0
&& settings.get_enable_planner_v2()? != 0
&& matches!(plan, PlanNode::Select(..))
{
// New planner is enabled, and the statement is ensured to be `SELECT` statement.
SelectInterpreterV2::try_create(context.clone(), query)?
} else {
InterpreterFactory::get(context.clone(), plan)?
};

match hints
.iter()
.find(|v| v.error_code.is_some())
.and_then(|x| x.error_code)
{
None => Self::exec_query(plan, &context).await,
Some(hint_error_code) => match Self::exec_query(plan, &context).await {
None => Self::exec_query(interpreter, &context).await,
Some(hint_error_code) => match Self::exec_query(interpreter, &context).await {
Ok(_) => Err(ErrorCode::UnexpectedError(format!(
"Expected server error code: {} but got: Ok.",
hint_error_code
@@ -303,13 +332,12 @@ impl<W: std::io::Write> InteractiveWorkerBase<W> {
}
}

#[tracing::instrument(level = "debug", skip(plan, context))]
#[tracing::instrument(level = "debug", skip(interpreter, context))]
async fn exec_query(
plan: Result<PlanNode>,
interpreter: Arc<dyn Interpreter>,
context: &Arc<QueryContext>,
) -> Result<(Vec<DataBlock>, String)> {
let instant = Instant::now();
let interpreter = InterpreterFactory::get(context.clone(), plan?)?;

let query_result = context.try_spawn(
async move {
21 changes: 14 additions & 7 deletions query/src/sessions/session_settings.rs
@@ -113,37 +113,39 @@ impl Settings {
level: ScopeLevel::Session,
desc: "Enable new processor framework if value != 0, default value: 1",
},

// enable_planner_v2
SettingValue {
default_value: DataValue::UInt64(0),
user_setting: UserSetting::create("enable_planner_v2", DataValue::UInt64(0)),
level: ScopeLevel::Session,
desc: "Enable planner v2 by setting this variable to 1, default value: 0",
},
SettingValue {
default_value: DataValue::String("\n".as_bytes().to_vec()),
user_setting: UserSetting::create("record_delimiter", DataValue::String("\n".as_bytes().to_vec())),
level: ScopeLevel::Session,
desc: "Format record_delimiter, default value: \n",
},

SettingValue {
default_value:DataValue::String(",".as_bytes().to_vec()),
default_value: DataValue::String(",".as_bytes().to_vec()),
user_setting: UserSetting::create("field_delimiter", DataValue::String(",".as_bytes().to_vec())),
level: ScopeLevel::Session,
desc: "Format field delimiter, default value: ,",
},

SettingValue {
default_value: DataValue::UInt64(1),
user_setting: UserSetting::create("empty_as_default", DataValue::UInt64(1)),
level: ScopeLevel::Session,
desc: "Format empty_as_default, default value: 1",
},

SettingValue {
default_value: DataValue::UInt64(0),
user_setting: UserSetting::create("skip_header", DataValue::UInt64(0)),
level: ScopeLevel::Session,
desc: "Whether to skip the input header, default value: 0",
},

SettingValue {
default_value:DataValue::String("UTC".as_bytes().to_vec()),
default_value: DataValue::String("UTC".as_bytes().to_vec()),
user_setting: UserSetting::create("timezone", DataValue::String("UTC".as_bytes().to_vec())),
level: ScopeLevel::Session,
desc: "Timezone, default value: UTC,",
@@ -216,6 +218,11 @@ impl Settings {
self.try_get_u64(key)
}

pub fn get_enable_planner_v2(&self) -> Result<u64> {
static KEY: &str = "enable_planner_v2";
self.try_get_u64(KEY)
}

pub fn get_field_delimiter(&self) -> Result<Vec<u8>> {
let key = "field_delimiter";
self.check_and_get_setting_value(key)
18 changes: 18 additions & 0 deletions query/src/sql/exec/data_schema_builder.rs
@@ -22,6 +22,7 @@ use common_exception::Result;
use crate::sql::exec::util::format_field_name;
use crate::sql::plans::PhysicalScan;
use crate::sql::plans::ProjectPlan;
use crate::sql::IndexType;
use crate::sql::Metadata;

pub struct DataSchemaBuilder<'a> {
@@ -70,4 +71,21 @@ impl<'a> DataSchemaBuilder<'a> {

Ok(Arc::new(DataSchema::new(fields)))
}

pub fn build_canonical_schema(&self, columns: &[IndexType]) -> DataSchemaRef {
let mut fields: Vec<DataField> = vec![];
for index in columns {
let column_entry = self.metadata.column(*index);
let field_name = column_entry.name.clone();
let field = if column_entry.nullable {
DataField::new_nullable(field_name.as_str(), column_entry.data_type.clone())
} else {
DataField::new(field_name.as_str(), column_entry.data_type.clone())
};

fields.push(field);
}

Arc::new(DataSchema::new(fields))
}
}
94 changes: 88 additions & 6 deletions query/src/sql/exec/mod.rs
@@ -18,9 +18,12 @@ mod util;

use std::sync::Arc;

use common_datavalues::DataField;
use common_datavalues::DataSchema;
use common_datavalues::DataSchemaRef;
use common_exception::ErrorCode;
use common_exception::Result;
use common_planners::Expression;
pub use util::decode_field_name;
pub use util::format_field_name;

@@ -35,32 +38,80 @@ use crate::sql::plans::PhysicalScan;
use crate::sql::plans::PlanType;
use crate::sql::plans::ProjectPlan;
use crate::sql::plans::Scalar;
use crate::sql::IndexType;
use crate::sql::Metadata;

/// Helper to build a `Pipeline` from `SExpr`
pub struct PipelineBuilder {
ctx: Arc<QueryContext>,
metadata: Metadata,
result_columns: Vec<(IndexType, String)>,
expression: SExpr,
pipeline: NewPipeline,
}

impl PipelineBuilder {
pub fn new(ctx: Arc<QueryContext>, metadata: Metadata, expression: SExpr) -> Self {
pub fn new(
ctx: Arc<QueryContext>,
result_columns: Vec<(IndexType, String)>,
metadata: Metadata,
expression: SExpr,
) -> Self {
PipelineBuilder {
ctx,
metadata,
result_columns,
expression,
pipeline: NewPipeline::create(),
}
}

pub fn spawn(mut self) -> Result<NewPipeline> {
let expr = self.expression.clone();
self.build_pipeline(&expr)?;
let schema = self.build_pipeline(&expr)?;
self.align_data_schema(schema)?;
let settings = self.ctx.get_settings();
self.pipeline
.set_max_threads(settings.get_max_threads()? as usize);
Ok(self.pipeline)
}

fn align_data_schema(&mut self, input_schema: DataSchemaRef) -> Result<()> {
let mut projections = Vec::with_capacity(self.result_columns.len());
let mut output_fields = Vec::with_capacity(self.result_columns.len());
for (index, name) in self.result_columns.iter() {
let column_entry = self.metadata.column(*index);
let field_name = &column_entry.name;
projections.push(Expression::Alias(
name.clone(),
Box::new(Expression::Column(format_field_name(
field_name.as_str(),
*index,
))),
));
let field = if column_entry.nullable {
DataField::new_nullable(name.as_str(), column_entry.data_type.clone())
} else {
DataField::new(name.as_str(), column_entry.data_type.clone())
};
output_fields.push(field);
}
let output_schema = Arc::new(DataSchema::new(output_fields));

self.pipeline
.add_transform(|transform_input_port, transform_output_port| {
ProjectionTransform::try_create(
transform_input_port,
transform_output_port,
input_schema.clone(),
output_schema.clone(),
projections.clone(),
self.ctx.clone(),
)
})?;
Ok(())
}

fn build_pipeline(&mut self, expression: &SExpr) -> Result<DataSchemaRef> {
if !check_physical(expression) {
return Err(ErrorCode::LogicalError("Invalid physical plan"));
@@ -75,14 +126,18 @@ impl PipelineBuilder {
}
PlanType::Project => {
let project = plan.as_any().downcast_ref::<ProjectPlan>().unwrap();
self.build_project(project, &expression.children()[0])
let input_schema = self.build_pipeline(&expression.children()[0])?;
self.build_project(project, input_schema)
}
_ => Err(ErrorCode::LogicalError("Invalid physical plan")),
}
}

fn build_project(&mut self, project: &ProjectPlan, child: &SExpr) -> Result<DataSchemaRef> {
let input_schema = self.build_pipeline(child)?;
fn build_project(
&mut self,
project: &ProjectPlan,
input_schema: DataSchemaRef,
) -> Result<DataSchemaRef> {
let schema_builder = DataSchemaBuilder::new(&self.metadata);
let output_schema = schema_builder.build_project(project, input_schema.clone())?;

@@ -113,8 +168,35 @@ impl PipelineBuilder {
let plan = table_entry.source.clone();

let table = self.ctx.build_table_from_source_plan(&plan)?;
self.ctx.try_set_partitions(plan.parts.clone())?;
table.read2(self.ctx.clone(), &plan, &mut self.pipeline)?;
let columns: Vec<IndexType> = scan.columns.iter().cloned().collect();
let projections: Vec<Expression> = columns
.iter()
.map(|index| {
let column_entry = self.metadata.column(*index);
Expression::Alias(
format_field_name(column_entry.name.as_str(), column_entry.column_index),
Box::new(Expression::Column(column_entry.name.clone())),
)
})
.collect();
let schema_builder = DataSchemaBuilder::new(&self.metadata);
schema_builder.build_physical_scan(scan)
let input_schema = schema_builder.build_canonical_schema(&columns);
let output_schema = schema_builder.build_physical_scan(scan)?;

self.pipeline
.add_transform(|transform_input_port, transform_output_port| {
ProjectionTransform::try_create(
transform_input_port,
transform_output_port,
input_schema.clone(),
output_schema.clone(),
projections.clone(),
self.ctx.clone(),
)
})?;

Ok(output_schema)
}
}
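Two `ProjectionTransform`s now bracket the v2 pipeline: `build_table_scan` aliases every source column to its formatted internal field name, and `align_data_schema` aliases the internal names back to the user-facing result names at the very end. Sketched with a made-up column (the concrete string produced by `format_field_name` is an assumption, not taken from this PR):

// Hypothetical column "number" with metadata index 0.
// Scan side: expose the source column under its internal, index-qualified name.
let scan_alias = Expression::Alias(
    format_field_name("number", 0),
    Box::new(Expression::Column("number".to_string())),
);
// Result side: map the internal name back to the name the query asked for.
let result_alias = Expression::Alias(
    "number".to_string(),
    Box::new(Expression::Column(format_field_name("number", 0))),
);
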
6 changes: 4 additions & 2 deletions query/src/sql/optimizer/heuristic/mod.rs
@@ -43,10 +43,12 @@ impl HeuristicOptimizer {
}

fn optimize_expression(&self, s_expr: &SExpr) -> Result<SExpr> {
let mut result = s_expr.clone();
let mut optimized_children = Vec::with_capacity(s_expr.arity());
for expr in s_expr.children() {
result = self.apply_transform_rules(expr, &self.rules)?;
optimized_children.push(self.optimize_expression(expr)?);
}
let optimized_expr = SExpr::create(s_expr.plan().clone(), optimized_children, None);
let result = self.apply_transform_rules(&optimized_expr, &self.rules)?;

Ok(result)
}
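The fix above turns `optimize_expression` into a genuine bottom-up rewrite: children are optimized recursively first, the expression is rebuilt from the optimized children, and only then are the transform rules applied at the current node (the previous loop only kept the result of the last child and never recursed). A self-contained toy version of that traversal shape, with a placeholder tree type instead of `SExpr`:

// Toy bottom-up rewriter; `Node` and `rewrite_node` stand in for SExpr and
// apply_transform_rules, which are not reproduced here.
#[derive(Debug)]
struct Node {
    value: i64,
    children: Vec<Node>,
}

// Placeholder "rule": fold the children's values into the parent.
fn rewrite_node(node: Node) -> Node {
    let folded: i64 = node.children.iter().map(|c| c.value).sum();
    Node { value: node.value + folded, children: node.children }
}

fn optimize(node: &Node) -> Node {
    // 1. Optimize every child first (the recursion bottoms out at leaves).
    let optimized_children: Vec<Node> = node.children.iter().map(optimize).collect();
    // 2. Rebuild the current node from its optimized children.
    let rebuilt = Node { value: node.value, children: optimized_children };
    // 3. Apply the rewrite rules to the rebuilt node.
    rewrite_node(rebuilt)
}

fn main() {
    let tree = Node {
        value: 1,
        children: vec![
            Node { value: 2, children: vec![] },
            Node { value: 3, children: vec![Node { value: 4, children: vec![] }] },
        ],
    };
    // Leaves are rewritten first, so parents see already-rewritten child values.
    println!("{:?}", optimize(&tree));
}
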
2 changes: 1 addition & 1 deletion query/src/sql/optimizer/optimize_context.rs
@@ -39,7 +39,7 @@ impl OptimizeContext {
.all_column_bindings()
.iter()
.map(|col| NamedColumn {
index: col.index.unwrap(),
index: col.index,
name: col.column_name.clone(),
})
.collect();
2 changes: 1 addition & 1 deletion query/src/sql/optimizer/s_expr.rs
@@ -73,7 +73,7 @@ impl SExpr {
pub fn match_pattern(&self, pattern: &SExpr) -> bool {
if pattern.plan.plan_type() != PlanType::Pattern {
// Pattern is plan
if self.plan.plan_type() == pattern.plan.plan_type() {
if self.plan.plan_type() != pattern.plan.plan_type() {
return false;
}
