Skip to content

Commit

Permalink
chore(planner): improve infer filter (#16361)
Browse files Browse the repository at this point in the history
* chore(query): improve infer filter

* chore(test): update test

* chore(query): introduce FilterScalarExpr

* chore(binder): refine bind_cte_scan

* chore(test): add window explain sqllogictest

* chore(query): refine ScalarExpr eq and hash

* chore(planner): remove FilterScalarExpr

* chore(binder): fix bind_cte_scan

* chore(planner): refine materialized cte

* chore(query): fix MaterializedCte derive_relational_prop

* chore(planner): fix cte scan

* chore(query): fix cte_scan_offset

* chore(test): update sqllogictest

* chore(planner): improve require_property

* chore(test): add shuffle explain test

* chore(test): update test
  • Loading branch information
Dousir9 authored Sep 10, 2024
1 parent 20c6964 commit c558b45
Show file tree
Hide file tree
Showing 34 changed files with 517 additions and 244 deletions.
53 changes: 33 additions & 20 deletions src/query/service/src/pipelines/builders/builder_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl PipelineBuilder {
self.main_pipeline.get_scopes(),
);
right_side_builder.cte_state = self.cte_state.clone();
right_side_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
right_side_builder.hash_join_states = self.hash_join_states.clone();

let mut right_res = right_side_builder.finalize(&range_join.right)?;
Expand Down Expand Up @@ -148,6 +149,7 @@ impl PipelineBuilder {
self.main_pipeline.get_scopes(),
);
build_side_builder.cte_state = self.cte_state.clone();
build_side_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
build_side_builder.hash_join_states = self.hash_join_states.clone();
let mut build_res = build_side_builder.finalize(build)?;

Expand Down Expand Up @@ -231,52 +233,63 @@ impl PipelineBuilder {
&mut self,
materialized_cte: &MaterializedCte,
) -> Result<()> {
self.expand_left_side_pipeline(
&materialized_cte.left,
self.cte_scan_offsets.insert(
materialized_cte.cte_idx,
&materialized_cte.left_output_columns,
materialized_cte.cte_scan_offset.clone(),
);
self.expand_materialized_side_pipeline(
&materialized_cte.right,
materialized_cte.cte_idx,
&materialized_cte.materialized_output_columns,
)?;
self.build_pipeline(&materialized_cte.right)
self.build_pipeline(&materialized_cte.left)
}

fn expand_left_side_pipeline(
fn expand_materialized_side_pipeline(
&mut self,
left_side: &PhysicalPlan,
materialized_side: &PhysicalPlan,
cte_idx: IndexType,
left_output_columns: &[ColumnBinding],
materialized_output_columns: &[ColumnBinding],
) -> Result<()> {
let left_side_ctx = QueryContext::create_from(self.ctx.clone());
let materialized_side_ctx = QueryContext::create_from(self.ctx.clone());
let state = Arc::new(MaterializedCteState::new(self.ctx.clone()));
self.cte_state.insert(cte_idx, state.clone());
let mut left_side_builder = PipelineBuilder::create(
let mut materialized_side_builder = PipelineBuilder::create(
self.func_ctx.clone(),
self.settings.clone(),
left_side_ctx,
materialized_side_ctx,
self.main_pipeline.get_scopes(),
);
left_side_builder.cte_state = self.cte_state.clone();
left_side_builder.hash_join_states = self.hash_join_states.clone();
let mut left_side_pipeline = left_side_builder.finalize(left_side)?;
assert!(left_side_pipeline.main_pipeline.is_pulling_pipeline()?);
materialized_side_builder.cte_state = self.cte_state.clone();
materialized_side_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
materialized_side_builder.hash_join_states = self.hash_join_states.clone();
let mut materialized_side_pipeline =
materialized_side_builder.finalize(materialized_side)?;
assert!(
materialized_side_pipeline
.main_pipeline
.is_pulling_pipeline()?
);

PipelineBuilder::build_result_projection(
&self.func_ctx,
left_side.output_schema()?,
left_output_columns,
&mut left_side_pipeline.main_pipeline,
materialized_side.output_schema()?,
materialized_output_columns,
&mut materialized_side_pipeline.main_pipeline,
false,
)?;

left_side_pipeline.main_pipeline.add_sink(|input| {
materialized_side_pipeline.main_pipeline.add_sink(|input| {
let transform = Sinker::<MaterializedCteSink>::create(
input,
MaterializedCteSink::create(self.ctx.clone(), cte_idx, state.clone())?,
);
Ok(ProcessorPtr::create(transform))
})?;
self.pipelines
.push(left_side_pipeline.main_pipeline.finalize());
self.pipelines.extend(left_side_pipeline.sources_pipelines);
.push(materialized_side_pipeline.main_pipeline.finalize());
self.pipelines
.extend(materialized_side_pipeline.sources_pipelines);
Ok(())
}
}
4 changes: 4 additions & 0 deletions src/query/service/src/pipelines/builders/builder_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ impl PipelineBuilder {
cte_scan.cte_idx,
self.cte_state.get(&cte_scan.cte_idx.0).unwrap().clone(),
cte_scan.offsets.clone(),
self.cte_scan_offsets
.get(&cte_scan.cte_idx.0)
.unwrap()
.clone(),
)
},
max_threads as usize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl PipelineBuilder {
self.main_pipeline.get_scopes(),
);
pipeline_builder.cte_state = self.cte_state.clone();
pipeline_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
pipeline_builder.hash_join_states = self.hash_join_states.clone();

let mut build_res = pipeline_builder.finalize(input)?;
Expand Down
5 changes: 4 additions & 1 deletion src/query/service/src/pipelines/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ pub struct PipelineBuilder {
pub merge_into_probe_data_fields: Option<Vec<DataField>>,
pub join_state: Option<Arc<HashJoinBuildState>>,

// Cte -> state, each cte has its own state
// The cte state of each materialized cte.
pub cte_state: HashMap<IndexType, Arc<MaterializedCteState>>,
// The column offsets used by cte scan
pub cte_scan_offsets: HashMap<IndexType, Vec<usize>>,

pub(crate) exchange_injector: Arc<dyn ExchangeInjector>,

Expand All @@ -75,6 +77,7 @@ impl PipelineBuilder {
main_pipeline: Pipeline::with_scopes(scopes),
exchange_injector: DefaultExchangeInjector::create(),
cte_state: HashMap::new(),
cte_scan_offsets: HashMap::new(),
merge_into_probe_data_fields: None,
join_state: None,
hash_join_states: HashMap::new(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ pub struct MaterializedCteSource {
cte_idx: (IndexType, IndexType),
ctx: Arc<QueryContext>,
cte_state: Arc<MaterializedCteState>,
offsets: Vec<IndexType>,
column_offsets: Vec<IndexType>,
scan_offsets: Vec<usize>,
}

impl MaterializedCteSource {
Expand All @@ -146,13 +147,15 @@ impl MaterializedCteSource {
output_port: Arc<OutputPort>,
cte_idx: (IndexType, IndexType),
cte_state: Arc<MaterializedCteState>,
offsets: Vec<IndexType>,
column_offsets: Vec<IndexType>,
scan_offsets: Vec<usize>,
) -> Result<ProcessorPtr> {
AsyncSourcer::create(ctx.clone(), output_port, MaterializedCteSource {
ctx,
cte_idx,
cte_state,
offsets,
column_offsets,
scan_offsets,
})
}
}
Expand All @@ -167,19 +170,19 @@ impl AsyncSource for MaterializedCteSource {
let materialized_cte = self.ctx.get_materialized_cte(self.cte_idx)?;
if let Some(blocks) = materialized_cte {
let mut blocks_guard = blocks.write();
let block = blocks_guard.pop();
if let Some(b) = block {
if self.offsets.len() == b.num_columns() {
return Ok(Some(b));
let data_block = blocks_guard.pop();
if let Some(data_block) = data_block {
if self.column_offsets.len() == data_block.num_columns() {
return Ok(Some(data_block));
}
let row_len = b.num_rows();
let num_rows = data_block.num_rows();
let pruned_columns = self
.offsets
.column_offsets
.iter()
.map(|offset| b.get_by_offset(*offset).clone())
.map(|offset| data_block.get_by_offset(self.scan_offsets[*offset]).clone())
.collect::<Vec<BlockEntry>>();

Ok(Some(DataBlock::new(pruned_columns, row_len)))
Ok(Some(DataBlock::new(pruned_columns, num_rows)))
} else {
Ok(None)
}
Expand Down
6 changes: 3 additions & 3 deletions src/query/sql/src/executor/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ impl PhysicalPlan {
}
PhysicalPlan::CteScan(cte_scan) => cte_scan_to_format_tree(cte_scan),
PhysicalPlan::MaterializedCte(materialized_cte) => {
let left_child = materialized_cte.left.format_join(metadata)?;
let right_child = materialized_cte.right.format_join(metadata)?;
let left_child = materialized_cte.left.format_join(metadata)?;
let children = vec![
FormatTreeNode::with_children("Left".to_string(), vec![left_child]),
FormatTreeNode::with_children("Right".to_string(), vec![right_child]),
FormatTreeNode::with_children("Left".to_string(), vec![left_child]),
];
Ok(FormatTreeNode::with_children(
format!("MaterializedCte: {}", materialized_cte.cte_idx),
Expand Down Expand Up @@ -1766,8 +1766,8 @@ fn materialized_cte_to_format_tree(
"output columns: [{}]",
format_output_columns(plan.output_schema()?, metadata, true)
)),
to_format_tree(&plan.left, metadata, profs)?,
to_format_tree(&plan.right, metadata, profs)?,
to_format_tree(&plan.left, metadata, profs)?,
];
Ok(FormatTreeNode::with_children(
"MaterializedCTE".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/executor/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ impl PhysicalPlan {
PhysicalPlan::MutationOrganize(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::AddStreamColumn(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::MaterializedCte(plan) => Box::new(
std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())),
std::iter::once(plan.right.as_ref()).chain(std::iter::once(plan.left.as_ref())),
),
PhysicalPlan::Udf(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::AsyncFunction(plan) => Box::new(std::iter::once(plan.input.as_ref())),
Expand Down
4 changes: 4 additions & 0 deletions src/query/sql/src/executor/physical_plan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use databend_common_catalog::plan::Partitions;
Expand Down Expand Up @@ -40,6 +41,8 @@ pub struct PhysicalPlanBuilder {
pub(crate) dry_run: bool,
// Record cte_idx and the cte's output columns
pub(crate) cte_output_columns: HashMap<IndexType, Vec<ColumnBinding>>,
// The used column offsets of each materialized cte.
pub(crate) cet_used_column_offsets: HashMap<IndexType, HashSet<usize>>,
// DataMutation info, used to build MergeInto physical plan
pub(crate) mutation_build_info: Option<MutationBuildInfo>,
}
Expand All @@ -53,6 +56,7 @@ impl PhysicalPlanBuilder {
func_ctx,
dry_run,
cte_output_columns: Default::default(),
cet_used_column_offsets: Default::default(),
mutation_build_info: None,
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/query/sql/src/executor/physical_plan_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,15 +277,16 @@ pub trait PhysicalPlanReplacer {
}

fn replace_materialized_cte(&mut self, plan: &MaterializedCte) -> Result<PhysicalPlan> {
let left = self.replace(&plan.left)?;
let right = self.replace(&plan.right)?;
let left = self.replace(&plan.left)?;

Ok(PhysicalPlan::MaterializedCte(MaterializedCte {
plan_id: plan.plan_id,
left: Box::new(left),
right: Box::new(right),
cte_idx: plan.cte_idx,
left_output_columns: plan.left_output_columns.clone(),
cte_scan_offset: plan.cte_scan_offset.clone(),
materialized_output_columns: plan.materialized_output_columns.clone(),
}))
}

Expand Down
17 changes: 14 additions & 3 deletions src/query/sql/src/executor/physical_plans/physical_cte_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,27 @@ impl PhysicalPlanBuilder {
used_columns = required.intersection(&used_columns).cloned().collect();
let mut pruned_fields = vec![];
let mut pruned_offsets = vec![];
let mut pruned_materialized_indexes = vec![];
let cte_output_columns = self.cte_output_columns.get(&cte_scan.cte_idx.0).unwrap();
for field in cte_scan.fields.iter() {
for (field, column_index) in cte_scan
.fields
.iter()
.zip(cte_scan.materialized_indexes.iter())
{
if used_columns.contains(&field.name().parse()?) {
pruned_fields.push(field.clone());
pruned_materialized_indexes.push(*column_index);
}
}
for field in pruned_fields.iter() {
for column_index in pruned_materialized_indexes.iter() {
for (offset, col) in cte_output_columns.iter().enumerate() {
if col.index.eq(&field.name().parse::<IndexType>()?) {
if col.index.eq(column_index) {
pruned_offsets.push(offset);
self.cet_used_column_offsets
.entry(cte_scan.cte_idx.0)
.and_modify(|column_offsets| {
column_offsets.insert(offset);
});
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use databend_common_exception::Result;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::DataSchemaRefExt;

use crate::executor::PhysicalPlan;
use crate::executor::PhysicalPlanBuilder;
use crate::optimizer::RelExpr;
use crate::optimizer::SExpr;
use crate::ColumnBinding;
use crate::ColumnSet;
Expand All @@ -31,12 +32,13 @@ pub struct MaterializedCte {
pub left: Box<PhysicalPlan>,
pub right: Box<PhysicalPlan>,
pub cte_idx: IndexType,
pub left_output_columns: Vec<ColumnBinding>,
pub cte_scan_offset: Vec<usize>,
pub materialized_output_columns: Vec<ColumnBinding>,
}

impl MaterializedCte {
pub fn output_schema(&self) -> Result<DataSchemaRef> {
let fields = self.right.output_schema()?.fields().clone();
let fields = self.left.output_schema()?.fields().clone();
Ok(DataSchemaRefExt::create(fields))
}
}
Expand All @@ -49,36 +51,35 @@ impl PhysicalPlanBuilder {
required: ColumnSet,
) -> Result<PhysicalPlan> {
// 1. Prune unused Columns.
let left_output_column = RelExpr::with_s_expr(s_expr)
.derive_relational_prop_child(0)?
.output_columns
.clone();
let right_used_column = RelExpr::with_s_expr(s_expr)
.derive_relational_prop_child(1)?
.used_columns
.clone();
// Get the intersection of `left_output_column` and `right_used_column`
let left_required = left_output_column
.intersection(&right_used_column)
.cloned()
.collect::<ColumnSet>();
self.cte_output_columns
.insert(cte.cte_idx, cte.materialized_output_columns.clone());
self.cet_used_column_offsets
.insert(cte.cte_idx, HashSet::new());
let left = Box::new(self.build(s_expr.child(0)?, required).await?);

let mut required_output_columns = vec![];
for column in cte.left_output_columns.iter() {
if left_required.contains(&column.index) {
required_output_columns.push(column.clone());
let mut materialize_required = ColumnSet::new();
let mut materialized_output_columns = vec![];
let mut cte_scan_offset = Vec::with_capacity(cte.materialized_output_columns.len());
let used_column_offset = self.cet_used_column_offsets.get(&cte.cte_idx).unwrap();
for (offset, column) in cte.materialized_output_columns.iter().enumerate() {
if used_column_offset.contains(&offset) {
cte_scan_offset.push(materialized_output_columns.len());
materialize_required.insert(column.index);
materialized_output_columns.push(column.clone());
} else {
cte_scan_offset.push(0);
}
}
self.cte_output_columns
.insert(cte.cte_idx, required_output_columns.clone());
let right = Box::new(self.build(s_expr.child(1)?, materialize_required).await?);

// 2. Build physical plan.
Ok(PhysicalPlan::MaterializedCte(MaterializedCte {
plan_id: 0,
left: Box::new(self.build(s_expr.child(0)?, left_required).await?),
right: Box::new(self.build(s_expr.child(1)?, required).await?),
left,
right,
cte_idx: cte.cte_idx,
left_output_columns: required_output_columns,
cte_scan_offset,
materialized_output_columns,
}))
}
}
Loading

0 comments on commit c558b45

Please sign in to comment.