refactor: refactor merge into optimizer and fix unexpected distribution plan #15507

Merged · 7 commits · May 27, 2024
22 changes: 21 additions & 1 deletion src/query/service/src/interpreters/interpreter_explain.rs
@@ -30,6 +30,7 @@ use databend_common_pipeline_core::processors::PlanProfile;
 use databend_common_sql::binder::ExplainConfig;
 use databend_common_sql::optimizer::ColumnSet;
 use databend_common_sql::plans::FunctionCall;
+use databend_common_sql::plans::MergeInto;
 use databend_common_sql::plans::UpdatePlan;
 use databend_common_sql::BindContext;
 use databend_common_sql::MetadataRef;
@@ -40,6 +41,7 @@ use databend_common_users::UserApiProvider;
 use super::InsertMultiTableInterpreter;
 use super::InterpreterFactory;
 use super::UpdateInterpreter;
+use crate::interpreters::interpreter_merge_into::MergeIntoInterpreter;
 use crate::interpreters::Interpreter;
 use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::executor::PipelineCompleteExecutor;
@@ -191,7 +193,7 @@ impl Interpreter for ExplainInterpreter {
                 // todo:(JackTan25), we need to make all execute2() just do `build pipeline` work,
                 // don't take real actions. for now we fix #13657 like below.
                 let pipeline = match &self.plan {
-                    Plan::Query { .. } => {
+                    Plan::Query { .. } | Plan::MergeInto(_) => {
                         let interpter =
                             InterpreterFactory::get(self.ctx.clone(), &self.plan).await?;
                         interpter.execute2().await?
@@ -216,6 +218,7 @@
                 )
                 .await?
             }
+            Plan::MergeInto(merge_into) => self.explain_merge_fragments(merge_into).await?,
            Plan::Update(update) => self.explain_update_fragments(update.as_ref()).await?,
            _ => {
                return Err(ErrorCode::Unimplemented("Unsupported EXPLAIN statement"));
@@ -563,4 +566,21 @@ impl ExplainInterpreter {
         }
         Ok(vec![DataBlock::concat(&result)?])
     }
+
+    async fn explain_merge_fragments(&self, merge_into: &MergeInto) -> Result<Vec<DataBlock>> {
+        let interpreter = MergeIntoInterpreter::try_create(self.ctx.clone(), merge_into.clone())?;
+        let (plan, _) = interpreter.build_physical_plan().await?;
+        let root_fragment = Fragmenter::try_create(self.ctx.clone())?.build_fragment(&plan)?;
+
+        let mut fragments_actions = QueryFragmentsActions::create(self.ctx.clone());
+        root_fragment.get_actions(self.ctx.clone(), &mut fragments_actions)?;
+
+        let display_string = fragments_actions
+            .display_indent(&merge_into.meta_data)
+            .to_string();
+
+        let line_split_result = display_string.lines().collect::<Vec<_>>();
+        let formatted_plan = StringType::from_data(line_split_result);
+        Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
+    }
 }
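For orientation, the tail of the new helper follows the same pattern as the other EXPLAIN variants: render the fragment actions into one string, then return each line as a row of a single string column. A minimal stand-alone sketch of that last step, with plain `Vec`s standing in for `StringType`/`DataBlock` (the plan text below is illustrative, not real output):

```rust
// Each line of the rendered plan becomes one row of a one-column result,
// which is how EXPLAIN hands its output back to the client.
fn lines_to_rows(display_string: &str) -> Vec<String> {
    display_string.lines().map(str::to_owned).collect()
}

fn main() {
    let rows = lines_to_rows("Fragment 0:\n  MergeInto\n    HashJoin");
    assert_eq!(rows.len(), 3); // three plan lines -> three rows
}
```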
8 changes: 5 additions & 3 deletions src/query/service/src/interpreters/interpreter_factory.rs
@@ -339,9 +339,11 @@
             Plan::Insert(insert) => InsertInterpreter::try_create(ctx, *insert.clone()),
 
             Plan::Replace(replace) => ReplaceInterpreter::try_create(ctx, *replace.clone()),
-            Plan::MergeInto(merge_into) => {
-                MergeIntoInterpreter::try_create(ctx, *merge_into.clone())
-            }
+            Plan::MergeInto(merge_into) => Ok(Arc::new(MergeIntoInterpreter::try_create(
+                ctx,
+                *merge_into.clone(),
+            )?)),
 
             Plan::Delete(delete) => Ok(Arc::new(DeleteInterpreter::try_create(
                 ctx,
                 *delete.clone(),
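The factory hunk is the flip side of the signature change in interpreter_merge_into.rs below: `try_create` now returns the concrete `MergeIntoInterpreter` instead of an erased `InterpreterPtr`, so `EXPLAIN` can call its inherent `build_physical_plan` method, and the factory performs the `Arc` erasure itself. A reduced sketch of the pattern (toy trait and types, not the real Databend definitions):

```rust
use std::sync::Arc;

trait Interpreter {}
type InterpreterPtr = Arc<dyn Interpreter>;

struct MergeIntoInterpreter;
impl Interpreter for MergeIntoInterpreter {}

impl MergeIntoInterpreter {
    // Returning the concrete type keeps inherent methods (such as a
    // build_physical_plan()) reachable without downcasting.
    fn try_create() -> Result<MergeIntoInterpreter, String> {
        Ok(MergeIntoInterpreter)
    }
}

// The factory erases the type at the boundary where a dyn Interpreter is needed.
fn factory() -> Result<InterpreterPtr, String> {
    Ok(Arc::new(MergeIntoInterpreter::try_create()?))
}

fn main() {
    let _interpreter: InterpreterPtr = factory().expect("create interpreter");
}
```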
50 changes: 17 additions & 33 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
@@ -45,7 +45,6 @@ use databend_common_sql::executor::PhysicalPlan;
 use databend_common_sql::executor::PhysicalPlanBuilder;
 use databend_common_sql::plans;
 use databend_common_sql::plans::MergeInto as MergePlan;
-use databend_common_sql::plans::RelOperator;
 use databend_common_sql::IndexType;
 use databend_common_sql::ScalarExpr;
 use databend_common_sql::TypeCheck;
@@ -60,7 +59,6 @@ use itertools::Itertools;
 use crate::interpreters::common::dml_build_update_stream_req;
 use crate::interpreters::HookOperator;
 use crate::interpreters::Interpreter;
-use crate::interpreters::InterpreterPtr;
 use crate::pipelines::PipelineBuildResult;
 use crate::schedulers::build_query_pipeline_without_render_result_set;
 use crate::sessions::QueryContext;
@@ -74,8 +72,8 @@ pub struct MergeIntoInterpreter {
 }
 
 impl MergeIntoInterpreter {
-    pub fn try_create(ctx: Arc<QueryContext>, plan: MergePlan) -> Result<InterpreterPtr> {
-        Ok(Arc::new(MergeIntoInterpreter { ctx, plan }))
+    pub fn try_create(ctx: Arc<QueryContext>, plan: MergePlan) -> Result<MergeIntoInterpreter> {
+        Ok(MergeIntoInterpreter { ctx, plan })
     }
 }
 
@@ -125,7 +123,7 @@ impl Interpreter for MergeIntoInterpreter {
 }
 
 impl MergeIntoInterpreter {
-    async fn build_physical_plan(&self) -> Result<(PhysicalPlan, TableInfo)> {
+    pub async fn build_physical_plan(&self) -> Result<(PhysicalPlan, TableInfo)> {
         let MergePlan {
             bind_context,
             input,
@@ -145,8 +143,10 @@
             split_idx,
             row_id_index,
             can_try_update_column_only,
+            enable_right_broadcast,
             ..
         } = &self.plan;
+        let enable_right_broadcast = *enable_right_broadcast;
         let mut columns_set = columns_set.clone();
         let table = self.ctx.get_table(catalog, database, table_name).await?;
         let fuse_table = table.as_any().downcast_ref::<FuseTable>().ok_or_else(|| {
@@ -211,16 +211,8 @@ impl MergeIntoInterpreter {
         let table_name = table_name.clone();
         let input = input.clone();
 
-        // we need to extract join plan, but we need to give this exchange
-        // back at last.
-        let (input, extract_exchange) = if let RelOperator::Exchange(_) = input.plan() {
-            (Box::new(input.child(0)?.clone()), true)
-        } else {
-            (input, false)
-        };
-
         let mut builder = PhysicalPlanBuilder::new(meta_data.clone(), self.ctx.clone(), false);
-        let mut join_input = builder.build(&input, *columns_set.clone()).await?;
+        let join_input = builder.build(&input, *columns_set.clone()).await?;
 
         // find row_id column index
         let join_output_schema = join_input.output_schema()?;
@@ -265,7 +257,7 @@
             }
         }
 
-        if *distributed && !*change_join_order {
+        if enable_right_broadcast {
            row_number_idx = Some(join_output_schema.index_of(ROW_NUMBER_COL_NAME)?);
         }
 
@@ -276,7 +268,7 @@
             ));
         }
 
-        if *distributed && row_number_idx.is_none() && !*change_join_order {
+        if enable_right_broadcast && row_number_idx.is_none() {
             return Err(ErrorCode::InvalidRowIdIndex(
                 "can't get internal row_number_idx when running merge into",
             ));
@@ -285,17 +277,6 @@
         let table_info = fuse_table.get_table_info().clone();
         let catalog_ = self.ctx.get_catalog(catalog).await?;
 
-        if !*distributed && extract_exchange {
-            join_input = PhysicalPlan::Exchange(Exchange {
-                plan_id: 0,
-                input: Box::new(join_input),
-                kind: FragmentKind::Merge,
-                keys: vec![],
-                allow_adjust_parallelism: true,
-                ignore_exchange: false,
-            });
-        };
-
         // transform unmatched for insert
         // reference to func `build_eval_scalar`
         // (DataSchemaRef, Option<RemoteExpr>, Vec<RemoteExpr>,Vec<usize>) => (source_schema, condition, value_exprs)
@@ -432,6 +413,7 @@
                 can_try_update_column_only: *can_try_update_column_only,
                 plan_id: u32::MAX,
                 merge_into_split_idx,
+                enable_right_broadcast,
             }))
         } else {
            let merge_append = PhysicalPlan::MergeInto(Box::new(MergeInto {
@@ -444,23 +426,25 @@
                 row_id_idx,
                 segments: segments.clone(),
                 distributed: true,
-                output_schema: match *change_join_order {
-                    false => DataSchemaRef::new(DataSchema::new(vec![
-                        join_output_schema.fields[row_number_idx.unwrap()].clone(),
-                    ])),
-                    true => DataSchemaRef::new(DataSchema::new(vec![DataField::new(
+                output_schema: if let Some(row_number_idx) = row_number_idx {
+                    DataSchemaRef::new(DataSchema::new(vec![
+                        join_output_schema.fields[row_number_idx].clone(),
+                    ]))
+                } else {
+                    DataSchemaRef::new(DataSchema::new(vec![DataField::new(
                         ROW_ID_COL_NAME,
                         databend_common_expression::types::DataType::Number(
                             databend_common_expression::types::NumberDataType::UInt64,
                         ),
-                    )])),
+                    )]))
                 },
                 merge_type: merge_type.clone(),
                 change_join_order: *change_join_order,
                 target_build_optimization: false, // we don't support for distributed mode for now..
                 can_try_update_column_only: *can_try_update_column_only,
                 plan_id: u32::MAX,
                 merge_into_split_idx,
+                enable_right_broadcast,
             }));
             // if change_join_order = true, it means the target is build side,
             // in this way, we will do matched operation and not matched operation
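The substantive change in this file is that the interpreter no longer peels an `Exchange` off the plan root and re-attaches it afterwards; the optimizer now decides the distribution, and the recurring predicate `*distributed && !*change_join_order` (a distributed plan whose source side is broadcast to the right) is carried on the plan as a single `enable_right_broadcast` flag. A toy illustration of the consolidation, assuming the flag is derived exactly as the replaced checks imply (the real optimizer may gate it further):

```rust
struct MergePlan {
    distributed: bool,
    change_join_order: bool,
    // Set once by the optimizer; replaces ad-hoc
    // `distributed && !change_join_order` checks scattered across the
    // interpreter and the pipeline builder.
    enable_right_broadcast: bool,
}

impl MergePlan {
    fn new(distributed: bool, change_join_order: bool) -> Self {
        Self {
            distributed,
            change_join_order,
            enable_right_broadcast: distributed && !change_join_order,
        }
    }
}

fn main() {
    // Standalone plans, and distributed plans where the target is the build
    // side, never need the row_number side channel.
    assert!(!MergePlan::new(false, false).enable_right_broadcast);
    assert!(!MergePlan::new(true, true).enable_right_broadcast);
    assert!(MergePlan::new(true, false).enable_right_broadcast);
}
```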
30 changes: 15 additions & 15 deletions src/query/service/src/pipelines/builders/builder_merge_into.rs
@@ -143,7 +143,7 @@
         }
         assert!(self.join_state.is_some());
         assert!(self.merge_into_probe_data_fields.is_some());
-
+        self.main_pipeline.resize(1, false)?;
         let join_state = self.join_state.clone().unwrap();
         // split row_number and log
         // output_port_row_number
@@ -291,6 +291,7 @@
             // we will receive MutationLogs only without rowids.
             return Ok(());
         }
+        self.main_pipeline.resize(1, false)?;
         // we will receive MutationLogs and rowids. So we should apply
         // rowids firstly and then send all mutation logs to commit sink.
         // we need to spilt rowid and mutationlogs, and we can get pipeitems:
@@ -366,9 +367,10 @@
             change_join_order,
             can_try_update_column_only,
             merge_into_split_idx,
+            enable_right_broadcast,
             ..
         } = merge_into;
-
+        let enable_right_broadcast = *enable_right_broadcast;
         self.build_pipeline(input)?;
 
         self.main_pipeline
@@ -457,10 +459,8 @@
         }
 
         if need_unmatch {
-            // distributed: false, standalone mode, we need to add insert processor
-            // (distributed,change join order):(true,true) target is build side, we
-            // need to support insert in local node.
-            if !*distributed || *change_join_order {
+            // If merge into doesn't contain right broadcast join, execute insert in local.
+            if !enable_right_broadcast {
                 let merge_into_not_matched_processor = MergeIntoNotMatchedProcessor::create(
                     unmatched.clone(),
                     input.output_schema()?,
@@ -613,7 +613,7 @@
             }
         } else if need_match && need_unmatch {
             // remove first row_id port and last row_number_port
-            if !*change_join_order {
+            if enable_right_broadcast {
                 self.main_pipeline.output_len() - 2
             } else {
                 // remove first row_id port
@@ -624,7 +624,7 @@
                 self.main_pipeline.output_len() - 1
         } else {
             // there are only row_number
-            if !*change_join_order {
+            if enable_right_broadcast {
                 0
             } else {
                 // unmatched prot
@@ -659,7 +659,7 @@
                 // receive row_id
                 builder.add_items_prepend(vec![create_dummy_item()]);
             }
-            if need_unmatch && !*change_join_order {
+            if enable_right_broadcast {
                 // receive row_number
                 builder.add_items(vec![create_dummy_item()]);
             }
@@ -721,7 +721,7 @@
         }
 
         // need to receive row_number, we should give a dummy item here.
-        if *distributed && need_unmatch && !*change_join_order {
+        if enable_right_broadcast {
             builder.add_items(vec![create_dummy_item()]);
         }
         self.main_pipeline.add_pipe(builder.finalize());
@@ -773,7 +773,7 @@
         }
 
         // need to receive row_number, we should give a dummy item here.
-        if *distributed && need_unmatch && !*change_join_order {
+        if enable_right_broadcast {
             builder.add_items(vec![create_dummy_item()]);
         }
         self.main_pipeline.add_pipe(builder.finalize());
@@ -817,7 +817,7 @@
         }
 
         // receive row_number
-        if *distributed && need_unmatch && !*change_join_order {
+        if enable_right_broadcast {
             pipe_items.push(create_dummy_item());
         }
 
@@ -853,7 +853,7 @@
         }
 
         // with row_number
-        if *distributed && need_unmatch && !change_join_order {
+        if enable_right_broadcast {
             ranges.push(vec![self.main_pipeline.output_len() - 1]);
         }
 
@@ -877,7 +877,7 @@
             vec.push(serialize_segment_transform.into_pipe_item());
         }
 
-        if need_unmatch && !*change_join_order {
+        if enable_right_broadcast {
             vec.push(create_dummy_item())
         }
         vec
@@ -911,7 +911,7 @@
         ));
 
         // accumulate row_number
-        if *distributed && need_unmatch && !*change_join_order {
+        if enable_right_broadcast {
             let pipe_items = if need_match {
                 vec![
                     create_dummy_item(),
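Most hunks in this file swap a compound condition for the new flag, and several of them push `create_dummy_item()`. The reason, going by the comments in the diff: pipe items in a stage must line up one-to-one with the ports flowing through it, so when right broadcast adds a trailing row_number port, stages that do not consume it still need a pass-through placeholder in that slot. A schematic of the bookkeeping, with string labels instead of the real Pipe/PipeItem types:

```rust
// Schematic only: the real code builds Pipe/PipeItem values, not strings.
fn stage_items(need_match: bool, enable_right_broadcast: bool) -> Vec<&'static str> {
    let mut items = vec!["mutation processor"];
    if need_match {
        // A leading port carries row ids for matched rows.
        items.insert(0, "row_id processor");
    }
    if enable_right_broadcast {
        // A trailing row_number port tags along for unmatched-row routing;
        // this stage ignores it, so a dummy item keeps the ports aligned.
        items.push("dummy (pass-through)");
    }
    items
}

fn main() {
    assert_eq!(
        stage_items(true, true),
        ["row_id processor", "mutation processor", "dummy (pass-through)"]
    );
}
```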
4 changes: 0 additions & 4 deletions src/query/service/src/schedulers/fragments/fragmenter.rs
@@ -162,9 +162,6 @@
 
     fn replace_merge_into(&mut self, plan: &MergeInto) -> Result<PhysicalPlan> {
         let input = self.replace(&plan.input)?;
-        if !plan.change_join_order {
-            self.state = State::SelectLeaf;
-        }
         Ok(PhysicalPlan::MergeInto(Box::new(MergeInto {
             input: Box::new(input),
             ..plan.clone()
@@ -232,7 +229,6 @@
         // Consume current fragments to prevent them being consumed by `probe_input`.
         fragments.append(&mut self.fragments);
         let probe_input = self.replace(plan.probe.as_ref())?;
-
         fragments.append(&mut self.fragments);
         self.fragments = fragments;
 
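With distribution decided in the optimizer, `replace_merge_into` no longer forces the fragmenter into `State::SelectLeaf` for merge-into plans; it simply rebuilds the node around its rewritten input, which is the default shape of a plan-rewriting visitor. A generic sketch of that shape (toy enum, not the real `PhysicalPlanReplacer` trait):

```rust
// Toy rewriter: visit children, rebuild the node around the rewritten input,
// and keep everything else from the original (`..plan.clone()` in the real code).
#[derive(Clone, Debug, PartialEq)]
enum Plan {
    Leaf,
    MergeInto { input: Box<Plan> },
}

fn replace(plan: &Plan) -> Plan {
    match plan {
        Plan::Leaf => Plan::Leaf,
        Plan::MergeInto { input } => Plan::MergeInto {
            input: Box::new(replace(input)),
        },
    }
}

fn main() {
    let plan = Plan::MergeInto { input: Box::new(Plan::Leaf) };
    assert_eq!(replace(&plan), plan); // identity rewrite, no state flipping
}
```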