[WIP][SPARK-31197][CORE] Exit the executor once all tasks and migrations are finished, built on top of spark20629 #28817
In CoarseGrainedExecutorBackend, the message receive loop gains a handler for the new DecommissionSelf message:

```diff
@@ -210,6 +210,10 @@ private[spark] class CoarseGrainedExecutorBackend(
     case UpdateDelegationTokens(tokenBytes) =>
       logInfo(s"Received tokens of ${tokenBytes.length} bytes")
       SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
+
+    case DecommissionSelf =>
+      logInfo("Received decommission self")
+      decommissionSelf()
   }

   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
```
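The dispatch above is ordinary pattern matching over a sealed message trait. As a rough, self-contained sketch of that shape (toy types, not Spark's actual classes; `receive` returns the log line purely for illustration, where the real code calls `logInfo` and side-effecting handlers):

```scala
sealed trait ClusterMessage
case class UpdateDelegationTokens(tokens: Array[Byte]) extends ClusterMessage
case object DecommissionSelf extends ClusterMessage

object Dispatch {
  var decommissioned = false

  // Match on the message type; DecommissionSelf carries no payload,
  // so it is a case object rather than a case class.
  def receive(msg: ClusterMessage): String = msg match {
    case UpdateDelegationTokens(tokenBytes) =>
      s"Received tokens of ${tokenBytes.length} bytes"
    case DecommissionSelf =>
      decommissioned = true
      "Received decommission self"
  }
}
```

Because the trait is sealed, the compiler can warn if a message type is added without a corresponding case.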
In CoarseGrainedExecutorBackend (@@ -258,26 +262,65 @@), the synchronous `decommissionSelf` is replaced: decommissioning now starts a daemon thread that polls `shutdownIfDone`, which exits the executor once all tasks are finished and (when storage decommissioning is enabled) all blocks have been migrated.

Removed:

```scala
  private def decommissionSelf(): Boolean = {
    logInfo("Decommissioning self w/sync")
    try {
      decommissioned = true
      // Tell master we are decommissioned so it stops trying to schedule us
      if (driver.nonEmpty) {
        driver.get.askSync[Boolean](DecommissionExecutor(executorId))
      } else {
        logError("No driver to message decommissioning.")
      }
      if (executor != null) {
        executor.decommission()
      }
      logInfo("Done decommissioning self.")
      // Return true since we are handling a signal
      true
    } catch {
      case e: Exception =>
        logError(s"Error ${e} during attempt to decommission self")
        false
    }
  }
```

Added:

```scala
  private var previousAllBlocksMigrated = false

  private def shutdownIfDone(): Unit = {
    val numRunningTasks = executor.numRunningTasks
    logInfo(s"Checking to see if we can shutdown; have ${numRunningTasks} running tasks.")
    if (numRunningTasks == 0) {
      if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
        val allBlocksMigrated = env.blockManager.decommissionManager match {
          case Some(m) => m.allBlocksMigrated
          case None => false // We haven't started migrations yet.
        }
        if (allBlocksMigrated && previousAllBlocksMigrated) {
          logInfo("No running tasks, all blocks migrated, stopping.")
          exitExecutor(0, "Finished decommissioning", notifyDriver = true)
        }
        previousAllBlocksMigrated = allBlocksMigrated
      } else {
        logInfo("No running tasks, no block migration configured, stopping.")
        exitExecutor(0, "Finished decommissioning", notifyDriver = true)
      }
    } else {
      // If there's a running task it could store blocks.
      previousAllBlocksMigrated = false
    }
  }

  private def decommissionSelf(): Boolean = {
    if (!decommissioned) {
      logInfo("Decommissioning self and starting background thread to exit when done.")
      try {
        decommissioned = true
        // Tell master we are decommissioned so it stops trying to schedule us
        if (driver.nonEmpty) {
          driver.get.askSync[Boolean](DecommissionExecutor(executorId))
        } else {
          logError("No driver to message decommissioning.")
        }
        if (executor != null) {
          executor.decommission()
        }
        // Shutdown the executor once all tasks are gone :)
        val shutdownThread = new Thread() {
          override def run(): Unit = {
            while (true) {
              shutdownIfDone()
              Thread.sleep(1000) // 1s
            }
          }
        }
        shutdownThread.setDaemon(true)
        shutdownThread.setName("decommission-shutdown-thread")
        shutdownThread.start()
        logInfo("Done decommissioning self.")
        // Return true since we are handling a signal
        true
      } catch {
        case e: Exception =>
          logError(s"Error ${e} during attempt to decommission self")
          false
      }
    } else {
      true
    }
  }
```

Review discussion on the exit path:

- Interestingly, the driver does indeed respond back with a … Also, as currently written, this …
- So it's my understanding the … I think swapping out exitExecutor for instead telling the driver to stop the executor and avoiding the …
- I was talking about the case where we get shot down before we had a chance to cleanly exit on line 276. Say, for example, some timeout expires and the executor/node is brought down. In this case, the executor will eventually be marked as lost via a heartbeat/timeout, and that loss would be deemed the fault of the task and could cause job failures. I am wondering if we can fix that scenario of an unclean exit. One workaround I suggested above was to send a message to the driver saying that the executor is going to go away soon; when that happens (in a clean or unclean way), the loss shouldn't be attributed to the task. Perhaps this unclean executor loss/timeout handling is follow-up work? We (or rather I) can create JIRAs for this under the parent ticket :-)
- Sure, although I think this behaviour is covered by the changes in https://github.com/apache/spark/pull/26440/files (we only increment failures if the executor's previous state was not decommissioning).
- Can you please double check that? I couldn't find this behavior when scouring TaskSchedulerImpl and TaskSetManager. The only place we check for an executor being decommissioned in that PR is when scheduling tasks (in CoarseGrainedSchedulerBackend#isExecutorActive). Thanks!
- Can you point to where in TaskSchedulerImpl it's going to fail the job?
- In this match block, we will hit the default case, which will treat the failure as having been caused by the app and thus penalize it. This routine is called from …
- Thanks :) If you want to make a PR for that I'd be happy to review/merge, since I think that would not depend on any of the in-flight PRs, just the current code in master.
- Absolutely! Thanks

Review discussion on the migration-history logic:

- I think this logic of previousAllBlocksMigrated and allBlocksMigrated is a bit confusing; it's not clear why the previous state has to be considered. I wonder if the following code can make this "history" aspect a bit clearer: … Also, should we really be checking for numRunningTasks here? What if some race condition caused some tasks to be scheduled onto us while we were marked for decommissioning? Finally, should there be a timeout for how much time the executor will stay alive in the decommissioned state?
- If a task is scheduled before we are asked to decommission, … You can verify this is covered by taking the logic out and watching the tests fail :) (There's an ugly thread sleep in the tests to make this possible.) Since the block migrations are not atomic, I do think we need the 2x logic, unfortunately; think of this situation: … Now, that being said, that's probably a corner case, and arguably not super important since we're really only doing best effort, but I think for the overhead of one extra boolean it's worth it to cover this corner case.
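The need for two consecutive observations discussed above can be seen in a standalone toy model (illustrative names, not Spark's API): a single "all migrated" reading can race with a task that wrote a block just after the previous poll, while requiring two clean readings in a row rules that window out.

```scala
// Toy model of the two-consecutive-observations check in shutdownIfDone,
// for the branch where storage decommissioning is enabled.
class ShutdownChecker {
  private var previousAllBlocksMigrated = false
  var exited = false // stands in for exitExecutor(0, "Finished decommissioning")

  // One polling tick, mirroring the branch structure of shutdownIfDone.
  def tick(numRunningTasks: Int, allBlocksMigrated: Boolean): Unit = {
    if (numRunningTasks == 0) {
      if (allBlocksMigrated && previousAllBlocksMigrated) {
        exited = true
      }
      previousAllBlocksMigrated = allBlocksMigrated
    } else {
      // A running task could still write blocks, so reset the history.
      previousAllBlocksMigrated = false
    }
  }
}
```

The first zero-task tick only records the clean state; only a second clean tick in a row actually exits, at the cost of one extra boolean and one extra polling interval.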
In CoarseGrainedClusterMessages (@@ -52,6 +52,8 @@), a new driver-to-executor message is added:

```diff
@@ -52,6 +52,8 @@ private[spark] object CoarseGrainedClusterMessages {
   case class UpdateDelegationTokens(tokens: Array[Byte])
     extends CoarseGrainedClusterMessage

+  case object DecommissionSelf extends CoarseGrainedClusterMessage // Mark as decommissioned.
+
   // Executors to driver
   case class RegisterExecutor(
       executorId: String,
```

Review discussion on the message name:

- IMHO, the DecommissionSelf naming is a bit ambiguous: who is "self" here, the sender or the receiver? This message is now sent from the driver to the executor. So perhaps we should just repurpose DecommissionExecutor with a check for the executorId? Not a big deal, but trying to reduce the number of message types introduced by this feature ;)
- I think DecommissionSelf is pretty clearly telling the receiver to decommission itself. That being said, I'm open to renaming.
- Sounds good ;-) I checked that DecommissionSelf is indeed not used anywhere else, so it should be unambiguous. Let's keep the name.
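The alternative floated in this thread, reusing a parameterized DecommissionExecutor message and checking the target id on the receiver, could look roughly like the sketch below. All names here are illustrative of the idea only, not code this PR merged.

```scala
sealed trait Msg
// A single message type addressed by executor id, instead of a separate
// parameterless DecommissionSelf case object.
case class DecommissionExecutor(executorId: String) extends Msg

class ExecutorEndpoint(selfId: String) {
  var decommissioned = false

  def receive(msg: Msg): Unit = msg match {
    case DecommissionExecutor(id) if id == selfId =>
      decommissioned = true // addressed to us: decommission
    case DecommissionExecutor(_) =>
      () // addressed to some other executor; ignore
  }
}
```

The trade-off the reviewers weigh is exactly this: one fewer message type, in exchange for an id check (and a way to mis-deliver) on a message that, as a case object, could not be misaddressed at all.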
Review discussion on thread safety:

- Should this variable be marked volatile?
- I don't think so; this will only be accessed in one thread.
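For reference, if the flag ever were read from another thread (for example the background shutdown thread), marking it `@volatile` would guarantee that writes become visible across threads. A minimal sketch with toy names, not the PR's code:

```scala
class FlagHolder {
  // @volatile makes writes visible to other threads without extra locking;
  // without it, a reader thread may legally keep seeing a stale value.
  @volatile var decommissioned = false
}

object VolatileDemo {
  def flip(): Boolean = {
    val holder = new FlagHolder
    val writer = new Thread(new Runnable {
      def run(): Unit = { holder.decommissioned = true }
    })
    writer.start()
    writer.join() // join() itself also establishes a happens-before edge;
                  // @volatile matters for readers that poll without joining
    holder.decommissioned
  }
}
```

As the reply notes, a variable confined to a single thread needs no such annotation.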