
[SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status #36564

Closed
wants to merge 53 commits

Conversation

AngersZhuuuu
Contributor

@AngersZhuuuu AngersZhuuuu commented May 16, 2022

What changes were proposed in this pull request?

The current code has a case that may cause a data correctness issue:

  1. A task writing data commits its task output successfully, but the container is killed right after the commit. The task is therefore marked as failed, yet its data file remains under committedTaskPath.
  2. DAGScheduler calls handleCompletedTask, which calls outputCommitCoordinator.taskCompleted; the coordinator then removes the commit lock for the failed task's partition.
  3. The failed task is rerun with a new attempt. The new attempt's call to outputCommitCoordinator.canCommit() returns true because the partition lock was removed, so it also commits its task output successfully and the task ultimately succeeds.
  4. Two committed task paths now remain under the job's attempt path for the same partition.
  5. commitJob commits the data from both committed task paths.
  6. The data is duplicated.

This PR therefore does the following (a simplified sketch follows the list):

  1. When a task commit succeeds, the executor sends a CommitOutputSuccess message to the OutputCommitCoordinator.
  2. When the OutputCommitCoordinator handles taskCompleted, a task that failed after a successful commit means data duplication will happen, so the job should be failed.
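
A minimal sketch of this coordinator-side check, with hypothetical names (the actual change lives in OutputCommitCoordinator and the DAGScheduler):

```scala
import scala.collection.mutable

// Hypothetical identity of a task attempt, and a tracker standing in for the coordinator.
case class TaskIdentity(stage: Int, stageAttempt: Int, partition: Int, attemptNumber: Int)

class CommitTracker(failStage: (Int, String) => Unit) {
  private val committed = mutable.Set.empty[TaskIdentity]

  // The executor reports this right after commitTask() succeeds (CommitOutputSuccess).
  def commitOutputSuccess(id: TaskIdentity): Unit = synchronized {
    committed += id
  }

  // Called when the scheduler reports task completion: a task that failed after
  // committing means a retry may commit the same partition again, so abort the
  // stage instead of just releasing the partition lock.
  def taskCompleted(id: TaskIdentity, succeeded: Boolean): Unit = synchronized {
    if (!succeeded && committed.contains(id)) {
      failStage(id.stage, s"Authorized committer (attemptNumber=${id.attemptNumber}, " +
        s"stage=${id.stage}, partition=${id.partition}) failed after committing; " +
        "data duplication may happen")
    }
  }
}
```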

Why are the changes needed?

Fix a data duplication issue.

Does this PR introduce any user-facing change?

No

How was this patch tested?

@github-actions github-actions bot added the CORE label May 16, 2022
@AngersZhuuuu AngersZhuuuu changed the title [WIP][SPARK-39195][SQL] Spark should use two step update of outputCommitCoordinator [SPARK-39195][SQL] Spark OutputCommitCoordinator should help keep file consistent with task status. May 24, 2022
@AngersZhuuuu
Contributor Author

gentle ping @dongjoon-hyun @cloud-fan @HyukjinKwon @srowen Could you take a look at this data correctness issue?

```scala
case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) =>
  throw new SparkException(s"Authorized committer (attemptNumber=$attemptNumber, " +
    s"stage=$stage, partition=$partition) failed; but task commit success, " +
    s"should fail the job")
```
Contributor Author

It seems that throwing an exception here won't stop the job; any suggestions?

@AngersZhuuuu
Contributor Author

ping @cloud-fan Could you take a look?

@AngersZhuuuu
Contributor Author

Yea

@mstebelev

Hi @AngersZhuuuu. I came across problems with these changes after updating to Spark 3.4.
I write data to an Iceberg table with an S3 backend, and the data upload happens in dataWriter.commit(), after coordinator.canCommit has been called. So if uploading to S3 fails for some reason, the task fails and the partition data can't be uploaded anymore, even in the task's retries, because the failed task remains the assigned committer.
It looks like data writing is usually idempotent, because each partition is written to a separate file, so a retrying task can always write it again without duplicating data.
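
For context, a rough sketch of the executor-side sequence being described, using hypothetical simplified types (not the exact DSv2 write-task code): with the commit coordinator enabled, the driver is asked for permission first, and the potentially expensive S3 upload only happens inside dataWriter.commit().

```scala
// Hypothetical simplified interfaces standing in for the coordinator and DSv2 data writer.
trait Coordinator {
  def canCommit(stage: Int, stageAttempt: Int, partition: Int, attempt: Int): Boolean
}
trait PartitionDataWriter {
  def commit(): AnyRef // for S3-backed tables this is where the actual upload happens
  def abort(): Unit
}

def commitTask(coord: Coordinator, writer: PartitionDataWriter,
               stage: Int, stageAttempt: Int, partition: Int, attempt: Int): AnyRef = {
  if (coord.canCommit(stage, stageAttempt, partition, attempt)) {
    // If this upload fails, the task fails, but this attempt remains the authorized
    // committer, so later retries of the same partition are denied permission.
    writer.commit()
  } else {
    writer.abort()
    throw new IllegalStateException(s"Commit denied for partition $partition, attempt $attempt")
  }
}
```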

@cloud-fan
Contributor

@mstebelev

🤔 So the useCommitCoordinator option is for appending to the same destination, like inserting rows into a remote database, and we don't need it in Iceberg?

@cloud-fan
Contributor

The new data lake formats all use transaction logs; I don't think the coordinator is needed anymore.

@huaxingao
Contributor

@cloud-fan Thanks for pinging me. It appears that Iceberg doesn't override this useCommitCoordinator. I will take a look at this issue.

@aokolnychyi
Contributor

@huaxingao @cloud-fan, could you confirm only a single WriterCommitMessage will be passed in case of speculative execution even without the commit coordinator? Based on what I see in V2TableWriteExec, if multiple task attempts succeed, the last one will win by replacing the commit message from the first one. Am I right?

@cloud-fan
Contributor

@aokolnychyi In most cases, yes. However, we have BatchWrite#onDataWriterCommit, so implementations can decide how to deal with conflicting commit messages, maybe first-win.
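
As a hedged illustration of that hook (a sketch only: `PartitionCommit` is a hypothetical connector-defined message type, and a real `BatchWrite` would also implement `createBatchWriterFactory` and `abort`), a connector could record the first message it sees per partition and ignore later, conflicting ones:

```scala
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.connector.write.{BatchWrite, WriterCommitMessage}

// Hypothetical connector-defined commit message carrying its partition id.
case class PartitionCommit(partitionId: Int, files: Seq[String]) extends WriterCommitMessage

// "First wins": remember the first message seen per partition via onDataWriterCommit,
// and use those in commit() regardless of which message Spark collected last.
abstract class FirstWinsBatchWrite extends BatchWrite {
  private val firstMessages = new ConcurrentHashMap[Int, PartitionCommit]()

  override def onDataWriterCommit(message: WriterCommitMessage): Unit = message match {
    case pc: PartitionCommit => firstMessages.putIfAbsent(pc.partitionId, pc)
    case _ => // ignore unknown message types
  }

  override def commit(messages: Array[WriterCommitMessage]): Unit = {
    // Publish the first-recorded message per partition; connector-specific logic omitted.
    firstMessages.values().forEach(pc => publish(pc))
  }

  // Hypothetical connector-specific publish step (e.g. append to a transaction log).
  protected def publish(commit: PartitionCommit): Unit
}
```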

@aokolnychyi
Contributor

Thanks for confirming, @cloud-fan!

Comment on lines +161 to 164
```scala
  sc.foreach(_.dagScheduler.stageFailed(stage, s"Authorized committer " +
    s"(attemptNumber=$attemptNumber, stage=$stage, partition=$partition) failed; " +
    s"but task commit success, data duplication may happen."))
}
```
Member

@cloud-fan I think the reason string here is not very clear or correct: stageState.authorizedCommitters records that a commit is allowed, not that it actually succeeded. So, as you said, the driver never knows whether the task commit was successful. Maybe we should update this to reduce confusion.

Contributor

+1. @AngersZhuuuu can you refine it?

Contributor Author

With #38980, it seems we don't need this patch anymore.

Contributor
oh, shall we create a PR to revert it then?

Contributor Author

> oh, shall we create a PR to revert it then?

Double-checked with @boneanxs; we should revert this, let me do it.

gengliangwang added a commit that referenced this pull request May 13, 2024
…of ParquetIOSuite

### What changes were proposed in this pull request?

A test from `ParquetIOSuite` is flaky: `SPARK-7837 Do not close output writer twice when commitTask() fails`

It turns out to be a race condition. The test injects an error into the task commit step, and the job may fail in two ways:
1. The task got the driver's permission to commit, but the commit failed and thus the task failed. This triggers a stage failure, as it means possible data duplication; see #36564.
2. The test disables task retries, so `TaskSetManager` will abort the stage.

Both failures are reported by sending an event to `DAGScheduler`, so the final job failure depends on which event gets processed first. This is not a big deal, but the test in `ParquetIOSuite` checks the error class. This PR fixes the flaky test by running the test case in a new test suite with output committer coordination disabled.

### Why are the changes needed?

fix flaky test

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

GA tests + manual test locally

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #46562 from gengliangwang/fixParquetIO.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
@dongjoon-hyun
Member

To @AngersZhuuuu, @viirya, @cloud-fan.
Since this PR is too old, shall we file a JIRA issue for that comment, #36564 (comment)?

@viirya
Member

viirya commented May 15, 2024

> To @AngersZhuuuu, @viirya, @cloud-fan. Since this PR is too old, shall we file a JIRA issue for that comment, #36564 (comment)?

Yea, I think so.

@viirya
Member

viirya commented May 15, 2024

I created https://issues.apache.org/jira/browse/SPARK-48292

cloud-fan pushed a commit that referenced this pull request May 30, 2024
…inator should abort stage when committed file not consistent with task status

### What changes were proposed in this pull request?
Revert #36564, according to the discussion in #36564 (comment).

When Spark commits a task, it commits to the committedTaskPath
`${outputpath}/_temporary//${appAttempId}/${taskId}`
So in the case described in #36564, before #38980 the job ID date embedded in each task's ID was not the same across attempts; when a task wrote its data successfully but failed to send back the TaskSuccess RPC, the rerun of that task committed to a different committedTaskPath, causing duplicated data.

After #38980, different attempts of the same task share the same TaskId, so a rerun commits to the same committedTaskPath, and the Hadoop commit protocol handles this case so the data is not duplicated.

Note: the taskAttemptPath is not the same, since that path contains the taskAttemptId.

### Why are the changes needed?
Not needed anymore.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UTs

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46696 from AngersZhuuuu/SPARK-48292.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
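
To make the path argument in the revert above concrete, a small illustration (hypothetical helper and IDs, not Spark's actual HadoopMapReduceCommitProtocol code):

```scala
// The committed task path is derived from the task ID; before #38980 a rerun's task ID
// could embed a different job date, giving it a different committed path.
def committedTaskPath(outputPath: String, appAttemptId: String, taskId: String): String =
  s"$outputPath/_temporary/$appAttemptId/$taskId"

// Hypothetical IDs: before #38980 the two attempts could land in different directories,
// and commitJob would pick up both, duplicating the partition's data.
val firstAttempt = committedTaskPath("/out", "0", "task_202205160101_0001_m_000000")
val rerunAttempt = committedTaskPath("/out", "0", "task_202205160203_0001_m_000000")

// After #38980 every attempt of the task shares one task ID, so a rerun commits to the
// same directory and the Hadoop commit protocol replaces the earlier output.
```
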
riyaverm-db pushed a commit to riyaverm-db/spark that referenced this pull request Jun 7, 2024
…inator should abort stage when committed file not consistent with task status

dongjoon-hyun pushed a commit to dongjoon-hyun/spark that referenced this pull request Jul 1, 2024
…inator should abort stage when committed file not consistent with task status

dongjoon-hyun pushed a commit to dongjoon-hyun/spark that referenced this pull request Jul 1, 2024
…inator should abort stage when committed file not consistent with task status

dongjoon-hyun pushed a commit that referenced this pull request Jul 1, 2024
…Coordinator should abort stage when committed file not consistent with task status

This is a backport of #46696

Closes #47166 from dongjoon-hyun/SPARK-48292.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
dongjoon-hyun pushed a commit that referenced this pull request Jul 1, 2024
…Coordinator should abort stage when committed file not consistent with task status

This is a backport of #46696

Closes #47168 from dongjoon-hyun/SPARK-48292-3.4.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@steveloughran
Contributor

Task commits MUST be atomic, and a second attempt MUST be able to supersede the first.

Hadoop FileOutputCommitter v1 (on HDFS, localfs, and ABFS, but not GCS) uses atomic rename for this, deleting the task's committed data path first. A retry will delete that destination path and rename its own work directory to it.

The S3A committers PUT a manifest to the task path; this relies on PUT being atomic.
The MapReduce manifest committer writes the manifest and then renames it; the file rename is atomic on GCS so it is safe there (directory rename is not atomic).

The application MUST NOT constrain which of two attempts told to commit succeeds, only that the second one MUST report success.

Why so? Because if task attempt TA1 stops responding after being told to commit, then TA2 will be told to commit, and reports success. But if TA1 is somehow suspended and performs its atomic commit after TA2, then it will be the TA1 manifest which is processed.
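
A minimal sketch of the retry-safe, v1-style task commit described above (assuming a Hadoop `FileSystem` handle; simplified, not the actual `FileOutputCommitter` code):

```scala
import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}

def commitTaskV1(fs: FileSystem, taskAttemptPath: Path, committedTaskPath: Path): Unit = {
  // Delete whatever an earlier attempt may have committed for this task...
  if (fs.exists(committedTaskPath)) {
    fs.delete(committedTaskPath, true)
  }
  // ...then atomically rename this attempt's work directory into place, so the
  // last attempt to commit supersedes any previous one.
  if (!fs.rename(taskAttemptPath, committedTaskPath)) {
    throw new IOException(s"Failed to rename $taskAttemptPath to $committedTaskPath")
  }
}
```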

If you are encountering problems with jobs where task failures are unrecoverable, that means there is something wrong with the task commit algorithm.

What were you seeing it with?

@dongjoon-hyun
Member

To @steveloughran, for better context, it would be better to read and post on #46696 instead of here.

BTW, for my part, I can say that I don't see any issue with either SPARK-39195 or its revert (SPARK-48292) in an S3 environment.

@akki

akki commented Aug 2, 2024

Hi all

I am facing this issue after upgrading to Spark 3.5.1 and wonder if this change is the root cause of it. Can anyone here confirm?

Many of our jobs are failing with "Authorized committer" errors, and we might have to revert our whole system back to Spark 3.3, which would be a lot of work. So I am trying to understand the root issue a bit better before I revert my whole Spark+Hadoop system.

Thanks!

szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Aug 7, 2024
…Coordinator should abort stage when committed file not consistent with task status

This is a backport of apache#46696

Closes apache#47168 from dongjoon-hyun/SPARK-48292-3.4.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>