Skip to content

Commit

Permalink
[opt](nereids) enable runtime filter use cte as target (#40815)
Browse files Browse the repository at this point in the history
## Proposed changes
1. remove some unused maps in RuntimeFilterContext
2. let runtime filter use cte as target

Issue Number: close #xxx

<!--Describe your changes.-->
  • Loading branch information
englefly authored and dataroaring committed Oct 5, 2024
1 parent 80226fe commit 0fec418
Show file tree
Hide file tree
Showing 155 changed files with 1,043 additions and 1,111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1253,9 +1253,6 @@ public PlanFragment visitPhysicalCTEProducer(PhysicalCTEProducer<? extends Plan>
multiCastPlanFragment.setOutputExprs(outputs);
context.getCteProduceFragments().put(cteId, multiCastPlanFragment);
context.getCteProduceMap().put(cteId, cteProducer);
if (context.getRuntimeTranslator().isPresent()) {
context.getRuntimeTranslator().get().getContext().getCteProduceMap().put(cteId, cteProducer);
}
context.getPlanFragments().add(multiCastPlanFragment);
return child;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@

import org.apache.doris.analysis.SlotRef;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.trees.expressions.CTEId;
import org.apache.doris.nereids.trees.expressions.EqualPredicate;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
Expand Down Expand Up @@ -118,11 +116,6 @@ public boolean equals(Object other) {

private final Map<Plan, EffectiveSrcType> effectiveSrcNodes = Maps.newHashMap();

private final Map<CTEId, PhysicalCTEProducer> cteProducerMap = Maps.newLinkedHashMap();

// cte whose runtime filter has been extracted
private final Set<CTEId> processedCTE = Sets.newHashSet();

private final SessionVariable sessionVariable;

private final FilterSizeLimits limits;
Expand Down Expand Up @@ -152,6 +145,7 @@ public ExpandRF(AbstractPhysicalJoin buildNode, PhysicalRelation srcNode,
this.srcNode = srcNode;
this.target1 = target1;
this.target2 = target2;
this.equal = equal;
}
}

Expand All @@ -160,10 +154,6 @@ public RuntimeFilterContext(SessionVariable sessionVariable) {
this.limits = new FilterSizeLimits(sessionVariable);
}

public void setRelationsUsedByPlan(Plan plan, Set<PhysicalRelation> relations) {
relationsUsedByPlan.put(plan, relations);
}

/**
* return true, if the relation is in the subtree
*/
Expand All @@ -185,14 +175,6 @@ public FilterSizeLimits getLimits() {
return limits;
}

public Map<CTEId, PhysicalCTEProducer> getCteProduceMap() {
return cteProducerMap;
}

public Set<CTEId> getProcessedCTE() {
return processedCTE;
}

public void setTargetExprIdToFilter(ExprId id, RuntimeFilter filter) {
Preconditions.checkArgument(filter.getTargetSlots().stream().anyMatch(expr -> expr.getExprId() == id));
this.targetExprIdToFilter.computeIfAbsent(id, k -> Lists.newArrayList()).add(filter);
Expand Down

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions regression-test/data/nereids_hint_tpcds_p0/shape/query1.out
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
------PhysicalDistribute[DistributionSpecGather]
--------PhysicalTopN[LOCAL_SORT]
----------PhysicalProject
------------hashJoin[INNER_JOIN broadcast] hashCondition=((ctr1.ctr_customer_sk = customer.c_customer_sk)) otherCondition=()
------------hashJoin[INNER_JOIN broadcast] hashCondition=((ctr1.ctr_customer_sk = customer.c_customer_sk)) otherCondition=() build RFs:RF3 ctr_customer_sk->[c_customer_sk]
--------------PhysicalProject
----------------PhysicalOlapScan[customer]
----------------PhysicalOlapScan[customer] apply RFs: RF3
--------------PhysicalProject
----------------hashJoin[INNER_JOIN broadcast] hashCondition=((ctr1.ctr_store_sk = ctr2.ctr_store_sk)) otherCondition=((cast(ctr_total_return as DOUBLE) > cast((avg(cast(ctr_total_return as DECIMALV3(38, 4))) * 1.2) as DOUBLE)))
----------------hashJoin[INNER_JOIN broadcast] hashCondition=((ctr1.ctr_store_sk = ctr2.ctr_store_sk)) otherCondition=((cast(ctr_total_return as DOUBLE) > cast((avg(cast(ctr_total_return as DECIMALV3(38, 4))) * 1.2) as DOUBLE))) build RFs:RF2 ctr_store_sk->[ctr_store_sk,s_store_sk]
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN shuffle] hashCondition=((store.s_store_sk = ctr1.ctr_store_sk)) otherCondition=() build RFs:RF1 s_store_sk->[ctr_store_sk]
----------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF1
----------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF1 RF2
----------------------PhysicalProject
------------------------filter((store.s_state = 'TN'))
--------------------------PhysicalOlapScan[store]
--------------------------PhysicalOlapScan[store] apply RFs: RF2
------------------hashAgg[GLOBAL]
--------------------PhysicalDistribute[DistributionSpecHash]
----------------------hashAgg[LOCAL]
Expand Down
12 changes: 6 additions & 6 deletions regression-test/data/nereids_hint_tpcds_p0/shape/query11.out
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
------PhysicalDistribute[DistributionSpecGather]
--------PhysicalTopN[LOCAL_SORT]
----------PhysicalProject
------------hashJoin[INNER_JOIN shuffleBucket] hashCondition=((t_s_firstyear.customer_id = t_w_secyear.customer_id)) otherCondition=((if((year_total > 0.00), (cast(year_total as DECIMALV3(38, 8)) / year_total), 0.000000) > if((year_total > 0.00), (cast(year_total as DECIMALV3(38, 8)) / year_total), 0.000000)))
------------hashJoin[INNER_JOIN shuffleBucket] hashCondition=((t_s_firstyear.customer_id = t_w_secyear.customer_id)) otherCondition=((if((year_total > 0.00), (cast(year_total as DECIMALV3(38, 8)) / year_total), 0.000000) > if((year_total > 0.00), (cast(year_total as DECIMALV3(38, 8)) / year_total), 0.000000))) build RFs:RF5 customer_id->[customer_id]
--------------PhysicalProject
----------------filter((t_w_secyear.dyear = 1999) and (t_w_secyear.sale_type = 'w'))
------------------PhysicalCteConsumer ( cteId=CTEId#0 )
------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF5
--------------PhysicalProject
----------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((t_s_firstyear.customer_id = t_w_firstyear.customer_id)) otherCondition=()
------------------hashJoin[INNER_JOIN shuffle] hashCondition=((t_s_secyear.customer_id = t_s_firstyear.customer_id)) otherCondition=()
----------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((t_s_firstyear.customer_id = t_w_firstyear.customer_id)) otherCondition=() build RFs:RF4 customer_id->[customer_id,customer_id]
------------------hashJoin[INNER_JOIN shuffle] hashCondition=((t_s_secyear.customer_id = t_s_firstyear.customer_id)) otherCondition=() build RFs:RF3 customer_id->[customer_id]
--------------------PhysicalProject
----------------------filter((t_s_secyear.dyear = 1999) and (t_s_secyear.sale_type = 's'))
------------------------PhysicalCteConsumer ( cteId=CTEId#0 )
------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF3 RF4
--------------------PhysicalProject
----------------------filter((t_s_firstyear.dyear = 1998) and (t_s_firstyear.sale_type = 's') and (t_s_firstyear.year_total > 0.00))
------------------------PhysicalCteConsumer ( cteId=CTEId#0 )
------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF4
------------------PhysicalProject
--------------------filter((t_w_firstyear.dyear = 1998) and (t_w_firstyear.sale_type = 'w') and (t_w_firstyear.year_total > 0.00))
----------------------PhysicalCteConsumer ( cteId=CTEId#0 )
Expand Down
28 changes: 14 additions & 14 deletions regression-test/data/nereids_hint_tpcds_p0/shape/query14.out
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,16 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------------------------------PhysicalDistribute[DistributionSpecHash]
----------------------------------hashAgg[LOCAL]
------------------------------------PhysicalProject
--------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF13 i_item_sk->[ss_item_sk,ss_item_sk]
----------------------------------------hashJoin[LEFT_SEMI_JOIN broadcast] hashCondition=((store_sales.ss_item_sk = cross_items.ss_item_sk)) otherCondition=()
--------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF14 i_item_sk->[ss_item_sk,ss_item_sk]
----------------------------------------hashJoin[LEFT_SEMI_JOIN broadcast] hashCondition=((store_sales.ss_item_sk = cross_items.ss_item_sk)) otherCondition=() build RFs:RF13 ss_item_sk->[ss_item_sk]
------------------------------------------PhysicalProject
--------------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF12 d_date_sk->[ss_sold_date_sk]
----------------------------------------------PhysicalProject
------------------------------------------------PhysicalOlapScan[store_sales] apply RFs: RF12 RF13
------------------------------------------------PhysicalOlapScan[store_sales] apply RFs: RF12 RF13 RF14
----------------------------------------------PhysicalProject
------------------------------------------------filter((date_dim.d_moy = 11) and (date_dim.d_year = 2001))
--------------------------------------------------PhysicalOlapScan[date_dim]
------------------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF13
------------------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF14
----------------------------------------PhysicalProject
------------------------------------------PhysicalOlapScan[item]
----------------------------PhysicalProject
Expand All @@ -120,16 +120,16 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------------------------------PhysicalDistribute[DistributionSpecHash]
----------------------------------hashAgg[LOCAL]
------------------------------------PhysicalProject
--------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((catalog_sales.cs_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF15 i_item_sk->[cs_item_sk,ss_item_sk]
----------------------------------------hashJoin[LEFT_SEMI_JOIN broadcast] hashCondition=((catalog_sales.cs_item_sk = cross_items.ss_item_sk)) otherCondition=()
--------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((catalog_sales.cs_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF17 i_item_sk->[cs_item_sk,ss_item_sk]
----------------------------------------hashJoin[LEFT_SEMI_JOIN broadcast] hashCondition=((catalog_sales.cs_item_sk = cross_items.ss_item_sk)) otherCondition=() build RFs:RF16 ss_item_sk->[cs_item_sk]
------------------------------------------PhysicalProject
--------------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF14 d_date_sk->[cs_sold_date_sk]
--------------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF15 d_date_sk->[cs_sold_date_sk]
----------------------------------------------PhysicalProject
------------------------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF14 RF15
------------------------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF15 RF16 RF17
----------------------------------------------PhysicalProject
------------------------------------------------filter((date_dim.d_moy = 11) and (date_dim.d_year = 2001))
--------------------------------------------------PhysicalOlapScan[date_dim]
------------------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF15
------------------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF17
----------------------------------------PhysicalProject
------------------------------------------PhysicalOlapScan[item]
----------------------------PhysicalProject
Expand All @@ -143,16 +143,16 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------------------------------PhysicalDistribute[DistributionSpecHash]
----------------------------------hashAgg[LOCAL]
------------------------------------PhysicalProject
--------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((web_sales.ws_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF17 i_item_sk->[ss_item_sk,ws_item_sk]
----------------------------------------hashJoin[LEFT_SEMI_JOIN broadcast] hashCondition=((web_sales.ws_item_sk = cross_items.ss_item_sk)) otherCondition=()
--------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((web_sales.ws_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF20 i_item_sk->[ss_item_sk,ws_item_sk]
----------------------------------------hashJoin[LEFT_SEMI_JOIN broadcast] hashCondition=((web_sales.ws_item_sk = cross_items.ss_item_sk)) otherCondition=() build RFs:RF19 ss_item_sk->[ws_item_sk]
------------------------------------------PhysicalProject
--------------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF16 d_date_sk->[ws_sold_date_sk]
--------------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF18 d_date_sk->[ws_sold_date_sk]
----------------------------------------------PhysicalProject
------------------------------------------------PhysicalOlapScan[web_sales] apply RFs: RF16 RF17
------------------------------------------------PhysicalOlapScan[web_sales] apply RFs: RF18 RF19 RF20
----------------------------------------------PhysicalProject
------------------------------------------------filter((date_dim.d_moy = 11) and (date_dim.d_year = 2001))
--------------------------------------------------PhysicalOlapScan[date_dim]
------------------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF17
------------------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF20
----------------------------------------PhysicalProject
------------------------------------------PhysicalOlapScan[item]
----------------------------PhysicalProject
Expand Down
18 changes: 9 additions & 9 deletions regression-test/data/nereids_hint_tpcds_p0/shape/query23.out
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,27 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
------------hashAgg[LOCAL]
--------------PhysicalUnion
----------------PhysicalProject
------------------hashJoin[RIGHT_SEMI_JOIN shuffle] hashCondition=((catalog_sales.cs_item_sk = frequent_ss_items.item_sk)) otherCondition=() build RFs:RF4 cs_item_sk->[item_sk]
--------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF4
------------------hashJoin[RIGHT_SEMI_JOIN shuffle] hashCondition=((catalog_sales.cs_item_sk = frequent_ss_items.item_sk)) otherCondition=() build RFs:RF5 cs_item_sk->[item_sk]
--------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF5
--------------------PhysicalProject
----------------------hashJoin[LEFT_SEMI_JOIN broadcast] hashCondition=((catalog_sales.cs_bill_customer_sk = best_ss_customer.c_customer_sk)) otherCondition=()
----------------------hashJoin[LEFT_SEMI_JOIN broadcast] hashCondition=((catalog_sales.cs_bill_customer_sk = best_ss_customer.c_customer_sk)) otherCondition=() build RFs:RF4 c_customer_sk->[cs_bill_customer_sk]
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF3 d_date_sk->[cs_sold_date_sk]
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF3
------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF3 RF4
----------------------------PhysicalProject
------------------------------filter((date_dim.d_moy = 7) and (date_dim.d_year = 2000))
--------------------------------PhysicalOlapScan[date_dim]
------------------------PhysicalCteConsumer ( cteId=CTEId#2 )
----------------PhysicalProject
------------------hashJoin[RIGHT_SEMI_JOIN shuffle] hashCondition=((web_sales.ws_item_sk = frequent_ss_items.item_sk)) otherCondition=() build RFs:RF6 ws_item_sk->[item_sk]
--------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF6
------------------hashJoin[RIGHT_SEMI_JOIN shuffle] hashCondition=((web_sales.ws_item_sk = frequent_ss_items.item_sk)) otherCondition=() build RFs:RF8 ws_item_sk->[item_sk]
--------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF8
--------------------PhysicalProject
----------------------hashJoin[LEFT_SEMI_JOIN broadcast] hashCondition=((web_sales.ws_bill_customer_sk = best_ss_customer.c_customer_sk)) otherCondition=()
----------------------hashJoin[LEFT_SEMI_JOIN broadcast] hashCondition=((web_sales.ws_bill_customer_sk = best_ss_customer.c_customer_sk)) otherCondition=() build RFs:RF7 c_customer_sk->[ws_bill_customer_sk]
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF5 d_date_sk->[ws_sold_date_sk]
--------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF6 d_date_sk->[ws_sold_date_sk]
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[web_sales] apply RFs: RF5
------------------------------PhysicalOlapScan[web_sales] apply RFs: RF6 RF7
----------------------------PhysicalProject
------------------------------filter((date_dim.d_moy = 7) and (date_dim.d_year = 2000))
--------------------------------PhysicalOlapScan[date_dim]
Expand Down
4 changes: 2 additions & 2 deletions regression-test/data/nereids_hint_tpcds_p0/shape/query30.out
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
------PhysicalDistribute[DistributionSpecGather]
--------PhysicalTopN[LOCAL_SORT]
----------PhysicalProject
------------hashJoin[INNER_JOIN broadcast] hashCondition=((ctr1.ctr_state = ctr2.ctr_state)) otherCondition=((cast(ctr_total_return as DOUBLE) > cast((avg(cast(ctr_total_return as DECIMALV3(38, 4))) * 1.2) as DOUBLE)))
------------hashJoin[INNER_JOIN broadcast] hashCondition=((ctr1.ctr_state = ctr2.ctr_state)) otherCondition=((cast(ctr_total_return as DOUBLE) > cast((avg(cast(ctr_total_return as DECIMALV3(38, 4))) * 1.2) as DOUBLE))) build RFs:RF4 ctr_state->[ctr_state]
--------------PhysicalProject
----------------hashJoin[INNER_JOIN shuffle] hashCondition=((ctr1.ctr_customer_sk = customer.c_customer_sk)) otherCondition=() build RFs:RF3 c_customer_sk->[ctr_customer_sk]
------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF3
------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF3 RF4
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN broadcast] hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk)) otherCondition=() build RFs:RF2 ca_address_sk->[c_current_addr_sk]
----------------------PhysicalProject
Expand Down
Loading

0 comments on commit 0fec418

Please sign in to comment.