
[SPARK-31197][CORE] Shutdown executor once we are done decommissioning #29211

Closed
wants to merge 20 commits into from

Conversation

holdenk
Contributor

@holdenk holdenk commented Jul 23, 2020

What changes were proposed in this pull request?

Exit the executor when it has been asked to decommission and there is nothing left for it to do.

This is a rebase of #28817

Why are the changes needed?

If we want to use decommissioning in Spark's own scale down we should terminate the executor once finished.
Furthermore, in graceful shutdown it makes sense to release resources we no longer need if we've been asked to shutdown by the cluster manager instead of always holding the resources as long as possible.

Does this PR introduce any user-facing change?

The decommissioned executors will exit at the end of decommissioning. This is sort of a user-facing change; however, decommissioning hasn't been in any releases yet.

How was this patch tested?

I changed the unit test to not send the executor exit message and still wait on the executor exited message.
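For readers following along, here is a minimal sketch of the exit condition described above; the names are illustrative assumptions, not the actual Spark code:

```scala
// Minimal sketch of the "nothing left to do" decision: exit only once the executor has
// been told to decommission, has no running tasks, and any configured migration is done.
def readyToExit(
    decommissioned: Boolean,
    numRunningTasks: Int,
    blockMigrationEnabled: Boolean,
    allBlocksMigrated: Boolean): Boolean = {
  decommissioned && numRunningTasks == 0 &&
    (!blockMigrationEnabled || allBlocksMigrated)
}
```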

@SparkQA

SparkQA commented Jul 23, 2020

Test build #126436 has started for PR 29211 at commit 525b335.

@shaneknapp
Contributor

test this please

@holdenk
Contributor Author

holdenk commented Jul 23, 2020

cc @attilapiros & @agrawaldevesh

@@ -668,7 +668,7 @@ private[deploy] class Worker(
finishedApps += id
maybeCleanupApplication(id)

case DecommissionSelf =>
case WorkerDecommission(_, _) =>
Contributor

nit: Should this be DecommissionWorker? That sounds more like a command to me.

Whereas WorkerDecommissioned sounds like a state.

Contributor Author

I don't have strong preferences (although this doesn't have "ed" at the end, so I don't think it's implying a state).

Contributor

Most other command messages start with a verb, while the 'state updated' ones are in past tense. So I think we should rename this to DecommissionWorker to match that.
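A quick illustration of the convention being discussed, with hypothetical simplified signatures (the real message also carries the worker endpoint ref):

```scala
// Commands read as verbs; completed state changes read as past tense.
case class DecommissionWorker(id: String)     // a command: "please decommission"
case class WorkerDecommissioned(id: String)   // a state update: "this already happened"
```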

@@ -59,7 +59,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
.set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
// Just replicate blocks as fast as we can during testing, there isn't another
// workload we need to worry about.
.set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
.set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)
Contributor

Why was this changed to 10 seconds? If it is related to this PR (as opposed to just reducing flakiness), then please document that.

Contributor Author

This just reduces the log spam while debugging this test, but still lets the test complete quickly.

@@ -20,7 +20,7 @@ package org.apache.spark.storage
import scala.concurrent.duration._

import org.mockito.{ArgumentMatchers => mc}
import org.mockito.Mockito.{mock, times, verify, when}
import org.mockito.Mockito.{atLeast => least, mock, times, verify, when}
Contributor

Why is atLeast renamed to least?

Contributor Author

There's something else which is somehow imported and named atLeast (I suspect from ScalaTest?) and I needed to rename it or the compiler got confused.

Contributor Author

Did a bit more poking; it's not ScalaTest. It's easier just to rename it to avoid the conflict.
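For context, a small self-contained sketch of the rename (the Worker trait here is made up for illustration):

```scala
import org.mockito.Mockito.{atLeast => least, mock, verify}

object AtLeastRenameSketch {
  trait Worker { def doWork(): Unit }

  def main(args: Array[String]): Unit = {
    val worker = mock(classOf[Worker])
    worker.doWork()
    // Same semantics as Mockito.atLeast(1); the import alias just avoids the name clash.
    verify(worker, least(1)).doWork()
  }
}
```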

@SparkQA

SparkQA commented Jul 24, 2020

Test build #126437 has finished for PR 29211 at commit ba51033.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 24, 2020

Test build #126440 has finished for PR 29211 at commit ba51033.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* Returns the last migration time and a boolean for if all blocks have been migrated.
* If there are any tasks running since that time the boolean may be incorrect.
*/
private[spark] def lastMigrationInfo(): (Long, Boolean) = {
Contributor

Cool! I am always anal about logging, to a fault :-P

@holdenk holdenk force-pushed the SPARK-31197-exit-execs-redone branch from fea00dd to 12e9e23 on July 24, 2020 01:31
Contributor

@agrawaldevesh agrawaldevesh left a comment

lgtm

@@ -668,7 +668,7 @@ private[deploy] class Worker(
finishedApps += id
maybeCleanupApplication(id)

case DecommissionSelf =>
case WorkerDecommission(_, _) =>
Contributor

Most other command messages start with a verb, while the 'state updated' ones are in past tense. So I think we should rename this to DecommissionWorker to match that.

@SparkQA

SparkQA commented Jul 24, 2020

Test build #126457 has finished for PR 29211 at commit 5678d28.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 24, 2020

Test build #126456 has finished for PR 29211 at commit 46fb2f7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 24, 2020

Test build #126453 has finished for PR 29211 at commit 12e9e23.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 24, 2020

Test build #126451 has finished for PR 29211 at commit fea00dd.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 24, 2020

Test build #126460 has finished for PR 29211 at commit 50b9cb2.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class DecommissionWorker(

@holdenk
Contributor Author

holdenk commented Jul 24, 2020

All of the GitHub actions pass; if no one else wants more time to comment on this, I'll merge it today.

Contributor

@agrawaldevesh agrawaldevesh left a comment

I apologize for not paying attention to these things before.

@@ -64,7 +64,7 @@ private[deploy] object DeployMessages {
* @param id the worker id
* @param worker the worker endpoint ref
*/
case class WorkerDecommission(
Contributor

I think I messed up when reviewing this: I am so sorry :-(

Somehow I thought that WorkerDecommission is something being introduced in this PR, but apparently it is an existing message that went in #26440 itself.

Let's please keep the original message WorkerDecommission everywhere to avoid unnecessary merge conflicts etc. in people's private forks :-)

At least I learned not to trust the "changes since last reviewed" feature of GitHub when we all just force-push into our PR branches :-)

Contributor Author

No stress, I did it as a separate commit so I can revert it easily.

@@ -80,6 +79,8 @@ private[spark] class CoarseGrainedExecutorBackend(
*/
private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]

@volatile private var decommissioned = false
Contributor

nit: In the spirit of minimizing the code diff, is there a strong reason why this decommissioned flag was moved down from line 67?

Contributor Author

I didn't consider it strongly enough related to the variables it was grouped with, so I moved it to stand alone.

@attilapiros
Contributor

I would like to help here too but I am still busy with SPARK-32198.

@holdenk
Contributor Author

holdenk commented Jul 24, 2020

Sure, I can hold off on this one then @attilapiros :)

Contributor

@agrawaldevesh agrawaldevesh left a comment

LGTM again :-)

Contributor

@attilapiros attilapiros left a comment

I just peeked into this and I can only continue the review in the second half of the next week.

val lastMigrationTime = if (
conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) &&
conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) {
Math.min(lastRDDMigrationTime, lastShuffleMigrationTime)
Contributor

This must be Math.max.

As I understand it, in CoarseGrainedExecutorBackend the lastTaskRunningTime increases by about 1000ms in every iteration; let's assume exactly 1000ms. So if the RDD migration finished at 500ms (counting from 0 in this example) but there are still shuffle files left to migrate, which only finish in the next round (say at 1500ms), then we never shut down the executor: in the current round blocksMigrated is false, and in all the following rounds migrationTime will be less than lastTaskRunningTime, so this condition will never be satisfied:

if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) {
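A hedged sketch of the min-to-max change being suggested; only the first branch appears in the diff above, and the else branches here are my assumption about the surrounding code:

```scala
val lastMigrationTime =
  if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) &&
      conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) {
    // Everything is migrated only as of the *later* of the two timestamps, hence max.
    Math.max(lastRDDMigrationTime, lastShuffleMigrationTime)
  } else if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
    lastShuffleMigrationTime
  } else {
    lastRDDMigrationTime
  }
```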

Contributor

I was thinking a bit more on this part.

The intention of lastRDDMigrationTime, lastShuffleMigrationTime and lastTaskRunningTime is to notify the driver only once about the exit of this executor.

My first thought was to improve this by expressing that intention directly with a simple flag in the shutdownThread (like exitTriggered, which can be used as a stopping condition instead of the while (true)), removing all three time variables, and changing lastMigrationInfo to return just a boolean, renamed for its new role (for example to isFinished or simply finished).

Then I went a bit further and checked the exitExecutor method and I think even the flag is not necessarily needed:

Still, I'd like to have a stopping condition in the while loop, so please consider using the flag instead of the times.
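A hedged sketch of that flag-based shape; here `finished` stands in for a simplified lastMigrationInfo that just returns a Boolean, and the helper is illustrative, not the real code:

```scala
// Illustrative only: a single flag gives the loop an explicit stopping condition.
def runUntilFinished(finished: () => Boolean, notifyAndExit: () => Unit): Unit = {
  var exitTriggered = false
  while (!exitTriggered) {
    Thread.sleep(1000) // re-check roughly once a second, as in the diff above
    if (finished()) {
      exitTriggered = true
      notifyAndExit() // notify the driver / exit the executor exactly once
    }
  }
}
```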

Contributor

@holdenk @agrawaldevesh what do you think?

Contributor

I agree with you @attilapiros, on both counts:

  • Converting this into a level trigger vs an edge trigger: i.e. the flag goes on and remains on when it is time to exit.
  • Having consistency between the termination condition of the thread above and the computation of the boolean variable in this method. Having a helper method would help with this and also improve longer-term maintainability.

IMHO, this is great feedback and will improve this PR. Thank you.

Contributor Author

So it only increases if there is a task running. We don't want it to shut down, or to consider the blocks migrated, while a task is still running.
Unfortunately this flag can come and go depending on the tasks running on the executor, which is why it's structured the way it is.

Contributor

PR against your branch

Contributor

@agrawaldevesh: what is your opinion/idea?

I confess that even I am finding the logic in lastMigrationInfo a bit hard to follow. And as far as I can see, this logic is the key to getting the shutdown thread to exit cleanly. If the information returned by lastMigrationInfo is wrong, the shutdown thread may exit prematurely or never, both of which should be avoided, particularly because the shutdown thread has no timeout on the amount of time it can hang around.

This complexity is intrinsic. The logic is indeed complex, but I think we can add some more documentation here, perhaps explaining the intent of the code with a couple of examples? Even better would be to add a unit test for this. I think it should be easy to unit test just lastMigrationInfo, simply testing that the function behaves as expected by stubbing out the other parts. Such a test would also provide the necessary guardrails as we change this function in the future; I expect it to change as we production-harden the cache/shuffle block migration in the near term.
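A rough sketch of what such a unit test could look like. The `newDecommissioner()` helper is hypothetical (the suite would build the decommissioner against mocked block managers, as the existing unit suite does), and the `start()` call is an assumption alongside the `stop()` shown in this PR's diff:

```scala
package org.apache.spark.storage

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkFunSuite

class LastMigrationInfoSketchSuite extends SparkFunSuite {
  // Hypothetical helper: build the decommissioner under test with mocked block managers.
  private def newDecommissioner(): BlockManagerDecommissioner = ???

  test("lastMigrationInfo reports done only after all block types have migrated") {
    val bmDecomManager = newDecommissioner()
    try {
      bmDecomManager.start()
      eventually(timeout(10.seconds), interval(100.milliseconds)) {
        val (migrationTime, done) = bmDecomManager.lastMigrationInfo()
        assert(done && migrationTime > 0)
      }
    } finally {
      bmDecomManager.stop()
    }
  }
}
```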

Contributor Author

Yeah I’m down to unit test this function, makes sense to me and it’s fairly standalone.

I’ll work on the documentation over the weekend.

Contributor Author

So the approach in that PR re-opens the race condition we're preventing here. I would really rather not do that.

I’d also like us to make progress here though, so we can temporarily accept the race condition and file a JIRA and revisit it later as a stand-alone item if y’all are not comfortable with any of the ways to avoid the race.

@@ -213,10 +213,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === numParts)
}

// Make the executor we decommissioned exit
sched.client.killExecutors(List(execToDecommission))
Contributor

I think we are missing some tests to ensure that the executors do actually exit, particularly in light of @attilapiros's recent comment :-P

Contributor Author

So that's the point of taking out the kill-executor call. If you look a line below, we wait on the semaphore to make sure the executor has exited.

@SparkQA

SparkQA commented Jul 24, 2020

Test build #126510 has finished for PR 29211 at commit 484f8e2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WorkerDecommission(

var lastTaskRunningTime = System.nanoTime()
val sleep_time = 1000 // 1s

while (true) {
Contributor

We need to execute this while loop within the Thread#run and also start the thread.

Currently the while loop runs as part of the construction of the new thread instance, on the caller's thread, rather than in a separate new thread.
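A hedged sketch of the shape being asked for here; the names are illustrative, and `shouldExit` stands in for the real "no running tasks and all blocks migrated" check:

```scala
// Illustrative only: the polling loop lives inside run(), and the thread is started,
// so the decommission RPC handler returns instead of spinning on the caller's thread.
def startShutdownThread(shouldExit: () => Boolean, onExit: () => Unit): Thread = {
  val shutdownThread = new Thread("wait-for-blocks-to-migrate-sketch") {
    override def run(): Unit = {
      val sleepTime = 1000L // 1s, as in the diff above
      while (!shouldExit()) {
        Thread.sleep(sleepTime)
      }
      onExit()
    }
  }
  shutdownThread.setDaemon(true)
  shutdownThread.start() // without this call, nothing runs in the background at all
  shutdownThread
}
```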

Contributor

NICE CATCH. OMG!!.

@holdenk, I think we should add tests for this :-)

Contributor Author

Yeah, good catch. This will block on the decommissioning message, and we want that to return. Let me think of how we can check that we're not blocking in the decommission message handler.

Contributor Author

Hmmm, checking that this is non-blocking is going to be a pain, and from what we've seen it works out OK even if it does block. I'll switch it to non-blocking, but if it works regardless of blocking or non-blocking, do we need a test for this?

Contributor

I prefer to have a test (either a new one dedicated to this case or, better, modify an existing one to check that the executor really shut down).

Contributor Author

So the executor would shut down (we already test for that); it's just that the RPC call would have been blocking rather than non-blocking.

Contributor

I see, you are probably referring to this test and this line:

I had no time yet to run the test or check the details.

Contributor Author

Yeah. I'll double-check by disabling the shutdown and seeing if the test fails in CI while I'm doing my other adventures today. (Then I'll either re-enable the shutdown code or work on getting it to fail first, then re-enable it.)

Contributor

I am just wondering: there is no timeout for this, so if somebody makes a mistake in the future this will hang until the Jenkins admin comes along. Shouldn't we use tryAcquire?

Contributor Author

Switched it to tryAcquire 👍

@SparkQA

SparkQA commented Jul 27, 2020

Test build #126664 has finished for PR 29211 at commit 3debd73.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 27, 2020

Test build #126666 has finished for PR 29211 at commit 63846f1.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk holdenk force-pushed the SPARK-31197-exit-execs-redone branch from 63846f1 to 3910d50 on July 28, 2020 19:05
@SparkQA

SparkQA commented Jul 28, 2020

Test build #126735 has started for PR 29211 at commit 285600a.

@SparkQA

SparkQA commented Jul 28, 2020

Test build #126739 has finished for PR 29211 at commit 03a88ff.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk
Contributor Author

holdenk commented Jul 29, 2020

So it seems that without the thread start we end up in a situation where there is an orphan worker (see https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126735/console ), so I think that's covered. I'll double-check that this goes away once we put the thread start back in; otherwise I'll do some more poking.

@holdenk holdenk force-pushed the SPARK-31197-exit-execs-redone branch from 477e77f to e81c3fc on August 5, 2020 18:39
@holdenk
Contributor Author

holdenk commented Aug 5, 2020

The only outstanding point of discussion is a test timeout length, which I don't believe is critical to address. If CI passes I intend to merge this PR and rebase the next PR which brings the feature full circle.

Contributor

@agrawaldevesh agrawaldevesh left a comment

Thanks for adding the tests and working on this. Ever closer to better decommissioning!

// Wait for the executor to be removed
executorRemovedSem.acquire(1)
// Wait for the executor to be removed automatically after migration.
assert(executorRemovedSem.tryAcquire(1, 5L, TimeUnit.MINUTES))
Contributor

Sounds good. Would it be okay to add a comment in the code itself about how long it usually takes, and mention that the larger timeout is for good measure?

assert(done && currentTime > previousTime)
}
} finally {
bmDecomManager.stop()
Contributor

Sounds good!

@SparkQA

SparkQA commented Aug 5, 2020

Test build #127101 has finished for PR 29211 at commit e81c3fc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk
Contributor Author

holdenk commented Aug 5, 2020

Since it's approved and passing Jenkins & GHA I'm going to merge this, but I'll add the comment about the time in the follow-up that I'll rebase on top of this after merging.

@asfgit asfgit closed this in 375d348 Aug 5, 2020
@holdenk
Contributor Author

holdenk commented Aug 5, 2020

Merged to the dev branch :)

@HyukjinKwon
Member

cc @tgravescs, @mridulm, @squito, @Ngone51, @jiangxb1987 as well FYI

@@ -442,6 +442,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
case e: Exception =>
logError(s"Unexpected error during decommissioning ${e.toString}", e)
}
// Send decommission message to the executor, this may be a duplicate since the executor
Member

So we need to prevent duplicate DecommissionSelf on the executor side? For now, I don't see us handling a duplicate DecommissionSelf, and it may create duplicate threads as a result.
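One possible way to guard against duplicates, sketched with hypothetical names; this only illustrates the concern, it is not what the PR does:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative guard: only the first DecommissionSelf starts the shutdown work.
class DecommissionGuard {
  private val started = new AtomicBoolean(false)

  def onDecommissionSelf(startShutdownWork: () => Unit): Unit = {
    if (started.compareAndSet(false, true)) {
      startShutdownWork() // spawn the migration/exit thread exactly once
    }
    // else: duplicate message, nothing to do
  }
}
```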

}
} else {
logInfo("No running tasks, no block migration configured, stopping.")
exitExecutor(0, "Finished decommissioning", notifyDriver = true)
Member

I'm wondering if we need to wait for tasks to finish if storage decommission is disabled. I mean, for a shuffle map task and result task with indirect result, their outputs still base on blocks. As a result, we'd spend time waiting for them to finish but get nothing good in return.

Contributor Author

Unless it's an action. If someone just wants things to exit immediately they should not enable decommissioning.

exitExecutor(0, "Finished decommissioning", notifyDriver = true)
}
} else {
logInfo("Blocked from shutdown by running ${executor.numRunningtasks} tasks")
Contributor

I think you missed an s" here ... the string interpolation isn't happening.
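With the interpolator added, the line would read (assuming the counter is executor.numRunningTasks):

```scala
logInfo(s"Blocked from shutdown by running ${executor.numRunningTasks} tasks")
```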

agrawaldevesh pushed a commit to agrawaldevesh/spark that referenced this pull request Aug 13, 2020
The DecommissionWorkerSuite started becoming flaky and it revealed a real regression. Recent PRs (apache#28085 and apache#29211) necessitate a small reworking of the decommissioning logic.

Before getting into that, let me describe the intended behavior of decommissioning:

If a fetch failure happens where the source executor was decommissioned, we want to treat that as an eager signal to clear all shuffle state associated with that executor. In addition if we know that the host was decommissioned, we want to forget about all map statuses from all other executors on that decommissioned host. This is what the test "decommission workers ensure that fetch failures lead to rerun" is trying to test. This invariant is important to ensure that decommissioning a host does not lead to multiple fetch failures that might fail the job.

- Per apache#29211, the executors now eagerly exit on decommissioning and thus the executor is lost before the fetch failure even happens. (I tested this by waiting some seconds before triggering the fetch failure). When an executor is lost, we forget its decommissioning information. The fix is to keep the decommissioning information around for some time after removal with some extra logic to finally purge it after a timeout.

- Per apache#28085, when the executor is lost, it forgets the shuffle state about just that executor and increments the shuffleFileLostEpoch. This incrementing precludes the clearing of state of the entire host when the fetch failure happens. I elected to only change this codepath for the special case of decommissioning, without any other side effects. This whole version keeping stuff is complex and it has effectively not been semantically changed since 2013! The fix here is also simple: Ignore the shuffleFileLostEpoch when the shuffle status is being cleared due to a fetch failure resulting from host decommission.

These two fixes are local to decommissioning only and don't change other behavior.

I also added some more tests to TaskSchedulerImpl to ensure that the decommissioning information is indeed purged after a timeout.
cloud-fan pushed a commit that referenced this pull request Aug 18, 2020
### What changes were proposed in this pull request?

The DecommissionWorkerSuite started becoming flaky and it revealed a real regression. Recently closed #29211 necessitates remembering the decommissioning shortly beyond the removal of the executor.

In addition to fixing this issue, ensure that DecommissionWorkerSuite continues to pass when executors haven't had a chance to exit eagerly. That is, the old behavior from before #29211 still works.

Added some more tests to TaskSchedulerImpl to ensure that the decommissioning information is indeed purged after a timeout.

Hardened the test DecommissionWorkerSuite to make it wait for successful job completion.

### Why are the changes needed?

First, let me describe the intended behavior of decommissioning: If a fetch failure happens where the source executor was decommissioned, we want to treat that as an eager signal to clear all shuffle state associated with that executor. In addition if we know that the host was decommissioned, we want to forget about all map statuses from all other executors on that decommissioned host. This is what the test "decommission workers ensure that fetch failures lead to rerun" is trying to test. This invariant is important to ensure that decommissioning a host does not lead to multiple fetch failures that might fail the job. This fetch failure can happen before the executor is truly marked "lost" because of heartbeat delays.

- However, #29211 eagerly exits the executors when they are done decommissioning. This removal of the executor was racing with the fetch failure. By the time the fetch failure is triggered the executor is already removed and thus has forgotten its decommissioning information. (I tested this by delaying the decommissioning). The fix is to keep the decommissioning information around for some time after removal with some extra logic to finally purge it after a timeout.

- In addition the executor loss can also bump up `shuffleFileLostEpoch` (added in #28848). This happens because when the executor is lost, it forgets the shuffle state about just that executor and increments the `shuffleFileLostEpoch`. This incrementing precludes the clearing of state of the entire host when the fetch failure happens because the failed task is still reusing the old epoch. The fix here is also simple: Ignore the `shuffleFileLostEpoch` when the shuffle status is being cleared due to a fetch failure resulting from host decommission.

I am strategically making both of these fixes be very local to decommissioning to avoid other regressions. Especially the version stuff is tricky (it hasn't been fundamentally changed since it was first introduced in 2013).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually ran DecommissionWorkerSuite several times using a script and ensured it all passed.

### (Internal) Configs added
I added two configs, one of which is sort of meant for testing only:
- `spark.test.executor.decommission.initial.sleep.millis`: Initial delay by the decommissioner shutdown thread. Default is the same as before, 1 second. This is used for testing only. This one is kept "hidden" (i.e. not added as a constant, to avoid config bloat)
- `spark.executor.decommission.removed.infoCacheTTL`: Number of seconds to keep the removed executors decom entries around. It defaults to 5 minutes. It should be around the average time it takes for all of the shuffle data to be fetched from the mapper to the reducer, but I think that can take a while since the reducers also do a multistep sort.

Closes #29422 from agrawaldevesh/decom_fixes.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
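A hedged usage sketch of the two configs described in the commit message above; the value formats shown here are assumptions about how these keys are parsed:

```scala
import org.apache.spark.SparkConf

object DecomConfigSketch {
  // Illustrative only: keep decommission info for removed executors longer than the
  // 5-minute default, and slow down the test-only initial sleep of the shutdown thread.
  def conf: SparkConf = new SparkConf()
    .set("spark.executor.decommission.removed.infoCacheTTL", "10m")
    .set("spark.test.executor.decommission.initial.sleep.millis", "2000")
}
```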
holdenk added a commit to holdenk/spark that referenced this pull request Oct 27, 2020
[SPARK-21040][CORE] Speculate tasks which are running on decommission executors

This PR adds functionality to consider the running tasks on decommission executors based on some config.
In spark-on-cloud, we sometimes already know that an executor won't be alive for more than a fixed amount of time. For example, with AWS Spot nodes, once we get the notification we know that a node will be gone in 120 seconds.
So if the running tasks on the decommissioning executors may run beyond currentTime+120 seconds, then they are candidates for speculation.

Currently when an executor is decommissioned, we stop scheduling new tasks on it but the already running tasks keep running on it. Depending on the cloud, we might know beforehand that an executor won't be alive for more than a preconfigured time. Different cloud providers give different timeouts before they take away the nodes. For example, in the case of AWS spot nodes, an executor won't be alive for more than 120 seconds. We can utilize this information in cloud environments and make better decisions about speculating the already running tasks on decommissioned executors.

Yes. This PR adds a new config "spark.executor.decommission.killInterval" which they can explicitly set based on the cloud environment where they are running.

Added UT.

Closes apache#28619 from prakharjain09/SPARK-21040-speculate-decommission-exec-tasks.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

This pull request adds the ability to migrate shuffle files during Spark's decommissioning. The design document associated with this change is at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE .

To allow this change the `MapOutputTracker` has been extended to allow the location of shuffle files to be updated with `updateMapOutput`. When a shuffle block is put, a block update message will be sent which triggers the `updateMapOutput`.

Instead of rejecting remote puts of shuffle blocks, `BlockManager` delegates the storage of shuffle blocks to its shuffle manager's resolver (if supported). A new, experimental, trait is added for shuffle resolvers to indicate they handle remote putting of blocks.

The existing block migration code is moved out into a separate file, and a producer/consumer model is introduced for migrating shuffle files from the host as quickly as possible while not overwhelming other executors.

Recomputing shuffle blocks can be expensive, so we should take advantage of our decommissioning time to migrate these blocks.

This PR introduces two new config parameters, `spark.storage.decommission.shuffleBlocks.enabled` & `spark.storage.decommission.rddBlocks.enabled`, that control which blocks should be migrated during storage decommissioning.

New unit test & expansion of the Spark on K8s decom test to assert that decommissioning with shuffle block migration means that the results are not recomputed even when the original executor is terminated.

This PR is a cleaned-up version of the previous WIP PR I made apache#28331 (thanks to attilapiros for his very helpful reviewing on it :)).

Closes apache#28708 from holdenk/SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-cleaned-up.

Lead-authored-by: Holden Karau <hkarau@apple.com>
Co-authored-by: Holden Karau <holden@pigscanfly.ca>
Co-authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Co-authored-by: Attila Zsolt Piros <attilazsoltpiros@apiros-mbp16.lan>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-24266][K8S] Restart the watcher when we receive a version changed from k8s

Restart the watcher when it fails with an HTTP_GONE code from the Kubernetes API, which means a resource version has changed.

For more relevant information see here: fabric8io/kubernetes-client#1075

No

Running spark-submit to a k8s cluster.

Not sure how to make an automated test for this. If someone can help me out that would be great.

Closes apache#28423 from stijndehaes/bugfix/k8s-submit-resource-version-change.

Authored-by: Stijn De Haes <stijndehaes@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32217] Plumb whether a worker would also be decommissioned along with executor

This PR is a giant plumbing PR that plumbs an `ExecutorDecommissionInfo` along
with the DecommissionExecutor message.

The primary motivation is to know whether a decommissioned executor
would also be losing shuffle files -- and thus it is important to know
whether the host would also be decommissioned.

In the absence of this PR, the existing code assumes that decommissioning an executor does not lose the whole host with it, and thus does not clear the shuffle state if the external shuffle service is enabled. While this may hold in some cases (like K8s decommissioning an executor pod, or YARN container preemption), it does not hold in others, like when the cluster is managed by a Standalone Scheduler (Master). This is similar to the existing `workerLost` field in the `ExecutorProcessLost` message.

In the future, this `ExecutorDecommissionInfo` can be embellished for
knowing how long the executor has to live for scenarios like Cloud spot
kills (or Yarn preemption) and the like.

No

Tweaked an existing unit test in `AppClientSuite`

Closes apache#29032 from agrawaldevesh/plumb_decom_info.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32199][SPARK-32198] Reduce job failures during decommissioning

This PR reduces the prospect of a job loss during decommissioning. It
fixes two holes in the current decommissioning framework:

- (a) Loss of decommissioned executors is not treated as a job failure:
We know that the decommissioned executor would be dying soon, so its death is
clearly not caused by the application.

- (b) Shuffle files on the decommissioned host are cleared when the
first fetch failure is detected from a decommissioned host: This is a
bit tricky in terms of when to clear the shuffle state ? Ideally you
want to clear it the millisecond before the shuffle service on the node
dies (or the executor dies when there is no external shuffle service) --
too soon and it could lead to some wastage and too late would lead to
fetch failures.

  The approach here is to do this clearing when the very first fetch
failure is observed on the decommissioned block manager, without waiting for
other blocks to also signal a failure.

Without them decommissioning a lot of executors at a time leads to job failures.

The task scheduler tracks the executors that were decommissioned along with their
`ExecutorDecommissionInfo`. This information is used (a) when handling an `ExecutorProcessLost` error, or (b) by the `DAGScheduler` when handling a fetch failure.

No

Added a new unit test `DecommissionWorkerSuite` to test the new behavior by exercising the Master-Worker decommissioning. I chose to add a new test since the setup logic was quite different from the existing `WorkerDecommissionSuite`. I am open to changing the name of the newly added test suite :-)

- Should I add a feature flag to guard these two behaviors? They seem safe to me, in that they should only get triggered by decommissioning, but you never know :-).

Closes apache#29014 from agrawaldevesh/decom_harden.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32417] Fix flakyness of BlockManagerDecommissionIntegrationSuite

This test change tries to fix the flakiness of BlockManagerDecommissionIntegrationSuite.

Make the block manager decommissioning test be less flaky

An interesting failure happens when migrateDuring = true (and persist or shuffle is true):
- We schedule the job with tasks on executors 0, 1, 2.
- We wait 300 ms and decommission executor 0.
- If the task is not yet done on executor 0, it will now fail because
   the block manager won't be able to save the block. This condition is
   easy to trigger on a loaded machine where the github checks run.
- The task will retry on a different executor (1 or 2) and its shuffle
   blocks will land there.
- No actual block migration happens here because the decommissioned
   executor technically failed before it could even produce a block.

To remove the above race, this change replaces the fixed wait for 300 ms to wait for an actual task to succeed. When a task has succeeded, we know its blocks would have been written for sure and thus its executor would certainly be forced to migrate those blocks when it is decommissioned.

The change always decommissions an executor on which a real task finished successfully instead of picking the first executor, because the system may choose to schedule nothing on the first executor and instead run the two tasks on one executor.

I have had bad luck with BlockManagerDecommissionIntegrationSuite and it has failed several times on my PRs. So fixing it.

No, unit test only change.

Github checks. Ran this test 100 times, 10 at a time in parallel in a script.

Closes apache#29226 from agrawaldevesh/block-manager-decom-flaky.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-31197][CORE] Shutdown executor once we are done decommissioning

Exit the executor when it has been asked to decommission and there is nothing left for it to do.

This is a rebase of apache#28817

If we want to use decommissioning in Spark's own scale down we should terminate the executor once finished.
Furthermore, in graceful shutdown it makes sense to release resources we no longer need if we've been asked to shutdown by the cluster manager instead of always holding the resources as long as possible.

The decommissioned executors will exit at the end of decommissioning. This is sort of a user-facing change; however, decommissioning hasn't been in any releases yet.

I changed the unit test to not send the executor exit message and still wait on the executor exited message.

Closes apache#29211 from holdenk/SPARK-31197-exit-execs-redone.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

Connect decommissioning to dynamic scaling

Because the mock always says there is an RDD we may replicate more than once, and now that there are independent threads

Make Spark's dynamic allocation use decommissioning

Track the decommissioning executors in the core dynamic scheduler so we don't scale down too low, update the streaming ExecutorAllocationManager to also delegate to decommission

Fix up executor add for resource profile

Fix our exiting and cleanup thread for better debugging next time. Cleanup the locks we use in decommissioning and clarify some more bits.

Verify that executors decommissioned, then killed by the external cluster manager, are re-launched

Verify some additional calls are not occurring in the executor allocation manager suite.

Don't close the watcher until the end of the test

Use decommissionExecutors and set adjustTargetNumExecutors to false so that we can match the pattern for killExecutor/killExecutors

bump numparts up to 6

Revert "bump numparts up to 6"

This reverts commit daf96dd.

Small comment & visibility cleanup

CR feedback/cleanup

Cleanup the merge

[SPARK-21040][CORE] Speculate tasks which are running on decommission executors

This PR adds functionality to consider the running tasks on decommission executors based on some config.
In spark-on-cloud , we sometimes already know that an executor won't be alive for more than fix amount of time. Ex- In AWS Spot nodes, once we get the notification, we know that a node will be gone in 120 seconds.
So if the running tasks on the decommissioning executors may run beyond currentTime+120 seconds, then they are candidate for speculation.

Currently when an executor is decommission, we stop scheduling new tasks on those executors but the already running tasks keeps on running on them. Based on the cloud, we might know beforehand that an executor won't be alive for more than a preconfigured time. Different cloud providers gives different timeouts before they take away the nodes. For Ex- In case of AWS spot nodes, an executor won't be alive for more than 120 seconds. We can utilize this information in cloud environments and take better decisions about speculating the already running tasks on decommission executors.

Yes. This PR adds a new config "spark.executor.decommission.killInterval" which they can explicitly set based on the cloud environment where they are running.

Added UT.

Closes apache#28619 from prakharjain09/SPARK-21040-speculate-decommission-exec-tasks.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

This pull request adds the ability to migrate shuffle files during Spark's decommissioning. The design document associated with this change is at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE .

To allow this change the `MapOutputTracker` has been extended to allow the location of shuffle files to be updated with `updateMapOutput`. When a shuffle block is put, a block update message will be sent which triggers the `updateMapOutput`.

Instead of rejecting remote puts of shuffle blocks `BlockManager` delegates the storage of shuffle blocks to it's shufflemanager's resolver (if supported). A new, experimental, trait is added for shuffle resolvers to indicate they handle remote putting of blocks.

The existing block migration code is moved out into a separate file, and a producer/consumer model is introduced for migrating shuffle files from the host as quickly as possible while not overwhelming other executors.

Recomputting shuffle blocks can be expensive, we should take advantage of our decommissioning time to migrate these blocks.

This PR introduces two new configs parameters, `spark.storage.decommission.shuffleBlocks.enabled` & `spark.storage.decommission.rddBlocks.enabled` that control which blocks should be migrated during storage decommissioning.

New unit test & expansion of the Spark on K8s decom test to assert that decommisioning with shuffle block migration means that the results are not recomputed even when the original executor is terminated.

This PR is a cleaned-up version of the previous WIP PR I made apache#28331 (thanks to attilapiros for his very helpful reviewing on it :)).

Closes apache#28708 from holdenk/SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-cleaned-up.

Lead-authored-by: Holden Karau <hkarau@apple.com>
Co-authored-by: Holden Karau <holden@pigscanfly.ca>
Co-authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Co-authored-by: Attila Zsolt Piros <attilazsoltpiros@apiros-mbp16.lan>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-24266][K8S] Restart the watcher when we receive a version changed from k8s

Restart the watcher when it failed with a HTTP_GONE code from the kubernetes api. Which means a resource version has changed.

For more relevant information see here: fabric8io/kubernetes-client#1075

No

Running spark-submit to a k8s cluster.

Not sure how to make an automated test for this. If someone can help me out that would be great.

Closes apache#28423 from stijndehaes/bugfix/k8s-submit-resource-version-change.

Authored-by: Stijn De Haes <stijndehaes@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32217] Plumb whether a worker would also be decommissioned along with executor

This PR is a giant plumbing PR that plumbs an `ExecutorDecommissionInfo` along
with the DecommissionExecutor message.

The primary motivation is to know whether a decommissioned executor
would also be loosing shuffle files -- and thus it is important to know
whether the host would also be decommissioned.

In the absence of this PR, the existing code assumes that decommissioning an executor does not loose the whole host with it, and thus does not clear the shuffle state if external shuffle service is enabled. While this may hold in some cases (like K8s decommissioning an executor pod, or YARN container preemption), it does not hold in others like when the cluster is managed by a Standalone Scheduler (Master). This is similar to the existing `workerLost` field in the `ExecutorProcessLost` message.

In the future, this `ExecutorDecommissionInfo` can be extended to carry
how long the executor has left to live, for scenarios like cloud spot
kills (or YARN preemption) and the like.

No

Tweaked an existing unit test in `AppClientSuite`

Closes apache#29032 from agrawaldevesh/plumb_decom_info.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32199][SPARK-32198] Reduce job failures during decommissioning

This PR reduces the prospect of a job loss during decommissioning. It
fixes two holes in the current decommissioning framework:

- (a) Loss of decommissioned executors is not treated as a job failure:
We know that the decommissioned executor would be dying soon, so its death is
clearly not caused by the application.

- (b) Shuffle files on the decommissioned host are cleared when the
first fetch failure is detected from a decommissioned host: the tricky
part is deciding when to clear the shuffle state. Ideally you want to
clear it the moment before the shuffle service on the node dies (or the
executor dies, when there is no external shuffle service) -- clearing too
soon leads to some wasted recomputation, while clearing too late leads to
fetch failures.

  The approach here is to do this clearing when the very first fetch
failure is observed on the decommissioned block manager, without waiting for
other blocks to also signal a failure.

Without these fixes, decommissioning a lot of executors at a time leads to job failures.

The task scheduler tracks the executors that were decommissioned along with their
`ExecutorDecommissionInfo`. This information is used (a) when handling an `ExecutorProcessLost` error, and (b) by the `DAGScheduler` when handling a fetch failure.
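
A condensed sketch of how that tracked information can drive the two behaviors above (illustrative names only, not the real scheduler code):

```scala
import scala.collection.mutable

case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)

// Sketch only: a scheduler-side map from executor id to its decommission info.
class DecomTrackingSketch {
  private val decommissioned = mutable.Map.empty[String, ExecutorDecommissionInfo]

  def executorDecommissioned(execId: String, info: ExecutorDecommissionInfo): Unit =
    decommissioned(execId) = info

  // (a) Executor loss: do not count the loss against the job if the executor
  // was already known to be decommissioned.
  def countTowardsFailures(execId: String): Boolean =
    !decommissioned.contains(execId)

  // (b) Fetch failure: clear all shuffle state for the host as soon as the
  // first fetch failure from a decommissioned (host-level) executor is seen.
  def shouldUnregisterAllOutputsOnHost(execId: String): Boolean =
    decommissioned.get(execId).exists(_.isHostDecommissioned)
}
```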

No

Added a new unit test `DecommissionWorkerSuite` to test the new behavior by exercising the Master-Worker decommissioning. I chose to add a new test since the setup logic was quite different from the existing `WorkerDecommissionSuite`. I am open to changing the name of the newly added test suite :-)

- Should I add a feature flag to guard these two behaviors? They seem safe to me, since they should only get triggered by decommissioning, but you never know :-).

Closes apache#29014 from agrawaldevesh/decom_harden.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32417] Fix flakyness of BlockManagerDecommissionIntegrationSuite

This change tries to fix the flakiness of BlockManagerDecommissionIntegrationSuite.

Make the block manager decommissioning test be less flaky

An interesting failure happens when migrateDuring = true (and persist or shuffle is true):
- We schedule the job with tasks on executors 0, 1, 2.
- We wait 300 ms and decommission executor 0.
- If the task is not yet done on executor 0, it will now fail because
   the block manager won't be able to save the block. This condition is
   easy to trigger on a loaded machine where the github checks run.
- The task will retry on a different executor (1 or 2) and its shuffle
   blocks will land there.
- No actual block migration happens here because the decommissioned
   executor technically failed before it could even produce a block.

To remove the above race, this change replaces the fixed 300 ms wait with waiting for an actual task to succeed. When a task has succeeded, we know its blocks have been written for sure, and thus its executor will certainly be forced to migrate those blocks when it is decommissioned.

The change always decommissions an executor on which a real task finished successfully instead of picking the first executor, because the system may choose to schedule nothing on the first executor and instead run both tasks on one executor.
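
A sketch of the wait-for-a-real-task approach, as it might look inside the test body (assumes a `SparkContext` named `sc` is in scope; simplified from the actual suite):

```scala
import java.util.concurrent.Semaphore

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

val taskFinished = new Semaphore(0)
@volatile var finishedExecutor: Option[String] = None

sc.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // Remember an executor that has definitely completed a task (and thus
    // written its blocks), so the test decommissions that one.
    finishedExecutor = Some(taskEnd.taskInfo.executorId)
    taskFinished.release()
  }
})

// ... start the job asynchronously, then:
taskFinished.acquire()  // wait for a real task success instead of sleeping 300 ms
// decommission finishedExecutor.get rather than a fixed, arbitrary executor
```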

I have had bad luck with BlockManagerDecommissionIntegrationSuite and it has failed several times on my PRs. So fixing it.

No, unit test only change.

Github checks. Ran this test 100 times, 10 at a time in parallel in a script.

Closes apache#29226 from agrawaldevesh/block-manager-decom-flaky.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-31197][CORE] Shutdown executor once we are done decommissioning

Exit the executor when it has been asked to decommission and there is nothing left for it to do.

This is a rebase of apache#28817

If we want to use decommissioning in Spark's own scale down we should terminate the executor once finished.
Furthermore, in graceful shutdown it makes sense to release resources we no longer need if we've been asked to shutdown by the cluster manager instead of always holding the resources as long as possible.
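
A minimal sketch of the exit condition described above (not the actual `CoarseGrainedExecutorBackend` code; all names are illustrative):

```scala
object DecommissionExitSketch {
  // Once decommissioning has been requested, the executor can exit as soon
  // as it has no running tasks and, if block migrations are enabled, the
  // migrations report that they are finished.
  def shouldExit(
      decommissionRequested: Boolean,
      numRunningTasks: Int,
      migrationsEnabled: Boolean,
      migrationsDone: Boolean): Boolean = {
    decommissionRequested &&
      numRunningTasks == 0 &&
      (!migrationsEnabled || migrationsDone)
  }
}
```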

The decommissioned executors will exit at the end of decommissioning. This is sort of a user-facing change; however, decommissioning hasn't been in any releases yet.

I changed the unit test to not send the executor exit message and still wait on the executor exited message.

Closes apache#29211 from holdenk/SPARK-31197-exit-execs-redone.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

Connect decommissioning to dynamic scaling

Because the mock always says there is an RDD, we may replicate more than once now that there are independent threads.

Make Spark's dynamic allocation use decommissioning

Track the decommissioning executors in the core dynamic scheduler so we don't scale down too low, and update the streaming ExecutorAllocationManager to also delegate to decommissioning
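
A rough sketch of the accounting involved (illustrative only, not the real `ExecutorAllocationManager` code):

```scala
import scala.collection.mutable

// Sketch only: executors that are decommissioning are already "going away",
// so they are subtracted before deciding how many more executors to remove.
class ScaleDownSketch {
  private val decommissioning = mutable.Set.empty[String]

  def markDecommissioning(execId: String): Unit = decommissioning += execId

  def executorsToRemove(currentExecutors: Int, targetExecutors: Int): Int =
    math.max(0, (currentExecutors - decommissioning.size) - targetExecutors)
}
```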

Fix up executor add for resource profile

Fix our exiting and cleanup thread for better debugging next time. Clean up the locks we use in decommissioning and clarify some more bits.

Verify executors decommissioned, then killed by the external cluster manager are re-launched

Verify some additional calls are not occurring in the executor allocation manager suite.

Don't close the watcher until the end of the test

Use decommissionExecutors and set adjustTargetNumExecutors to false so that we can match the pattern for killExecutor/killExecutors

bump numparts up to 6

Revert "bump numparts up to 6"

This reverts commit daf96dd.

Small comment & visibility cleanup

CR feedback/cleanup

Fix up the merge

CR feedback, move adjustExecutors to a common utility function

Exclude some non-public APIs

Remove junk

More CR feedback

Fix adjustExecutors backport

This test fails for me locally and, from what I recall, it's because we use a different method of resolving the bind address than upstream, so disabling the test

Cleanup and drop watcher changes from the backport