Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add fetch to CoalescePartitionsExec #14499

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

mertak-synnada
Copy link
Contributor

Which issue does this PR close?

Closes #14446.

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added physical-expr Physical Expressions optimizer Optimizer rules core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Feb 5, 2025
@@ -5032,18 +5032,17 @@ logical_plan
03)----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[min(aggregate_test_100.c1)]]
04)------TableScan: aggregate_test_100 projection=[c1, c3]
physical_plan
01)GlobalLimitExec: skip=0, fetch=5
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that SKIP is missing.

Copy link
Contributor

@zhuqi-lucas zhuqi-lucas Feb 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the SKIP setting is not supported for all with_fetch operators, for example:

SortPreservingMergeExec also support fetch but not support setting skip, it's default to 0.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We default to use skip=0 for the SortPreservingMergeExec:

 1 => match self.fetch {
                Some(fetch) => {
                    let stream = self.input.execute(0, context)?;
                    debug!("Done getting stream for SortPreservingMergeExec::execute with 1 input with {fetch}");
                    Ok(Box::pin(LimitStream::new(
                        stream,
                        0,
                        Some(fetch),
                        BaselineMetrics::new(&self.metrics, partition),
                    )))
                }
impl LimitStream {
    pub fn new(
        input: SendableRecordBatchStream,
        skip: usize,
        fetch: Option<usize>,
        baseline_metrics: BaselineMetrics,
    ) -> Self {
        let schema = input.schema();
        Self {
            skip,
            fetch: fetch.unwrap_or(usize::MAX),
            input: Some(input),
            schema,
            baseline_metrics,
        }
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, as @zhuqi-lucas mentioned, only Limit operators support skip, and limit_pushdown is adding a Limit operator if skip exists, so this is only affecting plans without skip.

Here's the query result with skip:

query TT
EXPLAIN SELECT DISTINCT c3, min(c1) FROM aggregate_test_100 group by c3 limit 5 offset 3;
----
logical_plan
01)Limit: skip=3, fetch=5
02)--Aggregate: groupBy=[[aggregate_test_100.c3, min(aggregate_test_100.c1)]], aggr=[[]]
03)----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[min(aggregate_test_100.c1)]]
04)------TableScan: aggregate_test_100 projection=[c1, c3]
physical_plan
01)GlobalLimitExec: skip=3, fetch=5
02)--CoalescePartitionsExec: fetch=8
03)----AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[8]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([c3@0, min(aggregate_test_100.c1)@1], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[8]
07)------------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], aggr=[min(aggregate_test_100.c1)]
08)--------------CoalesceBatchesExec: target_batch_size=8192
09)----------------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
10)------------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[min(aggregate_test_100.c1)]
11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
12)----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true

@@ -472,6 +498,7 @@ impl Stream for ObservedStream {
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let poll = self.inner.poll_next_unpin(cx);
let poll = self.limit_reached(poll);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

let Some(fetch) = self.fetch else { return poll };

May be we can move the following to above, only call limit_reached when

let Some(fetch) = self.fetch {
    poll = self.limit_reached(poll);
}

Copy link
Contributor

@zhuqi-lucas zhuqi-lucas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate optimizer Optimizer rules physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add CoalescePartitionsExec fetch (limit) support
3 participants