From 85a424a70c6bb63e159fd73d8f35d750a3aa92b1 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 14 Nov 2014 23:18:29 -0800
Subject: [PATCH] Incorporate more review feedback.

---
 .../scala/org/apache/spark/SparkContext.scala | 42 +++++++++++--------
 1 file changed, 25 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f0fe4cf4e280e..725fcea551f39 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -66,8 +66,15 @@ import org.apache.spark.util._
 class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
 
+  // In order to prevent multiple SparkContexts from being active at the same time, mark this
+  // context as having started construction
   SparkContext.markPartiallyConstructed(this, config)
 
+  /**
+   * The call site where this SparkContext was constructed.
+   */
+  private val creationSite: CallSite = Utils.getCallSite()
+
   // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
   // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
   // contains a map from hostname to a list of input format splits on the host.
@@ -1194,7 +1201,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
     if (dagScheduler == null) {
       throw new SparkException("SparkContext has been shutdown")
     }
-    val callSite = Utils.getCallSite()
+    val callSite = getCallSite
     val cleanedFunc = clean(func)
     logInfo("Starting job: " + callSite.shortForm)
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
@@ -1418,7 +1425,9 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
     persistentRdds.clearOldValues(cleanupTime)
   }
 
-  SparkContext.markFullyConstructed(this, config)
+  // In order to prevent multiple SparkContexts from being active at the same time, mark this
+  // context as having finished construction
+  SparkContext.setActiveContext(this, config)
 }
 
 /**
@@ -1430,15 +1439,14 @@ object SparkContext extends Logging {
   /**
    * Lock that guards access to global variables that track SparkContext construction.
    */
-  private[spark] val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()
+  private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()
 
   /**
-   * Records the creation site of the active, fully-constructed SparkContext. If no SparkContext
-   * is active, then this is `None`.
+   * The active, fully-constructed SparkContext. If no SparkContext is active, then this is `None`.
    *
    * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
    */
-  private[spark] var activeContextCreationSite: Option[CallSite] = None
+  private var activeContext: Option[SparkContext] = None
 
   /**
    * Points to a partially-constructed SparkContext if some thread is in the SparkContext
@@ -1446,7 +1454,7 @@
    * constructor, or `None` if no SparkContext is being constructed.
    *
    * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
    */
-  private[spark] var contextBeingConstructed: Option[SparkContext] = None
+  private var contextBeingConstructed: Option[SparkContext] = None
 
   /**
@@ -1456,20 +1464,20 @@
   * Called to ensure that no other SparkContext is running in this JVM.
   *
   * Throws an exception if a running context is detected and logs a warning if another thread is
   * constructing a SparkContext. This warning is necessary because the current locking scheme
   * prevents us from reliably distinguishing between cases where another context is being
   * constructed and cases where another constructor threw an exception.
   */
-  private def assertNoOtherContextIsRunning(sc: SparkContext, conf: SparkConf) {
+  private def assertNoOtherContextIsRunning(sc: SparkContext, conf: SparkConf): Unit = {
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
       contextBeingConstructed.foreach { otherContext =>
-        if (otherContext ne sc) {
+        if (otherContext ne sc) {  // checks for reference equality
           val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" +
             " constructor). This may indicate an error, since only one SparkContext may be" +
             " running in this JVM (see SPARK-2243)."
           logWarning(warnMsg)
         }
       }
-      activeContextCreationSite.foreach { creationSite =>
+      activeContext.foreach { ctx =>
         val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." +
           " To ignore this error, set spark.driver.allowMultipleContexts = true. " +
-          s"The currently running SparkContext was created at:\n${creationSite.longForm}"
+          s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"
         val exception = new SparkException(errMsg)
         if (conf.getBoolean("spark.driver.allowMultipleContexts", false)) {
           logWarning("Multiple running SparkContexts detected in the same JVM!", exception)
@@ -1488,7 +1496,7 @@
   * scheme prevents us from reliably distinguishing between cases where another context is being
   * constructed and cases where another constructor threw an exception.
   */
-  private[spark] def markPartiallyConstructed(sc: SparkContext, conf: SparkConf) {
+  private[spark] def markPartiallyConstructed(sc: SparkContext, conf: SparkConf): Unit = {
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
       assertNoOtherContextIsRunning(sc, conf)
       contextBeingConstructed = Some(sc)
@@ -1499,22 +1507,22 @@
   * Called at the end of the SparkContext constructor to ensure that no other SparkContext has
   * raced with this constructor and started.
   */
-  private[spark] def markFullyConstructed(sc: SparkContext, conf: SparkConf) {
+  private[spark] def setActiveContext(sc: SparkContext, conf: SparkConf): Unit = {
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
       assertNoOtherContextIsRunning(sc, conf)
       contextBeingConstructed = None
-      activeContextCreationSite = Some(Utils.getCallSite())
+      activeContext = Some(sc)
     }
   }
 
   /**
-   * Clears the active SparkContext metadata. This is called by `SparkContext.stop()`. It's
+   * Clears the active SparkContext metadata. This is called by `SparkContext#stop()`. It's
    * also called in unit tests to prevent a flood of warnings from test suites that don't / can't
    * properly clean up their SparkContexts.
    */
-  private[spark] def clearActiveContext() {
+  private[spark] def clearActiveContext(): Unit = {
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
-      activeContextCreationSite = None
+      activeContext = None
     }
   }
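
As a usage-level sketch of the behavior this patch enforces (illustrative only, not part of the patch): constructing a second SparkContext in the same JVM now fails with a SparkException that reports the first context's creation site, unless spark.driver.allowMultipleContexts is set to true, in which case only a warning is logged. The driver program below is hypothetical and assumes a local master.

import org.apache.spark.{SparkConf, SparkContext, SparkException}

object MultipleContextsSketch {
  def main(args: Array[String]): Unit = {
    // The first context constructed in this JVM becomes the active context.
    val sc1 = new SparkContext(new SparkConf().setMaster("local").setAppName("first"))

    // With the default configuration, a second constructor call is rejected with a
    // SparkException whose message includes sc1's creation site (creationSite.longForm).
    try {
      new SparkContext(new SparkConf().setMaster("local").setAppName("second"))
    } catch {
      case e: SparkException => println("Rejected as expected: " + e.getMessage)
    }

    // Setting spark.driver.allowMultipleContexts = true downgrades the error to a warning;
    // this escape hatch is aimed at test suites rather than production drivers.
    val lenientConf = new SparkConf()
      .setMaster("local")
      .setAppName("second-allowed")
      .set("spark.driver.allowMultipleContexts", "true")
    val sc2 = new SparkContext(lenientConf)

    sc2.stop()
    sc1.stop() // stop() calls SparkContext.clearActiveContext(), freeing the active slot
  }
}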