Skip to content

Commit

Permalink
[SPARK-2645] [CORE] Allow SparkEnv.stop() to be called multiple times…
Browse files Browse the repository at this point in the history
… without side effects.

Fix for SparkContext stop behavior - Allow sc.stop() to be called multiple times without side effects.

Author: Joshi <rekhajoshm@gmail.com>
Author: Rekha Joshi <rekhajoshm@gmail.com>

Closes apache#6973 from rekhajoshm/SPARK-2645 and squashes the following commits:

277043e [Joshi] Fix for SparkContext stop behavior
446b0a4 [Joshi] Fix for SparkContext stop behavior
2ce5760 [Joshi] Fix for SparkContext stop behavior
c97839a [Joshi] Fix for SparkContext stop behavior
1aff39c [Joshi] Fix for SparkContext stop behavior
12f66b5 [Joshi] Fix for SparkContext stop behavior
72bb484 [Joshi] Fix for SparkContext stop behavior
a5a7d7f [Joshi] Fix for SparkContext stop behavior
9193a0c [Joshi] Fix for SparkContext stop behavior
58dba70 [Joshi] SPARK-2645: Fix for SparkContext stop behavior
380c5b0 [Joshi] SPARK-2645: Fix for SparkContext stop behavior
b566b66 [Joshi] SPARK-2645: Fix for SparkContext stop behavior
0be142d [Rekha Joshi] Merge pull request #3 from apache/master
106fd8e [Rekha Joshi] Merge pull request #2 from apache/master
e3677c9 [Rekha Joshi] Merge pull request #1 from apache/master
  • Loading branch information
rekhajoshm authored and Andrew Or committed Jun 30, 2015
1 parent 79f0b37 commit 7dda084
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 32 deletions.
66 changes: 34 additions & 32 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import java.net.Socket

import akka.actor.ActorSystem

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.util.Properties

Expand Down Expand Up @@ -90,39 +89,42 @@ class SparkEnv (
private var driverTmpDirToDelete: Option[String] = None

private[spark] def stop() {
isStopped = true
pythonWorkers.foreach { case(key, worker) => worker.stop() }
Option(httpFileServer).foreach(_.stop())
mapOutputTracker.stop()
shuffleManager.stop()
broadcastManager.stop()
blockManager.stop()
blockManager.master.stop()
metricsSystem.stop()
outputCommitCoordinator.stop()
rpcEnv.shutdown()

// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
// UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
// actorSystem.awaitTermination()

// Note that blockTransferService is stopped by BlockManager since it is started by it.

// If we only stop sc, but the driver process still run as a services then we need to delete
// the tmp dir, if not, it will create too many tmp dirs.
// We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
// current working dir in executor which we do not need to delete.
driverTmpDirToDelete match {
case Some(path) => {
try {
Utils.deleteRecursively(new File(path))
} catch {
case e: Exception =>
logWarning(s"Exception while deleting Spark temp dir: $path", e)

if (!isStopped) {
isStopped = true
pythonWorkers.values.foreach(_.stop())
Option(httpFileServer).foreach(_.stop())
mapOutputTracker.stop()
shuffleManager.stop()
broadcastManager.stop()
blockManager.stop()
blockManager.master.stop()
metricsSystem.stop()
outputCommitCoordinator.stop()
rpcEnv.shutdown()

// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
// UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
// actorSystem.awaitTermination()

// Note that blockTransferService is stopped by BlockManager since it is started by it.

// If we only stop sc, but the driver process still run as a services then we need to delete
// the tmp dir, if not, it will create too many tmp dirs.
// We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
// current working dir in executor which we do not need to delete.
driverTmpDirToDelete match {
case Some(path) => {
try {
Utils.deleteRecursively(new File(path))
} catch {
case e: Exception =>
logWarning(s"Exception while deleting Spark temp dir: $path", e)
}
}
case None => // We just need to delete tmp dir created by driver, so do nothing on executor
}
case None => // We just need to delete tmp dir created by driver, so do nothing on executor
}
}

Expand Down
13 changes: 13 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.util.Utils

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import org.scalatest.Matchers._

class SparkContextSuite extends SparkFunSuite with LocalSparkContext {

Expand Down Expand Up @@ -272,4 +273,16 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
sc.stop()
}
}

test("calling multiple sc.stop() must not throw any exception") {
noException should be thrownBy {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
val cnt = sc.parallelize(1 to 4).count()
sc.cancelAllJobs()
sc.stop()
// call stop second time
sc.stop()
}
}

}

0 comments on commit 7dda084

Please sign in to comment.