
union all of many literals very slow (separate stage) for each #13896

Open
tooptoop4 opened this issue Dec 25, 2019 · 5 comments

Comments

@tooptoop4

with lkp as (select 'de' c , 'germany' d union all select 'it' c, 'italy' d union all select 'ca' c, 'canada' d) select * from lkp

If I include 200+ SELECTs in that WITH block to get all countries, the query is very slow. It looks like a new stage/task is run for each row. I think this could be optimized to combine them into a single stage, since they are all literals.

@tooptoop4
Author

Similar to #3876 and trinodb/trino#14.

@rschlussel
Contributor

The optimizer should be able to rewrite this to a single Values node, meaning that the current plan:

presto> explain (type distributed) CREATE TABLE my_table AS select 1 as a UNION ALL SELECT 1;
                                                            Query Plan                                                              
-----------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [COORDINATOR_ONLY]                                                                                                      
     Output layout: [rows_6]                                                                                                        
     Output partitioning: SINGLE []                                                                                                 
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                  
     - Output[rows] => [rows_6:bigint]                                                                                              
             rows := rows_6                                                                                                         
         - TableCommit[Optional[prism.di.my_table]] => [rows_6:bigint]                                                              
             - RemoteSource[1] => [rows:bigint, fragments:varbinary, commitcontext:varbinary]                                       
                                                                                                                                    
 Fragment 1 [ROUND_ROBIN]                                                                                                           
     Output layout: [rows, fragments, commitcontext]                                                                                
     Output partitioning: SINGLE []                                                                                                 
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                  
     - LocalExchange[ROUND_ROBIN] () => [rows:bigint, fragments:varbinary, commitcontext:varbinary]                                 
         - TableWriterMerge => [rows_7:bigint, fragments_8:varbinary, commitcontext_9:varbinary]                                    
             - LocalExchange[SINGLE] () => [partialrowcount:bigint, partialfragments:varbinary, partialcontext:varbinary]          
                 - TableWriter => [partialrowcount:bigint, partialfragments:varbinary, partialcontext:varbinary]                    
                         a := expr                                                                                                  
                         Statistics collected: 0                                                                                    
                     - RemoteSource[2] => [expr:integer]                                                                            
         - TableWriterMerge => [rows_10:bigint, fragments_11:varbinary, commitcontext_12:varbinary]                                 
             - LocalExchange[SINGLE] () => [partialrowcount_13:bigint, partialfragments_14:varbinary, partialcontext_15:varbinary] 
                 - TableWriter => [partialrowcount_13:bigint, partialfragments_14:varbinary, partialcontext_15:varbinary]          
                         a := expr_1                                                                                                
                         Statistics collected: 0                                                                                    
                     - RemoteSource[3] => [expr_1:integer]                                                                          
                                                                                                                                    
 Fragment 2 [SINGLE]                                                                                                                
     Output layout: [expr]                                                                                                          
     Output partitioning: ROUND_ROBIN []                                                                                            
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                  
     - Project[] => [expr:integer]                                                                                                  
             Estimates: {rows: 1 (5B), cpu: 5.00, memory: 0.00, network: 0.00}                                                      
             expr := INTEGER 1                                                                                                      
         - LocalExchange[ROUND_ROBIN] () => []                                                                                      
                 Estimates: {rows: 1 (0B), cpu: 0.00, memory: 0.00, network: 0.00}                                                  
             - Values => []                                                                                                         
                     Estimates: {rows: 1 (0B), cpu: 0.00, memory: 0.00, network: 0.00}                                              
                     ()                                                                                                             
                                                                                                                                    
 Fragment 3 [SINGLE]                                                                                                                
     Output layout: [expr_1]                                                                                                        
     Output partitioning: ROUND_ROBIN []                                                                                            
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                  
     - Project[] => [expr_1:integer]                                                                                                
             Estimates: {rows: 1 (5B), cpu: 5.00, memory: 0.00, network: 0.00}                                                      
             expr_1 := INTEGER 1                                                                                                    
         - LocalExchange[ROUND_ROBIN] () => []                                                                                      
                 Estimates: {rows: 1 (0B), cpu: 0.00, memory: 0.00, network: 0.00}                                                  
             - Values => []                                                                                                         
                     Estimates: {rows: 1 (0B), cpu: 0.00, memory: 0.00, network: 0.00}                                              
                     ()                                                                                                             
                                                                                                                                    
                                                                                                                                    
(1 row)

should be the same as:

presto:di> explain (type distributed) CREATE TABLE my_table(a) AS select * from (values (1), (1));
                                                      Query Plan                                                      
----------------------------------------------------------------------------------------------------------------------
 Fragment 0 [COORDINATOR_ONLY]                                                                                        
     Output layout: [rows_5]                                                                                          
     Output partitioning: SINGLE []                                                                                   
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                    
     - Output[rows] => [rows_5:bigint]                                                                                
             rows := rows_5                                                                                           
         - TableCommit[Optional[prism.di.my_table]] => [rows_5:bigint]                                                
             - RemoteSource[1] => [rows:bigint, fragments:varbinary, commitcontext:varbinary]                         
                                                                                                                      
 Fragment 1 [ROUND_ROBIN]                                                                                             
     Output layout: [rows, fragments, commitcontext]                                                                  
     Output partitioning: SINGLE []                                                                                   
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                    
     - TableWriterMerge => [rows:bigint, fragments:varbinary, commitcontext:varbinary]                                
         - LocalExchange[SINGLE] () => [partialrowcount:bigint, partialfragments:varbinary, partialcontext:varbinary] 
             - TableWriter => [partialrowcount:bigint, partialfragments:varbinary, partialcontext:varbinary]          
                     a := field                                                                                       
                     Statistics collected: 0                                                                          
                 - RemoteSource[2] => [field:integer]                                                                 
                                                                                                                      
 Fragment 2 [SINGLE]                                                                                                  
     Output layout: [field]                                                                                           
     Output partitioning: ROUND_ROBIN []                                                                              
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                    
     - Values => [field:integer]                                                                                      
             Estimates: {rows: 2 (10B), cpu: 0.00, memory: 0.00, network: 0.00}                                       
             (INTEGER 1)                                                                                              
             (INTEGER 1)   
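Until such a rewrite exists, a lookup table with hundreds of literal rows can be written directly in the VALUES form from the second plan. The helper below is a hypothetical sketch (the name `values_cte` is illustrative, not part of Presto) that generates a single-VALUES WITH clause from Python data instead of one SELECT per row joined by UNION ALL. It assumes, based on the CTAS plan above, that a VALUES list plans as one Values node; that has not been verified for the exact query in the issue.

```python
def values_cte(name, columns, rows):
    """Build a WITH clause defining `name` from literal rows using one VALUES
    list, instead of one SELECT per row combined with UNION ALL."""

    def quote(v):
        # Sketch only handles strings and integers. Single quotes inside
        # strings are escaped by doubling them, as in standard SQL literals.
        if isinstance(v, str):
            return "'" + v.replace("'", "''") + "'"
        return str(v)

    body = ",\n    ".join(
        "(" + ", ".join(quote(v) for v in row) + ")" for row in rows
    )
    return f"WITH {name} ({', '.join(columns)}) AS (\n  VALUES\n    {body}\n)"


# Usage: the lookup table from the original report.
countries = [("de", "germany"), ("it", "italy"), ("ca", "canada")]
print(values_cte("lkp", ["c", "d"], countries) + "\nSELECT * FROM lkp")
# WITH lkp (c, d) AS (
#   VALUES
#     ('de', 'germany'),
#     ('it', 'italy'),
#     ('ca', 'canada')
# )
# SELECT * FROM lkp
```

Adding a country only means appending a tuple, and the query stays a single VALUES list however many rows it has.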

I'm thinking two optimizer rules together should solve the issue:

  1. Push project into values: rewrite the SELECT 1 plan (a projection over an empty Values node) so that it matches SELECT * FROM (VALUES 1) (a single Values node).
  2. Combine the Values-node children of a UNION ALL into a single Values node with multiple rows.
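The two rules can be illustrated on a toy plan tree. This is a minimal sketch under stated assumptions: the `Values`, `Project` and `UnionAll` classes below are hypothetical stand-ins named after the plan nodes above, not Presto's Java planner classes. Projections are assumed to contain only literal assignments, which matches the literal-only query in this issue. UNION ALL columns are matched by position, with output names taken from the first branch.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import List, Tuple, Union


@dataclass
class Values:
    # Hypothetical toy node: literal rows with the given column names.
    # SELECT 1 reads from Values((), [()]): one row with no columns.
    columns: Tuple[str, ...]
    rows: List[tuple]


@dataclass
class Project:
    # Toy projection where every assignment is (column name, literal value).
    assignments: Tuple[Tuple[str, object], ...]
    source: Node


@dataclass
class UnionAll:
    sources: List[Node]


Node = Union[Values, Project, UnionAll]


def push_project_into_values(node: Node) -> Node:
    """Rule 1: a constant Project over Values becomes a single Values node."""
    if isinstance(node, Project) and isinstance(node.source, Values):
        cols = tuple(name for name, _ in node.assignments)
        literals = tuple(value for _, value in node.assignments)
        # Every input row produces one output row of the projected constants.
        return Values(cols, [literals for _ in node.source.rows])
    return node


def merge_union_of_values(node: Node) -> Node:
    """Rule 2: a UNION ALL whose inputs are all Values becomes one Values node."""
    if (
        isinstance(node, UnionAll)
        and node.sources
        and all(isinstance(s, Values) for s in node.sources)
        and len({len(s.columns) for s in node.sources}) == 1  # same arity
    ):
        rows = [row for s in node.sources for row in s.rows]
        return Values(node.sources[0].columns, rows)
    return node


def optimize(node: Node) -> Node:
    """Apply both rules bottom-up, so children are simplified first."""
    if isinstance(node, Project):
        return push_project_into_values(Project(node.assignments, optimize(node.source)))
    if isinstance(node, UnionAll):
        return merge_union_of_values(UnionAll([optimize(s) for s in node.sources]))
    return node


# SELECT 1 AS a UNION ALL SELECT 1, as in the CTAS example above.
plan = UnionAll([Project((("a", 1),), Values((), [()])) for _ in range(2)])
print(optimize(plan))  # Values(columns=('a',), rows=[(1,), (1,)])
```

Rule 1 has to run before rule 2 can fire, because each UNION ALL branch is a Project rather than a Values node until it has been rewritten. The bottom-up traversal guarantees that ordering in this sketch.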

@rschlussel
Contributor

@ankitdixit is planning to work on this.

@ankitdixit
Contributor

@rongrong @rschlussel #15097

@Donderia

Donderia commented Mar 11, 2021

Hi @ankitdixit, @rschlussel, has this enhancement been merged into the master branch? If so, which release version includes this change?
