Implement LimitPushDown for ExecutionPlan #9815

Closed · wants to merge 6 commits
115 changes: 115 additions & 0 deletions datafusion/core/src/physical_optimizer/limit_pushdown.rs
@@ -0,0 +1,115 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! The [`LimitPushdown`] optimization rule is designed to improve the performance
//! of query execution by pushing the LIMIT clause down through the execution plan
//! as far as possible, ideally directly to the [`CoalesceBatchesExec`], which can
//! then use the pushed-down fetch count to stop coalescing early. This means that
//! instead of processing a large amount of data and then applying the limit at the
//! end, the system tries to limit the amount of data being processed throughout
//! the execution of the query.
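//!
//! For example (illustrative; the exact plan text may differ), a plan of the form
//!
//!   GlobalLimitExec: skip=0, fetch=5
//!   --CoalescePartitionsExec
//!   ----CoalesceBatchesExec: target_batch_size=8192
//!
//! is rewritten so that the fetch count is carried by the batch coalescer:
//!
//!   GlobalLimitExec: skip=0, fetch=5
//!   --CoalescePartitionsExec
//!   ----CoalesceBatchesExec: target_batch_size=8192 fetch= 5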

use std::sync::Arc;

use crate::physical_optimizer::PhysicalOptimizerRule;

use crate::physical_plan::limit::GlobalLimitExec;
use crate::physical_plan::ExecutionPlan;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;

use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;

#[allow(missing_docs)]
pub struct LimitPushdown {}

impl LimitPushdown {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}
impl Default for LimitPushdown {
fn default() -> Self {
Self::new()
}
}
impl PhysicalOptimizerRule for LimitPushdown {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
// Traverse the plan tree top-down and try to push the limit down, following the same approach as projection pushdown
plan.transform_down(&push_down_limit).data()
}

fn name(&self) -> &str {
"LimitPushdown"
}

fn schema_check(&self) -> bool {
true
}
}
impl LimitPushdown {}
// Try to push the current limit down, based on its child node
fn push_down_limit(
Review comment from @berkaysynnada (Contributor), Mar 28, 2024:

I believe we can also set a fetch count for CoalesceBatchesExec without changing the plan order. A global fetch count may be carried across the subtree until it faces a limit-breaking plan, but I don't know if it would bring more capability. Can there be plans that cannot swap with a limit but also do not break the required fetch count?

plan: Arc<dyn ExecutionPlan>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
// For a pattern like GlobalLimitExec -> CoalescePartitionsExec -> CoalesceBatchesExec, we convert it into
// GlobalLimitExec -> CoalescePartitionsExec -> CoalesceBatchesExec (with the fetch set)
if let Some(global_limit) = plan.as_any().downcast_ref::<GlobalLimitExec>() {
let input = global_limit.input().as_any();
if let Some(coalesce_partition_batch) =
input.downcast_ref::<CoalescePartitionsExec>()
{
let new_input = coalesce_partition_batch.input().as_any();
if let Some(coalesce_batch) = new_input.downcast_ref::<CoalesceBatchesExec>()
{
Ok(Transformed::yes(generate_new_limit_pattern(
global_limit,
coalesce_batch,
)))
} else {
Ok(Transformed::no(plan))
}
} else {
Ok(Transformed::no(plan))
}
} else {
Ok(Transformed::no(plan))
}
}
// Generate the rewritten pattern, copying the limit's fetch into a new CoalesceBatchesExec
fn generate_new_limit_pattern(
limit_exec: &GlobalLimitExec,
coalesce_batch: &CoalesceBatchesExec,
) -> Arc<dyn ExecutionPlan> {
let mut grand_exec = CoalesceBatchesExec::new(
coalesce_batch.input().clone(),
coalesce_batch.target_batch_size(),
);
grand_exec.set_inner_fetch(limit_exec.fetch());
let grand_child = Arc::new(grand_exec);
Arc::new(GlobalLimitExec::new(
Arc::new(CoalescePartitionsExec::new(grand_child)),
limit_exec.skip(),
limit_exec.fetch(),
))
}
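
Below is a minimal sketch of what a unit test for this rule might look like (reviewers ask for tests of the new logic elsewhere in this PR). It builds the GlobalLimitExec -> CoalescePartitionsExec -> CoalesceBatchesExec pattern over a MemoryExec source and asserts on the rendered plan text; the test name, the MemoryExec source, and the assertion style are illustrative assumptions, not code from this PR.

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};
    use crate::physical_plan::{displayable, memory::MemoryExec};

    #[test]
    fn pushes_fetch_into_coalesce_batches() -> Result<()> {
        // Build GlobalLimitExec -> CoalescePartitionsExec -> CoalesceBatchesExec -> MemoryExec
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let source = Arc::new(MemoryExec::try_new(&[vec![]], schema, None)?);
        let plan: Arc<dyn ExecutionPlan> = Arc::new(GlobalLimitExec::new(
            Arc::new(CoalescePartitionsExec::new(Arc::new(
                CoalesceBatchesExec::new(source, 8192),
            ))),
            0,
            Some(5),
        ));

        // Apply the rule and check that the fetch count now appears on CoalesceBatchesExec
        let optimized = LimitPushdown::new().optimize(plan, &ConfigOptions::new())?;
        let displayed = displayable(optimized.as_ref()).indent(true).to_string();
        assert!(displayed.contains("CoalesceBatchesExec: target_batch_size=8192 fetch= 5"));
        Ok(())
    }
}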
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/mod.rs
@@ -27,6 +27,7 @@ pub mod combine_partial_final_agg;
pub mod enforce_distribution;
pub mod enforce_sorting;
pub mod join_selection;
pub mod limit_pushdown;
pub mod limited_distinct_aggregation;
pub mod optimizer;
pub mod output_requirements;
6 changes: 6 additions & 0 deletions datafusion/core/src/physical_optimizer/optimizer.rs
@@ -19,6 +19,7 @@

use std::sync::Arc;

use super::limit_pushdown::LimitPushdown;
use super::projection_pushdown::ProjectionPushdown;
use crate::config::ConfigOptions;
use crate::physical_optimizer::aggregate_statistics::AggregateStatistics;
@@ -126,6 +127,11 @@ impl PhysicalOptimizer {
// are not present, the load of executors such as join or union will be
// reduced by narrowing their input tables.
Arc::new(ProjectionPushdown::new()),
// The LimitPushdown rule tries to push GlobalLimitExec down as close as
// possible to CoalesceBatchesExec. Since target_batch_size may be large,
// informing CoalesceBatchesExec of the limit's fetch count lets it stop
// coalescing rows early instead of building full-size batches that the
// limit would discard.
Arc::new(LimitPushdown::new()),
];

Self::with_rules(rules)
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/utils.rs
@@ -28,9 +28,9 @@ use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use datafusion_physical_expr::{LexRequirement, PhysicalSortRequirement};

use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::tree_node::PlanContext;

/// This utility function adds a `SortExec` above an operator according to the
/// given ordering requirements while preserving the original partitioning.
pub fn add_sort_above<T: Clone + Default>(
35 changes: 29 additions & 6 deletions datafusion/physical-plan/src/coalesce_batches.rs
@@ -19,6 +19,7 @@
//! vectorized processing by upstream operators.

use std::any::Any;

use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -49,6 +50,8 @@ pub struct CoalesceBatchesExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
cache: PlanProperties,
/// Maximum number of rows to fetch, pushed down by an upstream limit operator (if any)
fetch: Option<usize>,
}

impl CoalesceBatchesExec {
@@ -60,6 +63,7 @@ impl CoalesceBatchesExec {
target_batch_size,
metrics: ExecutionPlanMetricsSet::new(),
cache,
fetch: None,
}
}

@@ -83,6 +87,9 @@ impl CoalesceBatchesExec {
input.execution_mode(), // Execution Mode
)
}
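/// Sets the fetch count pushed down by an upstream limit operator.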
pub fn set_inner_fetch(&mut self, fetch: Option<usize>) {
self.fetch = fetch;
}
}

impl DisplayAs for CoalesceBatchesExec {
@@ -93,11 +100,19 @@ impl DisplayAs for CoalesceBatchesExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"CoalesceBatchesExec: target_batch_size={}",
self.target_batch_size
)
if let Some(fetch) = self.fetch {
write!(
f,
"CoalesceBatchesExec: target_batch_size={} fetch= {}",
self.target_batch_size, fetch
)
} else {
write!(
f,
"CoalesceBatchesExec: target_batch_size={}",
self.target_batch_size
)
}
}
}
}
@@ -148,6 +163,11 @@ impl ExecutionPlan for CoalesceBatchesExec {
input: self.input.execute(partition, context)?,
schema: self.input.schema(),
target_batch_size: self.target_batch_size,
fetch: self.fetch.unwrap_or(usize::MAX),
buffer: Vec::new(),
buffered_rows: 0,
is_closed: false,
@@ -171,6 +191,8 @@ struct CoalesceBatchesStream {
schema: SchemaRef,
/// Minimum number of rows for coalesces batches
target_batch_size: usize,
/// Fetch count pushed down by an upstream limit operator (`usize::MAX` when no limit applies)
fetch: usize,
/// Buffered batches
buffer: Vec<RecordBatch>,
/// Buffered row count
@@ -216,7 +238,8 @@ impl CoalesceBatchesStream {
match input_batch {
Poll::Ready(x) => match x {
Some(Ok(batch)) => {
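// A batch that already satisfies either the target batch size or the
// pushed-down fetch count is passed through directly without buffering.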
if batch.num_rows() >= self.target_batch_size
if (batch.num_rows() >= self.target_batch_size
Review comment (Contributor):

I think we need some tests of this new logic added to repartition exec

|| batch.num_rows() >= self.fetch)
&& self.buffer.is_empty()
{
return Poll::Ready(Some(Ok(batch)));
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/limit.rs
@@ -167,6 +167,10 @@ impl ExecutionPlan for GlobalLimitExec {

// GlobalLimitExec requires a single input partition
if 1 != self.input.output_partitioning().partition_count() {
println!(
Review comment (Contributor):

I think this println should be removed

"**************** \n partition_count is {:?} \n **************** \n",
self.input.output_partitioning().partition_count()
);
return internal_err!("GlobalLimitExec requires a single input partition");
}

3 changes: 3 additions & 0 deletions datafusion/sqllogictest/test_files/explain.slt
@@ -254,6 +254,7 @@ physical_plan after OutputRequirements CsvExec: file_groups={1 group: [[WORKSPAC
physical_plan after PipelineChecker SAME TEXT AS ABOVE
physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after LimitPushdown SAME TEXT AS ABOVE
physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true
physical_plan_with_stats CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]

@@ -312,6 +313,7 @@ GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Co
physical_plan after PipelineChecker SAME TEXT AS ABOVE
physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after LimitPushdown SAME TEXT AS ABOVE
physical_plan
GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
@@ -348,6 +350,7 @@ GlobalLimitExec: skip=0, fetch=10
physical_plan after PipelineChecker SAME TEXT AS ABOVE
physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after LimitPushdown SAME TEXT AS ABOVE
physical_plan
GlobalLimitExec: skip=0, fetch=10
--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/repartition.slt
@@ -123,7 +123,7 @@ Limit: skip=0, fetch=5
physical_plan
GlobalLimitExec: skip=0, fetch=5
--CoalescePartitionsExec
----CoalesceBatchesExec: target_batch_size=8192
----CoalesceBatchesExec: target_batch_size=8192 fetch= 5
------FilterExec: c3@2 > 0
--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
----------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true
Review comment (Contributor):

Shouldn't we also push the limit to the StreamingTableExec as well?
