
Add benchmark for memory-limited aggregation #13090

Merged: 2 commits into apache:main on Oct 26, 2024

Conversation

@2010YOUY01 (Contributor) commented on Oct 24, 2024:

Which issue does this PR close?

Closes #7571

Rationale for this change

Add a benchmark for external (memory-limited) aggregation.

The benchmark queries are simple aggregations on the TPCH lineitem table. For instance, SELECT DISTINCT l_orderkey FROM lineitem.

I think the lineitem table is suitable for benchmarking memory-limited aggregation because:

  1. It's easy to choose different aggregation cardinalities (figure from the DuckDB external aggregation paper)
    [figure: aggregation cardinality for lineitem columns, from the DuckDB external aggregation paper]
    Also, we already have a TPCH data generator, and we can change the scale factor later.
  2. Memory stress can be varied by adding more aggregates to the SELECT clause (e.g. the query select max(c1), max(c2)... max(c100) creates larger memory pressure because it has to store large intermediate state for many aggregation columns).

This PR only sets up the benchmark framework and adds two simple queries:
Q1: SELECT DISTINCT l_orderkey FROM lineitem;
Q2: SELECT DISTINCT l_orderkey, l_suppkey FROM lineitem;
These run with pre-defined memory limits (for example, Q1 requires ~36 MiB of memory to run, so the benchmark runs Q1 under 64, 32, and 16 MiB limits).

For now, it's not possible to select more aggregation columns or set smaller memory limits (the query fails) due to a known issue (#13089).
Once that's fixed, we can update the benchmark to run more diverse memory-limited aggregation workloads.
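For context, a memory limit like this is enforced through DataFusion's memory pool on the runtime environment. Below is a minimal sketch of running one query under a limit, assuming the FairSpillPool API and a hypothetical data path (not necessarily the exact code in this PR):

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};

/// Run `sql` with a total memory budget of `limit_bytes`; spillable
/// operators write to disk when the pool is exhausted.
async fn run_with_memory_limit(sql: &str, limit_bytes: usize) -> Result<()> {
    // FairSpillPool divides the budget among spillable operators.
    let runtime = RuntimeEnv::new(
        RuntimeConfig::new().with_memory_pool(Arc::new(FairSpillPool::new(limit_bytes))),
    )?;
    // Fixed partition count, matching the benchmark's `-n 4` flag.
    let config = SessionConfig::new().with_target_partitions(4);
    let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime));

    // Hypothetical path; the benchmark takes this via its `-p` flag.
    ctx.register_parquet("lineitem", "data/tpch_sf1/lineitem", ParquetReadOptions::default())
        .await?;
    ctx.sql(sql).await?.collect().await?;
    Ok(())
}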

What changes are included in this PR?

  1. One new binary program to run the benchmark
  2. Update the benchmark script bench.sh for it

TODO:

  • Update benchmark README

Are these changes tested?

I tested locally

  1. Execute benchmark binary to run all benchmark queries and memory limits
    (under arrow-datafusion/benchmarks)
    cargo run --bin external_aggr -- benchmark -n 4 --iterations 5 -p '/Users/yongting/Desktop/code/my_datafusion/arrow-datafusion/benchmarks/data/tpch_sf1' -o '/tmp/aggr.json'
Benchmark Result
Q1(64.0 MB) iteration 0 took 1288.4 ms and returned 1 rows
Q1(64.0 MB) iteration 1 took 1193.3 ms and returned 1 rows
Q1(64.0 MB) iteration 2 took 1176.4 ms and returned 1 rows
Q1(64.0 MB) avg time: 1219.37 ms
Q1(32.0 MB) iteration 0 took 2166.6 ms and returned 1 rows
Q1(32.0 MB) iteration 1 took 2145.1 ms and returned 1 rows
Q1(32.0 MB) iteration 2 took 2129.6 ms and returned 1 rows
Q1(32.0 MB) avg time: 2147.09 ms
Q1(16.0 MB) iteration 0 took 2024.5 ms and returned 1 rows
Q1(16.0 MB) iteration 1 took 1952.5 ms and returned 1 rows
Q1(16.0 MB) iteration 2 took 2069.7 ms and returned 1 rows
Q1(16.0 MB) avg time: 2015.55 ms
Q2(512.0 MB) iteration 0 took 2453.9 ms and returned 1 rows
Q2(512.0 MB) iteration 1 took 2506.9 ms and returned 1 rows
Q2(512.0 MB) iteration 2 took 2507.0 ms and returned 1 rows
Q2(512.0 MB) avg time: 2489.25 ms
......
  2. Execute benchmark binary to run a single query with the provided memory limit
cargo run --bin external_aggr -- benchmark -n 4 --iterations 3 -p '/Users/yongting/Desktop/code/my_datafusion/arrow-datafusion/benchmarks/data/tpch_sf1' -o '/tmp/aggr.json' --query 1 --memory-limit 30M
Q1(30.0 MB) iteration 0 took 2034.1 ms and returned 1 rows
Q1(30.0 MB) iteration 1 took 1722.9 ms and returned 1 rows
Q1(30.0 MB) iteration 2 took 1875.0 ms and returned 1 rows
Q1(30.0 MB) avg time: 1877.37 ms
  3. Run all with the bench.sh script
# under 'arrow-datafusion/benchmarks'
./bench.sh data tpch # This benchmark uses TPCH lineitem table (parquet format only)
./bench.sh run external_aggr
./bench.sh compare main main

Result:

--------------------
Benchmark external_aggr.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃     main ┃     main ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━┩
│ Q1(64.0 MB)  │ 127.07ms │ 127.07ms │ no change │
│ Q1(32.0 MB)  │ 159.59ms │ 159.59ms │ no change │
│ Q1(16.0 MB)  │ 131.80ms │ 131.80ms │ no change │
│ Q2(512.0 MB) │ 308.05ms │ 308.05ms │ no change │
│ Q2(256.0 MB) │ 671.76ms │ 671.76ms │ no change │
│ Q2(128.0 MB) │ 655.00ms │ 655.00ms │ no change │
│ Q2(64.0 MB)  │ 608.16ms │ 608.16ms │ no change │
│ Q2(32.0 MB)  │ 535.54ms │ 535.54ms │ no change │
└──────────────┴──────────┴──────────┴───────────┘

Are there any user-facing changes?

No

The github-actions bot added the execution (Related to the execution crate) label on Oct 24, 2024.
@Dandandan (Contributor) left a comment:


This is nice

@alamb (Contributor) left a comment:


Thanks @2010YOUY01 -- I agree with @Dandandan -- very nice 👌

I also plan to read the linked paper -- 🤓

Would it be possible to add a description of this benchmark here: https://github.com/apache/datafusion/tree/main/benchmarks#benchmarks (perhaps just adapt the very nice description on this PR?)

Also, are you interested in improving DataFusion's external aggregation capabilities? I think it is a non-trivial gap at the moment and would be great to improve (and I would be interested in helping do so).

If you are, I can start organizing the work into some tickets to see if we can get some others to check it out too.

I also ran it locally and it worked great ✅

On benchmarks/bench.sh:

# Only parquet is supported
# External aggregation is not stable yet, set partitions to 4 to make sure
Contributor:

What isn't stable about it? Like it uses too many file handles or something?

@2010YOUY01 (author):

I have updated the comment to be more specific, just to ensure this benchmark can run successfully on different machines:

    # Since per-operator memory limit is calculated as (total-memory-limit / 
    # number-of-partitions), and by default `--partitions` is set to number of
    # CPU cores, we set a constant number of partitions to prevent this 
    # benchmark from failing on some machines.

(For example, with a 64 MiB total limit and --partitions 4, each partition's aggregation operator gets a 16 MiB budget.)

Moreover, memory-limited queries currently fail quite easily because record_batch.get_array_memory_size() over-counts the memory usage (#13089); once that's fixed, we can raise the default partition count.

const AGGR_QUERIES: [&'static str; 2] = [
// Q1: Output size is ~25% of lineitem table
r#"
SELECT count(*)
Contributor:

Another type of query that is often run is the classic "top 10" type that shows up in clickbench a lot:

Something like

> select l_orderkey, avg(l_extendedprice) as a, avg(l_discount) as d from 'lineitem' GROUP BY l_orderkey ORDER BY a DESC limit 10;
+------------+---------------+----------+
| l_orderkey | a             | d        |
+------------+---------------+----------+
| 3811460    | 104899.500000 | 0.050000 |
| 1744195    | 104649.500000 | 0.090000 |
| 5151266    | 104449.500000 | 0.000000 |
| 4571042    | 104399.500000 | 0.090000 |
| 1198304    | 104299.500000 | 0.020000 |
| 1134944    | 104249.000000 | 0.020000 |
| 2582850    | 104199.000000 | 0.000000 |
| 282754     | 103949.500000 | 0.020000 |
| 2038305    | 103949.000000 | 0.030000 |
| 944835     | 103899.500000 | 0.000000 |
+------------+---------------+----------+
10 row(s) fetched.
Elapsed 0.074 seconds.

@2010YOUY01 (author):

This is an interesting case; sort can also be memory-limited.

Contributor:

> This is an interesting case; sort can also be memory-limited.

It is true, though most engines (including DataFusion) will use TopK instead (i.e. not sort the entire stream, just keep the top 10 values), so this query would likely only spill in aggregation, not in sort.

/// Memory limits to run: 512MiB, 256MiB, 128MiB, 64MiB, 32MiB
static QUERY_MEMORY_LIMITS: OnceLock<HashMap<usize, Vec<u64>>> = OnceLock::new();

impl ExternalAggrConfig {
Contributor:

Another potential idea is to re-use some of the clickbench queries (there are several that have very large aggregations) with memory limits
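As a side note on the excerpt above: judging from the benchmark output earlier in this PR, the OnceLock map could plausibly be initialized along these lines (a sketch, not necessarily the PR's exact code):

use std::collections::HashMap;
use std::sync::OnceLock;

static QUERY_MEMORY_LIMITS: OnceLock<HashMap<usize, Vec<u64>>> = OnceLock::new();

/// Lazily build the query-id -> memory-limits table on first use.
fn query_memory_limits() -> &'static HashMap<usize, Vec<u64>> {
    QUERY_MEMORY_LIMITS.get_or_init(|| {
        const MB: u64 = 1024 * 1024;
        HashMap::from([
            // Q1 runs under 64, 32, and 16 MiB limits.
            (1, vec![64 * MB, 32 * MB, 16 * MB]),
            // Q2 runs under 512, 256, 128, 64, and 32 MiB limits.
            (2, vec![512 * MB, 256 * MB, 128 * MB, 64 * MB, 32 * MB]),
        ])
    })
}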

assert!(ExternalAggrConfig::parse_memory_limit("500X").is_err());

// Test invalid number
assert!(ExternalAggrConfig::parse_memory_limit("abcM").is_err());
Contributor:

👍
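For context on what these tests exercise, a parser consistent with them might look like the following sketch (the PR's actual parse_memory_limit may differ in detail):

/// Parse a human-readable memory limit such as "64M" or "2G" into bytes.
/// A sketch consistent with the tests above, not necessarily the PR's code.
fn parse_memory_limit(limit: &str) -> Result<u64, String> {
    // Need at least one digit and a one-character unit suffix.
    if limit.len() < 2 {
        return Err(format!("Invalid memory limit '{limit}'"));
    }
    let (number, unit) = limit.split_at(limit.len() - 1);
    let number: f64 = number
        .parse()
        .map_err(|_| format!("Failed to parse number from memory limit '{limit}'"))?;

    match unit {
        "K" => Ok((number * 1024.0) as u64),
        "M" => Ok((number * 1024.0 * 1024.0) as u64),
        "G" => Ok((number * 1024.0 * 1024.0 * 1024.0) as u64),
        _ => Err(format!("Unsupported unit '{unit}' in memory limit '{limit}'")),
    }
}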

@2010YOUY01 (author) commented:

Thank you @Dandandan @alamb , I have updated it as per the reviews.

> Thanks @2010YOUY01 -- I agree with @Dandandan -- very nice 👌
>
> I also plan to read the linked paper -- 🤓

Really nice paper, we can implement the same benchmark and compare in the future 😄
They implemented a unified buffer pool for both the table data cache and operator (e.g. aggregation) intermediate results, to easily support spilling in various operators.
I think they didn't mention any optimization specific to the spilling part of aggregation, and just used a simple LRU policy in the buffer pool.
Maybe there are some spilling- and merging-specific optimizations we can explore (which all of memory-limited aggregate/SortMergeJoin/Sort could benefit from).

> Also, are you interested in improving DataFusion's external aggregation capabilities? I think it is a non-trivial gap at the moment and would be great to improve (and I would be interested in helping do so).
>
> If you are, I can start organizing the work into some tickets to see if we can get some others to check it out too.

Yes, I'm starting to look at related components now. Perhaps we can start with making memory-limited SQL queries more stable (e.g. more tests, making sure TPCH SF1000 can run correctly on a laptop), and optimize later.

@2010YOUY01 changed the title from "Adding benchmark for external aggregation" to "Add benchmark for memory-limited aggregation" on Oct 25, 2024.
@alamb (Contributor) left a comment:


This is great -- thank you @2010YOUY01

I will file a ticket to try and organize the externalized aggregation work a bit more

$CARGO_COMMAND --bin external_aggr -- benchmark -n 4 --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}"
# Only parquet is supported.
# Since per-operator memory limit is calculated as (total-memory-limit /
# number-of-partitions), and by default `--partitions` is set to number of
Contributor:

👍

@alamb (Contributor) commented on Oct 26, 2024:

🚀

@alamb merged commit 7df3e5c into apache:main on Oct 26, 2024. 26 checks passed.
Labels: execution (Related to the execution crate)
Successfully merging this pull request may close: Benchmarks for group by spilling to disk (#7571)