Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Memory usage + performance with large numbers of groups / High Cardinality Aggregates #6937

Closed
alamb opened this issue Jul 12, 2023 · 9 comments · Fixed by #11627
Closed
Assignees
Labels
enhancement New feature or request

Comments

@alamb
Copy link
Contributor

alamb commented Jul 12, 2023

Is your feature request related to a problem or challenge?

When running a query with "high cardinality" grouping in DataFusion, the memory usage increases linearly both with the number of groups (expected) but also with the number of cores.

Is the root cause of @ychen7's observation that ClickBench q32 fails As #5276 (comment)

To reproduce, get the ClickBench data https://github.com/ClickHouse/ClickBench/tree/main#data-loading and run this:

CREATE EXTERNAL TABLE hits
STORED AS PARQUET
LOCATION 'hits.parquet';

set datafusion.execution.target_partitions = 1;
SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10;

set datafusion.execution.target_partitions = 4;
SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10;

This is what the memory usage looks like:
Screenshot 2023-07-12 at 4 07 10 PM

The reason for this behavior can be found in the plan and the multi-stage hash grouping that is done:

explain SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10;
| physical_plan | GlobalLimitExec: skip=0, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|               |   SortPreservingMergeExec: [c@2 DESC]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |     SortExec: fetch=10, expr=[c@2 DESC]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |       ProjectionExec: expr=[WatchID@0 as WatchID, ClientIP@1 as ClientIP, COUNT(UInt8(1))@2 as c, SUM(hits.IsRefresh)@3 as SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)@4 as AVG(hits.ResolutionWidth)]                                                                                                                                                                                                                                                                                                                                                       |
|               |         AggregateExec: mode=FinalPartitioned, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[COUNT(UInt8(1)), SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)]                                                                                                                                                                                                                                                                                                                                                                                        |
|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |             RepartitionExec: partitioning=Hash([WatchID@0, ClientIP@1], 16), input_partitions=16                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|               |               AggregateExec: mode=Partial, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[COUNT(UInt8(1)), SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)]                                                                                                                                                                                                                                                                                                                                                                                           |
|               |                 ParquetExec: file_groups={16 groups: [[Users/alamb/Software/clickbench_hits_compatible/hits.parquet:0..923748528], [Users/alamb/Software/clickbench_hits_compatible/hits.parquet:923748528..1847497056], [Users/alamb/Software/clickbench_hits_compatible/hits.parquet:1847497056..2771245584], [Users/alamb/Software/clickbench_hits_compatible/hits.parquet:2771245584..3694994112], [Users/alamb/Software/clickbench_hits_compatible/hits.parquet:3694994112..4618742640], ...]}, projection=[WatchID, ClientIP, IsRefresh, ResolutionWidth] |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Specifically since the groups are arbitrarily distributed in the files, the first AggregateExec: mode=Partial has to build a hash table that has entries for all groups. As the number of target partitions goes up, the number of AggregateExec: mode=Partial goes up to and thus so does the number of copies of the data

The AggregateExec: mode=FinalPartitioned only see a distinct subset of the keys and thus as the number of target partitions goes up there are more AggregateExec: mode=FinalPartitioned s each sees a smaller and smaller subset of the group keys

In pictures:

               ▲                          ▲                                                    
               │                          │                                                    
               │                          │                                                    
               │                          │                                                    
               │                          │                                                    
               │                          │                                                    
   ┌───────────────────────┐  ┌───────────────────────┐       4. Each AggregateMode::Final     
   │GroupBy                │  │GroupBy                │       GroupBy has an entry for its     
   │(AggregateMode::Final) │  │(AggregateMode::Final) │       subset of groups (in this case   
   │                       │  │                       │       that means half the entries)     
   └───────────────────────┘  └───────────────────────┘                                        
               ▲                          ▲                                                    
               │                          │                                                    
               └─────────────┬────────────┘                                                    
                             │                                                                 
                             │                                                                 
                             │                                                                 
                ┌─────────────────────────┐                   3. Repartitioning by hash(group  
                │       Repartition       │                   keys) ensures that each distinct 
                │         HASH(x)         │                   group key now appears in exactly 
                └─────────────────────────┘                   one partition                    
                             ▲                                                                 
                             │                                                                 
             ┌───────────────┴─────────────┐                                                   
             │                             │                                                   
             │                             │                                                   
