Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(swordfish): Optimize grouped aggregations #3534

Merged
merged 20 commits into from
Dec 17, 2024

Conversation

colin-ho
Copy link
Contributor

@colin-ho colin-ho commented Dec 10, 2024

Optimize swordfish grouped aggs for high cardinality groups

Approach

There's 3 strategies for grouped aggs:

  1. Partition each input morsel into N partitions, then do a partial agg. (good for high cardinality).
  2. Do a partial agg, then partition into N partitions. (good for low cardinality). Can be optimized with Sharded probe table #3556
  3. Partition only, no partial agg. (only for map_groups, which has no partial agg).

Notes on alternative approaches

  • Distributing partitions across workers (i.e. having each worker being responsible for accumulating only one partition) is much slower for low cardinality aggs (TPCH Q1 would have been 1.5x slower). This is because most of the work will end up being on only a few workers, reducing parallelism.
  • Simply partitioning the input and then only aggregating at the end works well with higher cardinality, but low cardinality takes a hit. (TPCH Q1 would have been 2.5x slower).
  • Probe Table approach was much slower, due to many calls to the multi-table dyn comparator. It was also much more complex to implement.

Benchmarks

MrPowers Benchmarks results (seconds, lower is better).

Query this PR Pyrunner Current swordfish
q1 0.285720 0.768858 0.356499
q2 4.780064 6.122199 53.340565
q3 2.201079 3.922857 16.935125
q4 0.313106 0.545192 0.335541
q5 1.618228 2.889354 10.665339
q7 2.087872 3.856998 16.072660
q10 6.306756 8.173738 53.800501

@github-actions github-actions bot added the feat label Dec 10, 2024
Copy link

codspeed-hq bot commented Dec 10, 2024

CodSpeed Performance Report

Merging #3534 will degrade performances by 41.94%

Comparing colin/swordfish-grouped-aggs (af2bd8d) with main (6c21917)

Summary

❌ 1 regressions
✅ 26 untouched benchmarks

⚠️ Please fix the performance issues or acknowledge them on CodSpeed.

Benchmarks breakdown

Benchmark main colin/swordfish-grouped-aggs Change
test_iter_rows_first_row[100 Small Files] 191.1 ms 329.1 ms -41.94%

Copy link

codecov bot commented Dec 10, 2024

Codecov Report

Attention: Patch coverage is 89.91354% with 35 lines in your changes missing coverage. Please review.

Project coverage is 77.81%. Comparing base (35ed63c) to head (af2bd8d).
Report is 3 commits behind head on main.

Files with missing lines Patch % Lines
...aft-local-execution/src/sinks/grouped_aggregate.rs 91.95% 23 Missing ⚠️
src/common/daft-config/src/python.rs 40.00% 9 Missing ⚠️
src/daft-local-execution/src/pipeline.rs 66.66% 3 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@           Coverage Diff            @@
##             main    #3534    +/-   ##
========================================
  Coverage   77.81%   77.81%            
========================================
  Files         716      716            
  Lines       87987    88241   +254     
========================================
+ Hits        68463    68667   +204     
- Misses      19524    19574    +50     
Files with missing lines Coverage Δ
daft/context.py 87.19% <ø> (ø)
src/common/daft-config/src/lib.rs 84.05% <100.00%> (+0.47%) ⬆️
src/daft-local-execution/src/lib.rs 95.29% <ø> (ø)
src/daft-local-execution/src/sinks/aggregate.rs 97.33% <100.00%> (+1.58%) ⬆️
src/daft-local-execution/src/pipeline.rs 88.69% <66.66%> (-1.89%) ⬇️
src/common/daft-config/src/python.rs 68.39% <40.00%> (-2.17%) ⬇️
...aft-local-execution/src/sinks/grouped_aggregate.rs 91.95% <91.95%> (ø)

... and 6 files with indirect coverage changes

@colin-ho colin-ho marked this pull request as ready for review December 10, 2024 06:47
@colin-ho colin-ho requested a review from samster25 December 10, 2024 07:14
@colin-ho
Copy link
Contributor Author

colin-ho commented Dec 11, 2024

Full TPCH SF10 queries on swordfish before and after this optimization.

Query Before After Change
Q1 491.84ms 496.15ms +0.04%
Q2 132.02ms 136.15ms +3.1%
Q3 443.25ms 445.37ms +0.5%
Q4 2.47s 2.53s +2.4%
Q5 708.50ms 732.99ms +3.5%
Q6 162.42ms 159.65ms -1.7%
Q7 444.77ms 423.86ms -4.7%
Q8 520.86ms 522.75ms +0.4%
Q9 3.59s 3.55s -1.1%
Q10 2.33s 2.27s -2.6%
Q11 253.52ms 235.87ms -12.5%
Q12 388.85ms 396.85ms +2.1%
Q13 4.36s 4.36s 0.0%
Q14 488.97ms 495.60ms +1.4%
Q15 496.65ms 484.14ms -12.2%
Q16 444.69ms 203.63ms -62.1%
Q17 18.29s 18.20s -0.5%
Q18 5.44s 3.19s -37.9%
Q19 4.93s 4.98s +1.0%
Q20 3.23s 1.22s -62.2%
Q21 37.42s 10.34s -73.0%
Q22 2.24s 2.17s -3.1%

// This is the threshold for when we should aggregate the unaggregated partitions
const PARTIAL_AGG_THRESHOLD: usize = 10_000;
// This is the maximum number of partitions we can have
const MAX_NUM_PARTITIONS: usize = 16;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we cap this rather than always take num cores?

Not a suggestion, just curious

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it would be bad for low cardinality aggs if there's too many partitions. But I modified the algorithm to cater for low cardinality aggs, so this is removed.

@colin-ho colin-ho requested a review from samster25 December 13, 2024 02:01
src/daft-local-execution/src/sinks/grouped_aggregate.rs Outdated Show resolved Hide resolved
src/daft-local-execution/src/sinks/grouped_aggregate.rs Outdated Show resolved Hide resolved
const HIGH_CARDINALITY_THRESHOLD_RATIO: f64 = 0.8;

fn new(num_partitions: usize) -> Self {
let inner_states = (0..num_partitions).map(|_| None).collect();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can also do vec![None; num_partitions]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The vec! macro requires SinglePartitionAggregateState to impl Clone, which it doesn't because Micropartition doesn't implement it.

strategy.execute_strategy(inner_states, input, params)?;
}
// Otherwise, determine a strategy based on the input data.
else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So an issue I see here is that this decision is determined at a per partition basis rather than globally.

This means that as a morsel goes into each core, the decision is made independently of one another so you can have multiple different strategies at the same time.

I think we can instead use a Mutex to protect the strategy (shared across all partition) and then we can cache it in the state after we read it once since it's not going to change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, implemented. Tested and found no regressions

src/daft-local-execution/src/sinks/grouped_aggregate.rs Outdated Show resolved Hide resolved
@colin-ho colin-ho requested a review from samster25 December 16, 2024 02:02
}

// Otherwise, determine the strategy and execute
let decided_strategy = determine_agg_strategy(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be cleaner to have this as a method and it performs the caching of the decided_strategy. that way the code could look like

let decided_strategy = self.determine_agg_strategy();
decided_strategy.execute_strategy(inner_states, input, params)?;

@colin-ho colin-ho merged commit e148248 into main Dec 17, 2024
42 of 43 checks passed
@colin-ho colin-ho deleted the colin/swordfish-grouped-aggs branch December 17, 2024 06:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants