Skip to content

Commit

Permalink
Refactor optimize projections rule, combines (eliminate, merge, pushd…
Browse files Browse the repository at this point in the history
…own projections) (#8340)
  • Loading branch information
mustafasrepo authored Nov 29, 2023
1 parent e21b031 commit 19bdcdc
Show file tree
Hide file tree
Showing 16 changed files with 1,011 additions and 819 deletions.
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ async fn csv_explain_verbose_plans() {
// Since the plan contains paths that are environment
// dependent (e.g. the full path of the test file), only verify
// the important content
assert_contains!(&actual, "logical_plan after push_down_projection");
assert_contains!(&actual, "logical_plan after optimize_projections");
assert_contains!(&actual, "physical_plan");
assert_contains!(&actual, "FilterExec: c2@1 > 10");
assert_contains!(actual, "ProjectionExec: expr=[c1@0 as c1]");
Expand Down
9 changes: 5 additions & 4 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ pub use ddl::{
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
Aggregate, Analyze, CrossJoin, DescribeTable, Distinct, DistinctOn, EmptyRelation,
Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, Prepare, Projection, Repartition, Sort, StringifiedPlan,
Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
projection_schema, Aggregate, Analyze, CrossJoin, DescribeTable, Distinct,
DistinctOn, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint,
JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare, Projection,
Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
ToStringifiedPlan, Union, Unnest, Values, Window,
};
pub use statement::{
SetVariable, Statement, TransactionAccessMode, TransactionConclusion, TransactionEnd,
Expand Down
74 changes: 49 additions & 25 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,15 +551,9 @@ impl LogicalPlan {
Projection::try_new(projection.expr.to_vec(), Arc::new(inputs[0].clone()))
.map(LogicalPlan::Projection)
}
LogicalPlan::Window(Window {
window_expr,
schema,
..
}) => Ok(LogicalPlan::Window(Window {
input: Arc::new(inputs[0].clone()),
window_expr: window_expr.to_vec(),
schema: schema.clone(),
})),
LogicalPlan::Window(Window { window_expr, .. }) => Ok(LogicalPlan::Window(
Window::try_new(window_expr.to_vec(), Arc::new(inputs[0].clone()))?,
)),
LogicalPlan::Aggregate(Aggregate {
group_expr,
aggr_expr,
Expand Down Expand Up @@ -837,10 +831,19 @@ impl LogicalPlan {
LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
node: e.node.from_template(&expr, inputs),
})),
LogicalPlan::Union(Union { schema, .. }) => Ok(LogicalPlan::Union(Union {
inputs: inputs.iter().cloned().map(Arc::new).collect(),
schema: schema.clone(),
})),
LogicalPlan::Union(Union { schema, .. }) => {
let input_schema = inputs[0].schema();
// If inputs are not pruned do not change schema.
let schema = if schema.fields().len() == input_schema.fields().len() {
schema
} else {
input_schema
};
Ok(LogicalPlan::Union(Union {
inputs: inputs.iter().cloned().map(Arc::new).collect(),
schema: schema.clone(),
}))
}
LogicalPlan::Distinct(distinct) => {
let distinct = match distinct {
Distinct::All(_) => Distinct::All(Arc::new(inputs[0].clone())),
Expand Down Expand Up @@ -1792,11 +1795,8 @@ pub struct Projection {
impl Projection {
/// Create a new Projection
pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
let schema = Arc::new(DFSchema::new_with_metadata(
exprlist_to_fields(&expr, &input)?,
input.schema().metadata().clone(),
)?);
Self::try_new_with_schema(expr, input, schema)
let projection_schema = projection_schema(&input, &expr)?;
Self::try_new_with_schema(expr, input, projection_schema)
}

/// Create a new Projection using the specified output schema
Expand All @@ -1808,11 +1808,6 @@ impl Projection {
if expr.len() != schema.fields().len() {
return plan_err!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len());
}
// Update functional dependencies of `input` according to projection
// expressions:
let id_key_groups = calc_func_dependencies_for_project(&expr, &input)?;
let schema = schema.as_ref().clone();
let schema = Arc::new(schema.with_functional_dependencies(id_key_groups));
Ok(Self {
expr,
input,
Expand All @@ -1836,6 +1831,29 @@ impl Projection {
}
}

/// Derives the output schema of a projection over `input`.
///
/// The resulting schema has one field per expression in `exprs` (built by
/// `exprlist_to_fields`), carries a copy of the input schema's metadata, and
/// has the functional dependencies returned by
/// `calc_func_dependencies_for_project` attached to it.
///
/// # Arguments
///
/// * `input`: The `LogicalPlan` the projection is applied to.
/// * `exprs`: The projection expressions.
///
/// # Errors
///
/// Propagates any error raised while building the fields, constructing the
/// `DFSchema`, or computing the functional dependencies.
pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
    // One output field per projection expression.
    let fields = exprlist_to_fields(exprs, input)?;
    // Metadata is inherited unchanged from the input schema.
    let metadata = input.schema().metadata().clone();
    let base_schema = DFSchema::new_with_metadata(fields, metadata)?;
    // Functional dependencies of the input, re-expressed for the projection.
    let dependencies = calc_func_dependencies_for_project(exprs, input)?;
    Ok(Arc::new(
        base_schema.with_functional_dependencies(dependencies),
    ))
}

/// Aliased subquery
#[derive(Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/new()
Expand Down Expand Up @@ -1934,8 +1952,7 @@ impl Window {
/// Create a new window operator.
pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
let mut window_fields: Vec<DFField> = input.schema().fields().clone();
window_fields
.extend_from_slice(&exprlist_to_fields(window_expr.iter(), input.as_ref())?);
window_fields.extend_from_slice(&exprlist_to_fields(window_expr.iter(), &input)?);
let metadata = input.schema().metadata().clone();

// Update functional dependencies for window:
Expand Down Expand Up @@ -2357,6 +2374,13 @@ impl Aggregate {
schema,
})
}

/// Returns the number of group-by expressions as they appear in the output
/// schema.
///
/// This is not simply the length of `group_expr`: an expression may be a
/// `GroupingSet` (or similar), in which case the lengths of its inner
/// expressions have to be counted. The counting itself is delegated to
/// `grouping_set_expr_count`.
pub fn group_expr_len(&self) -> Result<usize> {
    let group_exprs = &self.group_expr;
    grouping_set_expr_count(group_exprs)
}
}

/// Checks whether any expression in `group_expr` contains `Expr::GroupingSet`.
Expand Down
94 changes: 0 additions & 94 deletions datafusion/optimizer/src/eliminate_project.rs

This file was deleted.

2 changes: 1 addition & 1 deletion datafusion/optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ pub mod eliminate_limit;
pub mod eliminate_nested_union;
pub mod eliminate_one_union;
pub mod eliminate_outer_join;
pub mod eliminate_project;
pub mod extract_equijoin_predicate;
pub mod filter_null_join_keys;
pub mod merge_projection;
pub mod optimize_projections;
pub mod optimizer;
pub mod propagate_empty_relation;
pub mod push_down_filter;
Expand Down
106 changes: 5 additions & 101 deletions datafusion/optimizer/src/merge_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,105 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use crate::optimizer::ApplyOrder;
use crate::push_down_filter::replace_cols_by_name;
use crate::{OptimizerConfig, OptimizerRule};

use datafusion_common::Result;
use datafusion_expr::{Expr, LogicalPlan, Projection};

/// Optimizer rule that merges adjacent [LogicalPlan::Projection] nodes.
///
/// The rule carries no state, so it is a unit struct.
#[derive(Default)]
pub struct MergeProjection;

impl MergeProjection {
    /// Creates a new `MergeProjection` rule.
    pub fn new() -> Self {
        Self
    }
}

impl OptimizerRule for MergeProjection {
    /// Merges a projection with its direct child when that child is also a
    /// projection.
    ///
    /// Returns `Ok(None)` when `plan` is not a projection sitting directly on
    /// top of another projection. Otherwise the two are combined with
    /// `merge_projection`, and the rule is applied again to the merged plan so
    /// that longer chains of projections are folded as well.
    fn try_optimize(
        &self,
        plan: &LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Option<LogicalPlan>> {
        if let LogicalPlan::Projection(parent_projection) = plan {
            if let LogicalPlan::Projection(child_projection) =
                parent_projection.input.as_ref()
            {
                let merged = merge_projection(parent_projection, child_projection)?;
                // Re-run on the merged node; if nothing further can be
                // merged, keep the plan produced by this step.
                let further = self.try_optimize(&merged, _config)?;
                return Ok(Some(further.unwrap_or(merged)));
            }
        }
        Ok(None)
    }

    /// Name under which this rule is reported.
    fn name(&self) -> &str {
        "merge_projection"
    }

    /// Requests top-down application of this rule.
    fn apply_order(&self) -> Option<ApplyOrder> {
        Some(ApplyOrder::TopDown)
    }
}

/// Collapses `parent_projection` and `child_projection` into a single
/// projection placed directly on top of the child's input.
///
/// Every parent expression has its column references substituted (by name)
/// with the corresponding child expressions, and is then aliased back to the
/// qualified name of the parent's output field if the substitution changed it.
///
/// # Errors
///
/// Returns the first error produced while rewriting or aliasing an
/// expression, or while constructing the resulting `Projection`.
pub(super) fn merge_projection(
    parent_projection: &Projection,
    child_projection: &Projection,
) -> Result<LogicalPlan> {
    let replace_map = collect_projection_expr(child_projection);
    let parent_fields = parent_projection.schema.fields();

    let mut new_exprs = Vec::with_capacity(parent_projection.expr.len());
    for (idx, parent_expr) in parent_projection.expr.iter().enumerate() {
        let rewritten = replace_cols_by_name(parent_expr.clone(), &replace_map)?;
        // Keep the parent's output name stable after the substitution.
        let output_name = parent_fields[idx].qualified_name();
        new_exprs.push(rewritten.alias_if_changed(output_name)?);
    }

    // Use try_new, since the schema changes along with the expressions.
    Projection::try_new(new_exprs, child_projection.input.clone())
        .map(LogicalPlan::Projection)
}

/// Builds a lookup from output column name to the un-aliased expression that
/// produces it, for every field of `projection`.
///
/// Each field contributes two entries mapping to the same expression: one
/// keyed by its plain name and one keyed by its qualified name. Entries are
/// inserted in field order, so on a key collision the later one wins.
pub fn collect_projection_expr(projection: &Projection) -> HashMap<String, Expr> {
    let mut replace_map = HashMap::new();
    for (idx, field) in projection.schema.fields().iter().enumerate() {
        // strip alias
        let stripped = projection.expr[idx].clone().unalias();
        // Register both the unqualified and the qualified field name.
        replace_map.insert(field.name().clone(), stripped.clone());
        replace_map.insert(field.qualified_name(), stripped);
    }
    replace_map
}

#[cfg(test)]
mod tests {
use crate::merge_projection::MergeProjection;
use crate::optimize_projections::OptimizeProjections;
use datafusion_common::Result;
use datafusion_expr::{
binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, LogicalPlan,
Expand All @@ -124,7 +28,7 @@ mod tests {
use crate::test::*;

fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> {
assert_optimized_plan_eq(Arc::new(MergeProjection::new()), plan, expected)
assert_optimized_plan_eq(Arc::new(OptimizeProjections::new()), plan, expected)
}

#[test]
Expand All @@ -136,7 +40,7 @@ mod tests {
.build()?;

let expected = "Projection: Int32(1) + test.a\
\n TableScan: test";
\n TableScan: test projection=[a]";
assert_optimized_plan_equal(&plan, expected)
}

Expand All @@ -150,7 +54,7 @@ mod tests {
.build()?;

let expected = "Projection: Int32(1) + test.a\
\n TableScan: test";
\n TableScan: test projection=[a]";
assert_optimized_plan_equal(&plan, expected)
}

Expand All @@ -163,7 +67,7 @@ mod tests {
.build()?;

let expected = "Projection: test.a AS alias\
\n TableScan: test";
\n TableScan: test projection=[a]";
assert_optimized_plan_equal(&plan, expected)
}
}
Loading

0 comments on commit 19bdcdc

Please sign in to comment.