[SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by #26946

Closed · wants to merge 8 commits
@@ -55,6 +55,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         child
       case (child, BroadcastDistribution(mode)) =>
         BroadcastExchangeExec(mode, child)
+      case (ShuffleExchangeExec(partitioning, child, _), distribution: OrderedDistribution) =>
+        ShuffleExchangeExec(distribution.createPartitioning(partitioning.numPartitions), child)
Comment on lines +58 to +59

Member (reviewer):
This considers a special case for OrderedDistribution. Generally, if a ShuffleExchangeExec is followed by any distribution it does not satisfy, we should always trim the ShuffleExchangeExec and apply that distribution's partitioning. Shouldn't we?

Contributor (author):
Sounds reasonable. Any suitable cases?

@viirya (Member), Dec 23, 2019:
I tried a few possible cases but could not come up with a concrete one like this. This may be the only such case, so I think this should be fine.

       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
           .getOrElse(defaultNumPreShufflePartitions)
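The new case above reuses an existing shuffle instead of stacking a second one on top of it. The idea can be sketched stand-alone with a toy plan ADT (all names here — `Plan`, `Shuffle`, `RoundRobin`, `RangePart`, `ensureOrdered` — are illustrative, not Spark classes):

```scala
// Toy model of the rule: when a shuffle's partitioning cannot satisfy a
// required ordered distribution, rewrite that shuffle in place to use range
// partitioning (keeping its partition count), rather than adding a second
// shuffle on top. All names are hypothetical stand-ins, not Spark APIs.
sealed trait Partitioning { def numPartitions: Int }
case class RoundRobin(numPartitions: Int) extends Partitioning
case class RangePart(numPartitions: Int) extends Partitioning

sealed trait Plan
case class Leaf(name: String) extends Plan
case class Shuffle(partitioning: Partitioning, child: Plan) extends Plan

val defaultNumPartitions = 5

def ensureOrdered(plan: Plan): Plan = plan match {
  // Already range-partitioned: nothing to do.
  case Shuffle(_: RangePart, _) => plan
  // Existing shuffle with an unsatisfying partitioning: swap the
  // partitioning in place, preserving numPartitions (this mirrors
  // createPartitioning(partitioning.numPartitions) in the diff above).
  case Shuffle(p, child)        => Shuffle(RangePart(p.numPartitions), child)
  // No shuffle yet: add one with the default partition count.
  case other                    => Shuffle(RangePart(defaultNumPartitions), other)
}
```

With this shape, `ensureOrdered(Shuffle(RoundRobin(10), Leaf("scan")))` produces a single range shuffle with 10 partitions — the toy analogue of `repartition(10).sort(...)` planning one shuffle instead of two.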
@@ -39,9 +39,7 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession {
     def computeChiSquareTest(): Double = {
       val n = 10000
       // Trigger a sort
-      // Range has range partitioning in its output now. To have a range shuffle, we
-      // need to run a repartition first.
-      val data = spark.range(0, n, 1, 1).repartition(10).sort($"id".desc)
+      val data = spark.range(0, n, 1, 10).sort($"id".desc)
         .selectExpr("SPARK_PARTITION_ID() pid", "id").as[(Int, Long)].collect()

(cloud-fan marked this conversation as resolved.)

// Compute histogram for the number of records per partition post sort
@@ -55,12 +53,12 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession {

     withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> numPartitions.toString) {
       // The default chi-sq value should be low
-      assert(computeChiSquareTest() < 100)
+      assert(computeChiSquareTest() < 10)
Contributor (reviewer):
The physical plan is the same as before; what caused this change?

Contributor (author):
They are not the same: we had two shuffles before, one with RoundRobinPartitioning and the other with RangePartitioning.

Contributor (reviewer):
Ah, I see.


     withSQLConf(SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION.key -> "1") {
       // If we only sample one point, the range boundaries will be pretty bad and the
       // chi-sq value would be very high.
-      assert(computeChiSquareTest() > 300)
+      assert(computeChiSquareTest() > 100)
}
}
}
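For context on the thresholds in this hunk, the suite's chi-square statistic measures how evenly rows land across partitions after the sort. A plain-Scala sketch of that statistic, assuming a uniform expected count per partition (this is a stand-in for illustration, not the suite's actual implementation):

```scala
// Pearson chi-square statistic over per-partition record counts, against a
// uniform expectation. Perfectly balanced partitions give 0; skew drives the
// value up, which is why sampling only one point per partition (producing bad
// range boundaries) makes the asserted value jump.
def chiSquare(observedCounts: Seq[Long]): Double = {
  val expected = observedCounts.sum.toDouble / observedCounts.size
  observedCounts.map { o => val d = o - expected; d * d / expected }.sum
}

val balanced = Seq.fill(10)(1000L)         // 10 partitions x 1000 rows
val skewed   = 9100L +: Seq.fill(9)(100L)  // one hot partition, same total

chiSquare(balanced)  // 0.0
chiSquare(skewed)    // 72900.0 -- far above either threshold in the test
```

This makes the tightened bounds plausible: with a single range shuffle, the partition sizes track the sampled range boundaries directly, so the balanced case sits near 0 and the one-sample case blows up.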
@@ -421,6 +421,52 @@ class PlannerSuite extends SharedSparkSession {
     }
   }

test("SPARK-30036: Remove unnecessary RoundRobinPartitioning " +
"if SortExec is followed by RoundRobinPartitioning") {
val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) :: Nil)
val partitioning = RoundRobinPartitioning(5)
assert(!partitioning.satisfies(distribution))

val inputPlan = SortExec(SortOrder(Literal(1), Ascending) :: Nil,
global = true,
child = ShuffleExchangeExec(
partitioning,
DummySparkPlan(outputPartitioning = partitioning)))
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
assert(outputPlan.find {
case ShuffleExchangeExec(_: RoundRobinPartitioning, _, _) => true
case _ => false
}.isEmpty,
"RoundRobinPartitioning should be changed to RangePartitioning")

val query = testData.select('key, 'value).repartition(2).sort('key.asc)
assert(query.rdd.getNumPartitions == 2)
assert(query.rdd.collectPartitions()(0).map(_.get(0)).toSeq == (1 to 50))
}

test("SPARK-30036: Remove unnecessary HashPartitioning " +
"if SortExec is followed by HashPartitioning") {
val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) :: Nil)
val partitioning = HashPartitioning(Literal(1) :: Nil, 5)
assert(!partitioning.satisfies(distribution))

val inputPlan = SortExec(SortOrder(Literal(1), Ascending) :: Nil,
global = true,
child = ShuffleExchangeExec(
partitioning,
DummySparkPlan(outputPartitioning = partitioning)))
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
assert(outputPlan.find {
case ShuffleExchangeExec(_: HashPartitioning, _, _) => true
case _ => false
}.isEmpty,
"HashPartitioning should be changed to RangePartitioning")

val query = testData.select('key, 'value).repartition(5, 'key).sort('key.asc)
(cloud-fan marked this conversation as resolved.)
assert(query.rdd.getNumPartitions == 5)
assert(query.rdd.collectPartitions()(0).map(_.get(0)).toSeq == (1 to 20))
}
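The tail assertions in these two tests check the user-visible outcome: `repartition(n)` followed by `sort` keeps n partitions, and each resulting partition holds a contiguous key range. A plain-Scala stand-in for that behavior (`rangePartition` is an illustrative helper, not a Spark API):

```scala
// Splits keys into numPartitions contiguous, ordered ranges -- the shape the
// tests assert on the RDD (e.g. partition 0 of repartition(2).sort over keys
// 1..100 holds exactly 1..50). Assumes evenly divisible input for simplicity.
def rangePartition(keys: Seq[Int], numPartitions: Int): Seq[Seq[Int]] = {
  val sorted = keys.sorted
  val perPartition = math.ceil(sorted.size.toDouble / numPartitions).toInt
  sorted.grouped(perPartition).toSeq
}

val parts = rangePartition(1 to 100, 2)
// parts.size == 2; parts.head == (1 to 50)
```

This mirrors `query.rdd.getNumPartitions == 2` and the `collectPartitions()(0)` check above: the fix preserves the requested partition count while replacing the partitioning scheme with a range one.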

test("EnsureRequirements does not eliminate Exchange with different partitioning") {
val distribution = ClusteredDistribution(Literal(1) :: Nil)
val partitioning = HashPartitioning(Literal(2) :: Nil, 5)