[SPARK-25250][CORE] : Late zombie task completions handled correctly even before new taskset launched #22806
Conversation
…tion id, kill other running task attempts on that same partition

The fix that this PR proposes is as follows: whenever any result task completes successfully, we simply mark the corresponding partition id as completed in all attempts for that particular stage. As a result, we no longer see killed tasks due to TaskCommitDenied exceptions showing up in the UI. Also, since the method uses hash maps and arrays for searching and processing, its time complexity does not depend on the number of tasks, so it is also efficient.
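A minimal sketch of that idea, with stand-in types (the name `taskSetsByStageIdAndAttempt` mirrors the diff below; everything else is simplified and not the PR's exact code):

```scala
import scala.collection.mutable

// Minimal stand-ins for the real scheduler classes (illustrative only).
class TaskSetManager(numTasks: Int, val partitionToIndex: Map[Int, Int]) {
  val successful = new Array[Boolean](numTasks)

  def markPartitionIdAsCompletedForTaskAttempt(index: Int): Unit = {
    successful(index) = true
  }
}

class SchedulerSketch {
  // stageId -> (stageAttemptId -> TaskSetManager)
  val taskSetsByStageIdAndAttempt =
    mutable.HashMap.empty[Int, mutable.HashMap[Int, TaskSetManager]]

  // On a successful result task, mark its partition completed in every
  // attempt (zombie or active) of the same stage. Each step is a hash-map
  // lookup or an array write, so the cost grows with the number of
  // attempts, not the number of tasks.
  def markPartitionCompletedInAllAttempts(stageId: Int, partitionId: Int): Unit = {
    taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
      attempts.values.foreach { tsm =>
        tsm.partitionToIndex.get(partitionId).foreach { index =>
          tsm.markPartitionIdAsCompletedForTaskAttempt(index)
        }
      }
    }
  }
}
```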
Test build #97922 has finished for PR 22806 at commit
…RK-25250 [SPARK-25250] : Upmerging with master to fix unit tests
Test build #97939 has finished for PR 22806 at commit
@@ -1091,6 +1091,10 @@ private[spark] class TaskSetManager(
  def executorAdded() {
    recomputeLocality()
  }

  def markPartitionIdAsCompletedForTaskAttempt(index: Int): Unit = {
    successful(index) = true
Should this method also make a call to TaskSetManager.maybeFinishTaskSet()?
Yes, have made the necessary changes. Thank you.
cc: @jiangxb1987 @cloud-fan @srowen
…RK-25250 [SPARK-25250]: Upmerging with master branch
Test build #100508 has finished for PR 22806 at commit
Multiline comment indentation
Thank you @sujithjay for your comment; I have updated the PR.
    partitionId: Int, stageId: Int): Unit = {
    taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
      val index: Option[Int] = tsm.partitionToIndex.get(partitionId)
      if (!index.isEmpty) {
Nit: it's more usual to match on the Option and use case Some(...), or use foreach. It avoids a couple lines of code here.
Same with the getOrElse above, I guess.
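For illustration, the shapes being compared here, on a toy `Option[Int]`:

```scala
val index: Option[Int] = Map(9000 -> 2000).get(9000)

// Verbose: test for emptiness, then unwrap with .get
if (!index.isEmpty) {
  println(s"index = ${index.get}")
}

// Pattern match on the Option
index match {
  case Some(i) => println(s"index = $i")
  case None    => // nothing to do
}

// foreach: the body runs only when the Option is non-empty
index.foreach(i => println(s"index = $i"))
```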
Makes sense, have updated the code.
        val taskInfoList = tsm.taskAttempts(index.get)
        taskInfoList.foreach { taskInfo =>
          if (taskInfo.running) {
            killTaskAttempt(taskInfo.taskId, false, "Corresponding Partition Id " + partitionId +
Nit: Id -> ID and use string interpolation.
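Concretely, something like the following; the tail of the message is hypothetical, since the diff line above is truncated:

```scala
val partitionId = 9000

// Before: concatenation, and "Id" rather than "ID"
val before = "Corresponding Partition Id " + partitionId + " ..."

// After: string interpolation
val after = s"Corresponding partition ID $partitionId ..."
```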
Done
      if (!index.isEmpty) {
        tsm.markPartitionIdAsCompletedForTaskAttempt(index.get)
        val taskInfoList = tsm.taskAttempts(index.get)
        taskInfoList.foreach { taskInfo =>
Consider taskInfoList.filter(_.running).foreach or for (taskInfo <- taskInfoList if taskInfo.running).
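Both suggested shapes, on toy types (the `kill` helper is a hypothetical stand-in for `killTaskAttempt`):

```scala
case class TaskInfo(taskId: Long, running: Boolean)

val taskInfoList = Seq(TaskInfo(1L, running = true), TaskInfo(2L, running = false))

def kill(taskId: Long): Unit = println(s"killing task $taskId")

// filter + foreach
taskInfoList.filter(_.running).foreach(info => kill(info.taskId))

// for-comprehension with a guard, equivalent in effect
for (taskInfo <- taskInfoList if taskInfo.running) {
  kill(taskInfo.taskId)
}
```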
Done
   * result, we do not see any Killed tasks due to TaskCommitDenied Exceptions showing up
   * in the UI.
   */
  override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(
Doesn't this logic overlap with killAllTaskAttempts? Should it reuse that logic? I understand it does something a little different, and I don't know this code well, but it seems like there are related but separate implementations of something similar here.
As far as I understand the code, killAllTaskAttempts kills all the running tasks for a particular stage, whereas markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts kills all running tasks, across all stage attempts, that are working on a particular partition which has already been marked as completed by one of the previously running tasks for that partition. So the logic is different in the two cases, but we can modify the code to have one method performing both of these tasks. Let me know what you think!
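A heavily simplified sketch of the distinction being described, with stand-in types rather than the real scheduler API:

```scala
case class TaskInfo(taskId: Long, partitionId: Int, running: Boolean)

// Each stage maps to a sequence of attempts; each attempt is a list of tasks.
class KillPolicySketch(attemptsByStage: Map[Int, Seq[Seq[TaskInfo]]]) {
  private def kill(taskId: Long, reason: String): Unit =
    println(s"killing task $taskId: $reason")

  // killAllTaskAttempts-style: every running task of one stage goes.
  def killStage(stageId: Int): Unit =
    for {
      attempt <- attemptsByStage.getOrElse(stageId, Nil)
      task    <- attempt if task.running
    } kill(task.taskId, s"stage $stageId aborted")

  // This PR's method: across all attempts of the stage, only the running
  // tasks whose partition has already completed elsewhere go.
  def killRedundant(stageId: Int, completedPartition: Int): Unit =
    for {
      attempt <- attemptsByStage.getOrElse(stageId, Nil)
      task    <- attempt if task.running && task.partitionId == completedPartition
    } kill(task.taskId, s"partition $completedPartition already completed")
}
```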
@@ -39,6 +40,14 @@ class FakeSchedulerBackend extends SchedulerBackend {
  def reviveOffers() {}
  def defaultParallelism(): Int = 1
  def maxNumConcurrentTasks(): Int = 0
  val killedTaskIds: HashSet[Long] = new HashSet[Long]()
Nit: why not a Scala mutable Set?
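i.e., roughly (assuming the original used `java.util.HashSet`):

```scala
// Before: Java collection
val javaKilled: java.util.HashSet[Long] = new java.util.HashSet[Long]()

// Suggested: Scala mutable Set, which composes with Scala collection APIs
val killedTaskIds = scala.collection.mutable.HashSet.empty[Long]
killedTaskIds += 42L
assert(killedTaskIds.contains(42L))
```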
Done
@@ -1319,4 +1328,26 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
    tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, TaskKilled("test"))
    assert(tsm.isZombie)
  }

  test("SPARK-25250 On successful completion of a task attempt on a partition id, kill other" +
Nit: blank line above, and please use spaces in the body of the test to break this up into more readable chunks.
Done
Test build #100509 has finished for PR 22806 at commit
Can you explain the code path that does it? This part is a little convoluted…
I think we can expand #21131's behavior here. Instead of killing other running task attempts actively, we could update a stage's completed partitions once it is submitted to …
@cloud-fan I do not fully understand your question, but this is what happens according to the current behaviour: …
@Ngone51 I see your point, and yes, the key difference is that PR #21131 marks a partition as completed across all task attempts for the same stage attempt, whereas this PR does that for all task attempts across all stage attempts of the corresponding stage. We can extend the same behaviour here, but as far as killing running task attempts is concerned, that saves a lot of resources for long-running tasks which might be consuming resources even though they have become redundant. I could be wrong here as well, so let me know your thoughts. Thank you.
when task 5005 finishes, stage 4.1 will mark partition 9000 as completed, assuming we are talking about the latest code base, with #21131 merged. Did I miss something?
@cloud-fan yes, ideally that should happen. However, this problem occurs only when a task from the previous stage attempt finishes just before the new attempt for the same stage gets created, as described in the Jira. So when the code in PR #21131 executes, the tasksets for stage 4.1 have been submitted but not yet created, so …
Test build #100644 has finished for PR 22806 at commit
I see. Another question: I know the …
@Ngone51 That is indeed a good question. I have not seen the error before for a ShuffleMapStage, but let me try to reproduce it if I can. Thank you.
ooh this is a very good point -- really this is a bug in my change from #21131. It seems to me that really the original change should be removed, and you should do this instead -- but only kill tasks if it's in a result stage.
@squito Thank you for your suggestion, I have one question though. This was brought up by @Ngone51. I have seen this bug coming up in the case of a FetchFailure for a ResultStage; however, I was not able to reproduce the same for a ShuffleMapStage. Could it be that this issue might also be affecting ShuffleMapStage? Should I also add the same fix on task completion for a ShuffleMapStage? WDYT?
well, you wouldn't see the exact same problem with a ShuffleMapStage, but you could still have the same problem in general. Certainly you wouldn't see TaskCommitDenied, since that's only with result stages. I think the problem you've mentioned could lead to multiple active task sets as described in SPARK-23433; it requires a task to get completed from the zombie taskset, but before the DAGScheduler has launched a new taskset. How are you trying to test this, that you aren't able to reproduce?

Also I'd change the PR description to focus on that key point, something like:

[SPARK-25250][CORE] Late zombie task completions handled correctly even before new taskset launched

SPARK-23433 tried to ensure that late task completions from a zombie taskset were properly updated in all tasksets for the stage (zombie or not). However, because it did this outside of the DAGScheduler event loop, the DAGScheduler could launch another taskset for the stage at the same time, before it had updated the set of tasks left to run. The new active taskset would never learn about the old completion from the zombie taskset. This could lead to multiple active tasksets (as described in SPARK-23433) or result stage failure from repeated TaskCommitDenied exceptions for the task which the final stage attempts to re-run.

This change fixes it by moving that logic into the DAGScheduler event loop. As an optimization, it also kills tasks in all task sets, but only if it's a result stage (an extension of SPARK-25773).
also cc @jiangxb1987 @markhamstra
IMHO, the updated PR is far away from my proposal mentioned above. And considering this PR's discussion may be too long for other people to follow, I'd like to post another PR separately.
I would like to disagree with you on the above, @Ngone51; the PR is an implementation of your proposal, but with small changes.
In your proposal above, you have mentioned that we can look up the map … I have basically come up with a solution to ensure that once the partition completes, other tasks running on the same partition fail once and then do not get rescheduled, as is happening currently. The problem with the old change was that we were not using a lock while calling …
Test this please
@@ -1383,6 +1383,7 @@ private[spark] class DAGScheduler(

    event.reason match {
      case Success =>
        taskScheduler.markPartitionCompletedFromEventLoop(task.partitionId, task.stageId)
@pgandhi999 I don't follow your explanation -- I think I agree w/ @Ngone51. Yes, TSM calls DAGScheduler.taskEnded while it has a lock on the TaskSchedulerImpl, but the taskEnded call just puts an event into the queue and then returns. So this part is happening w/out the lock. But maybe you meant something else?
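A stripped-down sketch of that point (hypothetical names; the real DAGScheduler event loop is far more involved): the post happens under the scheduler lock, but the handler runs later on a different thread, outside the lock.

```scala
import java.util.concurrent.LinkedBlockingQueue

object EventLoopSketch {
  case class TaskEnded(stageId: Int, partitionId: Int)

  private val eventQueue = new LinkedBlockingQueue[TaskEnded]()

  // Called by the TaskSetManager while it holds the TaskSchedulerImpl lock:
  // it only enqueues and returns, so nothing is processed under the lock.
  def taskEnded(stageId: Int, partitionId: Int): Unit =
    eventQueue.put(TaskEnded(stageId, partitionId))

  // Runs on a separate event-loop thread. Between the enqueue above and
  // this handler running, the scheduler may already have created a new
  // task set for the same stage.
  private val loop = new Thread(() => {
    while (true) println(s"handling ${eventQueue.take()}")
  })
  loop.setDaemon(true)
  loop.start()
}
```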
@@ -920,6 +923,9 @@ private[spark] class TaskSetManager(
        s" be re-executed (either because the task failed with a shuffle data fetch failure," +
        s" so the previous stage needs to be re-run, or because a different copy of the task" +
        s" has already succeeded).")
    } else if (sched.stageIdToFinishedPartitions.get(stageId).exists(
        partitions => partitions.contains(tasks(index).partitionId))) {
      sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
stageIdToFinishedPartitions is getting updated in the DAG scheduler event loop, but here you're querying it outside of the event loop, which is definitely not safe.
(I'm also not really convinced this would solve the problem, but I'm afraid I have to spend a bit more time paging it all back in...)
Yes, that is correct: stageIdToFinishedPartitions is getting updated in the event loop and is being queried outside the event loop within the TaskSchedulerImpl lock. However, I did not get why it is not safe, as the only thing that can happen is that, while querying, the TSM does not find the partition in the HashSet while it is being updated, but it will definitely catch this in the next check. I could be wrong though.
hashmaps are totally unsafe to use from multiple threads -- it's not just getting inconsistent values, it's that the hashmap may be in some undefined state b/c of rehashing. See e.g. http://javabypatel.blogspot.com/2016/01/infinite-loop-in-hashmap.html (I just skimmed this but I think it has the right idea).
I see; in that case, what if I turn stageIdToFinishedPartitions into a ConcurrentHashMap? That should take care of the safety issue.
that will take care of the access-during-rehash, but I'm still not sure the values of this are safe. Here you're querying it from a task-result-getter thread, but you're also updating it from the dag scheduler event loop, with no lock protecting access.
The other proposal is easier to reason about, because it keeps this structure protected by a lock on TaskSchedulerImpl.
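i.e., something shaped roughly like this (an illustrative sketch of that proposal, not its final code):

```scala
import scala.collection.mutable

class TaskSchedulerImplSketch {
  // All access goes through methods synchronized on `this`, so the
  // task-result-getter threads and the event loop can never race on it.
  private val stageIdToFinishedPartitions =
    mutable.HashMap.empty[Int, mutable.HashSet[Int]]

  def markPartitionFinished(stageId: Int, partitionId: Int): Unit = synchronized {
    stageIdToFinishedPartitions.getOrElseUpdate(stageId, mutable.HashSet.empty) += partitionId
  }

  def isPartitionFinished(stageId: Int, partitionId: Int): Boolean = synchronized {
    stageIdToFinishedPartitions.get(stageId).exists(_.contains(partitionId))
  }
}
```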
…bout the finished partitions

## What changes were proposed in this pull request?

This is an optional solution for #22806.

#21131 first implemented that a previously successfully completed task from a zombie TaskSetManager could also succeed the active TaskSetManager, based on the assumption that an active TaskSetManager always exists for that stage when this happens. But that's not always true, as an active TaskSetManager may not have been created yet when a previous task succeeds, and this is the reason why #22806 hit the issue.

This PR extends #21131's behavior by adding `stageIdToFinishedPartitions` to TaskSchedulerImpl, which records the finished partitions whenever a task (from a zombie or active TaskSetManager) succeeds. Thus, a later-created active TaskSetManager can also learn about the finished partitions by looking into `stageIdToFinishedPartitions` and won't launch any duplicate tasks.

## How was this patch tested?

Added tests.

Closes #23871 from Ngone51/dev-23433-25250.

Lead-authored-by: wuyi <ngone_5451@163.com>
Co-authored-by: Ngone51 <ngone_5451@163.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
(cherry picked from commit e5c6143)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
mind closing this now @pgandhi999?
…a stage

## What changes were proposed in this pull request?

This is another attempt to fix the more-than-one-active-task-set-managers bug.

#17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary, because when the DAGScheduler gets the task completion event, and it's for the last partition, then the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it (see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM for a stage, and failure.

This fix has a hole. Let's say a stage has 10 partitions and 2 task set managers: TSM1 (zombie) and TSM2 (active). TSM1 has a running task for partition 10, and it completes. TSM2 finishes tasks for partitions 1-9, and thinks it is still active because it hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause the more-than-one-active-TSM error.

#21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so it can mark itself as zombie after partitions 1-9 are completed.

However, #21131 still has a hole: TSM2 may be created after the task from TSM1 is completed. Then TSM2 can't get notified about the task completion, which leads to the more-than-one-active-TSM error. #22806 and #23871 were created to fix this hole. However, the fix is complicated and there are still ongoing discussions.

This PR proposes a simple fix, which can be easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager.

After this PR, #21131 is still necessary, to avoid launching unnecessary tasks and fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250). #22806 and #23871 are its followups to fix the hole.

## How was this patch tested?

Existing tests.

Closes #23927 from cloud-fan/scheduler.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
(cherry picked from commit cb20fbc)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
Sure. Thank you @squito.
We recently had a scenario where a race condition occurred: a task from the previous stage attempt finished just before the new attempt for the same stage was created due to a fetch failure. As a result, the new task created in the second attempt on the same partition id kept retrying multiple times due to a TaskCommitDenied exception, without realizing that the task in the earlier attempt had already succeeded.
For example, consider a task with partition id 9000 and index 9000 running in stage 4.0. We see a fetch failure, so we spawn a new stage attempt 4.1. Within this timespan, the above task completes successfully, marking partition id 9000 as complete for 4.0. However, as stage 4.1 has not yet been created, the taskset info for that stage is not available to the TaskScheduler, so naturally partition id 9000 has not been marked completed for 4.1. Stage 4.1 now spawns a task with index 2000 on the same partition id 9000. This task fails due to a CommitDeniedException and, since it does not see the corresponding partition id marked as successful, it keeps retrying multiple times until the job finally succeeds. It doesn't cause any job failures because the DAGScheduler is tracking the partitions separately from the task set managers.
What changes were proposed in this pull request?
SPARK-23433 tried to ensure that late task completions from a zombie taskset were properly updated in all tasksets for the stage (zombie or not). However, because it did this outside of the DAGScheduler event loop, the DAGScheduler could launch another taskset for the stage at the same time, before it had updated the set of tasks left to run. The new active taskset would never learn about the old completion from the zombie taskset. This could lead to multiple active tasksets (as described in SPARK-23433) or result stage failure from repeated TaskCommitDenied exceptions for the task which the final stage attempts to re-run.
This change fixes it by duplicating that logic into the DAGScheduler event loop. So now, on completion of a task, we maintain a map from stage id to completed partitions, which we update from the DAGScheduler event loop. When any task fails in the TSM, the TSM checks in this map whether the corresponding partition is already complete and, based on that, marks the corresponding partition as complete.
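A toy model of that failure-path check (hypothetical names, heavily simplified from the diffs shown earlier):

```scala
import scala.collection.mutable

object FailedTaskSketch {
  // stage id -> partitions already completed by some (possibly zombie) attempt.
  // Written only from the DAGScheduler event loop on task Success.
  val stageIdToFinishedPartitions = mutable.HashMap.empty[Int, mutable.HashSet[Int]]

  def handleFailedTask(stageId: Int, partitionId: Int): Unit = {
    val alreadyFinished =
      stageIdToFinishedPartitions.get(stageId).exists(_.contains(partitionId))
    if (alreadyFinished) {
      // Some earlier attempt already committed this partition: record it as
      // complete rather than rescheduling, so the task stops retrying on
      // TaskCommitDenied.
      println(s"stage $stageId partition $partitionId already done; not rescheduling")
    } else {
      println(s"rescheduling stage $stageId partition $partitionId")
    }
  }
}
```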
How was this patch tested?
The screenshot for the bug is attached below:
In the above screenshot, you can see that for partition id 17352, one of the task attempts in the previous stage succeeded, so for the current stage attempt we get a TaskCommitDenied exception each time; instead of killing the task, Spark keeps retrying multiple times until the application exits.