Ensure PartitionedOutputOperator is run with fixed local distribution #13834

Merged

Conversation

arhimondr (Contributor)

Description

PartitionedOutputOperator maintains buffers for each output partition.
When the operator is run in the same pipeline as the TableScanOperator, the
buffers are flushed after each split, resulting in small pages being created.
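
To make the effect concrete, here is a minimal sketch (hypothetical code, not Trino's actual implementation): a per-partition buffer that is forced to flush at every split boundary emits many small pages, while the same buffer running behind a local exchange sees the whole row stream and emits full pages.

import java.util.ArrayList;
import java.util.List;

class PartitionBufferSketch
{
    private final int targetRowsPerPage;
    private final List<Integer> rows = new ArrayList<>();
    final List<List<Integer>> pages = new ArrayList<>();

    PartitionBufferSketch(int targetRowsPerPage)
    {
        this.targetRowsPerPage = targetRowsPerPage;
    }

    void append(int row)
    {
        rows.add(row);
        if (rows.size() >= targetRowsPerPage) {
            flush();
        }
    }

    void flush()
    {
        if (!rows.isEmpty()) {
            pages.add(new ArrayList<>(rows));
            rows.clear();
        }
    }

    public static void main(String[] args)
    {
        // Coupled with the scan pipeline: a flush at every split boundary
        PartitionBufferSketch coupled = new PartitionBufferSketch(1024);
        for (int split = 0; split < 8; split++) {
            for (int row = 0; row < 100; row++) {
                coupled.append(row);
            }
            coupled.flush(); // the split boundary forces an early flush
        }
        // Behind a local exchange: one long-lived consumer sees all rows
        PartitionBufferSketch decoupled = new PartitionBufferSketch(1024);
        for (int row = 0; row < 800; row++) {
            decoupled.append(row);
        }
        decoupled.flush();
        System.out.println(coupled.pages.size());   // 8 small pages of 100 rows
        System.out.println(decoupled.pages.size()); // 1 page of 800 rows
    }
}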

Is this change a fix, improvement, new feature, refactoring, or other?

Improvement

Is this a change to the core query engine, a connector, client library, or the SPI interfaces? (be specific)

Core engine

How would you describe this change to a non-technical end user or system administrator?

Performance improvement

Related issues, pull requests, and links

Documentation

(X) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.

Release notes

(X) No release notes entries required.
( ) Release notes entries required with the following suggested text:

# Section
* Fix some things. ({issue}`issuenumber`)

@cla-bot cla-bot bot added the cla-signed label Aug 24, 2022
@arhimondr arhimondr force-pushed the partitioned-output-fixed-distribution branch from dce8c8d to 2c1b725 on August 25, 2022 01:38
@arhimondr arhimondr requested review from sopel39 and losipiuk August 25, 2022 01:38
@losipiuk losipiuk (Member) left a comment

To the extent of my limited expertise with this code, it looks good. @sopel39 please take a look.

Also, IMO we should run benchmarks on this PR before merging.

@sopel39 sopel39 (Member) left a comment

This change will introduce a performance regression (wall time) due to the extra buffer and thread switching. I experimented with something similar for partial aggregation. Due to that issue, we also disabled spill in source stage joins (#315).

Usually the source stage loop is super efficient from table scan to output because it lacks any context switching.

If anything, IMO we should move the merging of pages out of the partitioned output operator into the buffering layer (maybe with some opportunistic locking).

@przemekak (Member)

Attaching benchmarks for this change.
As you can see, it is a little slower; the regression is especially visible on TPC-DS q67.
partitioned-output-fixed-distribution.pdf

@arhimondr arhimondr force-pushed the partitioned-output-fixed-distribution branch 4 times, most recently from b88f4c0 to 3400d4e on November 2, 2022 21:09
@arhimondr arhimondr requested a review from sopel39 November 2, 2022 21:09
@arhimondr (Contributor, Author)

@losipiuk @sopel39 @przemekak

Updated

The biggest regressions (q67, q22) were due to the fact that the plan shape had changed and the AddExchangesBelowPartialAggregationOverGroupIdRuleSet could no longer perform the necessary optimization.

To address the small discrepancy that showed up during the last benchmark, I optimized the local exchange a bit. By removing PageReference and changing the buffer's data structure from a linked-list-based queue to an array-based queue, I reduced the number of allocations. I also decreased the number of lock acquisitions.
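
Roughly what those two optimizations look like in code (a simplified sketch under assumptions, not the actual LocalExchangeSource): pages sit directly in an array-backed ArrayDeque, so there is no per-element node allocation as with a linked queue, and the consumer drains all available pages under a single lock acquisition instead of locking once per page.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class LocalExchangeBufferSketch<P>
{
    // Array-backed deque: no node object is allocated per enqueued page
    private final Queue<P> buffer = new ArrayDeque<>();

    synchronized void addPage(P page)
    {
        buffer.add(page);
    }

    // Drain every buffered page with a single lock acquisition,
    // rather than taking the lock once per polled page
    synchronized List<P> pollAllPages()
    {
        List<P> drained = new ArrayList<>(buffer);
        buffer.clear();
        return drained;
    }
}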

Currently the benchmark results look nearly identical:
Benchmarks comparison.pdf

I also tried to simulate an alleged worst case locally (when splits are fully consumed and the exchange is not necessary):

set session hive.compression_codec = 'NONE';
create table lineitem as select returnflag, quantity, extendedprice, discount from tpch.sf100.lineitem;

SELECT returnflag, sum(quantity), avg(extendedprice), sum(discount) FROM lineitem GROUP BY 1
UNION ALL
SELECT returnflag, sum(quantity), avg(extendedprice), sum(discount) FROM lineitem GROUP BY 1
UNION ALL
SELECT returnflag, sum(quantity), avg(extendedprice), sum(discount) FROM lineitem GROUP BY 1
UNION ALL
SELECT returnflag, sum(quantity), avg(extendedprice), sum(discount) FROM lineitem GROUP BY 1;

And I didn't find any difference in latency or CPU utilization with or without the additional local exchange.

The profiler also doesn't show any significant differences, either in CPU utilization or in lock contention:
flamegraphs.zip. Honestly, I couldn't even find any frames associated with the local exchange.

Since there's no observable difference in the common case, I think we should consider merging the change, as it provides better efficiency in corner cases (high-cardinality partitioning + selective predicates).

@@ -104,25 +104,33 @@
typeOf(ExchangeNode.class)
.with(scope().equalTo(REMOTE))
.with(source().matching(
// PushPartialAggregationThroughExchange adds a projection. However, it can be removed if RemoveRedundantIdentityProjections is run in the mean-time.
typeOf(ProjectNode.class).capturedAs(PROJECTION)
typeOf(ProjectNode.class)
Member

any ProjectNode is fine?

Contributor Author

My understanding is that one of the rules adds identity projections that are later removed. This rule is trying to capture these projections. @findepi could you please take a look and validate whether my understanding is correct?

.with(step().equalTo(AggregationNode.Step.PARTIAL))
.with(nonEmpty(groupingColumns()))
typeOf(ExchangeNode.class)
.with(scope().equalTo(LOCAL))
Member

any Local exchange is fine?

Contributor Author

In my understanding, this rule is trying to capture a specific plan shape that changed with the introduction of an additional local exchange. Thus, other types of local exchange shouldn't be expected in this plan shape. But it would be better to wait for @findepi to take a look to make sure my understanding is correct.

Member

@arhimondr sorry, I don't remember. It feels like I wrote this code like 5 years ago.

return forceFixedDistributionForPartitionedOutputOperatorEnabled;
}

@Config("experimental.force-fixed-distribution-for-partitioned-output-operator-enabled")
Member

nit: this is super long. Also, why experimental?
Maybe optimizer.force-partitioned-output-fixed-distribution?

Member

Also add some documentation here and maybe in some .rst if we have any.

Contributor Author

I intend to remove this property once there's enough confidence that no regression is introduced. End users shouldn't be changing this property, as the plan shape alterations may result in some planner rules (such as AddExchangesBelowPartialAggregationOverGroupIdRuleSet) no longer kicking in.

@losipiuk losipiuk (Member) left a comment

LGTM

@sopel39 (Member) commented Nov 4, 2022

I wonder if this approach will cause LIMIT queries to run longer than needed, e.g. output rows will now be buffered until a full page is collected.

@losipiuk (Member) commented Nov 4, 2022

> I wonder if this approach will cause LIMIT queries to run longer than needed, e.g. output rows will now be buffered until a full page is collected.

Do we have LIMIT queries in the benchmark? Should we extend the benchmark with those?

@przemekak (Member)

> > I wonder if this approach will cause LIMIT queries to run longer than needed, e.g. output rows will now be buffered until a full page is collected.
>
> Do we have LIMIT queries in the benchmark? Should we extend the benchmark with those?

Actually, most of the TPC-DS queries have LIMIT 100 in them, e.g.:
https://github.com/trinodb/trino/blob/master/testing/trino-benchmark-queries/src/main/resources/sql/presto/tpcds/q57.sql#L58

@sopel39 (Member) commented Nov 8, 2022

> Actually, most of the TPC-DS queries have LIMIT 100 in them, e.g.:

These are on aggregation output, so the entire subquery must execute.

@losipiuk (Member) commented Nov 8, 2022

> > Actually, most of the TPC-DS queries have LIMIT 100 in them, e.g.:
>
> These are on aggregation output, so the entire subquery must execute.

We should probably benchmark with a query that just does a super selective table scan with a limit.

@arhimondr (Contributor, Author)

> I wonder if this approach will cause LIMIT queries to run longer than needed, e.g. output rows will now be buffered until a full page is collected.

@sopel39 That is not necessarily true. The TableScan pipeline would be allowed to "run" as long as there is buffer space (32MB). But it doesn't prevent a consumer pipeline from running if there's a free thread available.

> We should probably benchmark with a query that just does a super selective table scan with a limit.

@losipiuk Yeah, given the complex interactions involved in short-circuiting LIMIT, and the fact that we had problems with it in the past, I don't think it's a bad idea to validate. Let me check.

@sopel39 (Member) commented Nov 9, 2022

@arhimondr could you add a test that the partial limit is above the local round-robin exchange that you are adding? That should mitigate potential LIMIT issues.

> But it doesn't prevent a consumer pipeline from running if there's a free thread available.

The consumer won't be able to fetch a page until the page builder is considered full.

@arhimondr (Contributor, Author)

@sopel39 I think I now understand the problem. Great catch.

I discussed it with @losipiuk offline, and it looks like "could you add a test that the partial limit is above the local round-robin exchange that you are adding" might not even be sufficient. As @losipiuk suggested, there could be a situation where a single task produces fewer rows than the limit, so the partial limit won't be triggered and the result won't get delivered to a downstream task until the upstream task finishes. Let me think about it more.

@arhimondr arhimondr force-pushed the partitioned-output-fixed-distribution branch from 3400d4e to a43b3c5 on November 9, 2022 23:26
@arhimondr (Contributor, Author)

@sopel39 @losipiuk It feels like a classic throughput vs. latency problem that is intrinsic to buffering. It is hard to say what the right flush strategy is.

I tried to approach this problem by flushing after a certain delay (see the last commit: a43b3c5). However, I'm not sure it is the best strategy.

CC: @martint

@sopel39 (Member) commented Nov 10, 2022

> @sopel39 @losipiuk It feels like a classic throughput vs. latency problem that is intrinsic to buffering. It is hard to say what the right flush strategy is.

@arhimondr if you have a plan like:

RemoteExchange
     |
 PartialLimit
     |
LocalExchange
     |
TableScan

then the RemoteExchange wouldn't have to wait until a full page is buffered. I don't think it would introduce regressions then, would it?

@losipiuk (Member)

> then the RemoteExchange wouldn't have to wait until a full page is buffered. I don't think it would introduce regressions then, would it?

Yeah. But it will still wait until the PartialLimit triggers. Which may never happen if a single task did not produce enough matching rows.

@sopel39 (Member) commented Nov 10, 2022

> Which may never happen if a single task did not produce enough matching rows.

That's an FTE issue, right? In pipeline mode it would work.

@losipiuk (Member)

> > Which may never happen if a single task did not produce enough matching rows.
>
> That's an FTE issue, right? In pipeline mode it would work.

I think it would not, unless I am missing something. We can chat offline.

@arhimondr arhimondr force-pushed the partitioned-output-fixed-distribution branch from a43b3c5 to 06c056b on November 10, 2022 21:35
@arhimondr (Contributor, Author)

Actually, it looks like today it's not possible for a partial limit to end up between a table scan and a PartitionedOutputOperator. Any LimitNode (even a partial one created by PushLimitThroughOuterJoin) is unconditionally split in two by the AddExchanges rule: https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java#L539

So the plan of a LEFT OUTER JOIN (seemingly the only query shape that can be impacted by this change) looks like the following:

Fragment 0 [SINGLE]                                                                                                                           
     Output layout: [comment, comment_15]                                                                                                      
     Output partitioning: SINGLE []                                                                                                            
     Output[columnNames = [comment, comment]]                                                                                                  
     │   Layout: [comment:varchar(44), comment_15:varchar(44)]                                                                                 
     │   Estimates: {rows: 10 (631B), cpu: 0, memory: 0B, network: 0B}                                                                         
     │   comment := comment_15                                                                                                                 
     └─ Limit[count = 10]                                                                                                                      
        │   Layout: [comment:varchar(44), comment_15:varchar(44)]                                                                              
        │   Estimates: {rows: 10 (631B), cpu: 631, memory: 0B, network: 0B}                                                                    
        └─ LocalExchange[partitioning = SINGLE]                                                                                                
           │   Layout: [comment:varchar(44), comment_15:varchar(44)]                                                                           
           │   Estimates: {rows: 10 (631B), cpu: 0, memory: 0B, network: 0B}                                                                   
           └─ RemoteSource[sourceFragmentIds = [1]]                                                                                            
                  Layout: [comment:varchar(44), comment_15:varchar(44)]                                                                        
                                                                                                                                               
 Fragment 1 [HASH]                                                                                                                             
     Output layout: [comment, comment_15]                                                                                                      
     Output partitioning: SINGLE []                                                                                                            
     LimitPartial[count = 10]                                                                                                                  
     │   Layout: [comment:varchar(44), comment_15:varchar(44)]                                                                                 
     │   Estimates: {rows: 10 (631B), cpu: 631, memory: 0B, network: 0B}                                                                       
     └─ LeftJoin[criteria = ("linenumber" = "linenumber_3"), hash = [$hashvalue, $hashvalue_23], distribution = PARTITIONED]                   
        │   Layout: [comment:varchar(44), comment_15:varchar(44)]                                                                              
        │   Estimates: {rows: 85969 (5.18MB), cpu: 7.79M, memory: 2.61MB, network: 0B}                                                         
        │   Distribution: PARTITIONED                                                                                                          
        ├─ RemoteSource[sourceFragmentIds = [2]]                                                                                               
        │      Layout: [linenumber:integer, comment:varchar(44), $hashvalue:bigint]                                                            
        └─ LocalExchange[partitioning = SINGLE]                                                                                                
           │   Layout: [linenumber_3:integer, comment_15:varchar(44), $hashvalue_23:bigint]                                                    
           │   Estimates: {rows: 60175 (2.61MB), cpu: 0, memory: 0B, network: 0B}                                                              
           └─ RemoteSource[sourceFragmentIds = [4]]                                                                                            
                  Layout: [linenumber_3:integer, comment_15:varchar(44), $hashvalue_24:bigint]                                                 
                                                                                                                                               
 Fragment 2 [SINGLE]                                                                                                                           
     Output layout: [linenumber, comment, $hashvalue_19]                                                                                       
     Output partitioning: HASH [linenumber][$hashvalue_19]                                                                                     
     LocalExchange[partitioning = ROUND_ROBIN]                                                                                                 
     │   Layout: [linenumber:integer, comment:varchar(44), $hashvalue_19:bigint]                                                               
     │   Estimates: {rows: 10 (456B), cpu: 456, memory: 0B, network: 0B}                                                                       
     └─ Limit[count = 10]                                                                                                                      
        │   Layout: [linenumber:integer, comment:varchar(44), $hashvalue_20:bigint]                                                            
        │   Estimates: {rows: 10 (456B), cpu: 456, memory: 0B, network: 0B}                                                                    
        └─ LocalExchange[partitioning = SINGLE]                                                                                                
           │   Layout: [linenumber:integer, comment:varchar(44), $hashvalue_20:bigint]                                                         
           │   Estimates: {rows: 10 (456B), cpu: 0, memory: 0B, network: 0B}                                                                   
           └─ RemoteSource[sourceFragmentIds = [3]]                                                                                            
                  Layout: [linenumber:integer, comment:varchar(44), $hashvalue_21:bigint]                                                      
                                                                                                                                               
 Fragment 3 [SOURCE]                                                                                                                           
     Output layout: [linenumber, comment, $hashvalue_22]                                                                                       
     Output partitioning: SINGLE []                                                                                                            
     Project[]                                                                                                                                 
     │   Layout: [linenumber:integer, comment:varchar(44), $hashvalue_22:bigint]                                                               
     │   Estimates: {rows: 10 (456B), cpu: 456, memory: 0B, network: 0B}                                                                       
     │   $hashvalue_22 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("linenumber"), 0))                                           
     └─ LimitPartial[count = 10]                                                                                                               
        │   Layout: [linenumber:integer, comment:varchar(44)]                                                                                  
        │   Estimates: {rows: 10 (366B), cpu: 366, memory: 0B, network: 0B}                                                                    
        └─ TableScan[table = hive:tpch:lineitem]                                                                                               
               Layout: [linenumber:integer, comment:varchar(44)]                                                                               
               Estimates: {rows: 60175 (2.10MB), cpu: 2.10M, memory: 0B, network: 0B}                                                          
               linenumber := linenumber:int:REGULAR                                                                                            
               comment := comment:varchar(44):REGULAR                                                                                          
                                                                                                                                               
 Fragment 4 [SOURCE]                                                                                                                           
     Output layout: [linenumber_3, comment_15, $hashvalue_25]                                                                                  
     Output partitioning: HASH [linenumber_3][$hashvalue_25]                                                                                   
     LocalExchange[partitioning = ROUND_ROBIN]                                                                                                 
     │   Layout: [linenumber_3:integer, comment_15:varchar(44), $hashvalue_25:bigint]                                                          
     │   Estimates: {rows: 60175 (2.61MB), cpu: 2.61M, memory: 0B, network: 0B}                                                                
     └─ ScanProject[table = hive:tpch:lineitem]                                                                                                
            Layout: [linenumber_3:integer, comment_15:varchar(44), $hashvalue_26:bigint]                                                       
            Estimates: {rows: 60175 (2.61MB), cpu: 2.10M, memory: 0B, network: 0B}/{rows: 60175 (2.61MB), cpu: 2.61M, memory: 0B, network: 0B} 
            $hashvalue_26 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("linenumber_3"), 0))                                      
            comment_15 := comment:varchar(44):REGULAR                                                                                          
            linenumber_3 := linenumber:int:REGULAR                   

Basically, on the left side the plan looks like the following: Scan -> Partial Limit -> Remote Gather -> Local Gather -> PartitionedOutputOperator.

This plan shape may be problematic for very large limits (e.g., LIMIT 1_000_000_000), as it requires all the rows to be processed by a single node before passing them to a join, and we may need to address that at some point. However, at this point it doesn't seem like this PR should introduce any regression.

I'm dropping the "Flush partitioned output on timeout to allow LIMIT queries finish early" commit for now. @losipiuk @sopel39 I wonder if there's anything else to be done here before we can merge it?

@arhimondr arhimondr force-pushed the partitioned-output-fixed-distribution branch from 06c056b to 5355abe on November 15, 2022 17:57
exchangerSupplier = () -> new BroadcastExchanger(buffers, memoryManager);
}
else if (partitioning.equals(FIXED_ARBITRARY_DISTRIBUTION)) {
if (partitioning.equals(SINGLE_DISTRIBUTION) || partitioning.equals(FIXED_ARBITRARY_DISTRIBUTION)) {
exchangerSupplier = () -> new RandomExchanger(buffers, memoryManager);
Member

Is this potentially more expensive?

Contributor Author

You mean for single distribution? I don't think so. The broadcast implementation seemed to be more complex.
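
For context, a sketch of what a random exchanger boils down to (simplified and hypothetical, not the actual RandomExchanger): with SINGLE_DISTRIBUTION there is exactly one buffer, so the "random" pick always resolves to index 0, and the only extra cost is one bounded nextInt() call per page.

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;

class RandomExchangerSketch<P>
{
    private final List<Consumer<P>> buffers;

    RandomExchangerSketch(List<Consumer<P>> buffers)
    {
        this.buffers = buffers;
    }

    void accept(P page)
    {
        // With a single buffer, nextInt(1) always returns 0
        int index = ThreadLocalRandom.current().nextInt(buffers.size());
        buffers.get(index).accept(page);
    }
}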

@@ -41,8 +42,11 @@

private final Consumer<LocalExchangeSource> onFinish;

private final BlockingQueue<PageReference> buffer = new LinkedBlockingDeque<>();
Member

so we never relied on the buffer being blocking?

Contributor Author

No. new LinkedBlockingDeque<>() is unbounded.
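
(The no-arg LinkedBlockingDeque constructor uses a capacity of Integer.MAX_VALUE, so the queue never exerts backpressure on producers; a quick check:)

import java.util.concurrent.LinkedBlockingDeque;

class UnboundedDequeCheck
{
    public static void main(String[] args)
    {
        LinkedBlockingDeque<Object> buffer = new LinkedBlockingDeque<>();
        // Prints 2147483647: the default capacity is Integer.MAX_VALUE
        System.out.println(buffer.remainingCapacity());
    }
}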

@@ -13,6 +13,7 @@
*/
Member

add a comment that PageReference is no longer needed after removing the broadcast exchange

Contributor Author

Should I add it to the commit message? I'm not sure it makes sense to keep mentions of PageReference in the codebase, since PageReference itself is going away.

@sopel39 sopel39 dismissed their stale review November 16, 2022 21:55

don't block PR

@arhimondr arhimondr force-pushed the partitioned-output-fixed-distribution branch from 5355abe to b05a554 on November 18, 2022 16:25
PartitionedOutputOperator maintains buffers for each output partition.
When the operator is run in the same pipeline as the TableScanOperator the
buffers are flushed after each split resulting in small pages being created.

Broadcast local exchanges are never performed

Use ArrayDeque to avoid unnecessary allocations associated with
a LinkedList based queue

Avoid creating PageReference objects for every page
A new instance is created for each ExchangeSink
@arhimondr arhimondr force-pushed the partitioned-output-fixed-distribution branch from b05a554 to af5cc04 on November 19, 2022 10:56
@arhimondr arhimondr merged commit 45e111c into trinodb:master Nov 20, 2022
@arhimondr arhimondr deleted the partitioned-output-fixed-distribution branch November 20, 2022 03:00
@github-actions github-actions bot added this to the 404 milestone Nov 20, 2022
@@ -743,6 +744,9 @@ public PlanWithProperties visitExchange(ExchangeNode node, StreamPreferredProper
any().withOrderSensitivity(),
any().withOrderSensitivity());
}
if (isForceFixedDistributionForPartitionedOutputOperatorEnabled(session) && node.isHashPartitionedExchange()) {
Member

Shouldn't this behavior only be induced for partitioned output operators on top of table scans? Intermediate stages shouldn't have the same flushing behavior, so the condition seems potentially overly broad, but maybe that's handled by planAndEnforceChildren somehow?
