Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Projection Expression - Input Field Inconsistencies during Projection #10088

Merged
merged 8 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
#[derive(Default)]
pub struct AggregateStatistics {}

/// The name of the column corresponding to [`COUNT_STAR_EXPANSION`]
const COUNT_STAR_NAME: &str = "COUNT(*)";

impl AggregateStatistics {
#[allow(missing_docs)]
pub fn new() -> Self {
Expand Down Expand Up @@ -144,7 +141,7 @@ fn take_optimizable(node: &dyn ExecutionPlan) -> Option<Arc<dyn ExecutionPlan>>
fn take_optimizable_table_count(
agg_expr: &dyn AggregateExpr,
stats: &Statistics,
) -> Option<(ScalarValue, &'static str)> {
) -> Option<(ScalarValue, String)> {
if let (&Precision::Exact(num_rows), Some(casted_expr)) = (
&stats.num_rows,
agg_expr.as_any().downcast_ref::<expressions::Count>(),
Expand All @@ -158,7 +155,7 @@ fn take_optimizable_table_count(
if lit_expr.value() == &COUNT_STAR_EXPANSION {
return Some((
ScalarValue::Int64(Some(num_rows as i64)),
COUNT_STAR_NAME,
casted_expr.name().to_owned(),
));
}
}
Expand Down Expand Up @@ -427,7 +424,7 @@ pub(crate) mod tests {
/// What name would this aggregate produce in a plan?
fn column_name(&self) -> &'static str {
match self {
Self::CountStar => COUNT_STAR_NAME,
Self::CountStar => "COUNT(*)",
Self::ColumnA(_) => "COUNT(a)",
}
}
Expand Down
31 changes: 23 additions & 8 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ use datafusion_expr::expr::{
WindowFunction,
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::expr_vec_fmt;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{
DescribeTable, DmlStatement, RecursiveQuery, ScalarFunctionDefinition,
Expand All @@ -107,6 +108,7 @@ fn create_function_physical_name(
fun: &str,
distinct: bool,
args: &[Expr],
order_by: Option<&Vec<Expr>>,
) -> Result<String> {
let names: Vec<String> = args
.iter()
Expand All @@ -117,7 +119,12 @@ fn create_function_physical_name(
true => "DISTINCT ",
false => "",
};
Ok(format!("{}({}{})", fun, distinct_str, names.join(",")))

let phys_name = format!("{}({}{})", fun, distinct_str, names.join(","));

Ok(order_by
.map(|order_by| format!("{} ORDER BY [{}]", phys_name, expr_vec_fmt!(order_by)))
.unwrap_or(phys_name))
}

fn physical_name(e: &Expr) -> Result<String> {
Expand Down Expand Up @@ -237,22 +244,30 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
return internal_err!("Function `Expr` with name should be resolved.");
}

create_function_physical_name(fun.name(), false, &fun.args)
create_function_physical_name(fun.name(), false, &fun.args, None)
}
Expr::WindowFunction(WindowFunction { fun, args, .. }) => {
create_function_physical_name(&fun.to_string(), false, args)
Expr::WindowFunction(WindowFunction {
fun,
args,
order_by,
..
}) => {
create_function_physical_name(&fun.to_string(), false, args, Some(order_by))
}
Expr::AggregateFunction(AggregateFunction {
func_def,
distinct,
args,
filter,
order_by: _,
order_by,
null_treatment: _,
}) => match func_def {
AggregateFunctionDefinition::BuiltIn(..) => {
create_function_physical_name(func_def.name(), *distinct, args)
}
AggregateFunctionDefinition::BuiltIn(..) => create_function_physical_name(
func_def.name(),
*distinct,
args,
order_by.as_ref(),
),
AggregateFunctionDefinition::UDF(fun) => {
// TODO: Add support for filter by in AggregateUDF
if filter.is_some() {
Expand Down
41 changes: 31 additions & 10 deletions datafusion/functions-aggregate/src/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,11 +405,9 @@ impl FirstValuePhysicalExpr {
}

pub fn convert_to_last(self) -> LastValuePhysicalExpr {
let name = if self.name.starts_with("FIRST") {
format!("LAST{}", &self.name[5..])
} else {
format!("LAST_VALUE({})", self.expr)
};
let mut name = format!("LAST{}", &self.name[5..]);
replace_order_by_clause(&mut name);

let FirstValuePhysicalExpr {
expr,
input_data_type,
Expand Down Expand Up @@ -583,11 +581,9 @@ impl LastValuePhysicalExpr {
}

pub fn convert_to_first(self) -> FirstValuePhysicalExpr {
let name = if self.name.starts_with("LAST") {
format!("FIRST{}", &self.name[4..])
} else {
format!("FIRST_VALUE({})", self.expr)
};
let mut name = format!("FIRST{}", &self.name[4..]);
replace_order_by_clause(&mut name);

let LastValuePhysicalExpr {
expr,
input_data_type,
Expand Down Expand Up @@ -895,6 +891,31 @@ fn convert_to_sort_cols(
.collect::<Vec<_>>()
}

/// Rewrites the `ORDER BY [...]` clause embedded in an aggregate display name
/// so that it describes the reversed ordering.
///
/// This is used when a `FIRST...` name is converted into a `LAST...` name (and
/// vice versa): the function prefix is swapped by the caller, and this helper
/// flips the sort options so the name matches the reversed requirement.
///
/// Every sort key in the clause is reversed (`ASC` <-> `DESC`,
/// `NULLS FIRST` <-> `NULLS LAST`), not only the last one, because reversing
/// an ordering reverses all of its keys. The end of the clause is the `]` that
/// matches the opening bracket, so sort expressions that themselves contain
/// brackets do not truncate the clause.
///
/// The string is left untouched when it has no `ORDER BY [` clause or when the
/// clause's brackets are unbalanced.
///
/// Review note: building the name through `create_function_physical_name`
/// instead of editing the string was considered, but that function needs the
/// logical `Expr`s, which are not available at this point.
fn replace_order_by_clause(order_by: &mut String) {
    // Each sort-option suffix paired with the suffix of its reverse.
    const REVERSED_OPTIONS: [(&str, &str); 4] = [
        (" DESC NULLS FIRST", " ASC NULLS LAST"),
        (" ASC NULLS FIRST", " DESC NULLS LAST"),
        (" DESC NULLS LAST", " ASC NULLS FIRST"),
        (" ASC NULLS LAST", " DESC NULLS FIRST"),
    ];
    const CLAUSE_MARKER: &str = "ORDER BY [";

    // Byte offset of the first character after the opening `[`.
    let body_start = match order_by.find(CLAUSE_MARKER) {
        Some(pos) => pos + CLAUSE_MARKER.len(),
        None => return,
    };

    // Locate the `]` that closes the clause, skipping over nested brackets.
    let mut depth = 1usize;
    let mut closing = None;
    for (offset, ch) in order_by[body_start..].char_indices() {
        match ch {
            '[' => depth += 1,
            ']' => {
                depth -= 1;
                if depth == 0 {
                    closing = Some(body_start + offset);
                    break;
                }
            }
            _ => {}
        }
    }
    let body_end = match closing {
        Some(pos) => pos,
        None => return,
    };

    // Flip the options of every item in the list. Items that do not end in a
    // known suffix (e.g. the head of an expression that itself contains ", ")
    // are passed through unchanged, so re-joining reproduces them verbatim.
    let reversed_body = order_by[body_start..body_end]
        .split(", ")
        .map(|item| {
            REVERSED_OPTIONS
                .iter()
                .find_map(|&(suffix, reversed)| {
                    item.strip_suffix(suffix)
                        .map(|head| format!("{}{}", head, reversed))
                })
                .unwrap_or_else(|| item.to_owned())
        })
        .collect::<Vec<_>>()
        .join(", ");

    order_by.replace_range(body_start..body_end, &reversed_body);
}

#[cfg(test)]
mod tests {
use arrow::array::Int64Array;
Expand Down
6 changes: 5 additions & 1 deletion datafusion/physical-expr/src/equivalence/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;
use arrow::datatypes::SchemaRef;

use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;
use datafusion_common::{internal_err, Result};

use crate::expressions::Column;
use crate::PhysicalExpr;
Expand Down Expand Up @@ -67,6 +67,10 @@ impl ProjectionMapping {
// Conceptually, `source_expr` and `expression` should be the same.
let idx = col.index();
let matching_input_field = input_schema.field(idx);
if col.name() != matching_input_field.name() {
return internal_err!("Input field name {} does not match with the projection expression {}",
matching_input_field.name(),col.name())
}
let matching_input_column =
Column::new(matching_input_field.name(), idx);
Ok(Transformed::yes(Arc::new(matching_input_column)))
Expand Down
8 changes: 4 additions & 4 deletions datafusion/sqllogictest/test_files/agg_func_substitute.slt
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ logical_plan
03)----TableScan: multiple_ordered_table projection=[a, c]
physical_plan
01)ProjectionExec: expr=[a@0 as a, NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
Comment on lines +47 to +51
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true

Expand All @@ -64,11 +64,11 @@ logical_plan
03)----TableScan: multiple_ordered_table projection=[a, c]
physical_plan
01)ProjectionExec: expr=[a@0 as a, NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true

Expand Down
12 changes: 6 additions & 6 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ logical_plan
01)Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]]
02)--TableScan: agg_order projection=[c1, c2, c3]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
01)AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
03)----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
04)------SortExec: expr=[c2@1 DESC,c3@2 ASC NULLS LAST]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], has_header=true
Expand Down Expand Up @@ -3478,9 +3478,9 @@ logical_plan
01)Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]]
02)--TableScan: convert_first_last_table projection=[c1, c3]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1)]
01)AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1)]
03)----AggregateExec: mode=Partial, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 ASC NULLS LAST]]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c3], output_orderings=[[c1@0 ASC NULLS LAST], [c3@1 ASC NULLS LAST]], has_header=true

Expand All @@ -3492,8 +3492,8 @@ logical_plan
01)Aggregate: groupBy=[[]], aggr=[[LAST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]]
02)--TableScan: convert_first_last_table projection=[c1, c2]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1)]
01)AggregateExec: mode=Final, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1)]
03)----AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 DESC NULLS FIRST]]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c2], output_orderings=[[c1@0 ASC NULLS LAST], [c2@1 DESC]], has_header=true
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/distinct_on.slt
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ physical_plan
01)ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@1 as c3, FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@2 as c2]
02)--SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
03)----SortExec: expr=[c1@0 ASC NULLS LAST]
04)------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
04)------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST], FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
07)------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST], FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]]
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true

Expand Down
Loading