-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP][SPARK-31197][CORE] Exit the executor once all tasks and migrations are finished, built on top of SPARK-20629 #28817
Changes from 1 commit
67dec3c
4f2e7ce
ecd1a14
9c8836a
ccb8827
c4ed3bd
ac510dc
e13b070
d3ecd8e
6bdf0c2
f2eb6eb
81e29a8
6f0544a
41d5464
a517d67
bce1613
6c1b364
a904030
5838639
4b3fb27
6a940e6
d591507
ea8efc7
a2c0557
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -210,6 +210,10 @@ private[spark] class CoarseGrainedExecutorBackend( | |
case UpdateDelegationTokens(tokenBytes) => | ||
logInfo(s"Received tokens of ${tokenBytes.length} bytes") | ||
SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf) | ||
|
||
case DecommissionSelf => | ||
logInfo("Received decommission self") | ||
decommissionSelf() | ||
} | ||
|
||
override def onDisconnected(remoteAddress: RpcAddress): Unit = { | ||
|
@@ -258,26 +262,60 @@ private[spark] class CoarseGrainedExecutorBackend( | |
System.exit(code) | ||
} | ||
|
||
private def decommissionSelf(): Boolean = { | ||
logInfo("Decommissioning self w/sync") | ||
try { | ||
decommissioned = true | ||
// Tell master we are are decommissioned so it stops trying to schedule us | ||
if (driver.nonEmpty) { | ||
driver.get.askSync[Boolean](DecommissionExecutor(executorId)) | ||
private def shutdownIfDone(): Unit = { | ||
val numRunningTasks = executor.numRunningTasks | ||
logInfo(s"Checking to see if we can shutdown have ${numRunningTasks} running tasks.") | ||
if (executor.numRunningTasks == 0) { | ||
if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) { | ||
val allBlocksMigrated = env.blockManager.decommissionManager match { | ||
case Some(m) => m.allBlocksMigrated | ||
case None => false // We haven't started migrations yet. | ||
} | ||
if (allBlocksMigrated) { | ||
logInfo("No running tasks, all blocks migrated, stopping.") | ||
exitExecutor(0, "Finished decommissioning", notifyDriver = true) | ||
} | ||
} else { | ||
logError("No driver to message decommissioning.") | ||
logInfo("No running tasks, no block migration configured, stopping.") | ||
exitExecutor(0, "Finished decommissioning", notifyDriver = true) | ||
} | ||
if (executor != null) { | ||
executor.decommission() | ||
} | ||
} | ||
|
||
private def decommissionSelf(): Boolean = { | ||
if (!decommissioned) { | ||
logInfo("Decommissioning self w/sync") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Perhaps we should expand what 'w/sync' stands for in the log message? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sure |
||
try { | ||
decommissioned = true | ||
// Tell master we are are decommissioned so it stops trying to schedule us | ||
if (driver.nonEmpty) { | ||
driver.get.askSync[Boolean](DecommissionExecutor(executorId)) | ||
} else { | ||
logError("No driver to message decommissioning.") | ||
} | ||
if (executor != null) { | ||
executor.decommission() | ||
} | ||
// Shutdown the executor once all tasks are gone :) | ||
val shutdownThread = new Thread() { | ||
while (true) { | ||
shutdownIfDone() | ||
Thread.sleep(1000) // 1s | ||
} | ||
} | ||
shutdownThread.setDaemon(true) | ||
shutdownThread.setName("decommission-shutdown-thread") | ||
shutdownThread.start() | ||
logInfo("Done decommissioning self.") | ||
// Return true since we are handling a signal | ||
true | ||
} catch { | ||
case e: Exception => | ||
logError(s"Error ${e} during attempt to decommission self") | ||
false | ||
} | ||
logInfo("Done decommissioning self.") | ||
// Return true since we are handling a signal | ||
} else { | ||
true | ||
} catch { | ||
case e: Exception => | ||
logError(s"Error ${e} during attempt to decommission self") | ||
false | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -229,38 +229,12 @@ private[spark] class Executor( | |
|
||
private[executor] def numRunningTasks: Int = runningTasks.size() | ||
|
||
private def shutdownIfDone(): Unit = { | ||
if (numRunningTasks == 0) { | ||
if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { | ||
val allBlocksMigrated = env.blockManager.decommissionManager match { | ||
case Some(m) => m.allBlocksMigrated | ||
case None => false // We haven't started migrations yet. | ||
} | ||
if (allBlocksMigrated) { | ||
stop() | ||
} | ||
} else { | ||
stop() | ||
} | ||
} | ||
} | ||
|
||
|
||
/** | ||
* Mark an executor for decommissioning and avoid launching new tasks. | ||
*/ | ||
private[spark] def decommission(): Unit = { | ||
logInfo("Executor asked to decommission. Starting shutdown thread.") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think this comment looks stale. It should probably be moved to the CoarseGrainedExecutorBackend. It's also not clear to me what the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just logging for now. The reason I propagate the message to the executor is so that if we end up in a state where the executor believes it decommissioned (say local SIGPWR) but the driver doesn't, it could be weird, so having some logging is useful. |
||
decommissioned = true | ||
// Shutdown the executor once all tasks are gone :) | ||
val shutdownThread = new Thread() { | ||
while (true) { | ||
shutdownIfDone() | ||
Thread.sleep(1000) // 1s | ||
} | ||
} | ||
shutdownThread.setDaemon(true) | ||
shutdownThread.setName("decommission-shutdown-thread") | ||
shutdownThread.start() | ||
} | ||
|
||
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,6 +52,8 @@ private[spark] object CoarseGrainedClusterMessages { | |
case class UpdateDelegationTokens(tokens: Array[Byte]) | ||
extends CoarseGrainedClusterMessage | ||
|
||
case object DecommissionSelf extends CoarseGrainedClusterMessage // Mark as decommissioned. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. IMHO, the DecommissionSelf naming is a bit ambiguous: "Who is self here"? The sender or the receiver? This message is now sent from the driver to the executor: So perhaps we should just repurpose DecommissionExecutor with a check for the executorId? Not a big deal but trying to reduce the number of message types introduced by this feature ;) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think DecommissionSelf is pretty clearly telling the receiver to decommission itself. That being said I'm open to renaming. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sounds good ;-) I checked that DecommissionSelf is indeed not used anywhere else, so it should be unambiguous. Let's keep the name. |
||
|
||
// Executors to driver | ||
case class RegisterExecutor( | ||
executorId: String, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1927,6 +1927,7 @@ private[spark] class BlockManager( | |
* Stop migrating shuffle blocks. | ||
*/ | ||
def stopOffloadingShuffleBlocks(): Unit = { | ||
logInfo("Stopping offloading shuffle blocks") | ||
migrationPeers.values.foreach(_.running = false) | ||
} | ||
|
||
|
@@ -2050,6 +2051,7 @@ private[spark] class BlockManager( | |
@volatile private var stopped = false | ||
// Since running tasks can add more blocks this can change. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just to make sure I am totally understanding this: You mean the tasks that were already running when the decommissioning was started at the executor? Because I think we refuse launching new tasks when the decommissioning has started, so the new blocks being written must be written by already running tasks. Did I get this right? Also, just to confirm I am still following along: I don't see this case handled in the existing BlockManagerSuite: I believe we are not testing writing new blocks while the decom/offload is in progress. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is covered, you can verify this by disabling this logic and seeing the test fail (albeit you'll have to run the test a few times because it becomes a race condition). Look at the "migrateDuring" flag for details. |
||
@volatile var allBlocksMigrated = false | ||
var previousBlocksLeft = true | ||
private val blockMigrationThread = new Thread { | ||
val sleepInterval = conf.get( | ||
config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL) | ||
|
@@ -2065,17 +2067,20 @@ private[spark] class BlockManager( | |
var blocksLeft = false | ||
// If enabled we migrate shuffle blocks first as they are more expensive. | ||
if (conf.get(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED)) { | ||
logDebug(s"Attempting to replicate all shuffle blocks") | ||
logDebug("Attempting to replicate all shuffle blocks") | ||
blocksLeft = blocksLeft || offloadShuffleBlocks() | ||
logInfo(s"Done starting workers to migrate shuffle blocks") | ||
logInfo("Done starting workers to migrate shuffle blocks") | ||
} | ||
if (conf.get(config.STORAGE_RDD_DECOMMISSION_ENABLED)) { | ||
logDebug(s"Attempting to replicate all cached RDD blocks") | ||
logDebug("Attempting to replicate all cached RDD blocks") | ||
blocksLeft = blocksLeft || decommissionRddCacheBlocks() | ||
logInfo(s"Attempt to replicate all cached blocks done") | ||
logInfo("Attempt to replicate all cached blocks done") | ||
blocksLeft | ||
} | ||
allBlocksMigrated = ! blocksLeft | ||
logInfo(s"We have blocksLeft: ${blocksLeft}") | ||
// Avoid the situation where block was added during the loop | ||
allBlocksMigrated = (! blocksLeft ) && ( ! previousBlocksLeft ) | ||
previousBlocksLeft = blocksLeft | ||
if (!conf.get(config.STORAGE_RDD_DECOMMISSION_ENABLED) && | ||
!conf.get(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED)) { | ||
logWarning("Decommissioning, but no task configured set one or both:\n" + | ||
|
@@ -2117,6 +2122,7 @@ private[spark] class BlockManager( | |
} | ||
|
||
def stop(): Unit = { | ||
logInfo("Stopping decommission manager") | ||
decommissionManager.foreach(_.stop()) | ||
blockTransferService.close() | ||
if (blockStoreClient ne blockTransferService) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exitExecutor
asynchronously sends RemoveExecutor to the driver. Does that actually make it to the driver? There is also this question about if we should be using the same Shutdown
/StopExecutor
codepath for doing the stopping? (But although it seems that we do want to intimate to the driver that the executor is being removed). Interestingly, the driver does indeed respond back with a
StopExecutor
and does trigger the clean shutdown path in the executor, but again I wonder if it is too late for it. Perhaps we shouldn't be calling System.exit
here? Also, as currently written, this
exitExecutor
could cause job failures: Since theTaskSchedulerImpl
will treat theExecutorLossReason
send by the executor to the driver as anexitCausedByApp
and thus penalize the task. Instead, I think we shouldn't penalize the running job on a planned executor decommission. One workaround might be to actually respond back to the driver withExecutorDecommission
(which is not used elsewhere currently) and then handle that specifically in theTaskSchedulerImpl
's determination ofexitCausedByApp
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So it's my understanding the
TaskSchedulerImpl
shouldn't have any job failures because we've waited for all the tasks on the executor to finish before calling this code path. Unless is there something I've missed there?I think swapping out exit executor for instead telling the driver to stop the executor and avoiding the
system.exit
makes sense either way though.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was talking about the case where we get shot down before we had a chance to cleanly exit on line 276. Say for example, some time out expires and the executor/node is brought down.
Are
decom.sh
and decommission-slave.sh
expected to wait until the executor/worker process has properly shut down ? I think they have some timeouts in them to kill the executor ? Or consider a spot kill scenario where you got some warning (like 2 minutes) and then the machine is yanked out.In this case, the executor will eventually be marked loss via a heartbeat/timeout. And that loss would be deemed as the fault of the task, and could cause job failures. I am wondering if we can fix that scenario of an unclean exit ?
One workaround I suggested above was to send a message to the driver saying that the executor is going to go away soon. When that happens (in a clean or unclean way), that loss shouldn't be attributed to the task.
Perhaps this unclean executor loss/timeout handling is follow up work ? We (or rather I) can create Jira's for this under the parent ticket :-).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, although I think this behaviour is covered by the changes in https://github.com/apache/spark/pull/26440/files (we only increment failures if the executors previous state was not decommissioning).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please double check that ? I couldn't find this behavior when scouring TaskSchedulerImpl, and TaskSetManager. The only place we check for an executor being decommissioned in that PR is when scheduling tasks (in CoarseGrainedSchedulerBackend#isExecutorActive). Thanks !
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you point to where in TaskSchedulerImpl it's going to fail the job?
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
is where the current code is, but there might be an additional case that needs to be covered. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spark/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
Line 980 in a2c0557
In this match block, we will hit the default case which will treat the failure as having been caused by the app and thus penalize it.
This routine is called from
TaskScheduler.executorLost
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks :) If you want to make a PR for that I'd be happy to review/merge since I think that would not depend on any of the in-flight PRs just the current code in master.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Absolutely !. Thanks