
[SPARK-27474][CORE] avoid retrying a task failed with CommitDeniedException many times #24375

Closed
wants to merge 3 commits into from

Conversation

cloud-fan
Contributor

@cloud-fan cloud-fan commented Apr 15, 2019

What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-25250 reports a bug where a task that failed with CommitDeniedException gets retried many times.

This can happen when a stage has two task set managers: one zombie and one active. A task from the zombie TSM completes and commits to the central coordinator (assuming it's a file-writing task). The corresponding task from the active TSM then fails with CommitDeniedException. Since CommitDeniedException.countTowardsTaskFailures is false, the active TSM keeps retrying this task until the job finishes, wasting a lot of resources.

#21131 first implemented the idea that a task completed successfully by a zombie TaskSetManager marks the task for the same partition as completed in the active TaskSetManager. Later, #23871 improved the implementation to cover a corner case where the active TaskSetManager has not yet been created when the earlier task succeeds.

However, #23871 had a bug and was reverted in #24359. In hindsight, #23871 is fragile because it requires syncing state between the DAGScheduler and the TaskScheduler about which partitions are completed.

This PR proposes a new fix:

  1. When DAGScheduler gets a task success event from an earlier attempt, notify the TaskSchedulerImpl about it
  2. When TaskSchedulerImpl knows a partition is already completed, ask the active TaskSetManager to mark the corresponding task as finished, if the task is not finished yet.
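To make the two steps concrete, here is a minimal, self-contained sketch of the notification flow, using heavily simplified hypothetical stand-ins (SketchTaskSetManager, SketchTaskScheduler) for the real TaskSetManager / TaskSchedulerImpl / DAGScheduler classes -- it shows only the shape of the hand-off, not Spark's actual API:

import scala.collection.mutable

// Hypothetical, trimmed-down stand-in for TaskSetManager.
class SketchTaskSetManager(val stageId: Int, numTasks: Int) {
  private val successful = Array.fill(numTasks)(false)
  var isZombie = false

  // Step 2: mark the task for this partition as finished so it is never (re)scheduled here.
  def markPartitionCompleted(partitionIndex: Int): Unit = {
    if (!successful(partitionIndex)) {
      successful(partitionIndex) = true
    }
  }
}

// Hypothetical stand-in for TaskSchedulerImpl.
class SketchTaskScheduler {
  private val taskSetsByStage = mutable.Map.empty[Int, mutable.Buffer[SketchTaskSetManager]]

  def register(tsm: SketchTaskSetManager): Unit =
    taskSetsByStage.getOrElseUpdate(tsm.stageId, mutable.Buffer.empty) += tsm

  // Step 1: the DAGScheduler calls this when a task from an earlier attempt succeeds.
  def notifyPartitionCompletion(stageId: Int, partitionIndex: Int): Unit = synchronized {
    taskSetsByStage.getOrElse(stageId, mutable.Buffer.empty)
      .filter(!_.isZombie)                          // only the active TSM needs the hint
      .foreach(_.markPartitionCompleted(partitionIndex))
  }
}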

This fix covers the corner case, because:

  1. If DAGScheduler gets the task completion event from the zombie TSM before submitting the new stage attempt, then DAGScheduler already knows this partition is completed and will exclude it when creating the task set for the new stage attempt. See DAGScheduler.submitMissingTasks.
  2. If DAGScheduler gets the task completion event from the zombie TSM after submitting the new stage attempt, then the active TSM has already been created.

Compared to the previous fix, the message path is longer, so the active task set manager may already have retried the task a few times before it learns the partition is done. But this failure window won't be large, and we mainly want to avoid the worst case, where the task is retried over and over until the job finishes. So this solution is acceptable.

How was this patch tested?

a new test case.

@cloud-fan
Contributor Author

cc @Ngone51 @pgandhi999 @squito @jiangxb1987 @zsxwing

I think this is a more robust fix.

*/
private[scheduler] def markPartitionCompletedInAllTaskSets(
private[scheduler] def markPartitionCompleted(


Doesn't this method need to acquire synchronization on TaskSchedulerImpl object?

Contributor Author

No, it doesn't. It's called in the task-result-getter thread.


Correct me if I am wrong, but if I follow the call chain correctly, you are calling the method markPartitionCompleted from enqueuePartitionCompletionNotification, which is called from notifyPartitionCompletion, and which gets invoked inside the DAG Scheduler event loop thread. Am I missing something here?

Contributor Author

see https://github.com/apache/spark/pull/24375/files#diff-c69e9db8cf7981fd484d4665181695a1R159

enqueuePartitionCompletionNotification calls markPartitionCompleted within getTaskResultExecutor
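A rough, self-contained sketch of that hand-off, with hypothetical names standing in for the real TaskResultGetter and scheduler callback -- the point is only that the DAGScheduler event loop just enqueues the work, and the marking itself runs on a task-result-getter pool thread:

import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical stand-in for TaskResultGetter; handlePartitionCompleted stands in for the
// scheduler callback.
class ResultGetterSketch(handlePartitionCompleted: (Int, Int) => Unit) {
  private val getTaskResultExecutor = Executors.newFixedThreadPool(4) // default pool size is 4

  // Called from the DAGScheduler event loop thread; returns immediately.
  def enqueuePartitionCompletionNotification(stageId: Int, partitionId: Int): Unit =
    getTaskResultExecutor.execute(() => handlePartitionCompleted(stageId, partitionId))

  def shutdown(): Unit = {
    getTaskResultExecutor.shutdown()
    getTaskResultExecutor.awaitTermination(10, TimeUnit.SECONDS)
  }
}

object CallChainSketch {
  def main(args: Array[String]): Unit = {
    val resultGetter = new ResultGetterSketch((stage, partition) =>
      // Runs on a pool thread, not on the DAGScheduler event loop thread.
      println(s"mark partition $partition of stage $stage done on ${Thread.currentThread().getName}")
    )
    resultGetter.enqueuePartitionCompletionNotification(stageId = 0, partitionId = 7)
    resultGetter.shutdown()
  }
}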


Ahh I see it now, thanks.

Contributor

taskResultGetter is a multithreaded pool (default 4) so I think you still need extra protection here

Contributor Author

Ah, good catch! I should add synchronization here.
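For illustration, a minimal self-contained sketch (hypothetical names, not the real TaskSchedulerImpl) of why the guard matters: several task-result-getter threads can report completions concurrently, so the shared state is mutated inside synchronized:

import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable

object SyncSketch {
  // stageId -> set of completed partition ids (shared, mutable scheduler-side state)
  private val completedPartitions = mutable.Map.empty[Int, mutable.Set[Int]]

  // Guarded, like the synchronized methods on TaskSchedulerImpl.
  def handlePartitionCompleted(stageId: Int, partitionId: Int): Unit = synchronized {
    completedPartitions.getOrElseUpdate(stageId, mutable.Set.empty) += partitionId
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(4) // mirrors the default task-result-getter pool size
    (0 until 100).foreach(p => pool.execute(() => handlePartitionCompleted(0, p)))
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    println(completedPartitions(0).size) // 100 -- no lost updates thanks to the lock
  }
}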

@pgandhi999 pgandhi999 Apr 16, 2019

Just FYI, after adding the synchronized statement, this PR is quite similar to PR #22806 before it was restructured. I could have simply added synchronized to markPartitionCompletedInAllTaskSets() instead of restructuring the PR, but I did not do so because of the following suggestion by @squito, which I agree with:

I'm not sure how to fix this. You could make TaskSchedulerImpl.markPartitionCompletedInAllTaskSets() synchronized, but then you're getting a lock on the TaskSchedulerImpl in the DAGScheduler event loop. That's not good for scheduling throughput, and we also want to make sure there is no chance of deadlock.

So I would like to once again ask the same question as above: is scheduling throughput impacted by adding the extra synchronization? And is there any possibility of hitting a deadlock due to the synchronized block?

Contributor Author

Ah, thanks for pointing it out! Yes, it's very similar, except that this PR sends a message to the task result getter thread pool first, to avoid locking inside the DAGScheduler event loop thread.

I don't think we will hit a deadlock. The task result getter thread pool calls TaskSchedulerImpl.handleSuccessfulTask, which is synchronized too, and handleSuccessfulTask calls methods of TaskSetManager, just like the newly added handlePartitionCompleted does.

@SparkQA

SparkQA commented Apr 15, 2019

Test build #104587 has finished for PR 24375 at commit 9a7b053.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Retest this please.

Contributor

@squito squito left a comment

Have you considered making the set of completed partitions a structure that is safe to access from multiple threads, e.g. an AtomicBitSet? The more I think about it, the more that seems like the best solution, given how important that set is to multiple threads.
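For reference, a sketch of the kind of structure being suggested (hypothetical -- Spark does not ship an AtomicBitSet; this only shows that a lock-free, thread-safe bit set over partition indices is straightforward):

import java.util.concurrent.atomic.AtomicLongArray

// Fixed-size bit set whose updates are lock-free and safe to call from multiple threads.
class AtomicBitSetSketch(numBits: Int) {
  private val words = new AtomicLongArray((numBits + 63) / 64)

  def set(bit: Int): Unit = {
    val idx = bit >> 6
    val mask = 1L << (bit & 63)
    var old = words.get(idx)
    // CAS loop: retry until the bit is observed as set.
    while ((old & mask) == 0 && !words.compareAndSet(idx, old, old | mask)) {
      old = words.get(idx)
    }
  }

  def get(bit: Int): Boolean = (words.get(bit >> 6) & (1L << (bit & 63))) != 0
}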

*/
private[scheduler] def markPartitionCompletedInAllTaskSets(
private[scheduler] def markPartitionCompleted(
Contributor

taskResultGetter is a multithreaded pool (default 4) so I think you still need extra protection here

partitionToIndex.get(partitionId).foreach { index =>
  if (!successful(index)) {
    if (speculationEnabled && !isZombie) {
      successfulTaskDurations.insert(taskInfo.duration)
    }
Contributor

I'm undecided about this part of the change in general (I see an argument for doing it both ways), but I'd still prefer that it not be in this PR.

Contributor Author

Then how about we use 0 as the task duration here? There is no taskInfo, so I have to update this part.

@SparkQA

SparkQA commented Apr 15, 2019

Test build #104598 has finished for PR 24375 at commit 9a7b053.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@Ngone51 Ngone51 left a comment

This solution just looks like a variant of #21131 to me. And as a "best effort", I think #21131 does better than this fix, since #21131 does not even go through the message event loop.

@cloud-fan
Contributor Author

cloud-fan commented Apr 16, 2019

Yes, it's kind of a variant of #21131. This PR covers the corner case where the active TaskSetManager hasn't been created yet when the earlier task succeeds.

There is a corner case that is unfixable: a task from a zombie TSM completes, and before we notify the active TSM about it, the active TSM has already submitted and completed the task for the same partition. #21131 doesn't cover it, and this PR doesn't cover it either. But this PR does make it more likely to happen, because we go through the event loop, so the active TSM takes longer to learn that a partition has completed.

Maybe we can combine the solutions? E.g. when a task from the zombie TSM completes, notify the active TSM immediately; later the DAGScheduler notifies the active TSM again, in case it was not yet created when the zombie task completed.

@Ngone51
Member

Ngone51 commented Apr 16, 2019

The issue cannot be 100% eliminated. Let's say task set 1.0 (zombie) has a running task for a partition, and task set 1.1 (active) has already submitted the task for the same partition and completed it. Then there is nothing we can do.

Though there is nothing we can do about finding 1, what is actually wrong with it?

The task in TSM 1.1 (active) is submitted and completes successfully, while TSM 1.0 (zombie) has a running task for the same partition. The task in TSM 1.0 (zombie) would fail later, but that has no impact on TSM 1.1 (active), and both TSM 1.0 and 1.1 would finish normally.

@cloud-fan
Contributor Author

cloud-fan commented Apr 16, 2019

I think we are discussing an optimization (saving resources) rather than a bug? Nothing will go wrong even without #21131.

UPDATE:
Normal tasks can all complete even if they belong to the same partition, so it's just a matter of saving resources by not submitting tasks whose partitions are already marked as completed.

For tasks that write to file sources, which must commit to the central coordinator, only one task can complete per partition. In this case, if the task from the zombie TSM completes first, the corresponding task in the active TSM will fail, get retried, and fail again, until the stage finishes (all partitions complete). The job doesn't fail, but a lot of resources are wasted.

If the task from the active TSM completes first, then the corresponding task from the zombie TSM will fail. This is totally fine, as a zombie TSM does not retry tasks.

That said, this PR tries to avoid the worst case described above. Even though we now go through the event loop, I don't think the delay will be long enough for the active TSM to have already finished.
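To make the commit-deny behavior described above concrete, here is a toy, self-contained sketch (hypothetical names; the real OutputCommitCoordinator also tracks stage and stage-attempt ids) of the "only one task may commit per partition" rule:

import scala.collection.mutable

class ToyCommitCoordinator {
  // partitionId -> attemptNumber that was granted the right to commit
  private val committers = mutable.Map.empty[Int, Int]

  def canCommit(partitionId: Int, attemptNumber: Int): Boolean = synchronized {
    committers.get(partitionId) match {
      case None =>
        committers(partitionId) = attemptNumber
        true                          // first requester wins
      case Some(winner) =>
        winner == attemptNumber       // everyone else is denied
    }
  }
}

// A denied attempt raises CommitDeniedException, which does not count towards task failures --
// which is exactly why the active TSM can keep retrying until the stage finishes.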

@cloud-fan cloud-fan changed the title [SPARK-25250][CORE] try best to not submit tasks when the partitions are already completed [SPARK-27474][CORE] try best to not submit tasks when the partitions are already completed Apr 16, 2019
@SparkQA

SparkQA commented Apr 16, 2019

Test build #104611 has finished for PR 24375 at commit eb427f7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Apr 16, 2019

Test build #104620 has finished for PR 24375 at commit eb427f7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51
Member

Ngone51 commented Apr 16, 2019

Yes, it's kind of a variant of #21131. This PR covers the corner case where the active TaskSetManager hasn't been created yet when the earlier task succeeds.

Does this PR really cover the corner case? Am I missing something?

That said, this PR tries to avoid the worst case described above. Even if we go through the event loop now, I don't think it will take so long that the task from the active TSM has already been retried 3 times.

Yeah, I think this PR could avoid the worst case even if we go through the event loop. But couldn't #21131 do that too? What's the advantage compared to #21131?

Maybe we can combine the solutions? E.g. when a task from the zombie TSM completes, notify the active TSM immediately; later the DAGScheduler notifies the active TSM again, in case it was not yet created when the zombie task completed.

I'm neutral on combining the solutions, since we're doing an optimization (saving resources) rather than fixing a bug. Though I'm wondering whether it would help much.

@cloud-fan
Contributor Author

But couldn't #21131 do that too? What's the advantage compared to #21131?

#21131 did not cover the corner case; that's why we opened #23871. Am I missing something? Maybe you were asking why I prefer this PR over #23871. The reason is that we don't need to sync state between DAGScheduler and TaskSchedulerImpl, which seems fragile to me.

@cloud-fan cloud-fan changed the title [SPARK-27474][CORE] try best to not submit tasks when the partitions are already completed [SPARK-27474][CORE] avoid retrying a task failed with CommitDeniedException many times Apr 16, 2019
@cloud-fan
Contributor Author

I've updated the PR description to make it clearer what the problem is and why this fix is proposed.

@Ngone51
Member

Ngone51 commented Apr 16, 2019

No, I'm asking whether this PR really covers the corner case where the active TaskSetManager hasn't been created when the earlier task succeeds. If it does, how? I didn't see it in the code.

@cloud-fan
Contributor Author

I've added it to the PR description. Let me quote it here:

This fix covers the corner case, because:

  1. If DAGScheduler gets the task completion event from the zombie TSM before submitting the new stage attempt, then DAGScheduler already knows this partition is completed and will exclude it when creating the task set for the new stage attempt. See DAGScheduler.submitMissingTasks.
  2. If DAGScheduler gets the task completion event from the zombie TSM after submitting the new stage attempt, then the active TSM has already been created.

@Ngone51
Member

Ngone51 commented Apr 16, 2019

Oh, yes, now I see how this PR fixes the corner case and works. It's really a good solution. And now I would suggest not combining it with #21131.

@pgandhi999

@cloud-fan I tested the PR a couple of times, tests look good. LGTM.

@Ngone51
Member

Ngone51 commented Apr 17, 2019

LGTM

@jiangxb1987
Contributor

So currently there are three places that track the partition status:

  1. The MapStatusTracker keeps track of all shuffle partitions; it gets updated by DAGScheduler.handleTaskCompletion(), and on FetchFailed it removes the corresponding shuffle partitions;
  2. The ShuffleMapStatus keeps track of pendingPartitions itself; it's effectively a fork of MapStatusTracker, and gets cleared when the stage is submitted;
  3. The TaskSetManager keeps track of pending and running tasks; when the number of successful tasks reaches the target number, it marks the TSM (i.e. a stage attempt) as successful.

It would be ideal to keep track of the pending partitions of each stage in a single data structure and update it synchronously. The major problem is that the DAGScheduler relies on MapStatusTracker to read the shuffle partition statuses, and that is updated asynchronously.

If we don't want to make major changes to the current infrastructure, the best approach I can think of is to let the DAGScheduler make the final decision on whether a stage has completed, and have all TSMs update their own status accordingly. The only shortcoming is a time window in which some TSM(s) have finished all their tasks but the MapStatusTracker is not yet updated; in that case we would see unnecessary tasks still running. To further shrink this window we could implement the other approach Wenchen suggested -- a status cache for the TSM, so that when a task from a zombie TSM completes, the active TSM is notified immediately.

@jiangxb1987
Contributor

The PR looks good from my side. My only concern is whether we should backport this to 2.4. I know the previous PRs were backported, but my feeling is that, strictly speaking, this PR is an improvement rather than a bugfix, and it makes a major change to a relatively critical and fragile part of Spark. Considering there has already been one instance of a PR causing a regression in 2.4.1, I would personally suggest being more conservative and only including this in the master branch. Anyway, it's just my two cents; feel free to correct me @squito @cloud-fan.

Contributor

@squito squito left a comment

some minor comments but overall I think this is fine.

As for what branch to put it in ... I would have liked to get this into 2.4 (I do think this is a bug fix). But I'm also pretty embarrassed about the prior regression, and it probably makes more sense to tread carefully here and only put this in master.

// finished. Here we notify the task scheduler to skip running tasks for the same partition,
// to save resource.
if (task.stageAttemptId < stage.latestInfo.attemptNumber()) {
  taskScheduler.notifyPartitionCompletion(stageId, task.partitionId)
Contributor

You could pass in event.taskInfo here, or perhaps even just event.taskInfo.duration.

partitionToIndex.get(partitionId).foreach { index =>
  if (!successful(index)) {
    if (speculationEnabled && !isZombie) {
      successfulTaskDurations.insert(taskInfo.duration)
      // The task is skipped, its duration should be 0.
      successfulTaskDurations.insert(0)
Contributor

Sorry, I wasn't clear before -- I don't think you should insert 0; that would really mess up the speculation calculation. I suggested above passing in the duration. But if there is some reason I'm missing why that isn't possible, then it's OK to go back to the way you originally had it, entirely skipping the add to successfulTaskDurations.
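To illustrate the concern with made-up numbers (simplified: Spark's speculation check is based on the median successful task duration times spark.speculation.multiplier, plus a quantile gate that is ignored here), zero durations drag the median down until normally-paced tasks look speculation-worthy:

object SpeculationSkewSketch {
  private def median(xs: Seq[Long]): Long = xs.sorted.apply(xs.size / 2)

  def main(args: Array[String]): Unit = {
    val multiplier = 1.5                        // default spark.speculation.multiplier
    val real = Seq(200L, 210L, 220L, 230L)      // genuine task durations, in ms
    val withZeros = real ++ Seq.fill(5)(0L)     // five "skipped" tasks recorded as duration 0

    println(median(real) * multiplier)          // 330.0 -- a sane speculation threshold
    println(median(withZeros) * multiplier)     // 0.0  -- every running task now looks slow
  }
}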

@@ -155,6 +155,12 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
    }
  }

  def enqueuePartitionCompletionNotification(stageId: Int, partitionId: Int): Unit = {
    getTaskResultExecutor.execute(() => Utils.logUncaughtExceptions {
      scheduler.handlePartitionCompleted(stageId, partitionId)
Contributor

As we have to get a lock on the TaskSchedulerImpl anyway, is there any advantage to this indirection here? Just avoiding acquiring the TaskSchedulerImpl lock inside the DAGScheduler event loop? If so, that is worth a comment here.

@SparkQA

SparkQA commented Apr 19, 2019

Test build #104735 has finished for PR 24375 at commit a58ac94.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@squito squito left a comment

lgtm assuming tests pass

for (id <- Set(0, 1)) {
  taskSetManager1.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
  assert(sched.endedTasks(id) === Success)
}
Contributor

I think now that you're using the duration across stage attempts for speculation, you don't need to change this test anymore -- but I also don't feel strongly that we really need to keep the old test either.

Contributor Author

This test case creates two TSMs, completes a task in one TSM, and expects that to mark the corresponding task in the other TSM as completed.

That is no longer how it works: we now need to go through the DAGScheduler to make it happen, and this test suite uses a fake DAGScheduler. To simplify the code, I just create one TSM and call markPartitionCompleted directly.

@SparkQA

SparkQA commented Apr 19, 2019

Test build #104738 has finished for PR 24375 at commit b148c6f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

cc @dbtsai

@dongjoon-hyun
Member

Retest this please.

@SparkQA

SparkQA commented Apr 22, 2019

Test build #104789 has finished for PR 24375 at commit b148c6f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Apr 29, 2019

Test build #104980 has finished for PR 24375 at commit b148c6f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

thanks, merging to master!

@cloud-fan cloud-fan closed this in 05b85eb Apr 29, 2019
asfgit pushed a commit that referenced this pull request May 7, 2019
…culative tasks

## What changes were proposed in this pull request?

This is a followup of #24375

When a `TaskSetManager` skips a task because its partition has already been completed by another `TaskSetManager`, we should not use the duration of the task finished by the other `TaskSetManager` when scheduling the speculative tasks of this `TaskSetManager`.

## How was this patch tested?

updated test case

Closes #24485 from cloud-fan/minor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
…eption many times

Ref: LIHADOOP-53705

https://issues.apache.org/jira/browse/SPARK-25250 reports a bug where a task that failed with `CommitDeniedException` gets retried many times.

This can happen when a stage has two task set managers: one zombie and one active. A task from the zombie TSM completes and commits to the central coordinator (assuming it's a file-writing task). The corresponding task from the active TSM then fails with `CommitDeniedException`. Since `CommitDeniedException.countTowardsTaskFailures` is false, the active TSM keeps retrying this task until the job finishes, wasting a lot of resources.

However, apache#23871 had a bug and was reverted in apache#24359. In hindsight, apache#23871 is fragile because it requires syncing state between `DAGScheduler` and `TaskScheduler` about which partitions are completed.

This PR proposes a new fix:
1. When `DAGScheduler` gets a task success event from an earlier attempt, notify the `TaskSchedulerImpl` about it
2. When `TaskSchedulerImpl` knows a partition is already completed, ask the active `TaskSetManager` to mark the corresponding task as finished, if the task is not finished yet.

This fix covers the corner case, because:
1. If `DAGScheduler` gets the task completion event from the zombie TSM before submitting the new stage attempt, then `DAGScheduler` already knows this partition is completed and will exclude it when creating the task set for the new stage attempt. See `DAGScheduler.submitMissingTasks`.
2. If `DAGScheduler` gets the task completion event from the zombie TSM after submitting the new stage attempt, then the active TSM has already been created.

Compared to the previous fix, the message path is longer, so the active task set manager may already have retried the task a few times before it learns the partition is done. But this failure window won't be large, and we want to avoid the worst case, where the task is retried many times until the job finishes. So this solution is acceptable.

a new test case.

Closes apache#24375 from cloud-fan/fix2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

RB=2113301
BUG=LIHADOOP-53705
G=spark-reviewers
R=chsingh
A=chsingh