Skip to content

Commit

Permalink
Projection Expression - Input Field Inconsistencies during Projection (
Browse files Browse the repository at this point in the history
…apache#10088)

* agg fixes

* test updates

* fixing count mismatch

* Update aggregate_statistics.rs

* catch different names

* minor
  • Loading branch information
berkaysynnada authored and ccciudatu committed Apr 26, 2024
1 parent 6b2e999 commit 63f113e
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
#[derive(Default)]
pub struct AggregateStatistics {}

/// The name of the column corresponding to [`COUNT_STAR_EXPANSION`]
const COUNT_STAR_NAME: &str = "COUNT(*)";

impl AggregateStatistics {
#[allow(missing_docs)]
pub fn new() -> Self {
Expand Down Expand Up @@ -144,7 +141,7 @@ fn take_optimizable(node: &dyn ExecutionPlan) -> Option<Arc<dyn ExecutionPlan>>
fn take_optimizable_table_count(
agg_expr: &dyn AggregateExpr,
stats: &Statistics,
) -> Option<(ScalarValue, &'static str)> {
) -> Option<(ScalarValue, String)> {
if let (&Precision::Exact(num_rows), Some(casted_expr)) = (
&stats.num_rows,
agg_expr.as_any().downcast_ref::<expressions::Count>(),
Expand All @@ -158,7 +155,7 @@ fn take_optimizable_table_count(
if lit_expr.value() == &COUNT_STAR_EXPANSION {
return Some((
ScalarValue::Int64(Some(num_rows as i64)),
COUNT_STAR_NAME,
casted_expr.name().to_owned(),
));
}
}
Expand Down Expand Up @@ -427,7 +424,7 @@ pub(crate) mod tests {
/// What name would this aggregate produce in a plan?
fn column_name(&self) -> &'static str {
match self {
Self::CountStar => COUNT_STAR_NAME,
Self::CountStar => "COUNT(*)",
Self::ColumnA(_) => "COUNT(a)",
}
}
Expand Down
31 changes: 23 additions & 8 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ use datafusion_expr::expr::{
WindowFunction,
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::expr_vec_fmt;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{
DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery,
Expand All @@ -108,6 +109,7 @@ fn create_function_physical_name(
fun: &str,
distinct: bool,
args: &[Expr],
order_by: Option<&Vec<Expr>>,
) -> Result<String> {
let names: Vec<String> = args
.iter()
Expand All @@ -118,7 +120,12 @@ fn create_function_physical_name(
true => "DISTINCT ",
false => "",
};
Ok(format!("{}({}{})", fun, distinct_str, names.join(",")))

let phys_name = format!("{}({}{})", fun, distinct_str, names.join(","));

Ok(order_by
.map(|order_by| format!("{} ORDER BY [{}]", phys_name, expr_vec_fmt!(order_by)))
.unwrap_or(phys_name))
}

fn physical_name(e: &Expr) -> Result<String> {
Expand Down Expand Up @@ -238,22 +245,30 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
return internal_err!("Function `Expr` with name should be resolved.");
}

create_function_physical_name(fun.name(), false, &fun.args)
create_function_physical_name(fun.name(), false, &fun.args, None)
}
Expr::WindowFunction(WindowFunction { fun, args, .. }) => {
create_function_physical_name(&fun.to_string(), false, args)
Expr::WindowFunction(WindowFunction {
fun,
args,
order_by,
..
}) => {
create_function_physical_name(&fun.to_string(), false, args, Some(order_by))
}
Expr::AggregateFunction(AggregateFunction {
func_def,
distinct,
args,
filter,
order_by: _,
order_by,
null_treatment: _,
}) => match func_def {
AggregateFunctionDefinition::BuiltIn(..) => {
create_function_physical_name(func_def.name(), *distinct, args)
}
AggregateFunctionDefinition::BuiltIn(..) => create_function_physical_name(
func_def.name(),
*distinct,
args,
order_by.as_ref(),
),
AggregateFunctionDefinition::UDF(fun) => {
// TODO: Add support for filter by in AggregateUDF
if filter.is_some() {
Expand Down
41 changes: 31 additions & 10 deletions datafusion/functions-aggregate/src/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,11 +415,9 @@ impl FirstValuePhysicalExpr {
}

pub fn convert_to_last(self) -> LastValuePhysicalExpr {
let name = if self.name.starts_with("FIRST") {
format!("LAST{}", &self.name[5..])
} else {
format!("LAST_VALUE({})", self.expr)
};
let mut name = format!("LAST{}", &self.name[5..]);
replace_order_by_clause(&mut name);

let FirstValuePhysicalExpr {
expr,
input_data_type,
Expand Down Expand Up @@ -593,11 +591,9 @@ impl LastValuePhysicalExpr {
}

pub fn convert_to_first(self) -> FirstValuePhysicalExpr {
let name = if self.name.starts_with("LAST") {
format!("FIRST{}", &self.name[4..])
} else {
format!("FIRST_VALUE({})", self.expr)
};
let mut name = format!("FIRST{}", &self.name[4..]);
replace_order_by_clause(&mut name);

let LastValuePhysicalExpr {
expr,
input_data_type,
Expand Down Expand Up @@ -905,6 +901,31 @@ fn convert_to_sort_cols(
.collect::<Vec<_>>()
}

/// Rewrites the `ORDER BY [...]` clause embedded in an aggregate display name
/// so that it describes the reversed ordering.
///
/// The clause is located by the literal marker `ORDER BY [` and runs up to the
/// first `]` that follows it. Inside the clause, every `", "`-separated sort
/// key that ends with a direction / null-placement suffix has that suffix
/// swapped for its opposite (`ASC` <-> `DESC`, `NULLS FIRST` <-> `NULLS LAST`).
///
/// All keys are flipped, not only the last one: a multi-key clause such as
/// `[a DESC NULLS FIRST, b ASC NULLS LAST]` becomes
/// `[a ASC NULLS LAST, b DESC NULLS FIRST]`.
///
/// The string is left untouched when it has no `ORDER BY [` marker or no
/// closing `]`. Pieces that do not end with a known suffix (for example the
/// leading half of a key whose expression itself contains `", "`) are copied
/// through unchanged.
fn replace_order_by_clause(order_by: &mut String) {
    const CLAUSE_OPEN: &str = "ORDER BY [";
    // Each sort-option suffix paired with its reversed counterpart.
    const REVERSALS: [(&str, &str); 4] = [
        (" DESC NULLS FIRST", " ASC NULLS LAST"),
        (" ASC NULLS FIRST", " DESC NULLS LAST"),
        (" DESC NULLS LAST", " ASC NULLS FIRST"),
        (" ASC NULLS LAST", " DESC NULLS FIRST"),
    ];

    // Byte offset of the first character after the opening bracket.
    let keys_start = match order_by.find(CLAUSE_OPEN) {
        Some(pos) => pos + CLAUSE_OPEN.len(),
        None => return,
    };
    // Byte offset of the closing bracket (exclusive end of the key list).
    let keys_end = match order_by[keys_start..].find(']') {
        Some(len) => keys_start + len,
        None => return,
    };

    let reversed_keys = order_by[keys_start..keys_end]
        .split(", ")
        .map(|key| {
            REVERSALS
                .iter()
                .find_map(|&(suffix, reversed)| {
                    key.strip_suffix(suffix)
                        .map(|sort_expr| [sort_expr, reversed].concat())
                })
                // Not a recognizable sort key tail: keep the text as it is.
                .unwrap_or_else(|| key.to_owned())
        })
        .collect::<Vec<_>>()
        .join(", ");

    order_by.replace_range(keys_start..keys_end, &reversed_keys);
}

#[cfg(test)]
mod tests {
use arrow::array::Int64Array;
Expand Down
6 changes: 5 additions & 1 deletion datafusion/physical-expr/src/equivalence/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;
use arrow::datatypes::SchemaRef;

use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;
use datafusion_common::{internal_err, Result};

use crate::expressions::Column;
use crate::PhysicalExpr;
Expand Down Expand Up @@ -67,6 +67,10 @@ impl ProjectionMapping {
// Conceptually, `source_expr` and `expression` should be the same.
let idx = col.index();
let matching_input_field = input_schema.field(idx);
if col.name() != matching_input_field.name() {
return internal_err!("Input field name {} does not match with the projection expression {}",
matching_input_field.name(),col.name())
}
let matching_input_column =
Column::new(matching_input_field.name(), idx);
Ok(Transformed::yes(Arc::new(matching_input_column)))
Expand Down
8 changes: 4 additions & 4 deletions datafusion/sqllogictest/test_files/agg_func_substitute.slt
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ logical_plan
03)----TableScan: multiple_ordered_table projection=[a, c]
physical_plan
01)ProjectionExec: expr=[a@0 as a, NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true

Expand All @@ -64,11 +64,11 @@ logical_plan
03)----TableScan: multiple_ordered_table projection=[a, c]
physical_plan
01)ProjectionExec: expr=[a@0 as a, NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true

Expand Down
12 changes: 6 additions & 6 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ logical_plan
01)Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]]
02)--TableScan: agg_order projection=[c1, c2, c3]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
01)AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
03)----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
04)------SortExec: expr=[c2@1 DESC,c3@2 ASC NULLS LAST]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], has_header=true
Expand Down Expand Up @@ -3520,9 +3520,9 @@ logical_plan
01)Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]]
02)--TableScan: convert_first_last_table projection=[c1, c3]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1)]
01)AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1)]
03)----AggregateExec: mode=Partial, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 ASC NULLS LAST]]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c3], output_orderings=[[c1@0 ASC NULLS LAST], [c3@1 ASC NULLS LAST]], has_header=true

Expand All @@ -3534,8 +3534,8 @@ logical_plan
01)Aggregate: groupBy=[[]], aggr=[[LAST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]]
02)--TableScan: convert_first_last_table projection=[c1, c2]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1)]
01)AggregateExec: mode=Final, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1)]
03)----AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 DESC NULLS FIRST]]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c2], output_orderings=[[c1@0 ASC NULLS LAST], [c2@1 DESC]], has_header=true
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/distinct_on.slt
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ physical_plan
01)ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@1 as c3, FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@2 as c2]
02)--SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
03)----SortExec: expr=[c1@0 ASC NULLS LAST]
04)------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
04)------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST], FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
07)------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST], FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]]
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true

Expand Down
Loading

0 comments on commit 63f113e

Please sign in to comment.