Skip to content

Commit

Permalink
Address review feedback; expose hack workaround for existing unit tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Nov 7, 2014
1 parent 1c66070 commit ed17e14
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 27 deletions.
78 changes: 52 additions & 26 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -185,26 +185,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
// This is placed after the configuration validation so that common configuration errors, like
// forgetting to pass a master url or app name, don't prevent subsequent SparkContexts from being
// constructed.
SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
SparkContext.activeSparkContextCreationSite.foreach { creationSite =>
val errMsg = "Only one SparkContext may be active in this JVM (see SPARK-2243)."
val errDetails = if (SparkContext.activeSparkContextIsFullyConstructed) {
s"The currently active SparkContext was created at:\n${creationSite.longForm}"
} else {
s"Another SparkContext is either being constructed or threw an exception from its" +
" constructor; please restart your JVM in order to create a new SparkContext." +
s"The current SparkContext was created at:\n${creationSite.longForm}"
}
val exception = new SparkException(s"$errMsg $errDetails")
if (conf.getBoolean("spark.driver.disableMultipleSparkContextsErrorChecking", false)) {
logWarning("Multiple SparkContext error detection is disabled!", exception)
} else {
throw exception
}
}
SparkContext.activeSparkContextCreationSite = Some(Utils.getCallSite())
SparkContext.activeSparkContextIsFullyConstructed = false
}
SparkContext.verifyUniqueConstruction(conf)

val jars: Seq[String] =
conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
Expand Down Expand Up @@ -1124,8 +1105,6 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
/** Shut down the SparkContext. */
def stop() {
SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
SparkContext.activeSparkContextCreationSite = None
SparkContext.activeSparkContextIsFullyConstructed = false
postApplicationEnd()
ui.foreach(_.stop())
// Do this only if not stopped already - best case effort.
Expand All @@ -1145,6 +1124,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
listenerBus.stop()
eventLogger.foreach(_.stop())
logInfo("Successfully stopped SparkContext")
SparkContext.clearActiveContext()
} else {
logInfo("SparkContext already stopped")
}
Expand Down Expand Up @@ -1437,9 +1417,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
persistentRdds.clearOldValues(cleanupTime)
}

SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
SparkContext.activeSparkContextIsFullyConstructed = true
}
SparkContext.markFullyConstructed()
}

/**
Expand Down Expand Up @@ -1470,7 +1448,55 @@ object SparkContext extends Logging {
*
* Access to this field should be guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
*/
private[spark] var activeSparkContextIsFullyConstructed: Boolean = false
private[spark] var activeContextIsFullyConstructed: Boolean = false

/**
* Called in the SparkContext constructor to ensure that no other SparkContext is running
* in the same JVM.
*/
private[spark] def verifyUniqueConstruction(conf: SparkConf) {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
activeSparkContextCreationSite.foreach { creationSite =>
val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)."
val errDetails = if (activeContextIsFullyConstructed) {
s"The currently running SparkContext was created at:\n${creationSite.longForm}"
} else {
s"Another SparkContext is either being constructed or threw an exception from its" +
" constructor; please restart your JVM in order to create a new SparkContext." +
s"The current SparkContext was created at:\n${creationSite.longForm}"
}
val exception = new SparkException(s"$errMsg $errDetails")
if (conf.getBoolean("spark.driver.allowMultipleContexts", false)) {
logWarning("Multiple running SparkContexts detected in the same JVM!", exception)
} else {
throw exception
}
}
activeSparkContextCreationSite = Some(Utils.getCallSite())
activeContextIsFullyConstructed = false
}
}

/**
* Called once the SparkContext that's undergoing construction is fully constructed.
*/
private[spark] def markFullyConstructed() {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
activeContextIsFullyConstructed = true
}
}

/**
* Clears the active SparkContext metadata. This is called by `SparkContext.stop()`. It's
* also called in unit tests to prevent a flood of warnings from test suites that don't / can't
* properly clean up their SparkContexts.
*/
private[spark] def clearActiveContext() {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
activeSparkContextCreationSite = None
activeContextIsFullyConstructed = false
}
}

private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,24 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
.set("spark.dynamicAllocation.enabled", "true")
intercept[SparkException] { new SparkContext(conf) }
SparkEnv.get.stop() // cleanup the created environment
SparkContext.clearActiveContext()

// Only min
val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
intercept[SparkException] { new SparkContext(conf1) }
SparkEnv.get.stop()
SparkContext.clearActiveContext()

// Only max
val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
intercept[SparkException] { new SparkContext(conf2) }
SparkEnv.get.stop()
SparkContext.clearActiveContext()

// Both min and max, but min > max
intercept[SparkException] { createSparkContext(2, 1) }
SparkEnv.get.stop()
SparkContext.clearActiveContext()

// Both min and max, and min == max
val sc1 = createSparkContext(1, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
}

test("Check for multiple SparkContexts can be disabled via undocumented debug option") {
val propertyName = "spark.driver.disableMultipleSparkContextsErrorChecking"
val propertyName = "spark.driver.allowMultipleContexts"
val originalPropertyValue = System.getProperty(propertyName)
var secondSparkContext: SparkContext = null
try {
Expand Down

0 comments on commit ed17e14

Please sign in to comment.