From 9ccae0c9e7d1a0a704e8cd7574ba508419e05e30 Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Sat, 15 Dec 2018 13:55:24 +0800
Subject: [PATCH] [SPARK-26362][CORE] Remove 'spark.driver.allowMultipleContexts' to disallow multiple creation of SparkContexts

## What changes were proposed in this pull request?

Multiple SparkContexts are discouraged, and Spark has been warning about them for the last 4 years (see SPARK-4180). They can cause arbitrary and mysterious error cases (see SPARK-2243). Honestly, I didn't even know Spark still allowed this; it appears never to have been officially supported (see SPARK-2243). I believe now is a good time to remove this configuration.

## How was this patch tested?

Each doc was manually checked, and the change was manually tested:

```
$ ./bin/spark-shell --conf=spark.driver.allowMultipleContexts=true
...
scala> new SparkContext()
org.apache.spark.SparkException: Only one SparkContext should be running in this JVM (see SPARK-2243).The currently running SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:939)
...
org.apache.spark.SparkContext$.$anonfun$assertNoOtherContextIsRunning$2(SparkContext.scala:2435)
  at scala.Option.foreach(Option.scala:274)
  at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2432)
  at org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2509)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:80)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:112)
  ... 49 elided
```

Closes #23311 from HyukjinKwon/SPARK-26362.

Authored-by: Hyukjin Kwon
Signed-off-by: Hyukjin Kwon
---
 .../scala/org/apache/spark/SparkContext.scala | 65 +++++++------------
 .../spark/api/java/JavaSparkContext.scala | 4 +-
 .../org/apache/spark/SparkContextSuite.scala | 19 +-----
 .../ExternalClusterManagerSuite.scala | 3 +-
 docs/rdd-programming-guide.md | 2 +-
 project/MimaExcludes.scala | 4 ++
 python/pyspark/context.py | 3 +
 .../execution/ExchangeCoordinatorSuite.scala | 1 -
 8 files changed, 34 insertions(+), 67 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 696dafda6d1ec..09cc346db0ed2 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -64,9 +64,8 @@ import org.apache.spark.util.logging.DriverLogger * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. * - * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before - * creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details. - * + * @note Only one `SparkContext` should be active per JVM. You must `stop()` the + * active `SparkContext` before creating a new one. * @param config a Spark Config object describing the application configuration. Any settings in * this config overrides the default configs as well as system properties. */ @@ -75,14 +74,10 @@ class SparkContext(config: SparkConf) extends Logging { // The call site where this SparkContext was constructed.
private val creationSite: CallSite = Utils.getCallSite() - // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active - private val allowMultipleContexts: Boolean = - config.getBoolean("spark.driver.allowMultipleContexts", false) - // In order to prevent multiple SparkContexts from being active at the same time, mark this // context as having started construction. // NOTE: this must be placed at the beginning of the SparkContext constructor. - SparkContext.markPartiallyConstructed(this, allowMultipleContexts) + SparkContext.markPartiallyConstructed(this) val startTime = System.currentTimeMillis() @@ -2392,7 +2387,7 @@ class SparkContext(config: SparkConf) extends Logging { // In order to prevent multiple SparkContexts from being active at the same time, mark this // context as having finished construction. // NOTE: this must be placed at the end of the SparkContext constructor. - SparkContext.setActiveContext(this, allowMultipleContexts) + SparkContext.setActiveContext(this) } /** @@ -2409,18 +2404,18 @@ object SparkContext extends Logging { private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object() /** - * The active, fully-constructed SparkContext. If no SparkContext is active, then this is `null`. + * The active, fully-constructed SparkContext. If no SparkContext is active, then this is `null`. * - * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK. + * Access to this field is guarded by `SPARK_CONTEXT_CONSTRUCTOR_LOCK`. */ private val activeContext: AtomicReference[SparkContext] = new AtomicReference[SparkContext](null) /** - * Points to a partially-constructed SparkContext if some thread is in the SparkContext + * Points to a partially-constructed SparkContext if another thread is in the SparkContext * constructor, or `None` if no SparkContext is being constructed. * - * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK + * Access to this field is guarded by `SPARK_CONTEXT_CONSTRUCTOR_LOCK`. */ private var contextBeingConstructed: Option[SparkContext] = None @@ -2428,24 +2423,16 @@ object SparkContext extends Logging { * Called to ensure that no other SparkContext is running in this JVM. * * Throws an exception if a running context is detected and logs a warning if another thread is - * constructing a SparkContext. This warning is necessary because the current locking scheme + * constructing a SparkContext. This warning is necessary because the current locking scheme * prevents us from reliably distinguishing between cases where another context is being * constructed and cases where another constructor threw an exception. */ - private def assertNoOtherContextIsRunning( - sc: SparkContext, - allowMultipleContexts: Boolean): Unit = { + private def assertNoOtherContextIsRunning(sc: SparkContext): Unit = { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { Option(activeContext.get()).filter(_ ne sc).foreach { ctx => - val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." + - " To ignore this error, set spark.driver.allowMultipleContexts = true. " + + val errMsg = "Only one SparkContext should be running in this JVM (see SPARK-2243)." 
+ s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}" - val exception = new SparkException(errMsg) - if (allowMultipleContexts) { - logWarning("Multiple running SparkContexts detected in the same JVM!", exception) - } else { - throw exception - } + throw new SparkException(errMsg) } contextBeingConstructed.filter(_ ne sc).foreach { otherContext => @@ -2454,7 +2441,7 @@ object SparkContext extends Logging { val otherContextCreationSite = Option(otherContext.creationSite).map(_.longForm).getOrElse("unknown location") val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" + - " constructor). This may indicate an error, since only one SparkContext may be" + + " constructor). This may indicate an error, since only one SparkContext should be" + " running in this JVM (see SPARK-2243)." + s" The other SparkContext was created at:\n$otherContextCreationSite" logWarning(warnMsg) @@ -2467,8 +2454,6 @@ object SparkContext extends Logging { * singleton object. Because we can only have one active SparkContext per JVM, * this is useful when applications may wish to share a SparkContext. * - * @note This function cannot be used to create multiple SparkContext instances - * even if multiple contexts are allowed. * @param config `SparkConfig` that will be used for initialisation of the `SparkContext` * @return current `SparkContext` (or a new one if it wasn't created before the function call) */ @@ -2477,7 +2462,7 @@ object SparkContext extends Logging { // from assertNoOtherContextIsRunning within setActiveContext SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { if (activeContext.get() == null) { - setActiveContext(new SparkContext(config), allowMultipleContexts = false) + setActiveContext(new SparkContext(config)) } else { if (config.getAll.nonEmpty) { logWarning("Using an existing SparkContext; some configuration may not take effect.") @@ -2494,14 +2479,12 @@ object SparkContext extends Logging { * * This method allows not passing a SparkConf (useful if just retrieving). * - * @note This function cannot be used to create multiple SparkContext instances - * even if multiple contexts are allowed. * @return current `SparkContext` (or a new one if wasn't created before the function call) */ def getOrCreate(): SparkContext = { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { if (activeContext.get() == null) { - setActiveContext(new SparkContext(), allowMultipleContexts = false) + setActiveContext(new SparkContext()) } activeContext.get() } @@ -2516,16 +2499,14 @@ object SparkContext extends Logging { /** * Called at the beginning of the SparkContext constructor to ensure that no SparkContext is - * running. Throws an exception if a running context is detected and logs a warning if another - * thread is constructing a SparkContext. This warning is necessary because the current locking + * running. Throws an exception if a running context is detected and logs a warning if another + * thread is constructing a SparkContext. This warning is necessary because the current locking * scheme prevents us from reliably distinguishing between cases where another context is being * constructed and cases where another constructor threw an exception. 
*/ - private[spark] def markPartiallyConstructed( - sc: SparkContext, - allowMultipleContexts: Boolean): Unit = { + private[spark] def markPartiallyConstructed(sc: SparkContext): Unit = { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { - assertNoOtherContextIsRunning(sc, allowMultipleContexts) + assertNoOtherContextIsRunning(sc) contextBeingConstructed = Some(sc) } } @@ -2534,18 +2515,16 @@ object SparkContext extends Logging { * Called at the end of the SparkContext constructor to ensure that no other SparkContext has * raced with this constructor and started. */ - private[spark] def setActiveContext( - sc: SparkContext, - allowMultipleContexts: Boolean): Unit = { + private[spark] def setActiveContext(sc: SparkContext): Unit = { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { - assertNoOtherContextIsRunning(sc, allowMultipleContexts) + assertNoOtherContextIsRunning(sc) contextBeingConstructed = None activeContext.set(sc) } } /** - * Clears the active SparkContext metadata. This is called by `SparkContext#stop()`. It's + * Clears the active SparkContext metadata. This is called by `SparkContext#stop()`. It's * also called in unit tests to prevent a flood of warnings from test suites that don't / can't * properly clean up their SparkContexts. */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 03f259d73e975..2f74d09b3a2bc 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -40,8 +40,8 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD} * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns * [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones. * - * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before - * creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details. + * @note Only one `SparkContext` should be active per JVM. You must `stop()` the + * active `SparkContext` before creating a new one. 
*/ class JavaSparkContext(val sc: SparkContext) extends Closeable { diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index ec4c7efb5835a..66de2f2ac86a4 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -44,7 +44,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu test("Only one SparkContext may be active at a time") { // Regression test for SPARK-4180 val conf = new SparkConf().setAppName("test").setMaster("local") - .set("spark.driver.allowMultipleContexts", "false") sc = new SparkContext(conf) val envBefore = SparkEnv.get // A SparkContext is already running, so we shouldn't be able to create a second one @@ -58,7 +57,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu } test("Can still construct a new SparkContext after failing to construct a previous one") { - val conf = new SparkConf().set("spark.driver.allowMultipleContexts", "false") + val conf = new SparkConf() // This is an invalid configuration (no app name or master URL) intercept[SparkException] { new SparkContext(conf) @@ -67,18 +66,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu sc = new SparkContext(conf.setMaster("local").setAppName("test")) } - test("Check for multiple SparkContexts can be disabled via undocumented debug option") { - var secondSparkContext: SparkContext = null - try { - val conf = new SparkConf().setAppName("test").setMaster("local") - .set("spark.driver.allowMultipleContexts", "true") - sc = new SparkContext(conf) - secondSparkContext = new SparkContext(conf) - } finally { - Option(secondSparkContext).foreach(_.stop()) - } - } - test("Test getOrCreate") { var sc2: SparkContext = null SparkContext.clearActiveContext() @@ -92,10 +79,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu assert(sc === sc2) assert(sc eq sc2) - // Try creating second context to confirm that it's still possible, if desired - sc2 = new SparkContext(new SparkConf().setAppName("test3").setMaster("local") - .set("spark.driver.allowMultipleContexts", "true")) - sc2.stop() } diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 0621c98d41184..30d0966691a3c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -25,8 +25,7 @@ import org.apache.spark.util.AccumulatorV2 class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext { test("launch of backend and scheduler") { - val conf = new SparkConf().setMaster("myclusterManager"). - setAppName("testcm").set("spark.driver.allowMultipleContexts", "true") + val conf = new SparkConf().setMaster("myclusterManager").setAppName("testcm") sc = new SparkContext(conf) // check if the scheduler components are created and initialized sc.schedulerBackend match { diff --git a/docs/rdd-programming-guide.md b/docs/rdd-programming-guide.md index 2d1ddae5780de..308a8ea653909 100644 --- a/docs/rdd-programming-guide.md +++ b/docs/rdd-programming-guide.md @@ -138,7 +138,7 @@ The first thing a Spark program must do is to create a [SparkContext](api/scala/ how to access a cluster. 
To create a `SparkContext` you first need to build a [SparkConf](api/scala/index.html#org.apache.spark.SparkConf) object that contains information about your application. -Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before creating a new one. +Only one SparkContext should be active per JVM. You must `stop()` the active SparkContext before creating a new one. {% highlight scala %} val conf = new SparkConf().setAppName(appName).setMaster(master) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 883913332ca1e..7bb70a29195d6 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -220,6 +220,10 @@ object MimaExcludes { // [SPARK-26139] Implement shuffle write metrics in SQL ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ShuffleDependency.this"), + // [SPARK-26362][CORE] Remove 'spark.driver.allowMultipleContexts' to disallow multiple creation of SparkContexts + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkContext.setActiveContext"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkContext.markPartiallyConstructed"), + // Data Source V2 API changes (problem: Problem) => problem match { case MissingClassProblem(cls) => diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 1180bf91baa5a..6137ed25a0dd9 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -63,6 +63,9 @@ class SparkContext(object): Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create L{RDD} and broadcast variables on that cluster. + + .. note:: Only one :class:`SparkContext` should be active per JVM. You must `stop()` + the active :class:`SparkContext` before creating a new one. """ _gateway = None diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala index 6ad025f37e440..4a439940beb74 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala @@ -263,7 +263,6 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { .setMaster("local[*]") .setAppName("test") .set("spark.ui.enabled", "false") - .set("spark.driver.allowMultipleContexts", "true") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true") .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
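
For readers of this change, the snippet below is a minimal, illustrative sketch (not part of the patch) of what the removal means in practice: constructing a second `SparkContext` in the same JVM now always throws a `SparkException`, so applications should either reuse the active context via `SparkContext.getOrCreate()` or `stop()` the active one before creating a new context. The object name and app name are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

object SingleContextExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("single-context-demo").setMaster("local[*]")

    // Reuse the active SparkContext if one exists; otherwise create it.
    val sc = SparkContext.getOrCreate(conf)

    // After SPARK-26362, a second constructor call in the same JVM always fails,
    // regardless of any configuration.
    try {
      new SparkContext(conf)
    } catch {
      case e: SparkException =>
        println(s"Second SparkContext rejected as expected: ${e.getMessage}")
    }

    // Stop the active context before creating a new one.
    sc.stop()
    val sc2 = new SparkContext(conf)
    sc2.stop()
  }
}
```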