Skip to content

Commit

Permalink
Add support for DISTINCT and ORDER BY in ARRAY_AGG
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi authored and gabotechs committed Feb 2, 2025
1 parent 48a28af commit 9400d72
Show file tree
Hide file tree
Showing 6 changed files with 653 additions and 165 deletions.
28 changes: 5 additions & 23 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{
exec_err, get_target_functional_dependencies, internal_err, not_impl_err,
plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, DataFusionError,
FunctionalDependencies, Result, ScalarValue, TableReference, ToDFSchema,
UnnestOptions,
Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
};
use datafusion_expr_common::type_coercion::binary::type_union_resolution;

Expand Down Expand Up @@ -1518,27 +1517,10 @@ pub fn validate_unique_names<'a>(
/// [`TypeCoercionRewriter::coerce_union`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union
/// [`coerce_union_schema`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html
pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
if left_plan.schema().fields().len() != right_plan.schema().fields().len() {
return plan_err!(
"UNION queries have different number of columns: \
left has {} columns whereas right has {} columns",
left_plan.schema().fields().len(),
right_plan.schema().fields().len()
);
}

// Temporarily use the schema from the left input and later rely on the analyzer to
// coerce the two schemas into a common one.

// Functional Dependencies doesn't preserve after UNION operation
let schema = (**left_plan.schema()).clone();
let schema =
Arc::new(schema.with_functional_dependencies(FunctionalDependencies::empty())?);

Ok(LogicalPlan::Union(Union {
inputs: vec![Arc::new(left_plan), Arc::new(right_plan)],
schema,
}))
Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
Arc::new(left_plan),
Arc::new(right_plan),
])?))
}

/// Create Projection
Expand Down
115 changes: 107 additions & 8 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,15 +699,13 @@ impl LogicalPlan {
}))
}
LogicalPlan::Union(Union { inputs, schema }) => {
let input_schema = inputs[0].schema();
// If inputs are not pruned do not change schema
// TODO this seems wrong (shouldn't we always use the schema of the input?)
let schema = if schema.fields().len() == input_schema.fields().len() {
Arc::clone(&schema)
let first_input_schema = inputs[0].schema();
if schema.fields().len() == first_input_schema.fields().len() {
// If inputs are not pruned do not change schema
Ok(LogicalPlan::Union(Union { inputs, schema }))
} else {
Arc::clone(input_schema)
};
Ok(LogicalPlan::Union(Union { inputs, schema }))
Ok(LogicalPlan::Union(Union::try_new(inputs)?))
}
}
LogicalPlan::Distinct(distinct) => {
let distinct = match distinct {
Expand Down Expand Up @@ -2645,6 +2643,107 @@ pub struct Union {
pub schema: DFSchemaRef,
}

impl Union {
    /// Constructs a new `Union` instance, deriving the schema from the inputs.
    ///
    /// Strict variant: all inputs must agree exactly on each column's data
    /// type, otherwise a plan error is returned (contrast with
    /// [`Self::try_new_with_loose_types`]).
    fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, false)?;
        Ok(Union { inputs, schema })
    }

    /// Constructs a new `Union` instance, deriving the schema from the inputs.
    ///
    /// Inputs do not have to have matching types; the produced schema takes
    /// each column's type from the first input, deferring coercion into a
    /// common schema to the analyzer.
    // TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all.
    pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, true)?;
        Ok(Union { inputs, schema })
    }

    /// Derives the output schema of a UNION over `inputs`.
    ///
    /// `loose_types`: if true, inputs do not have to have matching types and the produced
    /// schema will take each type from the first input.
    /// TODO (<https://github.com/apache/datafusion/issues/14380>) this is not necessarily
    /// reasonable behavior.
    ///
    /// Errors if fewer than two inputs are supplied, if the inputs' column
    /// counts differ, or (when `loose_types` is false) if column types differ.
    fn derive_schema_from_inputs(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        // A UNION is only meaningful over at least two inputs.
        if inputs.len() < 2 {
            return plan_err!("UNION requires at least two inputs");
        }
        let first_schema = inputs[0].schema();
        let fields_count = first_schema.fields().len();
        // Every input must expose the same number of columns as the first.
        // NOTE(review): the "left"/"right" wording predates n-ary unions and
        // can be misleading when there are more than two inputs.
        for input in inputs.iter().skip(1) {
            if fields_count != input.schema().fields().len() {
                return plan_err!(
                    "UNION queries have different number of columns: \
                    left has {} columns whereas right has {} columns",
                    fields_count,
                    input.schema().fields().len()
                );
            }
        }

        // Build the output fields column by column, combining the i-th field
        // of every input.
        let union_fields = (0..fields_count)
            .map(|i| {
                let fields = inputs
                    .iter()
                    .map(|input| input.schema().field(i))
                    .collect::<Vec<_>>();
                let first_field = fields[0];
                // Output column name always comes from the first input.
                let name = first_field.name();
                let data_type = if loose_types {
                    // TODO apply type coercion here, or document why it's better to defer
                    // temporarily use the data type from the left input and later rely on the analyzer to
                    // coerce the two schemas into a common one.
                    first_field.data_type()
                } else {
                    // Strict mode: every input must agree on this column's type.
                    fields.iter().skip(1).try_fold(
                        first_field.data_type(),
                        |acc, field| {
                            if acc != field.data_type() {
                                return plan_err!(
                                    "UNION field {i} have different type in inputs: \
                                    left has {} whereas right has {}",
                                    first_field.data_type(),
                                    field.data_type()
                                );
                            }
                            Ok(acc)
                        },
                    )?
                };
                // Output column is nullable if it is nullable in any input.
                let nullable = fields.iter().any(|field| field.is_nullable());
                let mut field = Field::new(name, data_type.clone(), nullable);
                // Keep only the metadata entries shared (with equal values)
                // by all inputs.
                let field_metadata =
                    intersect_maps(fields.iter().map(|field| field.metadata()));
                field.set_metadata(field_metadata);
                // TODO reusing table reference from the first schema is probably wrong
                let table_reference = first_schema.qualified_field(i).0.cloned();
                Ok((table_reference, Arc::new(field)))
            })
            .collect::<Result<_>>()?;
        // Schema-level metadata is likewise intersected across all inputs.
        let union_schema_metadata =
            intersect_maps(inputs.iter().map(|input| input.schema().metadata()));

        // Functional Dependencies doesn't preserve after UNION operation
        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }
}

