
[SPARK-25250][CORE] : Late zombie task completions handled correctly even before new taskset launched #22806

Closed · wants to merge 29 commits
29 commits
5ad6efd
[SPARK-25250] : On successful completion of a task attempt on a parti…
Oct 23, 2018
8667c28
Merge branch 'master' of https://github.com/pgandhi999/spark into SPA…
Oct 23, 2018
a73f619
[SPARK-25250] : Calling maybeFinishTaskSet() from method and adding c…
Dec 28, 2018
5509165
Merge branch 'master' of https://github.com/pgandhi999/spark into SPA…
Dec 28, 2018
ee5bc68
[SPARK-25250] : Fixing scalastyle tests
Dec 28, 2018
7677aec
[SPARK-25250] : Addressing Reviews January 2, 2019
Jan 2, 2019
67e1644
Merge branch 'master' of https://github.com/pgandhi999/spark into SPA…
Jan 8, 2019
f395b65
[SPARK-25250] : Addressing Reviews January 8, 2019
Jan 8, 2019
f7102ca
[SPARK-25250] : Addressing Reviews January 9, 2019
Jan 9, 2019
6709fe1
[SPARK-25250] : Addressing Reviews January 10, 2019
Jan 10, 2019
5234e87
Merge branch 'master' of https://github.com/pgandhi999/spark into SPA…
Jan 10, 2019
9efbc58
[SPARK-25250] : Addressing Reviews January 15, 2019
Jan 15, 2019
6abd52c
[SPARK-25250] : Addressing Reviews January 15, 2019 - 2
Jan 15, 2019
89373af
Merge branch 'master' of https://github.com/pgandhi999/spark into SPA…
Jan 16, 2019
231c51b
[SPARK-25250] : Addressing Reviews January 16, 2019
Jan 16, 2019
fcfe9f5
[SPARK-25250] : Addressing Reviews January 18, 2019
Jan 18, 2019
7ce6f10
[SPARK-25250] : Adding unit test
Jan 22, 2019
0610939
Merge branch 'master' of https://github.com/pgandhi999/spark into SPA…
Jan 22, 2019
929fbf9
[SPARK-25250] : Addressing Reviews January 24, 2019
Jan 24, 2019
393f901
[SPARK-25250] : Fixing Unit Tests
Jan 25, 2019
52e832a
[SPARK-25250] : Addressing Reviews January 30, 2019
Jan 30, 2019
afbac96
Merge branch 'master' of https://github.com/pgandhi999/spark into SPA…
Jan 30, 2019
024ec53
[SPARK-25250] : Fixing Scalastyle Checks
Jan 30, 2019
d2b7044
[SPARK-25250] : Addressing Minor Reviews January 30, 2019
Jan 30, 2019
d6ac4a9
[SPARK-25250] : Addressing Reviews January 31, 2019
Jan 31, 2019
e9b363b
Merge branch 'master' of https://github.com/pgandhi999/spark into SPA…
Feb 15, 2019
b55dbb0
[SPARK-25250] : Restructuring PR and trying out a different solution
Feb 18, 2019
551f412
[SPARK-25250] : Fixing indentation
Feb 18, 2019
28017ed
[SPARK-25250] : Addressing Reviews February 19, 2019
Feb 19, 2019
@@ -1383,6 +1383,7 @@ private[spark] class DAGScheduler(

event.reason match {
case Success =>
taskScheduler.markPartitionCompletedFromEventLoop(task.partitionId, task.stageId)
Member:

This call from the dag-scheduler-event-loop thread does not acquire a lock on the TaskScheduler, so I think the race condition still exists, since handleFailedTask is called from the task-result-getter thread.

Author:

The method handleFailedTask is called from TaskResultGetter on TaskSchedulerImpl, which executes the call into TaskSetManager inside a synchronized block. So my code in TaskSetManager runs within the TaskSchedulerImpl lock.
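A minimal standalone sketch of the locking pattern this reply describes (the Fake* classes are hypothetical stand-ins, not the upstream TaskSchedulerImpl/TaskSetManager):

class FakeTaskSetManager {
  def handleFailedTask(tid: Long): Unit = {
    // When reached through FakeTaskScheduler.handleFailedTask below, this body runs
    // while the caller still holds the scheduler's monitor lock.
    println(s"handling failed task $tid")
  }
}

class FakeTaskScheduler {
  private val manager = new FakeTaskSetManager

  // Declared synchronized, so everything it calls executes under this object's lock.
  def handleFailedTask(tid: Long): Unit = synchronized {
    manager.handleFailedTask(tid)
  }
}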

Contributor:

@pgandhi999 I don't follow your explanation -- I think I agree with @Ngone51. Yes, the TSM calls DAGScheduler.taskEnded while it holds a lock on the TaskSchedulerImpl, but the taskEnded call just puts an event into the queue and then returns, so that part happens without the lock. But maybe you meant something else?
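A minimal sketch of the point being made about taskEnded (hypothetical Fake* names, not the DAGScheduler source): posting only enqueues the event and returns, and the handler later runs on the event-loop thread, outside any lock the posting thread held.

import java.util.concurrent.LinkedBlockingQueue

final case class FakeCompletionEvent(taskId: Long)

class FakeEventLoop {
  private val queue = new LinkedBlockingQueue[FakeCompletionEvent]()

  // Called by the scheduler, possibly while it holds its own lock; just enqueues and returns.
  def post(event: FakeCompletionEvent): Unit = queue.put(event)

  // Runs on the event-loop thread; none of the posting thread's locks are held here.
  def processOne(): Unit = {
    val event = queue.take()
    println(s"processing task ${event.taskId} on ${Thread.currentThread().getName}")
  }
}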

task match {
case rt: ResultTask[_, _] =>
// Cast to ResultStage here because it's part of the ResultTask
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -109,4 +109,16 @@ private[spark] trait TaskScheduler {
*/
def applicationAttemptId(): Option[String]

/**
* SPARK-25250: Marks the task as completed in all TaskSetManagers for the given stage.
* After stage failure and retry, there may be multiple TaskSetManagers for the stage. If an
* earlier attempt of a stage completes a task, we should ensure that the later attempts do not
* also submit those same tasks. That also means that a task completion from an earlier attempt
* can lead to the entire stage getting marked as successful. Whenever any Task gets
* successfully completed, we simply mark the corresponding partition id as completed in all
* attempts for that particular stage. This method must be called from inside the DAGScheduler
* event loop, to ensure a consistent view of all task sets for the given stage.
*/
def markPartitionCompletedFromEventLoop(partitionId: Int, stageId: Int): Unit

}
@@ -150,6 +150,8 @@ private[spark] class TaskSchedulerImpl(

private[scheduler] var barrierCoordinator: RpcEndpoint = null

private[scheduler] val stageIdToFinishedPartitions = new HashMap[Int, HashSet[Int]]

private def maybeInitBarrierCoordinator(): Unit = {
if (barrierCoordinator == null) {
barrierCoordinator = new BarrierCoordinator(barrierSyncTimeout, sc.listenerBus,
@@ -287,6 +289,10 @@ }
}
}

override def markPartitionCompletedFromEventLoop(partitionId: Int, stageId: Int): Unit = {
stageIdToFinishedPartitions.getOrElseUpdate(stageId, new HashSet[Int]).add(partitionId)
}

/**
* Called to indicate that all task attempts (including speculated tasks) associated with the
* given TaskSetManager have completed, so state associated with the TaskSetManager should be
@@ -808,7 +808,6 @@ private[spark] class TaskSetManager(
if (tasksSuccessful == numTasks) {
isZombie = true
}
maybeFinishTaskSet()
Member:

Why did you remove this?

Author:

The redundant call to maybeFinishTaskSet() was asked to be removed in an earlier review comment on this PR, back when we were previously about to merge it.

}
}
}
@@ -924,6 +923,9 @@
s" be re-executed (either because the task failed with a shuffle data fetch failure," +
s" so the previous stage needs to be re-run, or because a different copy of the task" +
s" has already succeeded).")
} else if (sched.stageIdToFinishedPartitions.getOrElse(
stageId, new HashSet[Int]).contains(tasks(index).partitionId)) {
Contributor:

This may create an empty hash set unnecessarily. How about:
sched.stageIdToFinishedPartitions.get(stageId).exists(partitions => partitions.contains(tasks(index).partitionId))

Author:

Makes sense; I have changed it.
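For reference, a standalone sketch of the two lookups being discussed (toy data, not the PR code): the getOrElse form allocates a throwaway HashSet whenever the stage has no entry, while get(...).exists(...) just returns false on a miss.

import scala.collection.mutable.{HashMap, HashSet}

object LookupSketch {
  def main(args: Array[String]): Unit = {
    val stageIdToFinishedPartitions = new HashMap[Int, HashSet[Int]]()
    stageIdToFinishedPartitions(0) = HashSet(1, 2)

    val stageId = 3      // a stage with no finished partitions recorded yet
    val partitionId = 1

    // Original form: builds an empty HashSet just to call contains on it.
    val before = stageIdToFinishedPartitions
      .getOrElse(stageId, new HashSet[Int]).contains(partitionId)

    // Suggested form: no allocation on a miss; exists on None is false.
    val after = stageIdToFinishedPartitions
      .get(stageId).exists(_.contains(partitionId))

    assert(before == after) // both are false here
  }
}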

sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
Contributor:

stageIdToFinishedPartitions is getting updated in the DAG scheduler event loop, but here you're querying it outside of the event loop, which is definitely not safe.

(I'm also not really convinced this would solve the problem, but I'm afraid I have to spend a bit more time paging it all back in ...)

Author:

Yes, that is correct: stageIdToFinishedPartitions is updated in the event loop and queried outside the event loop within the TaskSchedulerImpl lock. However, I did not understand why that is unsafe. The only thing that can happen is that, while querying, the TSM does not yet see the partition in the HashSet as it is being added, but it will definitely catch it on the next check. I could be wrong, though.

Contributor:

Hash maps are totally unsafe to use from multiple threads -- it's not just that you can read inconsistent values; the hash map may end up in an undefined state because of rehashing. See e.g. http://javabypatel.blogspot.com/2016/01/infinite-loop-in-hashmap.html (I only skimmed it, but I think it has the right idea).
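A small self-contained sketch of the hazard being described (toy code, not from the PR): two threads mutating an unsynchronized mutable.HashMap have no thread-safety guarantee, so updates can be lost or the internal table can end up in an undefined state during a resize.

import scala.collection.mutable

object UnsafeHashMapDemo {
  def main(args: Array[String]): Unit = {
    val m = new mutable.HashMap[Int, Int]()
    // Two writers insert disjoint keys with no lock; the map is not thread-safe,
    // so rehashing and updates can interleave in undefined ways.
    val writers = (0 until 2).map { t =>
      new Thread(() => (0 until 100000).foreach(i => m(t * 100000 + i) = i))
    }
    writers.foreach(_.start())
    writers.foreach(_.join())
    // Often less than 200000: updates were lost while the table was concurrently
    // resized, and a writer thread may even have died with an exception.
    println(s"expected 200000 entries, observed ${m.size}")
  }
}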

Author:

I see. In that case, what if I turn stageIdToFinishedPartitions into a ConcurrentHashMap? That should take care of the safety issue.

Contributor:

That will take care of the access-during-rehash problem, but I'm still not sure the values are safe. Here you're querying it from a task-result-getter thread, but you're also updating it from the DAG scheduler event loop, with no lock protecting access.

The other proposal is easier to reason about, because it keeps this structure protected by a lock on the TaskSchedulerImpl.
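A minimal sketch of the proposal being referred to (hypothetical class name, not the final PR code): both the writer on the DAGScheduler event loop and the reader on a task-result-getter thread go through methods synchronized on the same object, so the map is only ever touched under one lock.

import scala.collection.mutable.{HashMap, HashSet}

class LockedFinishedPartitions {
  private val stageIdToFinishedPartitions = new HashMap[Int, HashSet[Int]]()

  // Intended to be called from the DAGScheduler event loop.
  def markPartitionCompleted(stageId: Int, partitionId: Int): Unit = synchronized {
    stageIdToFinishedPartitions.getOrElseUpdate(stageId, new HashSet[Int]) += partitionId
  }

  // Intended to be called from a task-result-getter thread.
  def isPartitionCompleted(stageId: Int, partitionId: Int): Boolean = synchronized {
    stageIdToFinishedPartitions.get(stageId).exists(_.contains(partitionId))
  }
}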

} else {
addPendingTask(index)
}
@@ -133,6 +133,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
/** Stages for which the DAGScheduler has called TaskScheduler.cancelTasks(). */
val cancelledStages = new HashSet[Int]()

val completedPartitions = new HashMap[Int, HashSet[Int]]()

val taskScheduler = new TaskScheduler() {
override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
@@ -160,6 +162,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
override def applicationAttemptId(): Option[String] = None
// Since the method completeTasks in TaskSchedulerImpl.scala marks the partition complete
// for all stage attempts of the given stage id, it does not need any info about the
// stageAttemptId. Hence, completed partition ids are stored only per stage id to mock
// the method implementation here.
override def markPartitionCompletedFromEventLoop(partitionId: Int, stageId: Int): Unit = {
val partitionIds = completedPartitions.getOrElseUpdate(stageId, new HashSet[Int])
partitionIds.add(partitionId)
}
}

/** Length of time to wait while draining listener events. */
@@ -248,6 +258,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
cancelledStages.clear()
cacheLocations.clear()
results.clear()
completedPartitions.clear()
securityMgr = new SecurityManager(conf)
broadcastManager = new BroadcastManager(true, conf, securityMgr)
mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) {
@@ -667,6 +678,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
override def applicationAttemptId(): Option[String] = None
override def markPartitionCompletedFromEventLoop(partitionId: Int, stageId: Int): Unit = {}
}
val noKillScheduler = new DAGScheduler(
sc,
@@ -2849,6 +2861,49 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
}
}

// This test is similar to, and goes along with, "Completions in zombie tasksets update
// status of non-zombie taskset" in TaskSchedulerImplSuite.scala.
test("SPARK-25250: Late zombie task completions handled correctly even before" +
" new taskset launched") {
val shuffleMapRdd = new MyRDD(sc, 4, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(4))
val reduceRdd = new MyRDD(sc, 4, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0, 1, 2, 3))

completeShuffleMapStageSuccessfully(0, 0, numShufflePartitions = 4)

// Fail Stage 1 Attempt 0 with Fetch Failure
runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0, 0, "ignored"),
null))

// this will trigger a resubmission of stage 0, since we've lost some of its
// map output
scheduler.resubmitFailedStages()
completeShuffleMapStageSuccessfully(0, 1, numShufflePartitions = 4)

// tasksets 1 & 3 should be two different attempts for our reduce stage -- let's
// double-check the test setup
val reduceStage = taskSets(1).stageId
assert(taskSets(3).stageId === reduceStage)

// complete one task from the original taskset and make sure we update the taskSchedulerImpl
// so it can notify all taskSetManagers. Some of that is mocked out here, so just check that
// the right event is recorded.
val taskToComplete = taskSets(1).tasks(3)

runEvent(makeCompletionEvent(taskToComplete, Success, Nil, Nil))
assert(completedPartitions.getOrElse(reduceStage, Set()) === Set(taskToComplete.partitionId))

// this will mark partition id 1 of stage 1 attempt 0 as complete, so we expect the status
// of that partition id to be reflected in stage 1 attempt 1 as well.
runEvent(makeCompletionEvent(
taskSets(1).tasks(1), Success, Nil, Nil))
assert(completedPartitions(reduceStage) === Set(
taskSets(3).tasks(1).partitionId, taskSets(3).tasks(3).partitionId))
}

/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.
@@ -94,4 +94,5 @@ private class DummyTaskScheduler extends TaskScheduler {
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId,
executorMetrics: ExecutorMetrics): Boolean = true
override def markPartitionCompletedFromEventLoop(partitionId: Int, stageId: Int): Unit = {}
}
@@ -31,7 +31,7 @@ import org.scalatest.mockito.MockitoSugar
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.util.ManualClock
import org.apache.spark.util.{AccumulatorV2, ManualClock}

class FakeSchedulerBackend extends SchedulerBackend {
def start() {}
@@ -125,6 +125,20 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
failedTaskSetReason = reason
failedTaskSetException = exception
}
override def taskEnded(
task: Task[_],
reason: TaskEndReason,
result: Any,
accumUpdates: Seq[AccumulatorV2[_, _]],
taskInfo: TaskInfo): Unit = {
if (reason == Success) {
// For SPARK-23433 / SPARK-25250, we need to make the DAGScheduler let all tasksets know
// about completed partitions. The super implementation is not enough here, because we've
// mocked out too much of the rest of the DAGScheduler.
taskScheduler.markPartitionCompletedFromEventLoop(task.partitionId, task.stageId)
}
super.taskEnded(task, reason, result, accumUpdates, taskInfo)
}
}
taskScheduler
}