Skip to content

Commit

Permalink
Revert "Simplify unnecessary branching in DAGScheduler"
Browse files Browse the repository at this point in the history
This reverts commit ba5f555.
  • Loading branch information
Andrew Or committed Feb 8, 2016
1 parent 6e4859d commit 3b1e414
Showing 1 changed file with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1144,12 +1144,13 @@ class DAGScheduler(
null
}

// Note: this stage may already have been canceled, in which case this task end event
// maybe posted after the stage completed event. There's not much we can do here without
// introducing additional complexity in the scheduler to wait for all the task end events
// before posting the stage completed event.
listenerBus.post(SparkListenerTaskEnd(
stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))
// The success case is dealt with separately below.
// TODO: Why post it only for failed tasks in cancelled stages? Clarify semantics here.
if (event.reason != Success) {
val attemptId = task.stageAttemptId
listenerBus.post(SparkListenerTaskEnd(
stageId, attemptId, taskType, event.reason, event.taskInfo, taskMetrics))
}

if (!stageIdToStage.contains(task.stageId)) {
// Skip all the actions if the stage has been cancelled.
Expand All @@ -1159,6 +1160,8 @@ class DAGScheduler(
val stage = stageIdToStage(task.stageId)
event.reason match {
case Success =>
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
event.reason, event.taskInfo, taskMetrics))
stage.pendingPartitions -= task.partitionId
task match {
case rt: ResultTask[_, _] =>
Expand Down

0 comments on commit 3b1e414

Please sign in to comment.