feat: Add projection to FilterExec #12281

Merged · 5 commits · Sep 6, 2024 · Changes from all commits
43 changes: 31 additions & 12 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -123,7 +123,10 @@ pub fn remove_unnecessary_projections(
} else if input.is::<CoalescePartitionsExec>() {
try_swapping_with_coalesce_partitions(projection)?
} else if let Some(filter) = input.downcast_ref::<FilterExec>() {
try_swapping_with_filter(projection, filter)?
try_swapping_with_filter(projection, filter)?.map_or_else(
|| try_embed_projection(projection, filter),
|e| Ok(Some(e)),
)?
} else if let Some(repartition) = input.downcast_ref::<RepartitionExec>() {
try_swapping_with_repartition(projection, repartition)?
} else if let Some(sort) = input.downcast_ref::<SortExec>() {
@@ -134,7 +137,7 @@ pub fn remove_unnecessary_projections(
try_pushdown_through_union(projection, union)?
} else if let Some(hash_join) = input.downcast_ref::<HashJoinExec>() {
try_pushdown_through_hash_join(projection, hash_join)?.map_or_else(
|| try_embed_to_hash_join(projection, hash_join),
|| try_embed_projection(projection, hash_join),
|e| Ok(Some(e)),
)?
} else if let Some(cross_join) = input.downcast_ref::<CrossJoinExec>() {
@@ -535,11 +538,27 @@ fn try_pushdown_through_union(
Ok(Some(Arc::new(UnionExec::new(new_children))))
}

trait EmbeddedProjection: ExecutionPlan + Sized {
fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>;
}

impl EmbeddedProjection for HashJoinExec {
fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
self.with_projection(projection)
}
}

impl EmbeddedProjection for FilterExec {
fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
self.with_projection(projection)
}
}

/// Some projections can't be pushed down to the left or right input of a hash join because the filter or the join-on condition may need columns that won't be used later.
/// By embedding those projections into the hash join, we can reduce the cost of build_batch_from_indices in hash join (build_batch_from_indices needs to call compute::take() for each column) and avoid unnecessary output creation.
fn try_embed_to_hash_join(
fn try_embed_projection<Exec: EmbeddedProjection + 'static>(
projection: &ProjectionExec,
hash_join: &HashJoinExec,
execution_plan: &Exec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
// Collect all column indices from the given projection expressions.
let projection_index = collect_column_indices(projection.expr());
@@ -549,20 +568,20 @@ fn try_embed_to_hash_join(
};

// If the projection indices are the same as the input columns, we don't need to embed the projection.
// Check the projection_index is 0..n-1 and the length of projection_index is the same as the length of hash_join schema fields.
// Check the projection_index is 0..n-1 and the length of projection_index is the same as the length of execution_plan schema fields.
if projection_index.len() == projection_index.last().unwrap() + 1
&& projection_index.len() == hash_join.schema().fields().len()
&& projection_index.len() == execution_plan.schema().fields().len()
{
return Ok(None);
}

let new_hash_join =
Arc::new(hash_join.with_projection(Some(projection_index.to_vec()))?);
let new_execution_plan =
Arc::new(execution_plan.with_projection(Some(projection_index.to_vec()))?);

// Build projection expressions for update_expr. Zip the projection_index with the new_hash_join output schema fields.
// Build projection expressions for update_expr. Zip the projection_index with the new_execution_plan output schema fields.
let embed_project_exprs = projection_index
.iter()
.zip(new_hash_join.schema().fields())
.zip(new_execution_plan.schema().fields())
.map(|(index, field)| {
(
Arc::new(Column::new(field.name(), *index)) as Arc<dyn PhysicalExpr>,
@@ -583,10 +602,10 @@
// The old projection may contain aliases or expressions such as `a + 1` and `CAST('true' AS BOOLEAN)`, but our projection_exprs here contain only columns, so we need to create a new projection to preserve the original one.
let new_projection = Arc::new(ProjectionExec::try_new(
new_projection_exprs,
new_hash_join.clone(),
new_execution_plan.clone(),
)?);
if is_projection_removable(&new_projection) {
Ok(Some(new_hash_join))
Ok(Some(new_execution_plan))
} else {
Ok(Some(new_projection))
}
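
To make the pattern concrete, here is a minimal, self-contained sketch of the embedding logic under simplified assumptions: the trait and ToyFilter below are hypothetical stand-ins, not DataFusion's actual types. The point is that one generic function can serve any operator exposing with_projection, which is how try_embed_projection generalizes the hash-join-specific try_embed_to_hash_join to FilterExec as well.

// Toy sketch of the embedding pattern; not DataFusion's API.
trait EmbeddedProjection: Sized {
    fn with_projection(self, projection: Option<Vec<usize>>) -> Self;
    fn num_output_columns(&self) -> usize;
}

#[derive(Debug)]
struct ToyFilter {
    input_columns: usize,
    projection: Option<Vec<usize>>,
}

impl EmbeddedProjection for ToyFilter {
    fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    fn num_output_columns(&self) -> usize {
        self.projection.as_ref().map_or(self.input_columns, |p| p.len())
    }
}

/// Generic over any operator that can absorb a projection, mirroring
/// `try_embed_projection` above: skip the rewrite when the projection
/// is the identity (0..n over all output columns).
fn try_embed<E: EmbeddedProjection>(indices: Vec<usize>, exec: E) -> Option<E> {
    let identity = indices.len() == exec.num_output_columns()
        && indices.iter().enumerate().all(|(i, &idx)| i == idx);
    if identity {
        None // nothing to embed; keep the original plan
    } else {
        Some(exec.with_projection(Some(indices)))
    }
}

fn main() {
    // Identity projection over all three columns: nothing to embed.
    let keep_all = try_embed(vec![0, 1, 2], ToyFilter { input_columns: 3, projection: None });
    assert!(keep_all.is_none());

    // A narrowing projection is absorbed into the operator itself.
    let narrowed = try_embed(vec![0, 2], ToyFilter { input_columns: 3, projection: None }).unwrap();
    assert_eq!(narrowed.num_output_columns(), 2);
}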
25 changes: 25 additions & 0 deletions datafusion/physical-expr/src/equivalence/projection.rs
@@ -82,6 +82,15 @@ impl ProjectionMapping {
.map(|map| Self { map })
}

/// Constructs a subset mapping using the provided indices.
///
/// This is used when the output is a subset of the input without any
/// other transformations. The indices are for columns in the schema.
pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Result<Self> {
Review comment (Contributor): A brief docstring would be helpful
Reply (Contributor Author): Fixed in 0c05fb9
let projection_exprs = project_index_to_exprs(indices, schema);
ProjectionMapping::try_new(&projection_exprs, schema)
}

/// Iterate over pairs of (source, target) expressions
pub fn iter(
&self,
@@ -110,6 +119,22 @@
}
}

fn project_index_to_exprs(
projection_index: &[usize],
schema: &SchemaRef,
) -> Vec<(Arc<dyn PhysicalExpr>, String)> {
projection_index
.iter()
.map(|index| {
let field = schema.field(*index);
(
Arc::new(Column::new(field.name(), *index)) as Arc<dyn PhysicalExpr>,
field.name().to_owned(),
)
})
.collect::<Vec<_>>()
}

#[cfg(test)]
mod tests {
use super::*;
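As a quick illustration of what from_indices builds, the sketch below mirrors project_index_to_exprs with plain strings in place of Arrow schema fields; ColumnExpr is a hypothetical stand-in for the Column physical expression. Each retained index becomes a (column expression, output name) pair, which ProjectionMapping::try_new can then consume.

// Toy version of `project_index_to_exprs`; not the arrow/DataFusion types.
#[derive(Debug, PartialEq)]
struct ColumnExpr {
    name: String,
    index: usize,
}

fn project_index_to_exprs(indices: &[usize], fields: &[&str]) -> Vec<(ColumnExpr, String)> {
    indices
        .iter()
        .map(|&index| {
            let name = fields[index].to_owned();
            // The expression keeps the *input* index; the output name is unchanged.
            (ColumnExpr { name: name.clone(), index }, name)
        })
        .collect()
}

fn main() {
    // Schema (a, b, c); the projection keeps columns 0 and 2.
    let exprs = project_index_to_exprs(&[0, 2], &["a", "b", "c"]);
    assert_eq!(exprs[0], (ColumnExpr { name: "a".into(), index: 0 }, "a".into()));
    assert_eq!(exprs[1].1, "c");
}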
28 changes: 27 additions & 1 deletion datafusion/physical-expr/src/partitioning.rs
@@ -20,7 +20,10 @@
use std::fmt;
use std::sync::Arc;

use crate::{physical_exprs_equal, EquivalenceProperties, PhysicalExpr};
use crate::{
equivalence::ProjectionMapping, expressions::UnKnownColumn, physical_exprs_equal,
EquivalenceProperties, PhysicalExpr,
};

/// Output partitioning supported by [`ExecutionPlan`]s.
///
@@ -191,6 +194,29 @@ impl Partitioning {
_ => false,
}
}

/// Calculate the output partitioning after applying the given projection.
pub fn project(
Review comment (Contributor): Can we write a docstring here also?
Reply (Contributor Author): Fixed in 0c05fb9
&self,
projection_mapping: &ProjectionMapping,
input_eq_properties: &EquivalenceProperties,
) -> Self {
if let Partitioning::Hash(exprs, part) = self {
let normalized_exprs = exprs
.iter()
.map(|expr| {
input_eq_properties
.project_expr(expr, projection_mapping)
.unwrap_or_else(|| {
Arc::new(UnKnownColumn::new(&expr.to_string()))
})
})
.collect();
Partitioning::Hash(normalized_exprs, *part)
} else {
self.clone()
}
}
}

impl PartialEq for Partitioning {
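The sketch below shows the intent of Partitioning::project under simplified assumptions: string columns and a rename map stand in for physical expressions and equivalence properties, and Expr::Unknown plays the role of UnKnownColumn. Hash expressions are rewritten through the projection, anything the projection cannot express degrades to an unknown column, and non-hash partitionings pass through untouched.

// Toy sketch of `Partitioning::project`; not DataFusion's types.
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(String),
    Unknown(String), // the expression no longer exists after the projection
}

#[derive(Clone, Debug, PartialEq)]
enum Partitioning {
    RoundRobin(usize),
    Hash(Vec<Expr>, usize),
}

impl Partitioning {
    /// Remap each hash expression through the projection; anything the
    /// projection drops degrades to `Unknown`, mirroring `UnKnownColumn`.
    fn project(&self, mapping: &HashMap<String, String>) -> Self {
        match self {
            Partitioning::Hash(exprs, n) => {
                let remapped = exprs
                    .iter()
                    .map(|e| match e {
                        Expr::Column(name) => mapping
                            .get(name)
                            .map(|out| Expr::Column(out.clone()))
                            .unwrap_or_else(|| Expr::Unknown(name.clone())),
                        other => other.clone(),
                    })
                    .collect();
                Partitioning::Hash(remapped, *n)
            }
            other => other.clone(), // non-hash partitioning is unaffected
        }
    }
}

fn main() {
    // The projection renames `a` to `x` (e.g. SELECT a AS x).
    let mapping: HashMap<String, String> = [("a".to_string(), "x".to_string())].into();
    let input = Partitioning::Hash(vec![Expr::Column("a".into())], 8);
    assert_eq!(
        input.project(&mapping),
        Partitioning::Hash(vec![Expr::Column("x".into())], 8)
    );

    // A column the projection drops becomes Unknown.
    let dropped = Partitioning::Hash(vec![Expr::Column("b".into())], 8);
    assert_eq!(
        dropped.project(&mapping),
        Partitioning::Hash(vec![Expr::Unknown("b".into())], 8)
    );

    // Non-hash partitioning passes through unchanged.
    let rr = Partitioning::RoundRobin(8);
    assert_eq!(rr.project(&mapping), rr);
}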
27 changes: 9 additions & 18 deletions datafusion/physical-plan/src/aggregates/mod.rs
@@ -28,7 +28,7 @@ use crate::aggregates::{
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::windows::get_ordered_partition_by_indices;
use crate::{
DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, Partitioning,
DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode,
SendableRecordBatchStream, Statistics,
};

@@ -41,7 +41,7 @@ use datafusion_execution::TaskContext;
use datafusion_expr::Accumulator;
use datafusion_physical_expr::{
equivalence::{collapse_lex_req, ProjectionMapping},
expressions::{Column, UnKnownColumn},
expressions::Column,
physical_exprs_contains, EquivalenceProperties, LexOrdering, LexRequirement,
PhysicalExpr, PhysicalSortRequirement,
};
@@ -567,26 +567,16 @@ impl AggregateExec {
.project(projection_mapping, schema);

// Get output partitioning:
let mut output_partitioning = input.output_partitioning().clone();
if mode.is_first_stage() {
let input_partitioning = input.output_partitioning().clone();
let output_partitioning = if mode.is_first_stage() {
// First stage aggregation will not change the output partitioning,
// but needs to respect aliases (e.g. mapping in the GROUP BY
// expression).
let input_eq_properties = input.equivalence_properties();
if let Partitioning::Hash(exprs, part) = output_partitioning {
let normalized_exprs = exprs
.iter()
.map(|expr| {
input_eq_properties
.project_expr(expr, projection_mapping)
.unwrap_or_else(|| {
Arc::new(UnKnownColumn::new(&expr.to_string()))
})
})
.collect();
output_partitioning = Partitioning::Hash(normalized_exprs, part);
}
}
input_partitioning.project(projection_mapping, input_eq_properties)
} else {
input_partitioning.clone()
};

// Determine execution mode:
let mut exec_mode = input.execution_mode();
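
For the aggregate itself, the stage branch above reduces to something like this sketch (toy types again; the rename closure stands in for projecting through the group-by ProjectionMapping): only the first aggregation stage remaps the input's hash partitioning through the GROUP BY aliases, while later stages inherit it unchanged.

// Toy sketch of the first-stage vs. later-stage branch; not DataFusion's API.
#[derive(Clone, Debug, PartialEq)]
enum Partitioning {
    Hash(Vec<String>, usize),
    UnknownPartitioning(usize),
}

/// Rename hash columns through the group-by mapping, e.g. `a AS x`.
fn project(p: &Partitioning, rename: &dyn Fn(&str) -> String) -> Partitioning {
    match p {
        Partitioning::Hash(cols, n) => {
            Partitioning::Hash(cols.iter().map(|c| rename(c.as_str())).collect(), *n)
        }
        other => other.clone(),
    }
}

fn output_partitioning(input: &Partitioning, is_first_stage: bool) -> Partitioning {
    if is_first_stage {
        // First stage: respect GROUP BY aliases (here, `a AS x`).
        project(input, &|c| if c == "a" { "x".to_string() } else { c.to_string() })
    } else {
        // Later stages: partitioning passes through unchanged.
        input.clone()
    }
}

fn main() {
    let input = Partitioning::Hash(vec!["a".to_string()], 4);
    assert_eq!(output_partitioning(&input, true), Partitioning::Hash(vec!["x".to_string()], 4));
    assert_eq!(output_partitioning(&input, false), input);

    let unknown = Partitioning::UnknownPartitioning(4);
    assert_eq!(output_partitioning(&unknown, true), unknown);
}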
@@ -1233,6 +1223,7 @@ mod tests {
use crate::common::collect;
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::Partitioning;
use futures::{FutureExt, Stream};

// Generate a schema which consists of 5 columns (a, b, c, d, e)