Skip to content

Commit

Permalink
suite repartition(int, Seq[expression])
Browse files Browse the repository at this point in the history
Change-Id: I6b102f32b4084625875b395990e8ac4673c56bac
  • Loading branch information
jackylee-ch committed Dec 20, 2019
1 parent 02267db commit 56a1101
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
child
case (child, BroadcastDistribution(mode)) =>
BroadcastExchangeExec(mode, child)
case (ShuffleExchangeExec(partitioning: RoundRobinPartitioning, child, _),
distribution: OrderedDistribution) =>
ShuffleExchangeExec(
distribution.createPartitioning(partitioning.numPartitions), child)
case (ShuffleExchangeExec(partitioning, child, _), distribution: OrderedDistribution) =>
ShuffleExchangeExec(distribution.createPartitioning(partitioning.numPartitions), child)
case (child, distribution) =>
val numPartitions = distribution.requiredNumPartitions
.getOrElse(defaultNumPreShufflePartitions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ class PlannerSuite extends SharedSparkSession {
}
}

test("SPARK-30036: EnsureRequirements replace Exchange " +
"if child has SortExec and RoundRobinPartitioning") {
test("SPARK-30036: Remove unnecessary RoundRobinPartitioning " +
"if SortExec is followed by RoundRobinPartitioning") {
val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) :: Nil)
val partitioning = RoundRobinPartitioning(5)
assert(!partitioning.satisfies(distribution))
Expand All @@ -433,12 +433,30 @@ class PlannerSuite extends SharedSparkSession {
partitioning,
DummySparkPlan(outputPartitioning = partitioning)))
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
assert(outputPlan.find{
case e: ShuffleExchangeExec => e.outputPartitioning.isInstanceOf[RoundRobinPartitioning]
assert(outputPlan.find {
case ShuffleExchangeExec(_: RoundRobinPartitioning, _, _) => true
case _ => false}.isEmpty,
"RoundRobinPartitioning should be changed to RangePartitioning")
}

// Builds a global SortExec directly on top of a ShuffleExchangeExec that uses
// HashPartitioning, runs EnsureRequirements over it, and verifies that no shuffle
// with HashPartitioning is left anywhere in the resulting plan.
test("SPARK-30036: Remove unnecessary HashPartitioning " +
  "if SortExec is followed by HashPartitioning") {
  val sortOrder = SortOrder(Literal(1), Ascending) :: Nil
  val distribution = OrderedDistribution(sortOrder)
  val partitioning = HashPartitioning(Literal(1) :: Nil, 5)
  // Precondition: the hash partitioning must not already satisfy the ordered
  // distribution, otherwise the check below would not exercise the rule.
  assert(!partitioning.satisfies(distribution))

  val inputPlan = SortExec(
    sortOrder,
    global = true,
    child = ShuffleExchangeExec(
      partitioning,
      DummySparkPlan(outputPartitioning = partitioning)))
  val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)

  // Only the absence of a HashPartitioning shuffle is checked here; the partitioning
  // that replaces it is not inspected by this assertion.
  assert(outputPlan.find {
    case ShuffleExchangeExec(_: HashPartitioning, _, _) => true
    case _ => false
  }.isEmpty,
    "HashPartitioning should be changed to RangePartitioning")
}

test("EnsureRequirements does not eliminate Exchange with different partitioning") {
val distribution = ClusteredDistribution(Literal(1) :: Nil)
val partitioning = HashPartitioning(Literal(2) :: Nil, 5)
Expand Down

0 comments on commit 56a1101

Please sign in to comment.