Skip to content

Commit

Permalink
Add test.
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Jun 23, 2020
1 parent 43c4726 commit 8e39ed7
Showing 1 changed file with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1029,21 +1029,30 @@ class AdaptiveQueryExecSuite
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
SQLConf.SHUFFLE_PARTITIONS.key -> "6",
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
val df = spark.range(10).repartition($"id")
val partitionsNum = df.rdd.collectPartitions().length
val df1 = spark.range(10).repartition($"id")
val df2 = spark.range(10).repartition(10, $"id")
val df3 = spark.range(10).repartition(10)

val partitionsNum1 = df1.rdd.collectPartitions().length
if (enableAQE) {
assert(partitionsNum < 6)
assert(partitionsNum1 < 6)

val plan = df.queryExecution.executedPlan
val plan = df1.queryExecution.executedPlan
assert(plan.isInstanceOf[AdaptiveSparkPlanExec])
val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect {
case s: ShuffleExchangeExec => s
}
assert(shuffle.size == 1)
assert(shuffle(0).outputPartitioning.numPartitions == 7)
} else {
assert(partitionsNum === 6)
assert(partitionsNum1 === 6)
}

val partitionsNum2 = df2.rdd.collectPartitions().length
assert(partitionsNum2 == 10)

val partitionsNum3 = df3.rdd.collectPartitions().length
assert(partitionsNum3 == 10)
}
}
}
Expand Down

0 comments on commit 8e39ed7

Please sign in to comment.