Skip to content

Commit

Permalink
Incorporate more review feedback.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Nov 15, 2014
1 parent 372d0d3 commit 85a424a
Showing 1 changed file with 25 additions and 17 deletions.
42 changes: 25 additions & 17 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,15 @@ import org.apache.spark.util._

class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {

// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having started construction
SparkContext.markPartiallyConstructed(this, config)

/**
* The call site where this SparkContext was constructed.
*/
private val creationSite: CallSite = Utils.getCallSite()

// This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
// etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
// contains a map from hostname to a list of input format splits on the host.
Expand Down Expand Up @@ -1194,7 +1201,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
if (dagScheduler == null) {
throw new SparkException("SparkContext has been shutdown")
}
val callSite = Utils.getCallSite()
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
Expand Down Expand Up @@ -1418,7 +1425,9 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
persistentRdds.clearOldValues(cleanupTime)
}

SparkContext.markFullyConstructed(this, config)
// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having finished construction
SparkContext.setActiveContext(this, config)
}

/**
Expand All @@ -1430,23 +1439,22 @@ object SparkContext extends Logging {
/**
* Lock that guards access to global variables that track SparkContext construction.
*/
private[spark] val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()
private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()

/**
* Records the creation site of the active, fully-constructed SparkContext. If no SparkContext
* is active, then this is `None`.
* The active, fully-constructed SparkContext. If no SparkContext is active, then this is `None`.
*
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
*/
private[spark] var activeContextCreationSite: Option[CallSite] = None
private var activeContext: Option[SparkContext] = None

/**
* Points to a partially-constructed SparkContext if some thread is in the SparkContext
* constructor, or `None` if no SparkContext is being constructed.
*
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
*/
private[spark] var contextBeingConstructed: Option[SparkContext] = None
private var contextBeingConstructed: Option[SparkContext] = None

/**
* Called to ensure that no other SparkContext is running in this JVM.
Expand All @@ -1456,20 +1464,20 @@ object SparkContext extends Logging {
* prevents us from reliably distinguishing between cases where another context is being
* constructed and cases where another constructor threw an exception.
*/
private def assertNoOtherContextIsRunning(sc: SparkContext, conf: SparkConf) {
private def assertNoOtherContextIsRunning(sc: SparkContext, conf: SparkConf): Unit = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
contextBeingConstructed.foreach { otherContext =>
if (otherContext ne sc) {
if (otherContext ne sc) { // checks for reference equality
val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" +
" constructor). This may indicate an error, since only one SparkContext may be" +
" running in this JVM (see SPARK-2243)."
logWarning(warnMsg)
}

activeContextCreationSite.foreach { creationSite =>
activeContext.foreach { ctx =>
val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." +
" To ignore this error, set spark.driver.allowMultipleContexts = true. " +
s"The currently running SparkContext was created at:\n${creationSite.longForm}"
s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"
val exception = new SparkException(errMsg)
if (conf.getBoolean("spark.driver.allowMultipleContexts", false)) {
logWarning("Multiple running SparkContexts detected in the same JVM!", exception)
Expand All @@ -1488,7 +1496,7 @@ object SparkContext extends Logging {
* scheme prevents us from reliably distinguishing between cases where another context is being
* constructed and cases where another constructor threw an exception.
*/
private[spark] def markPartiallyConstructed(sc: SparkContext, conf: SparkConf) {
private[spark] def markPartiallyConstructed(sc: SparkContext, conf: SparkConf): Unit = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
assertNoOtherContextIsRunning(sc, conf)
contextBeingConstructed = Some(sc)
Expand All @@ -1499,22 +1507,22 @@ object SparkContext extends Logging {
* Called at the end of the SparkContext constructor to ensure that no other SparkContext has
* raced with this constructor and started.
*/
private[spark] def markFullyConstructed(sc: SparkContext, conf: SparkConf) {
private[spark] def setActiveContext(sc: SparkContext, conf: SparkConf): Unit = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
assertNoOtherContextIsRunning(sc, conf)
contextBeingConstructed = None
activeContextCreationSite = Some(Utils.getCallSite())
activeContext = Some(sc)
}
}

/**
* Clears the active SparkContext metadata. This is called by `SparkContext.stop()`. It's
* Clears the active SparkContext metadata. This is called by `SparkContext#stop()`. It's
* also called in unit tests to prevent a flood of warnings from test suites that don't / can't
* properly clean up their SparkContexts.
*/
private[spark] def clearActiveContext() {
private[spark] def clearActiveContext(): Unit = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
activeContextCreationSite = None
activeContext = None
}
}

Expand Down

0 comments on commit 85a424a

Please sign in to comment.