address comments
cloud-fan committed Apr 16, 2019
1 parent 9a7b053 commit eb427f7
Showing 4 changed files with 24 additions and 26 deletions.
@@ -157,7 +157,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul

def enqueuePartitionCompletionNotification(stageId: Int, partitionId: Int): Unit = {
getTaskResultExecutor.execute(() => Utils.logUncaughtExceptions {
-      scheduler.markPartitionCompleted(stageId, partitionId)
+      scheduler.handlePartitionCompleted(stageId, partitionId)
})
}

@@ -641,6 +641,23 @@ private[spark] class TaskSchedulerImpl(
}
}

+  /**
+   * Marks the task as completed in the active TaskSetManager for the given stage.
+   *
+   * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
+   * If an earlier zombie attempt of a stage completes a task, we can ask the later active attempt
+   * to skip submitting and running the task for the same partition, to save resources. That also
+   * means that a task completion from an earlier zombie attempt can lead to the entire stage
+   * getting marked as successful.
+   */
+  private[scheduler] def handlePartitionCompleted(
+      stageId: Int,
+      partitionId: Int) = synchronized {
+    taskSetsByStageIdAndAttempt.get(stageId).foreach(_.values.filter(!_.isZombie).foreach { tsm =>
+      tsm.markPartitionCompleted(partitionId)
+    })
+  }

def error(message: String) {
synchronized {
if (taskSetsByStageIdAndAttempt.nonEmpty) {
@@ -872,23 +889,6 @@ private[spark] class TaskSchedulerImpl(
manager
}
}

-  /**
-   * Marks the task as completed in the active TaskSetManager for the given stage.
-   *
-   * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
-   * If an earlier zombie attempt of a stage completes a task, we can ask the later active attempt
-   * to skip submitting and running the task for the same partition, to save resources. That also
-   * means that a task completion from an earlier zombie attempt can lead to the entire stage
-   * getting marked as successful.
-   */
-  private[scheduler] def markPartitionCompleted(
-      stageId: Int,
-      partitionId: Int) = {
-    taskSetsByStageIdAndAttempt.get(stageId).foreach(_.values.filter(!_.isZombie).foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId)
-    })
-  }
}


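The doc comment in the hunk above describes the fan-out this commit tidies up: a partition completion reported by any attempt of a stage, including a zombie one, is propagated to every TaskSetManager for that stage that is still active, so the active attempt never resubmits that partition. The method is also moved next to the other handlers and now runs under the scheduler lock. A minimal sketch of that pattern, with `MiniScheduler` and `StageAttempt` as illustrative stand-ins rather than Spark classes:

```scala
// Illustrative stand-ins only; the real logic lives in TaskSchedulerImpl and TaskSetManager.
import scala.collection.mutable

class StageAttempt(val stageId: Int, val attempt: Int) {
  var isZombie: Boolean = false
  private val completedPartitions = mutable.Set.empty[Int]

  // Record the partition so this attempt never launches a task for it again.
  def markPartitionCompleted(partitionId: Int): Unit = completedPartitions += partitionId
  def isCompleted(partitionId: Int): Boolean = completedPartitions(partitionId)
}

class MiniScheduler {
  // stageId -> (attempt number -> manager), mirroring taskSetsByStageIdAndAttempt.
  private val attemptsByStage = mutable.Map.empty[Int, mutable.Map[Int, StageAttempt]]

  def register(tsm: StageAttempt): Unit = {
    val attempts = attemptsByStage.getOrElseUpdate(tsm.stageId, mutable.Map.empty)
    attempts(tsm.attempt) = tsm
  }

  // A completion reported by any attempt, even a zombie one, is fanned out to every
  // attempt that is still active, under the scheduler lock.
  def handlePartitionCompleted(stageId: Int, partitionId: Int): Unit = synchronized {
    attemptsByStage.get(stageId).foreach(_.values.filter(!_.isZombie).foreach { tsm =>
      tsm.markPartitionCompleted(partitionId)
    })
  }
}
```

Because a zombie attempt keeps running the tasks it has already launched, routing its completions through this single entry point is what lets the active attempt skip those partitions instead of recomputing them.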
@@ -819,6 +819,10 @@ private[spark] class TaskSetManager(
private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
partitionToIndex.get(partitionId).foreach { index =>
if (!successful(index)) {
+        if (speculationEnabled && !isZombie) {
+          // The task is skipped, its duration should be 0.
+          successfulTaskDurations.insert(0)
+        }
tasksSuccessful += 1
successful(index) = true
if (tasksSuccessful == numTasks) {
@@ -1035,11 +1039,7 @@ private[spark] class TaskSetManager(
val minFinishedForSpeculation = (speculationQuantile * numTasks).floor.toInt
logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)

-    // It's possible that a task is marked as completed by the scheduler, then the size of
-    // `successfulTaskDurations` may not equal to `tasksSuccessful`. Here we should only count the
-    // tasks that are submitted by this `TaskSetManager` and are completed successfully.
-    val numSuccessfulTasks = successfulTaskDurations.size()
-    if (numSuccessfulTasks >= minFinishedForSpeculation && numSuccessfulTasks > 0) {
+    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
val time = clock.getTimeMillis()
val medianDuration = successfulTaskDurations.median
val threshold = max(speculationMultiplier * medianDuration, minTimeToSpeculation)
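Taken together, the two TaskSetManager hunks above keep `tasksSuccessful` and the `successfulTaskDurations` heap the same size: when speculation is on and a partition is completed on the scheduler's behalf rather than by a task this manager launched, a 0 duration is recorded, so the speculation check can rely on `tasksSuccessful` again instead of the heap's size. A hedged sketch of that bookkeeping, with a plain sorted buffer standing in for Spark's median heap and all names here being illustrative:

```scala
// Illustrative sketch; class and field names are not Spark's.
import scala.collection.mutable.ArrayBuffer

class SpeculationCheck(
    numTasks: Int,
    speculationQuantile: Double = 0.75,
    speculationMultiplier: Double = 1.5,
    minTimeToSpeculation: Long = 100L) {

  // Stands in for successfulTaskDurations (a median heap in Spark).
  private val durations = ArrayBuffer.empty[Long]
  private var tasksSuccessful = 0

  def recordSuccess(durationMs: Long): Unit = {
    durations += durationMs
    tasksSuccessful += 1
  }

  // A partition finished by another attempt of the stage: the task is skipped here,
  // so count it as successful with a 0 duration to keep both counters in sync.
  def recordSkipped(): Unit = recordSuccess(0L)

  // Duration beyond which a still-running task becomes a speculation candidate,
  // or None while too few tasks have finished.
  def threshold(): Option[Double] = {
    val minFinishedForSpeculation = (speculationQuantile * numTasks).floor.toInt
    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
      val medianDuration = durations.sorted.apply(durations.size / 2).toDouble
      Some(math.max(speculationMultiplier * medianDuration, minTimeToSpeculation.toDouble))
    } else {
      None
    }
  }
}
```

Without the 0-duration entries, the heap could hold fewer entries than `tasksSuccessful`, which is exactly the mismatch the removed comment warned about and the reason the check previously had to count `successfulTaskDurations.size()`.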
@@ -1387,9 +1387,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sched.setDAGScheduler(dagScheduler)

val taskSet = FakeTask.createTaskSet(10)
-    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
-      task.metrics.internalAccums
-    }

sched.submitTasks(taskSet)
sched.resourceOffers(
@@ -1398,6 +1395,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val taskSetManager = sched.taskSetManagerForAttempt(0, 0).get
assert(taskSetManager.runningTasks === 8)
taskSetManager.markPartitionCompleted(8)
+    assert(!taskSetManager.successfulTaskDurations.isEmpty())
taskSetManager.checkSpeculatableTasks(0)
}

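The updated test covers the same interaction: with speculation enabled, a partition completed from outside the TaskSetManager must still leave an entry in `successfulTaskDurations` before `checkSpeculatableTasks` runs. A small usage sketch of the illustrative `SpeculationCheck` class above, following the same sequence (assumed names, not the real suite):

```scala
// Usage sketch for the illustrative SpeculationCheck above; not Spark test code.
val check = new SpeculationCheck(numTasks = 10)
(1 to 8).foreach(_ => check.recordSuccess(100L)) // eight tasks finish normally
check.recordSkipped()                            // a ninth partition is completed by the scheduler
assert(check.threshold().isDefined)              // the median-based check has data to work with
```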
