From 906d6266efaa899b3fba949428018bd75157731a Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 14 Jul 2015 12:01:18 -0500 Subject: [PATCH] fix merge --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 11b12edf7eaf1..65e17e677bff0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -913,7 +913,7 @@ class DAGScheduler( partitionsToCompute.map { id => val locs = getPreferredLocs(stage.rdd, id) val part = stage.rdd.partitions(id) - new ShuffleMapTask(stage.id, stage.attemptId, taskBinary, part, locs) + new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs) } case stage: ResultStage => @@ -922,7 +922,7 @@ class DAGScheduler( val p: Int = job.partitions(id) val part = stage.rdd.partitions(p) val locs = getPreferredLocs(stage.rdd, p) - new ResultTask(stage.id, stage.attemptId, taskBinary, part, locs, id) + new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, id) } } } catch { @@ -1128,8 +1128,6 @@ class DAGScheduler( val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleToMapStage(shuffleId) - // failedStage.attemptId is already on the next attempt, so we have to use - // failedStage.latestInfo.attemptId if (failedStage.latestInfo.attemptId != task.stageAttemptId) { logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + s" ${task.stageAttemptId}, which has already failed")