Skip to content

Commit

Permalink
Simplify unnecessary branching in DAGScheduler
Browse files Browse the repository at this point in the history
With this change, we ALWAYS post task end events in canceled
stages for both tasks that succeeded and tasks that failed.
  • Loading branch information
Andrew Or committed Jan 27, 2016
1 parent 822899d commit ba5f555
Showing 1 changed file with 6 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1144,13 +1144,12 @@ class DAGScheduler(
null
}

// The success case is dealt with separately below.
// TODO: Why post it only for failed tasks in cancelled stages? Clarify semantics here.
if (event.reason != Success) {
val attemptId = task.stageAttemptId
listenerBus.post(SparkListenerTaskEnd(
stageId, attemptId, taskType, event.reason, event.taskInfo, taskMetrics))
}
// Note: this stage may already have been canceled, in which case this task end event
// maybe posted after the stage completed event. There's not much we can do here without
// introducing additional complexity in the scheduler to wait for all the task end events
// before posting the stage completed event.
listenerBus.post(SparkListenerTaskEnd(
stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))

if (!stageIdToStage.contains(task.stageId)) {
// Skip all the actions if the stage has been cancelled.
Expand All @@ -1160,8 +1159,6 @@ class DAGScheduler(
val stage = stageIdToStage(task.stageId)
event.reason match {
case Success =>
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
event.reason, event.taskInfo, taskMetrics))
stage.pendingPartitions -= task.partitionId
task match {
case rt: ResultTask[_, _] =>
Expand Down

0 comments on commit ba5f555

Please sign in to comment.