-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SPARK-1772 Stop catching Throwable, let Executors die #715
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -74,28 +74,7 @@ private[spark] class Executor( | |
// Setup an uncaught exception handler for non-local mode. | ||
// Make any thread terminations due to uncaught exceptions kill the entire | ||
// executor process to avoid surprising stalls. | ||
Thread.setDefaultUncaughtExceptionHandler( | ||
new Thread.UncaughtExceptionHandler { | ||
override def uncaughtException(thread: Thread, exception: Throwable) { | ||
try { | ||
logError("Uncaught exception in thread " + thread, exception) | ||
|
||
// We may have been called from a shutdown hook. If so, we must not call System.exit(). | ||
// (If we do, we will deadlock.) | ||
if (!Utils.inShutdown()) { | ||
if (exception.isInstanceOf[OutOfMemoryError]) { | ||
System.exit(ExecutorExitCode.OOM) | ||
} else { | ||
System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION) | ||
} | ||
} | ||
} catch { | ||
case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM) | ||
case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE) | ||
} | ||
} | ||
} | ||
) | ||
Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler) | ||
} | ||
|
||
val executorSource = new ExecutorSource(this, executorId) | ||
|
@@ -259,19 +238,30 @@ private[spark] class Executor( | |
} | ||
|
||
case t: Throwable => { | ||
val serviceTime = System.currentTimeMillis() - taskStart | ||
val metrics = attemptedTask.flatMap(t => t.metrics) | ||
for (m <- metrics) { | ||
m.executorRunTime = serviceTime | ||
m.jvmGCTime = gcTime - startGCTime | ||
} | ||
val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics) | ||
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) | ||
// Attempt to exit cleanly by informing the driver of our failure. | ||
// If anything goes wrong (or this was a fatal exception), we will delegate to | ||
// the default uncaught exception handler, which will terminate the Executor. | ||
try { | ||
logError("Exception in task ID " + taskId, t) | ||
|
||
val serviceTime = System.currentTimeMillis() - taskStart | ||
val metrics = attemptedTask.flatMap(t => t.metrics) | ||
for (m <- metrics) { | ||
m.executorRunTime = serviceTime | ||
m.jvmGCTime = gcTime - startGCTime | ||
} | ||
val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics) | ||
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) | ||
|
||
// TODO: Should we exit the whole executor here? On the one hand, the failed task may | ||
// have left some weird state around depending on when the exception was thrown, but on | ||
// the other hand, maybe we could detect that when future tasks fail and exit then. | ||
logError("Exception in task ID " + taskId, t) | ||
// Don't forcibly exit unless the exception was inherently fatal, to avoid | ||
// stopping other tasks unnecessarily. | ||
if (Utils.isFatalError(t)) { | ||
ExecutorUncaughtExceptionHandler.uncaughtException(t) | ||
} | ||
} catch { | ||
case t2: Throwable => | ||
ExecutorUncaughtExceptionHandler.uncaughtException(t2) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't the uncaught exception handler for this thread be set to deal with this, instead of another catch? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, good point. I kind of like being explicit over relying on the globally set uncaught exception handler. I could be happy with getting rid of this and replacing it with a comment, though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually just realized we basically already have that comment, just interpreted in a different way :) |
||
} | ||
} | ||
} finally { | ||
// TODO: Unregister shuffle memory only for ResultTask | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.executor | ||
|
||
import org.apache.spark.Logging | ||
import org.apache.spark.util.Utils | ||
|
||
/** | ||
* The default uncaught exception handler for Executors terminates the whole process, to avoid | ||
* getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better | ||
* to fail fast when things go wrong. | ||
*/ | ||
private[spark] object ExecutorUncaughtExceptionHandler | ||
extends Thread.UncaughtExceptionHandler with Logging { | ||
|
||
override def uncaughtException(thread: Thread, exception: Throwable) { | ||
try { | ||
logError("Uncaught exception in thread " + thread, exception) | ||
|
||
// We may have been called from a shutdown hook. If so, we must not call System.exit(). | ||
// (If we do, we will deadlock.) | ||
if (!Utils.inShutdown()) { | ||
if (exception.isInstanceOf[OutOfMemoryError]) { | ||
System.exit(ExecutorExitCode.OOM) | ||
} else { | ||
System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION) | ||
} | ||
} | ||
} catch { | ||
case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM) | ||
case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE) | ||
} | ||
} | ||
|
||
def uncaughtException(exception: Throwable) { | ||
uncaughtException(Thread.currentThread(), exception) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need this line ?