Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[CARMEL-6306] backport [SPARK-33399][SPARK-31078] Normalize output pa…
…rtitioning and sortorder with respect to aliases to avoid unneeded exchange/sort nodes (#1092) * [SPARK-31078][SQL] Respect aliases in output ordering Currently, in the following scenario, an unnecessary `Sort` node is introduced: ```scala withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") { val df = (0 until 20).toDF("i").as("df") df.repartition(8, df("i")).write.format("parquet") .bucketBy(8, "i").sortBy("i").saveAsTable("t") val t1 = spark.table("t") val t2 = t1.selectExpr("i as ii") t1.join(t2, t1("i") === t2("ii")).explain } ``` ``` == Physical Plan == *(3) SortMergeJoin [i#8], [ii#10], Inner :- *(1) Project [i#8] : +- *(1) Filter isnotnull(i#8) : +- *(1) ColumnarToRow : +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8 +- *(2) Sort [ii#10 ASC NULLS FIRST], false, 0 <==== UNNECESSARY +- *(2) Project [i#8 AS ii#10] +- *(2) Filter isnotnull(i#8) +- *(2) ColumnarToRow +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8 ``` Notice that `Sort [ii#10 ASC NULLS FIRST], false, 0` is introduced even though the underlying data is already sorted. This is because `outputOrdering` doesn't handle aliases correctly. This PR proposes to fix this issue. To better handle aliases in `outputOrdering`. Yes, now with the fix, the `explain` prints out the following: ``` == Physical Plan == *(3) SortMergeJoin [i#8], [ii#10], Inner :- *(1) Project [i#8] : +- *(1) Filter isnotnull(i#8) : +- *(1) ColumnarToRow : +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8 +- *(2) Project [i#8 AS ii#10] +- *(2) Filter isnotnull(i#8) +- *(2) ColumnarToRow +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8 ``` Tests added. Closes #27842 from imback82/alias_aware_sort_order. Authored-by: Terry Kim <yuminkim@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-33399][SQL] Normalize output partitioning and sortorder with respect to aliases to avoid unneeded exchange/sort nodes This pull request tries to remove unneeded exchanges/sorts by normalizing the output partitioning and sortorder information correctly with respect to aliases. Example: consider this join of three tables: |SELECT t2id, t3.id as t3id |FROM ( | SELECT t1.id as t1id, t2.id as t2id | FROM t1, t2 | WHERE t1.id = t2.id |) t12, t3 |WHERE t1id = t3.id The plan for this looks like: *(9) Project [t2id#1034L, id#1004L AS t3id#1035L] +- *(9) SortMergeJoin [t1id#1033L], [id#1004L], Inner :- *(6) Sort [t1id#1033L ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(t1id#1033L, 5), true, [id=#1343] <------------------------------ : +- *(5) Project [id#996L AS t1id#1033L, id#1000L AS t2id#1034L] : +- *(5) SortMergeJoin [id#996L], [id#1000L], Inner : :- *(2) Sort [id#996L ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(id#996L, 5), true, [id=#1329] : : +- *(1) Range (0, 10, step=1, splits=2) : +- *(4) Sort [id#1000L ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(id#1000L, 5), true, [id=#1335] : +- *(3) Range (0, 20, step=1, splits=2) +- *(8) Sort [id#1004L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(id#1004L, 5), true, [id=#1349] +- *(7) Range (0, 30, step=1, splits=2) In this plan, the marked exchange could have been avoided as the data is already partitioned on "t1.id". This happens because AliasAwareOutputPartitioning class handles aliases only related to HashPartitioning. This change normalizes all output partitioning based on aliasing happening in Project. To remove unneeded exchanges. No New UT added. On TPCDS 1000 scale, this change improves the performance of query 95 from 330 seconds to 170 seconds by removing the extra Exchange. Closes #30300 from prakharjain09/SPARK-33399-outputpartitioning. Authored-by: Prakhar Jain <prakharjain09@gmail.com> Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org> * [CARMEL-6306] Fix ut * [CARMEL-6306] Fix alias not compatible with ebay skew implementation Co-authored-by: Terry Kim <yuminkim@gmail.com> Co-authored-by: Prakhar Jain <prakharjain09@gmail.com>
- Loading branch information