Skip to content

Commit

Permalink
[opt](nereids) recover adoptive bucket shuffle
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongjian.xzj authored and zhongjian.xzj committed Jun 26, 2024
1 parent 7cb2b14 commit 94ba2e2
Show file tree
Hide file tree
Showing 30 changed files with 46 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ private boolean isBucketShuffleDownGrade(Plan oneSidePlan, DistributionSpecHash
int bucketNum = candidate.getTable().getDefaultDistributionInfo().getBucketNum();
int backEndNum = Math.max(1, ConnectContext.get().getEnv().getClusterInfo()
.getAllBackendIds(true).size());
int paraNum = Math.max(1, ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
int paraNum = Math.min(4, Math.max(1, ConnectContext.get()
.getSessionVariable().getParallelExecInstanceNum()));
return bucketNum < backEndNum * paraNum;
}
}
Expand All @@ -231,7 +232,6 @@ private PhysicalOlapScan findDownGradeBucketShuffleCandidate(GroupPlan groupPlan
|| groupPlan.getGroup().getPhysicalExpressions().isEmpty()) {
return null;
} else {
// TODO: support external table
Plan targetPlan = groupPlan.getGroup().getPhysicalExpressions().get(0).getPlan();
while (targetPlan != null
&& (targetPlan instanceof PhysicalProject || targetPlan instanceof PhysicalFilter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ PhysicalResultSink
------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_store_sk = store.s_store_sk)) otherCondition=(( not (substring(ca_zip, 1, 5) = substring(s_zip, 1, 5)))) build RFs:RF4 s_store_sk->[ss_store_sk]
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN] hashCondition=((customer.c_current_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF3 c_current_addr_sk->[ca_address_sk]
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------PhysicalProject
----------------------------PhysicalOlapScan[customer_address] apply RFs: RF3
------------------------PhysicalProject
--------------------------PhysicalOlapScan[customer_address] apply RFs: RF3
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------PhysicalProject
----------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) otherCondition=() build RFs:RF2 ss_customer_sk->[c_customer_sk]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ PhysicalResultSink
----------------hashJoin[INNER_JOIN] hashCondition=((i1.i_item_sk = asceding.item_sk)) otherCondition=() build RFs:RF1 item_sk->[i_item_sk]
------------------PhysicalProject
--------------------PhysicalOlapScan[item] apply RFs: RF1
------------------PhysicalDistribute[DistributionSpecReplicated]
------------------PhysicalDistribute[DistributionSpecHash]
--------------------PhysicalProject
----------------------filter((rnk < 11))
------------------------PhysicalWindow
Expand Down Expand Up @@ -44,7 +44,7 @@ PhysicalResultSink
----------------hashJoin[INNER_JOIN] hashCondition=((i2.i_item_sk = descending.item_sk)) otherCondition=() build RFs:RF0 item_sk->[i_item_sk]
------------------PhysicalProject
--------------------PhysicalOlapScan[item] apply RFs: RF0
------------------PhysicalDistribute[DistributionSpecReplicated]
------------------PhysicalDistribute[DistributionSpecHash]
--------------------PhysicalProject
----------------------filter((rnk < 11))
------------------------PhysicalWindow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ PhysicalResultSink
------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((my_customers.c_current_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF3 c_current_addr_sk->[ca_address_sk]
--------------------------------------------------PhysicalProject
----------------------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF3 RF4 RF5
--------------------------------------------------PhysicalDistribute[DistributionSpecReplicated]
--------------------------------------------------PhysicalDistribute[DistributionSpecHash]
----------------------------------------------------PhysicalProject
------------------------------------------------------hashAgg[GLOBAL]
--------------------------------------------------------PhysicalDistribute[DistributionSpecHash]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,9 @@ PhysicalResultSink
----------------------hashAgg[LOCAL]
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_bill_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF11 ws_bill_addr_sk->[ca_address_sk]
----------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------PhysicalProject
--------------------------------filter((customer_address.ca_gmt_offset = -6.00))
----------------------------------PhysicalOlapScan[customer_address] apply RFs: RF11
----------------------------PhysicalProject
------------------------------filter((customer_address.ca_gmt_offset = -6.00))
--------------------------------PhysicalOlapScan[customer_address] apply RFs: RF11
----------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------PhysicalProject
--------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF10 i_item_sk->[ws_item_sk]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ PhysicalResultSink
----------------hashAgg[LOCAL]
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN] hashCondition=((a.ca_address_sk = c.c_current_addr_sk)) otherCondition=() build RFs:RF5 c_current_addr_sk->[ca_address_sk]
----------------------PhysicalDistribute[DistributionSpecHash]
------------------------PhysicalProject
--------------------------PhysicalOlapScan[customer_address] apply RFs: RF5
----------------------PhysicalProject
------------------------PhysicalOlapScan[customer_address] apply RFs: RF5
----------------------PhysicalDistribute[DistributionSpecHash]
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN] hashCondition=((c.c_customer_sk = s.ss_customer_sk)) otherCondition=() build RFs:RF4 ss_customer_sk->[c_customer_sk]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ PhysicalResultSink
--------------hashAgg[LOCAL]
----------------PhysicalProject
------------------hashJoin[INNER_JOIN] hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk)) otherCondition=() build RFs:RF10 c_current_addr_sk->[ca_address_sk]
--------------------PhysicalDistribute[DistributionSpecHash]
----------------------PhysicalProject
------------------------filter((customer_address.ca_gmt_offset = -7.00))
--------------------------PhysicalOlapScan[customer_address] apply RFs: RF10
--------------------PhysicalProject
----------------------filter((customer_address.ca_gmt_offset = -7.00))
------------------------PhysicalOlapScan[customer_address] apply RFs: RF10
--------------------PhysicalDistribute[DistributionSpecHash]
----------------------PhysicalProject
------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) otherCondition=() build RFs:RF9 ss_customer_sk->[c_customer_sk]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ PhysicalResultSink
------PhysicalTopN[LOCAL_SORT]
--------PhysicalProject
----------hashJoin[INNER_JOIN] hashCondition=((customer.c_current_addr_sk = current_addr.ca_address_sk)) otherCondition=(( not (ca_city = bought_city))) build RFs:RF5 c_current_addr_sk->[ca_address_sk]
------------PhysicalDistribute[DistributionSpecHash]
--------------PhysicalProject
----------------PhysicalOlapScan[customer_address] apply RFs: RF5
------------PhysicalProject
--------------PhysicalOlapScan[customer_address] apply RFs: RF5
------------PhysicalDistribute[DistributionSpecHash]
--------------PhysicalProject
----------------hashJoin[INNER_JOIN] hashCondition=((dn.ss_customer_sk = customer.c_customer_sk)) otherCondition=() build RFs:RF4 ss_customer_sk->[c_customer_sk]
Expand All @@ -20,9 +19,8 @@ PhysicalResultSink
----------------------hashAgg[LOCAL]
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF3 ss_addr_sk->[ca_address_sk]
----------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------PhysicalProject
--------------------------------PhysicalOlapScan[customer_address] apply RFs: RF3
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[customer_address] apply RFs: RF3
----------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------PhysicalProject
--------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk)) otherCondition=() build RFs:RF2 hd_demo_sk->[ss_hdemo_sk]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ PhysicalResultSink
----------------------------------hashAgg[LOCAL]
------------------------------------PhysicalProject
--------------------------------------hashJoin[INNER_JOIN] hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk)) otherCondition=() build RFs:RF0 c_current_addr_sk->[ca_address_sk]
----------------------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------------------PhysicalProject
--------------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF0
----------------------------------------PhysicalProject
------------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF0
----------------------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------------------PhysicalProject
--------------------------------------------filter((customer.c_preferred_cust_flag = 'Y'))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ PhysicalResultSink
----------------------------PhysicalDistribute[DistributionSpecReplicated]
------------------------------PhysicalProject
--------------------------------hashJoin[INNER_JOIN] hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk)) otherCondition=() build RFs:RF2 c_current_addr_sk->[ca_address_sk]
----------------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------------PhysicalProject
--------------------------------------filter((customer_address.ca_gmt_offset = -7.00))
----------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF2
----------------------------------PhysicalProject
------------------------------------filter((customer_address.ca_gmt_offset = -7.00))
--------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF2
----------------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------------PhysicalProject
--------------------------------------hashJoin[INNER_JOIN] hashCondition=((household_demographics.hd_demo_sk = customer.c_current_hdemo_sk)) otherCondition=() build RFs:RF1 hd_demo_sk->[c_current_hdemo_sk]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------------hashAgg[GLOBAL]
----------------hashAgg[LOCAL]
------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) otherCondition=() build RFs:RF6 ws_order_number->[wr_order_number,ws_order_number]
--------------------PhysicalDistribute[DistributionSpecHash]
----------------------PhysicalProject
------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number]
--------------------------PhysicalDistribute[DistributionSpecHash]
----------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF5 RF6
--------------------------PhysicalDistribute[DistributionSpecHash]
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[web_returns] apply RFs: RF6
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number]
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF5 RF6
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------PhysicalProject
----------------------------PhysicalOlapScan[web_returns] apply RFs: RF6
--------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF7 ws_order_number->[ws_order_number,ws_order_number]
----------------------PhysicalDistribute[DistributionSpecHash]
------------------------PhysicalCteConsumer ( cteId=CTEId#0 )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ PhysicalResultSink
------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_web_page_sk = web_page.wp_web_page_sk)) otherCondition=() build RFs:RF9 ws_web_page_sk->[wp_web_page_sk]
--------------------PhysicalProject
----------------------PhysicalOlapScan[web_page] apply RFs: RF9
--------------------PhysicalDistribute[DistributionSpecHash]
--------------------PhysicalDistribute[DistributionSpecReplicated]
----------------------PhysicalProject
------------------------hashJoin[INNER_JOIN] hashCondition=((reason.r_reason_sk = web_returns.wr_reason_sk)) otherCondition=() build RFs:RF8 r_reason_sk->[wr_reason_sk]
--------------------------PhysicalProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ PhysicalResultSink
--------------------------------------------PhysicalProject
----------------------------------------------filter((((promotion.p_channel_dmail = 'Y') OR (promotion.p_channel_email = 'Y')) OR (promotion.p_channel_tv = 'Y')))
------------------------------------------------PhysicalOlapScan[promotion] apply RFs: RF6
--------------------------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------------------------PhysicalDistribute[DistributionSpecReplicated]
----------------------------------------------PhysicalProject
------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_store_sk = store.s_store_sk)) otherCondition=() build RFs:RF5 s_store_sk->[ss_store_sk]
--------------------------------------------------PhysicalProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ PhysicalResultSink
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------PhysicalProject
----------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_web_page_sk = web_page.wp_web_page_sk)) otherCondition=() build RFs:RF7 ws_web_page_sk->[wp_web_page_sk]
------------------------------PhysicalProject
--------------------------------PhysicalOlapScan[web_page] apply RFs: RF7
------------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[web_page] apply RFs: RF7
------------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------------PhysicalProject
----------------------------------hashJoin[INNER_JOIN] hashCondition=((cd1.cd_education_status = cd2.cd_education_status) and (cd1.cd_marital_status = cd2.cd_marital_status) and (cd2.cd_demo_sk = web_returns.wr_returning_cdemo_sk)) otherCondition=() build RFs:RF4 wr_returning_cdemo_sk->[cd_demo_sk];RF5 cd_marital_status->[cd_marital_status];RF6 cd_education_status->[cd_education_status]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ PhysicalResultSink
--------------------------------------------PhysicalProject
----------------------------------------------filter((((promotion.p_channel_dmail = 'Y') OR (promotion.p_channel_email = 'Y')) OR (promotion.p_channel_tv = 'Y')))
------------------------------------------------PhysicalOlapScan[promotion] apply RFs: RF6
--------------------------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------------------------PhysicalDistribute[DistributionSpecReplicated]
----------------------------------------------PhysicalProject
------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_store_sk = store.s_store_sk)) otherCondition=() build RFs:RF5 s_store_sk->[ss_store_sk]
--------------------------------------------------PhysicalProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ PhysicalResultSink
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------PhysicalProject
----------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_web_page_sk = web_page.wp_web_page_sk)) otherCondition=() build RFs:RF7 ws_web_page_sk->[wp_web_page_sk]
------------------------------PhysicalProject
--------------------------------PhysicalOlapScan[web_page] apply RFs: RF7
------------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[web_page] apply RFs: RF7
------------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------------PhysicalProject
----------------------------------hashJoin[INNER_JOIN] hashCondition=((cd1.cd_education_status = cd2.cd_education_status) and (cd1.cd_marital_status = cd2.cd_marital_status) and (cd2.cd_demo_sk = web_returns.wr_returning_cdemo_sk)) otherCondition=() build RFs:RF4 wr_returning_cdemo_sk->[cd_demo_sk];RF5 cd_marital_status->[cd_marital_status];RF6 cd_education_status->[cd_education_status]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ suite("test_bucket_shuffle_join") {
) ENGINE=OLAP
DUPLICATE KEY(`id`,`name`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`,`name`) BUCKETS 4
DISTRIBUTED BY HASH(`id`,`name`) BUCKETS 32
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
Expand All @@ -46,7 +46,7 @@ suite("test_bucket_shuffle_join") {
) ENGINE=OLAP
DUPLICATE KEY(`id`,`name`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`,`name`) BUCKETS 5
DISTRIBUTED BY HASH(`id`,`name`) BUCKETS 40
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
Expand All @@ -63,7 +63,7 @@ suite("test_bucket_shuffle_join") {
) ENGINE=OLAP
DUPLICATE KEY(`id`,`name`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`,`name`) BUCKETS 6
DISTRIBUTED BY HASH(`id`,`name`) BUCKETS 48
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
Expand Down Expand Up @@ -99,7 +99,7 @@ suite("test_bucket_shuffle_join") {
sql """
create table shuffle_join_t1 ( a varchar(10) not null )
ENGINE=OLAP
DISTRIBUTED BY HASH(a) BUCKETS 5
DISTRIBUTED BY HASH(a) BUCKETS 40
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
Expand All @@ -110,7 +110,7 @@ suite("test_bucket_shuffle_join") {
sql """
create table shuffle_join_t2 ( a varchar(5) not null, b string not null, c char(3) not null )
ENGINE=OLAP
DISTRIBUTED BY HASH(a) BUCKETS 5
DISTRIBUTED BY HASH(a) BUCKETS 40
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ suite("multi_leading") {
sql "set ignore_shape_nodes='PhysicalProject'"
sql 'set enable_fallback_to_original_planner=false'
sql 'set runtime_filter_mode=OFF'
sql 'set enable_bucket_shuffle_join=false'

// create tables
sql """drop table if exists t1;"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ suite("query13") {
sql 'set enable_runtime_filter_prune=false'
sql 'set runtime_filter_type=8'
sql 'set dump_nereids_memo=false'
sql 'set enable_bucket_shuffle_downgrade=true'
sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
def ds = """select avg(ss_quantity)
,avg(ss_ext_sales_price)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ suite("query19") {
sql 'set enable_runtime_filter_prune=false'
sql 'set runtime_filter_type=8'
sql 'set dump_nereids_memo=false'
sql 'set enable_bucket_shuffle_downgrade=true'
sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
def ds = """select i_brand_id brand_id, i_brand brand, i_manufact_id, i_manufact,
sum(ss_ext_sales_price) ext_price
Expand Down
Loading

0 comments on commit 94ba2e2

Please sign in to comment.