[SPARK-43838][SQL][FOLLOWUP] Improve DeduplicateRelations performance
### What changes were proposed in this pull request?
Revert to the old way of handling relations in `DeduplicateRelations` to improve performance. Instead of checking attribute IDs linearly, we use `HashSet.contains()`.
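
As a rough illustration of the lookup pattern described above, the following is a minimal sketch and not the Spark implementation. `RelationKey`, `DedupSketch`, `visit`, and `freshIds` are hypothetical names introduced only for this example; the key pairs a relation kind with its output attribute IDs, so membership is a single hash lookup rather than a linear scan over attribute IDs.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a relation key: a case class has structural
// equals/hashCode, so it can be used directly as a HashSet element.
final case class RelationKey(kind: String, outputAttrIds: Seq[Long])

object DedupSketch {
  // Returns the key to keep and whether the relation had to be renewed.
  // `freshIds` is an assumed callback that supplies new attribute IDs.
  def visit(
      seen: mutable.HashSet[RelationKey],
      key: RelationKey,
      freshIds: () => Seq[Long]): (RelationKey, Boolean) = {
    if (seen.contains(key)) {
      // Already seen: produce a copy with fresh IDs instead of reusing the old ones.
      (key.copy(outputAttrIds = freshIds()), true)
    } else {
      // First occurrence: remember it and leave it untouched.
      seen.add(key)
      (key, false)
    }
  }
}
```

The first time a key is visited it is recorded and returned unchanged; a second visit with an equal key takes the `contains` branch and returns a renewed copy.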

### Why are the changes needed?
Improving performance.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#48053 from mihailotim-db/deduplicate_relations_perf_improvement.

Authored-by: Mihailo Timotic <mihailo.timotic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
mihailotim-db authored and cloud-fan committed Sep 11, 2024
1 parent c5fd509 commit d72e8f9
Showing 9 changed files with 493 additions and 489 deletions.
@@ -105,11 +105,15 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
       case p: LogicalPlan if p.isStreaming => (plan, false)
 
       case m: MultiInstanceRelation =>
-        deduplicateAndRenew[LogicalPlan with MultiInstanceRelation](
-          existingRelations,
-          m,
-          _.output.map(_.exprId.id),
-          node => node.newInstance().asInstanceOf[LogicalPlan with MultiInstanceRelation])
+        val planWrapper = RelationWrapper(m.getClass, m.output.map(_.exprId.id))
+        if (existingRelations.contains(planWrapper)) {
+          val newNode = m.newInstance()
+          newNode.copyTagsFrom(m)
+          (newNode, true)
+        } else {
+          existingRelations.add(planWrapper)
+          (m, false)
+        }
 
       case p: Project =>
         deduplicateAndRenew[Project](
@@ -175,125 +175,125 @@ Input [6]: [i_product_name#12, i_brand#9, i_class#10, i_category#11, sum#21, cou
Keys [4]: [i_product_name#12, i_brand#9, i_class#10, i_category#11]
Functions [1]: [avg(qoh#18)]
Aggregate Attributes [1]: [avg(qoh#18)#23]
Results [5]: [i_product_name#12 AS i_product_name#24, i_brand#9 AS i_brand#25, i_class#10 AS i_class#26, i_category#11 AS i_category#27, avg(qoh#18)#23 AS qoh#28]
Results [5]: [i_product_name#12, i_brand#9, i_class#10, i_category#11, avg(qoh#18)#23 AS qoh#24]

(27) ReusedExchange [Reuses operator id: 23]
Output [6]: [i_product_name#29, i_brand#30, i_class#31, i_category#32, sum#33, count#34]
Output [6]: [i_product_name#25, i_brand#26, i_class#27, i_category#28, sum#29, count#30]

(28) HashAggregate [codegen id : 16]
Input [6]: [i_product_name#29, i_brand#30, i_class#31, i_category#32, sum#33, count#34]
Keys [4]: [i_product_name#29, i_brand#30, i_class#31, i_category#32]
Functions [1]: [avg(inv_quantity_on_hand#35)]
Aggregate Attributes [1]: [avg(inv_quantity_on_hand#35)#17]
Results [4]: [i_product_name#29, i_brand#30, i_class#31, avg(inv_quantity_on_hand#35)#17 AS qoh#36]
Input [6]: [i_product_name#25, i_brand#26, i_class#27, i_category#28, sum#29, count#30]
Keys [4]: [i_product_name#25, i_brand#26, i_class#27, i_category#28]
Functions [1]: [avg(inv_quantity_on_hand#31)]
Aggregate Attributes [1]: [avg(inv_quantity_on_hand#31)#17]
Results [4]: [i_product_name#25, i_brand#26, i_class#27, avg(inv_quantity_on_hand#31)#17 AS qoh#32]

(29) HashAggregate [codegen id : 16]
Input [4]: [i_product_name#29, i_brand#30, i_class#31, qoh#36]
Keys [3]: [i_product_name#29, i_brand#30, i_class#31]
Functions [1]: [partial_avg(qoh#36)]
Aggregate Attributes [2]: [sum#37, count#38]
Results [5]: [i_product_name#29, i_brand#30, i_class#31, sum#39, count#40]
Input [4]: [i_product_name#25, i_brand#26, i_class#27, qoh#32]
Keys [3]: [i_product_name#25, i_brand#26, i_class#27]
Functions [1]: [partial_avg(qoh#32)]
Aggregate Attributes [2]: [sum#33, count#34]
Results [5]: [i_product_name#25, i_brand#26, i_class#27, sum#35, count#36]

(30) Exchange
Input [5]: [i_product_name#29, i_brand#30, i_class#31, sum#39, count#40]
Arguments: hashpartitioning(i_product_name#29, i_brand#30, i_class#31, 5), ENSURE_REQUIREMENTS, [plan_id=5]
Input [5]: [i_product_name#25, i_brand#26, i_class#27, sum#35, count#36]
Arguments: hashpartitioning(i_product_name#25, i_brand#26, i_class#27, 5), ENSURE_REQUIREMENTS, [plan_id=5]

(31) HashAggregate [codegen id : 17]
Input [5]: [i_product_name#29, i_brand#30, i_class#31, sum#39, count#40]
Keys [3]: [i_product_name#29, i_brand#30, i_class#31]
Functions [1]: [avg(qoh#36)]
Aggregate Attributes [1]: [avg(qoh#36)#41]
Results [5]: [i_product_name#29, i_brand#30, i_class#31, null AS i_category#42, avg(qoh#36)#41 AS qoh#43]
Input [5]: [i_product_name#25, i_brand#26, i_class#27, sum#35, count#36]
Keys [3]: [i_product_name#25, i_brand#26, i_class#27]
Functions [1]: [avg(qoh#32)]
Aggregate Attributes [1]: [avg(qoh#32)#37]
Results [5]: [i_product_name#25, i_brand#26, i_class#27, null AS i_category#38, avg(qoh#32)#37 AS qoh#39]

(32) ReusedExchange [Reuses operator id: 23]
Output [6]: [i_product_name#44, i_brand#45, i_class#46, i_category#47, sum#48, count#49]
Output [6]: [i_product_name#40, i_brand#41, i_class#42, i_category#43, sum#44, count#45]

(33) HashAggregate [codegen id : 25]
Input [6]: [i_product_name#44, i_brand#45, i_class#46, i_category#47, sum#48, count#49]
Keys [4]: [i_product_name#44, i_brand#45, i_class#46, i_category#47]
Functions [1]: [avg(inv_quantity_on_hand#50)]
Aggregate Attributes [1]: [avg(inv_quantity_on_hand#50)#17]
Results [3]: [i_product_name#44, i_brand#45, avg(inv_quantity_on_hand#50)#17 AS qoh#51]
Input [6]: [i_product_name#40, i_brand#41, i_class#42, i_category#43, sum#44, count#45]
Keys [4]: [i_product_name#40, i_brand#41, i_class#42, i_category#43]
Functions [1]: [avg(inv_quantity_on_hand#46)]
Aggregate Attributes [1]: [avg(inv_quantity_on_hand#46)#17]
Results [3]: [i_product_name#40, i_brand#41, avg(inv_quantity_on_hand#46)#17 AS qoh#47]

(34) HashAggregate [codegen id : 25]
Input [3]: [i_product_name#44, i_brand#45, qoh#51]
Keys [2]: [i_product_name#44, i_brand#45]
Functions [1]: [partial_avg(qoh#51)]
Aggregate Attributes [2]: [sum#52, count#53]
Results [4]: [i_product_name#44, i_brand#45, sum#54, count#55]
Input [3]: [i_product_name#40, i_brand#41, qoh#47]
Keys [2]: [i_product_name#40, i_brand#41]
Functions [1]: [partial_avg(qoh#47)]
Aggregate Attributes [2]: [sum#48, count#49]
Results [4]: [i_product_name#40, i_brand#41, sum#50, count#51]

(35) Exchange
Input [4]: [i_product_name#44, i_brand#45, sum#54, count#55]
Arguments: hashpartitioning(i_product_name#44, i_brand#45, 5), ENSURE_REQUIREMENTS, [plan_id=6]
Input [4]: [i_product_name#40, i_brand#41, sum#50, count#51]
Arguments: hashpartitioning(i_product_name#40, i_brand#41, 5), ENSURE_REQUIREMENTS, [plan_id=6]

(36) HashAggregate [codegen id : 26]
Input [4]: [i_product_name#44, i_brand#45, sum#54, count#55]
Keys [2]: [i_product_name#44, i_brand#45]
Functions [1]: [avg(qoh#51)]
Aggregate Attributes [1]: [avg(qoh#51)#56]
Results [5]: [i_product_name#44, i_brand#45, null AS i_class#57, null AS i_category#58, avg(qoh#51)#56 AS qoh#59]
Input [4]: [i_product_name#40, i_brand#41, sum#50, count#51]
Keys [2]: [i_product_name#40, i_brand#41]
Functions [1]: [avg(qoh#47)]
Aggregate Attributes [1]: [avg(qoh#47)#52]
Results [5]: [i_product_name#40, i_brand#41, null AS i_class#53, null AS i_category#54, avg(qoh#47)#52 AS qoh#55]

(37) ReusedExchange [Reuses operator id: 23]
Output [6]: [i_product_name#60, i_brand#61, i_class#62, i_category#63, sum#64, count#65]
Output [6]: [i_product_name#56, i_brand#57, i_class#58, i_category#59, sum#60, count#61]

(38) HashAggregate [codegen id : 34]
Input [6]: [i_product_name#60, i_brand#61, i_class#62, i_category#63, sum#64, count#65]
Keys [4]: [i_product_name#60, i_brand#61, i_class#62, i_category#63]
Functions [1]: [avg(inv_quantity_on_hand#66)]
Aggregate Attributes [1]: [avg(inv_quantity_on_hand#66)#17]
Results [2]: [i_product_name#60, avg(inv_quantity_on_hand#66)#17 AS qoh#67]
Input [6]: [i_product_name#56, i_brand#57, i_class#58, i_category#59, sum#60, count#61]
Keys [4]: [i_product_name#56, i_brand#57, i_class#58, i_category#59]
Functions [1]: [avg(inv_quantity_on_hand#62)]
Aggregate Attributes [1]: [avg(inv_quantity_on_hand#62)#17]
Results [2]: [i_product_name#56, avg(inv_quantity_on_hand#62)#17 AS qoh#63]

(39) HashAggregate [codegen id : 34]
Input [2]: [i_product_name#60, qoh#67]
Keys [1]: [i_product_name#60]
Functions [1]: [partial_avg(qoh#67)]
Aggregate Attributes [2]: [sum#68, count#69]
Results [3]: [i_product_name#60, sum#70, count#71]
Input [2]: [i_product_name#56, qoh#63]
Keys [1]: [i_product_name#56]
Functions [1]: [partial_avg(qoh#63)]
Aggregate Attributes [2]: [sum#64, count#65]
Results [3]: [i_product_name#56, sum#66, count#67]

(40) Exchange
Input [3]: [i_product_name#60, sum#70, count#71]
Arguments: hashpartitioning(i_product_name#60, 5), ENSURE_REQUIREMENTS, [plan_id=7]
Input [3]: [i_product_name#56, sum#66, count#67]
Arguments: hashpartitioning(i_product_name#56, 5), ENSURE_REQUIREMENTS, [plan_id=7]

(41) HashAggregate [codegen id : 35]
Input [3]: [i_product_name#60, sum#70, count#71]
Keys [1]: [i_product_name#60]
Functions [1]: [avg(qoh#67)]
Aggregate Attributes [1]: [avg(qoh#67)#72]
Results [5]: [i_product_name#60, null AS i_brand#73, null AS i_class#74, null AS i_category#75, avg(qoh#67)#72 AS qoh#76]
Input [3]: [i_product_name#56, sum#66, count#67]
Keys [1]: [i_product_name#56]
Functions [1]: [avg(qoh#63)]
Aggregate Attributes [1]: [avg(qoh#63)#68]
Results [5]: [i_product_name#56, null AS i_brand#69, null AS i_class#70, null AS i_category#71, avg(qoh#63)#68 AS qoh#72]

(42) ReusedExchange [Reuses operator id: 23]
Output [6]: [i_product_name#77, i_brand#78, i_class#79, i_category#80, sum#81, count#82]
Output [6]: [i_product_name#73, i_brand#74, i_class#75, i_category#76, sum#77, count#78]

(43) HashAggregate [codegen id : 43]
Input [6]: [i_product_name#77, i_brand#78, i_class#79, i_category#80, sum#81, count#82]
Keys [4]: [i_product_name#77, i_brand#78, i_class#79, i_category#80]
Functions [1]: [avg(inv_quantity_on_hand#83)]
Aggregate Attributes [1]: [avg(inv_quantity_on_hand#83)#17]
Results [1]: [avg(inv_quantity_on_hand#83)#17 AS qoh#84]
Input [6]: [i_product_name#73, i_brand#74, i_class#75, i_category#76, sum#77, count#78]
Keys [4]: [i_product_name#73, i_brand#74, i_class#75, i_category#76]
Functions [1]: [avg(inv_quantity_on_hand#79)]
Aggregate Attributes [1]: [avg(inv_quantity_on_hand#79)#17]
Results [1]: [avg(inv_quantity_on_hand#79)#17 AS qoh#80]

(44) HashAggregate [codegen id : 43]
Input [1]: [qoh#84]
Input [1]: [qoh#80]
Keys: []
Functions [1]: [partial_avg(qoh#84)]
Aggregate Attributes [2]: [sum#85, count#86]
Results [2]: [sum#87, count#88]
Functions [1]: [partial_avg(qoh#80)]
Aggregate Attributes [2]: [sum#81, count#82]
Results [2]: [sum#83, count#84]

(45) Exchange
Input [2]: [sum#87, count#88]
Input [2]: [sum#83, count#84]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=8]

(46) HashAggregate [codegen id : 44]
Input [2]: [sum#87, count#88]
Input [2]: [sum#83, count#84]
Keys: []
Functions [1]: [avg(qoh#84)]
Aggregate Attributes [1]: [avg(qoh#84)#89]
Results [5]: [null AS i_product_name#90, null AS i_brand#91, null AS i_class#92, null AS i_category#93, avg(qoh#84)#89 AS qoh#94]
Functions [1]: [avg(qoh#80)]
Aggregate Attributes [1]: [avg(qoh#80)#85]
Results [5]: [null AS i_product_name#86, null AS i_brand#87, null AS i_class#88, null AS i_category#89, avg(qoh#80)#85 AS qoh#90]

(47) Union

(48) TakeOrderedAndProject
Input [5]: [i_product_name#24, i_brand#25, i_class#26, i_category#27, qoh#28]
Arguments: 100, [qoh#28 ASC NULLS FIRST, i_product_name#24 ASC NULLS FIRST, i_brand#25 ASC NULLS FIRST, i_class#26 ASC NULLS FIRST, i_category#27 ASC NULLS FIRST], [i_product_name#24, i_brand#25, i_class#26, i_category#27, qoh#28]
Input [5]: [i_product_name#12, i_brand#9, i_class#10, i_category#11, qoh#24]
Arguments: 100, [qoh#24 ASC NULLS FIRST, i_product_name#12 ASC NULLS FIRST, i_brand#9 ASC NULLS FIRST, i_class#10 ASC NULLS FIRST, i_category#11 ASC NULLS FIRST], [i_product_name#12, i_brand#9, i_class#10, i_category#11, qoh#24]

===== Subqueries =====

@@ -306,22 +306,22 @@ BroadcastExchange (53)


(49) Scan parquet spark_catalog.default.date_dim
Output [2]: [d_date_sk#7, d_month_seq#95]
Output [2]: [d_date_sk#7, d_month_seq#91]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_month_seq), GreaterThanOrEqual(d_month_seq,1212), LessThanOrEqual(d_month_seq,1223), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_month_seq:int>

(50) ColumnarToRow [codegen id : 1]
Input [2]: [d_date_sk#7, d_month_seq#95]
Input [2]: [d_date_sk#7, d_month_seq#91]

(51) Filter [codegen id : 1]
Input [2]: [d_date_sk#7, d_month_seq#95]
Condition : (((isnotnull(d_month_seq#95) AND (d_month_seq#95 >= 1212)) AND (d_month_seq#95 <= 1223)) AND isnotnull(d_date_sk#7))
Input [2]: [d_date_sk#7, d_month_seq#91]
Condition : (((isnotnull(d_month_seq#91) AND (d_month_seq#91 >= 1212)) AND (d_month_seq#91 <= 1223)) AND isnotnull(d_date_sk#7))

(52) Project [codegen id : 1]
Output [1]: [d_date_sk#7]
Input [2]: [d_date_sk#7, d_month_seq#95]
Input [2]: [d_date_sk#7, d_month_seq#91]

(53) BroadcastExchange
Input [1]: [d_date_sk#7]
@@ -1,7 +1,7 @@
TakeOrderedAndProject [qoh,i_product_name,i_brand,i_class,i_category]
Union
WholeStageCodegen (8)
HashAggregate [i_product_name,i_brand,i_class,i_category,sum,count] [avg(qoh),i_product_name,i_brand,i_class,i_category,qoh,sum,count]
HashAggregate [i_product_name,i_brand,i_class,i_category,sum,count] [avg(qoh),qoh,sum,count]
HashAggregate [i_product_name,i_brand,i_class,i_category,qoh] [sum,count,sum,count]
HashAggregate [i_product_name,i_brand,i_class,i_category,sum,count] [avg(inv_quantity_on_hand),qoh,sum,count]
InputAdapter