Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-46957][CORE] Decommission migrated shuffle files should be able to cleanup from executor #47037

Closed
wants to merge 3 commits into from

Conversation

Ngone51
Copy link
Member

@Ngone51 Ngone51 commented Jun 20, 2024

What changes were proposed in this pull request?

This PR uses SortShuffleManager#taskIdMapsForShuffle to track the migrated shuffle files on the destination executor.

Why are the changes needed?

This is a long-standing bug in decommission where the migrated shuffle files can't be cleaned up from the executor. Normally, the shuffle files are tracked by taskIdMapsForShuffle during the map task execution. Upon receiving the RemoveShuffle(shuffleId) request from driver, executor can clean up those shuffle files by searching taskIdMapsForShuffle. However, for the migrated shuffle files by decommission, they lose the track in the destination executor's taskIdMapsForShuffle and can't be deleted as a result.

Note this bug only affects shuffle removal on the executor. For shuffle removal on the external shuffle service (when spark.shuffle.service.removeShuffle enabled and the executor stores the shuffle files has gone), we don't rely on taskIdMapsForShuffle but using the specific shuffle block id to locate the shuffle file directly. So it won't be an issue there.

Does this PR introduce any user-facing change?

No. (Common users won't see the difference underlying.)

How was this patch tested?

Add unit test.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the CORE label Jun 20, 2024
@Ngone51
Copy link
Member Author

Ngone51 commented Jun 20, 2024

@dongjoon-hyun @holdenk @mridulm @jiangxb1987 @bozhang2820 Could you help take a look? Thanks!

@Ngone51
Copy link
Member Author

Ngone51 commented Jun 20, 2024

@dongjoon-hyun Do you think it is the same issue for FallbackStorage?

@Ngone51
Copy link
Member Author

Ngone51 commented Jun 27, 2024

@attilapiros @LuciferYang @warrenzhu25 Anyone else want to take a look at this fix?

@Ngone51
Copy link
Member Author

Ngone51 commented Jun 27, 2024

I'm going to merge this PR by the end of day if there is no objection.

Copy link
Contributor

@LuciferYang LuciferYang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM
Thanks @Ngone51

@Ngone51 Ngone51 closed this in b5a55e4 Jun 27, 2024
Ngone51 added a commit that referenced this pull request Jun 27, 2024
…e to cleanup from executor

### What changes were proposed in this pull request?

This PR uses `SortShuffleManager#taskIdMapsForShuffle` to track the migrated shuffle files on the destination executor.

### Why are the changes needed?

This is a long-standing bug in decommission where the migrated shuffle files can't be cleaned up from the executor. Normally, the shuffle files are tracked by `taskIdMapsForShuffle` during the map task execution. Upon receiving the `RemoveShuffle(shuffleId)` request from driver, executor can clean up those shuffle files by searching `taskIdMapsForShuffle`. However, for the migrated shuffle files by decommission, they lose the track in the destination executor's  `taskIdMapsForShuffle` and can't be deleted as a result.

Note this bug only affects shuffle removal on the executor. For shuffle removal on the external shuffle service (when `spark.shuffle.service.removeShuffle` enabled and the executor stores the shuffle files has gone), we don't rely on `taskIdMapsForShuffle` but using the specific shuffle block id to locate the shuffle file directly. So it won't be an issue there.

### Does this PR introduce _any_ user-facing change?

No. (Common users won't see the difference underlying.)

### How was this patch tested?

Add unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47037 from Ngone51/SPARK-46957.

Authored-by: Yi Wu <yi.wu@databricks.com>
Signed-off-by: Yi Wu <yi.wu@databricks.com>
(cherry picked from commit b5a55e4)
Signed-off-by: Yi Wu <yi.wu@databricks.com>
Ngone51 added a commit that referenced this pull request Jun 27, 2024
…e to cleanup from executor

### What changes were proposed in this pull request?

This PR uses `SortShuffleManager#taskIdMapsForShuffle` to track the migrated shuffle files on the destination executor.

### Why are the changes needed?

This is a long-standing bug in decommission where the migrated shuffle files can't be cleaned up from the executor. Normally, the shuffle files are tracked by `taskIdMapsForShuffle` during the map task execution. Upon receiving the `RemoveShuffle(shuffleId)` request from driver, executor can clean up those shuffle files by searching `taskIdMapsForShuffle`. However, for the migrated shuffle files by decommission, they lose the track in the destination executor's  `taskIdMapsForShuffle` and can't be deleted as a result.

Note this bug only affects shuffle removal on the executor. For shuffle removal on the external shuffle service (when `spark.shuffle.service.removeShuffle` enabled and the executor stores the shuffle files has gone), we don't rely on `taskIdMapsForShuffle` but using the specific shuffle block id to locate the shuffle file directly. So it won't be an issue there.

### Does this PR introduce _any_ user-facing change?

No. (Common users won't see the difference underlying.)

### How was this patch tested?

Add unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47037 from Ngone51/SPARK-46957.

Authored-by: Yi Wu <yi.wu@databricks.com>
Signed-off-by: Yi Wu <yi.wu@databricks.com>
(cherry picked from commit b5a55e4)
Signed-off-by: Yi Wu <yi.wu@databricks.com>
@Ngone51
Copy link
Member Author

Ngone51 commented Jun 27, 2024

Thanks, merged to master/3.5/3.4!

@Ngone51
Copy link
Member Author

Ngone51 commented Jun 27, 2024

There are compliaton failures against Java 8 in branch-3.5/3.4. I'll send a followup fix soon.

Ngone51 added a commit to Ngone51/spark that referenced this pull request Jun 27, 2024
…e to cleanup from executor

### What changes were proposed in this pull request?

This PR uses `SortShuffleManager#taskIdMapsForShuffle` to track the migrated shuffle files on the destination executor.

### Why are the changes needed?

This is a long-standing bug in decommission where the migrated shuffle files can't be cleaned up from the executor. Normally, the shuffle files are tracked by `taskIdMapsForShuffle` during the map task execution. Upon receiving the `RemoveShuffle(shuffleId)` request from driver, executor can clean up those shuffle files by searching `taskIdMapsForShuffle`. However, for the migrated shuffle files by decommission, they lose the track in the destination executor's  `taskIdMapsForShuffle` and can't be deleted as a result.

Note this bug only affects shuffle removal on the executor. For shuffle removal on the external shuffle service (when `spark.shuffle.service.removeShuffle` enabled and the executor stores the shuffle files has gone), we don't rely on `taskIdMapsForShuffle` but using the specific shuffle block id to locate the shuffle file directly. So it won't be an issue there.

### Does this PR introduce _any_ user-facing change?

No. (Common users won't see the difference underlying.)

### How was this patch tested?

Add unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47037 from Ngone51/SPARK-46957.

Authored-by: Yi Wu <yi.wu@databricks.com>
Signed-off-by: Yi Wu <yi.wu@databricks.com>
@Ngone51
Copy link
Member Author

Ngone51 commented Jun 27, 2024

FYI, created a separate backport PR along with the compliation error fix: #47122

attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
…e to cleanup from executor

### What changes were proposed in this pull request?

This PR uses `SortShuffleManager#taskIdMapsForShuffle` to track the migrated shuffle files on the destination executor.

### Why are the changes needed?

This is a long-standing bug in decommission where the migrated shuffle files can't be cleaned up from the executor. Normally, the shuffle files are tracked by `taskIdMapsForShuffle` during the map task execution. Upon receiving the `RemoveShuffle(shuffleId)` request from driver, executor can clean up those shuffle files by searching `taskIdMapsForShuffle`. However, for the migrated shuffle files by decommission, they lose the track in the destination executor's  `taskIdMapsForShuffle` and can't be deleted as a result.

Note this bug only affects shuffle removal on the executor. For shuffle removal on the external shuffle service (when `spark.shuffle.service.removeShuffle` enabled and the executor stores the shuffle files has gone), we don't rely on `taskIdMapsForShuffle` but using the specific shuffle block id to locate the shuffle file directly. So it won't be an issue there.

### Does this PR introduce _any_ user-facing change?

No. (Common users won't see the difference underlying.)

### How was this patch tested?

Add unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47037 from Ngone51/SPARK-46957.

Authored-by: Yi Wu <yi.wu@databricks.com>
Signed-off-by: Yi Wu <yi.wu@databricks.com>
.get

val newShuffleFiles = shuffleFiles.diff(existingShuffleFiles)
assert(newShuffleFiles.size >= shuffleBlockUpdates.size)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, this seems to introduce a flakiness.

[info] - SPARK-46957: Migrated shuffle files should be able to cleanup from executor *** FAILED *** (36 seconds, 137 milliseconds)
[info]   0 was not greater than or equal to 6 (BlockManagerDecommissionIntegrationSuite.scala:423)

Copy link
Contributor

@attilapiros attilapiros Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It must be a race condition on JDK21 but I did not managed to reproduce it locally. Neither on arm64 nor on a x86_64 (using docker).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I see a jira already exists for this: https://issues.apache.org/jira/browse/SPARK-49297
I am working on a possible fix.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants