From 1867867a00241ff1bd20d2ac3ac610ed126a9280 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Fri, 9 May 2014 13:28:26 -0700 Subject: [PATCH 1/3] [RFC] SPARK-1772 Stop catching Throwable, let Executors die The main issue this patch fixes is [SPARK-1772](https://issues.apache.org/jira/browse/SPARK-1772), in which Executors may not die when fatal exceptions (e.g., OOM) are thrown. This patch causes Executors to delegate to the ExecutorUncaughtExceptionHandler when a fatal exception is thrown. This patch also continues the fight in the neverending war against `case t: Throwable =>`, by only catching Exceptions in many places, and adding a wrapper for Threads and Runnables to make sure any uncaught exceptions are at least printed to the logs. It also turns out that it is unlikely that the IndestructibleActorSystem actually works, given testing ([here](https://gist.github.com/aarondav/ca1f0cdcd50727f89c0d)). The uncaughtExceptionHandler is not called from the places that we expected it would be. [SPARK-1620](https://issues.apache.org/jira/browse/SPARK-1620) deals with part of this issue, but refactoring our Actor Systems to ensure that exceptions are dealt with properly is a much bigger change, outside the scope of this PR. --- .../org/apache/spark/ContextCleaner.scala | 11 +-- .../scala/org/apache/spark/SparkContext.scala | 12 ++-- .../apache/spark/api/python/PythonRDD.scala | 4 +- .../api/python/PythonWorkerFactory.scala | 2 +- .../org/apache/spark/deploy/Client.scala | 2 +- .../apache/spark/deploy/SparkHadoopUtil.scala | 2 +- .../spark/deploy/history/HistoryServer.scala | 8 +-- .../apache/spark/deploy/master/Master.scala | 4 +- .../spark/deploy/worker/DriverWrapper.scala | 2 +- .../CoarseGrainedExecutorBackend.scala | 2 +- .../org/apache/spark/executor/Executor.scala | 58 +++++++--------- .../ExecutorUncaughtExceptionHandler.scala | 53 +++++++++++++++ .../scheduler/EventLoggingListener.scala | 4 +- .../spark/scheduler/TaskResultGetter.scala | 8 +-- .../spark/storage/DiskBlockManager.scala | 6 +- .../spark/storage/TachyonBlockManager.scala | 7 +- .../org/apache/spark/util/AkkaUtils.scala | 11 +-- .../util/IndestructibleActorSystem.scala | 68 ------------------- .../scala/org/apache/spark/util/Utils.scala | 26 ++++++- 19 files changed, 142 insertions(+), 148 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala delete mode 100644 core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 54e08d7866f75..e2d2250982daa 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -23,6 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils /** * Classes that represent cleaning tasks. @@ -110,7 +111,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { } /** Keep cleaning RDD, shuffle, and broadcast state. 
*/ - private def keepCleaning() { + private def keepCleaning(): Unit = Utils.logUncaughtExceptions { while (!stopped) { try { val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT)) @@ -128,7 +129,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { } } } catch { - case t: Throwable => logError("Error in cleaning thread", t) + case e: Exception => logError("Error in cleaning thread", e) } } } @@ -141,7 +142,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { listeners.foreach(_.rddCleaned(rddId)) logInfo("Cleaned RDD " + rddId) } catch { - case t: Throwable => logError("Error cleaning RDD " + rddId, t) + case e: Exception => logError("Error cleaning RDD " + rddId, e) } } @@ -154,7 +155,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { listeners.foreach(_.shuffleCleaned(shuffleId)) logInfo("Cleaned shuffle " + shuffleId) } catch { - case t: Throwable => logError("Error cleaning shuffle " + shuffleId, t) + case e: Exception => logError("Error cleaning shuffle " + shuffleId, e) } } @@ -166,7 +167,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { listeners.foreach(_.broadcastCleaned(broadcastId)) logInfo("Cleaned broadcast " + broadcastId) } catch { - case t: Throwable => logError("Error cleaning broadcast " + broadcastId, t) + case e: Exception => logError("Error cleaning broadcast " + broadcastId, e) } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 9d7c2c8d3d630..2e325e0e29ae6 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1494,8 +1494,8 @@ object SparkContext extends Logging { } catch { // TODO: Enumerate the exact reasons why it can fail // But irrespective of it, it means we cannot proceed ! 
- case th: Throwable => { - throw new SparkException("YARN mode not available ?", th) + case e: Exception => { + throw new SparkException("YARN mode not available ?", e) } } val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) @@ -1510,8 +1510,8 @@ object SparkContext extends Logging { cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl] } catch { - case th: Throwable => { - throw new SparkException("YARN mode not available ?", th) + case e: Exception => { + throw new SparkException("YARN mode not available ?", e) } } @@ -1521,8 +1521,8 @@ object SparkContext extends Logging { val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext]) cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend] } catch { - case th: Throwable => { - throw new SparkException("YARN mode not available ?", th) + case e: Exception => { + throw new SparkException("YARN mode not available ?", e) } } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index fecd9762f3f60..b6d2e3844c20a 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -170,7 +170,7 @@ private[spark] class PythonRDD[T: ClassTag]( this.interrupt() } - override def run() { + override def run(): Unit = Utils.logUncaughtExceptions { try { SparkEnv.set(env) val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize) @@ -281,7 +281,7 @@ private[spark] object PythonRDD { } } catch { case eof: EOFException => {} - case e: Throwable => throw e + case e: Exception => throw e } JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism)) } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 002f2acd94dee..dbc48f6c8996f 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -71,7 +71,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String stopDaemon() startDaemon() new Socket(daemonHost, daemonPort) - case e: Throwable => throw e + case e: Exception => throw e } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 7ead1171525d2..aeb159adc31d9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -157,7 +157,7 @@ object Client { // TODO: See if we can initialize akka so return messages are sent back using the same TCP // flow. Else, this (sadly) requires the DriverClient be routable from the Master. 
val (actorSystem, _) = AkkaUtils.createActorSystem( - "driverClient", Utils.localHostName(), 0, false, conf, new SecurityManager(conf)) + "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf)) actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf)) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index e2df1b8954124..148115d3ed351 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -103,7 +103,7 @@ object SparkHadoopUtil { .newInstance() .asInstanceOf[SparkHadoopUtil] } catch { - case th: Throwable => throw new SparkException("Unable to load YARN support", th) + case e: Exception => throw new SparkException("Unable to load YARN support", e) } } else { new SparkHadoopUtil diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 1238bbf9da2fd..a9c11dca5678e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -70,7 +70,7 @@ class HistoryServer( * TODO: Add a mechanism to update manually. */ private val logCheckingThread = new Thread { - override def run() { + override def run(): Unit = Utils.logUncaughtExceptions { while (!stopped) { val now = System.currentTimeMillis if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) { @@ -154,7 +154,7 @@ class HistoryServer( numCompletedApplications = logInfos.size } catch { - case t: Throwable => logError("Exception in checking for event log updates", t) + case e: Exception => logError("Exception in checking for event log updates", e) } } else { logWarning("Attempted to check for event log updates before binding the server.") @@ -231,8 +231,8 @@ class HistoryServer( dir.getModificationTime } } catch { - case t: Throwable => - logError("Exception in accessing modification time of %s".format(dir.getPath), t) + case e: Exception => + logError("Exception in accessing modification time of %s".format(dir.getPath), e) -1L } } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index fdb633bd33608..45cc2c447327b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -675,8 +675,8 @@ private[spark] class Master( webUi.attachSparkUI(ui) return true } catch { - case t: Throwable => - logError("Exception in replaying log for application %s (%s)".format(appName, app.id), t) + case e: Exception => + logError("Exception in replaying log for application %s (%s)".format(appName, app.id), e) } } else { logWarning("Application %s (%s) has no valid logs: %s".format(appName, app.id, eventLogDir)) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala index be15138f62406..05e242e6df702 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -31,7 +31,7 @@ object DriverWrapper { case workerUrl :: mainClass :: extraArgs => val conf = new SparkConf() val (actorSystem, _) = AkkaUtils.createActorSystem("Driver", - Utils.localHostName(), 0, false, conf, 
new SecurityManager(conf)) + Utils.localHostName(), 0, conf, new SecurityManager(conf)) actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher") // Delegate to supplied main class diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index e912ae8a5d3c5..84aec65b7765d 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -105,7 +105,7 @@ private[spark] object CoarseGrainedExecutorBackend { // Create a new ActorSystem to run the backend, because we can't create a // SparkEnv / Executor before getting started with all our system properties, etc val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0, - indestructible = true, conf = conf, new SecurityManager(conf)) + conf, new SecurityManager(conf)) // set it val sparkHostPort = hostname + ":" + boundPort actorSystem.actorOf( diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 98e7e0be813be..05e23fe110d87 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -74,28 +74,7 @@ private[spark] class Executor( // Setup an uncaught exception handler for non-local mode. // Make any thread terminations due to uncaught exceptions kill the entire // executor process to avoid surprising stalls. - Thread.setDefaultUncaughtExceptionHandler( - new Thread.UncaughtExceptionHandler { - override def uncaughtException(thread: Thread, exception: Throwable) { - try { - logError("Uncaught exception in thread " + thread, exception) - - // We may have been called from a shutdown hook. If so, we must not call System.exit(). - // (If we do, we will deadlock.) - if (!Utils.inShutdown()) { - if (exception.isInstanceOf[OutOfMemoryError]) { - System.exit(ExecutorExitCode.OOM) - } else { - System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION) - } - } - } catch { - case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM) - case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE) - } - } - } - ) + Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler) } val executorSource = new ExecutorSource(this, executorId) @@ -259,19 +238,30 @@ private[spark] class Executor( } case t: Throwable => { - val serviceTime = System.currentTimeMillis() - taskStart - val metrics = attemptedTask.flatMap(t => t.metrics) - for (m <- metrics) { - m.executorRunTime = serviceTime - m.jvmGCTime = gcTime - startGCTime - } - val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics) - execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) + // Attempt to exit cleanly by informing the driver of our failure. + // If anything goes wrong (or this was a fatal exception), we will delegate to + // the default uncaught exception handler, which will terminate the Executor. 
+ try { + logError("Exception in task ID " + taskId, t) + + val serviceTime = System.currentTimeMillis() - taskStart + val metrics = attemptedTask.flatMap(t => t.metrics) + for (m <- metrics) { + m.executorRunTime = serviceTime + m.jvmGCTime = gcTime - startGCTime + } + val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics) + execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) - // TODO: Should we exit the whole executor here? On the one hand, the failed task may - // have left some weird state around depending on when the exception was thrown, but on - // the other hand, maybe we could detect that when future tasks fail and exit then. - logError("Exception in task ID " + taskId, t) + // Don't forcibly exit unless the exception was inherently fatal, to avoid + // stopping other tasks unnecessarily. + if (Utils.isFatalError(t)) { + ExecutorUncaughtExceptionHandler.uncaughtException(t) + } + } catch { + case t2: Throwable => + ExecutorUncaughtExceptionHandler.uncaughtException(t2) + } } } finally { // TODO: Unregister shuffle memory only for ResultTask diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala new file mode 100644 index 0000000000000..b0e984c03964c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import org.apache.spark.Logging +import org.apache.spark.util.Utils + +/** + * The default uncaught exception handler for Executors terminates the whole process, to avoid + * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better + * to fail fast when things go wrong. + */ +private[spark] object ExecutorUncaughtExceptionHandler + extends Thread.UncaughtExceptionHandler with Logging { + + override def uncaughtException(thread: Thread, exception: Throwable) { + try { + logError("Uncaught exception in thread " + thread, exception) + + // We may have been called from a shutdown hook. If so, we must not call System.exit(). + // (If we do, we will deadlock.) 
+ if (!Utils.inShutdown()) { + if (exception.isInstanceOf[OutOfMemoryError]) { + System.exit(ExecutorExitCode.OOM) + } else { + System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION) + } + } + } catch { + case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM) + case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE) + } + } + + def uncaughtException(exception: Throwable) { + uncaughtException(Thread.currentThread(), exception) + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 7968a0691db10..a90b0d475c04e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -206,8 +206,8 @@ private[spark] object EventLoggingListener extends Logging { applicationComplete = filePaths.exists { path => isApplicationCompleteFile(path.getName) } ) } catch { - case t: Throwable => - logError("Exception in parsing logging info from directory %s".format(logDir), t) + case e: Exception => + logError("Exception in parsing logging info from directory %s".format(logDir), e) EventLoggingInfo.empty } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index c9ad2b151daf0..99d305b36a959 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -43,7 +43,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul def enqueueSuccessfulTask( taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) { getTaskResultExecutor.execute(new Runnable { - override def run() { + override def run(): Unit = Utils.logUncaughtExceptions { try { val result = serializer.get().deserialize[TaskResult[_]](serializedData) match { case directResult: DirectTaskResult[_] => directResult @@ -70,7 +70,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul case cnf: ClassNotFoundException => val loader = Thread.currentThread.getContextClassLoader taskSetManager.abort("ClassNotFound with classloader: " + loader) - case ex: Throwable => + case ex: Exception => taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex)) } } @@ -81,7 +81,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul serializedData: ByteBuffer) { var reason : TaskEndReason = UnknownReason getTaskResultExecutor.execute(new Runnable { - override def run() { + override def run(): Unit = Utils.logUncaughtExceptions { try { if (serializedData != null && serializedData.limit() > 0) { reason = serializer.get().deserialize[TaskEndReason]( @@ -94,7 +94,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul val loader = Utils.getContextOrSparkClassLoader logError( "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader) - case ex: Throwable => {} + case ex: Exception => {} } scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index cf6ef0029a861..3a7243a1ba19c 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ 
b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -148,7 +148,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD private def addShutdownHook() { localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir)) Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") { - override def run() { + override def run(): Unit = Utils.logUncaughtExceptions { logDebug("Shutdown hook called") DiskBlockManager.this.stop() } @@ -162,8 +162,8 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD try { if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir) } catch { - case t: Throwable => - logError("Exception while deleting local spark dir: " + localDir, t) + case e: Exception => + logError("Exception while deleting local spark dir: " + localDir, e) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala index b0b9674856568..a6cbe3aa440ff 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala @@ -25,7 +25,6 @@ import tachyon.client.TachyonFile import org.apache.spark.Logging import org.apache.spark.executor.ExecutorExitCode -import org.apache.spark.network.netty.ShuffleSender import org.apache.spark.util.Utils @@ -137,7 +136,7 @@ private[spark] class TachyonBlockManager( private def addShutdownHook() { tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir)) Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") { - override def run() { + override def run(): Unit = Utils.logUncaughtExceptions { logDebug("Shutdown hook called") tachyonDirs.foreach { tachyonDir => try { @@ -145,8 +144,8 @@ private[spark] class TachyonBlockManager( Utils.deleteRecursively(tachyonDir, client) } } catch { - case t: Throwable => - logError("Exception while deleting tachyon spark dir: " + tachyonDir, t) + case e: Exception => + logError("Exception while deleting tachyon spark dir: " + tachyonDir, e) } } } diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 8afe09a117ebc..a8d12bb2a0165 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -20,7 +20,7 @@ package org.apache.spark.util import scala.collection.JavaConversions.mapAsJavaMap import scala.concurrent.duration.{Duration, FiniteDuration} -import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem} +import akka.actor.{ActorSystem, ExtendedActorSystem} import com.typesafe.config.ConfigFactory import org.apache.log4j.{Level, Logger} @@ -41,7 +41,7 @@ private[spark] object AkkaUtils extends Logging { * If indestructible is set to true, the Actor System will continue running in the event * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]]. 
*/ - def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false, + def createActorSystem(name: String, host: String, port: Int, conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = { val akkaThreads = conf.getInt("spark.akka.threads", 4) @@ -101,12 +101,7 @@ private[spark] object AkkaUtils extends Logging { |akka.log-dead-letters-during-shutdown = $lifecycleEvents """.stripMargin)) - val actorSystem = if (indestructible) { - IndestructibleActorSystem(name, akkaConf) - } else { - ActorSystem(name, akkaConf) - } - + val actorSystem = ActorSystem(name, akkaConf) val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider val boundPort = provider.getDefaultAddress.port.get (actorSystem, boundPort) diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala deleted file mode 100644 index 4188a869c13da..0000000000000 --- a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Must be in akka.actor package as ActorSystemImpl is protected[akka]. -package akka.actor - -import scala.util.control.{ControlThrowable, NonFatal} - -import com.typesafe.config.Config - -/** - * An akka.actor.ActorSystem which refuses to shut down in the event of a fatal exception - * This is necessary as Spark Executors are allowed to recover from fatal exceptions - * (see org.apache.spark.executor.Executor) - */ -object IndestructibleActorSystem { - def apply(name: String, config: Config): ActorSystem = - apply(name, config, ActorSystem.findClassLoader()) - - def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem = - new IndestructibleActorSystemImpl(name, config, classLoader).start() -} - -private[akka] class IndestructibleActorSystemImpl( - override val name: String, - applicationConfig: Config, - classLoader: ClassLoader) - extends ActorSystemImpl(name, applicationConfig, classLoader) { - - protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = { - val fallbackHandler = super.uncaughtExceptionHandler - - new Thread.UncaughtExceptionHandler() { - def uncaughtException(thread: Thread, cause: Throwable): Unit = { - if (isFatalError(cause) && !settings.JvmExitOnFatalError) { - log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " + - "ActorSystem [{}] tolerating and continuing.... 
", thread.getName, name) - // shutdown() //TODO make it configurable - } else { - fallbackHandler.uncaughtException(thread, cause) - } - } - } - } - - def isFatalError(e: Throwable): Boolean = { - e match { - case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable => - false - case _ => - true - } - } -} diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 3f0ed61c5bbfb..9cc06e6a4a3f8 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -29,6 +29,7 @@ import scala.collection.mutable.ArrayBuffer import scala.io.Source import scala.reflect.ClassTag import scala.util.Try +import scala.util.control.{ControlThrowable, NonFatal} import com.google.common.io.Files import com.google.common.util.concurrent.ThreadFactoryBuilder @@ -41,7 +42,6 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} - /** * Various utility methods used by Spark. */ @@ -1125,4 +1125,28 @@ private[spark] object Utils extends Logging { } } + /** + * Executes the given block, printing and re-throwing any uncaught exceptions. + * This is particularly useful for wrapping code that runs in a thread, to ensure + * that exceptions are printed, and to avoid having to catch Throwable. + */ + def logUncaughtExceptions[T](f: => T): T = { + try { + f + } catch { + case t: Throwable => + logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) + throw t + } + } + + /** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */ + def isFatalError(e: Throwable): Boolean = { + e match { + case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable => + false + case _ => + true + } + } } From e937a0a9f26a06631f0f578add9de02e5f5b4b40 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Mon, 12 May 2014 09:53:53 -0700 Subject: [PATCH 2/3] Address Prashant and Matei's comments --- .../apache/spark/api/python/PythonRDD.scala | 1 - .../org/apache/spark/executor/Executor.scala | 35 ++++++++----------- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index b6d2e3844c20a..8ac27589bbb71 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -281,7 +281,6 @@ private[spark] object PythonRDD { } } catch { case eof: EOFException => {} - case e: Exception => throw e } JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism)) } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 05e23fe110d87..baee7a216a7c3 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -241,26 +241,21 @@ private[spark] class Executor( // Attempt to exit cleanly by informing the driver of our failure. // If anything goes wrong (or this was a fatal exception), we will delegate to // the default uncaught exception handler, which will terminate the Executor. 
- try { - logError("Exception in task ID " + taskId, t) - - val serviceTime = System.currentTimeMillis() - taskStart - val metrics = attemptedTask.flatMap(t => t.metrics) - for (m <- metrics) { - m.executorRunTime = serviceTime - m.jvmGCTime = gcTime - startGCTime - } - val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics) - execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) - - // Don't forcibly exit unless the exception was inherently fatal, to avoid - // stopping other tasks unnecessarily. - if (Utils.isFatalError(t)) { - ExecutorUncaughtExceptionHandler.uncaughtException(t) - } - } catch { - case t2: Throwable => - ExecutorUncaughtExceptionHandler.uncaughtException(t2) + logError("Exception in task ID " + taskId, t) + + val serviceTime = System.currentTimeMillis() - taskStart + val metrics = attemptedTask.flatMap(t => t.metrics) + for (m <- metrics) { + m.executorRunTime = serviceTime + m.jvmGCTime = gcTime - startGCTime + } + val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics) + execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) + + // Don't forcibly exit unless the exception was inherently fatal, to avoid + // stopping other tasks unnecessarily. + if (Utils.isFatalError(t)) { + ExecutorUncaughtExceptionHandler.uncaughtException(t) } } } finally { From f9b9bfe110afbc30c9af8cf2d3e2fa251f28c4e4 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Mon, 12 May 2014 09:56:18 -0700 Subject: [PATCH 3/3] Remove other redundant 'throw e' --- .../scala/org/apache/spark/api/python/PythonWorkerFactory.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index dbc48f6c8996f..759cbe2c46c52 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -71,7 +71,6 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String stopDaemon() startDaemon() new Socket(daemonHost, daemonPort) - case e: Exception => throw e } } }
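
For reference, a minimal sketch (not part of the patch series above) of how the pieces introduced here are meant to be combined by callers: wrap the body of a background thread in Utils.logUncaughtExceptions so anything that escapes run() is at least logged, and delegate to ExecutorUncaughtExceptionHandler only when Utils.isFatalError reports a genuinely fatal error, mirroring the new catch block in Executor.scala. The object name, thread name, and workload are illustrative only, and the snippet assumes it compiles inside the org.apache.spark package since both helpers are private[spark].

package org.apache.spark

import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
import org.apache.spark.util.Utils

// Hypothetical illustration of the intended usage pattern; not part of the patch.
private[spark] object UncaughtExceptionExample {

  def startBackgroundThread(): Thread = {
    val thread = new Thread("example-background-thread") {
      // Anything that escapes run() is logged before the thread dies,
      // rather than disappearing silently.
      override def run(): Unit = Utils.logUncaughtExceptions {
        try {
          Thread.sleep(1000) // stand-in for real work
        } catch {
          case t: Throwable =>
            // Only terminate the whole process for inherently fatal errors
            // (e.g. OutOfMemoryError); otherwise rethrow so that
            // logUncaughtExceptions records the failure.
            if (Utils.isFatalError(t)) {
              ExecutorUncaughtExceptionHandler.uncaughtException(t)
            }
            throw t
        }
      }
    }
    thread.start()
    thread
  }
}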