/// Returns the entries that are present, with identical values, in every
/// input map.
///
/// With no inputs at all the result is an empty map; with a single input the
/// result is a copy of it.
fn intersect_maps<'a>(
    inputs: impl IntoIterator<Item = &'a HashMap<String, String>>,
) -> HashMap<String, String> {
    let mut maps = inputs.into_iter();
    match maps.next() {
        // Nothing to intersect.
        None => HashMap::new(),
        // Start from a copy of the first map and drop every entry that a
        // later map is missing or binds to a different value.
        Some(first) => maps.fold(first.clone(), |mut acc, map| {
            acc.retain(|key, value| map.get(key) == Some(value));
            acc
        }),
    }
}

// Manual implementation needed because of `schema` field. Comparison excludes this field.
impl PartialOrd for Union {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Expand Down
146 changes: 146 additions & 0 deletions datafusion/functions-aggregate-common/src/merge_arrays.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,149 @@ pub fn merge_ordered_arrays(

Ok((merged_values, merged_orderings))
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::collections::VecDeque;
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array};

    use datafusion_common::utils::get_row_at_idx;
    use datafusion_common::{Result, ScalarValue};

    /// Merging two inputs whose ordering columns are each sorted ascending
    /// should interleave the values while keeping the combined ordering
    /// columns sorted ascending.
    #[test]
    fn test_merge_asc() -> Result<()> {
        // Ordering columns of the left input (two columns, ascending).
        let lhs_arrays: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
        ];
        let n_row = lhs_arrays[0].len();
        // Convert the columnar ordering data into per-row tuples.
        let lhs_orderings = (0..n_row)
            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
            .collect::<Result<VecDeque<_>>>()?;

