Skip to content

Commit

Permalink
Merge pull request apache#320 from kayousterhout/erroneous_failed_msg
Browse files Browse the repository at this point in the history
Remove erroneous FAILED state for killed tasks.

Currently, when tasks are killed, the Executor first sends a
status update for the task with a "KILLED" state, and then
sends a second status update with a "FAILED" state saying that
the task failed due to an exception. The second FAILED state is
misleading/unncessary, and occurs due to a NonLocalReturnControl
Exception that gets thrown due to the way we kill tasks. This
commit eliminates that problem.

I'm not at all sure that this is the best way to fix this problem,
so alternate suggestions welcome. @rxin guessing you're the right
person to look at this.
  • Loading branch information
rxin committed Jan 2, 2014
2 parents 588a169 + a1b438d commit 0475ca8
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
24 changes: 13 additions & 11 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,6 @@ private[spark] class Executor(
val tr = runningTasks.get(taskId)
if (tr != null) {
tr.kill()
// We remove the task also in the finally block in TaskRunner.run.
// The reason we need to remove it here is because killTask might be called before the task
// is even launched, and never reaching that finally block. ConcurrentHashMap's remove is
// idempotent.
runningTasks.remove(taskId)
}
}

Expand All @@ -167,6 +162,8 @@ private[spark] class Executor(
class TaskRunner(execBackend: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
extends Runnable {

object TaskKilledException extends Exception

@volatile private var killed = false
@volatile private var task: Task[Any] = _

Expand Down Expand Up @@ -200,9 +197,11 @@ private[spark] class Executor(
// If this task has been killed before we deserialized it, let's quit now. Otherwise,
// continue executing the task.
if (killed) {
logInfo("Executor killed task " + taskId)
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
return
// Throw an exception rather than returning, because returning within a try{} block
// causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
// exception will be caught by the catch block, leading to an incorrect ExceptionFailure
// for the task.
throw TaskKilledException
}

attemptedTask = Some(task)
Expand All @@ -216,9 +215,7 @@ private[spark] class Executor(

// If the task has been killed, let's fail it.
if (task.killed) {
logInfo("Executor killed task " + taskId)
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
return
throw TaskKilledException
}

val resultSer = SparkEnv.get.serializer.newInstance()
Expand Down Expand Up @@ -260,6 +257,11 @@ private[spark] class Executor(
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
}

case TaskKilledException => {
logInfo("Executor killed task " + taskId)
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
}

case t: Throwable => {
val serviceTime = (System.currentTimeMillis() - taskStart).toInt
val metrics = attemptedTask.flatMap(t => t.metrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ private[spark] class TaskSchedulerImpl(
}
}
case None =>
logInfo("Ignoring update from TID " + tid + " because its task set is gone")
logInfo("Ignoring update with state %s from TID %s because its task set is gone"
.format(state, tid))
}
} catch {
case e: Exception => logError("Exception in statusUpdate", e)
Expand Down

0 comments on commit 0475ca8

Please sign in to comment.