Skip to content

Commit

Permalink
change to pattern match
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms committed Mar 31, 2024
1 parent a2d6410 commit 9b43649
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 86 deletions.
76 changes: 28 additions & 48 deletions datafusion/core/src/physical_optimizer/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
use std::sync::Arc;

use crate::physical_optimizer::utils::is_global_limit;
use crate::physical_optimizer::PhysicalOptimizerRule;

use crate::physical_plan::limit::GlobalLimitExec;
Expand All @@ -35,8 +34,7 @@ use datafusion_common::Result;

use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;

use super::utils::is_limit_terminator;
use datafusion_physical_plan::limit;

#[allow(missing_docs)]
pub struct LimitPushdown {}
Expand All @@ -54,10 +52,6 @@ impl PhysicalOptimizerRule for LimitPushdown {
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
// if this node is not a global limit, then directly return
if !is_global_limit(&plan) {
return Ok(plan);
}
// we traverse the treenode to try to push down the limit same logic as project push down
plan.transform_down(&push_down_limit).data()
}
Expand All @@ -71,62 +65,48 @@ impl PhysicalOptimizerRule for LimitPushdown {
}
}
impl LimitPushdown {}
fn new_global_limit_with_input() {}
// try to push down current limit, based on the son
fn push_down_limit(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
// for pattern like GlobalLimit -> CoalescePartitionsExec -> CoalesceBatchesExec , we convert it into
// GlobalLimit->CloalescePartitionExec->CoalesceBatchesExec(new fetch)
if let Some(global_limit) = plan.as_any().downcast_ref::<GlobalLimitExec>() {
let input = global_limit.input().as_any();
if let Some(_) = input.downcast_ref::<CoalescePartitionsExec>() {
return Ok(Transformed::yes(swap_with_coalesce_partition(global_limit)));
} else if let Some(coalesce_batches) = input.downcast_ref::<CoalesceBatchesExec>()
if let Some(coalesce_partition_batch) =
input.downcast_ref::<CoalescePartitionsExec>()
{
return Ok(Transformed::yes(reset_and_get_new_limit(
global_limit,
coalesce_batches,
)));
} else if is_limit_terminator(global_limit.input()) {
return Ok(Transformed::no(plan));
let new_input = coalesce_partition_batch.input().as_any();
if let Some(coalesce_batch) = new_input.downcast_ref::<CoalesceBatchesExec>()
{
return Ok(Transformed::yes(generate_new_limit_pattern(
global_limit,
coalesce_batch,
)));
} else {
return Ok(Transformed::no(plan));
}
} else {
return Ok(Transformed::no(plan));
}
} else {
return Ok(Transformed::no(plan));
}
}
// swap the coalesce_patition exec with current limit
fn swap_with_coalesce_partition(plan: &GlobalLimitExec) -> Arc<dyn ExecutionPlan> {
Arc::new(CoalescePartitionsExec::new(make_with_child(
plan,
&plan.input().children()[0],
)))
}

// create a new node with its child
fn make_with_child(
plan: &GlobalLimitExec,
child: &Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
Arc::new(GlobalLimitExec::new(
child.clone(),
plan.skip(),
plan.fetch(),
))
}

// reset target size and return a new plan
fn reset_and_get_new_limit(
plan: &GlobalLimitExec,
child: &CoalesceBatchesExec,
// generate corresponding pattern
fn generate_new_limit_pattern(
limit_exec: &GlobalLimitExec,
coalesce_batch: &CoalesceBatchesExec,
) -> Arc<dyn ExecutionPlan> {
let fetch = match plan.fetch() {
Some(0) | None => child.target_batch_size(),
Some(fetch_size) => fetch_size,
};
let mut grand_exec = CoalesceBatchesExec::new(
coalesce_batch.input().clone(),
coalesce_batch.target_batch_size(),
);
grand_exec.set_inner_fetch(limit_exec.fetch());
let grand_child = Arc::new(grand_exec);
Arc::new(GlobalLimitExec::new(
Arc::new(CoalesceBatchesExec::new(child.input().clone(), fetch)),
plan.skip(),
plan.fetch(),
Arc::new(CoalescePartitionsExec::new(grand_child)),
limit_exec.skip(),
limit_exec.fetch(),
))
}
26 changes: 0 additions & 26 deletions datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,29 +117,3 @@ pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<RepartitionExec>()
}

