Create a Priority Queue based Aggregation with limit #7192

Merged
merged 6 commits into from
Sep 13, 2023

@avantgardnerio avantgardnerio commented Aug 4, 2023

Which issue does this PR close?

Closes #7191.

Rationale for this change

Described in issue.

What changes are included in this PR?

  1. A new GroupedTopKAggregateStream aggregation
  2. A new limit property on AggregateExec
  3. An optimizer rule to copy the limit from the SortExec if applicable
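
To make the idea behind item 1 concrete, here is a minimal sketch of a bounded "top K groups by MAX" structure. It is an illustration under assumptions, not the PR's `GroupedTopKAggregateStream`: the type `TopKMax` and its methods are hypothetical names, keys are plain `String`s, values are `i64`, and an ordered set from the standard library stands in for the map and heap used in the PR. It relies on MAX being monotone: a group that was evicted can only re-enter with a value larger than anything it had before, so nothing is lost by forgetting it.

```rust
use std::collections::{BTreeSet, HashMap};

/// Toy state for `SELECT key, MAX(val) ... GROUP BY key ORDER BY MAX(val) DESC LIMIT k`.
/// Hypothetical illustration only; memory is bounded by `k`, not by the number of groups.
pub struct TopKMax {
    k: usize,
    /// Retained group key -> largest value seen for it so far.
    best: HashMap<String, i64>,
    /// (value, key) pairs in ascending order; the first entry is the eviction candidate.
    ordered: BTreeSet<(i64, String)>,
}

impl TopKMax {
    pub fn new(k: usize) -> Self {
        Self { k, best: HashMap::new(), ordered: BTreeSet::new() }
    }

    pub fn insert(&mut self, key: &str, val: i64) {
        if self.k == 0 {
            return;
        }
        // Known group: only a larger value changes anything.
        if let Some(cur) = self.best.get(key).copied() {
            if val > cur {
                self.ordered.remove(&(cur, key.to_string()));
                self.ordered.insert((val, key.to_string()));
                self.best.insert(key.to_string(), val);
            }
            return;
        }
        // New group while full: it must beat the smallest retained value.
        if self.best.len() == self.k {
            let smallest = self.ordered.iter().next().cloned();
            if let Some((min_val, min_key)) = smallest {
                if val <= min_val {
                    return;
                }
                self.ordered.remove(&(min_val, min_key.clone()));
                self.best.remove(&min_key);
            }
        }
        self.ordered.insert((val, key.to_string()));
        self.best.insert(key.to_string(), val);
    }

    /// Retained groups, largest MAX first.
    pub fn into_sorted_desc(self) -> Vec<(String, i64)> {
        self.ordered.into_iter().rev().map(|(v, k)| (k, v)).collect()
    }
}
```

Feeding rows `("a", 1), ("b", 5), ("c", 3), ("a", 9), ("d", 2)` into `TopKMax::new(2)` retains only `a` and `b`. Which of several tied groups survives at the boundary is arbitrary in this sketch.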

Are these changes tested?

  • AggregateExec now prints lim=X if there's a limit, and I added some tests to assert this
  • unit tests for the Map & Heap
  • need to A/B test with and without this rule enabled
  • need to add criterion benchmarks
  • need to performance optimize until it is at least no slower than the existing approach
  • sqllogictests to compare to existing functionality

Are there any user-facing changes?

  1. Some Top K queries should not crash
  2. I probably broke other things so this is a draft (update: all the existing tests now pass)

Notes

Concerns to address:

  • the OwnedRow code is not columnar, vectorized, etc. (update: most queries will use a single column)
  • use the existing Accumulators? (update: not required since this is only min/max)
  • filters are not yet applied (update: unsupported edge case for now)
  • NULLs are supported
  • this should be a whole new Exec node, not just a new Stream type?
  • key types other than String (update: now supports String + all primitive keys)
  • replace TreeMap with custom index-based heap

Out of scope

  • handle multiple keys & values with OwnedRow
  • performance boost by moving from index-heap to RawPointer heap?

@avantgardnerio avantgardnerio requested a review from alamb August 4, 2023 01:30
@github-actions github-actions bot added the core Core DataFusion crate label Aug 4, 2023
alamb commented Aug 4, 2023

I plan to give this a look later today -- thank you @avantgardnerio

alamb commented Aug 5, 2023

The more I think about this code / approach the more I like it ❤️ -- I spent some time writing up how I think this basic strategy can be applied to all the various TopK type queries at #7198 (comment)

My writeup assumes slightly different packaging and a different way of deciding when to invoke this operator, but I think the basic idea is the same.

Thank you for sparking this @avantgardnerio

@github-actions github-actions bot added sql SQL Planner optimizer Optimizer rules substrait labels Aug 6, 2023
@avantgardnerio avantgardnerio force-pushed the bg_aggregate_pushdown branch 2 times, most recently from 222c458 to 8729398 Compare August 6, 2023 16:30
@avantgardnerio

Would anyone be able to provide advice on debugging sql logic tests? This error doesn't seem very informative. I'd expect to see more of a diff than this:

+   physical_plan after limit aggregation SAME TEXT AS ABOVE
    physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true
at tests/sqllogictests/test_files/explain.slt:173

error: test failed, to rerun pass `-p datafusion --test sqllogictests`

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Aug 7, 2023
alamb commented Aug 7, 2023

> Would anyone be able to provide advice on debugging sql logic tests? This error doesn't seem very informative. I'd expect to see more of a diff than this:

The docs are here:
https://github.com/apache/arrow-datafusion/tree/main/datafusion/core/tests/sqllogictests

using

cargo test --test sqllogictests -- --complete

Would likely save you time

I believe that diff says a new line was added to the explain plan (which makes sense if you have added a new optimizer pass)

avantgardnerio commented Aug 9, 2023

TLDR: with the naive, unoptimized version in place, it looks to be 2X slower according to a test with realistic data:

[Screenshot, 2023-08-09: test results]

This is because the normal aggregation currently runs twice, whereas with the rule enabled we run one of each (one normal aggregation and one limited aggregation):

GlobalLimitExec: skip=0, fetch=10
  SortPreservingMergeExec: [MAX(traces.timestamp_ms)@1 DESC], fetch=10
    SortExec: fetch=10, expr=[MAX(traces.timestamp_ms)@1 DESC]
      AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp_ms)], lim=[10]
        CoalesceBatchesExec: target_batch_size=8192
          RepartitionExec: partitioning=Hash([trace_id@0], 10), input_partitions=10
            AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp_ms)]
              RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
                MemoryExec: partitions=1, partition_sizes=[1]

I'm not super worried because:

  1. it just validates the concerns we all had about tree balancing and heap allocations
  2. when the new rule runs twice, the second invocation should be on negligible data, so I predict it will be back on par with the unlimited aggregation

No matter what, this rule is much more memory efficient. I'll pass the limit down the tree and we'll see if I'm right and we match speed.
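
A rough sketch of what "passing the limit down the tree" could look like, using a toy plan type. Everything here is hypothetical: `Plan`, `push_limit` and `set_limit` are illustrative names, not DataFusion's `ExecutionPlan` API or the rule in this PR. The sketch also assumes the caller has already checked that the sort key is the aggregate's single MIN/MAX output, and that the aggregates directly under the sort are the Partial/Final pair of the same GROUP BY.

```rust
/// Toy plan nodes; hypothetical stand-ins, not DataFusion's physical plan types.
#[derive(Debug, PartialEq)]
pub enum Plan {
    Sort { fetch: Option<usize>, input: Box<Plan> },
    Aggregate { mode: &'static str, limit: Option<usize>, input: Box<Plan> },
    Repartition { input: Box<Plan> },
    Scan,
}

/// Walk the plan; wherever a sort carries a `fetch`, copy it onto the
/// aggregates beneath it. Applicability checks are assumed to be done elsewhere.
pub fn push_limit(plan: Plan) -> Plan {
    match plan {
        Plan::Sort { fetch, input } => {
            let input = match fetch {
                Some(k) => set_limit(*input, k),
                None => push_limit(*input),
            };
            Plan::Sort { fetch, input: Box::new(input) }
        }
        Plan::Aggregate { mode, limit, input } => Plan::Aggregate {
            mode,
            limit,
            input: Box::new(push_limit(*input)),
        },
        Plan::Repartition { input } => Plan::Repartition {
            input: Box::new(push_limit(*input)),
        },
        Plan::Scan => Plan::Scan,
    }
}

/// Set `limit = k` on aggregates, looking through repartition nodes and
/// stopping at anything else.
fn set_limit(plan: Plan, k: usize) -> Plan {
    match plan {
        Plan::Aggregate { mode, input, .. } => Plan::Aggregate {
            mode,
            limit: Some(k),
            input: Box::new(set_limit(*input, k)),
        },
        Plan::Repartition { input } => Plan::Repartition {
            input: Box::new(set_limit(*input, k)),
        },
        other => other,
    }
}
```

Applied to a toy `Sort(fetch=10) -> Aggregate(FinalPartitioned) -> Repartition -> Aggregate(Partial) -> Scan` tree, both aggregates end up with `limit: Some(10)`.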

avantgardnerio commented Aug 9, 2023

We can see it doing the right thing now:

GlobalLimitExec: skip=0, fetch=10
  SortPreservingMergeExec: [MAX(traces.timestamp_ms)@1 DESC], fetch=10
    SortExec: fetch=10, expr=[MAX(traces.timestamp_ms)@1 DESC]
      AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp_ms)], lim=[10]
        CoalesceBatchesExec: target_batch_size=8192
          RepartitionExec: partitioning=Hash([trace_id@0], 10), input_partitions=10
            AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp_ms)], lim=[10]
              MemoryExec: partitions=10, partition_sizes=[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
got batch with 8000 rows
emit batch with 10 rows
got batch with 8000 rows
emit batch with 10 rows
got batch with 8000 rows
emit batch with 10 rows
got batch with 8000 rows
emit batch with 10 rows
got batch with 8000 rows
emit batch with 10 rows
got batch with 8000 rows
emit batch with 10 rows
got batch with 8000 rows
emit batch with 10 rows
got batch with 8000 rows
emit batch with 10 rows
got batch with 8000 rows
emit batch with 10 rows
got batch with 8000 rows
emit batch with 10 rows

got batch with 13 rows
emit batch with 10 rows
got batch with 12 rows
emit batch with 10 rows
got batch with 12 rows
emit batch with 10 rows
got batch with 14 rows
emit batch with 10 rows
got batch with 11 rows
emit batch with 10 rows
got batch with 8 rows
emit batch with 8 rows
got batch with 7 rows
emit batch with 7 rows
got batch with 11 rows
emit batch with 10 rows
got batch with 5 rows
emit batch with 5 rows
got batch with 7 rows
emit batch with 7 rows

but very slowly (debug mode is 10x, divide by 10 for release):

+----------------------------------+--------------------------+
| trace_id                         | MAX(traces.timestamp_ms) |
+----------------------------------+--------------------------+
| 2e09ebbb4cb110202e6ee274418eaff9 | 1690937510093            |
| 8c46e3daa65cd6720c1763751ff99f2f | 1690937510093            |
| e1de659ba388107b2ae1b0302d1a933d | 1690937510091            |
| 522d35c60450ac951e320acfdde281a7 | 1690937510091            |
| 998e424750c5cb2e92adea88577cced8 | 1690937510090            |
| d518d3f57375dc9ef79772e7b98ad39d | 1690937510088            |
| e6002e35635bc941cfa1c0b8e24903a5 | 1690937510088            |
| a321a88f60f1836f0900e9f43f59f90d | 1690937510088            |
| 8bbf8ec2eda9821d4463bcc0a760327f | 1690937510088            |
| a998a8f6cce15226c9a927084e3b3c60 | 1690937510088            |
+----------------------------------+--------------------------+
Aggregated 80000 rows in 344.1415ms

Edit: it's almost like there is some high, fixed cost to running this stream 🤔 Welp, at least testing is in place. I'll start tracking down performance issues tomorrow.
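
The log above shows ten partitions of 8000 rows each emitting 10 rows, and the final stage then receiving 13 + 12 + 12 + 14 + 11 + 8 + 7 + 11 + 5 + 7 = 100 rows in total. The sketch below illustrates why truncating each partial result is safe for MAX with a descending order: a group in the global top K reaches its global maximum in some partition, and in that partition fewer than K groups can rank above it. The helper names `top_k_max` and `two_phase_top_k` are hypothetical, and for brevity each phase aggregates fully before truncating, so this shows only the correctness argument, not the memory saving.

```rust
use std::collections::HashMap;

/// Aggregate MAX per key, then keep the `k` largest groups.
/// Hypothetical helper; ties are broken by key so the output is deterministic.
fn top_k_max(rows: &[(String, i64)], k: usize) -> Vec<(String, i64)> {
    let mut best: HashMap<&str, i64> = HashMap::new();
    for (key, val) in rows {
        let entry = best.entry(key.as_str()).or_insert(*val);
        if *val > *entry {
            *entry = *val;
        }
    }
    let mut out: Vec<(String, i64)> = best
        .into_iter()
        .map(|(key, val)| (key.to_string(), val))
        .collect();
    // Largest value first, then key ascending.
    out.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    out.truncate(k);
    out
}

/// Phase 1: top-k per partition. Phase 2: top-k over the concatenated partial results.
pub fn two_phase_top_k(partitions: &[Vec<(String, i64)>], k: usize) -> Vec<(String, i64)> {
    let partials: Vec<(String, i64)> = partitions
        .iter()
        .flat_map(|partition| top_k_max(partition, k))
        .collect();
    top_k_max(&partials, k)
}
```

With distinct values, `two_phase_top_k` returns the same groups and values as running `top_k_max` over all rows at once. With ties at the K-th position, the values agree but the choice among tied groups may differ.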

@Dandandan

@avantgardnerio seems best to profile it ATM and see where the most time is spent

@avantgardnerio

Relevant trace:
[Screenshot, 2023-08-10: profiler trace]

Not one big expense, just lots of the little ones we all expected.

@avantgardnerio avantgardnerio force-pushed the bg_aggregate_pushdown branch 2 times, most recently from f5f70f0 to dda0c17 Compare August 28, 2023 21:11
@avantgardnerio avantgardnerio mentioned this pull request Aug 30, 2023

@Dandandan Dandandan left a comment

The current PR looks good to me. I think it is in good shape to be merged and then continued/extended.

I've one small remaining comment about the rand dependency.

@github-actions github-actions bot added the optimizer Optimizer rules label Sep 3, 2023
alamb commented Sep 5, 2023

Let me know if there is anything I can do for this PR -- I think merging the PR and continuing to iterate would be a fine idea, given how long this one has been outstanding and how large it has grown

@avantgardnerio

> Let me know if there is anything I can do for this PR -- I think merging the PR and continuing to iterate would be a fine idea, given how long this one has been outstanding and how large it has grown

Thanks, I was waiting for a non-coralogix ✅. Since I introduced a bunch of unsafe, I didn't want to railroad it in.

alamb commented Sep 5, 2023

I am backed up on reviews as I was off last week. I will try and find time to review this tomorrow

@alamb alamb left a comment

Thank you @avantgardnerio -- I didn't review the implementation in detail but I skimmed it and it looked solid to me (and I trust that @Dandandan's and @thinkharderdev's attention is sufficient).

I think this PR is almost ready to merge, the only things I think it needs are:

  1. An end to end test for actually limiting the values: https://github.com/apache/arrow-datafusion/pull/7192/files#r1301686217
  2. The follow on work suggested by @ozankabak in https://github.com/apache/arrow-datafusion/pull/7192/files#r1308198186

Also, if someone wanted to change this code in the future, are there benchmarks that would catch any performance regressions?

datafusion/common/src/config.rs (review thread resolved)


query TI
select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) desc limit 4;

I do think it is important to have an end-to-end test that actually limits the number of values coming out. As I mentioned here, I think this test only has 4 distinct groups, and thus a limit 4 doesn't actually do any limiting.

@avantgardnerio

> are there benchmarks that would catch any performance regressions

There is a benchmark. I'm not sure... I think the github action fails if that regresses?

@avantgardnerio

> limit 4 doesn't actually do any limiting

I added some limit 3 tests.

Labels
core Core DataFusion crate enhancement New feature or request physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt)
Successfully merging this pull request may close these issues.

Memory is coupled to group by cardinality, even when the aggregate output is truncated by a limit clause
6 participants