
[SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by #26946

Closed
wants to merge 8 commits into from

Conversation

jackylee-ch
Contributor

@jackylee-ch jackylee-ch commented Dec 19, 2019

Why are the changes needed?

`EnsureRequirements` adds a `ShuffleExchangeExec` (RangePartitioning) below Sort when a `RoundRobinPartitioning` exchange sits beneath it. This causes two shuffles, and the number of partitions in the final stage is not the number specified by `RoundRobinPartitioning`.

Example SQL

SELECT /*+ REPARTITION(5) */ * FROM test ORDER BY a

BEFORE

== Physical Plan ==
*(1) Sort [a#0 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#0 ASC NULLS FIRST, 200), true, [id=#11]
   +- Exchange RoundRobinPartitioning(5), false, [id=#9]
      +- Scan hive default.test [a#0, b#1], HiveTableRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#0, b#1]

AFTER

== Physical Plan ==
*(1) Sort [a#0 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#0 ASC NULLS FIRST, 5), true, [id=#11]
   +- Scan hive default.test [a#0, b#1], HiveTableRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#0, b#1]
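
For reference, a quick way to reproduce the plans above in spark-shell is to register a temp view standing in for the Hive table; the view name and columns below are illustrative only.

```scala
// Illustrative temp view in place of the Hive table `default`.`test`.
spark.range(10).selectExpr("id AS a", "id AS b").createOrReplaceTempView("test")
// Show the physical plan for the hinted query.
spark.sql("SELECT /*+ REPARTITION(5) */ * FROM test ORDER BY a").explain()
```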

Does this PR introduce any user-facing change?

No

How was this patch tested?

Ran the existing test suites and added a new test for this.

jackylee-ch and others added 3 commits December 19, 2019 12:44
Change-Id: I9ec887eece29abed048192b559f7d69a9e67afe3
Change-Id: If6b4c1f818c38b1862f69acc63f79feea127bbee
Change-Id: Ieb757a218588e2f35efd1b0eac4d076fb75eb1c8
@jackylee-ch
Contributor Author

@ulysses-you
Contributor

I think we should add the check in CollapseRepartition instead of EnsureRequirements if we want to prune repartition with sort.

@cloud-fan
Contributor

ok to test

@jackylee-ch
Contributor Author

jackylee-ch commented Dec 19, 2019

I think we should add the check in CollapseRepartition instead of EnsureRequirements if we want to prune repartition with sort.

@ulysses-you Adding RangePartitioning doesn't happen in the optimizer, so we can't check this in CollapseRepartition. Besides, it is not easy to keep the partition number after pruning repartition with sort.

@SparkQA

SparkQA commented Dec 19, 2019

Test build #115563 has finished for PR 26946 at commit 02267db.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case (ShuffleExchangeExec(partitioning: RoundRobinPartitioning, child, _),
    distribution: OrderedDistribution) =>
  ShuffleExchangeExec(
    distribution.createPartitioning(partitioning.numPartitions), child)
Member

Could you update it like the following, @stczwd?

-      case (ShuffleExchangeExec(partitioning: RoundRobinPartitioning, child, _),
-          distribution: OrderedDistribution) =>
-        ShuffleExchangeExec(
-          distribution.createPartitioning(partitioning.numPartitions), child)
+      case (ShuffleExchangeExec(partitioning, child, _), distribution) =>
+        ShuffleExchangeExec(distribution.createPartitioning(partitioning.numPartitions), child)

Contributor Author

You mean this should work for other Partitionings as well? Let me run some tests for it.

Contributor Author

Thanks, I have changed my code.

DummySparkPlan(outputPartitioning = partitioning)))
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
assert(outputPlan.find{
case e: ShuffleExchangeExec => e.outputPartitioning.isInstanceOf[RoundRobinPartitioning]
Member

- case e: ShuffleExchangeExec => e.outputPartitioning.isInstanceOf[RoundRobinPartitioning]
+ case ShuffleExchangeExec(_: RoundRobinPartitioning, _, _) => true

Contributor Author

good

partitioning,
DummySparkPlan(outputPartitioning = partitioning)))
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
assert(outputPlan.find{
Member

`.find{` -> `.find {`.

Contributor Author

done

@dongjoon-hyun
Member

I updated the PR description a little, @stczwd.

@@ -55,6 +55,10 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
child
case (child, BroadcastDistribution(mode)) =>
BroadcastExchangeExec(mode, child)
case (ShuffleExchangeExec(partitioning: RoundRobinPartitioning, child, _),
Contributor

How about using Partitioning instead of RoundRobinPartitioning, since we already support SELECT /*+ REPARTITION(5, a) */ * FROM test ORDER BY a?

Contributor Author

Thanks, I will change it.

@@ -421,6 +421,24 @@ class PlannerSuite extends SharedSparkSession {
}
}

test("SPARK-30036: EnsureRequirements replace Exchange " +
"if child has SortExec and RoundRobinPartitioning") {
Member

How about just saying Remove unnecessary RoundRobinPartitioning in the test title? Also, can you make the PR title obvious, too?

Contributor Author

Good for the test title, thanks.
But it is not suitable for the PR title; there are other situations covered by this title.

Member

How about Avoid RoundRobinPartitioning that EnsureRequirements Redundantly adds?

Contributor Author

Because HashPartitioning also needs to be considered.

@jackylee-ch
Contributor Author

I updated the PR description a little, @stczwd.

@dongjoon-hyun thanks

Change-Id: I6b102f32b4084625875b395990e8ac4673c56bac
@SparkQA

SparkQA commented Dec 20, 2019

Test build #115598 has finished for PR 26946 at commit 56a1101.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
assert(outputPlan.find {
case ShuffleExchangeExec(_: RoundRobinPartitioning, _, _) => true
case _ => false}.isEmpty,
Contributor

nit:

...find {
  case ...
  case ...
}.isEmpty

val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
assert(outputPlan.find {
case ShuffleExchangeExec(_: HashPartitioning, _, _) => true
case _ => false}.isEmpty,
Contributor

ditto

@cloud-fan
Contributor

shall we add an end-to-end test for SELECT /*+ REPARTITION(5) */ * FROM test ORDER BY a?

@jackylee-ch
Contributor Author

shall we add an end-to-end test for SELECT /*+ REPARTITION(5) */ * FROM test ORDER BY a?
@cloud-fan This patch only changes the number of final partitions and does not affect the overall result. Is it enough to check the rule? Or should we maybe check the number of partitions?

Change-Id: I0b55a61e1a9ac3555177322ac44d2b216d45bd24
@SparkQA

SparkQA commented Dec 20, 2019

Test build #115630 has finished for PR 26946 at commit 5915a12.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Comment on lines +58 to +59
case (ShuffleExchangeExec(partitioning, child, _), distribution: OrderedDistribution) =>
ShuffleExchangeExec(distribution.createPartitioning(partitioning.numPartitions), child)
Member

This handles a special case for OrderedDistribution. Generally, if a ShuffleExchangeExec is followed by any unsatisfied distribution, we should always trim the ShuffleExchangeExec and apply the partitioning of that distribution. Shouldn't we?

Contributor Author

Sounds reasonable. Any suitable cases?

Member

@viirya viirya Dec 23, 2019

I just tried a few possible cases but could not find a concrete case like this. Maybe this is the only possible case, so I think this should be fine.

@cloud-fan
Contributor

We can add an end-to-end test, check the physical plan of a query, and count shuffles.

@jackylee-ch
Contributor Author

jackylee-ch commented Dec 23, 2019

We can add an end-to-end test, check the physical plan of a query, and count shuffles.

Sure, I will add some tests for these cases.
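
A rough sketch of what such an end-to-end check could look like, assuming a temp view named "test" as in the PR description; this is illustrative only and not the exact test added in this PR.

```scala
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

val df = spark.sql("SELECT /*+ REPARTITION(5) */ * FROM test ORDER BY a")
// Collect all shuffle exchanges from the executed physical plan.
val shuffles = df.queryExecution.executedPlan.collect {
  case e: ShuffleExchangeExec => e
}
// With the fix there should be a single range shuffle producing 5 partitions.
assert(shuffles.size == 1)
assert(shuffles.head.outputPartitioning.numPartitions == 5)
```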

// Range has range partitioning in its output now. To have a range shuffle, we
// need to run a repartition first.
val data = spark.range(0, n, 1, 1).repartition(10).sort($"id".desc)
// Range has range partitioning in its output now.
Contributor

@cloud-fan cloud-fan Dec 23, 2019

Shall we remove this comment now? It's not useful since we do add a shuffle, so the range output partitioning doesn't matter.

Contributor Author

Okay.

@@ -55,12 +54,12 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession {

withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> numPartitions.toString) {
// The default chi-sq value should be low
assert(computeChiSquareTest() < 100)
assert(computeChiSquareTest() < 10)
Contributor

The physical plan is the same as before; what caused this change?

Contributor Author

They are not the same; we had two shuffles before, one with RoundRobinPartitioning and the other with RangePartitioning.

Contributor

Ah, I see.

@SparkQA

SparkQA commented Dec 24, 2019

Test build #115689 has finished for PR 26946 at commit 52ce660.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@@ -421,6 +421,52 @@ class PlannerSuite extends SharedSparkSession {
}
}

test("SPARK-30036: Romove unnecessary RoundRobinPartitioning " +
Member

nit Romove -> Remove

Contributor Author

done

Change-Id: I175b3824ba9ce46fba0ebba6ebf0b220d64de42c
@HyukjinKwon
Member

Looks fine to me

@SparkQA

SparkQA commented Dec 24, 2019

Test build #115719 has finished for PR 26946 at commit d2615b6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Dec 24, 2019

Test build #115718 has finished for PR 26946 at commit 52ce660.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 24, 2019

Test build #115729 has finished for PR 26946 at commit d2615b6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Dec 25, 2019

Test build #115766 has finished for PR 26946 at commit d2615b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan cloud-fan closed this in a2de20c Dec 27, 2019
@HyukjinKwon
Member

Merged to master, I guess :-).

@cloud-fan
Contributor

yea merged to master!

@ulysses-you
Contributor

ulysses-you commented Jan 3, 2020

Wait, another point.
First, this scenario also exists with join or window operators; as @viirya said, other Distributions exist.
For example, join:

val df = spark.range(1, 10, 2)
df.join(df.repartition(10), Seq("id"), "left").explain(true)

// The physical plan looks like this:
== Physical Plan ==
*(5) Project [id#0L]
+- SortMergeJoin [id#0L], [id#83L], LeftOuter
   :- *(2) Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#0L, 200), true, [id=#378]
   :     +- *(1) Range (1, 10, step=1, splits=40)
   +- *(4) Sort [id#83L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#83L, 200), true, [id=#384]
         +- Exchange RoundRobinPartitioning(10), false, [id=#383]
            +- *(3) Range (1, 10, step=1, splits=40)

Also, there is a difference between 2 -> 10 -> 200 and 2 -> 10 because of operator complexity. Repartition may be a lightweight operator compared with sort, join, or other algorithms, so it is not certain that 2 -> 10 always runs faster than 2 -> 10 -> 200.

Lastly, if the end user really wants the result to have 10 partitions, they should use df.sort("id").repartition(10) instead of df.repartition(10).sort("id"). Pruning the shuffle may mislead users.

cc @HyukjinKwon @cloud-fan @maropu

@cloud-fan
Contributor

for join, it doesn't require OrderedDistribution, but HashClusteredDistribution.

This PR only affects sort.

@jackylee-ch
Contributor Author

If the end user really wants the result to have 10 partitions, they should use df.sort("id").repartition(10) instead of df.repartition(10).sort("id"). Pruning the shuffle may mislead users.

df.sort("id").repartition(10) returns wrong result. Global sort result would be repartitioned with disordered.

@ulysses-you
Contributor

This PR only affects sort.

Yes, it is. But it is similar to an outer join: df.join(df.repartition(10), Seq("id"), "left") results in 200 partitions, and now df.repartition(10).sort("id") results in 10 partitions. Should they be the same?

@ulysses-you
Contributor

df.sort("id").repartition(10) returns wrong result. Global sort result would be repartitioned with disordered.

Sorry for the wrong example. I mean users should use the right way to change the partitioning. Obviously df.repartition(10).sort("id") should return spark.sql.shuffle.partitions partitions.

@jackylee-ch jackylee-ch deleted the RoundRobinPartitioning branch January 6, 2020 06:21
@jackylee-ch
Contributor Author

Yes, it is. But it is similar to an outer join: df.join(df.repartition(10), Seq("id"), "left") results in 200 partitions, and now df.repartition(10).sort("id") results in 10 partitions. Should they be the same?
Sorry for the wrong example. I mean users should use the right way to change the partitioning. Obviously df.repartition(10).sort("id") should return spark.sql.shuffle.partitions partitions.

Thanks for paying attention to this. The main problem you described is whether we should change the partition number for OrderedDistribution.
Hm, you added the REPARTITION hint in SPARK-28746, so you know what it means to users. In other cases, the REPARTITION hint changes the result partition number via shuffles, but it didn't work with ORDER BY, which confused users. REPARTITION is a great way for users to control the final partition number, and we should keep it working for every query.
Besides, sort("id") is a global OrderedDistribution, which usually produces the final result. It is not easy to control its partition number through the default shuffle partitions, especially in large queries with multiple shuffles.
Finally, changing the partition number with df.repartition(10).sort("id") is a good way for users to control the shuffle and the final result. Users probably won't write df.repartition(10).sort("id") unless they want to change the final partition number; it is not a normal pattern in other scenarios.

Correct me if I'm wrong. Thanks

@ulysses-you
Contributor

ulysses-you commented Jan 6, 2020

I see you want a way to change the partition number easily after a sort.

Only one thing I'm not sure about: if df.repartition(10).sort("id") results in 10 partitions, users will think df.repartition(10).(any other operator that needs a shuffle) results in 10 partitions too, but actually it does not. It's special handling for sort.

I don't know how the committers think about it; maybe it's just fine.

@cloud-fan
Contributor

What .sort should guarantee is that the output is ordered; users shouldn't care about the number of partitions.

It's more efficient to shuffle only once for the query df.repartition(10).sort("id"). This is just an optimization and has nothing to do with semantics.

For df.join(df.repartition(10), Seq("id"), "left"), again, we don't care about the number of result partitions. If there is a way to save shuffles, please propose it.

@maryannxue
Contributor

I think we should revert this PR. The change in the ConfigBehaviorSuite test cannot be fully justified. Plus, this is not the right approach to fix this kind of issue.

@cloud-fan
Contributor

After more thought, I think it's wrong to use an optimization to fix a bug.

Looking into the bug, the issue is: the Repartition operator added by the hint is under the Sort operator, not above it. This is because our parser treats ORDER BY as the last clause, while the hint is associated with the SELECT clause. The parser rule is like SELECT ... UNION/INTERSECT SELECT ... ORDER BY. That's why we add the Sort operator at the end.

I think #27096 is the right way to optimize redundant shuffles, but we still need to fix the bug in how hints are handled in the parser.

I'm reverting this. Let's fix the bug in the parser.
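
A hedged way to observe this in spark-shell (the view name is illustrative): the analyzed logical plan shows the repartition introduced by the hint sitting below the Sort that ORDER BY adds.

```scala
// Print the analyzed logical plan; expect Sort on top with the hint's
// repartition underneath it (assumes a temp view named "test").
println(
  spark.sql("SELECT /*+ REPARTITION(5) */ * FROM test ORDER BY a")
    .queryExecution.analyzed)
```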

fqaiser94 pushed a commit to fqaiser94/spark that referenced this pull request Mar 30, 2020
Closes apache#26946 from stczwd/RoundRobinPartitioning.

Lead-authored-by: lijunqing <lijunqing@baidu.com>
Co-authored-by: stczwd <qcsd2011@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>