/// Check whether the given operator is a [`GlobalLimitExec`].
pub fn is_global_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<GlobalLimitExec>()
}
/// Check whether the given plan is a terminator of [`GlobalLimitExec`].
pub fn is_limit_terminator(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<HashJoinExec>()
|| plan.as_any().is::<CrossJoinExec>()
|| plan.as_any().is::<MemoryExec>()
|| plan.as_any().is::<NestedLoopJoinExec>()
|| plan.as_any().is::<UnionExec>()
|| plan.as_any().is::<MemoryExec>()
|| plan.as_any().is::<PartialSortExec>()
|| plan.as_any().is::<ArrowExec>()
|| plan.as_any().is::<AvroExec>()
|| plan.as_any().is::<CsvExec>()
|| plan.as_any().is::<NdJsonExec>()
|| plan.as_any().is::<StatisticsExec>()
|| plan.as_any().is::<PlaceholderRowExec>()
|| plan.as_any().is::<RecursiveQueryExec>()
|| plan.as_any().is::<StreamingTableExec>()
|| plan.as_any().is::<InterleaveExec>()
|| plan.as_any().is::<UnnestExec>()
|| plan.as_any().is::<WindowAggExec>()
}
38 changes: 29 additions & 9 deletions datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! vectorized processing by upstream operators.
use std::any::Any;

use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -49,6 +50,8 @@ pub struct CoalesceBatchesExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
cache: PlanProperties,
/// the fetch count
fetch: Option<usize>,
}

impl CoalesceBatchesExec {
Expand All @@ -60,6 +63,7 @@ impl CoalesceBatchesExec {
target_batch_size,
metrics: ExecutionPlanMetricsSet::new(),
cache,
fetch: None,
}
}

Expand All @@ -83,8 +87,8 @@ impl CoalesceBatchesExec {
input.execution_mode(), // Execution Mode
)
}
pub fn set_target_batch_size(&mut self, siz: usize) {
self.target_batch_size = siz;
pub fn set_inner_fetch(&mut self, siz: Option<usize>) {
self.fetch = siz;
}
}

Expand All @@ -96,11 +100,19 @@ impl DisplayAs for CoalesceBatchesExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"CoalesceBatchesExec: target_batch_size={}",
self.target_batch_size
)
if let Some(fetch) = self.fetch {
write!(
f,
"CoalesceBatchesExec: target_batch_size={} fetch= {}",
self.target_batch_size, fetch
)
} else {
write!(
f,
"CoalesceBatchesExec: target_batch_size={}",
self.target_batch_size
)
}
}
}
}
Expand Down Expand Up @@ -147,6 +159,11 @@ impl ExecutionPlan for CoalesceBatchesExec {
input: self.input.execute(partition, context)?,
schema: self.input.schema(),
target_batch_size: self.target_batch_size,
fetch: if let Some(fetch_cnt) = self.fetch {
fetch_cnt
} else {
usize::MAX
},
buffer: Vec::new(),
buffered_rows: 0,
is_closed: false,
Expand All @@ -170,6 +187,8 @@ struct CoalesceBatchesStream {
schema: SchemaRef,
/// Minimum number of rows for coalesces batches
target_batch_size: usize,
/// fetch count passed by upper LimitExec
fetch: usize,
/// Buffered batches
buffer: Vec<RecordBatch>,
/// Buffered row count
Expand Down Expand Up @@ -215,8 +234,9 @@ impl CoalesceBatchesStream {
match input_batch {
Poll::Ready(x) => match x {
Some(Ok(batch)) => {
if batch.num_rows() >= self.target_batch_size
&& self.buffer.is_empty()
if (batch.num_rows() >= self.target_batch_size
|| batch.num_rows() >= self.fetch)
|| self.buffer.is_empty()
{
return Poll::Ready(Some(Ok(batch)));
} else if batch.num_rows() == 0 {
Expand Down
6 changes: 3 additions & 3 deletions datafusion/sqllogictest/test_files/repartition.slt
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ Limit: skip=0, fetch=5
--Filter: sink_table.c3 > Int16(0)
----TableScan: sink_table projection=[c1, c2, c3]
physical_plan
CoalescePartitionsExec
--GlobalLimitExec: skip=0, fetch=5
----CoalesceBatchesExec: target_batch_size=5
GlobalLimitExec: skip=0, fetch=5
--CoalescePartitionsExec
----CoalesceBatchesExec: target_batch_size=8192 fetch= 5
------FilterExec: c3@2 > 0
--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
----------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true

0 comments on commit 9b43649

Please sign in to comment.