[SPARK-32736][CORE] Avoid caching the removed decommissioned executors in TaskSchedulerImpl

### What changes were proposed in this pull request?

The motivation of this PR is to avoid caching removed decommissioned executors in `TaskSchedulerImpl`. The cache was introduced in #29422 to keep the `isHostDecommissioned` info around for a while after an executor is removed, so that if a task's `FetchFailure` event arrives after the executor loss event, `DAGScheduler` can still read `isHostDecommissioned` from the cache and unregister the host's shuffle map statuses when the host was decommissioned as well.
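For reference, the cache being removed looks roughly like this inside `TaskSchedulerImpl` (a condensed sketch of the hunks further down; the custom `Ticker`, imports, and some surrounding code are omitted here):

```scala
// Removed approach: remember a decommissioned executor's state for a TTL after the executor
// itself is removed, so a FetchFailure that arrives later can still be attributed to a
// decommissioned host.
lazy val decommissionedExecutorsRemoved = CacheBuilder.newBuilder()
  .expireAfterWrite(
    conf.get(DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL), TimeUnit.SECONDS)
  .build[String, ExecutorDecommissionState]()
  .asMap()

override def getExecutorDecommissionState(
    executorId: String): Option[ExecutorDecommissionState] = synchronized {
  // Consult the live map first, then the TTL cache of already-removed executors.
  executorsPendingDecommission
    .get(executorId)
    .orElse(Option(decommissionedExecutorsRemoved.get(executorId)))
}
```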

This PR achieves the same goal without the cache. Instead of saving a `workerLost` flag in `ExecutorUpdated` / `ExecutorDecommissionInfo` / `ExecutorDecommissionState`, we save the host itself as `workerHost: Option[String]`. When the host is decommissioned or lost as well, `workerHost` holds that host's address; otherwise it is `None`, indicating that only the executor is decommissioned or lost.
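Condensed from the diff below, the core signature change looks like this:

```scala
// Before: a Boolean could only say *whether* the whole worker/host was going away.
//   case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)

// After: carry the worker's host address when the host itself is decommissioned or lost;
// None means only this executor is affected. ExecutorUpdated, ExecutorDecommissionState and
// ExecutorProcessLost get the same Boolean -> Option[String] change.
case class ExecutorDecommissionInfo(message: String, workerHost: Option[String] = None)
```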

Now that we have the host info, we can also unregister the host's shuffle map statuses when `executorLost` is triggered for a decommissioned executor.
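Concretely, `DAGScheduler.handleExecutorLost` can now forward the host straight to the unregister path (condensed from the hunk below):

```scala
private[scheduler] def handleExecutorLost(
    execId: String,
    workerHost: Option[String]): Unit = {
  // If the cluster manager told us the whole worker is gone (including its shuffle service),
  // or there is no external shuffle service, the shuffle files must be treated as lost.
  val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
  removeExecutorAndUnregisterOutputs(
    execId = execId,
    fileLost = fileLost,
    hostToUnregisterOutputs = workerHost,  // previously always None here
    maybeEpoch = None)
}
```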

This PR also includes a few cleanups around the touched code.

### Why are the changes needed?

It helps unregister shuffle map statuses earlier in both the decommission and the normal executor-lost cases.
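For example, `DAGSchedulerEventProcessLoop` now pulls the host out of either loss reason, so host-level shuffle state can be dropped as soon as the executor-lost event arrives rather than waiting for a later fetch failure (condensed from the diff below):

```scala
case ExecutorLost(execId, reason) =>
  // Both loss reasons can now carry the worker host when the whole host is going away.
  val workerHost = reason match {
    case ExecutorProcessLost(_, workerHost, _) => workerHost
    case ExecutorDecommission(workerHost) => workerHost
    case _ => None
  }
  dagScheduler.handleExecutorLost(execId, workerHost)
```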

It also saves memory in `TaskSchedulerImpl` and simplifies the code a little.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This PR only refactors the code; the original behaviour should still be covered by `DecommissionWorkerSuite`.

Closes #29579 from Ngone51/impr-decom.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Ngone51 authored and cloud-fan committed Sep 8, 2020
1 parent 4144b6d commit 125cbe3
Showing 23 changed files with 98 additions and 164 deletions.
@@ -580,7 +580,7 @@ private[spark] class ExecutorAllocationManager(
// when the task backlog decreased.
if (decommissionEnabled) {
val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
id => (id, ExecutorDecommissionInfo("spark scale down", false))).toArray
id => (id, ExecutorDecommissionInfo("spark scale down"))).toArray
client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false)
} else {
client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
@@ -187,8 +187,10 @@ private[deploy] object DeployMessages {
Utils.checkHostPort(hostPort)
}

// When the host of Worker is lost or decommissioned, the `workerHost` is the host address
// of that Worker. Otherwise, it's None.
case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
exitStatus: Option[Int], workerLost: Boolean)
exitStatus: Option[Int], workerHost: Option[String])

case class ApplicationRemoved(message: String)

@@ -175,15 +175,15 @@ private[spark] class StandaloneAppClient(
cores))
listener.executorAdded(fullId, workerId, hostPort, cores, memory)

case ExecutorUpdated(id, state, message, exitStatus, workerLost) =>
case ExecutorUpdated(id, state, message, exitStatus, workerHost) =>
val fullId = appId + "/" + id
val messageText = message.map(s => " (" + s + ")").getOrElse("")
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
if (ExecutorState.isFinished(state)) {
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerHost)
} else if (state == ExecutorState.DECOMMISSIONED) {
listener.executorDecommissioned(fullId,
ExecutorDecommissionInfo(message.getOrElse(""), isHostDecommissioned = workerLost))
ExecutorDecommissionInfo(message.getOrElse(""), workerHost))
}

case WorkerRemoved(id, host, message) =>
@@ -39,7 +39,7 @@ private[spark] trait StandaloneAppClientListener {
fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit

def executorRemoved(
fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
fullId: String, message: String, exitStatus: Option[Int], workerHost: Option[String]): Unit

def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo): Unit

11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -308,7 +308,7 @@ private[deploy] class Master(
appInfo.resetRetryCount()
}

exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, None))

if (ExecutorState.isFinished(state)) {
// Remove this executor from the worker and app
@@ -909,9 +910,10 @@ private[deploy] class Master(
exec.application.driver.send(ExecutorUpdated(
exec.id, ExecutorState.DECOMMISSIONED,
Some("worker decommissioned"), None,
// workerLost is being set to true here to let the driver know that the host (aka. worker)
// is also being decommissioned.
workerLost = true))
// worker host is being set here to let the driver know that the host (aka. worker)
// is also being decommissioned. So the driver can unregister all the shuffle map
// statuses located at this host when it receives the executor lost event.
Some(worker.host)))
exec.state = ExecutorState.DECOMMISSIONED
exec.application.removeExecutor(exec)
}
@@ -932,7 +933,7 @@ private[deploy] class Master(
for (exec <- worker.executors.values) {
logInfo("Telling app of lost executor: " + exec.id)
exec.application.driver.send(ExecutorUpdated(
exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
exec.id, ExecutorState.LOST, Some("worker lost"), None, Some(worker.host)))
exec.state = ExecutorState.LOST
exec.application.removeExecutor(exec)
}
@@ -172,10 +172,7 @@ private[spark] class CoarseGrainedExecutorBackend(
driver match {
case Some(endpoint) =>
logInfo("Sending DecommissionExecutor to driver.")
endpoint.send(
DecommissionExecutor(
executorId,
ExecutorDecommissionInfo(msg, isHostDecommissioned = false)))
endpoint.send(DecommissionExecutor(executorId, ExecutorDecommissionInfo(msg)))
case _ =>
logError("No registered driver to send Decommission to.")
}
@@ -275,7 +272,7 @@ private[spark] class CoarseGrainedExecutorBackend(
// Tell master we are decommissioned so it stops trying to schedule us
if (driver.nonEmpty) {
driver.get.askSync[Boolean](DecommissionExecutor(
executorId, ExecutorDecommissionInfo(msg, false)))
executorId, ExecutorDecommissionInfo(msg)))
} else {
logError("No driver to message decommissioning.")
}
10 changes: 0 additions & 10 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1889,16 +1889,6 @@ package object config {
.timeConf(TimeUnit.SECONDS)
.createOptional

private[spark] val DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL =
ConfigBuilder("spark.executor.decommission.removed.infoCacheTTL")
.doc("Duration for which a decommissioned executor's information will be kept after its" +
"removal. Keeping the decommissioned info after removal helps pinpoint fetch failures to " +
"decommissioning even after the mapper executor has been decommissioned. This allows " +
"eager recovery from fetch failures caused by decommissioning, increasing job robustness.")
.version("3.1.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("5m")

private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
.doc("Staging directory used while submitting applications.")
.version("2.0.0")
@@ -1826,7 +1826,7 @@ private[spark] class DAGScheduler(
val externalShuffleServiceEnabled = env.blockManager.externalShuffleServiceEnabled
val isHostDecommissioned = taskScheduler
.getExecutorDecommissionState(bmAddress.executorId)
.exists(_.isHostDecommissioned)
.exists(_.workerHost.isDefined)

// Shuffle output of all executors on host `bmAddress.host` may be lost if:
// - External shuffle service is enabled, so we assume that all shuffle data on node is
@@ -1989,15 +1989,15 @@ private[spark] class DAGScheduler(
*/
private[scheduler] def handleExecutorLost(
execId: String,
workerLost: Boolean): Unit = {
workerHost: Option[String]): Unit = {
// if the cluster manager explicitly tells us that the entire worker was lost, then
// we know to unregister shuffle output. (Note that "worker" specifically refers to the process
// from a Standalone cluster, where the shuffle service lives in the Worker.)
val fileLost = workerLost || !env.blockManager.externalShuffleServiceEnabled
val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
removeExecutorAndUnregisterOutputs(
execId = execId,
fileLost = fileLost,
hostToUnregisterOutputs = None,
hostToUnregisterOutputs = workerHost,
maybeEpoch = None)
}

@@ -2366,11 +2366,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
dagScheduler.handleExecutorAdded(execId, host)

case ExecutorLost(execId, reason) =>
val workerLost = reason match {
case ExecutorProcessLost(_, true, _) => true
case _ => false
val workerHost = reason match {
case ExecutorProcessLost(_, workerHost, _) => workerHost
case ExecutorDecommission(workerHost) => workerHost
case _ => None
}
dagScheduler.handleExecutorLost(execId, workerLost)
dagScheduler.handleExecutorLost(execId, workerHost)

case WorkerRemoved(workerId, host, message) =>
dagScheduler.handleWorkerRemoved(workerId, host, message)
@@ -20,12 +20,12 @@ package org.apache.spark.scheduler
/**
* Message providing more detail when an executor is being decommissioned.
* @param message Human readable reason for why the decommissioning is happening.
* @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
* being decommissioned too. Used to infer if the shuffle data might
* be lost even if the external shuffle service is enabled.
* @param workerHost When workerHost is defined, it means the host (aka the `node` or `worker`
* in other places) has been decommissioned too. Used to infer if the
* shuffle data might be lost even if the external shuffle service is enabled.
*/
private[spark]
case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
case class ExecutorDecommissionInfo(message: String, workerHost: Option[String] = None)

/**
* State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
@@ -37,4 +37,4 @@ case class ExecutorDecommissionState(
// to estimate when the executor might eventually be lost if EXECUTOR_DECOMMISSION_KILL_INTERVAL
// is configured.
startTime: Long,
isHostDecommissioned: Boolean)
workerHost: Option[String] = None)
@@ -53,14 +53,15 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los

/**
* @param _message human readable loss reason
* @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
* @param workerHost it's defined when the host is confirmed lost too (i.e. including
* shuffle service)
* @param causedByApp whether the loss of the executor is the fault of the running app.
* (assumed true by default unless known explicitly otherwise)
*/
private[spark]
case class ExecutorProcessLost(
_message: String = "Executor Process Lost",
workerLost: Boolean = false,
workerHost: Option[String] = None,
causedByApp: Boolean = true)
extends ExecutorLossReason(_message)

@@ -69,5 +70,8 @@ case class ExecutorProcessLost(
*
* This is used by the task scheduler to remove state associated with the executor, but
* not yet fail any tasks that were running in the executor before the executor is "fully" lost.
*
* @param workerHost it is defined when the worker is decommissioned too
*/
private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor decommission.")
private [spark] case class ExecutorDecommission(workerHost: Option[String] = None)
extends ExecutorLossReason("Executor decommission.")
@@ -143,18 +143,6 @@ private[spark] class TaskSchedulerImpl(
// continue to run even after being asked to decommission, but they will eventually exit.
val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionState]

// When they exit and we know of that via heartbeat failure, we will add them to this cache.
// This cache is consulted to know if a fetch failure is because a source executor was
// decommissioned.
lazy val decommissionedExecutorsRemoved = CacheBuilder.newBuilder()
.expireAfterWrite(
conf.get(DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL), TimeUnit.SECONDS)
.ticker(new Ticker{
override def read(): Long = TimeUnit.MILLISECONDS.toNanos(clock.getTimeMillis())
})
.build[String, ExecutorDecommissionState]()
.asMap()

def runningTasksByExecutors: Map[String, Int] = synchronized {
executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
}
@@ -922,28 +910,8 @@ private[spark] class TaskSchedulerImpl(
synchronized {
// Don't bother noting decommissioning for executors that we don't know about
if (executorIdToHost.contains(executorId)) {
val oldDecomStateOpt = executorsPendingDecommission.get(executorId)
val newDecomState = if (oldDecomStateOpt.isEmpty) {
// This is the first time we are hearing of decommissioning this executor,
// so create a brand new state.
ExecutorDecommissionState(
clock.getTimeMillis(),
decommissionInfo.isHostDecommissioned)
} else {
val oldDecomState = oldDecomStateOpt.get
if (!oldDecomState.isHostDecommissioned && decommissionInfo.isHostDecommissioned) {
// Only the cluster manager is allowed to send decommission messages with
// isHostDecommissioned set. So the new decommissionInfo is from the cluster
// manager and is thus authoritative. Flip isHostDecommissioned to true but keep the old
// decommission start time.
ExecutorDecommissionState(
oldDecomState.startTime,
isHostDecommissioned = true)
} else {
oldDecomState
}
}
executorsPendingDecommission(executorId) = newDecomState
executorsPendingDecommission(executorId) =
ExecutorDecommissionState(clock.getTimeMillis(), decommissionInfo.workerHost)
}
}
rootPool.executorDecommission(executorId)
@@ -952,26 +920,11 @@ private[spark] class TaskSchedulerImpl(

override def getExecutorDecommissionState(executorId: String)
: Option[ExecutorDecommissionState] = synchronized {
executorsPendingDecommission
.get(executorId)
.orElse(Option(decommissionedExecutorsRemoved.get(executorId)))
executorsPendingDecommission.get(executorId)
}

override def executorLost(executorId: String, givenReason: ExecutorLossReason): Unit = {
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
var failedExecutor: Option[String] = None
val reason = givenReason match {
// Handle executor process loss due to decommissioning
case ExecutorProcessLost(message, origWorkerLost, origCausedByApp) =>
val executorDecommissionState = getExecutorDecommissionState(executorId)
ExecutorProcessLost(
message,
// Also mark the worker lost if we know that the host was decommissioned
origWorkerLost || executorDecommissionState.exists(_.isHostDecommissioned),
// Executor loss is certainly not caused by app if we knew that this executor is being
// decommissioned
causedByApp = executorDecommissionState.isEmpty && origCausedByApp)
case e => e
}

synchronized {
if (executorIdToRunningTaskIds.contains(executorId)) {
@@ -1060,9 +1013,7 @@ private[spark] class TaskSchedulerImpl(
}
}


val decomState = executorsPendingDecommission.remove(executorId)
decomState.foreach(decommissionedExecutorsRemoved.put(executorId, _))
executorsPendingDecommission.remove(executorId)

if (reason != LossReasonPending) {
executorIdToHost -= executorId
@@ -1104,7 +1055,7 @@ private[spark] class TaskSchedulerImpl(
// exposed for test
protected final def isHostDecommissioned(host: String): Boolean = {
hostToExecutors.get(host).exists { executors =>
executors.exists(e => getExecutorDecommissionState(e).exists(_.isHostDecommissioned))
executors.exists(e => getExecutorDecommissionState(e).exists(_.workerHost.isDefined))
}
}

@@ -988,7 +988,7 @@ private[spark] class TaskSetManager(
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
val exitCausedByApp: Boolean = reason match {
case exited: ExecutorExited => exited.exitCausedByApp
case ExecutorKilled => false
case ExecutorKilled | ExecutorDecommission(_) => false
case ExecutorProcessLost(_, _, false) => false
case _ => true
}