[SPARK-32056][SQL] Coalesce partitions for repartition by expressions when AQE is enabled #28900

Closed
viirya wants to merge 7 commits

@viirya viirya commented Jun 23, 2020

What changes were proposed in this pull request?

This patch proposes coalescing partitions for repartition by expressions when AQE is enabled and the number of partitions is not specified.

Why are the changes needed?

When repartitioning by partition expressions, users may or may not specify the number of partitions. If the number is specified, we should not coalesce partitions, because doing so would break user expectations. If it is not specified, AQE should be able to coalesce partitions as it does for other shuffles.
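The rule above can be sketched as a small, Spark-free model. This is only an illustration under stated assumptions: `CoalesceRule`, `RepartitionRequest`, and the `aqeEnabled` parameter are hypothetical names, not Spark's own classes or signatures.

```scala
// Simplified model of the rule described above; all names here are hypothetical.
object CoalesceRule {
  /** A repartition-by-expression request; numPartitions is None when the user gave no count. */
  final case class RepartitionRequest(numPartitions: Option[Int])

  /** The partition count may change only if AQE is on and the user named no count. */
  def canChangeNumPartitions(req: RepartitionRequest, aqeEnabled: Boolean): Boolean =
    aqeEnabled && req.numPartitions.isEmpty

  def main(args: Array[String]): Unit = {
    // No explicit count: coalescing is allowed.
    println(canChangeNumPartitions(RepartitionRequest(None), aqeEnabled = true))     // true
    // Explicit count: the user's number is kept as-is.
    println(canChangeNumPartitions(RepartitionRequest(Some(10)), aqeEnabled = true)) // false
  }
}
```

Modeling the count as an `Option[Int]` keeps "not specified" distinct from any particular number, which is the distinction the description relies on.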

Does this PR introduce any user-facing change?

Yes. After this change, if users don't specify the number of partitions when repartitioning data by expressions, AQE will coalesce partitions.

How was this patch tested?

Added unit test.

def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = {
private def repartitionByExpression(
numPartitions: Option[Int],
partitionExprs: Column*): Dataset[T] = {
Contributor

For an internal method, we don't need a variable-length parameter list.
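A minimal sketch of this suggestion, assuming a stand-in `Column` type so that it compiles without Spark. The `VarargsSketch` object and the `String` return type are hypothetical and only illustrate the shape of the signatures.

```scala
object VarargsSketch {
  // Stand-in for org.apache.spark.sql.Column, so the sketch compiles without Spark.
  final case class Column(name: String)

  // Internal helper: a plain Seq is enough because only this object calls it.
  private def repartitionByExpression(
      numPartitions: Option[Int],
      partitionExprs: Seq[Column]): String = {
    val n = numPartitions.map(_.toString).getOrElse("default")
    val by = partitionExprs.map(_.name).mkString(",")
    s"repartition(n=$n, by=$by)"
  }

  // Public methods keep varargs for caller convenience. Inside the method body,
  // partitionExprs is already a Seq[Column], so it passes through unchanged.
  def repartition(numPartitions: Int, partitionExprs: Column*): String =
    repartitionByExpression(Some(numPartitions), partitionExprs)

  def repartition(partitionExprs: Column*): String =
    repartitionByExpression(None, partitionExprs)
}
```

With a `Seq` parameter the internal call sites need no `: _*` expansion, which is the main practical gain.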


private def repartitionByRange(
numPartitions: Option[Int],
partitionExprs: Column*): Dataset[T] = {
Contributor

ditto

exchange.ShuffleExchangeExec(
r.partitioning, planLater(r.child), canChangeNumPartitions = false) :: Nil
r.partitioning, planLater(r.child), canChangeNumPartitions = canChangeNumParts) :: Nil
Contributor

Now that we have a variable name, we can just write r.partitioning, planLater(r.child), canChangeNumParts
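The style point can be illustrated with a hypothetical stand-in for the exchange constructor. `NamedArgSketch`, `Exchange`, and `userSpecifiedNum` are invented for this sketch and are not Spark's classes.

```scala
object NamedArgSketch {
  // Hypothetical stand-in for a constructor with a trailing Boolean flag.
  final case class Exchange(partitioning: String, child: String, canChangeNumPartitions: Boolean)

  def plan(partitioning: String, child: String, userSpecifiedNum: Boolean): Exchange = {
    val canChangeNumParts = !userSpecifiedNum
    // A bare literal needs the parameter name to be readable, for example
    // Exchange(partitioning, child, canChangeNumPartitions = false).
    // A descriptively named variable documents itself, so positional passing is fine.
    Exchange(partitioning, child, canChangeNumParts)
  }
}
```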

@cloud-fan
Contributor

cc @maryannxue @JkSelf @koertkuipers

SQLConf.SHUFFLE_PARTITIONS.key -> "6",
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
val partitionsNum = spark.range(10).repartition($"id").rdd.collectPartitions().length
val df = spark.range(10).repartition($"id")
Contributor

@cloud-fan cloud-fan Jun 23, 2020

Can we test repartition(numPartitions) in this test case and make sure the partition number doesn't change? Your new test case already tests repartition by key/range.

Member Author

ok.

@SparkQA

SparkQA commented Jun 23, 2020

Test build #124378 has finished for PR 28900 at commit 0a9223f.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 23, 2020

Test build #124391 has finished for PR 28900 at commit 8e39ed7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 23, 2020

Test build #124387 has finished for PR 28900 at commit 43c4726.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

}

val partitionsNum2 = df2.rdd.collectPartitions().length
assert(partitionsNum2 == 10)
Contributor

nit: assert(df2.rdd.collectPartitions().length == 10)

Member Author

fixed.

@SparkQA

SparkQA commented Jun 23, 2020

Test build #124401 has finished for PR 28900 at commit 8e39ed7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 23, 2020

Test build #124424 has finished for PR 28900 at commit 4b9b0e8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ulysses-you
Contributor

Can we add this feature in ResolveCoalesceHints? A hint can call repartition with the default shuffle partition number.

@viirya
Member Author

viirya commented Jun 24, 2020

> Can we add this feature in ResolveCoalesceHints? A hint can call repartition with the default shuffle partition number.

Do you mean something like SELECT /*+ COALESCE() */ ... ? When no partition number is specified, it would use the default partition number, and AQE could coalesce it if enabled?

It seems the COALESCE hint currently doesn't support using the default partition number. I'm not sure why.

val partitionsNum = spark.range(10).repartition($"id").rdd.collectPartitions().length
val df1 = spark.range(10).repartition($"id")
val df2 = spark.range(10).repartition(10, $"id")
val df3 = spark.range(10).repartition(10)
Contributor

repartitionByRange also takes numPartitions. Can we test it as well and check it doesn't coalesce?

Member Author

Added it.

@ulysses-you
Contributor

> It seems the COALESCE hint currently doesn't support using the default partition number. I'm not sure why.

I mean the repartition hint, as in this SQL: select /*+ repartition(col) */ * from test.

@@ -1026,13 +1026,79 @@ class AdaptiveQueryExecSuite
Seq(true, false).foreach { enableAQE =>
Contributor

@cloud-fan cloud-fan Jun 24, 2020

We can merge this test case into your two newly added test cases.

Contributor

i.e. one test to test repartition, and it verifies both the initial partition number and the coalesced partition number. The other test tests the same thing but for repartitionByRange.
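The expectation such a test checks can be mirrored in a Spark-free model: start from an initial number of partitions, merge small ones, and leave the count alone when the user asked for a specific number. This is a simplified sketch, not Spark's actual coalescing logic; `CoalesceModel`, `targetSize`, and the greedy merge are assumptions made for illustration.

```scala
object CoalesceModel {
  /** Greedily merges adjacent partitions while the merged size stays within targetSize. */
  def coalesce(sizes: Seq[Long], targetSize: Long): Seq[Long] =
    sizes.foldLeft(Vector.empty[Long]) { (acc, size) =>
      acc.lastOption match {
        case Some(last) if last + size <= targetSize => acc.init :+ (last + size)
        case _ => acc :+ size
      }
    }

  /** Partition count after the shuffle; an explicit user count is never changed. */
  def finalPartitions(
      sizes: Seq[Long],
      targetSize: Long,
      userSpecified: Boolean,
      aqeEnabled: Boolean): Int =
    if (aqeEnabled && !userSpecified) coalesce(sizes, targetSize).length else sizes.length
}
```

For ten partitions of size 8 and a target of 64, the model yields 2 partitions when no count was specified and AQE is on, and 10 otherwise, which matches the two cases the comment asks each test to verify.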

Member Author

Yeah, merged them.

@viirya
Member Author

viirya commented Jun 24, 2020

> I mean the repartition hint, as in this SQL: select /*+ repartition(col) */ * from test.

Sounds reasonable to me. @cloud-fan WDYT?

@cloud-fan
Contributor

Yea, /*+ repartition(col) */ should also be supported by AQE

@SparkQA

SparkQA commented Jun 24, 2020

Test build #124461 has finished for PR 28900 at commit 7ceaebc.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 24, 2020

Test build #124467 has finished for PR 28900 at commit df6a035.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

cloud-fan commented Jun 24, 2020

LGTM. We can support /*+ repartition(col) */ with a followup PR.

@SparkQA

SparkQA commented Jun 24, 2020

Test build #124491 has finished for PR 28900 at commit 1ae1a87.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@manuzhang
Contributor

@viirya Can we support DISTRIBUTE BY in SQL as well?

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 4204a63 Jun 29, 2020
@cloud-fan
Contributor

@viirya please send a new PR to fix the SQL side, thanks!

@viirya
Member Author

viirya commented Jun 29, 2020

@cloud-fan Thanks, will do it.

dongjoon-hyun pushed a commit that referenced this pull request Jul 1, 2020
…nt and sql when AQE is enabled

### What changes were proposed in this pull request?

As a follow-up to #28900, this patch extends partition coalescing to repartitioning via hints and SQL syntax when the number of partitions is not specified and AQE is enabled.

### Why are the changes needed?

When repartitioning using hints and SQL syntax, we should follow the shuffling behavior of repartition by expression/range and coalesce partitions when AQE is enabled.

### Does this PR introduce _any_ user-facing change?

Yes. After this change, if users don't specify the number of partitions when repartitioning using `REPARTITION`/`REPARTITION_BY_RANGE` hint or `DISTRIBUTE BY`/`CLUSTER BY`, AQE will coalesce partitions.
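As an illustration of which SQL forms fall under this change, the sketch below pairs each form with whether it names a partition count. The table `t`, the column `c`, and the `SqlForms` model are hypothetical; the sketch classifies the forms by hand and does not parse SQL or call Spark.

```scala
object SqlForms {
  /** A SQL form and the partition count it names, if any (hand-annotated, not parsed). */
  final case class SqlForm(sql: String, explicitNumPartitions: Option[Int])

  val forms: Seq[SqlForm] = Seq(
    SqlForm("SELECT /*+ REPARTITION(c) */ * FROM t", None),
    SqlForm("SELECT /*+ REPARTITION(3, c) */ * FROM t", Some(3)),
    SqlForm("SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t", None),
    SqlForm("SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t", Some(3)),
    SqlForm("SELECT * FROM t DISTRIBUTE BY c", None),
    SqlForm("SELECT * FROM t CLUSTER BY c", None))

  /** Coalescing applies only when AQE is on and the form names no partition count. */
  def mayCoalesce(form: SqlForm, aqeEnabled: Boolean): Boolean =
    aqeEnabled && form.explicitNumPartitions.isEmpty
}
```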

### How was this patch tested?

Unit tests.

Closes #28952 from viirya/SPARK-32056-sql.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
MGHawes pushed a commit to palantir/spark that referenced this pull request May 16, 2021
… when AQE is enabled

Closes apache#28900 from viirya/SPARK-32056.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@viirya viirya deleted the SPARK-32056 branch December 27, 2023 18:28