        // Ordering columns of the right input (identical to the left here).
        let rhs_arrays: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
        ];
        let n_row = rhs_arrays[0].len();
        let rhs_orderings = (0..n_row)
            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
            .collect::<Result<VecDeque<_>>>()?;
        // Both ordering columns ascending, nulls last.
        let sort_options = vec![
            SortOptions {
                descending: false,
                nulls_first: false,
            },
            SortOptions {
                descending: false,
                nulls_first: false,
            },
        ];

        // Values to be merged, one per row of each input.
        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef;
        let lhs_vals = (0..lhs_vals_arr.len())
            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
            .collect::<Result<VecDeque<_>>>()?;

        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef;
        let rhs_vals = (0..rhs_vals_arr.len())
            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
            .collect::<Result<VecDeque<_>>>()?;
        // Each value appears twice in the expected output since the two
        // inputs are identical.
        let expected =
            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef;
        // Expected merged ordering columns, still ascending.
        let expected_ts = vec![
            Arc::new(Int64Array::from(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2])) as ArrayRef,
            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef,
        ];

        let (merged_vals, merged_ts) = merge_ordered_arrays(
            &mut [lhs_vals, rhs_vals],
            &mut [lhs_orderings, rhs_orderings],
            &sort_options,
        )?;
        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
        // Transpose the row-major merged ordering tuples back into columnar
        // arrays so they can be compared against `expected_ts`.
        let merged_ts = (0..merged_ts[0].len())
            .map(|col_idx| {
                ScalarValue::iter_to_array(
                    (0..merged_ts.len())
                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
                )
            })
            .collect::<Result<Vec<_>>>()?;

        assert_eq!(&merged_vals, &expected);
        assert_eq!(&merged_ts, &expected_ts);

        Ok(())
    }

    /// Same scenario as `test_merge_asc` but with descending ordering
    /// columns; also verifies that the values being merged do not themselves
    /// need to be sorted.
    #[test]
    fn test_merge_desc() -> Result<()> {
        // Ordering columns of the left input (two columns, descending).
        let lhs_arrays: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
        ];
        let n_row = lhs_arrays[0].len();
        // Convert the columnar ordering data into per-row tuples.
        let lhs_orderings = (0..n_row)
            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
            .collect::<Result<VecDeque<_>>>()?;

        // Ordering columns of the right input (identical to the left here).
        let rhs_arrays: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
        ];
        let n_row = rhs_arrays[0].len();
        let rhs_orderings = (0..n_row)
            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
            .collect::<Result<VecDeque<_>>>()?;
        // Both ordering columns descending, nulls last.
        let sort_options = vec![
            SortOptions {
                descending: true,
                nulls_first: false,
            },
            SortOptions {
                descending: true,
                nulls_first: false,
            },
        ];

        // Values (which will be merged) doesn't have to be ordered.
        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef;
        let lhs_vals = (0..lhs_vals_arr.len())
            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
            .collect::<Result<VecDeque<_>>>()?;

        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef;
        let rhs_vals = (0..rhs_vals_arr.len())
            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
            .collect::<Result<VecDeque<_>>>()?;
        // Values interleave in the order dictated by the (descending)
        // ordering columns, duplicated since the inputs are identical.
        let expected =
            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as ArrayRef;
        // Expected merged ordering columns, still descending.
        let expected_ts = vec![
            Arc::new(Int64Array::from(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0])) as ArrayRef,
            Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as ArrayRef,
        ];
        let (merged_vals, merged_ts) = merge_ordered_arrays(
            &mut [lhs_vals, rhs_vals],
            &mut [lhs_orderings, rhs_orderings],
            &sort_options,
        )?;
        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
        // Transpose the row-major merged ordering tuples back into columnar
        // arrays so they can be compared against `expected_ts`.
        let merged_ts = (0..merged_ts[0].len())
            .map(|col_idx| {
                ScalarValue::iter_to_array(
                    (0..merged_ts.len())
                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
                )
            })
            .collect::<Result<Vec<_>>>()?;

        assert_eq!(&merged_vals, &expected);
        assert_eq!(&merged_ts, &expected_ts);
        Ok(())
    }
}
Loading

0 comments on commit 9400d72

Please sign in to comment.