From acc63fc6eb6e92551e7e6960292246496da4c531 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 30 Sep 2024 18:44:45 -0700 Subject: [PATCH 1/2] test(12687): reproducer of missing metadata bug --- .../sqllogictest/test_files/metadata.slt | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/datafusion/sqllogictest/test_files/metadata.slt b/datafusion/sqllogictest/test_files/metadata.slt index 3b2b219244f5..0f505a7d7b53 100644 --- a/datafusion/sqllogictest/test_files/metadata.slt +++ b/datafusion/sqllogictest/test_files/metadata.slt @@ -58,5 +58,39 @@ WHERE "data"."id" = "samples"."id"; 1 3 + + +# Regression test: prevent field metadata loss per https://github.com/apache/datafusion/issues/12687 +statement error DataFusion error: Internal error: Physical input schema should be the same as the one converted from logical input schema.. +select count(distinct name) from table_with_metadata; + +# Regression test: prevent field metadata loss per https://github.com/apache/datafusion/issues/12687 +statement error DataFusion error: Internal error: Physical input schema should be the same as the one converted from logical input schema.. +select approx_median(distinct id) from table_with_metadata; + +# Regression test: prevent field metadata loss per https://github.com/apache/datafusion/issues/12687 +statement error DataFusion error: Internal error: Physical input schema should be the same as the one converted from logical input schema.. +select array_agg(distinct id) from table_with_metadata; + +query I +select distinct id from table_with_metadata order by id; +---- +1 +3 +NULL + +query I +select count(id) from table_with_metadata; +---- +2 + +query I +select count(id) cnt from table_with_metadata group by name order by cnt; +---- +0 +1 +1 + + statement ok drop table table_with_metadata; From 4c5f37a049bce8de4c0f492277d31d98a7e205d2 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 30 Sep 2024 18:51:34 -0700 Subject: [PATCH 2/2] fix(12687): minimum change needed to fix the missing metadata --- .../physical-plan/src/aggregates/mod.rs | 25 ++++++++++++------- datafusion/physical-plan/src/projection.rs | 2 +- .../sqllogictest/test_files/metadata.slt | 10 +++++--- 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 2bdaed479655..9466ff6dd459 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -26,6 +26,7 @@ use crate::aggregates::{ topk_stream::GroupedTopKAggregateStream, }; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use crate::projection::get_field_metadata; use crate::windows::get_ordered_partition_by_indices; use crate::{ DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, @@ -795,14 +796,17 @@ fn create_schema( ) -> Result { let mut fields = Vec::with_capacity(group_expr.len() + aggr_expr.len()); for (index, (expr, name)) in group_expr.iter().enumerate() { - fields.push(Field::new( - name, - expr.data_type(input_schema)?, - // In cases where we have multiple grouping sets, we will use NULL expressions in - // order to align the grouping sets. So the field must be nullable even if the underlying - // schema field is not. - group_expr_nullable[index] || expr.nullable(input_schema)?, - )) + fields.push( + Field::new( + name, + expr.data_type(input_schema)?, + // In cases where we have multiple grouping sets, we will use NULL expressions in + // order to align the grouping sets. So the field must be nullable even if the underlying + // schema field is not. + group_expr_nullable[index] || expr.nullable(input_schema)?, + ) + .with_metadata(get_field_metadata(expr, input_schema).unwrap_or_default()), + ) } match mode { @@ -823,7 +827,10 @@ fn create_schema( } } - Ok(Schema::new(fields)) + Ok(Schema::new_with_metadata( + fields, + input_schema.metadata().clone(), + )) } fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef { diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index f1b9cdaf728f..4c889d1fc88c 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -237,7 +237,7 @@ impl ExecutionPlan for ProjectionExec { /// If e is a direct column reference, returns the field level /// metadata for that field, if any. Otherwise returns None -fn get_field_metadata( +pub(crate) fn get_field_metadata( e: &Arc, input_schema: &Schema, ) -> Option> { diff --git a/datafusion/sqllogictest/test_files/metadata.slt b/datafusion/sqllogictest/test_files/metadata.slt index 0f505a7d7b53..f38281abc5ab 100644 --- a/datafusion/sqllogictest/test_files/metadata.slt +++ b/datafusion/sqllogictest/test_files/metadata.slt @@ -61,15 +61,19 @@ WHERE "data"."id" = "samples"."id"; # Regression test: prevent field metadata loss per https://github.com/apache/datafusion/issues/12687 -statement error DataFusion error: Internal error: Physical input schema should be the same as the one converted from logical input schema.. +query I select count(distinct name) from table_with_metadata; +---- +2 # Regression test: prevent field metadata loss per https://github.com/apache/datafusion/issues/12687 -statement error DataFusion error: Internal error: Physical input schema should be the same as the one converted from logical input schema.. +query I select approx_median(distinct id) from table_with_metadata; +---- +2 # Regression test: prevent field metadata loss per https://github.com/apache/datafusion/issues/12687 -statement error DataFusion error: Internal error: Physical input schema should be the same as the one converted from logical input schema.. +statement ok select array_agg(distinct id) from table_with_metadata; query I