Projection Pushdown in PhysicalPlan #8073
Conversation
I have collaborated with @berkaysynnada and reviewed this PR carefully over the last week. Almost all the changes are within a new file that implements the rule (projection_pushdown.rs), so it should be an easy review.
@crepererum implemented something similar to this in IOx -- can you please review this PR as well @crepererum ? Maybe we can contribute some of IOx's test cases back upstream?
    try_swapping_with_nested_loop_join(projection, nl_join)?
} else if let Some(sm_join) = input.downcast_ref::<SortMergeJoinExec>() {
    try_swapping_with_sort_merge_join(projection, sm_join)?
} else if let Some(sym_join) = input.downcast_ref::<SymmetricHashJoinExec>() {
could we make this registry-based so that custom execs could also profit from this pass?
Perhaps we can make a method on ExecutionPlan and then add the relevant methods to each impl ExecutionPlan, similar to what I did in #7936
We like this idea and considered how we can do this, but didn't see an obvious design to follow. Any suggestions on how we can do this? Also, do you think we should get the functionality in first and do the refactor as a follow-on PR, or should we incorporate it in this one?
Maybe we could add a function like this to ExecutionPlan:
trait ExecutionPlan {
    ...
    /// Tries to push a projection of the output of this ExecutionPlan
    /// *into* its input by rewriting the internal expressions.
    ///
    /// For example,
    /// (TODO EXAMPLE HERE)
    ///
    /// If the ExecutionPlan does not support pushdown, returns Ok(None) (the default)
    fn push_projection(&self, projection: &[(Arc<dyn PhysicalExpr>, String)]) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        Ok(None)
    }
}
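To make the default-method idea concrete, here is a standalone sketch using simplified stand-in types (these are invented for illustration and are not the actual DataFusion `ExecutionPlan` API): every node inherits a `push_projection` default that returns `Ok(None)`, and only nodes that can absorb a projection override it.

```rust
// Simplified stand-ins for illustration only; the real DataFusion
// `ExecutionPlan` trait and expression types look different.
trait Plan {
    /// Human-readable description of the node, used below to observe rewrites.
    fn describe(&self) -> String;

    /// Tries to absorb a projection (given here as input column indices).
    /// The default says "I cannot", so the optimizer leaves the node alone.
    fn push_projection(&self, columns: &[usize]) -> Result<Option<Box<dyn Plan>>, String> {
        let _ = columns;
        Ok(None)
    }
}

/// A scan can narrow the set of columns it reads, so it overrides the default.
struct Scan {
    projection: Option<Vec<usize>>,
}

impl Plan for Scan {
    fn describe(&self) -> String {
        match &self.projection {
            Some(cols) => format!("Scan(cols={:?})", cols),
            None => "Scan(all cols)".to_string(),
        }
    }

    fn push_projection(&self, columns: &[usize]) -> Result<Option<Box<dyn Plan>>, String> {
        Ok(Some(Box::new(Scan {
            projection: Some(columns.to_vec()),
        })))
    }
}

/// A node that keeps the default simply opts out of pushdown.
struct Sort;

impl Plan for Sort {
    fn describe(&self) -> String {
        "Sort".to_string()
    }
}
```

The optimizer rule would then call `push_projection` on each projection's input and only needs to understand `Ok(None)` versus a rewritten node, without downcasting to concrete types.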
So I guess this whole problem is:
How is a set of optimizer passes linked to a set of nodes, while both sets are extensible?
I see the following rough solutions:
A: omniscient optimizer
The optimizer knows all node types. This is what this PR does (and what many other passes do).
This doesn't scale well.
B: omniscient nodes
The nodes know all optimizer passes and implement them themselves. This kinda sounds like what @alamb proposes.
This doesn't scale well.
C: registry-based linking
Developers are aware of which nodes can be optimized in which way and can fill out gaps in the optimizer-node matrix. The issue is mostly how this registry should be implemented. Rust has a bunch of crates for that, none of which are great (due to the issue of initialization order). Luckily we always know which node types are in a plan (because you can traverse the plan), so we could hook registry initialization in there. Something like:
trait ExecutionPlan {
fn register_hook_for_optimizer_pass(....);
}
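As a rough illustration of what such a registry could look like (every name here is invented; DataFusion has no such registry today), hooks can be keyed by the node's `TypeId` and filled in during plan traversal, which sidesteps the initialization-order problem:

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

// Hypothetical sketch of option C; all names are invented for illustration.
trait Node: Any {
    fn as_any(&self) -> &dyn Any;
}

struct Filter;
impl Node for Filter {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

struct Sort;
impl Node for Sort {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// A hook rewrites a node under a projection, or returns None if it cannot.
/// (A real hook would return a new plan node; a String keeps the sketch small.)
type PushdownHook = fn(&dyn Node, &[usize]) -> Option<String>;

#[derive(Default)]
struct Registry {
    hooks: HashMap<TypeId, PushdownHook>,
}

impl Registry {
    /// Registration can happen while traversing the plan (each node type
    /// registers itself on first sight), so initialization order is a non-issue.
    fn register<T: Node>(&mut self, hook: PushdownHook) {
        self.hooks.insert(TypeId::of::<T>(), hook);
    }

    /// Looks up the hook for the node's concrete type; unknown types opt out.
    fn push(&self, node: &dyn Node, columns: &[usize]) -> Option<String> {
        let hook = self.hooks.get(&node.as_any().type_id())?;
        hook(node, columns)
    }
}
```

Note that `type_id()` on the `&dyn Any` returned by `as_any` yields the concrete type's `TypeId`, which is what makes the lookup work without downcasting chains.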
D: abstraction
This is what most other optimizer passes do: they read some abstract property of the node (like "schema", "num children", "output ordering", ...) and infer the correct behavior based on that. I think we could use something like this here as well. Namely if you would know what columns of an input schema are used by the node itself and which are just "pass-through", you could apply projection pushdown automatically.
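A minimal sketch of this idea, assuming a node could report which input columns it reads itself (the function name and shape are invented for illustration): the pushed-down projection must keep the union of the node's own columns and the columns the parent projection still needs.

```rust
use std::collections::BTreeSet;

/// Sketch of option D: given the input columns a node reads internally
/// (e.g. a filter predicate) and the columns the projection above still
/// needs, compute the columns the pushed-down projection must preserve.
/// A BTreeSet deduplicates and keeps the result in column order.
fn required_input_columns(node_uses: &[usize], projection_keeps: &[usize]) -> Vec<usize> {
    let required: BTreeSet<usize> = node_uses
        .iter()
        .chain(projection_keeps.iter())
        .copied()
        .collect();
    required.into_iter().collect()
}
```

With such an abstract property exposed, the pass never needs to know the concrete node type, only its column usage.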
IIUC what @alamb proposes is almost at the boundary of categories B and D. If we can define what the proposed function does purely in terms of projection behavior (which would still have meaning independent from the pushdown rule), we can consider it to be in category D.
We think that category A is not in line with DataFusion's philosophy, and I think we all agree on this. However, in many cases, category A-type implementations serve as a good stepping stone as we try to gain a good understanding of what a good category C/D design looks like. So, on our end, we typically employ the strategy of getting a good test suite and a baseline category A implementation done first, and then progressively migrating towards a long-term category C/D solution. This PR is one such first step 🙂
I think with a follow-up ticket "make this rule generic" we could accept the solution in this PR, WDYT @alamb ?
Thanks @berkaysynnada and @ozankabak -- this PR is on my review list but I probably won't get to it until tomorrow
physical_plan
ProjectionExec: expr=[a@0 as a]
--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true
physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a], output_ordering=[a@0 ASC NULLS LAST], has_header=true
this is a nice improvement in plan (it avoids scanning c now)
Thanks @berkaysynnada and @ozankabak -- I took a look at the code and I have some ideas of how to simplify it, but we can do so as a follow on PR.
This is a very nice contribution
@@ -107,6 +108,13 @@ impl PhysicalOptimizer {
    // into an `order by max(x) limit y`. In this case it will copy the limit value down
    // to the aggregation, allowing it to use only y number of accumulators.
    Arc::new(TopKAggregation::new()),
    // The ProjectionPushdown rule tries to push projections towards
    // the sources in the execution plan. As a result of this process,
Suggested change:
-    // the sources in the execution plan. As a result of this process,
+    // the sources in the execution plan, in addition to the projection pushdown
+    // that happens in the LogicalPlan optimizer. As a result of this process,
// If the projection does not narrow the schema, we should not try to push it down:
if projection.expr().len() >= projection.input().schema().fields().len() {
    return Ok(None);
}
This check is repeated for almost every operator -- it might be possible to pull it into remove_unnecessary_projections and remove all the duplication here
    },
))
}
} else if let Some(binary) = expr_any.downcast_ref::<BinaryExpr>() {
This pattern is basically recursing through the PhysicalExpr tree manually, and only covers a subset of the nodes.
I tried rewriting it using TreeNode, which is both less code and covers all PhysicalExpr types, not just a subset, and the tests still pass.
I will make a follow on PR with the proposed simplification
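For context, the shape of such a TreeNode-style rewrite can be sketched with an invented expression type (DataFusion's actual PhysicalExpr and TreeNode APIs differ): a single bottom-up transform visits every node, so no variant can be missed the way a manual match chain can.

```rust
// Invented two-variant expression type; a real PhysicalExpr tree has many more
// variants, which is exactly why a generic traversal beats manual matching.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(usize),
    Binary(Box<Expr>, Box<Expr>),
}

impl Expr {
    /// Rewrites children first, then the node itself, so `f` sees every node
    /// exactly once regardless of the variant.
    fn transform_up(self, f: &impl Fn(Expr) -> Expr) -> Expr {
        let rewritten = match self {
            Expr::Binary(l, r) => Expr::Binary(
                Box::new(l.transform_up(f)),
                Box::new(r.transform_up(f)),
            ),
            leaf => leaf,
        };
        f(rewritten)
    }
}
```

A projection pushdown pass would use one such traversal to remap column indices after the projection moves, instead of handling each expression variant by hand.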
I have a follow on PR with some proposed simplifications: #8109
Which issue does this PR close?
Closes #.
Rationale for this change
Pushing down the ProjectionExec is generally beneficial for execution. Hence, whenever feasible and advantageous, we should aim to swap it with its input. This PR introduces a rule for this purpose. While a similar rule exists in the logical planning stage, some cases may emerge for further optimization after some optimizer rules have worked.
What changes are included in this PR?
This PR introduces a PhysicalOptimizerRule, ProjectionPushdown, implemented at the final optimization step. The rule initially checks if the operation is a ProjectionExec. If it is, the rule attempts to eliminate it if it is redundant. If not, it examines the input of the projection. Each operator has its own conditions for swapping with a projection. If the conditions are satisfied, the plan tree Projection <-- X <-- Y evolves to X <-- Projection <-- Y. Two projections can always be combined into one, and in some scenarios, projections can be removed from the plan if they can be propagated to the source providers.
Are these changes tested?
Yes, unit tests have been added to cover each operator. Additionally, various .slt test changes show successful optimizations. Benchmark results are as follows:
Are there any user-facing changes?