[SPARK-32003][CORE][3.0] When external shuffle service is used, unregister outputs for executor on fetch failure after executor is lost #29193
Conversation
… outputs for executor on fetch failure after executor is lost

If an executor is lost, the `DAGScheduler` handles the executor loss by removing the executor but does not unregister its outputs if the external shuffle service is used. However, if the node on which the executor runs is lost, the shuffle service may not be able to serve the shuffle files. In such a case, when fetches from the executor's outputs fail in the same stage, the `DAGScheduler` again removes the executor and, by right, should unregister its outputs. It doesn't, because the epoch used to track the executor failure has not increased. We track the epoch for failed executors that result in lost file output separately, so we can unregister the outputs in this scenario. The idea to track a second epoch is due to Attila Zsolt Piros.

Without the changes, the loss of a node could require two stage attempts to recover instead of one.

No.

New unit test. This test fails without the change and passes with it.
This is a backport of #28848 to branch-3.0. The backport of DAGScheduler.scala is straightforward, with a minor diff conflict in a comment. The backport of DAGSchedulerSuite.scala needed some minor adjustments; the result sits between the version in master and the version in branch-2.4.
Test build #126354 has finished for PR 29193 at commit
retest this please
Test build #126370 has finished for PR 29193 at commit
retest this please
Test build #126425 has finished for PR 29193 at commit
org.apache.spark.sql.connector.FileDataSourceV2FallBackSuite.Fallback Parquet V2 to V1 failed in https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126425; however, earlier, it passed in https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126354/. It must be flaky.
retest this please
@dongjoon-hyun are you aware of any CI issues currently? I think an issue with PySpark pip packaging tests was fixed recently: SPARK-32303. I saw the same symptom again in build #126354 above. I also hit the bad .m2 problem in build #126370.
Test build #126434 has finished for PR 29193 at commit
retest this please
Test build #126454 has finished for PR 29193 at commit
The failure in test run 126434, "BarrierTaskContextSuite.global sync by barrier() call" was supposedly fixed here: SPARK-31730 |
Unfortunately, we don't have
Urgh, BarrierTaskContextSuite seems flaky (failed in two builds now, but passed in two earlier builds that ran into other issues), although the failing test is not the same each time. And this time, ran into Kafka failures as well.
retest this please
Ya. I agree that those tests are really flaky. However, without passing them in Scala/Java, we cannot reach PySpark/SparkR UT stages.
Test build #126507 has finished for PR 29193 at commit
retest this please
Test build #126515 has finished for PR 29193 at commit
Finally, this has passed the tests!
Test build #5056 has finished for PR 29193 at commit
Test build #5057 has finished for PR 29193 at commit
retest this please
Test build #127010 has finished for PR 29193 at commit
…ister outputs for executor on fetch failure after executor is lost

### What changes were proposed in this pull request?

If an executor is lost, the `DAGScheduler` handles the executor loss by removing the executor but does not unregister its outputs if the external shuffle service is used. However, if the node on which the executor runs is lost, the shuffle service may not be able to serve the shuffle files. In such a case, when fetches from the executor's outputs fail in the same stage, the `DAGScheduler` again removes the executor and, by right, should unregister its outputs. It doesn't, because the epoch used to track the executor failure has not increased. We track the epoch for failed executors that result in lost file output separately, so we can unregister the outputs in this scenario. The idea to track a second epoch is due to Attila Zsolt Piros.

### Why are the changes needed?

Without the changes, the loss of a node could require two stage attempts to recover instead of one.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test. This test fails without the change and passes with it.

Closes #29193 from wypoon/SPARK-32003-3.0.

Authored-by: Wing Yew Poon <wypoon@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
merged, thanks @wypoon
What changes were proposed in this pull request?
If an executor is lost, the `DAGScheduler` handles the executor loss by removing the executor but does not unregister its outputs if the external shuffle service is used. However, if the node on which the executor runs is lost, the shuffle service may not be able to serve the shuffle files. In such a case, when fetches from the executor's outputs fail in the same stage, the `DAGScheduler` again removes the executor and, by right, should unregister its outputs. It doesn't, because the epoch used to track the executor failure has not increased. We track the epoch for failed executors that result in lost file output separately, so we can unregister the outputs in this scenario. The idea to track a second epoch is due to Attila Zsolt Piros.
Why are the changes needed?
Without the changes, the loss of a node could require two stage attempts to recover instead of one.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New unit test. This test fails without the change and passes with it.
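The two-epoch idea described in this PR can be sketched as a small piece of bookkeeping. The following is a hypothetical, simplified illustration only — the class and method names (`TwoEpochTracker`, `executorLost`, `nextEpoch`) are invented for this sketch and are not Spark's actual `DAGScheduler` code — assuming the scheduler tracks a per-executor failure epoch and, separately, a per-executor shuffle-file-lost epoch:

```scala
import scala.collection.mutable

// Hypothetical, simplified sketch of the two-epoch bookkeeping
// (names invented; this is not Spark's actual DAGScheduler code).
class TwoEpochTracker {
  // Epoch at which each executor was last handled as failed.
  private val executorFailureEpoch = mutable.Map.empty[String, Long]
  // Epoch at which each executor's shuffle files were last marked lost.
  private val shuffleFileLostEpoch = mutable.Map.empty[String, Long]
  private var epoch: Long = 0L

  def nextEpoch(): Long = { epoch += 1; epoch }

  // Handle an executor-lost event observed at `eventEpoch`.
  // `fileLost` is false when an external shuffle service is expected to
  // still serve the executor's files, and true when the files are known
  // lost (e.g. a fetch failure suggests the whole node is gone).
  // Returns true iff the executor's map outputs should be unregistered.
  def executorLost(execId: String, fileLost: Boolean, eventEpoch: Long): Boolean = {
    if (executorFailureEpoch.getOrElse(execId, -1L) < eventEpoch) {
      // Remove the executor (at most once per epoch).
      executorFailureEpoch(execId) = eventEpoch
    }
    // With a single shared epoch map, this second check would fail here,
    // because the executor's failure epoch was already bumped above. The
    // separate map lets a later fetch failure at the same epoch still
    // trigger the unregistration of the outputs.
    if (fileLost && shuffleFileLostEpoch.getOrElse(execId, -1L) < eventEpoch) {
      shuffleFileLostEpoch(execId) = eventEpoch
      true
    } else {
      false
    }
  }
}
```

With this split, an executor lost while the external shuffle service is still serving its files (`fileLost = false`) does not prevent a subsequent fetch failure at the same epoch (`fileLost = true`) from unregistering the outputs — which is the scenario this PR fixes.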