[WIP][SPARK-31197][CORE] Exit the executor once all tasks and migrations are finished, built on top of spark20629 #28817
@@ -210,6 +210,10 @@ private[spark] class CoarseGrainedExecutorBackend(
    case UpdateDelegationTokens(tokenBytes) =>
      logInfo(s"Received tokens of ${tokenBytes.length} bytes")
      SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)

    case DecommissionSelf =>
      logInfo("Received decommission self")
      decommissionSelf()
  }

  override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -258,26 +262,65 @@ private[spark] class CoarseGrainedExecutorBackend(
    System.exit(code)
  }

  private def decommissionSelf(): Boolean = {
    logInfo("Decommissioning self w/sync")
    try {
      decommissioned = true
      // Tell master we are decommissioned so it stops trying to schedule us
      if (driver.nonEmpty) {
        driver.get.askSync[Boolean](DecommissionExecutor(executorId))
  private var previousAllBlocksMigrated = false
  private def shutdownIfDone(): Unit = {
    val numRunningTasks = executor.numRunningTasks
    logInfo(s"Checking to see if we can shutdown, have ${numRunningTasks} running tasks.")
    if (executor.numRunningTasks == 0) {
      if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
        val allBlocksMigrated = env.blockManager.decommissionManager match {
          case Some(m) => m.allBlocksMigrated
          case None => false // We haven't started migrations yet.
        }
        if (allBlocksMigrated && previousAllBlocksMigrated) {
          logInfo("No running tasks, all blocks migrated, stopping.")
          exitExecutor(0, "Finished decommissioning", notifyDriver = true)
Interestingly, the driver does indeed respond back with a Also, as currently written, this

So it's my understanding the I think swapping out exit executor for instead telling the driver to stop the executor and avoiding the

I was talking about the case where we get shot down before we had a chance to cleanly exit on line 276. Say, for example, some timeout expires and the executor/node is brought down. In this case, the executor will eventually be marked as lost via a heartbeat/timeout. And that loss would be deemed the fault of the task, and could cause job failures. I am wondering if we can fix that scenario of an unclean exit? One workaround I suggested above was to send a message to the driver saying that the executor is going to go away soon. When that happens (in a clean or unclean way), that loss shouldn't be attributed to the task. Perhaps this unclean executor loss/timeout handling is follow-up work? We (or rather I) can create Jiras for this under the parent ticket :-).

Sure, although I think this behaviour is covered by the changes in https://github.com/apache/spark/pull/26440/files (we only increment failures if the executor's previous state was not decommissioning).

Can you please double check that? I couldn't find this behavior when scouring TaskSchedulerImpl and TaskSetManager. The only place we check for an executor being decommissioned in that PR is when scheduling tasks (in CoarseGrainedSchedulerBackend#isExecutorActive). Thanks!

Can you point to where in TaskSchedulerImpl it's going to fail the job?

In this match block, we will hit the default case, which will treat the failure as having been caused by the app and thus penalize it. This routine is called from

Thanks :) If you want to make a PR for that I'd be happy to review/merge, since I think that would not depend on any of the in-flight PRs, just the current code in master.

Absolutely! Thanks
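To make the concern concrete, here is a minimal, self-contained sketch of the pattern being discussed: a loss-reason match whose default case charges the lost executor's in-flight tasks to the app. The type and function names are invented for illustration; this is not Spark's actual ExecutorLossReason handling.

```scala
// Illustrative model only: a decommission-aware loss reason versus the default case
// that blames the app for the tasks that were running on the lost executor.
sealed trait LossReason
case object ExecutorDecommissioned extends LossReason // hypothetical decommission-aware reason
case class ExecutorLost(message: String) extends LossReason

def countTowardsTaskFailures(reason: LossReason): Boolean = reason match {
  case ExecutorDecommissioned => false // a planned shutdown should not penalize the tasks
  case _ => true                       // default: the loss is treated as the task's/app's fault
}
```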
        }
        previousAllBlocksMigrated = allBlocksMigrated
      } else {
        logError("No driver to message decommissioning.")
        logInfo("No running tasks, no block migration configured, stopping.")
        exitExecutor(0, "Finished decommissioning", notifyDriver = true)
      }
      if (executor != null) {
        executor.decommission()
    } else {
      // If there's a running task it could store blocks.
I think this logic of previousAllBlocksMigrated and allBlocksMigrated is a bit confusing. It's not clear why the previous state has to be considered. I wonder if the following code can make this "history" aspect a bit clearer:

Also, should we really be checking for numRunningTasks here? What if some race condition caused some tasks to be scheduled onto us while we were marked for decom? Finally, should there be a timeout for how much time the executor will stay alive in the decommissioned state?

If a task is scheduled before we are asked to decom. You can verify this is covered by taking the logic out and watching the tests fail :) (There's an ugly thread sleep in the tests to make this possible.) Since the block migrations are not atomic, I do think we need the 2x logic, unfortunately; think of this situation (the sketch below illustrates it):

Now that being said, that's probably a corner case, and arguably not super important since we're really only doing best effort, but I think for the overhead of one extra boolean it's worth it to cover this corner case.
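A small standalone sketch of that corner case and why a single clean sweep is not enough. This illustrates the reasoning only; the function and its input are hypothetical, not the PR's code.

```scala
// Each element of sweeps is "were any blocks still left after this migration sweep?".
// Because a running task can add a block right after a sweep reports nothing left,
// migration is only treated as finished after two consecutive clean sweeps.
def migrationLooksFinished(sweeps: Seq[Boolean]): Boolean = {
  var previousBlocksLeft = true
  var done = false
  for (blocksLeft <- sweeps if !done) {
    done = !blocksLeft && !previousBlocksLeft
    previousBlocksLeft = blocksLeft
  }
  done
}

migrationLooksFinished(Seq(true, false))        // false: only one clean sweep so far
migrationLooksFinished(Seq(true, false, false)) // true: two consecutive clean sweeps
migrationLooksFinished(Seq(false, true, false)) // false: a block showed up between sweeps
```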
      previousAllBlocksMigrated = false
    }
  }

  private def decommissionSelf(): Boolean = {
    if (!decommissioned) {
      logInfo("Decommissioning self w/sync")
Perhaps we should expand what 'w/sync' stands for in the log message?

Sure
      try {
        decommissioned = true
        // Tell master we are decommissioned so it stops trying to schedule us
        if (driver.nonEmpty) {
          driver.get.askSync[Boolean](DecommissionExecutor(executorId))
        } else {
          logError("No driver to message decommissioning.")
        }
        if (executor != null) {
          executor.decommission()
        }
        // Shutdown the executor once all tasks are gone :)
        val shutdownThread = new Thread() {
          while (true) {
            shutdownIfDone()
            Thread.sleep(1000) // 1s
          }
        }
        shutdownThread.setDaemon(true)
        shutdownThread.setName("decommission-shutdown-thread")
        shutdownThread.start()
logInfo("Done decommissioning self.") | ||||
// Return true since we are handling a signal | ||||
true | ||||
} catch { | ||||
case e: Exception => | ||||
logError(s"Error ${e} during attempt to decommission self") | ||||
false | ||||
} | ||||
logInfo("Done decommissioning self.") | ||||
// Return true since we are handling a signal | ||||
} else { | ||||
true | ||||
} catch { | ||||
case e: Exception => | ||||
logError(s"Error ${e} during attempt to decommission self") | ||||
false | ||||
} | ||||
} | ||||
} | ||||
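For reference, a minimal sketch of the same polling shutdown with the loop placed inside an overridden run(), which is the body that executes on the spawned thread. Names mirror the diff above, but this is an illustration, not the PR's code.

```scala
// Poll once a second; shutdownIfDone() exits the executor once there are no running
// tasks and (if storage decommissioning is enabled) all blocks report as migrated.
val shutdownThread = new Thread("decommission-shutdown-thread") {
  override def run(): Unit = {
    while (true) {
      shutdownIfDone()
      Thread.sleep(1000) // re-check every second
    }
  }
}
shutdownThread.setDaemon(true) // don't keep the JVM alive on normal exit paths
shutdownThread.start()
```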
@@ -229,38 +229,12 @@ private[spark] class Executor(

  private[executor] def numRunningTasks: Int = runningTasks.size()

  private def shutdownIfDone(): Unit = {
    if (numRunningTasks == 0) {
      if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
        val allBlocksMigrated = env.blockManager.decommissionManager match {
          case Some(m) => m.allBlocksMigrated
          case None => false // We haven't started migrations yet.
        }
        if (allBlocksMigrated) {
          stop()
        }
      } else {
        stop()
      }
    }
  }

  /**
   * Mark an executor for decommissioning and avoid launching new tasks.
   */
  private[spark] def decommission(): Unit = {
    logInfo("Executor asked to decommission. Starting shutdown thread.")
I think this comment looks stale. It should probably be moved to the CoarseGrainedExecutorBackend. It's also not clear to me what the

Just logging for now. The reason I propagate the message to the executor is so that if we end up in a state where the executor believes it decommissioned (say a local SIGPWR) but the driver doesn't, it could be weird, so having some logging is useful.
    decommissioned = true
    // Shutdown the executor once all tasks are gone :)
    val shutdownThread = new Thread() {
      while (true) {
        shutdownIfDone()
        Thread.sleep(1000) // 1s
      }
    }
    shutdownThread.setDaemon(true)
    shutdownThread.setName("decommission-shutdown-thread")
    shutdownThread.start()
  }

  def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
@@ -52,6 +52,8 @@ private[spark] object CoarseGrainedClusterMessages {
  case class UpdateDelegationTokens(tokens: Array[Byte])
    extends CoarseGrainedClusterMessage

  case object DecommissionSelf extends CoarseGrainedClusterMessage // Mark as decommissioned.
IMHO, the DecommissionSelf naming is a bit ambiguous: who is "self" here, the sender or the receiver? This message is now sent from the driver to the executor. So perhaps we should just repurpose DecommissionExecutor with a check for the executorId? Not a big deal, but trying to reduce the number of message types introduced by this feature ;)

I think DecommissionSelf is pretty clearly telling the receiver to decommission itself. That being said, I'm open to renaming.

Sounds good ;-) I checked that DecommissionSelf is indeed not used anywhere else, so it should be unambiguous. Let's keep the name.
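For context, a hedged sketch of the driver-side send that would pair with the `case DecommissionSelf` handler added in CoarseGrainedExecutorBackend above. The helper and the way the executor's RpcEndpointRef is obtained are assumptions for illustration, not code from this PR.

```scala
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.DecommissionSelf

// Hypothetical helper: one-way message telling a single executor to decommission itself.
// The executor then notifies the driver via DecommissionExecutor(executorId) from
// decommissionSelf(), so no reply is expected here.
def askExecutorToDecommission(executorEndpoint: RpcEndpointRef): Unit = {
  executorEndpoint.send(DecommissionSelf)
}
```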

  // Executors to driver
  case class RegisterExecutor(
      executorId: String,
@@ -1927,6 +1927,7 @@ private[spark] class BlockManager(
   * Stop migrating shuffle blocks.
   */
  def stopOffloadingShuffleBlocks(): Unit = {
    logInfo("Stopping offloading shuffle blocks")
    migrationPeers.values.foreach(_.running = false)
  }

@@ -2050,6 +2051,7 @@ private[spark] class BlockManager(
    @volatile private var stopped = false
    // Since running tasks can add more blocks this can change.
Just to make sure I am totally understanding this: you mean the tasks that were already running when decommissioning was started at the executor? Because I think we refuse to launch new tasks once decommissioning has started, so the new blocks being written must be written by already-running tasks. Did I get this right?

Also, just to confirm I am still following along: I don't see this case handled in the existing BlockManagerSuite. I believe we are not testing writing new blocks while the decom/offload is in progress.

It is covered; you can verify this by disabling this logic and seeing the test fail (albeit you'll have to run the test a few times because it becomes a race condition). Look at the "migrateDuring" flag for details.
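A rough sketch of the scenario being discussed (new blocks written while decommissioning/offloading is already in progress). This is illustrative, not the suite's actual test; the config key is the string form I'd expect for the STORAGE_DECOMMISSION_ENABLED constant referenced in the diff and may differ in this branch.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Tasks sleep before producing their data, so if decommissioning is triggered while the
// job is running, cached blocks appear *after* the first migration sweep may have run.
val conf = new SparkConf()
  .setMaster("local-cluster[2, 1, 1024]")
  .setAppName("decommission-race-sketch")
  .set("spark.storage.decommission.enabled", "true") // assumed key for STORAGE_DECOMMISSION_ENABLED

val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 100, 2)
  .map { x => Thread.sleep(5000); x } // keep tasks running while decommissioning starts
  .persist(StorageLevel.MEMORY_ONLY)

rdd.count() // blocks get written mid-migration, which is what the 2x check guards against
```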
    @volatile var allBlocksMigrated = false
    var previousBlocksLeft = true
    private val blockMigrationThread = new Thread {
      val sleepInterval = conf.get(
        config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)

@@ -2065,17 +2067,20 @@ private[spark] class BlockManager(
        var blocksLeft = false
        // If enabled we migrate shuffle blocks first as they are more expensive.
        if (conf.get(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED)) {
          logDebug(s"Attempting to replicate all shuffle blocks")
          logDebug("Attempting to replicate all shuffle blocks")
          blocksLeft = blocksLeft || offloadShuffleBlocks()
          logInfo(s"Done starting workers to migrate shuffle blocks")
          logInfo("Done starting workers to migrate shuffle blocks")
        }
        if (conf.get(config.STORAGE_RDD_DECOMMISSION_ENABLED)) {
          logDebug(s"Attempting to replicate all cached RDD blocks")
          logDebug("Attempting to replicate all cached RDD blocks")
          blocksLeft = blocksLeft || decommissionRddCacheBlocks()
          logInfo(s"Attempt to replicate all cached blocks done")
          logInfo("Attempt to replicate all cached blocks done")
          blocksLeft
        }
        allBlocksMigrated = ! blocksLeft
        logInfo(s"We have blocksLeft: ${blocksLeft}")
        // Avoid the situation where block was added during the loop
        allBlocksMigrated = (! blocksLeft) && (! previousBlocksLeft)
        previousBlocksLeft = blocksLeft
        if (!conf.get(config.STORAGE_RDD_DECOMMISSION_ENABLED) &&
            !conf.get(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED)) {
          logWarning("Decommissioning, but no task configured set one or both:\n" +
@@ -2117,6 +2122,7 @@ private[spark] class BlockManager(
  }

  def stop(): Unit = {
    logInfo("Stopping decommission manager")
    decommissionManager.foreach(_.stop())
    blockTransferService.close()
    if (blockStoreClient ne blockTransferService) {
Should this variable be marked volatile?

I don't think so, this will only be accessed in one thread.
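A minimal illustration of the visibility reasoning in this exchange (assumed field names, not the BlockManager code): a flag written by the migration thread but polled from another thread needs @volatile, while a variable that only the migration thread ever reads and writes does not.

```scala
class MigrationState {
  // Written by the block-migration thread, read by the executor's shutdown poller,
  // so it must be @volatile for the update to be visible across threads.
  @volatile var allBlocksMigrated: Boolean = false

  // Only ever read and written inside the migration thread's own loop,
  // so plain field access is enough.
  private var previousBlocksLeft: Boolean = true
}
```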