Skip to content

Commit

Permalink
Improve handling of failed SparkContext creation attempts.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Nov 10, 2014
1 parent 79a7e6f commit d809cb4
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 51 deletions.
98 changes: 57 additions & 41 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ import org.apache.spark.util._

class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {

SparkContext.markPartiallyConstructed(this, config)

// This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
// etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
// contains a map from hostname to a list of input format splits on the host.
Expand Down Expand Up @@ -182,11 +184,6 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
conf.setIfMissing("spark.driver.host", Utils.localHostName())
conf.setIfMissing("spark.driver.port", "0")

// This is placed after the configuration validation so that common configuration errors, like
// forgetting to pass a master url or app name, don't prevent subsequent SparkContexts from being
// constructed.
SparkContext.verifyUniqueConstruction(conf)

val jars: Seq[String] =
conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten

Expand Down Expand Up @@ -1421,7 +1418,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
persistentRdds.clearOldValues(cleanupTime)
}

SparkContext.markFullyConstructed()
SparkContext.markFullyConstructed(this, config)
}

/**
Expand All @@ -1431,62 +1428,82 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
object SparkContext extends Logging {

/**
* Lock that prevents multiple threads from being in the SparkContext constructor at the same
* time.
* Lock that guards access to global variables that track SparkContext construction.
*/
private[spark] val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()

/**
* Records the creation site of the last SparkContext to successfully enter the constructor.
* This may be an active SparkContext, or a SparkContext that is currently under construction.
* Records the creation site of the active, fully-constructed SparkContext. If no SparkContext
* is active, then this is `None`.
*
* Access to this field should be guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
*/
private[spark] var activeSparkContextCreationSite: Option[CallSite] = None
private[spark] var activeContextCreationSite: Option[CallSite] = None

/**
* Tracks whether `activeSparkContextCreationSite` refers to a fully-constructed SparkContext
* or a partially-constructed one that is either still executing its constructor or threw
* an exception from its constructor. This is used to enable better error-reporting when
* SparkContext construction fails due to existing contexts.
* Points to a partially-constructed SparkContext if some thread is in the SparkContext
* constructor, or `None` if no SparkContext is being constructed.
*
* Access to this field should be guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
*/
private[spark] var activeContextIsFullyConstructed: Boolean = false
private[spark] var contextBeingConstructed: Option[SparkContext] = None

/**
* Called in the SparkContext constructor to ensure that no other SparkContext is running
* in the same JVM.
* Called to ensure that no other SparkContext is running in this JVM.
*
* Throws an exception if a running context is detected and logs a warning if another thread is
* constructing a SparkContext. This warning is necessary because the current locking scheme
* prevents us from reliably distinguishing between cases where another context is being
* constructed and cases where another constructor threw an exception.
*/
private[spark] def verifyUniqueConstruction(conf: SparkConf) {
private def assertNoOtherContextIsRunning(sc: SparkContext, conf: SparkConf) {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
activeSparkContextCreationSite.foreach { creationSite =>
val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)."
val errDetails = if (activeContextIsFullyConstructed) {
s"The currently running SparkContext was created at:\n${creationSite.longForm}"
} else {
s"Another SparkContext is either being constructed or threw an exception from its" +
" constructor; please restart your JVM in order to create a new SparkContext." +
s"The current SparkContext was created at:\n${creationSite.longForm}"
contextBeingConstructed.foreach { otherContext =>
if (otherContext ne sc) {
val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" +
" constructor). This may indicate an error, since only one SparkContext may be" +
" running in this JVM (see SPARK-2243)."
logWarning(warnMsg)
}
val exception = new SparkException(s"$errMsg $errDetails")
if (conf.getBoolean("spark.driver.allowMultipleContexts", false)) {
logWarning("Multiple running SparkContexts detected in the same JVM!", exception)
} else {
throw exception

activeContextCreationSite.foreach { creationSite =>
val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." +
" To ignore this error, set spark.driver.allowMultipleContexts = true. " +
s"The currently running SparkContext was created at:\n${creationSite.longForm}"
val exception = new SparkException(errMsg)
if (conf.getBoolean("spark.driver.allowMultipleContexts", false)) {
logWarning("Multiple running SparkContexts detected in the same JVM!", exception)
} else {
throw exception
}
}
}
activeSparkContextCreationSite = Some(Utils.getCallSite())
activeContextIsFullyConstructed = false
}
}

/**
* Called once the SparkContext that's undergoing construction is fully constructed.
* Called at the beginning of the SparkContext constructor to ensure that no SparkContext is
* running. Throws an exception if a running context is detected and logs a warning if another
* thread is constructing a SparkContext. This warning is necessary because the current locking
* scheme prevents us from reliably distinguishing between cases where another context is being
* constructed and cases where another constructor threw an exception.
*/
/**
 * Called at the beginning of the SparkContext constructor to register `sc` as the context
 * currently under construction. First checks (under SPARK_CONTEXT_CONSTRUCTOR_LOCK) that no
 * other SparkContext is running or being constructed in this JVM; if one is, the check either
 * throws or merely warns depending on `spark.driver.allowMultipleContexts` in `conf`.
 *
 * NOTE(review): this publishes `this` (via `sc`) from inside the SparkContext constructor;
 * safe here only because access is confined to code holding SPARK_CONTEXT_CONSTRUCTOR_LOCK.
 */
private[spark] def markPartiallyConstructed(sc: SparkContext, conf: SparkConf) {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
// Validate before recording, so a detected conflict leaves the tracking state untouched.
assertNoOtherContextIsRunning(sc, conf)
contextBeingConstructed = Some(sc)
}
}

/**
* Called at the end of the SparkContext constructor to ensure that no other SparkContext has
* raced with this constructor and started.
*/
private[spark] def markFullyConstructed() {
private[spark] def markFullyConstructed(sc: SparkContext, conf: SparkConf) {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
activeContextIsFullyConstructed = true
assertNoOtherContextIsRunning(sc, conf)
contextBeingConstructed = None
activeContextCreationSite = Some(Utils.getCallSite())
}
}

Expand All @@ -1497,8 +1514,7 @@ object SparkContext extends Logging {
*/
private[spark] def clearActiveContext() {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
activeSparkContextCreationSite = None
activeContextIsFullyConstructed = false
activeContextCreationSite = None
}
}

Expand Down
14 changes: 4 additions & 10 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,14 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
}
}

test("Can still construct a new SparkContext after failing due to missing app name or master") {
test("Can still construct a new SparkContext after failing to construct a previous one") {
withSystemProperty("spark.driver.allowMultipleContexts", "false") {
val missingMaster = new SparkConf()
val missingAppName = missingMaster.clone.setMaster("local")
val validConf = missingAppName.clone.setAppName("test")
// We shouldn't be able to construct SparkContexts because these are invalid configurations
// This is an invalid configuration (no app name or master URL)
intercept[SparkException] {
new SparkContext(missingMaster)
}
intercept[SparkException] {
new SparkContext(missingAppName)
new SparkContext(new SparkConf())
}
// Even though those earlier calls failed, we should still be able to create a new context
sc = new SparkContext(validConf)
sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
}
}

Expand Down

0 comments on commit d809cb4

Please sign in to comment.