From e36b44252b951d1dcecf1d87f30de836e5c9d740 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Tue, 16 Jun 2020 21:26:47 -0700 Subject: [PATCH 01/10] [SPARK-32003][CORE] Unregister outputs for executor on fetch failure after executor is lost If an executor is lost, the `DAGScheduler` handles the executor loss by removing the executor but does not unregister its outputs if the external shuffle service is used. However, if the node on which the executor runs is lost, the shuffle service may not be able to serve the shuffle files. In such a case, when fetches from the executor's outputs fail in the same stage, the `DAGScheduler` again removes the executor and by right, should unregister its outputs. It doesn't because the epoch used to track the executor failure has not increased. We track the epoch for failed executors that result in lost file output separately, so we can unregister the outputs in this scenario. --- .../apache/spark/scheduler/DAGScheduler.scala | 29 ++++++++------ .../spark/scheduler/DAGSchedulerSuite.scala | 40 +++++++++++++++++++ 2 files changed, 56 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index cb024d0852d06..05670e8ce10ac 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -177,6 +177,8 @@ private[spark] class DAGScheduler( // TODO: Garbage collect information about failure epochs when we know there are no more // stray messages to detect. private val failedEpoch = new HashMap[String, Long] + // In addition, track epoch for failed executors that result in lost file output + private val fileLostEpoch = new HashMap[String, Long] private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator @@ -1939,24 +1941,22 @@ private[spark] class DAGScheduler( hostToUnregisterOutputs: Option[String], maybeEpoch: Option[Long] = None): Unit = { val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) + logDebug(s"Removing executor $execId, fileLost: $fileLost, currentEpoch: $currentEpoch") if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) { failedEpoch(execId) = currentEpoch logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch)) blockManagerMaster.removeExecutor(execId) - if (fileLost) { - hostToUnregisterOutputs match { - case Some(host) => - logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch)) - mapOutputTracker.removeOutputsOnHost(host) - case None => - logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch)) - mapOutputTracker.removeOutputsOnExecutor(execId) - } - clearCacheLocs() - - } else { - logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch)) + } + if (fileLost && (!fileLostEpoch.contains(execId) || fileLostEpoch(execId) < currentEpoch)) { + hostToUnregisterOutputs match { + case Some(host) => + logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch)) + mapOutputTracker.removeOutputsOnHost(host) + case None => + logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch)) + mapOutputTracker.removeOutputsOnExecutor(execId) } + clearCacheLocs() } } @@ -1986,6 +1986,9 @@ private[spark] class DAGScheduler( logInfo("Host added was in lost list earlier: " + host) failedEpoch -= execId } + if (fileLostEpoch.contains(execId)) { + fileLostEpoch -= execId + } } private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]): Unit = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 7013832757e38..054a1ab6c5201 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -548,6 +548,46 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(mapStatus2(2).location.host === "hostB") } + test("[SPARK-32003] All shuffle files for executor should be cleaned up on fetch failure") { + // reset the test context with the right shuffle service config + afterEach() + val conf = new SparkConf() + conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true") + init(conf) + + val shuffleMapRdd = new MyRDD(sc, 3, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker) + + submit(reduceRdd, Array(0, 1, 2)) + // Map stage completes successfully, + // two tasks are run on an executor on hostA and one on an executor on hostB + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 3)), + (Success, makeMapStatus("hostA", 3)), + (Success, makeMapStatus("hostB", 3)))) + // Now the executor on hostA is lost + runEvent(ExecutorLost("hostA-exec", ExecutorExited(-100, false, "Container marked as failed"))) + + // The MapOutputTracker has all the shuffle files + val initialMapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses + assert(initialMapStatuses.count(_ != null) == 3) + assert(initialMapStatuses(0).location.executorId === "hostA-exec") + assert(initialMapStatuses(1).location.executorId === "hostA-exec") + assert(initialMapStatuses(2).location.executorId === "hostB-exec") + + // Now a fetch failure from the lost executor occurs + complete(taskSets(1), Seq( + (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, "ignored"), null) + )) + + // Shuffle files for hostA-exec should be lost + val mapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses + assert(mapStatuses.count(_ != null) == 1) + assert(mapStatuses(2).location.executorId === "hostB-exec") + } + test("zero split job") { var numResults = 0 var failureReason: Option[Exception] = None From fca8a6e2f81f4db7adf934bccada0121c81eb8b5 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Wed, 17 Jun 2020 15:13:55 -0700 Subject: [PATCH 02/10] [SPARK-32003] Update fileLostEpoch. I inadvertently left out a line when transferring code. The fileLostEpoch needs to be updated with an entry for the failed executor with lost output. Adopted suggestions from wuyi and attilapiros. --- .../apache/spark/scheduler/DAGScheduler.scala | 11 +++++------ .../spark/scheduler/DAGSchedulerSuite.scala | 17 +++++++---------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 05670e8ce10ac..eb7eeedb5bf6e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1944,16 +1944,17 @@ private[spark] class DAGScheduler( logDebug(s"Removing executor $execId, fileLost: $fileLost, currentEpoch: $currentEpoch") if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) { failedEpoch(execId) = currentEpoch - logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch)) + logInfo(s"Executor lost: $execId (epoch $currentEpoch)") blockManagerMaster.removeExecutor(execId) } if (fileLost && (!fileLostEpoch.contains(execId) || fileLostEpoch(execId) < currentEpoch)) { + fileLostEpoch(execId) = currentEpoch hostToUnregisterOutputs match { case Some(host) => - logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch)) + logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)") mapOutputTracker.removeOutputsOnHost(host) case None => - logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch)) + logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)") mapOutputTracker.removeOutputsOnExecutor(execId) } clearCacheLocs() @@ -1986,9 +1987,7 @@ private[spark] class DAGScheduler( logInfo("Host added was in lost list earlier: " + host) failedEpoch -= execId } - if (fileLostEpoch.contains(execId)) { - fileLostEpoch -= execId - } + fileLostEpoch -= execId } private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]): Unit = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 054a1ab6c5201..eb1d443399c14 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -563,19 +563,15 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi submit(reduceRdd, Array(0, 1, 2)) // Map stage completes successfully, // two tasks are run on an executor on hostA and one on an executor on hostB - complete(taskSets(0), Seq( - (Success, makeMapStatus("hostA", 3)), - (Success, makeMapStatus("hostA", 3)), - (Success, makeMapStatus("hostB", 3)))) + completeShuffleMapStageSuccessfully(0, 0, 3, Seq("hostA", "hostA", "hostB")) // Now the executor on hostA is lost runEvent(ExecutorLost("hostA-exec", ExecutorExited(-100, false, "Container marked as failed"))) // The MapOutputTracker has all the shuffle files val initialMapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses - assert(initialMapStatuses.count(_ != null) == 3) - assert(initialMapStatuses(0).location.executorId === "hostA-exec") - assert(initialMapStatuses(1).location.executorId === "hostA-exec") - assert(initialMapStatuses(2).location.executorId === "hostB-exec") + assert(initialMapStatuses.count(_ != null) === 3) + assert(initialMapStatuses.count(s => s != null && s.location.executorId == "hostA-exec") === 2) + assert(initialMapStatuses.count(s => s != null && s.location.executorId == "hostB-exec") === 1) // Now a fetch failure from the lost executor occurs complete(taskSets(1), Seq( @@ -584,8 +580,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // Shuffle files for hostA-exec should be lost val mapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses - assert(mapStatuses.count(_ != null) == 1) - assert(mapStatuses(2).location.executorId === "hostB-exec") + assert(mapStatuses.count(_ != null) === 1) + assert(initialMapStatuses.count(s => s != null && s.location.executorId == "hostA-exec") === 0) + assert(initialMapStatuses.count(s => s != null && s.location.executorId == "hostB-exec") === 1) } test("zero split job") { From 973e3859386cd0fe610984bae16d159963e2f91a Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Thu, 18 Jun 2020 09:49:02 -0700 Subject: [PATCH 03/10] [SPARK-32003] Clean up test. --- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index eb1d443399c14..62680fc6e41d6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -568,10 +568,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi runEvent(ExecutorLost("hostA-exec", ExecutorExited(-100, false, "Container marked as failed"))) // The MapOutputTracker has all the shuffle files - val initialMapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses - assert(initialMapStatuses.count(_ != null) === 3) - assert(initialMapStatuses.count(s => s != null && s.location.executorId == "hostA-exec") === 2) - assert(initialMapStatuses.count(s => s != null && s.location.executorId == "hostB-exec") === 1) + val mapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses + assert(mapStatuses.count(_ != null) === 3) + assert(mapStatuses.count(s => s != null && s.location.executorId == "hostA-exec") === 2) + assert(mapStatuses.count(s => s != null && s.location.executorId == "hostB-exec") === 1) // Now a fetch failure from the lost executor occurs complete(taskSets(1), Seq( @@ -579,10 +579,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi )) // Shuffle files for hostA-exec should be lost - val mapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses assert(mapStatuses.count(_ != null) === 1) - assert(initialMapStatuses.count(s => s != null && s.location.executorId == "hostA-exec") === 0) - assert(initialMapStatuses.count(s => s != null && s.location.executorId == "hostB-exec") === 1) + assert(mapStatuses.count(s => s != null && s.location.executorId == "hostA-exec") === 0) + assert(mapStatuses.count(s => s != null && s.location.executorId == "hostB-exec") === 1) } test("zero split job") { From b9e55a48ae068de5604e2739540aaa0ae9282468 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Fri, 19 Jun 2020 13:23:55 -0700 Subject: [PATCH 04/10] [SPARK-32003] Reword debug message. --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index eb7eeedb5bf6e..dbf7a88fed59a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1941,7 +1941,8 @@ private[spark] class DAGScheduler( hostToUnregisterOutputs: Option[String], maybeEpoch: Option[Long] = None): Unit = { val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) - logDebug(s"Removing executor $execId, fileLost: $fileLost, currentEpoch: $currentEpoch") + logDebug(s"Considering removal of executor $execId; " + + s"fileLost: $fileLost, currentEpoch: $currentEpoch") if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) { failedEpoch(execId) = currentEpoch logInfo(s"Executor lost: $execId (epoch $currentEpoch)") From 06ea4119e66596703fcc6499cc8bbba66a23a11a Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Mon, 22 Jun 2020 19:15:27 -0700 Subject: [PATCH 05/10] [SPARK-32003] Tweak name of test to conform with preferred style. --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 62680fc6e41d6..a5b72c20c7f23 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -548,7 +548,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(mapStatus2(2).location.host === "hostB") } - test("[SPARK-32003] All shuffle files for executor should be cleaned up on fetch failure") { + test("SPARK-32003: All shuffle files for executor should be cleaned up on fetch failure") { // reset the test context with the right shuffle service config afterEach() val conf = new SparkConf() From 17393eb21608e9788d44d9676da8c0daf7a97c17 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Thu, 25 Jun 2020 21:45:23 -0700 Subject: [PATCH 06/10] [SPARK-32003] Rename the epochs, expand comments. Also expand tests. --- .../apache/spark/scheduler/DAGScheduler.scala | 73 +++++++++++++------ .../spark/scheduler/DAGSchedulerSuite.scala | 59 +++++++++++---- 2 files changed, 93 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index dbf7a88fed59a..9553b7e81b847 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -170,15 +170,29 @@ private[spark] class DAGScheduler( */ private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]] - // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with - // every task. When we detect a node failing, we note the current epoch number and failed - // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results. + // Tracks the latest epoch of a fully processed error related to the given executor. (We use + // the MapOutputTracker's epoch number, which is sent with every task.) // - // TODO: Garbage collect information about failure epochs when we know there are no more - // stray messages to detect. - private val failedEpoch = new HashMap[String, Long] - // In addition, track epoch for failed executors that result in lost file output - private val fileLostEpoch = new HashMap[String, Long] + // When an executor fails, it can affect the results of many tasks, and we have to deal with + // all of them consistently. We don't simply ignore all future results from that executor, + // as the failures may have been transient; but we also don't want to "overreact" to follow- + // on errors we receive. Furthermore, we might receive notification of a task success, after + // we find out the executor has actually failed; we'll assume those successes are, in fact, + // simply delayed notifications and the results have been lost, if they come from the same + // epoch. In particular, we use this to control when we tell the BlockManagerMaster that the + // BlockManager has been lost. + private val executorFailureEpoch = new HashMap[String, Long] + // Tracks the latest epoch of a fully processed error where shuffle files have been lost from + // the given executor. + // + // This is closely related to executorFailureEpoch. + // They only differ for the executor when there is a Standalone worker or an external shuffle + // service serving shuffle files. In that case, when an executor is lost, we do not update + // the shuffleFileLostEpoch; we wait for a fetch failure. This way, if only the executor + // fails, we do not unregister the shuffle data as it can still be served; but if there is + // a failure in the shuffle service (resulting in fetch failure), we unregister the shuffle + // data only once, even if we get many fetch failures. + private val shuffleFileLostEpoch = new HashMap[String, Long] private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator @@ -1568,7 +1582,8 @@ private[spark] class DAGScheduler( val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) - if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { + if (executorFailureEpoch.contains(execId) && + smt.epoch <= executorFailureEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") } else { // The epoch of the task is acceptable (i.e., the task was launched after the most @@ -1914,12 +1929,8 @@ private[spark] class DAGScheduler( * modify the scheduler's internal state. Use executorLost() to post a loss event from outside. * * We will also assume that we've lost all shuffle blocks associated with the executor if the - * executor serves its own blocks (i.e., we're not using external shuffle), the entire executor - * process is lost (likely including the shuffle service), or a FetchFailed occurred, in which - * case we presume all shuffle data related to this executor to be lost. - * - * Optionally the epoch during which the failure was caught can be passed to avoid allowing - * stray fetch failures from possibly retriggering the detection of a node as lost. + * executor serves its own blocks (i.e., we're not using external shuffle), or the Standalone + * worker (which serves the shuffle data) is lost. */ private[scheduler] def handleExecutorLost( execId: String, @@ -1935,6 +1946,19 @@ private[spark] class DAGScheduler( maybeEpoch = None) } + /** + * Handles removing an executor from the BlockManagerMaster as well as unregistering shuffle + * outputs for the executor or optionally its host. + * + * The fileLost parameter being true indicates that we assume we've lost all shuffle blocks + * associated with the executor; this happens if the executor serves its own blocks (i.e., + * we're not using external shuffle), the Standalone worker (which serves the shuffle data) + * is lost, or a FetchFailed occurred (in which case we presume all shuffle data related to + * this executor to be lost). + * + * Optionally the epoch during which the failure was caught can be passed to avoid allowing + * stray fetch failures from possibly retriggering the detection of a node as lost. + */ private def removeExecutorAndUnregisterOutputs( execId: String, fileLost: Boolean, @@ -1943,13 +1967,15 @@ private[spark] class DAGScheduler( val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) logDebug(s"Considering removal of executor $execId; " + s"fileLost: $fileLost, currentEpoch: $currentEpoch") - if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) { - failedEpoch(execId) = currentEpoch + if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) { + executorFailureEpoch(execId) = currentEpoch logInfo(s"Executor lost: $execId (epoch $currentEpoch)") blockManagerMaster.removeExecutor(execId) + clearCacheLocs() } - if (fileLost && (!fileLostEpoch.contains(execId) || fileLostEpoch(execId) < currentEpoch)) { - fileLostEpoch(execId) = currentEpoch + if (fileLost && + (!shuffleFileLostEpoch.contains(execId) || shuffleFileLostEpoch(execId) < currentEpoch)) { + shuffleFileLostEpoch(execId) = currentEpoch hostToUnregisterOutputs match { case Some(host) => logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)") @@ -1958,7 +1984,6 @@ private[spark] class DAGScheduler( logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)") mapOutputTracker.removeOutputsOnExecutor(execId) } - clearCacheLocs() } } @@ -1983,12 +2008,12 @@ private[spark] class DAGScheduler( } private[scheduler] def handleExecutorAdded(execId: String, host: String): Unit = { - // remove from failedEpoch(execId) ? - if (failedEpoch.contains(execId)) { + // remove from executorFailureEpoch(execId) ? + if (executorFailureEpoch.contains(execId)) { logInfo("Host added was in lost list earlier: " + host) - failedEpoch -= execId + executorFailureEpoch -= execId } - fileLostEpoch -= execId + shuffleFileLostEpoch -= execId } private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]): Unit = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index a5b72c20c7f23..93c5bf914ae13 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -25,6 +25,9 @@ import scala.annotation.meta.param import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} import scala.util.control.NonFatal +import org.mockito.Mockito.spy +import org.mockito.Mockito.times +import org.mockito.Mockito.verify import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.exceptions.TestFailedException import org.scalatest.time.SpanSugar._ @@ -235,6 +238,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi var sparkListener: EventInfoRecordingListener = null + var blockManagerMaster: BlockManagerMaster = null var mapOutputTracker: MapOutputTrackerMaster = null var broadcastManager: BroadcastManager = null var securityMgr: SecurityManager = null @@ -248,17 +252,17 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi */ val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]] // stub out BlockManagerMaster.getLocations to use our cacheLocations - val blockManagerMaster = new BlockManagerMaster(null, null, conf, true) { - override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = { - blockIds.map { - _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)). - getOrElse(Seq()) - }.toIndexedSeq - } - override def removeExecutor(execId: String): Unit = { - // don't need to propagate to the driver, which we don't have - } + class MyBlockManagerMaster(conf: SparkConf) extends BlockManagerMaster(null, null, conf, true) { + override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = { + blockIds.map { + _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)). + getOrElse(Seq()) + }.toIndexedSeq } + override def removeExecutor(execId: String): Unit = { + // don't need to propagate to the driver, which we don't have + } + } /** The list of results that DAGScheduler has collected. */ val results = new HashMap[Int, Any]() @@ -276,6 +280,16 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi override def jobFailed(exception: Exception): Unit = { failure = exception } } + class MyMapOutputTrackerMaster( + conf: SparkConf, + broadcastManager: BroadcastManager) + extends MapOutputTrackerMaster(conf, broadcastManager, true) { + + override def sendTracker(message: Any): Unit = { + // no-op, just so we can stop this to avoid leaking threads + } + } + override def beforeEach(): Unit = { super.beforeEach() init(new SparkConf()) @@ -293,11 +307,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi results.clear() securityMgr = new SecurityManager(conf) broadcastManager = new BroadcastManager(true, conf, securityMgr) - mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) { - override def sendTracker(message: Any): Unit = { - // no-op, just so we can stop this to avoid leaking threads - } - } + mapOutputTracker = spy(new MyMapOutputTrackerMaster(conf, broadcastManager)) + blockManagerMaster = spy(new MyBlockManagerMaster(conf)) scheduler = new DAGScheduler( sc, taskScheduler, @@ -566,6 +577,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi completeShuffleMapStageSuccessfully(0, 0, 3, Seq("hostA", "hostA", "hostB")) // Now the executor on hostA is lost runEvent(ExecutorLost("hostA-exec", ExecutorExited(-100, false, "Container marked as failed"))) + // Executor is removed but shuffle files are not unregistered + verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec") + verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("hostA-exec") // The MapOutputTracker has all the shuffle files val mapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses @@ -577,11 +591,22 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi complete(taskSets(1), Seq( (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, "ignored"), null) )) + // blockManagerMaster.removeExecutor is not called again + // but shuffle files are unregistered + verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec") + verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec") // Shuffle files for hostA-exec should be lost assert(mapStatuses.count(_ != null) === 1) assert(mapStatuses.count(s => s != null && s.location.executorId == "hostA-exec") === 0) assert(mapStatuses.count(s => s != null && s.location.executorId == "hostB-exec") === 1) + + // Additional fetch failure from the executor does not result in further call to + // mapOutputTracker.removeOutputsOnExecutor + complete(taskSets(1), Seq( + (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 1, 0, "ignored"), null) + )) + verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec") } test("zero split job") { @@ -803,6 +828,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, "ignored"), null))) // this will get called // blockManagerMaster.removeExecutor("hostA-exec") + verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec") // ask the scheduler to try it again scheduler.resubmitFailedStages() // have the 2nd attempt pass @@ -842,11 +868,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi submit(reduceRdd, Array(0)) completeShuffleMapStageSuccessfully(0, 0, 1) runEvent(ExecutorLost("hostA-exec", event)) + verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec") if (expectFileLoss) { + verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec") intercept[MetadataFetchFailedException] { mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0) } } else { + verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("hostA-exec") assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) } From d450c3e3ad66b3b92206a1296c6edae00b935f24 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Fri, 26 Jun 2020 16:15:12 -0700 Subject: [PATCH 07/10] [SPARK-32003] Improve comments. --- .../apache/spark/scheduler/DAGScheduler.scala | 65 ++++++++++--------- .../spark/scheduler/DAGSchedulerSuite.scala | 2 - 2 files changed, 36 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 9553b7e81b847..09a856da86157 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -170,28 +170,33 @@ private[spark] class DAGScheduler( */ private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]] - // Tracks the latest epoch of a fully processed error related to the given executor. (We use - // the MapOutputTracker's epoch number, which is sent with every task.) - // - // When an executor fails, it can affect the results of many tasks, and we have to deal with - // all of them consistently. We don't simply ignore all future results from that executor, - // as the failures may have been transient; but we also don't want to "overreact" to follow- - // on errors we receive. Furthermore, we might receive notification of a task success, after - // we find out the executor has actually failed; we'll assume those successes are, in fact, - // simply delayed notifications and the results have been lost, if they come from the same - // epoch. In particular, we use this to control when we tell the BlockManagerMaster that the - // BlockManager has been lost. + /** + * Tracks the latest epoch of a fully processed error related to the given executor. (We use + * the MapOutputTracker's epoch number, which is sent with every task.) + * + * When an executor fails, it can affect the results of many tasks, and we have to deal with + * all of them consistently. We don't simply ignore all future results from that executor, + * as the failures may have been transient; but we also don't want to "overreact" to follow- + * on errors we receive. Furthermore, we might receive notification of a task success, after + * we find out the executor has actually failed; we'll assume those successes are, in fact, + * simply delayed notifications and the results have been lost, if the tasks started in the + * same or an earlier epoch. In particular, we use this to control when we tell the + * BlockManagerMaster that the BlockManager has been lost. + */ private val executorFailureEpoch = new HashMap[String, Long] - // Tracks the latest epoch of a fully processed error where shuffle files have been lost from - // the given executor. - // - // This is closely related to executorFailureEpoch. - // They only differ for the executor when there is a Standalone worker or an external shuffle - // service serving shuffle files. In that case, when an executor is lost, we do not update - // the shuffleFileLostEpoch; we wait for a fetch failure. This way, if only the executor - // fails, we do not unregister the shuffle data as it can still be served; but if there is - // a failure in the shuffle service (resulting in fetch failure), we unregister the shuffle - // data only once, even if we get many fetch failures. + + /** + * Tracks the latest epoch of a fully processed error where shuffle files have been lost from + * the given executor. + * + * This is closely related to executorFailureEpoch. + * They only differ for the executor when there is a Standalone worker or an external shuffle + * service serving shuffle files. In that case, when an executor is lost, we do not update + * the shuffleFileLostEpoch; we wait for a fetch failure. This way, if only the executor + * fails, we do not unregister the shuffle data as it can still be served; but if there is + * a failure in the shuffle service (resulting in fetch failure), we unregister the shuffle + * data only once, even if we get many fetch failures. + */ private val shuffleFileLostEpoch = new HashMap[String, Long] private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator @@ -1950,14 +1955,16 @@ private[spark] class DAGScheduler( * Handles removing an executor from the BlockManagerMaster as well as unregistering shuffle * outputs for the executor or optionally its host. * - * The fileLost parameter being true indicates that we assume we've lost all shuffle blocks - * associated with the executor; this happens if the executor serves its own blocks (i.e., - * we're not using external shuffle), the Standalone worker (which serves the shuffle data) - * is lost, or a FetchFailed occurred (in which case we presume all shuffle data related to - * this executor to be lost). - * - * Optionally the epoch during which the failure was caught can be passed to avoid allowing - * stray fetch failures from possibly retriggering the detection of a node as lost. + * @param execId executor to be removed + * @param fileLost If true, indicates that we assume we've lost all shuffle blocks associated + * with the executor; this happens if the executor serves its own blocks (i.e., we're not + * using external shuffle), the Standalone worker (which serves the shuffle data) is lost, + * or a FetchFailed occurred (in which case we presume all shuffle data related to this + * executor to be lost). + * @param hostToUnregisterOutputs (optional) executor host if we're unregistering all the + * outputs on the host + * @param maybeEpoch (optional) the epoch during which the failure was caught (this prevents + * reprocessing for follow-on fetch failures) */ private def removeExecutorAndUnregisterOutputs( execId: String, diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 93c5bf914ae13..f72993883df63 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -826,8 +826,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi complete(taskSets(1), Seq( (Success, 42), (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, "ignored"), null))) - // this will get called - // blockManagerMaster.removeExecutor("hostA-exec") verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec") // ask the scheduler to try it again scheduler.resubmitFailedStages() From a8e619cfda41da33132c485289095e16221602be Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Mon, 6 Jul 2020 18:50:48 -0700 Subject: [PATCH 08/10] [SPARK-32003] Address style nit in DAGSchedulerSuite. --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index f72993883df63..664cfc88cc410 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -255,8 +255,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi class MyBlockManagerMaster(conf: SparkConf) extends BlockManagerMaster(null, null, conf, true) { override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = { blockIds.map { - _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)). - getOrElse(Seq()) + _.asRDDId.map { id => (id.rddId -> id.splitIndex) + }.flatMap { key => cacheLocations.get(key) + }.getOrElse(Seq()) }.toIndexedSeq } override def removeExecutor(execId: String): Unit = { From 19235986e66792e170285f406ced9b368ab17e91 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Mon, 13 Jul 2020 11:17:57 -0700 Subject: [PATCH 09/10] [SPARK-32003] Minor tweak to comments. --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 09a856da86157..bc48af3125bd0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1934,8 +1934,8 @@ private[spark] class DAGScheduler( * modify the scheduler's internal state. Use executorLost() to post a loss event from outside. * * We will also assume that we've lost all shuffle blocks associated with the executor if the - * executor serves its own blocks (i.e., we're not using external shuffle), or the Standalone - * worker (which serves the shuffle data) is lost. + * executor serves its own blocks (i.e., we're not using an external shuffle service), or the + * entire Standalone worker is lost. */ private[scheduler] def handleExecutorLost( execId: String, @@ -1958,9 +1958,8 @@ private[spark] class DAGScheduler( * @param execId executor to be removed * @param fileLost If true, indicates that we assume we've lost all shuffle blocks associated * with the executor; this happens if the executor serves its own blocks (i.e., we're not - * using external shuffle), the Standalone worker (which serves the shuffle data) is lost, - * or a FetchFailed occurred (in which case we presume all shuffle data related to this - * executor to be lost). + * using an external shuffle service), the entire Standalone worker is lost, or a FetchFailed + * occurred (in which case we presume all shuffle data related to this executor to be lost). * @param hostToUnregisterOutputs (optional) executor host if we're unregistering all the * outputs on the host * @param maybeEpoch (optional) the epoch during which the failure was caught (this prevents From 0e0086288f6279569e8a11cef9d928b87c40469b Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Mon, 13 Jul 2020 11:42:37 -0700 Subject: [PATCH 10/10] [SPARK-32003] One more comment tweak. --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index bc48af3125bd0..73c95d19387c2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -189,9 +189,9 @@ private[spark] class DAGScheduler( * Tracks the latest epoch of a fully processed error where shuffle files have been lost from * the given executor. * - * This is closely related to executorFailureEpoch. - * They only differ for the executor when there is a Standalone worker or an external shuffle - * service serving shuffle files. In that case, when an executor is lost, we do not update + * This is closely related to executorFailureEpoch. They only differ for the executor when + * there is an external shuffle service serving shuffle files and we haven't been notified that + * the entire worker has been lost. In that case, when an executor is lost, we do not update * the shuffleFileLostEpoch; we wait for a fetch failure. This way, if only the executor * fails, we do not unregister the shuffle data as it can still be served; but if there is * a failure in the shuffle service (resulting in fetch failure), we unregister the shuffle