diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index c7cff3ac26b11..b0c28e1455252 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -54,8 +54,7 @@ use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{
     exec_err, get_target_functional_dependencies, internal_err, not_impl_err,
     plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, DataFusionError,
-    FunctionalDependencies, Result, ScalarValue, TableReference, ToDFSchema,
-    UnnestOptions,
+    Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
 };
 use datafusion_expr_common::type_coercion::binary::type_union_resolution;
@@ -1518,27 +1517,10 @@ pub fn validate_unique_names<'a>(
 /// [`TypeCoercionRewriter::coerce_union`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union
 /// [`coerce_union_schema`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html
 pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
-    if left_plan.schema().fields().len() != right_plan.schema().fields().len() {
-        return plan_err!(
-            "UNION queries have different number of columns: \
-            left has {} columns whereas right has {} columns",
-            left_plan.schema().fields().len(),
-            right_plan.schema().fields().len()
-        );
-    }
-
-    // Temporarily use the schema from the left input and later rely on the analyzer to
-    // coerce the two schemas into a common one.
-
-    // Functional Dependencies doesn't preserve after UNION operation
-    let schema = (**left_plan.schema()).clone();
-    let schema =
-        Arc::new(schema.with_functional_dependencies(FunctionalDependencies::empty())?);
-
-    Ok(LogicalPlan::Union(Union {
-        inputs: vec![Arc::new(left_plan), Arc::new(right_plan)],
-        schema,
-    }))
+    Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
+        Arc::new(left_plan),
+        Arc::new(right_plan),
+    ])?))
 }
 
 /// Create Projection
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 7e9c0cb75ec82..446ae94108b1c 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -699,15 +699,13 @@ impl LogicalPlan {
                 }))
             }
             LogicalPlan::Union(Union { inputs, schema }) => {
-                let input_schema = inputs[0].schema();
-                // If inputs are not pruned do not change schema
-                // TODO this seems wrong (shouldn't we always use the schema of the input?)
-                let schema = if schema.fields().len() == input_schema.fields().len() {
-                    Arc::clone(&schema)
+                let first_input_schema = inputs[0].schema();
+                if schema.fields().len() == first_input_schema.fields().len() {
+                    // If inputs are not pruned do not change schema
+                    Ok(LogicalPlan::Union(Union { inputs, schema }))
                 } else {
-                    Arc::clone(input_schema)
-                };
-                Ok(LogicalPlan::Union(Union { inputs, schema }))
+                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
+                }
             }
             LogicalPlan::Distinct(distinct) => {
                 let distinct = match distinct {
@@ -2645,6 +2643,107 @@ pub struct Union {
     pub schema: DFSchemaRef,
 }
 
+impl Union {
+    /// Constructs new Union instance deriving schema from inputs.
+    fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
+        let schema = Self::derive_schema_from_inputs(&inputs, false)?;
+        Ok(Union { inputs, schema })
+    }
+
+    /// Constructs new Union instance deriving schema from inputs.
+    /// Inputs do not have to have matching types and the produced schema will
+    /// take its types from the first input.
+    // TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all.
+    pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
+        let schema = Self::derive_schema_from_inputs(&inputs, true)?;
+        Ok(Union { inputs, schema })
+    }
+
+    /// Constructs new Union instance deriving schema from inputs.
+    ///
+    /// If `loose_types` is true, inputs do not have to have matching types and the produced
+    /// schema will take its types from the first input. TODO () this is not necessarily
+    /// reasonable behavior.
+    fn derive_schema_from_inputs(
+        inputs: &[Arc<LogicalPlan>],
+        loose_types: bool,
+    ) -> Result<DFSchemaRef> {
+        if inputs.len() < 2 {
+            return plan_err!("UNION requires at least two inputs");
+        }
+        let first_schema = inputs[0].schema();
+        let fields_count = first_schema.fields().len();
+        for input in inputs.iter().skip(1) {
+            if fields_count != input.schema().fields().len() {
+                return plan_err!(
+                    "UNION queries have different number of columns: \
+                    left has {} columns whereas right has {} columns",
+                    fields_count,
+                    input.schema().fields().len()
+                );
+            }
+        }
+
+        let union_fields = (0..fields_count)
+            .map(|i| {
+                let fields = inputs
+                    .iter()
+                    .map(|input| input.schema().field(i))
+                    .collect::<Vec<_>>();
+                let first_field = fields[0];
+                let name = first_field.name();
+                let data_type = if loose_types {
+                    // TODO apply type coercion here, or document why it's better to defer
+                    // temporarily use the data type from the left input and later rely on the analyzer to
+                    // coerce the two schemas into a common one.
+                    first_field.data_type()
+                } else {
+                    fields.iter().skip(1).try_fold(
+                        first_field.data_type(),
+                        |acc, field| {
+                            if acc != field.data_type() {
+                                return plan_err!(
+                                    "UNION field {i} have different type in inputs: \
+                                    left has {} whereas right has {}",
+                                    first_field.data_type(),
+                                    field.data_type()
+                                );
+                            }
+                            Ok(acc)
+                        },
+                    )?
+                };
+                let nullable = fields.iter().any(|field| field.is_nullable());
+                let mut field = Field::new(name, data_type.clone(), nullable);
+                let field_metadata =
+                    intersect_maps(fields.iter().map(|field| field.metadata()));
+                field.set_metadata(field_metadata);
+                // TODO reusing table reference from the first schema is probably wrong
+                let table_reference = first_schema.qualified_field(i).0.cloned();
+                Ok((table_reference, Arc::new(field)))
+            })
+            .collect::<Result<_>>()?;
+        let union_schema_metadata =
+            intersect_maps(inputs.iter().map(|input| input.schema().metadata()));
+
+        // Functional dependencies are not preserved after the UNION operation
+        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
+        let schema = Arc::new(schema);
+
+        Ok(schema)
+    }
+}
+
+fn intersect_maps<'a>(
+    inputs: impl IntoIterator<Item = &'a HashMap<String, String>>,
+) -> HashMap<String, String> {
+    let mut inputs = inputs.into_iter();
+    let mut merged: HashMap<String, String> = inputs.next().cloned().unwrap_or_default();
+    for input in inputs {
+        merged.retain(|k, v| input.get(k) == Some(v));
+    }
+    merged
+}
+
 // Manual implementation needed because of `schema` field. Comparison excludes this field.
 impl PartialOrd for Union {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
diff --git a/datafusion/functions-aggregate-common/src/merge_arrays.rs b/datafusion/functions-aggregate-common/src/merge_arrays.rs
index 9b9a1240c1a19..0cfea662497e1 100644
--- a/datafusion/functions-aggregate-common/src/merge_arrays.rs
+++ b/datafusion/functions-aggregate-common/src/merge_arrays.rs
@@ -193,3 +193,149 @@ pub fn merge_ordered_arrays(
 
     Ok((merged_values, merged_orderings))
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use std::collections::VecDeque;
+    use std::sync::Arc;
+
+    use arrow::array::{ArrayRef, Int64Array};
+
+    use datafusion_common::utils::get_row_at_idx;
+    use datafusion_common::{Result, ScalarValue};
+
+    #[test]
+    fn test_merge_asc() -> Result<()> {
+        let lhs_arrays: Vec<ArrayRef> = vec![
+            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
+            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
+        ];
+        let n_row = lhs_arrays[0].len();
+        let lhs_orderings = (0..n_row)
+            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
+            .collect::<Result<VecDeque<_>>>()?;
+
+        let rhs_arrays: Vec<ArrayRef> = vec![
+            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
+            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
+        ];
+        let n_row = rhs_arrays[0].len();
+        let rhs_orderings = (0..n_row)
+            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
+            .collect::<Result<VecDeque<_>>>()?;
+        let sort_options = vec![
+            SortOptions {
+                descending: false,
+                nulls_first: false,
+            },
+            SortOptions {
+                descending: false,
+                nulls_first: false,
+            },
+        ];
+
+        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef;
+        let lhs_vals = (0..lhs_vals_arr.len())
+            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
+            .collect::<Result<VecDeque<_>>>()?;
+
+        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef;
+        let rhs_vals = (0..rhs_vals_arr.len())
+            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
+            .collect::<Result<VecDeque<_>>>()?;
+        let expected =
+            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef;
+        let expected_ts = vec![
+            Arc::new(Int64Array::from(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2])) as ArrayRef,
+            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef,
+        ];
+
+        let (merged_vals, merged_ts) = merge_ordered_arrays(
+            &mut [lhs_vals, rhs_vals],
+            &mut [lhs_orderings, rhs_orderings],
+            &sort_options,
+        )?;
+        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
+        let merged_ts = (0..merged_ts[0].len())
+            .map(|col_idx| {
+                ScalarValue::iter_to_array(
+                    (0..merged_ts.len())
+                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
+                )
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        assert_eq!(&merged_vals, &expected);
+        assert_eq!(&merged_ts, &expected_ts);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_merge_desc() -> Result<()> {
+        let lhs_arrays: Vec<ArrayRef> = vec![
+            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
+            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
+        ];
+        let n_row = lhs_arrays[0].len();
+        let lhs_orderings = (0..n_row)
+            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
+            .collect::<Result<VecDeque<_>>>()?;
+
+        let rhs_arrays: Vec<ArrayRef> = vec![
+            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
+            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
+        ];
+        let n_row = rhs_arrays[0].len();
+        let rhs_orderings = (0..n_row)
+            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
+            .collect::<Result<VecDeque<_>>>()?;
+        let sort_options = vec![
+            SortOptions {
+                descending: true,
+                nulls_first: false,
+            },
+            SortOptions {
+                descending: true,
+                nulls_first: false,
+            },
+        ];
+
+        // Values (which will be merged) don't have to be ordered.
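+        // The merge order is decided by the ordering columns alone; each value
+        // below simply travels with its row.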
+        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef;
+        let lhs_vals = (0..lhs_vals_arr.len())
+            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
+            .collect::<Result<VecDeque<_>>>()?;
+
+        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef;
+        let rhs_vals = (0..rhs_vals_arr.len())
+            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
+            .collect::<Result<VecDeque<_>>>()?;
+        let expected =
+            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as ArrayRef;
+        let expected_ts = vec![
+            Arc::new(Int64Array::from(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0])) as ArrayRef,
+            Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as ArrayRef,
+        ];
+        let (merged_vals, merged_ts) = merge_ordered_arrays(
+            &mut [lhs_vals, rhs_vals],
+            &mut [lhs_orderings, rhs_orderings],
+            &sort_options,
+        )?;
+        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
+        let merged_ts = (0..merged_ts[0].len())
+            .map(|col_idx| {
+                ScalarValue::iter_to_array(
+                    (0..merged_ts.len())
+                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
+                )
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        assert_eq!(&merged_vals, &expected);
+        assert_eq!(&merged_ts, &expected_ts);
+        Ok(())
+    }
+}
diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs
index 9fff05999122e..16a8acfe90b8b 100644
--- a/datafusion/functions-aggregate/src/array_agg.rs
+++ b/datafusion/functions-aggregate/src/array_agg.rs
@@ -19,8 +19,9 @@
 
 use arrow::array::{new_empty_array, Array, ArrayRef, AsArray, ListArray, StructArray};
 use arrow::datatypes::DataType;
+use std::cmp::Ordering;
 
-use arrow_schema::{Field, Fields};
+use arrow_schema::{Field, Fields, SortOptions};
 use datafusion_common::cast::as_list_array;
 use datafusion_common::utils::{get_row_at_idx, SingleRowListArrayBuilder};
 use datafusion_common::{exec_err, ScalarValue};
@@ -132,7 +133,32 @@ impl AggregateUDFImpl for ArrayAgg {
         let data_type = acc_args.exprs[0].data_type(acc_args.schema)?;
 
         if acc_args.is_distinct {
-            return Ok(Box::new(DistinctArrayAggAccumulator::try_new(&data_type)?));
+            // Limitation similar to Postgres. The aggregation function can only mix
+            // DISTINCT and ORDER BY if all the expressions in the ORDER BY appear
+            // also in the arguments of the function. For example:
+            //
+            // ARRAY_AGG(DISTINCT col)
+            //
+            // can only be mixed with an ORDER BY if the order expression is "col".
+            //
+            // ARRAY_AGG(DISTINCT col ORDER BY col)                         <- Valid
+            // ARRAY_AGG(DISTINCT concat(col, '') ORDER BY concat(col, '')) <- Valid
+            // ARRAY_AGG(DISTINCT col ORDER BY other_col)                   <- Invalid
+            // ARRAY_AGG(DISTINCT col ORDER BY concat(col, ''))             <- Invalid
+            if acc_args.ordering_req.len() > 1 {
+                return exec_err!("In an aggregate with DISTINCT, ORDER BY expressions must appear in argument list");
+            }
+            let mut sort_option: Option<SortOptions> = None;
+            if let Some(order) = acc_args.ordering_req.first() {
+                if !order.expr.eq(&acc_args.exprs[0]) {
+                    return exec_err!("In an aggregate with DISTINCT, ORDER BY expressions must appear in argument list");
+                }
+                sort_option = Some(order.options)
+            }
+            return Ok(Box::new(DistinctArrayAggAccumulator::try_new(
+                &data_type,
+                sort_option,
+            )?));
         }
 
         if acc_args.ordering_req.is_empty() {
@@ -322,13 +348,18 @@ impl Accumulator for ArrayAggAccumulator {
 struct DistinctArrayAggAccumulator {
     values: HashSet<ScalarValue>,
     datatype: DataType,
+    sort_options: Option<SortOptions>,
 }
 
 impl DistinctArrayAggAccumulator {
-    pub fn try_new(datatype: &DataType) -> Result<Self> {
+    pub fn try_new(
+        datatype: &DataType,
+        sort_options: Option<SortOptions>,
+    ) -> Result<Self> {
         Ok(Self {
             values: HashSet::new(),
             datatype: datatype.clone(),
+            sort_options,
         })
     }
 }
@@ -339,8 +370,8 @@ impl Accumulator for DistinctArrayAggAccumulator {
     }
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        if values.len() != 1 {
-            return internal_err!("expects single batch");
+        if values.is_empty() {
+            return Ok(());
         }
 
         let array = &values[0];
@@ -370,10 +401,32 @@ impl Accumulator for DistinctArrayAggAccumulator {
     }
 
     fn evaluate(&mut self) -> Result<ScalarValue> {
-        let values: Vec<ScalarValue> = self.values.iter().cloned().collect();
+        let mut values: Vec<ScalarValue> = self.values.iter().cloned().collect();
         if values.is_empty() {
             return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 1));
         }
+
+        if let Some(opts) = self.sort_options {
+            values.sort_by(|a, b| {
+                if a.is_null() {
+                    return match opts.nulls_first {
+                        true => Ordering::Less,
+                        false => Ordering::Greater,
+                    };
+                }
+                if b.is_null() {
+                    return match opts.nulls_first {
+                        true => Ordering::Greater,
+                        false => Ordering::Less,
+                    };
+                }
+                match opts.descending {
+                    true => b.partial_cmp(a).unwrap_or(Ordering::Equal),
+                    false => a.partial_cmp(b).unwrap_or(Ordering::Equal),
+                }
+            });
+        };
+
         let arr = ScalarValue::new_list(&values, &self.datatype, true);
         Ok(ScalarValue::List(arr))
     }
@@ -383,6 +436,8 @@ impl Accumulator for DistinctArrayAggAccumulator {
             - size_of_val(&self.values)
             + self.datatype.size()
             - size_of_val(&self.datatype)
+            - size_of_val(&self.sort_options)
+            + size_of::<Option<SortOptions>>()
     }
 }
 
@@ -599,146 +654,321 @@ impl OrderSensitiveArrayAggAccumulator {
 #[cfg(test)]
 mod tests {
     use super::*;
-
-    use std::collections::VecDeque;
+    use arrow_schema::{Field, FieldRef, Fields, Schema, SortOptions};
+    use datafusion_common::cast::as_generic_string_array;
+    use datafusion_common::internal_err;
+    use datafusion_physical_expr::expressions::Column;
+    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
     use std::sync::Arc;
 
-    use arrow::array::Int64Array;
-    use arrow_schema::SortOptions;
+    #[test]
+    fn no_duplicates_no_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["d", "e", "f"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
-
-    use datafusion_common::utils::get_row_at_idx;
-    use datafusion_common::{Result, ScalarValue};
+
assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]); + + Ok(()) + } #[test] - fn test_merge_asc() -> Result<()> { - let lhs_arrays: Vec = vec![ - Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])), - Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])), - ]; - let n_row = lhs_arrays[0].len(); - let lhs_orderings = (0..n_row) - .map(|idx| get_row_at_idx(&lhs_arrays, idx)) - .collect::>>()?; - - let rhs_arrays: Vec = vec![ - Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])), - Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])), - ]; - let n_row = rhs_arrays[0].len(); - let rhs_orderings = (0..n_row) - .map(|idx| get_row_at_idx(&rhs_arrays, idx)) - .collect::>>()?; - let sort_options = vec![ - SortOptions { - descending: false, - nulls_first: false, - }, - SortOptions { - descending: false, - nulls_first: false, - }, - ]; - - let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef; - let lhs_vals = (0..lhs_vals_arr.len()) - .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx)) - .collect::>>()?; - - let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef; - let rhs_vals = (0..rhs_vals_arr.len()) - .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx)) - .collect::>>()?; - let expected = - Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef; - let expected_ts = vec![ - Arc::new(Int64Array::from(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2])) as ArrayRef, - Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef, - ]; - - let (merged_vals, merged_ts) = merge_ordered_arrays( - &mut [lhs_vals, rhs_vals], - &mut [lhs_orderings, rhs_orderings], - &sort_options, - )?; - let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?; - let merged_ts = (0..merged_ts[0].len()) - .map(|col_idx| { - ScalarValue::iter_to_array( - (0..merged_ts.len()) - .map(|row_idx| merged_ts[row_idx][col_idx].clone()), - ) - }) - .collect::>>()?; + fn no_duplicates_distinct() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .build_two()?; + + acc1.update_batch(&[data(["a", "b", "c"])])?; + acc2.update_batch(&[data(["d", "e", "f"])])?; + acc1 = merge(acc1, acc2)?; - assert_eq!(&merged_vals, &expected); - assert_eq!(&merged_ts, &expected_ts); + let mut result = print_nulls(str_arr(acc1.evaluate()?)?); + result.sort(); + + assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]); Ok(()) } #[test] - fn test_merge_desc() -> Result<()> { - let lhs_arrays: Vec = vec![ - Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])), - Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])), - ]; - let n_row = lhs_arrays[0].len(); - let lhs_orderings = (0..n_row) - .map(|idx| get_row_at_idx(&lhs_arrays, idx)) - .collect::>>()?; - - let rhs_arrays: Vec = vec![ - Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])), - Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])), - ]; - let n_row = rhs_arrays[0].len(); - let rhs_orderings = (0..n_row) - .map(|idx| get_row_at_idx(&rhs_arrays, idx)) - .collect::>>()?; - let sort_options = vec![ - SortOptions { - descending: true, - nulls_first: false, - }, - SortOptions { - descending: true, - nulls_first: false, - }, - ]; - - // Values (which will be merged) doesn't have to be ordered. 
-        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef;
-        let lhs_vals = (0..lhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef;
-        let rhs_vals = (0..rhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let expected =
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as ArrayRef;
-        let expected_ts = vec![
-            Arc::new(Int64Array::from(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0])) as ArrayRef,
-            Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as ArrayRef,
-        ];
-        let (merged_vals, merged_ts) = merge_ordered_arrays(
-            &mut [lhs_vals, rhs_vals],
-            &mut [lhs_orderings, rhs_orderings],
-            &sort_options,
-        )?;
-        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
-        let merged_ts = (0..merged_ts[0].len())
-            .map(|col_idx| {
-                ScalarValue::iter_to_array(
-                    (0..merged_ts.len())
-                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
                )
-            })
-            .collect::<Result<Vec<_>>>()?;
+    fn duplicates_no_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "a", "b", "c"]);
-        assert_eq!(&merged_vals, &expected);
-        assert_eq!(&merged_ts, &expected_ts);
 
         Ok(())
     }
+
+    #[test]
+    fn duplicates_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+
+        assert_eq!(result, vec!["a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["e", "b", "d"])])?;
+        acc2.update_batch(&[data(["f", "a", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["e", "b", "d"])])?;
+        acc2.update_batch(&[data(["f", "a", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["f", "e", "d", "c", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct_sort_asc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c", "b"])])?;
+        acc2.update_batch(&[data(["b", "c", "a"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct_sort_desc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
.order_by_col("col", SortOptions::new(true, false)) + .build_two()?; + + acc1.update_batch(&[data(["a", "c", "b"])])?; + acc2.update_batch(&[data(["b", "c", "a"])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["c", "b", "a"]); + + Ok(()) + } + + #[test] + fn no_duplicates_distinct_sort_asc_nulls_first() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(false, true)) + .build_two()?; + + acc1.update_batch(&[data([Some("e"), Some("b"), None])])?; + acc2.update_batch(&[data([Some("f"), Some("a"), None])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["NULL", "a", "b", "e", "f"]); + + Ok(()) + } + + #[test] + fn no_duplicates_distinct_sort_asc_nulls_last() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(false, false)) + .build_two()?; + + acc1.update_batch(&[data([Some("e"), Some("b"), None])])?; + acc2.update_batch(&[data([Some("f"), Some("a"), None])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["a", "b", "e", "f", "NULL"]); + + Ok(()) + } + + #[test] + fn no_duplicates_distinct_sort_desc_nulls_first() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(true, true)) + .build_two()?; + + acc1.update_batch(&[data([Some("e"), Some("b"), None])])?; + acc2.update_batch(&[data([Some("f"), Some("a"), None])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["NULL", "f", "e", "b", "a"]); + + Ok(()) + } + + #[test] + fn no_duplicates_distinct_sort_desc_nulls_last() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(true, false)) + .build_two()?; + + acc1.update_batch(&[data([Some("e"), Some("b"), None])])?; + acc2.update_batch(&[data([Some("f"), Some("a"), None])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["f", "e", "b", "a", "NULL"]); + + Ok(()) + } + + struct ArrayAggAccumulatorBuilder { + data_type: DataType, + distinct: bool, + ordering: LexOrdering, + schema: Schema, + } + + impl ArrayAggAccumulatorBuilder { + fn string() -> Self { + Self::new(DataType::Utf8) + } + + fn new(data_type: DataType) -> Self { + Self { + data_type: data_type.clone(), + distinct: Default::default(), + ordering: Default::default(), + schema: Schema { + fields: Fields::from(vec![Field::new( + "col", + DataType::List(FieldRef::new(Field::new( + "item", data_type, true, + ))), + true, + )]), + metadata: Default::default(), + }, + } + } + + fn distinct(mut self) -> Self { + self.distinct = true; + self + } + + fn order_by_col(mut self, col: &str, sort_options: SortOptions) -> Self { + self.ordering.extend([PhysicalSortExpr::new( + Arc::new( + Column::new_with_schema(col, &self.schema) + .expect("column not available in schema"), + ), + sort_options, + )]); + self + } + + fn build(&self) -> Result> { + ArrayAgg::default().accumulator(AccumulatorArgs { + return_type: &self.data_type, + schema: &self.schema, + ignore_nulls: false, + ordering_req: &self.ordering, + is_reversed: false, + name: "", + is_distinct: self.distinct, + exprs: 
&[Arc::new(Column::new("col", 0))], + }) + } + + fn build_two(&self) -> Result<(Box, Box)> { + Ok((self.build()?, self.build()?)) + } + } + + fn str_arr(value: ScalarValue) -> Result>> { + let ScalarValue::List(list) = value else { + return internal_err!("ScalarValue was not a List"); + }; + Ok(as_generic_string_array::(list.values())? + .iter() + .map(|v| v.map(|v| v.to_string())) + .collect()) + } + + fn print_nulls(sort: Vec>) -> Vec { + sort.into_iter() + .map(|v| v.unwrap_or("NULL".to_string())) + .collect() + } + + fn data(list: [T; N]) -> ArrayRef + where + ScalarValue: From, + { + let values: Vec<_> = list.into_iter().map(ScalarValue::from).collect(); + ScalarValue::iter_to_array(values).expect("Cannot convert to array") + } + + fn merge( + mut acc1: Box, + mut acc2: Box, + ) -> Result> { + let intermediate_state = acc2.state().and_then(|e| { + e.iter() + .map(|v| v.to_array()) + .collect::>>() + })?; + acc1.merge_batch(&intermediate_state)?; + Ok(acc1) + } } diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index c8c544785d6de..306863e42986e 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -176,6 +176,17 @@ FROM ---- [0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm, 0keZ5G8BffGwgF2RwQD59TFzMStxCB, 0og6hSkhbX8AC1ktFS4kounvTzy8Vo, 1aOcrEGd0cOqZe2I5XBOm0nDcwtBZO, 2T3wSlHdEmASmO0xcXHnndkKEt6bz8] +# array agg can use order by with distinct +query ? +SELECT array_agg(DISTINCT c13 ORDER BY c13) +FROM + (SELECT * + FROM aggregate_test_100 + ORDER BY c13 + LIMIT 5) as t1 +---- +[0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm, 0keZ5G8BffGwgF2RwQD59TFzMStxCB, 0og6hSkhbX8AC1ktFS4kounvTzy8Vo, 1aOcrEGd0cOqZe2I5XBOm0nDcwtBZO, 2T3wSlHdEmASmO0xcXHnndkKEt6bz8] + statement ok CREATE EXTERNAL TABLE agg_order ( c1 INT NOT NULL, @@ -234,6 +245,11 @@ select column1, nth_value(column3, 2 order by column2, column4 desc) from array_ b [4, 5, 6] w [9, 5, 2] +query ? +select array_agg(DISTINCT column2 order by column2 desc) from array_agg_order_list_table; +---- +[2, 1] + statement ok drop table array_agg_order_list_table; diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index 352c01ca295c9..cbd19bf3806fe 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -836,3 +836,18 @@ physical_plan # Clean up after the test statement ok drop table aggregate_test_100; + +# test for https://github.com/apache/datafusion/issues/14352 +query TB rowsort +SELECT + a, + a IS NOT NULL +FROM ( + -- second column, even though it's not selected, was necessary to reproduce the bug linked above + SELECT 'foo' AS a, 3 AS b + UNION ALL + SELECT NULL AS a, 4 AS b +) +---- +NULL false +foo true