Skip to content

Commit

Permalink
[opt](nereids) recover adoptive bucket shuffle (#36784)
Browse files Browse the repository at this point in the history
1. Recover adoptive bucket shuffle and re-using the
enable_bucket_shuffle_join to control whether to enable it, default
remains true.
2. Remove enable_bucket_shuffle_downgrade option.

Co-authored-by: zhongjian.xzj <zhongjian.xzj@zhongjianxzjdeMacBook-Pro.local>
  • Loading branch information
xzj7019 and zhongjian.xzj authored Jul 3, 2024
1 parent d81ed7b commit 4e448b6
Show file tree
Hide file tree
Showing 36 changed files with 78 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinction;
import org.apache.doris.nereids.trees.plans.AggMode;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
Expand All @@ -39,6 +40,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
Expand Down Expand Up @@ -199,12 +201,53 @@ public Boolean visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, Void c
return true;
}

private boolean isBucketShuffleDownGrade(DistributionSpecHash srcSideSpec) {
boolean isBucketShuffleDownGrade = ConnectContext.get().getSessionVariable().isEnableBucketShuffleDownGrade();
if (!isBucketShuffleDownGrade) {
private boolean isBucketShuffleDownGrade(Plan oneSidePlan, DistributionSpecHash otherSideSpec) {
// improper to do bucket shuffle join:
// oneSide:
// - base table and tablets' number is small enough (< paraInstanceNum)
// otherSide:
// - ShuffleType.EXECUTION_BUCKETED
boolean isEnableBucketShuffleJoin = ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin();
if (!isEnableBucketShuffleJoin) {
return true;
} else if (otherSideSpec.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
|| !(oneSidePlan instanceof GroupPlan)) {
return false;
} else {
return srcSideSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED;
PhysicalOlapScan candidate = findDownGradeBucketShuffleCandidate((GroupPlan) oneSidePlan);
if (candidate == null || candidate.getTable() == null
|| candidate.getTable().getDefaultDistributionInfo() == null) {
return false;
} else {
int prunedPartNum = candidate.getSelectedPartitionIds().size();
int bucketNum = candidate.getTable().getDefaultDistributionInfo().getBucketNum();
int totalBucketNum = prunedPartNum * bucketNum;
int backEndNum = Math.max(1, ConnectContext.get().getEnv().getClusterInfo()
.getBackendsNumber(true));
int paraNum = Math.max(1, ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
int totalParaNum = Math.min(10, backEndNum * paraNum);
return totalBucketNum < totalParaNum;
}
}
}

private PhysicalOlapScan findDownGradeBucketShuffleCandidate(GroupPlan groupPlan) {
if (groupPlan == null || groupPlan.getGroup() == null
|| groupPlan.getGroup().getPhysicalExpressions().isEmpty()) {
return null;
} else {
Plan targetPlan = groupPlan.getGroup().getPhysicalExpressions().get(0).getPlan();
while (targetPlan != null
&& (targetPlan instanceof PhysicalProject || targetPlan instanceof PhysicalFilter)
&& !((GroupPlan) targetPlan.child(0)).getGroup().getPhysicalExpressions().isEmpty()) {
targetPlan = ((GroupPlan) targetPlan.child(0)).getGroup()
.getPhysicalExpressions().get(0).getPlan();
}
if (targetPlan == null || !(targetPlan instanceof PhysicalOlapScan)) {
return null;
} else {
return (PhysicalOlapScan) targetPlan;
}
}
}

Expand Down Expand Up @@ -241,6 +284,9 @@ public Boolean visitPhysicalHashJoin(
throw new RuntimeException("should not come here, two children of shuffle join should all be shuffle");
}

Plan leftChild = hashJoin.child(0);
Plan rightChild = hashJoin.child(1);

DistributionSpecHash leftHashSpec = (DistributionSpecHash) leftDistributionSpec;
DistributionSpecHash rightHashSpec = (DistributionSpecHash) rightDistributionSpec;

Expand All @@ -261,7 +307,7 @@ public Boolean visitPhysicalHashJoin(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if (isBucketShuffleDownGrade(rightHashSpec)) {
} else if (isBucketShuffleDownGrade(leftChild, rightHashSpec)) {
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, leftHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
Expand All @@ -270,7 +316,7 @@ public Boolean visitPhysicalHashJoin(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if (isBucketShuffleDownGrade(leftHashSpec)) {
} else if (isBucketShuffleDownGrade(rightChild, leftHashSpec)) {
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, rightHashSpec, leftHashSpec,
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,6 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_AGG_STATE = "enable_agg_state";

public static final String ENABLE_BUCKET_SHUFFLE_DOWNGRADE = "enable_bucket_shuffle_downgrade";

public static final String ENABLE_RPC_OPT_FOR_PIPELINE = "enable_rpc_opt_for_pipeline";

public static final String ENABLE_SINGLE_DISTINCT_COLUMN_OPT = "enable_single_distinct_column_opt";
Expand Down Expand Up @@ -855,9 +853,6 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_JOIN, varType = VariableAnnotation.EXPERIMENTAL_ONLINE)
public boolean enableBucketShuffleJoin = true;

@VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_DOWNGRADE, needForward = true)
public boolean enableBucketShuffleDownGrade = false;

/**
* explode function row count enlarge factor.
*/
Expand Down Expand Up @@ -2577,10 +2572,6 @@ public boolean isEnableBucketShuffleJoin() {
return enableBucketShuffleJoin;
}

public boolean isEnableBucketShuffleDownGrade() {
return enableBucketShuffleDownGrade;
}

public boolean isEnableOdbcTransaction() {
return enableOdbcTransaction;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ PhysicalResultSink
----------------PhysicalProject
------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_store_sk = store.s_store_sk)) otherCondition=(( not (substring(ca_zip, 1, 5) = substring(s_zip, 1, 5)))) build RFs:RF4 s_store_sk->[ss_store_sk]
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN shuffle] hashCondition=((customer.c_current_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF3 c_current_addr_sk->[ca_address_sk]
----------------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((customer.c_current_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF3 c_current_addr_sk->[ca_address_sk]
------------------------PhysicalProject
--------------------------PhysicalOlapScan[customer_address] apply RFs: RF3
------------------------PhysicalProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ PhysicalResultSink
--------PhysicalProject
----------hashJoin[INNER_JOIN shuffle] hashCondition=((asceding.rnk = descending.rnk)) otherCondition=()
------------PhysicalProject
--------------hashJoin[INNER_JOIN broadcast] hashCondition=((i1.i_item_sk = asceding.item_sk)) otherCondition=() build RFs:RF1 item_sk->[i_item_sk]
--------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((i1.i_item_sk = asceding.item_sk)) otherCondition=() build RFs:RF1 item_sk->[i_item_sk]
----------------PhysicalProject
------------------PhysicalOlapScan[item] apply RFs: RF1
----------------PhysicalProject
Expand Down Expand Up @@ -37,7 +37,7 @@ PhysicalResultSink
--------------------------------------------------filter((store_sales.ss_store_sk = 4) and ss_hdemo_sk IS NULL)
----------------------------------------------------PhysicalOlapScan[store_sales]
------------PhysicalProject
--------------hashJoin[INNER_JOIN broadcast] hashCondition=((i2.i_item_sk = descending.item_sk)) otherCondition=() build RFs:RF0 item_sk->[i_item_sk]
--------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((i2.i_item_sk = descending.item_sk)) otherCondition=() build RFs:RF0 item_sk->[i_item_sk]
----------------PhysicalProject
------------------PhysicalOlapScan[item] apply RFs: RF0
----------------PhysicalProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ PhysicalResultSink
----------------------------------------PhysicalProject
------------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((customer_address.ca_county = store.s_county) and (customer_address.ca_state = store.s_state)) otherCondition=() build RFs:RF4 s_county->[ca_county];RF5 s_state->[ca_state]
--------------------------------------------PhysicalProject
----------------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((my_customers.c_current_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF3 c_current_addr_sk->[ca_address_sk]
----------------------------------------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((my_customers.c_current_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF3 c_current_addr_sk->[ca_address_sk]
------------------------------------------------PhysicalProject
--------------------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF3 RF4 RF5
------------------------------------------------PhysicalProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ PhysicalResultSink
--------------------PhysicalDistribute[DistributionSpecHash]
----------------------hashAgg[LOCAL]
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN shuffle] hashCondition=((web_sales.ws_bill_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF11 ws_bill_addr_sk->[ca_address_sk]
--------------------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((web_sales.ws_bill_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF11 ws_bill_addr_sk->[ca_address_sk]
----------------------------PhysicalProject
------------------------------filter((customer_address.ca_gmt_offset = -6.00))
--------------------------------PhysicalOlapScan[customer_address] apply RFs: RF11
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ PhysicalResultSink
--------------PhysicalDistribute[DistributionSpecHash]
----------------hashAgg[LOCAL]
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN shuffle] hashCondition=((a.ca_address_sk = c.c_current_addr_sk)) otherCondition=() build RFs:RF5 c_current_addr_sk->[ca_address_sk]
--------------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((a.ca_address_sk = c.c_current_addr_sk)) otherCondition=() build RFs:RF5 c_current_addr_sk->[ca_address_sk]
----------------------PhysicalProject
------------------------PhysicalOlapScan[customer_address] apply RFs: RF5
----------------------PhysicalProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ PhysicalResultSink
----------PhysicalDistribute[DistributionSpecGather]
------------hashAgg[LOCAL]
--------------PhysicalProject
----------------hashJoin[INNER_JOIN shuffle] hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk)) otherCondition=() build RFs:RF10 c_current_addr_sk->[ca_address_sk]
----------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk)) otherCondition=() build RFs:RF10 c_current_addr_sk->[ca_address_sk]
------------------PhysicalProject
--------------------filter((customer_address.ca_gmt_offset = -7.00))
----------------------PhysicalOlapScan[customer_address] apply RFs: RF10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ PhysicalResultSink
----PhysicalDistribute[DistributionSpecGather]
------PhysicalTopN[LOCAL_SORT]
--------PhysicalProject
----------hashJoin[INNER_JOIN shuffle] hashCondition=((customer.c_current_addr_sk = current_addr.ca_address_sk)) otherCondition=(( not (ca_city = bought_city))) build RFs:RF5 c_current_addr_sk->[ca_address_sk]
----------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((customer.c_current_addr_sk = current_addr.ca_address_sk)) otherCondition=(( not (ca_city = bought_city))) build RFs:RF5 c_current_addr_sk->[ca_address_sk]
------------PhysicalProject
--------------PhysicalOlapScan[customer_address] apply RFs: RF5
------------PhysicalProject
Expand All @@ -15,7 +15,7 @@ PhysicalResultSink
----------------PhysicalProject
------------------hashAgg[LOCAL]
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN shuffle] hashCondition=((store_sales.ss_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF3 ss_addr_sk->[ca_address_sk]
----------------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((store_sales.ss_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF3 ss_addr_sk->[ca_address_sk]
------------------------PhysicalProject
--------------------------PhysicalOlapScan[customer_address] apply RFs: RF3
------------------------PhysicalProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ PhysicalResultSink
------------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------------hashAgg[LOCAL]
----------------------------------PhysicalProject
------------------------------------hashJoin[INNER_JOIN shuffle] hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk)) otherCondition=() build RFs:RF0 c_current_addr_sk->[ca_address_sk]
------------------------------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk)) otherCondition=() build RFs:RF0 c_current_addr_sk->[ca_address_sk]
--------------------------------------PhysicalProject
----------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF0
--------------------------------------PhysicalProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ PhysicalResultSink
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[catalog_returns] apply RFs: RF3 RF4 RF5
----------------------------PhysicalProject
------------------------------hashJoin[INNER_JOIN shuffle] hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk)) otherCondition=() build RFs:RF2 c_current_addr_sk->[ca_address_sk]
------------------------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk)) otherCondition=() build RFs:RF2 c_current_addr_sk->[ca_address_sk]
--------------------------------PhysicalProject
----------------------------------filter((customer_address.ca_gmt_offset = -7.00))
------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
----------hashAgg[DISTINCT_LOCAL]
------------hashAgg[GLOBAL]
--------------hashAgg[LOCAL]
----------------hashJoin[RIGHT_SEMI_JOIN shuffleBucket] hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) otherCondition=() build RFs:RF6 ws_order_number->[wr_order_number,ws_order_number]
----------------hashJoin[RIGHT_SEMI_JOIN colocated] hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) otherCondition=() build RFs:RF6 ws_order_number->[wr_order_number,ws_order_number]
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN shuffle] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number]
----------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF5 RF6
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ PhysicalResultSink
------------PhysicalDistribute[DistributionSpecHash]
--------------hashAgg[LOCAL]
----------------PhysicalProject
------------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((web_sales.ws_web_page_sk = web_page.wp_web_page_sk)) otherCondition=() build RFs:RF9 ws_web_page_sk->[wp_web_page_sk]
------------------hashJoin[INNER_JOIN broadcast] hashCondition=((web_sales.ws_web_page_sk = web_page.wp_web_page_sk)) otherCondition=() build RFs:RF9 ws_web_page_sk->[wp_web_page_sk]
--------------------PhysicalProject
----------------------PhysicalOlapScan[web_page] apply RFs: RF9
--------------------PhysicalProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ PhysicalResultSink
--------------------------------filter((date_dim.d_moy = 11) and (date_dim.d_year = 1999))
----------------------------------PhysicalOlapScan[date_dim] apply RFs: RF7
------------------------------PhysicalProject
--------------------------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((store_sales.ss_promo_sk = promotion.p_promo_sk)) otherCondition=() build RFs:RF6 ss_promo_sk->[p_promo_sk]
--------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_promo_sk = promotion.p_promo_sk)) otherCondition=() build RFs:RF6 ss_promo_sk->[p_promo_sk]
----------------------------------PhysicalProject
------------------------------------filter((((promotion.p_channel_dmail = 'Y') OR (promotion.p_channel_email = 'Y')) OR (promotion.p_channel_tv = 'Y')))
--------------------------------------PhysicalOlapScan[promotion] apply RFs: RF6
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ PhysicalResultSink
--------------------------filter((customer_address.ca_country = 'United States') and ca_state IN ('DE', 'FL', 'ID', 'IL', 'IN', 'MT', 'ND', 'OH', 'TX'))
----------------------------PhysicalOlapScan[customer_address] apply RFs: RF8
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((web_sales.ws_web_page_sk = web_page.wp_web_page_sk)) otherCondition=() build RFs:RF7 ws_web_page_sk->[wp_web_page_sk]
--------------------------hashJoin[INNER_JOIN shuffle] hashCondition=((web_sales.ws_web_page_sk = web_page.wp_web_page_sk)) otherCondition=() build RFs:RF7 ws_web_page_sk->[wp_web_page_sk]
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[web_page] apply RFs: RF7
----------------------------PhysicalProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ PhysicalResultSink
--------------------------------filter((date_dim.d_moy = 11) and (date_dim.d_year = 1999))
----------------------------------PhysicalOlapScan[date_dim] apply RFs: RF7
------------------------------PhysicalProject
--------------------------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((store_sales.ss_promo_sk = promotion.p_promo_sk)) otherCondition=() build RFs:RF6 ss_promo_sk->[p_promo_sk]
--------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_promo_sk = promotion.p_promo_sk)) otherCondition=() build RFs:RF6 ss_promo_sk->[p_promo_sk]
----------------------------------PhysicalProject
------------------------------------filter((((promotion.p_channel_dmail = 'Y') OR (promotion.p_channel_email = 'Y')) OR (promotion.p_channel_tv = 'Y')))
--------------------------------------PhysicalOlapScan[promotion] apply RFs: RF6
Expand Down
Loading

0 comments on commit 4e448b6

Please sign in to comment.