Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mh/cherry pick spark 33494 #762

Closed
wants to merge 4 commits into from
Closed

Conversation

MGHawes
Copy link

@MGHawes MGHawes commented May 14, 2021

Cherry picks a series of interrelated PRs relating to AQE's CoalesceShufflePartitions.

What changes were proposed in this pull request?/Why are the changes needed?

Before these cherry-picks AQE would not attempt to coalesce the shuffle partitions of a .repartition() operation. With these cherry-picks a user can call .repartition() and allow AQE to optimally choose the number of partitions.

@MGHawes MGHawes force-pushed the mh/cherry-pick-spark-33494 branch 2 times, most recently from a0f605b to 255e857 Compare May 16, 2021 13:05
wangyum and others added 2 commits May 16, 2021 14:15
…eExecutionEnabled

This PR makes `repartition`/`DISTRIBUTE BY` obeys [initialPartitionNum](https://github.com/apache/spark/blob/af4248b2d661d04fec89b37857a47713246d9465/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L446-L455) when adaptive execution enabled.

To make `DISTRIBUTE BY`/`GROUP BY` partitioned by same partition number.
How to reproduce:
```scala
spark.sql("CREATE TABLE spark_31220(id int)")
spark.sql("set spark.sql.adaptive.enabled=true")
spark.sql("set spark.sql.adaptive.coalescePartitions.initialPartitionNum=1000")
```

Before this PR:
```
scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
   +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
      +- HashAggregate(keys=[id#5], functions=[])
         +- FileScan parquet default.spark_31220[id#5]

scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 200), false, [id=#179]
   +- FileScan parquet default.spark_31220[id#5]
```
After this PR:
```
scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
   +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
      +- HashAggregate(keys=[id#5], functions=[])
         +- FileScan parquet default.spark_31220[id#5]

scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 1000), false, [id=#179]
   +- FileScan parquet default.spark_31220[id#5]
```

No.

Unit test.

Closes apache#27986 from wangyum/SPARK-31220.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
… when AQE is enabled

This patch proposes to coalesce partitions for repartition by expressions without specifying number of partitions, when AQE is enabled.

When repartition by some partition expressions, users can specify number of partitions or not. If  the number of partitions is specified, we should not coalesce partitions because it breaks user expectation. But if without specifying number of partitions, AQE should be able to coalesce partitions as other shuffling.

Yes. After this change, if users don't specify the number of partitions when repartitioning data by expressions, AQE will coalesce partitions.

Added unit test.

Closes apache#28900 from viirya/SPARK-32056.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@MGHawes MGHawes force-pushed the mh/cherry-pick-spark-33494 branch from 255e857 to 3762261 Compare May 16, 2021 13:45
viirya and others added 2 commits May 16, 2021 14:59
…nt and sql when AQE is enabled

As the followup of apache#28900, this patch extends coalescing partitions to repartitioning using hints and SQL syntax without specifying number of partitions, when AQE is enabled.

When repartitionning using hints and SQL syntax, we should follow the shuffling behavior of repartition by expression/range to coalesce partitions when AQE is enabled.

Yes. After this change, if users don't specify the number of partitions when repartitioning using `REPARTITION`/`REPARTITION_BY_RANGE` hint or `DISTRIBUTE BY`/`CLUSTER BY`, AQE will coalesce partitions.

Unit tests.

Closes apache#28952 from viirya/SPARK-32056-sql.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This PR updates `ShuffleExchangeExec` to carry more information about how much we can change the partitioning. For `repartition(col)`, we should preserve the user-specified partitioning and don't apply the AQE local shuffle reader.

Similar to `repartition(number, col)`, we should respect the user-specified partitioning.

No

a new test

Closes apache#30432 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants