[SPARK-32613][CORE] Fix regressions in DecommissionWorkerSuite #29422
Conversation
Force-pushed 4f9fd71 to aaacf30.
Test build #127396 has finished for PR 29422 at commit
Force-pushed 626e285 to 16d0bc8.
Test build #127400 has finished for PR 29422 at commit
Test build #127401 has finished for PR 29422 at commit
Force-pushed b890443 to 6334f80.
Force-pushed 6334f80 to c051532.
Test build #127422 has finished for PR 29422 at commit
Test build #127424 has finished for PR 29422 at commit
Test build #127428 has finished for PR 29422 at commit
Thank you for taking the time to resolve this and write up the root cause so clearly. A question for production (not just tests): if the executor exits, we also want to eagerly clean everything up and resubmit, right?
Thanks for working on this! I've got some questions, mostly around the TreeMap usage versus a higher-level structure that handles GC for us.
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala (two review threads, resolved)
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (review thread outdated, resolved)
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala (review thread outdated, resolved)
val decomInfo = executorsPendingDecommission.get(executorId)
if (decomInfo.isDefined) {
  val rememberSeconds =
    conf.getInt("spark.decommissioningRememberAfterRemoval.seconds", 60)
  val gcSecond = TimeUnit.MILLISECONDS.toSeconds(clock.getTimeMillis()) + rememberSeconds
  decommissioningExecutorsToGc.computeIfAbsent(gcSecond, _ => mutable.ArrayBuffer.empty) +=
    executorId
}
Seems like repeated logic.
Hmm, no. The removal code only shares the TimeUnit.MILLISECONDS.toSeconds(clock.getTimeMillis()) piece; it does not share the rest of the code.
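As context for this discussion, a sorted map bucketed by expiry second can act as a hand-rolled TTL cache, which is what the snippet above is doing. Below is a minimal, hypothetical sketch (the names `remember`/`purge` are invented for illustration and are not the PR's actual code):

```scala
import java.util.concurrent.ConcurrentSkipListMap
import scala.collection.mutable

object DecomGcSketch {
  // Executors due to be forgotten, bucketed by the second at which they expire.
  val decommissioningExecutorsToGc =
    new ConcurrentSkipListMap[Long, mutable.ArrayBuffer[String]]()

  def remember(executorId: String, nowSeconds: Long, ttlSeconds: Long): Unit = {
    val gcSecond = nowSeconds + ttlSeconds
    decommissioningExecutorsToGc
      .computeIfAbsent(gcSecond, _ => mutable.ArrayBuffer.empty[String]) += executorId
  }

  // Expire every bucket whose second has passed. headMap yields the keys
  // <= nowSeconds in sorted order, so this is a single cheap scan; clearing
  // the view removes those buckets from the backing map.
  def purge(nowSeconds: Long): Seq[String] = {
    val expired = decommissioningExecutorsToGc.headMap(nowSeconds, true)
    val ids = mutable.ArrayBuffer[String]()
    expired.values().forEach(buf => ids ++= buf)
    expired.clear()
    ids.toSeq
  }
}
```

A higher-level alternative, as the reviewer suggests, would be a TTL cache (e.g. Guava's `CacheBuilder.expireAfterWrite`) that handles the expiry bookkeeping internally.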
core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala (two review threads, resolved)
Yes, for sure. That will happen on its own; I haven't really changed that behavior. I have only changed the way fetch failures are handled (stemming from a decommissioned host). And the way they lead to a rerun: when an executor exits, it will normally clean up just its own shuffle data (it does not know that its peer executors on the same host will soon be dying as well). It's the incrementing of the shuffleFileLostEpoch during that cleanup that would otherwise preclude clearing the state of the entire host.
Force-pushed c051532 to 343bf8b.
@holdenk ... thanks for all the feedback and suggestions. I have incorporated them all and the PR is ready for your review again.
Test build #127479 has finished for PR 29422 at commit
The PySpark failure is likely unrelated. I'm going to go ahead and say LGTM, but let's give it until Monday to see if anyone else wants to review.
Jenkins retest this please
Test build #127484 has finished for PR 29422 at commit
Sounds good. Thanks @holdenk !
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala (review thread outdated, resolved)
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala (review thread outdated, resolved)
Force-pushed 9a4cce6 to e542aa1.
The DecommissionWorkerSuite started becoming flaky, and it revealed a real regression. Recent PRs (apache#28085 and apache#29211) necessitate a small reworking of the decommissioning logic.

Before getting into that, let me describe the intended behavior of decommissioning: if a fetch failure happens where the source executor was decommissioned, we want to treat that as an eager signal to clear all shuffle state associated with that executor. In addition, if we know that the host was decommissioned, we want to forget about all map statuses from all other executors on that decommissioned host. This is what the test "decommission workers ensure that fetch failures lead to rerun" is trying to test. This invariant is important to ensure that decommissioning a host does not lead to multiple fetch failures that might fail the job.

- Per apache#29211, the executors now eagerly exit on decommissioning, and thus the executor is lost before the fetch failure even happens. (I tested this by waiting some seconds before triggering the fetch failure.) When an executor is lost, we forget its decommissioning information. The fix is to keep the decommissioning information around for some time after removal, with some extra logic to finally purge it after a timeout.
- Per apache#28085, when the executor is lost, it forgets the shuffle state about just that executor and increments the shuffleFileLostEpoch. This incrementing precludes the clearing of state of the entire host when the fetch failure happens. This PR elects to change this codepath only for the special case of decommissioning, without any other side effects. This whole version-keeping machinery is complex and has effectively not been semantically changed since 2013! The fix here is also simple: ignore the shuffleFileLostEpoch when the shuffle status is being cleared due to a fetch failure resulting from host decommission.

These two fixes are local to decommissioning only and don't change other behavior.
Also added some more tests to TaskSchedulerImpl to ensure that the decommissioning information is indeed purged after a timeout, and hardened DecommissionWorkerSuite to make it wait for successful job completion.
Force-pushed e542aa1 to df128e5.
Test build #127513 has finished for PR 29422 at commit
Test build #127515 has finished for PR 29422 at commit
Test build #127516 has finished for PR 29422 at commit
@cloud-fan, incorporated your feedback. Thanks!
Thanks, merging to master!
…kSchedulerImpl realm

### What changes were proposed in this pull request?

The decommissioning state is a bit fragmented across two places in the TaskSchedulerImpl: #29014 stored the incoming decommission info messages in TaskSchedulerImpl.executorsPendingDecommission, while #28619 stored just the executor end time in the map TaskSetManager.tidToExecutorKillTimeMapping (which in turn is contained in TaskSchedulerImpl).

While the two states are not really overlapping, it's a bit of a code-hygiene concern to save this state in two places. With #29422, TaskSchedulerImpl is emerging as the place where all decommissioning bookkeeping is kept within the driver. So consolidate the information in _tidToExecutorKillTimeMapping_ into _executorsPendingDecommission_.

However, in order to do so, we need to walk away from keeping the raw ExecutorDecommissionInfo messages and instead keep another class, ExecutorDecommissionState. This decoupling will allow the RPC message class ExecutorDecommissionInfo to evolve independently from the bookkeeping ExecutorDecommissionState.

### Why are the changes needed?

This is just a code cleanup. These two features were added independently and it's time to consolidate their state for good hygiene.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #29452 from agrawaldevesh/consolidate_decom_state.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
…s in TaskSchedulerImpl

### What changes were proposed in this pull request?

The motivation of this PR is to avoid caching the removed decommissioned executors in `TaskSchedulerImpl`. The cache was introduced in #29422 and holds the `isHostDecommissioned` info for a while, so that if the task `FetchFailure` event comes after the executor loss event, `DAGScheduler` can still get `isHostDecommissioned` from the cache and unregister the host shuffle map status when the host is decommissioned too.

This PR tries to achieve the same goal without the cache. Instead of saving `workerLost` in `ExecutorUpdated` / `ExecutorDecommissionInfo` / `ExecutorDecommissionState`, we could save the `hostOpt` directly. When the host is decommissioned or lost too, `hostOpt` can be a specific host address; otherwise, it's `None`, indicating that only the executor is decommissioned or lost.

Now that we have the host info, we can also unregister the host shuffle map status when `executorLost` is triggered for the decommissioned executor. Besides, this PR also includes a few cleanups around the touched code.

### Why are the changes needed?

It helps to unregister the shuffle map status earlier for both decommission and normal executor lost cases. It also saves memory in `TaskSchedulerImpl` and simplifies the code a little bit.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This PR only refactors the code. The original behaviour should be covered by `DecommissionWorkerSuite`.

Closes #29579 from Ngone51/impr-decom.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
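The `hostOpt` idea described above can be sketched in a few lines. This is an illustrative toy under my own naming, not the actual Spark classes:

```scala
// Toy model of the refactor: instead of a workerLost boolean, the
// bookkeeping state carries the host address only when the whole host is
// decommissioned or lost.
case class DecomStateSketch(startTimeMs: Long, hostOpt: Option[String])

// Some(host) => unregister all map statuses on that host;
// None        => only the single executor's shuffle state is affected.
def hostToUnregister(state: DecomStateSketch): Option[String] =
  state.hostOpt
```

The design benefit is that one `Option[String]` encodes both "whole host gone, and which one" and "executor only", so no separate cache keyed by executor is needed to recover the host later.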
  testFetchFailures(3600 * 1000)
}

test("decommission eager workers ensure that fetch failures lead to rerun") {
Quick note: this test seems flaky:
sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 9 did not equal 6 Expected 6 tasks but got List(0:0:0:0-SUCCESS, 0:0:1:0-SUCCESS, 1:0:0:0-FAILED, 0:1:0:0-SUCCESS, 0:1:1:0-SUCCESS, 1:1:0:0-FAILED, 0:2:0:0-SUCCESS, 0:2:1:0-SUCCESS, 1:2:0:0-SUCCESS)
at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
at org.apache.spark.deploy.DecommissionWorkerSuite.testFetchFailures(DecommissionWorkerSuite.scala:267)
at org.apache.spark.deploy.DecommissionWorkerSuite.$anonfun$new$14(DecommissionWorkerSuite.scala:275)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189)
at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:176)
at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199)
at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181)
at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:61)
at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:61)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
I'll file a JIRA if I see it more often.
@HyukjinKwon Do you have a link for the test job? I want to take a look at it.
I retriggered the job. I will let you know when I see it one more time.
What changes were proposed in this pull request?
The DecommissionWorkerSuite started becoming flaky, and it revealed a real regression. The recently closed #29211 necessitates remembering the decommissioning shortly beyond the removal of the executor.
In addition to fixing this issue, ensure that DecommissionWorkerSuite continues to pass when executors haven't had a chance to exit eagerly. That is, the old behavior from before #29211 still works.
Added some more tests to TaskSchedulerImpl to ensure that the decommissioning information is indeed purged after a timeout.
Hardened the test DecommissionWorkerSuite to make it wait for successful job completion.
Why are the changes needed?
First, let me describe the intended behavior of decommissioning: If a fetch failure happens where the source executor was decommissioned, we want to treat that as an eager signal to clear all shuffle state associated with that executor. In addition if we know that the host was decommissioned, we want to forget about all map statuses from all other executors on that decommissioned host. This is what the test "decommission workers ensure that fetch failures lead to rerun" is trying to test. This invariant is important to ensure that decommissioning a host does not lead to multiple fetch failures that might fail the job. This fetch failure can happen before the executor is truly marked "lost" because of heartbeat delays.
However, [SPARK-31197][CORE] Shutdown executor once we are done decommissioning #29211 eagerly exits the executors when they are done decommissioning. This removal of the executor was racing with the fetch failure. By the time the fetch failure is triggered the executor is already removed and thus has forgotten its decommissioning information. (I tested this by delaying the decommissioning). The fix is to keep the decommissioning information around for some time after removal with some extra logic to finally purge it after a timeout.
In addition, the executor loss can also bump up shuffleFileLostEpoch (added in [SPARK-32003][CORE] When external shuffle service is used, unregister outputs for executor on fetch failure after executor is lost #28848). This happens because when the executor is lost, it forgets the shuffle state about just that executor and increments the shuffleFileLostEpoch. This incrementing precludes the clearing of state of the entire host when the fetch failure happens, because the failed task is still reusing the old epoch. The fix here is also simple: ignore the shuffleFileLostEpoch when the shuffle status is being cleared due to a fetch failure resulting from host decommission.

I am strategically making both of these fixes very local to decommissioning to avoid other regressions. Especially the version stuff is tricky (it hasn't been fundamentally changed since it was first introduced in 2013).
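To make the epoch interaction concrete, here is a hypothetical toy model (not the actual DAGScheduler code; names invented) of why a fetch failure arriving with an old epoch is normally a no-op, and how the decommission path bypasses that check:

```scala
import scala.collection.mutable

// Toy model: per-executor epoch of the last shuffle-file cleanup. A cleanup
// request carrying an epoch that is not newer than the recorded one is
// normally skipped; the decommission-triggered path ignores the epoch.
val fileLostEpoch = mutable.HashMap[String, Long]()

def removeShuffleFiles(execId: String, epoch: Long, ignoreEpoch: Boolean): Boolean = {
  val staleRequest = fileLostEpoch.get(execId).exists(_ >= epoch)
  if (staleRequest && !ignoreEpoch) {
    false // a cleanup at this epoch (or newer) already ran: no-op
  } else {
    fileLostEpoch(execId) = epoch
    true // actually unregister the shuffle outputs
  }
}
```

In this model, the eager executor exit performs the first cleanup and records the epoch; the later fetch failure then arrives with the same (now stale) epoch and is ignored, unless the decommission path passes `ignoreEpoch = true`.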
Does this PR introduce any user-facing change?
No
How was this patch tested?
Manually ran DecommissionWorkerSuite several times using a script and ensured it all passed.
(Internal) Configs added

I added two configs, one of which is sort of meant for testing only:

- spark.test.executor.decommission.initial.sleep.millis: Initial delay by the decommissioner shutdown thread. Default is the same as before, 1 second. This is used for testing only, and is kept "hidden" (i.e., not added as a constant, to avoid config bloat).
- spark.executor.decommission.removed.infoCacheTTL: Number of seconds to keep the removed executors' decom entries around. It defaults to 5 minutes. It should be around the average time it takes for all of the shuffle data to be fetched from the mapper to the reducer, but I think that can take a while since the reducers also do a multistep sort.
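As a sanity check on the TTL semantics described above, a minimal sketch with the stated 5-minute default hard-coded (the helper name is invented for illustration):

```scala
import java.util.concurrent.TimeUnit

// Stated default of spark.executor.decommission.removed.infoCacheTTL: 5 minutes.
val infoCacheTtlSeconds = 5 * 60L

// A removed executor's decommission entry stays usable until
// removal time + TTL; after that the purge logic may drop it.
def decomEntryStillValid(removedAtMillis: Long, nowMillis: Long): Boolean =
  TimeUnit.MILLISECONDS.toSeconds(nowMillis) <
    TimeUnit.MILLISECONDS.toSeconds(removedAtMillis) + infoCacheTtlSeconds
```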