From cdc8af7e63dc6c5bbd96575a4bdb8675af5825da Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Thu, 16 May 2024 10:55:58 +0900 Subject: [PATCH] Address a comment --- .../scala/org/apache/spark/SparkContext.scala | 3 ++- .../org/apache/spark/CheckpointSuite.scala | 17 ++++++++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ba517e3d4f210..6018c87b01224 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -374,7 +374,6 @@ class SparkContext(config: SparkConf) extends Logging { private[spark] def cleaner: Option[ContextCleaner] = _cleaner private[spark] var checkpointDir: Option[String] = None - config.getOption(CHECKPOINT_DIR.key).foreach(setCheckpointDir) // Thread Local variable that can be used by users to pass information down the stack protected[spark] val localProperties = new InheritableThreadLocal[Properties] { @@ -602,6 +601,8 @@ class SparkContext(config: SparkConf) extends Logging { .foreach(logLevel => _schedulerBackend.updateExecutorsLogLevel(logLevel)) } + _conf.get(CHECKPOINT_DIR).foreach(setCheckpointDir) + val _executorMetricsSource = if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) { Some(new ExecutorMetricsSource) diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index 874f4896bb01e..06bb08d672d2f 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -588,7 +588,6 @@ object CheckpointSuite { } class CheckpointStorageSuite extends SparkFunSuite with LocalSparkContext { - test("checkpoint compression") { withTempDir { checkpointDir => val conf = new SparkConf() @@ -669,4 +668,20 @@ class CheckpointStorageSuite extends SparkFunSuite with LocalSparkContext { assert(rdd.firstParent.isInstanceOf[ReliableCheckpointRDD[_]]) } } + + test("SPARK-48268: checkpoint directory via configuration") { + withTempDir { checkpointDir => + val conf = new SparkConf() + .set("spark.checkpoint.dir", checkpointDir.toString) + .set(UI_ENABLED.key, "false") + sc = new SparkContext("local", "test", conf) + val parCollection = sc.makeRDD(1 to 4) + val flatMappedRDD = parCollection.flatMap(x => 1 to x) + flatMappedRDD.checkpoint() + assert(flatMappedRDD.dependencies.head.rdd === parCollection) + val result = flatMappedRDD.collect() + assert(flatMappedRDD.dependencies.head.rdd != parCollection) + assert(flatMappedRDD.collect() === result) + } + } }