Support ORDER BY in AggregateUDF #8984

Closed
Tracked by #8583
alamb opened this issue Jan 24, 2024 · 9 comments
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Jan 24, 2024

Is your feature request related to a problem or challenge?

Some built in aggregates (such as FIRST_VALUE, LAST_VALUE and ARRAY_AGG) support an optional ORDER BY argument that defines the order they see their input. For example:

❯ create table foo(x int, y int) as values (1, 100),(2, 100),(0, 200);
0 rows in set. Query took 0.003 seconds.

-- note the `ORDER BY x` in the argument to `FIRST_VALUE`
select FIRST_VALUE(x ORDER BY x) from foo GROUP BY y;
+--------------------+
| FIRST_VALUE(foo.x) |
+--------------------+
| 1                  |
| 0                  |
+--------------------+
2 rows in set. Query took 0.008 seconds.

This is not supported today in user-defined aggregates.
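To make the semantics of the query above concrete, here is a minimal, self-contained sketch of what `FIRST_VALUE(x ORDER BY x) ... GROUP BY y` computes. It uses only the Rust standard library, and the function name `first_value_ordered_by_x` is made up for illustration. It is not how DataFusion implements the aggregate.

```rust
use std::collections::BTreeMap;

/// Computes `FIRST_VALUE(x ORDER BY x) ... GROUP BY y` over `(x, y)` rows.
/// Returns a map from the group key `y` to the first `x` in ascending `x` order.
/// (Hypothetical helper, for illustration only.)
fn first_value_ordered_by_x(rows: &[(i32, i32)]) -> BTreeMap<i32, i32> {
    let mut result: BTreeMap<i32, i32> = BTreeMap::new();
    for &(x, y) in rows {
        result
            .entry(y)
            .and_modify(|first| {
                // With `ORDER BY x`, the first value is the smallest x in the group.
                if x < *first {
                    *first = x;
                }
            })
            .or_insert(x);
    }
    result
}
```

For the rows `(1, 100), (2, 100), (0, 200)` from the table above, group `100` yields `1` and group `200` yields `0`, matching the query output.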

Describe the solution you'd like

I would like to be able to create a user-defined aggregate that can specify its input order.

This would roughly require:

  1. Extending the AggregateUDFImpl trait to communicate the ordering somehow.
  2. Updating the implementation of https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.AggregateExpr.html#method.order_bys
  3. Writing an end-to-end test in https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/user_defined/user_defined_aggregates.rs showing it all working.

Here are some other places that likely need to be changed:
https://github.com/apache/arrow-datafusion/blob/b5db7187763bc4511aaffdd6d89b2f0908f17938/datafusion/core/src/physical_planner.rs#L242-L252

https://github.com/apache/arrow-datafusion/blob/b5db7187763bc4511aaffdd6d89b2f0908f17938/datafusion/core/src/physical_planner.rs#L1663-L1690

Looking at how OrderSensitiveArrayAgg is implemented may help: https://github.com/apache/arrow-datafusion/blob/5d70c32a9a4accf21e9f27ff5ed62666cbbcbe54/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs#L45

Describe alternatives you've considered

No response

Additional context

No response

@alamb
Contributor Author

alamb commented Jan 24, 2024

This is a neat project to work on. I think it would largely be an API design question.

@jayzhan211
Contributor

jayzhan211 commented Jan 25, 2024

Interested in this

@jayzhan211
Contributor

I think we also need to update order_by in sql_function_to_expr:
https://github.com/apache/arrow-datafusion/blob/8a4bad46540598c6acdf432bde08c2a4c76c5039/datafusion/sql/src/expr/function.rs#L166-L172

@jayzhan211
Contributor

jayzhan211 commented Jan 28, 2024

@alamb I'm trying to add ordering_req to AggregateUDFImpl, but LexOrdering is defined in physical_expr, so importing it causes a cyclic dependency.

    /// Returns the lexical ordering requirements of the aggregate expression.
    fn ordering_req(&self) -> &LexOrdering {
        &self.ordering_req
    }

I'm testing with FirstValueAccumulator, which takes the argument ordering_req: LexOrdering. Both OrderSensitiveArrayAgg and FirstValue include ordering_req: LexOrdering, so it seems reasonable to me that we can also have LexOrdering for UDFs. However, I'm not sure how I can have LexOrdering in AggregateUDFImpl without a cyclic dependency.

If ordering_req returns Vec<datafusion_expr::Expr> instead of LexOrdering, then the UDF author needs to convert Expr::Sort to LexOrdering themselves inside their accumulator, like FirstValueUDFAccumulator::new(data_type: &DataType, ordering_req: Option<Vec<Expr>>). I'm not sure whether that makes sense.

@jayzhan211
Contributor

jayzhan211 commented Jan 29, 2024

It seems that #8793 has a similar issue: #8793 (comment). But I'm not sure whether I can also move trait PhysicalExpr and struct PhysicalSortExpr to datafusion_expr.

@alamb
Contributor Author

alamb commented Jan 29, 2024

so it seems reasonable to me that we can also have LexOrdering for UDF. However, I'm not sure how can I have LexOrdering for AggregateUDFImpl without cyclic dependency.

Maybe we can have the AggregateUDF declare its needed ordering in terms of Exprs and then translate that to LexOrdering as part of physical planning.

Perhaps we can follow the model of

    pub file_sort_order: Vec<Vec<Expr>>,

In ListingOptions: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingOptions.html
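The idea above (declare the ordering with logical expressions, then translate during physical planning) can be sketched roughly as follows. This is a self-contained illustration under stated assumptions: `LogicalSort`, `PhysicalSort`, `OrderedAggregate`, `FirstValueUdaf`, and `create_physical_ordering` are all hypothetical stand-ins, not DataFusion types or functions. The point is only that the UDAF-facing trait can avoid depending on physical expression types.

```rust
/// Hypothetical stand-in for a logical sort expression (in the spirit of `Expr::Sort`):
/// it refers to a column by name.
#[derive(Debug, Clone, PartialEq)]
struct LogicalSort {
    column: String,
    asc: bool,
    nulls_first: bool,
}

/// Hypothetical stand-in for a physical sort expression:
/// it refers to a column by its index in the input schema.
#[derive(Debug, Clone, PartialEq)]
struct PhysicalSort {
    column_index: usize,
    descending: bool,
    nulls_first: bool,
}

/// Hypothetical UDAF-facing trait: the ordering is declared with logical
/// expressions only, so nothing from the physical layer is needed here.
trait OrderedAggregate {
    fn order_by(&self) -> Vec<LogicalSort>;
}

/// Example aggregate (hypothetical) that wants its input sorted by `x` ascending.
struct FirstValueUdaf;

impl OrderedAggregate for FirstValueUdaf {
    fn order_by(&self) -> Vec<LogicalSort> {
        vec![LogicalSort {
            column: "x".to_string(),
            asc: true,
            nulls_first: false,
        }]
    }
}

/// Planner-side translation: resolve each column name against the input
/// schema (modeled here as a slice of column names).
fn create_physical_ordering(
    order_by: &[LogicalSort],
    schema: &[&str],
) -> Result<Vec<PhysicalSort>, String> {
    order_by
        .iter()
        .map(|sort| -> Result<PhysicalSort, String> {
            let column_index = schema
                .iter()
                .position(|name| *name == sort.column.as_str())
                .ok_or_else(|| format!("column '{}' not found in schema", sort.column))?;
            Ok(PhysicalSort {
                column_index,
                descending: !sort.asc,
                nulls_first: sort.nulls_first,
            })
        })
        .collect()
}
```

In this sketch the planner would call `create_physical_ordering(&udaf.order_by(), &schema)` once, so the UDAF author never has to do the conversion inside the accumulator.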

@jayzhan211
Contributor

jayzhan211 commented Mar 16, 2024

@alamb Do you know the rationale for AccumulatorFactoryFunction? Why do we need a closure-like argument for the accumulator, rather than directly passing the Accumulator around (with Arc<dyn Accumulator> or Box<dyn Accumulator>)? Can we create the Accumulator directly without any given arguments?

For example

        // Current approach: a factory closure that builds a new accumulator on each call.
        let accumulator: AccumulatorFactoryFunction =
            Arc::new(move |_| Ok(Box::new(Self::new(Arc::clone(&captured_state)))));

        // Alternative: directly construct the accumulator and pass it around.
        let accumulator = Box::new(Self::new(Arc::clone(&captured_state)));
        // or
        let accumulator = Arc::new(Self::new(Arc::clone(&captured_state)));

Rewrite to

    /// Old

    /// Return a new [`Accumulator`] that aggregates values for a specific
    /// group during query execution.
    fn accumulator(&self, arg: &DataType) -> Result<Box<dyn Accumulator>>;

    /// New: no arguments, and replace Box with Arc to allow shared ownership
    fn accumulator(&self) -> Result<Arc<dyn Accumulator>>;

The current AccumulatorFactoryFunction requires specific arguments, which is not flexible enough for UDAFs that take arbitrary arguments. I'm looking for a way to get the accumulator directly, without requiring those arguments.

@alamb
Contributor Author

alamb commented Mar 16, 2024

Do you know the rationale of AccumulatorFactoryFunction, why do we need a closure like argument for accumulator, but not directly passing Accumulator around (with Arc or Box)? Can we create Accumulator directly without any given arguments?

One thing that may be related is that the hash aggregator creates an Accumulator per group (not per aggregate function in the query).

Each Accumulator stores state for a particular group (so a single Accumulator instance can't be shared across groups, for example).
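A small sketch may help show why a factory is needed rather than a single pre-built accumulator. It assumes a simplified, hypothetical `Accumulator` trait and a toy `grouped_aggregate` function (neither is DataFusion's real API): the hash aggregation has to create a fresh accumulator whenever it sees a new group key, so it needs something it can call repeatedly.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified accumulator interface (not DataFusion's trait).
trait Accumulator {
    fn update(&mut self, value: i64);
    fn evaluate(&self) -> i64;
}

/// Toy accumulator that sums its inputs.
struct SumAccumulator {
    sum: i64,
}

impl Accumulator for SumAccumulator {
    fn update(&mut self, value: i64) {
        self.sum += value;
    }

    fn evaluate(&self) -> i64 {
        self.sum
    }
}

/// Factory function: every call returns a new accumulator with empty state.
fn new_sum_accumulator() -> Box<dyn Accumulator> {
    Box::new(SumAccumulator { sum: 0 })
}

/// Toy hash aggregation over `(group_key, value)` rows. The factory is
/// invoked once per distinct group key, so each group has its own state.
fn grouped_aggregate(
    rows: &[(i64, i64)],
    factory: impl Fn() -> Box<dyn Accumulator>,
) -> HashMap<i64, i64> {
    let mut groups: HashMap<i64, Box<dyn Accumulator>> = HashMap::new();
    for &(key, value) in rows {
        groups
            .entry(key)
            .or_insert_with(|| factory())
            .update(value);
    }
    groups
        .into_iter()
        .map(|(key, acc)| (key, acc.evaluate()))
        .collect()
}
```

Calling `grouped_aggregate(rows, new_sum_accumulator)` keeps the groups' sums separate. If one shared accumulator were passed instead, all groups would update the same state.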

@jayzhan211
Contributor

Completed in #9874

2 participants