From 4e976ab16e922dfe28125798461e45afaa1d62a7 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Tue, 16 Jun 2020 21:26:47 -0700 Subject: [PATCH] [SPARK-32003][CORE] Unregister outputs for executor on fetch failure after executor is lost If an executor is lost, the `DAGScheduler` handles the executor loss by removing the executor but does not unregister its outputs if the external shuffle service is used. However, if the node on which the executor runs is lost, the shuffle service may not be able to serve the shuffle files. In such a case, when fetches from the executor's outputs fail in the same stage, the `DAGScheduler` again removes the executor and by right, should unregister its outputs. It doesn't because the epoch used to track the executor failure has not increased. We track the epoch for failed executors that result in lost file output separately, so we can unregister the outputs in this scenario. --- .../apache/spark/scheduler/DAGScheduler.scala | 29 ++++++++------ .../spark/scheduler/DAGSchedulerSuite.scala | 40 +++++++++++++++++++ 2 files changed, 56 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 37f9e0bb483c2..6d91ce282c04e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -177,6 +177,8 @@ private[spark] class DAGScheduler( // TODO: Garbage collect information about failure epochs when we know there are no more // stray messages to detect. private val failedEpoch = new HashMap[String, Long] + // In addition, track epoch for failed executors that result in lost file output + private val fileLostEpoch = new HashMap[String, Long] private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator @@ -1939,24 +1941,22 @@ private[spark] class DAGScheduler( hostToUnregisterOutputs: Option[String], maybeEpoch: Option[Long] = None): Unit = { val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) + logDebug(s"Removing executor $execId, fileLost: $fileLost, currentEpoch: $currentEpoch") if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) { failedEpoch(execId) = currentEpoch logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch)) blockManagerMaster.removeExecutor(execId) - if (fileLost) { - hostToUnregisterOutputs match { - case Some(host) => - logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch)) - mapOutputTracker.removeOutputsOnHost(host) - case None => - logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch)) - mapOutputTracker.removeOutputsOnExecutor(execId) - } - clearCacheLocs() - - } else { - logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch)) + } + if (fileLost && (!fileLostEpoch.contains(execId) || fileLostEpoch(execId) < currentEpoch)) { + hostToUnregisterOutputs match { + case Some(host) => + logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch)) + mapOutputTracker.removeOutputsOnHost(host) + case None => + logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch)) + mapOutputTracker.removeOutputsOnExecutor(execId) } + clearCacheLocs() } } @@ -1986,6 +1986,9 @@ private[spark] class DAGScheduler( logInfo("Host added was in lost list earlier: " + host) failedEpoch -= execId } + if (fileLostEpoch.contains(execId)) { + fileLostEpoch -= execId + } } private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]): Unit = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 9d412f2dba3ce..05cd3f199e0cb 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -540,6 +540,46 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(mapStatus2(2).location.host === "hostB") } + test("[SPARK-32003] All shuffle files for executor should be cleaned up on fetch failure") { + // reset the test context with the right shuffle service config + afterEach() + val conf = new SparkConf() + conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true") + init(conf) + + val shuffleMapRdd = new MyRDD(sc, 3, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker) + + submit(reduceRdd, Array(0, 1, 2)) + // Map stage completes successfully, + // two tasks are run on an executor on hostA and one on an executor on hostB + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 3)), + (Success, makeMapStatus("hostA", 3)), + (Success, makeMapStatus("hostB", 3)))) + // Now the executor on hostA is lost + runEvent(ExecutorLost("hostA-exec", ExecutorExited(-100, false, "Container marked as failed"))) + + // The MapOutputTracker has all the shuffle files + val initialMapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses + assert(initialMapStatuses.count(_ != null) == 3) + assert(initialMapStatuses(0).location.executorId === "hostA-exec") + assert(initialMapStatuses(1).location.executorId === "hostA-exec") + assert(initialMapStatuses(2).location.executorId === "hostB-exec") + + // Now a fetch failure from the lost executor occurs + complete(taskSets(1), Seq( + (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, "ignored"), null) + )) + + // Shuffle files for hostA-exec should be lost + val mapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses + assert(mapStatuses.count(_ != null) == 1) + assert(mapStatuses(2).location.executorId === "hostB-exec") + } + test("zero split job") { var numResults = 0 var failureReason: Option[Exception] = None