[SPARK-27065][CORE] avoid more than one active task set managers for a stage #23927
Conversation
This fixes https://issues.apache.org/jira/browse/SPARK-25250 as well, because a zombie TSM won't launch new tasks. Update:
// events for all the 10 partitions and thinks the stage is finished. If it's a shuffle stage
// and somehow it has missing map outputs, then DAGScheduler will resubmit it and create a
// TSM3 for it. As a stage can't have more than one active task set managers, we must mark
// TSM2 as zombie (it actually is).
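The invariant in the quoted comment can be sketched with a toy model (plain Python for illustration, not Spark code; all names are made up):

```python
# Toy model of the invariant: when a new manager is created for a stage,
# every existing manager of that stage is marked zombie first, so at most
# one manager per stage is ever active. All names here are illustrative.
class ToyTSM:
    def __init__(self, name):
        self.name = name
        self.is_zombie = False

class ToyScheduler:
    def __init__(self):
        self.tsms_by_stage = {}  # stage id -> list of managers, newest last

    def submit_tasks(self, stage, tsm):
        # Mark all existing managers of this stage as zombie before adding
        # the new one (the approach this PR proposes).
        for old in self.tsms_by_stage.get(stage, []):
            old.is_zombie = True
        self.tsms_by_stage.setdefault(stage, []).append(tsm)

    def active_tsms(self, stage):
        return [t for t in self.tsms_by_stage.get(stage, []) if not t.is_zombie]

sched = ToyScheduler()
sched.submit_tasks(0, ToyTSM("TSM1"))
sched.submit_tasks(0, ToyTSM("TSM2"))  # TSM1 becomes zombie
sched.submit_tasks(0, ToyTSM("TSM3"))  # resubmission: TSM2 becomes zombie too
print([t.name for t in sched.active_tsms(0)])  # ['TSM3']
```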
If TSM3 is created just after TSM2 finished partition 10, how does TSM3 know about the finished partition 10?
This PR focuses on fixing the potential occurrence of java.lang.IllegalStateException: more than one active taskSet for stage, which is described in https://issues.apache.org/jira/browse/SPARK-23433 .
https://issues.apache.org/jira/browse/SPARK-25250 remains unfixed and will be addressed in #22806 or #23871 .
Note that SPARK-23433 can crash the cluster; even though #22806 or #23871 can fix it as well, we need a simple fix that can be backported to 2.3/2.4.
SPARK-25250 is just a matter of wasting resources, so we can keep that fix in master only.
That makes sense, and so does the Update.
Yep, it makes sense to fix the issue that this PR addresses along with the other PRs for SPARK-25250.
@@ -123,7 +123,7 @@ private[spark] class TaskSetManager(
// state until all tasks have finished running; we keep TaskSetManagers that are in the zombie
// state in order to continue to track and account for the running tasks.
// TODO: We should kill any running task attempts when the task set manager becomes a zombie.
private[scheduler] var isZombie = false
@transient private[scheduler] var isZombie = false
This is transient because ... if it's true then that fact shouldn't survive serialization?
Just wondering if volatile was what was meant.
Previously, only the task completion/fail event could mutate it, which happens in the task-result-getter thread. Now DAGScheduler can also mutate it via TaskSchedulerImpl.submitTasks, which happens in the dag-scheduler-event-loop thread.
damn, mistyped it...
Test build #102913 has finished for PR 23927 at commit
Test build #102935 has finished for PR 23927 at commit
@@ -201,30 +201,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// Even if one of the task sets has not-serializable tasks, the other task set should
// still be processed without error
taskScheduler.submitTasks(FakeTask.createTaskSet(1))
taskScheduler.submitTasks(taskSet)
we can't have 2 active task set managers at the same time.
Maybe we should just give it another stageId?
taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
}

test("refuse to schedule concurrent attempts for the same stage (SPARK-8103)") {
This part of the code is reverted in this PR, so the test is removed as well.
This is fine, but do we also want to add a test case to ensure the new behavior will not break?
Test build #102965 has finished for PR 23927 at commit
retest this please
Test build #102972 has finished for PR 23927 at commit
The changes LGTM, but maybe we should add a test case?
Test build #103022 has finished for PR 23927 at commit
retest this please
// Mark all the existing TaskSetManagers of this stage as zombie, as we are adding a new one.
// This is necessary to handle a corner case. Let's say a stage has 10 partitions and has 2
// TaskSetManagers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10
// and it completes. TSM2 finishes tasks for partition 1-19, and thinks he is still active
typo: 1-9.
Test build #103039 has finished for PR 23927 at commit
Test build #103046 has finished for PR 23927 at commit
retest this please
I agree this is a necessary change (no idea why I didn't do this in the first place; it would have been much cleaner).
Just some book-keeping stuff -- I think from the discussion it's clear that SPARK-25250 is not just an optimization after this; it's still a necessary change. Can you update the PR description?
Also, can you file a separate jira for this? I think it's helpful to make clear what is fixed in each maintenance version of Spark.
@@ -123,7 +123,7 @@ private[spark] class TaskSetManager(
// state until all tasks have finished running; we keep TaskSetManagers that are in the zombie
// state in order to continue to track and account for the running tasks.
// TODO: We should kill any running task attempts when the task set manager becomes a zombie.
private[scheduler] var isZombie = false
@volatile private[scheduler] var isZombie = false
I don't think this is necessary. You still only touch isZombie with a lock on the TaskSchedulerImpl (everything in the TSM requires a lock on the TaskSchedulerImpl). Even before, there could be multiple task-result-getter threads interacting with this.
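The locking argument can be illustrated with a small sketch (toy Python, not Spark code; the lock merely stands in for the TaskSchedulerImpl monitor):

```python
import threading

# Toy sketch of the point above: as long as every read and write of the
# zombie flag happens while holding the same lock (standing in for the
# TaskSchedulerImpl monitor), no volatile/atomic marking is needed for
# cross-thread visibility. Names are illustrative, not Spark code.
class ToyScheduler:
    def __init__(self):
        self.lock = threading.Lock()
        self.is_zombie = False

    def mark_zombie(self):
        # e.g. called from a task-result-getter thread
        with self.lock:
            self.is_zombie = True

    def check_zombie(self):
        # e.g. called from the dag-scheduler-event-loop thread
        with self.lock:
            return self.is_zombie

sched = ToyScheduler()
t = threading.Thread(target=sched.mark_zombie)
t.start()
t.join()
print(sched.check_zombie())  # True
```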
ah good catch!
thanks all of you for the review! I've updated the PR description.
Test build #103060 has finished for PR 23927 at commit
Jenkins, retest this please
Test build #103058 has finished for PR 23927 at commit
Test build #103069 has finished for PR 23927 at commit
lgtm
…a stage ## What changes were proposed in this pull request? This is another attempt to fix the more-than-one-active-task-set-managers bug. #17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary, because when the DAGScheduler gets the task completion event, and it's for the last partition, then the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it (see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM of a stage and a failure. This fix has a hole: let's say a stage has 10 partitions and 2 task set managers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks it is still active because it hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all the 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause the more-than-one-active-TSM error. #21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so it can mark itself as zombie after partitions 1-9 are completed. However, #21131 still has a hole: TSM2 may be created after the task from TSM1 is completed. Then TSM2 can't get notified about the task completion, which leads to the more-than-one-active-TSM error. #22806 and #23871 were created to fix this hole. However, the fix is complicated and there are still ongoing discussions. This PR proposes a simple fix, which can be easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager.
After this PR, #21131 is still necessary, to avoid launching unnecessary tasks and fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250 ). #22806 and #23871 are its followups to fix the hole. ## How was this patch tested? Existing tests. Closes #23927 from cloud-fan/scheduler. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Imran Rashid <irashid@cloudera.com> (cherry picked from commit cb20fbc) Signed-off-by: Imran Rashid <irashid@cloudera.com>
merged to master, 2.4, & 2.3. Thanks!
What changes were proposed in this pull request?
This is another attempt to fix the more-than-one-active-task-set-managers bug.
#17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary because, when the DAGScheduler gets the task completion event for the last partition, the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it (see the code) and create a new TSM for this stage. This leads to more than one active TSM for a stage, and a failure.
This fix has a hole: let's say a stage has 10 partitions and 2 task set managers: TSM1 (zombie) and TSM2 (active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks it is still active because it hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause the more-than-one-active-TSM error.
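For illustration, this hole can be replayed with a toy model (plain Python, not Spark code; names are made up):

```python
# Toy replay of the hole: each manager only retires on completions of its
# OWN tasks, so TSM2 stays "active" even though every partition of the
# stage has finished somewhere. Illustrative Python only, not Spark code.
class ToyTSM:
    def __init__(self, name, partitions):
        self.name = name
        self.pending = set(partitions)
        self.is_zombie = False

    def own_task_finished(self, p):
        self.pending.discard(p)
        if not self.pending:
            self.is_zombie = True

tsm1 = ToyTSM("TSM1", range(1, 11))
tsm1.is_zombie = True                 # already a zombie, one task still running
tsm2 = ToyTSM("TSM2", range(1, 11))
tsm1.own_task_finished(10)            # TSM1's running task for partition 10
for p in range(1, 10):                # TSM2 finishes partitions 1-9
    tsm2.own_task_finished(p)
# DAGScheduler has now seen all 10 partitions complete and, if map outputs
# are missing, resubmits the stage with a fresh manager:
tsm3 = ToyTSM("TSM3", range(1, 11))
active = [t.name for t in (tsm1, tsm2, tsm3) if not t.is_zombie]
print(active)  # ['TSM2', 'TSM3'] -> "more than one active taskSet for stage"
```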
#21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so it can mark itself as zombie after partitions 1-9 are completed.
However, #21131 still has a hole: TSM2 may be created after the task from TSM1 is completed. Then TSM2 can't get notified about the task completion, which leads to the more-than-one-active-TSM error.
#22806 and #23871 were created to fix this hole. However, the fix is complicated and there are still ongoing discussions.
This PR proposes a simple fix, which can be easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager.
After this PR, #21131 is still necessary to avoid launching unnecessary tasks and to fix SPARK-25250. #22806 and #23871 are its follow-ups to fix the hole.
How was this patch tested?
Existing tests.