[SPARK-45568][TESTS] Fix flaky WholeStageCodegenSparkSubmitSuite
### What changes were proposed in this pull request?

WholeStageCodegenSparkSubmitSuite is [flaky](https://github.com/apache/spark/actions/runs/6479534195/job/17593342589) because SHUFFLE_PARTITIONS (200 by default) creates 200 reducers for a single total core, and an improper stop sequence causes executor-launcher retries. The heavy load and the retries can result in test timeouts.
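To make the failure mode concrete, here is a minimal, self-contained sketch of the two mitigations the patch combines: capping `spark.sql.shuffle.partitions` so a single-core run is not asked to schedule 200 reducer tasks, and stopping the session in a `finally` block so shutdown is deterministic. The `local[1]` master, the object name, and the toy aggregation are illustrative assumptions, not part of the patch; the actual change is in the diff below.

```scala
// Minimal sketch (not the patch itself) of the two mitigations described above.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object ShufflePartitionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]") // one core, mirroring the constrained CI environment (assumption)
      .config("spark.sql.shuffle.partitions", "2") // cap reducers instead of the default 200
      .getOrCreate()
    try {
      // groupBy forces a shuffle; with the cap it schedules 2 reducer tasks, not 200
      val counts = spark.range(100).groupBy(col("id") % 10).count().collect()
      assert(counts.length == 10)
    } finally {
      // deterministic shutdown: stop the session so the launcher never
      // retries against a half-stopped application
      spark.stop()
    }
  }
}
```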

### Why are the changes needed?

CI robustness

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

The existing WholeStageCodegenSparkSubmitSuite.
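(To reproduce locally, the suite can typically be run on its own via `build/sbt "sql/testOnly *WholeStageCodegenSparkSubmitSuite"`; the command form is assumed from Spark's standard developer workflow, not stated in this PR.)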
### Was this patch authored or co-authored using generative AI tooling?

no

Closes apache#43394 from yaooqinn/SPARK-45568.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
yaooqinn committed Oct 17, 2023
1 parent 3b46cc8 commit f00ec39
Showing 1 changed file with 30 additions and 27 deletions.
@@ -26,6 +26,7 @@ import org.apache.spark.deploy.SparkSubmitTestUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{QueryTest, Row, SparkSession}
 import org.apache.spark.sql.functions.{array, col, count, lit}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.tags.ExtendedSQLTest
 import org.apache.spark.unsafe.Platform
@@ -70,39 +71,41 @@ class WholeStageCodegenSparkSubmitSuite extends SparkSubmitTestUtils
 
 object WholeStageCodegenSparkSubmitSuite extends Assertions with Logging {
 
-  var spark: SparkSession = _
-
   def main(args: Array[String]): Unit = {
     TestUtils.configTestLog4j2("INFO")
 
-    spark = SparkSession.builder().getOrCreate()
+    val spark = SparkSession.builder()
+      .config(SQLConf.SHUFFLE_PARTITIONS.key, "2")
+      .getOrCreate()
 
-    // Make sure the test is run where the driver and the executors uses different object layouts
-    val driverArrayHeaderSize = Platform.BYTE_ARRAY_OFFSET
-    val executorArrayHeaderSize =
-      spark.sparkContext.range(0, 1).map(_ => Platform.BYTE_ARRAY_OFFSET).collect.head.toInt
-    assert(driverArrayHeaderSize > executorArrayHeaderSize)
+    try {
+      // Make sure the test is run where the driver and the executors uses different object layouts
+      val driverArrayHeaderSize = Platform.BYTE_ARRAY_OFFSET
+      val executorArrayHeaderSize =
+        spark.sparkContext.range(0, 1).map(_ => Platform.BYTE_ARRAY_OFFSET).collect().head
+      assert(driverArrayHeaderSize > executorArrayHeaderSize)
 
-    val df = spark.range(71773).select((col("id") % lit(10)).cast(IntegerType) as "v")
-      .groupBy(array(col("v"))).agg(count(col("*")))
-    val plan = df.queryExecution.executedPlan
-    assert(plan.exists(_.isInstanceOf[WholeStageCodegenExec]))
-    val expectedAnswer =
-      Row(Array(0), 7178) ::
-      Row(Array(1), 7178) ::
-      Row(Array(2), 7178) ::
-      Row(Array(3), 7177) ::
-      Row(Array(4), 7177) ::
-      Row(Array(5), 7177) ::
-      Row(Array(6), 7177) ::
-      Row(Array(7), 7177) ::
-      Row(Array(8), 7177) ::
-      Row(Array(9), 7177) :: Nil
-    val result = df.collect
-    QueryTest.sameRows(result.toSeq, expectedAnswer) match {
-      case Some(errMsg) => fail(errMsg)
-      case _ =>
+      val df = spark.range(71773).select((col("id") % lit(10)).cast(IntegerType) as "v")
+        .groupBy(array(col("v"))).agg(count(col("*")))
+      val plan = df.queryExecution.executedPlan
+      assert(plan.exists(_.isInstanceOf[WholeStageCodegenExec]))
+
+      val expectedAnswer =
+        Row(Array(0), 7178) ::
+        Row(Array(1), 7178) ::
+        Row(Array(2), 7178) ::
+        Row(Array(3), 7177) ::
+        Row(Array(4), 7177) ::
+        Row(Array(5), 7177) ::
+        Row(Array(6), 7177) ::
+        Row(Array(7), 7177) ::
+        Row(Array(8), 7177) ::
+        Row(Array(9), 7177) :: Nil
+
+      QueryTest.checkAnswer(df, expectedAnswer)
+    } finally {
+      spark.stop()
     }
 
   }
 }
