Implement LimitPushDown for ExecutionPlan #9815
Conversation
if let Some(global_limit) = plan.as_any().downcast_ref::<GlobalLimitExec>() {
    let input = global_limit.input().as_any();
    if let Some(_) = input.downcast_ref::<CoalescePartitionsExec>() {
        return Ok(Transformed::yes(swap_with_coalesce_partition(global_limit)));
I'll add more rules then. Currently it is only tailored to the repartition.slt test.
I got stuck in a plan like this, because of the output partition number of the input to the GlobalLimitExec:

CoalescePartitionsExec
--GlobalLimitExec: skip=0, fetch=5
I think this change is not correct. We cannot push down GlobalLimitExec through CoalescePartitionsExec. However, we can convert the following pattern

GlobalLimitExec: skip=0, fetch=5
--CoalescePartitionsExec

into

GlobalLimitExec: skip=0, fetch=5
--CoalescePartitionsExec
----LocalLimitExec: skip=0, fetch=5

If skip is larger than 0, the LocalLimitExec should still have skip=0, and its fetch should be skip + the global limit's fetch.
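For illustration, here is a rough sketch of that rewrite, assuming the usual GlobalLimitExec, LocalLimitExec and CoalescePartitionsExec constructors and accessors. This is not the PR's code, and the helper name is made up:

```rust
use std::sync::Arc;

use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion::physical_plan::ExecutionPlan;

/// Rewrites `GlobalLimitExec -> CoalescePartitionsExec -> child` into
/// `GlobalLimitExec -> CoalescePartitionsExec -> LocalLimitExec -> child`.
/// The local limit keeps skip=0 and fetches `skip + fetch` rows per partition.
fn add_local_limit_below_coalesce(
    global: &GlobalLimitExec,
) -> Option<Arc<dyn ExecutionPlan>> {
    let coalesce = global
        .input()
        .as_any()
        .downcast_ref::<CoalescePartitionsExec>()?;
    let fetch = global.fetch()?;
    // Each partition only ever needs to produce skip + fetch rows.
    let local = Arc::new(LocalLimitExec::new(
        coalesce.input().clone(),
        global.skip() + fetch,
    ));
    let new_coalesce = Arc::new(CoalescePartitionsExec::new(local));
    let new_global: Arc<dyn ExecutionPlan> = Arc::new(GlobalLimitExec::new(
        new_coalesce,
        global.skip(),
        Some(fetch),
    ));
    Some(new_global)
}
```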
Got it, so the CoalescePartitionsExec will also be a pushdown terminator, and we just add a new LocalLimitExec below it.
I don't really understand the reason to add an extra LocalLimitExec; I think we could simply add a global fetch when we hit that pattern.
I agree, that would be better. We can add fetch support to CoalescePartitionsExec.
@@ -83,6 +83,9 @@ impl CoalesceBatchesExec {
        input.execution_mode(), // Execution Mode
    )
}

pub fn set_target_batch_size(&mut self, siz: usize) {
    self.target_batch_size = siz;
Instead of overwriting target_batch_size, we can add a fetch: Option<usize> field. CoalesceBatchesExec can then also emit once it hits that count, in addition to target_batch_size.
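As a rough sketch of that idea (the struct and method names below are illustrative only, not DataFusion's actual implementation):

```rust
/// Illustrative only: keep `target_batch_size` untouched and carry the
/// pushed-down limit in a separate, optional `fetch` field.
struct CoalesceBatchesConfig {
    /// Desired minimum number of rows per emitted batch.
    target_batch_size: usize,
    /// Optional pushed-down limit: stop after emitting this many rows.
    fetch: Option<usize>,
}

impl CoalesceBatchesConfig {
    /// Attach a pushed-down limit instead of overwriting `target_batch_size`.
    fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }
}
```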
sure
    _config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
    // if this node is not a global limit, then directly return
    if !is_global_limit(&plan) {
Can we extend the rule to push down GlobalLimitExecs that are not at the top of the plan?
Definitely, this is just a draft to test against the existing .slt tests.
@@ -83,6 +83,9 @@ impl CoalesceBatchesExec {
        input.execution_mode(), // Execution Mode
    )
}

pub fn set_target_batch_size(&mut self, siz: usize) {
    self.target_batch_size = siz;
I think if the fetch count is larger than target_batch_size, this will introduce some incorrect behavior.
I'll set it to the max() of the two.
impl LimitPushdown {}
fn new_global_limit_with_input() {}
// try to push down the current limit, based on its child
fn push_down_limit(
I believe we can also set a fetch count for CoalesceBatchesExec without changing the plan order. A global fetch count could be carried across the subtree until we face a plan that breaks the limit, but I don't know if it would bring more capability. Can there be plans which cannot swap with the limit but also do not break the required fetch count?
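To illustrate the "carry a fetch across the subtree" idea, here is a speculative helper. Which operators are treated as transparent is only an assumption made for this sketch, not a statement about DataFusion's actual guarantees:

```rust
use std::sync::Arc;

use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::ExecutionPlan;

/// Returns true if a pushed-down fetch is assumed safe to carry past `plan`,
/// i.e. the operator neither drops nor reorders rows.
fn fetch_passes_through(plan: &Arc<dyn ExecutionPlan>) -> bool {
    let any = plan.as_any();
    if any.downcast_ref::<SortExec>().is_some() {
        // A full sort must see all of its input before emitting its first
        // row, so a fetch cannot simply be carried past it.
        return false;
    }
    // Row- and order-preserving wrappers are treated as transparent here.
    any.downcast_ref::<CoalesceBatchesExec>().is_some()
        || any.downcast_ref::<ProjectionExec>().is_some()
}
```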
Thanks @Lordworms -- I took a quick look at this PR.
I am probably missing something obvious, but I don't understand the need for the pushdown pass in the physical optimizer.
If the use case is to get a limit closer to StreamingTableExec, then maybe we can push the fetch to the CoalesceBatchesExec rather than the StreamingTableExec?
It seems to me that a limit in the StreamingTableExec can likely be implemented more efficiently, and would already be handled by the existing limit pushdown in the LogicalPlan.
Maybe @berkaysynnada or @mustafasrepo have some more context
@@ -167,6 +167,10 @@ impl ExecutionPlan for GlobalLimitExec {

    // GlobalLimitExec requires a single input partition
    if 1 != self.input.output_partitioning().partition_count() {
        println!(
I think this println should be removed
@@ -123,7 +123,7 @@ Limit: skip=0, fetch=5
physical_plan
GlobalLimitExec: skip=0, fetch=5
--CoalescePartitionsExec
----CoalesceBatchesExec: target_batch_size=8192
----CoalesceBatchesExec: target_batch_size=8192 fetch= 5
------FilterExec: c3@2 > 0
--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
----------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true
Shouldn't we push the limit down to the StreamingTableExec as well?
@@ -216,7 +238,8 @@ impl CoalesceBatchesStream {
    match input_batch {
        Poll::Ready(x) => match x {
            Some(Ok(batch)) => {
                if batch.num_rows() >= self.target_batch_size
                if (batch.num_rows() >= self.target_batch_size
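For context, a hedged sketch of the kind of fetch-aware emit/truncate logic this condition seems to be reaching for; the field names are assumptions for the illustration, not the PR's actual stream state:

```rust
/// Illustrative state for a coalescing stream that also respects a
/// pushed-down limit. Field names are assumptions for this sketch.
struct LimitAwareCoalescer {
    target_batch_size: usize,
    /// Rows currently buffered but not yet emitted.
    buffered_rows: usize,
    /// Rows already emitted across all previous output batches.
    emitted_rows: usize,
    /// Pushed-down limit, if any.
    fetch: Option<usize>,
}

impl LimitAwareCoalescer {
    /// Emit when either the batch-size threshold or the pushed-down fetch
    /// would be reached by the incoming batch.
    fn should_emit(&self, incoming_rows: usize) -> bool {
        let total = self.buffered_rows + incoming_rows;
        total >= self.target_batch_size
            || self
                .fetch
                .map(|f| self.emitted_rows + total >= f)
                .unwrap_or(false)
    }

    /// How many rows of the incoming batch can still be taken without
    /// exceeding the pushed-down fetch.
    fn rows_to_take(&self, incoming_rows: usize) -> usize {
        match self.fetch {
            Some(f) => incoming_rows
                .min(f.saturating_sub(self.emitted_rows + self.buffered_rows)),
            None => incoming_rows,
        }
    }
}
```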
I think we need some tests of this new logic added to repartition exec
Thanks @alamb for the feedback. @Lordworms's strategy is actually intuitive and reasonable, but maybe we need another way to solve the problem. If I summarize #9792, the problem is when a…
Edit: moved conversation to #9792 (comment)
No worries at all -- we are all sorting this out together @Lordworms. Thank you for helping push it along.
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.
Which issue does this PR close?
Closes #9792
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?