[SPARK-32003][CORE] When external shuffle service is used, unregister outputs for executor on fetch failure after executor is lost #28848

Closed
wants to merge 10 commits
100 changes: 67 additions & 33 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -170,13 +170,34 @@ private[spark] class DAGScheduler(
*/
private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]

-  // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
-  // every task. When we detect a node failing, we note the current epoch number and failed
-  // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
-  //
-  // TODO: Garbage collect information about failure epochs when we know there are no more
-  // stray messages to detect.
-  private val failedEpoch = new HashMap[String, Long]
+  /**
+   * Tracks the latest epoch of a fully processed error related to the given executor. (We use
+   * the MapOutputTracker's epoch number, which is sent with every task.)
+   *
+   * When an executor fails, it can affect the results of many tasks, and we have to deal with
+   * all of them consistently. We don't simply ignore all future results from that executor,
+   * as the failures may have been transient; but we also don't want to "overreact" to follow-
+   * on errors we receive. Furthermore, we might receive notification of a task success, after
+   * we find out the executor has actually failed; we'll assume those successes are, in fact,
+   * simply delayed notifications and the results have been lost, if the tasks started in the
+   * same or an earlier epoch. In particular, we use this to control when we tell the
+   * BlockManagerMaster that the BlockManager has been lost.
+   */
+  private val executorFailureEpoch = new HashMap[String, Long]
+
+  /**
+   * Tracks the latest epoch of a fully processed error where shuffle files have been lost from
+   * the given executor.
+   *
+   * This is closely related to executorFailureEpoch. They only differ for the executor when
+   * there is an external shuffle service serving shuffle files and we haven't been notified that
+   * the entire worker has been lost. In that case, when an executor is lost, we do not update
+   * the shuffleFileLostEpoch; we wait for a fetch failure. This way, if only the executor
+   * fails, we do not unregister the shuffle data as it can still be served; but if there is
+   * a failure in the shuffle service (resulting in fetch failure), we unregister the shuffle
+   * data only once, even if we get many fetch failures.
+   */
+  private val shuffleFileLostEpoch = new HashMap[String, Long]

private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator
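
To make the two-map design concrete, here is a minimal, hypothetical model of the bookkeeping (not the DAGScheduler code; the object name and epoch values are invented for illustration):

```scala
import scala.collection.mutable.HashMap

object EpochBookkeepingSketch {
  private val executorFailureEpoch = new HashMap[String, Long]
  private val shuffleFileLostEpoch = new HashMap[String, Long]

  // Executor lost while an external shuffle service is still serving its files:
  // record the executor failure, but leave shuffleFileLostEpoch untouched so the
  // shuffle outputs stay registered.
  def onExecutorLost(execId: String, epoch: Long): Unit = {
    if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < epoch) {
      executorFailureEpoch(execId) = epoch
    }
  }

  // A fetch failure proves the files are really gone: report "unregister" exactly
  // once per epoch, however many duplicate fetch failures arrive afterwards.
  def onShuffleFilesLost(execId: String, epoch: Long): Boolean = {
    if (!shuffleFileLostEpoch.contains(execId) || shuffleFileLostEpoch(execId) < epoch) {
      shuffleFileLostEpoch(execId) = epoch
      true
    } else {
      false
    }
  }
}
```

In this model, `onShuffleFilesLost("exec-1", 5)` returns true the first time and false for every repeat at the same epoch, which is the same deduplication the real maps provide in removeExecutorAndUnregisterOutputs below.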

@@ -1566,7 +1587,8 @@ private[spark] class DAGScheduler(
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
-        if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
+        if (executorFailureEpoch.contains(execId) &&
+            smt.epoch <= executorFailureEpoch(execId)) {
logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
// The epoch of the task is acceptable (i.e., the task was launched after the most
@@ -1912,12 +1934,8 @@ private[spark] class DAGScheduler(
* modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
*
   * We will also assume that we've lost all shuffle blocks associated with the executor if the
-  * executor serves its own blocks (i.e., we're not using external shuffle), the entire executor
-  * process is lost (likely including the shuffle service), or a FetchFailed occurred, in which
-  * case we presume all shuffle data related to this executor to be lost.
-  *
-  * Optionally the epoch during which the failure was caught can be passed to avoid allowing
-  * stray fetch failures from possibly retriggering the detection of a node as lost.
+  * executor serves its own blocks (i.e., we're not using an external shuffle service), or the
+  * entire Standalone worker is lost.
*/
private[scheduler] def handleExecutorLost(
execId: String,
@@ -1933,29 +1951,44 @@
maybeEpoch = None)
}
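
The elided body above computes fileLost before delegating; in the Spark source this PR modifies, the logic is roughly the following (a paraphrase, not a verbatim quote of the elided lines):

```scala
// Shuffle files can outlive the executor only when an external shuffle service
// serves them, and even then not when the whole Standalone worker (which hosts
// the shuffle service) is lost.
val fileLost = workerLost || !env.blockManager.externalShuffleServiceEnabled
```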

/**
* Handles removing an executor from the BlockManagerMaster as well as unregistering shuffle
* outputs for the executor or optionally its host.
*
* @param execId executor to be removed
* @param fileLost If true, indicates that we assume we've lost all shuffle blocks associated
* with the executor; this happens if the executor serves its own blocks (i.e., we're not
* using an external shuffle service), the entire Standalone worker is lost, or a FetchFailed
* occurred (in which case we presume all shuffle data related to this executor to be lost).
* @param hostToUnregisterOutputs (optional) executor host if we're unregistering all the
* outputs on the host
* @param maybeEpoch (optional) the epoch during which the failure was caught (this prevents
* reprocessing for follow-on fetch failures)
*/
private def removeExecutorAndUnregisterOutputs(
execId: String,
fileLost: Boolean,
hostToUnregisterOutputs: Option[String],
maybeEpoch: Option[Long] = None): Unit = {
    val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
-    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
-      failedEpoch(execId) = currentEpoch
-      logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
+    logDebug(s"Considering removal of executor $execId; " +
+      s"fileLost: $fileLost, currentEpoch: $currentEpoch")

Review comment (Member): nit: ; -> ,
Reply (Contributor Author): I deliberately used a semicolon; it was not a typo.

+    if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) {
+      executorFailureEpoch(execId) = currentEpoch
+      logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
      blockManagerMaster.removeExecutor(execId)
-      if (fileLost) {
-        hostToUnregisterOutputs match {
-          case Some(host) =>
-            logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
-            mapOutputTracker.removeOutputsOnHost(host)
-          case None =>
-            logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
-            mapOutputTracker.removeOutputsOnExecutor(execId)
-        }
-        clearCacheLocs()
-      } else {
-        logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch))
-      }
-    }
+      clearCacheLocs()
+    }
+    if (fileLost &&
+        (!shuffleFileLostEpoch.contains(execId) || shuffleFileLostEpoch(execId) < currentEpoch)) {
+      shuffleFileLostEpoch(execId) = currentEpoch
+      hostToUnregisterOutputs match {
+        case Some(host) =>
+          logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)")
+          mapOutputTracker.removeOutputsOnHost(host)
+        case None =>
+          logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)")
+          mapOutputTracker.removeOutputsOnExecutor(execId)
+      }
+    }
}
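
Taken together, the two epoch guards make the method idempotent per epoch. A hypothetical call trace with an external shuffle service enabled (executor id and epoch values invented for illustration):

```scala
// Executor process dies, but the external shuffle service can still serve its
// files, so the caller passes fileLost = false.
removeExecutorAndUnregisterOutputs(execId = "exec-1", fileLost = false,
  hostToUnregisterOutputs = None, maybeEpoch = Some(5L))
// -> executorFailureEpoch("exec-1") = 5; BlockManager removed; map outputs kept.

// A fetch failure from a task of the same epoch proves the files are gone.
removeExecutorAndUnregisterOutputs(execId = "exec-1", fileLost = true,
  hostToUnregisterOutputs = None, maybeEpoch = Some(5L))
// -> executor branch skipped (epoch 5 already processed);
//    shuffleFileLostEpoch("exec-1") = 5; map outputs unregistered once.

// Any further fetch failures at epoch 5 fail both guards: a complete no-op.
removeExecutorAndUnregisterOutputs(execId = "exec-1", fileLost = true,
  hostToUnregisterOutputs = None, maybeEpoch = Some(5L))
```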
@@ -1981,11 +2014,12 @@
}

private[scheduler] def handleExecutorAdded(execId: String, host: String): Unit = {
-    // remove from failedEpoch(execId) ?
-    if (failedEpoch.contains(execId)) {
+    // remove from executorFailureEpoch(execId) ?
+    if (executorFailureEpoch.contains(execId)) {
      logInfo("Host added was in lost list earlier: " + host)
-      failedEpoch -= execId
+      executorFailureEpoch -= execId
    }
+    shuffleFileLostEpoch -= execId
}

private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]): Unit = {
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -25,6 +25,9 @@ import scala.annotation.meta.param
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import scala.util.control.NonFatal

import org.mockito.Mockito.spy
import org.mockito.Mockito.times
import org.mockito.Mockito.verify
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.exceptions.TestFailedException
import org.scalatest.time.SpanSugar._
@@ -235,6 +238,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits

var sparkListener: EventInfoRecordingListener = null

var blockManagerMaster: BlockManagerMaster = null
var mapOutputTracker: MapOutputTrackerMaster = null
var broadcastManager: BroadcastManager = null
var securityMgr: SecurityManager = null
@@ -248,17 +252,18 @@
*/
val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
// stub out BlockManagerMaster.getLocations to use our cacheLocations
-  val blockManagerMaster = new BlockManagerMaster(null, null, conf, true) {
-    override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
-      blockIds.map {
-        _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).
-          getOrElse(Seq())
-      }.toIndexedSeq
-    }
-    override def removeExecutor(execId: String): Unit = {
-      // don't need to propagate to the driver, which we don't have
-    }
-  }
+  class MyBlockManagerMaster(conf: SparkConf) extends BlockManagerMaster(null, null, conf, true) {
+    override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
+      blockIds.map {
+        _.asRDDId.map { id => (id.rddId -> id.splitIndex)
+        }.flatMap { key => cacheLocations.get(key)
+        }.getOrElse(Seq())

Review comment on lines +258 to +260 (Member): unnecessary change?
Reply (Contributor Author): @squito requested this change, to conform with what he said was the preferred Scala style (map or flatMap with "=>" should use braces rather than parentheses).

+      }.toIndexedSeq
+    }
+    override def removeExecutor(execId: String): Unit = {
+      // don't need to propagate to the driver, which we don't have
+    }
+  }

/** The list of results that DAGScheduler has collected. */
val results = new HashMap[Int, Any]()
@@ -276,6 +281,16 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
override def jobFailed(exception: Exception): Unit = { failure = exception }
}

class MyMapOutputTrackerMaster(
conf: SparkConf,
broadcastManager: BroadcastManager)
extends MapOutputTrackerMaster(conf, broadcastManager, true) {

override def sendTracker(message: Any): Unit = {
// no-op, just so we can stop this to avoid leaking threads
}
}
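
The previous anonymous subclasses were lifted into the named classes MyBlockManagerMaster and MyMapOutputTrackerMaster so that beforeEach (below) can wrap real instances in Mockito spies. A minimal sketch of that pattern, reusing the suite's conf and broadcastManager fields (illustrative only, not part of the suite):

```scala
import org.mockito.Mockito.{spy, times, verify}

// spy() wraps a real object: method calls execute the actual implementation,
// while Mockito records each interaction for later verification.
val tracker = spy(new MyMapOutputTrackerMaster(conf, broadcastManager))
tracker.removeOutputsOnExecutor("hostA-exec") // real code path runs
verify(tracker, times(1)).removeOutputsOnExecutor("hostA-exec") // and was recorded
```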

override def beforeEach(): Unit = {
super.beforeEach()
init(new SparkConf())
@@ -293,11 +308,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
results.clear()
securityMgr = new SecurityManager(conf)
broadcastManager = new BroadcastManager(true, conf, securityMgr)
-    mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) {
-      override def sendTracker(message: Any): Unit = {
-        // no-op, just so we can stop this to avoid leaking threads
-      }
-    }
+    mapOutputTracker = spy(new MyMapOutputTrackerMaster(conf, broadcastManager))
+    blockManagerMaster = spy(new MyBlockManagerMaster(conf))
scheduler = new DAGScheduler(
sc,
taskScheduler,
@@ -548,6 +560,56 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
assert(mapStatus2(2).location.host === "hostB")
}

test("SPARK-32003: All shuffle files for executor should be cleaned up on fetch failure") {

Review comment (@dongjoon-hyun, Member, Jul 22, 2020): nit. Do we need to describe the "when external shuffle service is used" assumption here, given that we have conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true") at line 567?
Reply (Contributor): Personally, I'm OK with this as is; I think it's OK for some of the details to be down in the test itself and balance a super-duper long name. @dongjoon-hyun, since you called this a nit I'm assuming you're OK with me merging this anyhow, but if not let me know; I can submit a quick followup.

// reset the test context with the right shuffle service config
afterEach()
val conf = new SparkConf()
conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
init(conf)

val shuffleMapRdd = new MyRDD(sc, 3, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker)

submit(reduceRdd, Array(0, 1, 2))
// Map stage completes successfully,
// two tasks are run on an executor on hostA and one on an executor on hostB
completeShuffleMapStageSuccessfully(0, 0, 3, Seq("hostA", "hostA", "hostB"))
// Now the executor on hostA is lost
runEvent(ExecutorLost("hostA-exec", ExecutorExited(-100, false, "Container marked as failed")))
// Executor is removed but shuffle files are not unregistered
verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("hostA-exec")

// The MapOutputTracker has all the shuffle files
val mapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses
assert(mapStatuses.count(_ != null) === 3)
assert(mapStatuses.count(s => s != null && s.location.executorId == "hostA-exec") === 2)
assert(mapStatuses.count(s => s != null && s.location.executorId == "hostB-exec") === 1)

// Now a fetch failure from the lost executor occurs
complete(taskSets(1), Seq(
(FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, "ignored"), null)
))
// blockManagerMaster.removeExecutor is not called again
// but shuffle files are unregistered
verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")

// Shuffle files for hostA-exec should be lost
assert(mapStatuses.count(_ != null) === 1)
assert(mapStatuses.count(s => s != null && s.location.executorId == "hostA-exec") === 0)
assert(mapStatuses.count(s => s != null && s.location.executorId == "hostB-exec") === 1)

// Additional fetch failure from the executor does not result in further call to
// mapOutputTracker.removeOutputsOnExecutor
complete(taskSets(1), Seq(
(FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 1, 0, "ignored"), null)
))
verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
}

test("zero split job") {
var numResults = 0
var failureReason: Option[Exception] = None
@@ -765,8 +827,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
complete(taskSets(1), Seq(
(Success, 42),
(FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, "ignored"), null)))
-    // this will get called
-    // blockManagerMaster.removeExecutor("hostA-exec")
+    verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
// ask the scheduler to try it again
scheduler.resubmitFailedStages()
// have the 2nd attempt pass
@@ -806,11 +867,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
submit(reduceRdd, Array(0))
completeShuffleMapStageSuccessfully(0, 0, 1)
runEvent(ExecutorLost("hostA-exec", event))
verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
if (expectFileLoss) {
verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
intercept[MetadataFetchFailedException] {
mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
}
} else {
verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("hostA-exec")
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
}