┌─────────────────────────┐  ┌──────────────────────────┐     2. Each AggregateMode::Partial   
│        GroubyBy         │  │         GroubyBy         │     GroupBy has an entry for *all*   
│(AggregateMode::Partial) │  │ (AggregateMode::Partial) │     the groups                       
└─────────────────────────┘  └──────────────────────────┘                                      
             ▲                             ▲                                                   
             │                            ┌┘                                                   
             │                            │                                                    
        .─────────.                  .─────────.                                               
     ,─'           '─.            ,─'           '─.                                            
    ;      Input      :          ;      Input      :          1. Since input data is           
    :   Partition 0   ;          :   Partition 1   ;          arbitrarily or RoundRobin        
     ╲               ╱            ╲               ╱           distributed, each partition      
      '─.         ,─'              '─.         ,─'            likely has all distinct          
         `───────'                    `───────'                                                                           

Some example data:

              ┌─────┐                ┌─────┐                                                 
              │  1  │                │  3  │                                                 
              ├─────┤                ├─────┤                                                 
              │  2  │                │  4  │                After repartitioning by          
              └─────┘                └─────┘                hash(group keys), each distinct  
              ┌─────┐                ┌─────┐                group key now appears in exactly 
              │  1  │                │  3  │                one partition                    
              ├─────┤                ├─────┤                                                 
              │  2  │                │  4  │                                                 
              └─────┘                └─────┘                                                 
                                                                                             
                                                                                             
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                
                                                                                             
              ┌─────┐                ┌─────┐                                                 
              │  2  │                │  2  │                                                 
              ├─────┤                ├─────┤                                                 
              │  1  │                │  2  │                                                 
              ├─────┤                ├─────┤                                                 
              │  3  │                │  3  │                                                 
              ├─────┤                ├─────┤                                                 
              │  4  │                │  1  │                                                 
              └─────┘                └─────┘                Input data is arbitrarily or     
                ...                    ...                  RoundRobin distributed, each     
              ┌─────┐                ┌─────┐                partition likely has all         
              │  1  │                │  4  │                distinct group keys              
              ├─────┤                ├─────┤                                                 
              │  4  │                │  3  │                                                 
              ├─────┤                ├─────┤                                                 
              │  1  │                │  1  │                                                 
              ├─────┤                ├─────┤                                                 
              │  4  │                │  3  │                                                 
              └─────┘                └─────┘                                                 
                                                                                             
          group values           group values                                                
          in partition 0         in partition 1                                              
                                                                                             

Describe the solution you'd like

TLDR is I would like to propose updating the AggregateExec: mode=Partial to emit their hash tables if they see more than some fixed size number of groups (I think @mingmwang said DuckDB uses a value of 10,000 for this)

This approach bounds the memory usage (to some fixed constant * the target partitioning) and also should perform quite well

In the literature I think this approach could be called "dynamic partitioning" as it switches approaches based on the actual cardinality of the groups in the dataset

Describe alternatives you've considered

One potential thing that might be suggested is simply to repartition the input to AggregateExec: mode=Partial

This approach would definitely reduce the memory requirements, but it would mean that we would have to hash repartition all the input rows so the number of input values that need to be hashed / copied would likely be much higher (at least as long as the group hasher and hash repartitioner can't share the hashes, which is the case today)

The current strategy actually works very well for low cardinality group bys because the AggregateExec: mode=Partial can reduce the size of the intermediate result that needs to be hash repartitioned to a very small size

Additional context

We saw this in IOx while working on some tracing queries that look very similar to the ClickBench query, something like the following to get the top ten traces

SELECT trace_id, max(time)
FROM traces
GROUP BY trace_id
ORDER BY max(time)
LIMIT 10;
@alamb alamb added the enhancement New feature or request label Jul 12, 2023
@Dandandan
Copy link
Contributor

Dandandan commented Jul 13, 2023

I did some tests just based on a heuristic (e.g. number of columns in input / group by) in #6938 but saw both perf. improvements (likely the high cardinality queries) and degradations (it also seems hash partitioning is not really fast ATM).

Also for distributed systems like Ballista, the partial / final approach probably works better in most cases (even for higher cardinality ones), so I think we would have to make this behaviour configurable.

@alamb
Copy link
Contributor Author

alamb commented Jul 13, 2023

it also seems hash partitioning is not really fast ATM)

