From 6497ccbbda1874187ee60a4f6368e6d9ae6580ff Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 28 Dec 2020 16:33:01 -0800
Subject: [PATCH] [SPARK-33916][CORE] Fix fallback storage offset and improve
 compression codec test coverage

### What changes were proposed in this pull request?

This PR aims to fix the offset bug and improve compression codec test coverage.

### Why are the changes needed?

When the user chooses a non-default codec, reading shuffle blocks from the fallback storage fails because the block size is computed one byte short (`nextOffset - 1 - offset`).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the extended test suite.

Closes #30934 from dongjoon-hyun/SPARK-33916.

Authored-by: Dongjoon Hyun
Signed-off-by: Dongjoon Hyun
---
 .../spark/storage/FallbackStorage.scala      |  2 +-
 .../spark/storage/FallbackStorageSuite.scala | 67 ++++++++++---------
 2 files changed, 36 insertions(+), 33 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index 9221731f77a59..41126357f8983 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -158,7 +158,7 @@ object FallbackStorage extends Logging {
     val name = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
     val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$name")
     val f = fallbackFileSystem.open(dataFile)
-    val size = nextOffset - 1 - offset
+    val size = nextOffset - offset
     logDebug(s"To byte array $size")
     val array = new Array[Byte](size.toInt)
     val startTimeNs = System.nanoTime()
diff --git a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
index 2eeae2ecad5eb..c07edb65efb53 100644
--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
@@ -59,6 +59,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
   test("fallback storage APIs - copy/exists") {
     val conf = new SparkConf(false)
       .set("spark.app.id", "testId")
+      .set(SHUFFLE_COMPRESS, false)
       .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
       .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
         Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
@@ -227,43 +228,45 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
-  test("Newly added executors should access old data from remote storage") {
-    sc = new SparkContext(getSparkConf(2, 0))
-    withSpark(sc) { sc =>
-      TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
-      val rdd1 = sc.parallelize(1 to 10, 2)
-      val rdd2 = rdd1.map(x => (x % 2, 1))
-      val rdd3 = rdd2.reduceByKey(_ + _)
-      assert(rdd3.collect() === Array((0, 5), (1, 5)))
+  Seq("lz4", "lzf", "snappy", "zstd").foreach { codec =>
+    test(s"$codec - Newly added executors should access old data from remote storage") {
+      sc = new SparkContext(getSparkConf(2, 0).set(IO_COMPRESSION_CODEC, codec))
+      withSpark(sc) { sc =>
+        TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+        val rdd1 = sc.parallelize(1 to 10, 2)
+        val rdd2 = rdd1.map(x => (x % 2, 1))
+        val rdd3 = rdd2.reduceByKey(_ + _)
+        assert(rdd3.collect() === Array((0, 5), (1, 5)))
+
+        // Decommission all
+        val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
+        sc.getExecutorIds().foreach {
+          sched.decommissionExecutor(_, ExecutorDecommissionInfo(""), false)
+        }
 
-      // Decommission all
-      val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
-      sc.getExecutorIds().foreach {
-        sched.decommissionExecutor(_, ExecutorDecommissionInfo(""), false)
-      }
+        // Make it sure that fallback storage are ready
+        val fallbackStorage = new FallbackStorage(sc.getConf)
+        eventually(timeout(10.seconds), interval(1.seconds)) {
+          Seq(
+            "shuffle_0_0_0.index", "shuffle_0_0_0.data",
+            "shuffle_0_1_0.index", "shuffle_0_1_0.data").foreach { file =>
+            assert(fallbackStorage.exists(0, file))
+          }
+        }
 
-      // Make it sure that fallback storage are ready
-      val fallbackStorage = new FallbackStorage(sc.getConf)
-      eventually(timeout(10.seconds), interval(1.seconds)) {
-        Seq(
-          "shuffle_0_0_0.index", "shuffle_0_0_0.data",
-          "shuffle_0_1_0.index", "shuffle_0_1_0.data").foreach { file =>
-          assert(fallbackStorage.exists(0, file))
+        // Since the data is safe, force to shrink down to zero executor
+        sc.getExecutorIds().foreach { id =>
+          sched.killExecutor(id)
+        }
+        eventually(timeout(20.seconds), interval(1.seconds)) {
+          assert(sc.getExecutorIds().isEmpty)
         }
-      }
 
-      // Since the data is safe, force to shrink down to zero executor
-      sc.getExecutorIds().foreach { id =>
-        sched.killExecutor(id)
-      }
-      eventually(timeout(20.seconds), interval(1.seconds)) {
-        assert(sc.getExecutorIds().isEmpty)
+        // Dynamic allocation will start new executors
+        assert(rdd3.collect() === Array((0, 5), (1, 5)))
+        assert(rdd3.sortByKey().count() == 2)
+        assert(sc.getExecutorIds().nonEmpty)
       }
-
-      // Dynamic allocation will start new executors
-      assert(rdd3.collect() === Array((0, 5), (1, 5)))
-      assert(rdd3.sortByKey().count() == 2)
-      assert(sc.getExecutorIds().nonEmpty)
     }
   }
 }
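For context on the one-line fix in `FallbackStorage.scala`: a shuffle index file stores cumulative offsets, so a block's length is the difference between its own offset and the next one. The sketch below only illustrates that arithmetic; the `offsets` array and `blockSize` helper are hypothetical stand-ins for what `FallbackStorage` derives from the index file, not its actual API.

```scala
// Illustrative sketch of the offset arithmetic behind the fix.
object OffsetArithmetic {
  def main(args: Array[String]): Unit = {
    // A shuffle index file stores cumulative offsets; block i occupies
    // bytes [offsets(i), offsets(i + 1)) of the data file.
    val offsets = Array[Long](0L, 4L, 10L) // two blocks: 4 bytes and 6 bytes

    // Hypothetical helper mirroring the fixed computation.
    def blockSize(reduceId: Int): Long = {
      val offset = offsets(reduceId)
      val nextOffset = offsets(reduceId + 1)
      nextOffset - offset
    }

    assert(blockSize(0) == 4L)
    assert(blockSize(1) == 6L)

    // The pre-fix formula `nextOffset - 1 - offset` yields 5 for the second
    // block, silently dropping its final byte.
    val buggySize = offsets(2) - 1 - offsets(1)
    assert(buggySize == 5L)
    println(s"correct = ${blockSize(1)}, buggy = $buggySize")
  }
}
```

With the old formula every block read from the fallback storage comes back one byte short, which a compressed shuffle stream does not tolerate.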
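The new test matrix runs the decommission scenario under each shuffle codec because a one-byte truncation may only surface as a decode error with some codecs. A minimal stand-alone sketch of that failure mode, assuming GZIP from `java.util.zip` as a stand-in for the lz4/lzf/snappy/zstd codecs in the test matrix (those are wired up through Spark-internal classes):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Demonstrates that a compressed stream missing its final byte fails to decode.
object TruncatedStreamDemo {
  def main(args: Array[String]): Unit = {
    val payload = Array.tabulate[Byte](1024)(i => (i % 64).toByte)

    // Compress the payload into a byte array.
    val bos = new ByteArrayOutputStream()
    val gzip = new GZIPOutputStream(bos)
    gzip.write(payload)
    gzip.close()
    val compressed = bos.toByteArray

    def readAll(bytes: Array[Byte]): Array[Byte] = {
      val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
      try in.readAllBytes() finally in.close()
    }

    // The intact stream round-trips.
    assert(readAll(compressed).sameElements(payload))

    // Dropping the last byte, as the old size computation effectively did,
    // leaves an incomplete stream that fails on read.
    val truncated = compressed.dropRight(1)
    try {
      readAll(truncated)
      println("unexpectedly succeeded")
    } catch {
      case _: IOException =>
        println("truncated stream failed to decompress, as expected")
    }
  }
}
```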