Skip to content

Commit

Permalink
[opt](nereids) refine left semi/anti shortcut cost (#37060)
Browse files Browse the repository at this point in the history
The current cost of left semi/anti joins does not account for the BE's
short-cut optimization. This PR refines that part of the cost computation
and distinguishes the costs of left and right semi/anti joins.

---------

Co-authored-by: zhongjian.xzj <zhongjian.xzj@zhongjianxzjdeMacBook-Pro.local>
  • Loading branch information
xzj7019 and zhongjian.xzj authored Jul 3, 2024
1 parent e9500b3 commit d81ed7b
Show file tree
Hide file tree
Showing 19 changed files with 276 additions and 258 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,13 @@ public Cost visitPhysicalHashJoin(
leftRowCount + rightRowCount
);
}

double probeShortcutFactor = 1.0;
if (physicalHashJoin.getJoinType().isLeftSemiOrAntiJoin()
&& physicalHashJoin.getOtherJoinConjuncts().isEmpty()
&& physicalHashJoin.getMarkJoinConjuncts().isEmpty()) {
// left semi/anti has short-cut opt, add probe side factor for distinguishing from the right ones
probeShortcutFactor = context.getSessionVariable().getLeftSemiOrAntiProbeFactor();
}
if (context.isBroadcastJoin()) {
// compared with shuffle join, bc join will be taken a penalty for both build and probe side;
// currently we use the following factor as the penalty factor:
Expand All @@ -408,14 +414,16 @@ public Cost visitPhysicalHashJoin(
}
}
return CostV1.of(context.getSessionVariable(),
leftRowCount + rightRowCount * buildSideFactor + outputRowCount * probeSideFactor,
leftRowCount * probeShortcutFactor
+ rightRowCount * buildSideFactor
+ outputRowCount * probeSideFactor,
rightRowCount,
0
);
}
return CostV1.of(context.getSessionVariable(), leftRowCount + rightRowCount + outputRowCount,
rightRowCount,
0
return CostV1.of(context.getSessionVariable(),
leftRowCount * probeShortcutFactor + rightRowCount + outputRowCount,
rightRowCount, 0
);
}

Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ public class SessionVariable implements Serializable, Writable {

public static final String FORBID_UNKNOWN_COLUMN_STATS = "forbid_unknown_col_stats";
public static final String BROADCAST_RIGHT_TABLE_SCALE_FACTOR = "broadcast_right_table_scale_factor";
public static final String LEFT_SEMI_OR_ANTI_PROBE_FACTOR = "left_semi_or_anti_probe_factor";
public static final String BROADCAST_ROW_COUNT_LIMIT = "broadcast_row_count_limit";

// percentage of EXEC_MEM_LIMIT
Expand Down Expand Up @@ -1285,6 +1286,9 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
@VariableMgr.VarAttr(name = BROADCAST_RIGHT_TABLE_SCALE_FACTOR)
private double broadcastRightTableScaleFactor = 0.0;

// Multiplier applied to the left (probe-side) row count when costing a left semi/anti
// hash join that has no other-join and no mark-join conjuncts. Defaults to 0.1.
@VariableMgr.VarAttr(name = LEFT_SEMI_OR_ANTI_PROBE_FACTOR)
private double leftSemiOrAntiProbeFactor = 0.1;

@VariableMgr.VarAttr(name = BROADCAST_ROW_COUNT_LIMIT, needForward = true)
private double broadcastRowCountLimit = 30000000;

Expand Down Expand Up @@ -2733,6 +2737,14 @@ public void setBroadcastRightTableScaleFactor(double broadcastRightTableScaleFac
this.broadcastRightTableScaleFactor = broadcastRightTableScaleFactor;
}

/**
 * Returns the current value of the {@code left_semi_or_anti_probe_factor} session variable.
 *
 * @return the factor stored in {@code leftSemiOrAntiProbeFactor}
 */
public double getLeftSemiOrAntiProbeFactor() {
return leftSemiOrAntiProbeFactor;
}

/**
 * Overwrites the value of the {@code left_semi_or_anti_probe_factor} session variable.
 * The value is stored as given; no range check is performed here.
 *
 * @param leftSemiOrAntiProbeFactor the new factor to store
 */
public void setLeftSemiOrAntiProbeFactor(double leftSemiOrAntiProbeFactor) {
this.leftSemiOrAntiProbeFactor = leftSemiOrAntiProbeFactor;
}

public double getBroadcastRowCountLimit() {
return broadcastRowCountLimit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,27 @@ PhysicalResultSink
----------hashAgg[GLOBAL]
------------hashAgg[LOCAL]
--------------PhysicalProject
----------------hashJoin[RIGHT_SEMI_JOIN shuffleBucket] hashCondition=((cs1.cs_order_number = cs2.cs_order_number)) otherCondition=(( not (cs_warehouse_sk = cs_warehouse_sk))) build RFs:RF4 cs_order_number->[cs_order_number]
----------------hashJoin[RIGHT_SEMI_JOIN shuffleBucket] hashCondition=((cs1.cs_order_number = cs2.cs_order_number)) otherCondition=(( not (cs_warehouse_sk = cs_warehouse_sk))) build RFs:RF3 cs_order_number->[cs_order_number]
------------------PhysicalProject
--------------------PhysicalOlapScan[catalog_sales] apply RFs: RF4
------------------hashJoin[RIGHT_ANTI_JOIN shuffle] hashCondition=((cs1.cs_order_number = cr1.cr_order_number)) otherCondition=() build RFs:RF3 cs_order_number->[cr_order_number]
--------------------PhysicalProject
----------------------PhysicalOlapScan[catalog_returns] apply RFs: RF3
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN broadcast] hashCondition=((cs1.cs_call_center_sk = call_center.cc_call_center_sk)) otherCondition=() build RFs:RF2 cc_call_center_sk->[cs_call_center_sk]
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((cs1.cs_ship_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF1 d_date_sk->[cs_ship_date_sk]
----------------------------PhysicalProject
------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((cs1.cs_ship_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF0 ca_address_sk->[cs_ship_addr_sk]
--------------------PhysicalOlapScan[catalog_sales] apply RFs: RF3
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN broadcast] hashCondition=((cs1.cs_call_center_sk = call_center.cc_call_center_sk)) otherCondition=() build RFs:RF2 cc_call_center_sk->[cs_call_center_sk]
----------------------PhysicalProject
------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((cs1.cs_ship_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF1 d_date_sk->[cs_ship_date_sk]
--------------------------PhysicalProject
----------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((cs1.cs_ship_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF0 ca_address_sk->[cs_ship_addr_sk]
------------------------------hashJoin[LEFT_ANTI_JOIN shuffle] hashCondition=((cs1.cs_order_number = cr1.cr_order_number)) otherCondition=()
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF0 RF1 RF2
--------------------------------PhysicalProject
----------------------------------filter((customer_address.ca_state = 'PA'))
------------------------------------PhysicalOlapScan[customer_address]
----------------------------PhysicalProject
------------------------------filter((date_dim.d_date <= '2002-05-31') and (date_dim.d_date >= '2002-04-01'))
--------------------------------PhysicalOlapScan[date_dim]
------------------------PhysicalProject
--------------------------filter((call_center.cc_county = 'Williamson County'))
----------------------------PhysicalOlapScan[call_center]
----------------------------------PhysicalOlapScan[catalog_returns]
------------------------------PhysicalProject
--------------------------------filter((customer_address.ca_state = 'PA'))
----------------------------------PhysicalOlapScan[customer_address]
--------------------------PhysicalProject
----------------------------filter((date_dim.d_date <= '2002-05-31') and (date_dim.d_date >= '2002-04-01'))
------------------------------PhysicalOlapScan[date_dim]
----------------------PhysicalProject
------------------------filter((call_center.cc_county = 'Williamson County'))
--------------------------PhysicalOlapScan[call_center]

Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,44 @@ PhysicalResultSink
----------------------PhysicalDistribute[DistributionSpecHash]
------------------------hashAgg[LOCAL]
--------------------------PhysicalProject
----------------------------hashJoin[LEFT_ANTI_JOIN colocated] hashCondition=((store_returns.sr_ticket_number = store_sales.ss_ticket_number) and (store_sales.ss_item_sk = store_returns.sr_item_sk)) otherCondition=()
----------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF2 d_date_sk->[ss_sold_date_sk]
------------------------------PhysicalProject
--------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF2 d_date_sk->[ss_sold_date_sk]
--------------------------------hashJoin[LEFT_ANTI_JOIN colocated] hashCondition=((store_returns.sr_ticket_number = store_sales.ss_ticket_number) and (store_sales.ss_item_sk = store_returns.sr_item_sk)) otherCondition=()
----------------------------------PhysicalProject
------------------------------------PhysicalOlapScan[store_sales] apply RFs: RF2
----------------------------------PhysicalProject
------------------------------------filter((date_dim.d_year = 1998))
--------------------------------------PhysicalOlapScan[date_dim]
------------------------------------PhysicalOlapScan[store_returns]
------------------------------PhysicalProject
--------------------------------PhysicalOlapScan[store_returns]
--------------------------------filter((date_dim.d_year = 1998))
----------------------------------PhysicalOlapScan[date_dim]
------------------PhysicalProject
--------------------hashAgg[GLOBAL]
----------------------PhysicalDistribute[DistributionSpecHash]
------------------------hashAgg[LOCAL]
--------------------------PhysicalProject
----------------------------hashJoin[LEFT_ANTI_JOIN colocated] hashCondition=((web_returns.wr_order_number = web_sales.ws_order_number) and (web_sales.ws_item_sk = web_returns.wr_item_sk)) otherCondition=()
----------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF1 d_date_sk->[ws_sold_date_sk]
------------------------------PhysicalProject
--------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF1 d_date_sk->[ws_sold_date_sk]
--------------------------------hashJoin[LEFT_ANTI_JOIN colocated] hashCondition=((web_returns.wr_order_number = web_sales.ws_order_number) and (web_sales.ws_item_sk = web_returns.wr_item_sk)) otherCondition=()
----------------------------------PhysicalProject
------------------------------------PhysicalOlapScan[web_sales] apply RFs: RF1
----------------------------------PhysicalProject
------------------------------------filter((date_dim.d_year = 1998))
--------------------------------------PhysicalOlapScan[date_dim]
------------------------------------PhysicalOlapScan[web_returns]
------------------------------PhysicalProject
--------------------------------PhysicalOlapScan[web_returns]
--------------------------------filter((date_dim.d_year = 1998))
----------------------------------PhysicalOlapScan[date_dim]
--------------PhysicalProject
----------------hashAgg[GLOBAL]
------------------PhysicalDistribute[DistributionSpecHash]
--------------------hashAgg[LOCAL]
----------------------PhysicalProject
------------------------hashJoin[LEFT_ANTI_JOIN colocated] hashCondition=((catalog_returns.cr_order_number = catalog_sales.cs_order_number) and (catalog_sales.cs_item_sk = catalog_returns.cr_item_sk)) otherCondition=()
------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF0 d_date_sk->[cs_sold_date_sk]
--------------------------PhysicalProject
----------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF0 d_date_sk->[cs_sold_date_sk]
----------------------------hashJoin[LEFT_ANTI_JOIN colocated] hashCondition=((catalog_returns.cr_order_number = catalog_sales.cs_order_number) and (catalog_sales.cs_item_sk = catalog_returns.cr_item_sk)) otherCondition=()
------------------------------PhysicalProject
--------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF0
------------------------------PhysicalProject
--------------------------------filter((date_dim.d_year = 1998))
----------------------------------PhysicalOlapScan[date_dim]
--------------------------------PhysicalOlapScan[catalog_returns]
--------------------------PhysicalProject
----------------------------PhysicalOlapScan[catalog_returns]
----------------------------filter((date_dim.d_year = 1998))
------------------------------PhysicalOlapScan[date_dim]

Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,27 @@ PhysicalResultSink
----------hashAgg[GLOBAL]
------------hashAgg[LOCAL]
--------------PhysicalProject
----------------hashJoin[RIGHT_SEMI_JOIN shuffleBucket] hashCondition=((ws1.ws_order_number = ws2.ws_order_number)) otherCondition=(( not (ws_warehouse_sk = ws_warehouse_sk))) build RFs:RF4 ws_order_number->[ws_order_number]
----------------hashJoin[RIGHT_SEMI_JOIN shuffleBucket] hashCondition=((ws1.ws_order_number = ws2.ws_order_number)) otherCondition=(( not (ws_warehouse_sk = ws_warehouse_sk))) build RFs:RF3 ws_order_number->[ws_order_number]
------------------PhysicalProject
--------------------PhysicalOlapScan[web_sales] apply RFs: RF4
------------------hashJoin[RIGHT_ANTI_JOIN shuffle] hashCondition=((ws1.ws_order_number = wr1.wr_order_number)) otherCondition=() build RFs:RF3 ws_order_number->[wr_order_number]
--------------------PhysicalProject
----------------------PhysicalOlapScan[web_returns] apply RFs: RF3
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN broadcast] hashCondition=((ws1.ws_web_site_sk = web_site.web_site_sk)) otherCondition=() build RFs:RF2 web_site_sk->[ws_web_site_sk]
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((ws1.ws_ship_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF1 d_date_sk->[ws_ship_date_sk]
----------------------------PhysicalProject
------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((ws1.ws_ship_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF0 ca_address_sk->[ws_ship_addr_sk]
--------------------PhysicalOlapScan[web_sales] apply RFs: RF3
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN broadcast] hashCondition=((ws1.ws_web_site_sk = web_site.web_site_sk)) otherCondition=() build RFs:RF2 web_site_sk->[ws_web_site_sk]
----------------------PhysicalProject
------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((ws1.ws_ship_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF1 d_date_sk->[ws_ship_date_sk]
--------------------------PhysicalProject
----------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((ws1.ws_ship_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF0 ca_address_sk->[ws_ship_addr_sk]
------------------------------hashJoin[LEFT_ANTI_JOIN shuffle] hashCondition=((ws1.ws_order_number = wr1.wr_order_number)) otherCondition=()
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[web_sales] apply RFs: RF0 RF1 RF2
--------------------------------PhysicalProject
----------------------------------filter((customer_address.ca_state = 'OK'))
------------------------------------PhysicalOlapScan[customer_address]
----------------------------PhysicalProject
------------------------------filter((date_dim.d_date <= '2002-06-30') and (date_dim.d_date >= '2002-05-01'))
--------------------------------PhysicalOlapScan[date_dim]
------------------------PhysicalProject
--------------------------filter((web_site.web_company_name = 'pri'))
----------------------------PhysicalOlapScan[web_site]
----------------------------------PhysicalOlapScan[web_returns]
------------------------------PhysicalProject
--------------------------------filter((customer_address.ca_state = 'OK'))
----------------------------------PhysicalOlapScan[customer_address]
--------------------------PhysicalProject
----------------------------filter((date_dim.d_date <= '2002-06-30') and (date_dim.d_date >= '2002-05-01'))
------------------------------PhysicalOlapScan[date_dim]
----------------------PhysicalProject
------------------------filter((web_site.web_company_name = 'pri'))
--------------------------PhysicalOlapScan[web_site]

Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,13 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------------------------------PhysicalDistribute[DistributionSpecHash]
----------------------------------hashAgg[LOCAL]
------------------------------------PhysicalProject
--------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF18 d_date_sk->[ws_sold_date_sk]
--------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF17 d_date_sk->[ws_sold_date_sk]
----------------------------------------PhysicalProject
------------------------------------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((web_sales.ws_item_sk = item.i_item_sk)) otherCondition=()
--------------------------------------------hashJoin[RIGHT_SEMI_JOIN shuffle] hashCondition=((web_sales.ws_item_sk = cross_items.ss_item_sk)) otherCondition=()
----------------------------------------------PhysicalCteConsumer ( cteId=CTEId#0 )
--------------------------------------------hashJoin[LEFT_SEMI_JOIN shuffle] hashCondition=((web_sales.ws_item_sk = cross_items.ss_item_sk)) otherCondition=()
----------------------------------------------PhysicalProject
------------------------------------------------PhysicalOlapScan[web_sales] apply RFs: RF18
------------------------------------------------PhysicalOlapScan[web_sales] apply RFs: RF17
----------------------------------------------PhysicalCteConsumer ( cteId=CTEId#0 )
--------------------------------------------PhysicalProject
----------------------------------------------PhysicalOlapScan[item]
----------------------------------------PhysicalProject
Expand Down
Loading

0 comments on commit d81ed7b

Please sign in to comment.