From afaa7e37cce76da8173ff556d217a67a1633426a Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 5 Nov 2014 15:06:13 -0800
Subject: [PATCH] [SPARK-4180] Prevent creation of multiple active SparkContexts.

---
 .../scala/org/apache/spark/SparkContext.scala | 100 ++++++++++++++----
 .../org/apache/spark/SparkContextSuite.scala  |  48 ++++++++-
 2 files changed, 124 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8b4db783979ec..fb3364e87dd4a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -179,6 +179,30 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   conf.setIfMissing("spark.driver.host", Utils.localHostName())
   conf.setIfMissing("spark.driver.port", "0")
 
+  // This is placed after the configuration validation so that common configuration errors, like
+  // forgetting to pass a master url or app name, don't prevent subsequent SparkContexts from being
+  // constructed.
+  SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
+    SparkContext.activeSparkContextCreationSite.foreach { creationSite =>
+      val errMsg = "Only one SparkContext may be active in this JVM (see SPARK-2243)."
+      val errDetails = if (SparkContext.activeSparkContextIsFullyConstructed) {
+        s"The currently active SparkContext was created at ${creationSite.shortForm}"
+      } else {
+        s"Another SparkContext, created at ${creationSite.shortForm}, is either being constructed" +
+          " or threw an exception from its constructor; please restart your JVM in order to" +
+          " create a new SparkContext."
+      }
+      val exception = new SparkException(s"$errMsg $errDetails")
+      if (conf.getBoolean("spark.driver.disableMultipleSparkContextsErrorChecking", false)) {
+        logWarning("Multiple SparkContext error detection is disabled!", exception)
+      } else {
+        throw exception
+      }
+    }
+    SparkContext.activeSparkContextCreationSite = Some(Utils.getCallSite())
+    SparkContext.activeSparkContextIsFullyConstructed = false
+  }
+
   val jars: Seq[String] =
     conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
 
@@ -1071,27 +1095,31 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
 
   /** Shut down the SparkContext. */
   def stop() {
-    postApplicationEnd()
-    ui.foreach(_.stop())
-    // Do this only if not stopped already - best case effort.
-    // prevent NPE if stopped more than once.
-    val dagSchedulerCopy = dagScheduler
-    dagScheduler = null
-    if (dagSchedulerCopy != null) {
-      env.metricsSystem.report()
-      metadataCleaner.cancel()
-      env.actorSystem.stop(heartbeatReceiver)
-      cleaner.foreach(_.stop())
-      dagSchedulerCopy.stop()
-      taskScheduler = null
-      // TODO: Cache.stop()?
-      env.stop()
-      SparkEnv.set(null)
-      listenerBus.stop()
-      eventLogger.foreach(_.stop())
-      logInfo("Successfully stopped SparkContext")
-    } else {
-      logInfo("SparkContext already stopped")
+    SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
+      SparkContext.activeSparkContextCreationSite = None
+      SparkContext.activeSparkContextIsFullyConstructed = false
+      postApplicationEnd()
+      ui.foreach(_.stop())
+      // Do this only if not stopped already - best case effort.
+      // prevent NPE if stopped more than once.
+      val dagSchedulerCopy = dagScheduler
+      dagScheduler = null
+      if (dagSchedulerCopy != null) {
+        env.metricsSystem.report()
+        metadataCleaner.cancel()
+        env.actorSystem.stop(heartbeatReceiver)
+        cleaner.foreach(_.stop())
+        dagSchedulerCopy.stop()
+        taskScheduler = null
+        // TODO: Cache.stop()?
+        env.stop()
+        SparkEnv.set(null)
+        listenerBus.stop()
+        eventLogger.foreach(_.stop())
+        logInfo("Successfully stopped SparkContext")
+      } else {
+        logInfo("SparkContext already stopped")
+      }
     }
   }
 
@@ -1157,7 +1185,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
     if (dagScheduler == null) {
       throw new SparkException("SparkContext has been shutdown")
     }
-    val callSite = getCallSite
+    val callSite = Utils.getCallSite()
     val cleanedFunc = clean(func)
     logInfo("Starting job: " + callSite.shortForm)
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
@@ -1380,6 +1408,10 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   private[spark] def cleanup(cleanupTime: Long) {
     persistentRdds.clearOldValues(cleanupTime)
   }
+
+  SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
+    SparkContext.activeSparkContextIsFullyConstructed = true
+  }
 }
 
 /**
@@ -1388,6 +1420,30 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
  */
 object SparkContext extends Logging {
 
+  /**
+   * Lock that prevents multiple threads from being in the SparkContext constructor at the same
+   * time.
+   */
+  private[spark] val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()
+
+  /**
+   * Records the creation site of the last SparkContext to successfully enter the constructor.
+   * This may be an active SparkContext, or a SparkContext that is currently under construction.
+   *
+   * Access to this field should be guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
+   */
+  private[spark] var activeSparkContextCreationSite: Option[CallSite] = None
+
+  /**
+   * Tracks whether `activeSparkContextCreationSite` refers to a fully-constructed SparkContext
+   * or a partially-constructed one that is either still executing its constructor or threw
+   * an exception from its constructor. This is used to enable better error-reporting when
+   * SparkContext construction fails due to existing contexts.
+   *
+   * Access to this field should be guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
+   */
+  private[spark] var activeSparkContextIsFullyConstructed: Boolean = false
+
   private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
   private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
 
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 31edad1c56c73..78afd06a8b6a2 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -21,9 +21,53 @@ import org.scalatest.FunSuite
 
 import org.apache.hadoop.io.BytesWritable
 
-class SparkContextSuite extends FunSuite {
-  //Regression test for SPARK-3121
+class SparkContextSuite extends FunSuite with LocalSparkContext {
+
+  test("Only one SparkContext may be active at a time") {
+    // Regression test for SPARK-4180
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+    sc = new SparkContext(conf)
+    // A SparkContext is already running, so we shouldn't be able to create a second one
+    intercept[SparkException] { new SparkContext(conf) }
+    // After stopping the running context, we should be able to create a new one
+    resetSparkContext()
+    sc = new SparkContext(conf)
+  }
+
+  test("Can still construct a new SparkContext after failing due to missing app name or master") {
+    val missingMaster = new SparkConf()
+    val missingAppName = missingMaster.clone.setMaster("local")
+    val validConf = missingAppName.clone.setAppName("test")
+    // We shouldn't be able to construct SparkContexts because these are invalid configurations
+    intercept[SparkException] { new SparkContext(missingMaster) }
+    intercept[SparkException] { new SparkContext(missingAppName) }
+    // Even though those earlier calls failed, we should still be able to create a new SparkContext
+    sc = new SparkContext(validConf)
+  }
+
+  test("Check for multiple SparkContexts can be disabled via undocumented debug option") {
+    val propertyName = "spark.driver.disableMultipleSparkContextsErrorChecking"
+    val originalPropertyValue = System.getProperty(propertyName)
+    var secondSparkContext: SparkContext = null
+    try {
+      System.setProperty(propertyName, "true")
+      val conf = new SparkConf().setAppName("test").setMaster("local")
+      sc = new SparkContext(conf)
+      secondSparkContext = new SparkContext(conf)
+    } finally {
+      if (secondSparkContext != null) {
+        secondSparkContext.stop()
+      }
+      if (originalPropertyValue != null) {
+        System.setProperty(propertyName, originalPropertyValue)
+      } else {
+        System.clearProperty(propertyName)
+      }
+    }
+  }
+
   test("BytesWritable implicit conversion is correct") {
+    // Regression test for SPARK-3121
    val bytesWritable = new BytesWritable()
    val inputArray = (1 to 10).map(_.toByte).toArray
    bytesWritable.set(inputArray, 0, 10)
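
The first SparkContext.scala hunk implements a single-active-instance guard: a companion-object lock serializes entry into the constructor, the creation call site of the current holder is recorded before any fallible initialization runs, and a boolean distinguishes a fully constructed context from one that is still mid-construction or whose constructor threw. The Scala sketch below restates that pattern outside Spark so the moving parts are easier to follow; GuardedService, activeCreationSite, fullyConstructed, and allowMultiple are illustrative names rather than identifiers from the patch, and a stack-trace element stands in for Spark's Utils.getCallSite().

object GuardedService {
  // Serializes entry into the constructor and guards the two fields below.
  private val CONSTRUCTOR_LOCK = new Object()
  // Creation site of the instance that currently "owns" the JVM, if any.
  private var activeCreationSite: Option[String] = None
  // Whether that instance finished its constructor without throwing.
  private var fullyConstructed: Boolean = false
}

class GuardedService(allowMultiple: Boolean = false) {

  GuardedService.CONSTRUCTOR_LOCK.synchronized {
    GuardedService.activeCreationSite.foreach { site =>
      val detail =
        if (GuardedService.fullyConstructed) s"an active instance was created at $site"
        else s"an instance created at $site is still constructing or failed to construct"
      val error = new IllegalStateException(s"Only one instance may be active; $detail")
      if (allowMultiple) Console.err.println(s"WARN: ${error.getMessage}") else throw error
    }
    // Register ourselves before the rest of the (possibly failing) constructor runs.
    GuardedService.activeCreationSite =
      Some(Thread.currentThread().getStackTrace.drop(2).headOption.map(_.toString).getOrElse("unknown"))
    GuardedService.fullyConstructed = false
  }

  // ... expensive, fallible initialization would run here ...

  // Last constructor statement: mark construction as complete.
  GuardedService.CONSTRUCTOR_LOCK.synchronized {
    GuardedService.fullyConstructed = true
  }

  def stop(): Unit = GuardedService.CONSTRUCTOR_LOCK.synchronized {
    // Clearing the state lets a future instance be constructed, mirroring SparkContext.stop().
    GuardedService.activeCreationSite = None
    GuardedService.fullyConstructed = false
  }
}

Recording the creation site before the remainder of the constructor executes is what allows the error message to distinguish "another context is active" from "another context is half-built or failed", which is the role of activeSparkContextIsFullyConstructed in the patch.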
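From a driver program's point of view, the change means the second of two overlapping SparkContext constructions fails fast with a SparkException that names the first context's creation site, and that stop() reopens the door for a new context. The fragment below is a hypothetical driver illustrating that behavior under the assumption that this patch is applied; it is not part of the patch, and the config key mentioned in the comment is the undocumented escape hatch the patch itself reads.

import org.apache.spark.{SparkConf, SparkContext, SparkException}

object SingleContextDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("single-context-demo").setMaster("local")
    val first = new SparkContext(conf)
    try {
      new SparkContext(conf) // rejected while `first` is still active
    } catch {
      case e: SparkException => println(s"Second context rejected: ${e.getMessage}")
    }
    first.stop() // clears the companion-object bookkeeping under the constructor lock
    val second = new SparkContext(conf) // allowed again after stop()
    second.stop()
    // To log a warning instead of throwing, the patch consults:
    //   spark.driver.disableMultipleSparkContextsErrorChecking=true
  }
}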
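The third test toggles the undocumented flag through a JVM system property (a SparkConf created with new SparkConf() picks up spark.* system properties by default) and then restores or clears it in a finally block so the setting cannot leak into later tests. A small helper such as the hypothetical withSystemProperty below captures the same save/override/restore discipline; it is an illustration of the test's cleanup pattern, not something the patch adds.

object SystemPropertyFixture {
  /** Runs `body` with `key` temporarily set to `value`, then restores the previous state. */
  def withSystemProperty[T](key: String, value: String)(body: => T): T = {
    val original = System.getProperty(key) // null when the property was not previously set
    System.setProperty(key, value)
    try {
      body
    } finally {
      // Restore the old value, or clear the property entirely if it was unset before.
      if (original != null) System.setProperty(key, original) else System.clearProperty(key)
    }
  }
}

With such a helper, the body of that test would reduce to wrapping the two SparkContext creations in withSystemProperty("spark.driver.disableMultipleSparkContextsErrorChecking", "true") { ... }, with the stop() of the second context still handled in its own finally block.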