[SPARK-32003][CORE] Unregister outputs for executor on fetch failure after executor is lost

If an executor is lost, the `DAGScheduler` handles the executor loss by
removing the executor but does not unregister its outputs if the external
shuffle service is used. However, if the node on which the executor runs
is lost, the shuffle service may not be able to serve the shuffle files.
In such a case, when fetches from the executor's outputs fail in the
same stage, the `DAGScheduler` removes the executor again and should,
by rights, also unregister its outputs. It does not, because the epoch
used to track the executor failure has not increased.

We track the epoch for failed executors that result in lost file output
separately, so we can unregister the outputs in this scenario.
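
The gist of the change, as a minimal standalone Scala sketch (the
`failedEpoch`/`fileLostEpoch` names come from the diff below; the handler
and the `println` calls are simplified stand-ins for the scheduler's real
bookkeeping, not Spark APIs):

import scala.collection.mutable.HashMap

// Simplified stand-in for the DAGScheduler's bookkeeping: one epoch map gates
// removing the executor, a separate map gates unregistering its shuffle outputs.
object EpochTrackingSketch {
  private val failedEpoch = new HashMap[String, Long]    // gates executor removal
  private val fileLostEpoch = new HashMap[String, Long]  // gates output unregistration

  def handleExecutorLost(execId: String, fileLost: Boolean, currentEpoch: Long): Unit = {
    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
      failedEpoch(execId) = currentEpoch
      println(s"remove executor $execId (epoch $currentEpoch)")
    }
    // Even if the executor was already removed at this epoch, a later fetch
    // failure can still require unregistering its shuffle outputs.
    if (fileLost && (!fileLostEpoch.contains(execId) || fileLostEpoch(execId) < currentEpoch)) {
      fileLostEpoch(execId) = currentEpoch
      println(s"unregister shuffle outputs of $execId (epoch $currentEpoch)")
    }
  }

  def main(args: Array[String]): Unit = {
    // Executor lost while the external shuffle service is enabled:
    // the executor is removed but its outputs are kept.
    handleExecutorLost("exec-1", fileLost = false, currentEpoch = 0L)
    // Fetch failure in the same epoch: with only failedEpoch this was a no-op;
    // with the separate fileLostEpoch the outputs are now unregistered.
    handleExecutorLost("exec-1", fileLost = true, currentEpoch = 0L)
  }
}
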
wypoon committed Jun 17, 2020
1 parent 7f6a8ab commit 4e976ab
Showing 2 changed files with 56 additions and 13 deletions.
29 changes: 16 additions & 13 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -177,6 +177,8 @@ private[spark] class DAGScheduler(
  // TODO: Garbage collect information about failure epochs when we know there are no more
  // stray messages to detect.
  private val failedEpoch = new HashMap[String, Long]
+  // In addition, track epoch for failed executors that result in lost file output
+  private val fileLostEpoch = new HashMap[String, Long]

  private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator

@@ -1939,24 +1941,22 @@ private[spark] class DAGScheduler(
      hostToUnregisterOutputs: Option[String],
      maybeEpoch: Option[Long] = None): Unit = {
    val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
    logDebug(s"Removing executor $execId, fileLost: $fileLost, currentEpoch: $currentEpoch")
    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
      failedEpoch(execId) = currentEpoch
      logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
      blockManagerMaster.removeExecutor(execId)
-      if (fileLost) {
-        hostToUnregisterOutputs match {
-          case Some(host) =>
-            logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
-            mapOutputTracker.removeOutputsOnHost(host)
-          case None =>
-            logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
-            mapOutputTracker.removeOutputsOnExecutor(execId)
-        }
-        clearCacheLocs()
-      }
    } else {
      logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch))
    }
+    if (fileLost && (!fileLostEpoch.contains(execId) || fileLostEpoch(execId) < currentEpoch)) {
+      fileLostEpoch(execId) = currentEpoch
+      hostToUnregisterOutputs match {
+        case Some(host) =>
+          logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
+          mapOutputTracker.removeOutputsOnHost(host)
+        case None =>
+          logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
+          mapOutputTracker.removeOutputsOnExecutor(execId)
+      }
+      clearCacheLocs()
+    }
  }

@@ -1986,6 +1986,9 @@ private[spark] class DAGScheduler(
      logInfo("Host added was in lost list earlier: " + host)
      failedEpoch -= execId
    }
+    if (fileLostEpoch.contains(execId)) {
+      fileLostEpoch -= execId
+    }
  }

  private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]): Unit = {
40 changes: 40 additions & 0 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -540,6 +540,46 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
    assert(mapStatus2(2).location.host === "hostB")
  }

+  test("[SPARK-32003] All shuffle files for executor should be cleaned up on fetch failure") {
+    // reset the test context with the right shuffle service config
+    afterEach()
+    val conf = new SparkConf()
+    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
+    init(conf)
+
+    val shuffleMapRdd = new MyRDD(sc, 3, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker)
+
+    submit(reduceRdd, Array(0, 1, 2))
+    // Map stage completes successfully,
+    // two tasks are run on an executor on hostA and one on an executor on hostB
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 3)),
+      (Success, makeMapStatus("hostA", 3)),
+      (Success, makeMapStatus("hostB", 3))))
+    // Now the executor on hostA is lost
+    runEvent(ExecutorLost("hostA-exec", ExecutorExited(-100, false, "Container marked as failed")))
+
+    // The MapOutputTracker has all the shuffle files
+    val initialMapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses
+    assert(initialMapStatuses.count(_ != null) == 3)
+    assert(initialMapStatuses(0).location.executorId === "hostA-exec")
+    assert(initialMapStatuses(1).location.executorId === "hostA-exec")
+    assert(initialMapStatuses(2).location.executorId === "hostB-exec")
+
+    // Now a fetch failure from the lost executor occurs
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, "ignored"), null)
+    ))
+
+    // Shuffle files for hostA-exec should be lost
+    val mapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses
+    assert(mapStatuses.count(_ != null) == 1)
+    assert(mapStatuses(2).location.executorId === "hostB-exec")
+  }
+
  test("zero split job") {
    var numResults = 0
    var failureReason: Option[Exception] = None
