[SPARK-4180] [Core] Prevent creation of multiple active SparkContexts #3121
SparkContext.scala
@@ -58,12 +58,26 @@ import org.apache.spark.util._

 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
 *
 * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
 * creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
 *
 * @param config a Spark Config object describing the application configuration. Any settings in
 *   this config overrides the default configs as well as system properties.
 */
class SparkContext(config: SparkConf) extends Logging {

  // The call site where this SparkContext was constructed.
  private val creationSite: CallSite = Utils.getCallSite()

  // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
  private val allowMultipleContexts: Boolean =
    config.getBoolean("spark.driver.allowMultipleContexts", false)

  // In order to prevent multiple SparkContexts from being active at the same time, mark this
  // context as having started construction.
  // NOTE: this must be placed at the beginning of the SparkContext constructor.
  SparkContext.markPartiallyConstructed(this, allowMultipleContexts)

  // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
  // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
  // contains a map from hostname to a list of input format splits on the host.
@@ -1166,27 +1180,30 @@ class SparkContext(config: SparkConf) extends Logging {

  /** Shut down the SparkContext. */
  def stop() {
    postApplicationEnd()
    ui.foreach(_.stop())
    // Do this only if not stopped already - best case effort.
    // prevent NPE if stopped more than once.
    val dagSchedulerCopy = dagScheduler
    dagScheduler = null
    if (dagSchedulerCopy != null) {
      env.metricsSystem.report()
      metadataCleaner.cancel()
      env.actorSystem.stop(heartbeatReceiver)
      cleaner.foreach(_.stop())
      dagSchedulerCopy.stop()
      taskScheduler = null
      // TODO: Cache.stop()?
      env.stop()
      SparkEnv.set(null)
      listenerBus.stop()
      eventLogger.foreach(_.stop())
      logInfo("Successfully stopped SparkContext")
    } else {
      logInfo("SparkContext already stopped")
    SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      postApplicationEnd()
      ui.foreach(_.stop())
      // Do this only if not stopped already - best case effort.
      // prevent NPE if stopped more than once.
      val dagSchedulerCopy = dagScheduler
      dagScheduler = null
      if (dagSchedulerCopy != null) {
        env.metricsSystem.report()
        metadataCleaner.cancel()
        env.actorSystem.stop(heartbeatReceiver)
        cleaner.foreach(_.stop())
        dagSchedulerCopy.stop()
        taskScheduler = null
        // TODO: Cache.stop()?
        env.stop()
        SparkEnv.set(null)
        listenerBus.stop()
        eventLogger.foreach(_.stop())
        logInfo("Successfully stopped SparkContext")
        SparkContext.clearActiveContext()
      } else {
        logInfo("SparkContext already stopped")
      }
    }
  }

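The dagSchedulerCopy null check quoted above is what keeps stop() safe to call more than once. A small illustrative sketch of that behaviour (not part of the diff; conf values are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object StopTwiceExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("stop-twice").setMaster("local"))
    sc.stop()  // stops the context and clears the active-context slot
    sc.stop()  // takes the else branch and just logs "SparkContext already stopped"
  }
}
```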
@@ -1475,6 +1492,11 @@ class SparkContext(config: SparkConf) extends Logging {

  private[spark] def cleanup(cleanupTime: Long) {
    persistentRdds.clearOldValues(cleanupTime)
  }

  // In order to prevent multiple SparkContexts from being active at the same time, mark this
  // context as having finished construction.
  // NOTE: this must be placed at the end of the SparkContext constructor.
  SparkContext.setActiveContext(this, allowMultipleContexts)
}

/**
@@ -1483,6 +1505,107 @@ class SparkContext(config: SparkConf) extends Logging {
 */
object SparkContext extends Logging {

  /**
   * Lock that guards access to global variables that track SparkContext construction.
   */
  private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()

  /**
   * The active, fully-constructed SparkContext. If no SparkContext is active, then this is `None`.
   *
   * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
   */
  private var activeContext: Option[SparkContext] = None

  /**
   * Points to a partially-constructed SparkContext if some thread is in the SparkContext
   * constructor, or `None` if no SparkContext is being constructed.
   *
   * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
   */
  private var contextBeingConstructed: Option[SparkContext] = None

  /**
   * Called to ensure that no other SparkContext is running in this JVM.
   *
   * Throws an exception if a running context is detected and logs a warning if another thread is
   * constructing a SparkContext. This warning is necessary because the current locking scheme
   * prevents us from reliably distinguishing between cases where another context is being
   * constructed and cases where another constructor threw an exception.
   */
  private def assertNoOtherContextIsRunning(
      sc: SparkContext,
      allowMultipleContexts: Boolean): Unit = {
    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      contextBeingConstructed.foreach { otherContext =>
        if (otherContext ne sc) {  // checks for reference equality
          // Since otherContext might point to a partially-constructed context, guard against
          // its creationSite field being null:
          val otherContextCreationSite =
            Option(otherContext.creationSite).map(_.longForm).getOrElse("unknown location")
          val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" +
[Inline review thread on this line]
- Is there any reason we don't print the callsite of the other constructor in this case?
- I guess you assume if there is a real issue it will be logged later on?
- I originally thought that it might require some additional bookkeeping, but I think it would probably be safe to just log otherContext.creationSite.
            " constructor). This may indicate an error, since only one SparkContext may be" +
            " running in this JVM (see SPARK-2243)." +
            s" The other SparkContext was created at:\n$otherContextCreationSite"
          logWarning(warnMsg)
        }

        activeContext.foreach { ctx =>
          val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." +
            " To ignore this error, set spark.driver.allowMultipleContexts = true. " +
            s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"
          val exception = new SparkException(errMsg)
          if (allowMultipleContexts) {
            logWarning("Multiple running SparkContexts detected in the same JVM!", exception)
          } else {
            throw exception
          }
        }
      }
    }
  }

  /**
   * Called at the beginning of the SparkContext constructor to ensure that no SparkContext is
   * running. Throws an exception if a running context is detected and logs a warning if another
   * thread is constructing a SparkContext. This warning is necessary because the current locking
   * scheme prevents us from reliably distinguishing between cases where another context is being
   * constructed and cases where another constructor threw an exception.
   */
  private[spark] def markPartiallyConstructed(
      sc: SparkContext,
      allowMultipleContexts: Boolean): Unit = {
    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      assertNoOtherContextIsRunning(sc, allowMultipleContexts)
      contextBeingConstructed = Some(sc)
    }
  }

  /**
   * Called at the end of the SparkContext constructor to ensure that no other SparkContext has
   * raced with this constructor and started.
   */
  private[spark] def setActiveContext(
      sc: SparkContext,
      allowMultipleContexts: Boolean): Unit = {
    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      assertNoOtherContextIsRunning(sc, allowMultipleContexts)
      contextBeingConstructed = None
      activeContext = Some(sc)
    }
  }

  /**
   * Clears the active SparkContext metadata. This is called by `SparkContext#stop()`. It's
   * also called in unit tests to prevent a flood of warnings from test suites that don't / can't
   * properly clean up their SparkContexts.
   */
  private[spark] def clearActiveContext(): Unit = {
    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      activeContext = None
    }
  }

  private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"

  private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
SparkContextSuite.scala
@@ -21,9 +21,62 @@ import org.scalatest.FunSuite

import org.apache.hadoop.io.BytesWritable

class SparkContextSuite extends FunSuite {
//Regression test for SPARK-3121
class SparkContextSuite extends FunSuite with LocalSparkContext {

  /** Allows system properties to be changed in tests */
  private def withSystemProperty[T](property: String, value: String)(block: => T): T = {
[Inline review thread on this line]
- should this be put in some test utility somewhere? Or at least we can make a JIRA to use this throughout tests?
- Yeah, it might be nice to move this into a test utilities object / subpackage, along with LocalSparkContext, SharedSparkContext, my systemproperties DI code, etc. Let's do this in a separate JIRA.
- Alright, created https://issues.apache.org/jira/browse/SPARK-4442 to track this.
    val originalValue = System.getProperty(property)
    try {
      System.setProperty(property, value)
      block
    } finally {
      if (originalValue == null) {
        System.clearProperty(property)
      } else {
        System.setProperty(property, originalValue)
      }
    }
  }

  test("Only one SparkContext may be active at a time") {
    // Regression test for SPARK-4180
    withSystemProperty("spark.driver.allowMultipleContexts", "false") {
      val conf = new SparkConf().setAppName("test").setMaster("local")
      sc = new SparkContext(conf)
      // A SparkContext is already running, so we shouldn't be able to create a second one
      intercept[SparkException] { new SparkContext(conf) }
      // After stopping the running context, we should be able to create a new one
      resetSparkContext()
      sc = new SparkContext(conf)
    }
  }

  test("Can still construct a new SparkContext after failing to construct a previous one") {
    withSystemProperty("spark.driver.allowMultipleContexts", "false") {
      // This is an invalid configuration (no app name or master URL)
      intercept[SparkException] {
        new SparkContext(new SparkConf())
      }
      // Even though those earlier calls failed, we should still be able to create a new context
      sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
    }
  }

  test("Check for multiple SparkContexts can be disabled via undocumented debug option") {
    withSystemProperty("spark.driver.allowMultipleContexts", "true") {
      var secondSparkContext: SparkContext = null
      try {
        val conf = new SparkConf().setAppName("test").setMaster("local")
        sc = new SparkContext(conf)
        secondSparkContext = new SparkContext(conf)
      } finally {
        Option(secondSparkContext).foreach(_.stop())
      }
    }
  }

  test("BytesWritable implicit conversion is correct") {
    // Regression test for SPARK-3121
    val bytesWritable = new BytesWritable()
    val inputArray = (1 to 10).map(_.toByte).toArray
    bytesWritable.set(inputArray, 0, 10)
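Following up on the review thread about withSystemProperty: a hypothetical sketch of the shared test helper the reviewers discuss extracting (tracked in SPARK-4442). The trait name and package are assumptions, not part of this patch; the body mirrors the helper in the suite above.

```scala
package org.apache.spark.util  // assumed location for a shared test utility

/** Hypothetical shared helper (see SPARK-4442); mirrors withSystemProperty from SparkContextSuite. */
trait SystemPropertyTestHelper {
  def withSystemProperty[T](property: String, value: String)(block: => T): T = {
    val originalValue = System.getProperty(property)
    try {
      System.setProperty(property, value)
      block
    } finally {
      // Restore the previous value (or clear the property if it was unset), even if block throws.
      if (originalValue == null) {
        System.clearProperty(property)
      } else {
        System.setProperty(property, originalValue)
      }
    }
  }
}
```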
pom.xml
@@ -978,6 +978,7 @@

              <spark.testing>1</spark.testing>
              <spark.ui.enabled>false</spark.ui.enabled>
              <spark.executor.extraClassPath>${test_classpath}</spark.executor.extraClassPath>
              <spark.driver.allowMultipleContexts>true</spark.driver.allowMultipleContexts>
[Inline review thread on this line]
- Is it a TODO to set this to false in the tests? I imagine we want to force our tests to [...]
- Yeah, I think that we'd eventually want to have this be [...] By the way, if you look in [...]
- I've created https://issues.apache.org/jira/browse/SPARK-4424 so that we remember to finish the test cleanup / refactoring.
            </systemProperties>
          </configuration>
          <executions>
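As the thread above notes, this pom entry turns the check off JVM-wide for the Maven test runner. Because the constructor reads the flag from its SparkConf (config.getBoolean("spark.driver.allowMultipleContexts", false)), an individual test could also opt in per-context rather than relying on the system property. A hedged sketch, with placeholder names, not part of the patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PerConfOptIn {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("multi-context")  // placeholder app name
      .setMaster("local")
      .set("spark.driver.allowMultipleContexts", "true")
    val first = new SparkContext(conf)
    // Because the second context's conf allows multiple contexts, the constructor logs
    // "Multiple running SparkContexts detected in the same JVM!" instead of throwing.
    val second = new SparkContext(conf)
    second.stop()
    first.stop()
  }
}
```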
[Review thread on the "NOTE: this must be placed at the beginning/end of the SparkContext constructor" comments]
- can you add something stronger in the docs here in case someone tries to move this?
- and similar for the corresponding call at the end?
- That's a good idea.