
feat: support expr in values clause of multi table insert #15147

Merged · 12 commits · Apr 2, 2024
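In plain terms, this PR lets the VALUES clause of each INTO branch in a multi-table insert carry arbitrary expressions and the DEFAULT keyword, not just source-column names. A minimal sketch of what now parses, with hypothetical tables t1, t2 and source s; the import paths are assumed from the AST crate's parser tests, not taken from this diff:

use databend_common_ast::parser::{parse_sql, tokenize_sql, Dialect};

fn main() {
    // Before this change the branch VALUES lists could only name source
    // columns; expressions like `c1 + 1` and the DEFAULT keyword now parse too.
    let sql = "INSERT FIRST \
               WHEN c1 > 1 THEN INTO t1 VALUES (c1 + 1, DEFAULT) \
               ELSE INTO t2 VALUES (c1 * 2) \
               SELECT c1 FROM s";
    let tokens = tokenize_sql(sql).expect("tokenize");
    let (stmt, _) = parse_sql(&tokens, Dialect::PostgreSQL).expect("parse");
    println!("{}", stmt);
}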
17 changes: 16 additions & 1 deletion src/query/ast/src/ast/statements/insert_multi_table.rs
@@ -27,7 +27,22 @@ pub struct IntoClause {
pub database: Option<Identifier>,
pub table: Identifier,
pub target_columns: Vec<Identifier>,
pub source_columns: Vec<Identifier>,
pub source_columns: Vec<SourceExpr>,
}

#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub enum SourceExpr {
Expr(Expr),
Default,
}

impl Display for SourceExpr {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
SourceExpr::Expr(expr) => expr.fmt(f),
SourceExpr::Default => write!(f, "DEFAULT"),
}
}
}
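Keeping DEFAULT as a dedicated variant, rather than encoding it as an ordinary expression, presumably lets the binder substitute each target column's declared default later without guessing at parse time.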

impl Display for IntoClause {
6 changes: 5 additions & 1 deletion src/query/ast/src/parser/statement.rs
@@ -2345,12 +2345,16 @@ fn when_clause(i: Input) -> IResult<WhenClause> {
}

fn into_clause(i: Input) -> IResult<IntoClause> {
let source_expr = alt((
map(rule! {DEFAULT}, |_| SourceExpr::Default),
map(rule! { #expr }, SourceExpr::Expr),
));
map(
rule! {
INTO
~ #dot_separated_idents_1_to_3
~ ( "(" ~ #comma_separated_list1(ident) ~ ")" )?
~ (VALUES ~ "(" ~ #comma_separated_list1(ident) ~ ")" )?
~ (VALUES ~ "(" ~ #comma_separated_list1(source_expr) ~ ")" )?
},
|(_, (catalog, database, table), opt_target_columns, opt_source_columns)| IntoClause {
catalog,
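A parsing subtlety worth noting: in the alt above, the DEFAULT branch is tried before #expr, so a bare DEFAULT keyword is recognized as SourceExpr::Default rather than being handed to the general expression parser.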
@@ -16,17 +16,20 @@ use std::sync::Arc;

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::DataField;
use databend_common_expression::DataSchema;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::RemoteExpr;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_sql::executor::physical_plans::CastSchema;
use databend_common_sql::executor::physical_plans::ChunkAppendData;
use databend_common_sql::executor::physical_plans::ChunkCastSchema;
use databend_common_sql::executor::physical_plans::ChunkCommitInsert;
use databend_common_sql::executor::physical_plans::ChunkEvalScalar;
use databend_common_sql::executor::physical_plans::ChunkFillAndReorder;
use databend_common_sql::executor::physical_plans::ChunkMerge;
use databend_common_sql::executor::physical_plans::ChunkProject;
use databend_common_sql::executor::physical_plans::FillAndReorder;
use databend_common_sql::executor::physical_plans::MultiInsertEvalScalar;
use databend_common_sql::executor::physical_plans::Project;
use databend_common_sql::executor::physical_plans::SerializableTable;
use databend_common_sql::executor::physical_plans::ShuffleStrategy;
@@ -87,21 +90,39 @@ impl InsertMultiTableInterpreter {
let (mut root, metadata) = self.build_source_physical_plan().await?;
let update_stream_meta = build_update_stream_meta_seq(self.ctx.clone(), &metadata).await?;
let source_schema = root.output_schema()?;
let mut branches = self.build_insert_into_branches().await?;
let branches = self.build_insert_into_branches().await?;
let serializable_tables = branches
.build_serializable_target_tables(self.ctx.clone())
.await?;
let deduplicated_serializable_tables = branches
.build_deduplicated_serializable_target_tables(self.ctx.clone())
.await?;
let predicates = branches.build_predicates(source_schema.as_ref())?;
let projections = branches.build_projections();
let source_schemas = projections
let eval_scalars = branches.build_eval_scalars(source_schema.as_ref())?;

// Source schemas for each branch, to be cast to the target schema.
// Each is either the output schema of the branch's eval scalar (if present)
// or the source subquery's schema.
let source_schemas = eval_scalars
.iter()
.map(|opt_projection| {
opt_projection
.map(|opt_exprs| {
opt_exprs
.as_ref()
.map(|p| Arc::new(source_schema.project(p)))
.map(
|MultiInsertEvalScalar {
remote_exprs,
projection: _,
}| {
let mut evaled_fields = Vec::with_capacity(remote_exprs.len());
for eval_scalar_expr in remote_exprs {
let data_type = eval_scalar_expr
.as_expr(&BUILTIN_FUNCTIONS)
.data_type()
.clone();
evaled_fields.push(DataField::new("", data_type));
}
Arc::new(DataSchema::new(evaled_fields))
},
)
.unwrap_or_else(|| source_schema.clone())
})
.collect::<Vec<_>>();
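The synthesized fields are left unnamed (DataField::new("", ...)) on purpose: only their data types matter, since this schema exists solely to drive the ChunkCastSchema step below.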
Expand All @@ -128,10 +149,10 @@ impl InsertMultiTableInterpreter {
predicates,
}));

root = PhysicalPlan::ChunkProject(Box::new(ChunkProject {
root = PhysicalPlan::ChunkEvalScalar(Box::new(ChunkEvalScalar {
plan_id: 0,
input: Box::new(root),
projections,
eval_scalars,
}));

root = PhysicalPlan::ChunkCastSchema(Box::new(ChunkCastSchema {
@@ -249,11 +270,16 @@ impl InsertMultiTableInterpreter {
catalog,
database,
table,
projection,
casted_schema,
source_scalar_exprs,
} = into;
let table = self.ctx.get_table(catalog, database, table).await?;
branches.push(table, condition, projection.clone(), casted_schema.clone());
branches.push(
table,
condition,
source_scalar_exprs.clone(),
casted_schema.clone(),
);
}

Ok(branches)
@@ -278,11 +304,18 @@ fn not(expr: ScalarExpr) -> ScalarExpr {
})
}

fn scalar_expr_to_remote_expr(expr: &ScalarExpr, block_schema: &DataSchema) -> Result<RemoteExpr> {
let expr = expr
.as_expr()?
.project_column_ref(|col| block_schema.index_of(&col.index.to_string()).unwrap());
Ok(expr.as_remote_expr())
}
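A note on the mapping above: project_column_ref rewrites each column reference to its position in block_schema. The source block's field names are the stringified binder column indexes, hence the col.index.to_string() lookup; the unwrap assumes every referenced column is present in the source schema.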

#[derive(Default)]
struct InsertIntoBranches {
tables: Vec<Arc<dyn Table>>,
conditions: Vec<Option<ScalarExpr>>,
projections: Vec<Option<Vec<usize>>>,
source_exprs: Vec<Option<Vec<ScalarExpr>>>,
casted_schemas: Vec<DataSchemaRef>,
len: usize,
}
@@ -292,12 +325,12 @@ impl InsertIntoBranches {
&mut self,
table: Arc<dyn Table>,
condition: Option<ScalarExpr>,
projection: Option<Vec<usize>>,
source_exprs: Option<Vec<ScalarExpr>>,
casted_schema: DataSchemaRef,
) {
self.tables.push(table);
self.conditions.push(condition);
self.projections.push(projection);
self.source_exprs.push(source_exprs);
self.casted_schemas.push(casted_schema);
self.len += 1;
}
@@ -362,8 +395,29 @@ impl InsertIntoBranches {
Ok(predicates)
}

fn build_projections(&mut self) -> Vec<Option<Vec<usize>>> {
std::mem::take(&mut self.projections)
fn build_eval_scalars(
&self,
source_schema: &DataSchema,
) -> Result<Vec<Option<MultiInsertEvalScalar>>> {
let mut eval_scalars = vec![];
for opt_scalar_exprs in self.source_exprs.iter() {
if let Some(scalar_exprs) = opt_scalar_exprs {
let exprs = scalar_exprs
.iter()
.map(|scalar_expr| scalar_expr_to_remote_expr(scalar_expr, source_schema))
.collect::<Result<Vec<_>>>()?;
let source_field_num = source_schema.fields().len();
let evaled_num = scalar_exprs.len();
let projection = (source_field_num..source_field_num + evaled_num).collect();
eval_scalars.push(Some(MultiInsertEvalScalar {
remote_exprs: exprs,
projection,
}));
} else {
eval_scalars.push(None);
}
}
Ok(eval_scalars)
}
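To make the projection arithmetic concrete, a minimal sketch with hypothetical sizes (the real field collects into a ColumnSet; a Vec keeps the sketch readable):

fn main() {
    // Hypothetical: a 3-column source block whose branch VALUES clause holds
    // 2 expressions. BlockOperator::Map appends the evaluated results after
    // the existing columns, so they land at indexes 3 and 4; the projection
    // then keeps only that appended range.
    let source_field_num = 3;
    let evaled_num = 2;
    let projection: Vec<usize> = (source_field_num..source_field_num + evaled_num).collect();
    assert_eq!(projection, vec![3, 4]);
}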

fn build_cast_schema(&self, source_schemas: Vec<DataSchemaRef>) -> Vec<Option<CastSchema>> {
@@ -27,10 +27,10 @@ use databend_common_pipeline_transforms::processors::TransformSortPartial;
use databend_common_sql::executor::physical_plans::ChunkAppendData;
use databend_common_sql::executor::physical_plans::ChunkCastSchema;
use databend_common_sql::executor::physical_plans::ChunkCommitInsert;
use databend_common_sql::executor::physical_plans::ChunkEvalScalar;
use databend_common_sql::executor::physical_plans::ChunkFillAndReorder;
use databend_common_sql::executor::physical_plans::ChunkFilter;
use databend_common_sql::executor::physical_plans::ChunkMerge;
use databend_common_sql::executor::physical_plans::ChunkProject;
use databend_common_sql::executor::physical_plans::Duplicate;
use databend_common_sql::executor::physical_plans::Shuffle;
use databend_common_storages_fuse::operations::CommitMultiTableInsert;
@@ -74,18 +74,19 @@ impl PipelineBuilder {
Ok(())
}

pub(crate) fn build_chunk_project(&mut self, plan: &ChunkProject) -> Result<()> {
pub(crate) fn build_chunk_eval_scalar(&mut self, plan: &ChunkEvalScalar) -> Result<()> {
self.build_pipeline(&plan.input)?;
if plan.projections.iter().all(|x| x.is_none()) {
if plan.eval_scalars.iter().all(|x| x.is_none()) {
return Ok(());
}
let num_input_columns = plan.input.output_schema()?.num_fields();
let mut f: Vec<DynTransformBuilder> = Vec::with_capacity(plan.projections.len());
for projection in plan.projections.iter() {
if let Some(projection) = projection {
f.push(Box::new(self.project_transform_builder(
projection.clone(),
let mut f: Vec<DynTransformBuilder> = Vec::with_capacity(plan.eval_scalars.len());
for eval_scalar in plan.eval_scalars.iter() {
if let Some(eval_scalar) = eval_scalar {
f.push(Box::new(self.map_transform_builder(
num_input_columns,
eval_scalar.remote_exprs.clone(),
Some(eval_scalar.projection.clone()),
)?));
} else {
f.push(Box::new(self.dummy_transform_builder()?));
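Branches whose INTO clause carries no VALUES expressions are given a dummy pass-through transform instead, so the transform builders stay aligned one-to-one with the duplicated chunks; and if no branch has expressions at all, the early return above skips the stage entirely.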
15 changes: 11 additions & 4 deletions src/query/service/src/pipelines/builders/transform_builder.rs
@@ -31,6 +31,7 @@ use databend_common_pipeline_transforms::processors::TransformCompact;
use databend_common_pipeline_transforms::processors::TransformDummy;
use databend_common_sql::evaluator::BlockOperator;
use databend_common_sql::evaluator::CompoundBlockOperator;
use databend_common_sql::ColumnSet;
use databend_common_storages_factory::Table;
use databend_common_storages_fuse::operations::TableMutationAggregator;
use databend_common_storages_fuse::operations::TransformSerializeBlock;
@@ -154,20 +155,26 @@ impl PipelineBuilder {
})
}

pub(crate) fn project_transform_builder(
pub(crate) fn map_transform_builder(
&self,
projection: Vec<usize>,
num_input_columns: usize,
remote_exprs: Vec<RemoteExpr>,
projections: Option<ColumnSet>,
) -> Result<impl Fn(Arc<InputPort>, Arc<OutputPort>) -> Result<ProcessorPtr>> {
let func_ctx = self.func_ctx.clone();
let exprs = remote_exprs
.iter()
.map(|scalar| scalar.as_expr(&BUILTIN_FUNCTIONS))
.collect::<Vec<_>>();
Ok(move |input, output| {
Ok(ProcessorPtr::create(CompoundBlockOperator::create(
input,
output,
num_input_columns,
func_ctx.clone(),
vec![BlockOperator::Project {
projection: projection.clone(),
vec![BlockOperator::Map {
exprs: exprs.clone(),
projections: projections.clone(),
}],
)))
})
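The switch from BlockOperator::Project to BlockOperator::Map is the core runtime change: Project could only select and reorder existing columns, whereas Map evaluates the given expressions against the input block, appends the results as new columns, and then optionally prunes with projections. That is what allows a branch's VALUES clause to carry arbitrary expressions rather than bare column names.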
4 changes: 3 additions & 1 deletion src/query/service/src/pipelines/pipeline_builder.rs
@@ -196,7 +196,9 @@ impl PipelineBuilder {
PhysicalPlan::Duplicate(duplicate) => self.build_duplicate(duplicate),
PhysicalPlan::Shuffle(shuffle) => self.build_shuffle(shuffle),
PhysicalPlan::ChunkFilter(chunk_filter) => self.build_chunk_filter(chunk_filter),
PhysicalPlan::ChunkProject(chunk_project) => self.build_chunk_project(chunk_project),
PhysicalPlan::ChunkEvalScalar(chunk_project) => {
self.build_chunk_eval_scalar(chunk_project)
}
PhysicalPlan::ChunkCastSchema(chunk_cast_schema) => {
self.build_chunk_cast_schema(chunk_cast_schema)
}
2 changes: 1 addition & 1 deletion src/query/sql/src/executor/format.rs
@@ -254,7 +254,7 @@ fn to_format_tree(
PhysicalPlan::Duplicate(_) => Ok(FormatTreeNode::new("Duplicate".to_string())),
PhysicalPlan::Shuffle(_) => Ok(FormatTreeNode::new("Shuffle".to_string())),
PhysicalPlan::ChunkFilter(_) => Ok(FormatTreeNode::new("ChunkFilter".to_string())),
PhysicalPlan::ChunkProject(_) => Ok(FormatTreeNode::new("ChunkProject".to_string())),
PhysicalPlan::ChunkEvalScalar(_) => Ok(FormatTreeNode::new("ChunkEvalScalar".to_string())),
PhysicalPlan::ChunkCastSchema(_) => Ok(FormatTreeNode::new("ChunkCastSchema".to_string())),
PhysicalPlan::ChunkFillAndReorder(_) => {
Ok(FormatTreeNode::new("ChunkFillAndReorder".to_string()))
16 changes: 8 additions & 8 deletions src/query/sql/src/executor/physical_plan.rs
@@ -28,10 +28,10 @@ use crate::executor::physical_plans::AggregatePartial;
use crate::executor::physical_plans::ChunkAppendData;
use crate::executor::physical_plans::ChunkCastSchema;
use crate::executor::physical_plans::ChunkCommitInsert;
use crate::executor::physical_plans::ChunkEvalScalar;
use crate::executor::physical_plans::ChunkFillAndReorder;
use crate::executor::physical_plans::ChunkFilter;
use crate::executor::physical_plans::ChunkMerge;
use crate::executor::physical_plans::ChunkProject;
use crate::executor::physical_plans::CommitSink;
use crate::executor::physical_plans::CompactSource;
use crate::executor::physical_plans::ConstantTableScan;
@@ -137,7 +137,7 @@ pub enum PhysicalPlan {
Duplicate(Box<Duplicate>),
Shuffle(Box<Shuffle>),
ChunkFilter(Box<ChunkFilter>),
ChunkProject(Box<ChunkProject>),
ChunkEvalScalar(Box<ChunkEvalScalar>),
ChunkCastSchema(Box<ChunkCastSchema>),
ChunkFillAndReorder(Box<ChunkFillAndReorder>),
ChunkAppendData(Box<ChunkAppendData>),
@@ -351,7 +351,7 @@ impl PhysicalPlan {
*next_id += 1;
plan.input.adjust_plan_id(next_id);
}
PhysicalPlan::ChunkProject(plan) => {
PhysicalPlan::ChunkEvalScalar(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
plan.input.adjust_plan_id(next_id);
@@ -428,7 +428,7 @@ impl PhysicalPlan {
PhysicalPlan::Duplicate(v) => v.plan_id,
PhysicalPlan::Shuffle(v) => v.plan_id,
PhysicalPlan::ChunkFilter(v) => v.plan_id,
PhysicalPlan::ChunkProject(v) => v.plan_id,
PhysicalPlan::ChunkEvalScalar(v) => v.plan_id,
PhysicalPlan::ChunkCastSchema(v) => v.plan_id,
PhysicalPlan::ChunkFillAndReorder(v) => v.plan_id,
PhysicalPlan::ChunkAppendData(v) => v.plan_id,
@@ -480,7 +480,7 @@ impl PhysicalPlan {
PhysicalPlan::Duplicate(plan) => plan.input.output_schema(),
PhysicalPlan::Shuffle(plan) => plan.input.output_schema(),
PhysicalPlan::ChunkFilter(plan) => plan.input.output_schema(),
PhysicalPlan::ChunkProject(_) => todo!(),
PhysicalPlan::ChunkEvalScalar(_) => todo!(),
PhysicalPlan::ChunkCastSchema(_) => todo!(),
PhysicalPlan::ChunkFillAndReorder(_) => todo!(),
PhysicalPlan::ChunkAppendData(_) => todo!(),
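Note that output_schema remains todo!() for these chunk-level plans, presumably because each duplicated branch carries its own schema, so no single output schema is well defined at this node.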
@@ -537,7 +537,7 @@ impl PhysicalPlan {
PhysicalPlan::Duplicate(_) => "Duplicate".to_string(),
PhysicalPlan::Shuffle(_) => "Shuffle".to_string(),
PhysicalPlan::ChunkFilter(_) => "ChunkFilter".to_string(),
PhysicalPlan::ChunkProject(_) => "ChunkProject".to_string(),
PhysicalPlan::ChunkEvalScalar(_) => "ChunkEvalScalar".to_string(),
PhysicalPlan::ChunkCastSchema(_) => "ChunkCastSchema".to_string(),
PhysicalPlan::ChunkFillAndReorder(_) => "ChunkFillAndReorder".to_string(),
PhysicalPlan::ChunkAppendData(_) => "ChunkAppendData".to_string(),
@@ -605,7 +605,7 @@ impl PhysicalPlan {
PhysicalPlan::Duplicate(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::Shuffle(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::ChunkFilter(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::ChunkProject(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::ChunkEvalScalar(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::ChunkCastSchema(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::ChunkFillAndReorder(plan) => {
Box::new(std::iter::once(plan.input.as_ref()))
@@ -660,7 +660,7 @@ impl PhysicalPlan {
| PhysicalPlan::Duplicate(_)
| PhysicalPlan::Shuffle(_)
| PhysicalPlan::ChunkFilter(_)
| PhysicalPlan::ChunkProject(_)
| PhysicalPlan::ChunkEvalScalar(_)
| PhysicalPlan::ChunkCastSchema(_)
| PhysicalPlan::ChunkFillAndReorder(_)
| PhysicalPlan::ChunkAppendData(_)
2 changes: 1 addition & 1 deletion src/query/sql/src/executor/physical_plan_display.rs
@@ -120,7 +120,7 @@ impl<'a> Display for PhysicalPlanIndentFormatDisplay<'a> {
PhysicalPlan::Duplicate(_) => "Duplicate".fmt(f)?,
PhysicalPlan::Shuffle(_) => "Shuffle".fmt(f)?,
PhysicalPlan::ChunkFilter(_) => "ChunkFilter".fmt(f)?,
PhysicalPlan::ChunkProject(_) => "ChunkProject".fmt(f)?,
PhysicalPlan::ChunkEvalScalar(_) => "ChunkEvalScalar".fmt(f)?,
PhysicalPlan::ChunkCastSchema(_) => "ChunkCastSchema".fmt(f)?,
PhysicalPlan::ChunkFillAndReorder(_) => "ChunkFillAndReorder".fmt(f)?,
PhysicalPlan::ChunkAppendData(_) => "ChunkAppendData".fmt(f)?,