Yes -- here are some ideas to improve things:

  1. Reuse the hash values (already computed for the Partial group bys)
  2. Avoid the extra copy with CoalesceBatches -- today we "take" in in repartiton (which copies things) and then CoealsceBatches concat's them again with another copy)

@mingmwang
Copy link
Contributor

I think we can skip the partial aggregation adaptively if the partial aggregation can not reduce the row count.

@mingmwang
Copy link
Contributor

DuckDB's partitioned hash table only take place with its pipeline execution model.

@alamb
Copy link
Contributor Author

alamb commented Jul 13, 2023

I think we can skip the partial aggregation adaptively if the partial aggregation can not reduce the row count.

Yes I think this is a good strategy 👍

@Dandandan
Copy link
Contributor

I think this PR contains some nice ideas
duckdb/duckdb#8475

@Dandandan
Copy link
Contributor

Dandandan commented Aug 16, 2023

I did some experimentation with emitting of rows once every 10K is hit (but keep grouping) in this branch:

https://github.com/Dandandan/arrow-datafusion/tree/emit_rows_aggregate

Doing that reduces the memory usage, but often with higher cost, which can be seen in the benchmark:

Comparing main and emit_rows_aggregate
--------------------
Benchmark tpch_mem.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ emit_rows_aggregate ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 294.54ms │            293.83ms │     no change │
│ QQuery 2     │  38.74ms │             37.99ms │     no change │
│ QQuery 3     │  48.79ms │             48.48ms │     no change │
│ QQuery 4     │  40.19ms │             39.64ms │     no change │
│ QQuery 5     │  96.84ms │             95.31ms │     no change │
│ QQuery 6     │  10.81ms │             10.73ms │     no change │
│ QQuery 7     │ 197.87ms │            198.20ms │     no change │
│ QQuery 8     │  73.50ms │             76.23ms │     no change │
│ QQuery 9     │ 145.04ms │            144.91ms │     no change │
│ QQuery 10    │  94.58ms │             93.05ms │     no change │
│ QQuery 11    │  41.47ms │             42.64ms │     no change │
│ QQuery 12    │  68.41ms │             68.07ms │     no change │
│ QQuery 13    │ 103.76ms │            104.87ms │     no change │
│ QQuery 14    │  13.44ms │             13.35ms │     no change │
│ QQuery 15    │  15.91ms │             15.26ms │     no change │
│ QQuery 16    │  41.70ms │             39.29ms │ +1.06x faster │
│ QQuery 17    │ 110.51ms │            191.93ms │  1.74x slower │
│ QQuery 18    │ 291.45ms │            314.17ms │  1.08x slower │
│ QQuery 19    │  58.09ms │             59.28ms │     no change │
│ QQuery 20    │  81.24ms │             72.96ms │ +1.11x faster │
│ QQuery 21    │ 267.58ms │            262.45ms │     no change │
│ QQuery 22    │  29.58ms │             29.95ms │     no change │
└──────────────┴──────────┴─────────────────────┴───────────────┘

@alamb
Copy link
Contributor Author

alamb commented Aug 16, 2023

Doing that reduces the memory usage, but often with higher cost, which can be seen in the benchmark:

Maybe we can get the performance back somehow (like make the output creation faster somehow) 🤔

Alternately, we could consider making a single group operator that does the two phase grouping within itself

so instead of

group by (final)
  repartition
    group by (initil)

We would have

group by

And do the repartitioning within the operator itself (and thus if the first phase isn't helping, we can switch to the second phase)

This might impact downstream projects like ballista that want to distribute the first phase, however 🤔

@alamb alamb changed the title Improve Memory usage with large numbers of groups Improve Memory usage + performance with large numbers of groups Oct 17, 2023
@alamb alamb changed the title Improve Memory usage + performance with large numbers of groups Improve Memory usage + performance with large numbers of groups / High Cardinality Aggregates Oct 26, 2023
@jayzhan211 jayzhan211 self-assigned this Jul 24, 2024
@alamb
Copy link
Contributor Author

alamb commented Jul 29, 2024

I think we can skip the partial aggregation adaptively if the partial aggregation can not reduce the row count.

Update here is that @korowa did exactly this in #11627

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
4 participants