[SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by #26946

Closed · wants to merge 8 commits (changes shown from 4 commits)

@@ -55,6 +55,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
        child
      case (child, BroadcastDistribution(mode)) =>
        BroadcastExchangeExec(mode, child)
      case (ShuffleExchangeExec(partitioning, child, _), distribution: OrderedDistribution) =>
        ShuffleExchangeExec(distribution.createPartitioning(partitioning.numPartitions), child)

Member (comment on lines +58 to +59):
This considers a special case for OrderedDistribution. Generally, if a ShuffleExchangeExec is followed by any distribution it does not satisfy, we should always trim the ShuffleExchangeExec and apply the partitioning of that distribution. Shouldn't we?

Contributor Author:
Sounds reasonable. Are there any suitable cases?

viirya (Member), Dec 23, 2019:
I just tried a few possible cases, but could not come up with a concrete case like this. Maybe this is the only possible case, so I think this should be fine.

      case (child, distribution) =>
        val numPartitions = distribution.requiredNumPartitions
          .getOrElse(defaultNumPreShufflePartitions)
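
For context, a minimal repro sketch of the query shape this new case targets: a round-robin REPARTITION followed by a global sort used to plan two back-to-back exchanges. The snippet assumes a spark-shell style session named spark; the commented plan shapes are illustrative, not output copied from this PR.

import spark.implicits._

// REPARTITION (round-robin shuffle) followed by a global sort.
val sorted = spark.range(0, 1000, 1, 4)
  .repartition(10)
  .sort($"id".desc)

// Before this change the physical plan had two adjacent exchanges:
//   Sort <- Exchange(rangepartitioning) <- Exchange(RoundRobinPartitioning(10)) <- Range
// With the new EnsureRequirements case, the round-robin exchange is replaced,
// leaving a single range exchange under the sort.
sorted.explain()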

@@ -39,9 +39,8 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession {
    def computeChiSquareTest(): Double = {
      val n = 10000
      // Trigger a sort
      // Range has range partitioning in its output now. To have a range shuffle, we
      // need to run a repartition first.
      val data = spark.range(0, n, 1, 1).repartition(10).sort($"id".desc)
      // Range has range partitioning in its output now.

cloud-fan (Contributor), Dec 23, 2019:
Shall we remove this comment now? It's not useful anymore: we do add a shuffle, so the range output partitioning doesn't matter.

Contributor Author:
Okay.

      val data = spark.range(0, n, 1, 10).sort($"id".desc)
        .selectExpr("SPARK_PARTITION_ID() pid", "id").as[(Int, Long)].collect()

      // Compute histogram for the number of records per partition post sort

@@ -55,12 +54,12 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession {

    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> numPartitions.toString) {
      // The default chi-sq value should be low
      assert(computeChiSquareTest() < 100)
      assert(computeChiSquareTest() < 10)

Contributor:
The physical plan is the same as before; what caused this change?

Contributor Author:
They are not the same: we had two shuffles before, one RoundRobinPartitioning and the other RangePartitioning.

Contributor:
Ah, I see.


      withSQLConf(SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION.key -> "1") {
        // If we only sample one point, the range boundaries will be pretty bad and the
        // chi-sq value would be very high.
        assert(computeChiSquareTest() > 300)
        assert(computeChiSquareTest() > 100)
      }
    }
  }
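
As a side note on the thresholds above: the suite's chi-square statistic measures how far the per-partition record counts deviate from a uniform split after the range shuffle. A hand-rolled sketch of that idea (an illustration against a uniform expectation, not the suite's actual helper):

// Hand-rolled chi-square statistic against a uniform expectation (illustrative only).
def chiSquareUniform(counts: Seq[Long]): Double = {
  val total = counts.sum.toDouble
  val expected = total / counts.length
  counts.map { observed =>
    val diff = observed - expected
    diff * diff / expected
  }.sum
}

// Perfectly balanced partitions give a statistic of 0.
chiSquareUniform(Seq(250L, 250L, 250L, 250L))  // 0.0
// Skewed partitions (e.g. bad range boundaries from sampling a single point) inflate it.
chiSquareUniform(Seq(10L, 990L))               // ~960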

@@ -421,6 +421,42 @@ class PlannerSuite extends SharedSparkSession {
}
}

test("SPARK-30036: Romove unnecessary RoundRobinPartitioning " +

Member:
nit Romove -> Remove

Contributor Author:
Done.

"if SortExec is followed by RoundRobinPartitioning") {
val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) :: Nil)
val partitioning = RoundRobinPartitioning(5)
assert(!partitioning.satisfies(distribution))

val inputPlan = SortExec(SortOrder(Literal(1), Ascending) :: Nil,
global = true,
child = ShuffleExchangeExec(
partitioning,
DummySparkPlan(outputPartitioning = partitioning)))
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
assert(outputPlan.find {
case ShuffleExchangeExec(_: RoundRobinPartitioning, _, _) => true
case _ => false}.isEmpty,

Contributor:
nit:

...find {
  case ...
  case ...
}.isEmpty

"RoundRobinPartitioning should be changed to RangePartitioning")
}

test("SPARK-30036: Romove unnecessary HashPartitioning " +
"if SortExec is followed by HashPartitioning") {
val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) :: Nil)
val partitioning = HashPartitioning(Literal(1) :: Nil, 5)
assert(!partitioning.satisfies(distribution))

val inputPlan = SortExec(SortOrder(Literal(1), Ascending) :: Nil,
global = true,
child = ShuffleExchangeExec(
partitioning,
DummySparkPlan(outputPartitioning = partitioning)))
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
assert(outputPlan.find {
case ShuffleExchangeExec(_: HashPartitioning, _, _) => true
case _ => false}.isEmpty,

Contributor:
ditto

"HashPartitioning should be changed to RangePartitioning")
}
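
Beyond the rule-level tests above, an end-to-end sanity check could count the exchanges in the executed plan directly. A hedged sketch, assuming a local spark-shell session named spark with adaptive query execution disabled; this helper is not part of the PR:

import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import spark.implicits._

val plan = spark.range(0, 1000, 1, 4)
  .repartition(10)
  .sort($"id".desc)
  .queryExecution
  .executedPlan

// With the new EnsureRequirements case, only the range exchange required by the
// sort should remain; the round-robin exchange from repartition(10) is trimmed.
val exchanges = plan.collect { case e: ShuffleExchangeExec => e }
assert(exchanges.size == 1, s"expected a single exchange, found ${exchanges.size}")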

test("EnsureRequirements does not eliminate Exchange with different partitioning") {
val distribution = ClusteredDistribution(Literal(1) :: Nil)
val partitioning = HashPartitioning(Literal(2) :: Nil, 5)