
[SPARK-48260][SQL] Disable output committer coordination in one test of ParquetIOSuite #46560

Closed

Conversation

cloud-fan (Contributor) commented on May 13, 2024:

What changes were proposed in this pull request?

Recently I noticed a flaky test in ParquetIOSuite: "SPARK-7837 Do not close output writer twice when commitTask() fails".

It turns out to be a race condition. The test injects an error into the task-commit step, and the job may fail in one of two ways:

  1. The task got the driver's permission to commit, but the commit failed and thus the task failed. This triggers a stage failure because it implies possible data duplication; see #36564 ([SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status).
  2. The test disables task retries, so TaskSetManager aborts the stage.

Both failures are reported by sending an event to the DAGScheduler, so the job's final error depends on which event is processed first. That is normally harmless, but this ParquetIOSuite test asserts on the error class. This PR fixes the flakiness by disabling output committer coordination in the test; the necessary changes are added to allow disabling it per query. A toy model of the race is sketched below.
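
To make the race concrete, here is a minimal, self-contained toy model of a single-queue scheduler event loop; the event names and the setup are illustrative assumptions, not Spark's real DAGScheduler event classes:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Toy model of the race: two components report the same task failure to a
// single event queue, and whichever event arrives first decides the error.
object EventRaceDemo extends App {
  sealed trait Event
  case class StageFailed(reason: String) extends Event    // from OutputCommitCoordinator
  case class TaskSetAborted(reason: String) extends Event // from TaskSetManager

  val events = new LinkedBlockingQueue[Event]()

  // Both failure paths fire concurrently after the injected commitTask error.
  new Thread(() => events.put(StageFailed("authorized committer failed"))).start()
  new Thread(() => events.put(TaskSetAborted("task failed and retries are disabled"))).start()

  // The scheduler acts on whichever event it dequeues first, so the job's
  // final error class is nondeterministic, which is why the test was flaky.
  println(s"Job failed with: ${events.take()}")
}
```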

Why are the changes needed?

Fix a flaky test.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

N/A

Was this patch authored or co-authored using generative AI tooling?

No

cloud-fan (Contributor, Author) commented:

cc @gengliangwang @viirya

viirya (Member) commented on May 13, 2024:

> The task got the driver's permission to commit, but the commit failed and thus the task failed. This triggers a stage failure because it implies possible data duplication; see #36564.

I read #36564; it seems to handle the case where a commit succeeds but the task fails, which means data duplication is possible.

But here both the task and the commit failed, so it should not cause data duplication. Does that also trigger a stage failure?

cloud-fan (Contributor, Author) commented:

@viirya I think #36564's PR description is clear about the details. The driver never knows whether a task commit succeeded. It only knows: 1) whether the "ask for commit" request was approved, and 2) whether the task completed successfully.

The flaky test triggers exactly this: 1) it injects a failure into commitTask, which happens after the task receives the driver's permission to commit, and 2) the task eventually fails. A toy sketch of the coordinator's bookkeeping follows.
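
To illustrate the protocol described here, below is a small self-contained toy model of the coordinator's bookkeeping; the object and method names are illustrative assumptions, not the real (private) OutputCommitCoordinator API:

```scala
import scala.collection.mutable

// Toy model of the coordinator's bookkeeping, for illustration only; the
// real OutputCommitCoordinator is a private Spark class with RPC plumbing.
object ToyCommitCoordinator {
  // partition -> task attempt that was authorized to commit
  private val authorized = mutable.Map.empty[Int, Int]

  // 1) The driver knows whether an "ask for commit" request was approved.
  def canCommit(partition: Int, attempt: Int): Boolean = synchronized {
    authorized.get(partition) match {
      case Some(winner) => winner == attempt // only the first asker wins
      case None => authorized(partition) = attempt; true
    }
  }

  // 2) The driver knows whether the task completed successfully. It never
  // learns whether an authorized commit actually wrote its output, so a
  // failed authorized committer forces a stage failure (SPARK-39195).
  def taskCompleted(partition: Int, attempt: Int, successful: Boolean): Option[String] =
    synchronized {
      if (!successful && authorized.get(partition).contains(attempt)) {
        Some("authorized committer failed; possible duplication, abort the stage")
      } else {
        None
      }
    }
}
```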

viirya (Member) commented on May 13, 2024:

> In this PR, we do the following:
> When a task commit succeeds, the executor sends a CommitOutputSuccess message to the OutputCommitCoordinator.
> When the OutputCommitCoordinator handles taskComplete, if the task failed but the commit succeeded, data duplication will happen, so we should fail the job.

@cloud-fan Thanks for explaining; that sounds reasonable. Although, from the description in #36564 quoted above, it seems that when the OutputCommitCoordinator handles taskComplete, it checks whether the task failed but the commit succeeded, and then decides whether data duplication happened.

viirya (Member) commented on May 13, 2024:

I got a clearer picture after reading the change in #36564 and the code of OutputCommitCoordinator. The PR description of #36564 is not actually accurate and can cause confusion if you don't read the actual code in OutputCommitCoordinator.

See https://github.com/apache/spark/pull/36564/files#r1598660630

viirya (Member) commented on May 13, 2024:

> The task got the driver's permission to commit, but the commit failed and thus the task failed. This triggers a stage failure because it implies possible data duplication.

This is more accurate. The OutputCommitCoordinator only knows that a commit was authorized (the task got the driver's permission), but in #36565 that is not distinguished from a commit success. It would be better to update the reason string in OutputCommitCoordinator, although that is unrelated to this PR.

```diff
@@ -276,8 +276,14 @@ class HadoopMapReduceCommitProtocol(
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
+    val disableCommitCoordination =
```

Review comment (Member) on `val disableCommitCoordination =`, with a suggested change:

```diff
-    val disableCommitCoordination =
+    val disableCommitCoordinationInTest =
```

A second review comment (Member): Also, let's add a comment about why we need to disable it in tests.

```diff
@@ -42,7 +42,8 @@ object SparkHadoopMapRedUtil extends Logging {
       committer: MapReduceOutputCommitter,
       mrTaskContext: MapReduceTaskAttemptContext,
       jobId: Int,
-      splitId: Int): Unit = {
+      splitId: Int,
+      disableCommitCoordination: Boolean): Unit = {
```
Review comment (Member): nit: shall we rename this to disableCommitCoordinationInTest as well?
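
For context, here is a condensed sketch of how SparkHadoopMapRedUtil.commitTask could consume the new parameter. It is an illustration of the idea, not the exact PR diff, and it assumes the code lives inside Spark's own packages (OutputCommitCoordinator and CommitDeniedException are private[spark]):

```scala
import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.executor.CommitDeniedException

object CommitSketch {
  // Sketch only: condensed from SparkHadoopMapRedUtil.commitTask, with
  // logging and error handling omitted; the flag plumbing is illustrative.
  def commitTask(
      committer: OutputCommitter,
      ctx: TaskAttemptContext,
      jobId: Int,
      splitId: Int,
      disableCommitCoordination: Boolean): Unit = {
    if (committer.needsTaskCommit(ctx)) {
      // Coordinate with the driver unless the caller opted out for this
      // query, or the global escape-hatch config turned coordination off.
      val coordinate = !disableCommitCoordination &&
        SparkEnv.get.conf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", true)
      if (coordinate) {
        val tc = TaskContext.get()
        val canCommit = SparkEnv.get.outputCommitCoordinator.canCommit(
          tc.stageId(), tc.stageAttemptNumber(), splitId, tc.attemptNumber())
        if (canCommit) {
          committer.commitTask(ctx)
        } else {
          // The driver denied the commit; abort so another attempt can win.
          committer.abortTask(ctx)
          throw new CommitDeniedException(
            "driver did not authorize commit", tc.stageId(), splitId, tc.attemptNumber())
        }
      } else {
        // No coordination: commit directly, leaving a single failure path.
        committer.commitTask(ctx)
      }
    }
  }
}
```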

gengliangwang (Member) commented:

@cloud-fan I created #46562 for this, which avoids changing the production code to fix the flaky test.

cloud-fan (Contributor, Author) commented:

Closing in favor of #46562.

cloud-fan closed this on May 13, 2024.