From eb427f7e5ffc1123cb4044c6223e54ee32dcbb25 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 16 Apr 2019 11:00:09 +0800
Subject: [PATCH] address comments

---
 .../spark/scheduler/TaskResultGetter.scala    |  2 +-
 .../spark/scheduler/TaskSchedulerImpl.scala   | 34 +++++++++----------
 .../spark/scheduler/TaskSetManager.scala      | 10 +++---
 .../spark/scheduler/TaskSetManagerSuite.scala |  4 +--
 4 files changed, 24 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 0a23fcb40d61f..72f4a4128bb22 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -157,7 +157,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
 
   def enqueuePartitionCompletionNotification(stageId: Int, partitionId: Int): Unit = {
     getTaskResultExecutor.execute(() => Utils.logUncaughtExceptions {
-      scheduler.markPartitionCompleted(stageId, partitionId)
+      scheduler.handlePartitionCompleted(stageId, partitionId)
     })
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 539869d29f75a..7ac6d8131b42d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -641,6 +641,23 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
+  /**
+   * Marks the task as completed in the active TaskSetManager for the given stage.
+   *
+   * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
+   * If an earlier zombie attempt of a stage completes a task, we can ask the later active attempt
+   * to skip submitting and running the task for the same partition, to save resources. That also
+   * means that a task completion from an earlier zombie attempt can lead to the entire stage
+   * getting marked as successful.
+   */
+  private[scheduler] def handlePartitionCompleted(
+      stageId: Int,
+      partitionId: Int) = synchronized {
+    taskSetsByStageIdAndAttempt.get(stageId).foreach(_.values.filter(!_.isZombie).foreach { tsm =>
+      tsm.markPartitionCompleted(partitionId)
+    })
+  }
+
   def error(message: String) {
     synchronized {
       if (taskSetsByStageIdAndAttempt.nonEmpty) {
@@ -872,23 +889,6 @@ private[spark] class TaskSchedulerImpl(
       manager
     }
   }
-
-  /**
-   * Marks the task has completed in the active TaskSetManager for the given stage.
-   *
-   * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
-   * If an earlier zombie attempt of a stage completes a task, we can ask the later active attempt
-   * to skip submitting and running the task for the same partition, to save resource. That also
-   * means that a task completion from an earlier zombie attempt can lead to the entire stage
-   * getting marked as successful.
-   */
-  private[scheduler] def markPartitionCompleted(
-      stageId: Int,
-      partitionId: Int) = {
-    taskSetsByStageIdAndAttempt.get(stageId).foreach(_.values.filter(!_.isZombie).foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId)
-    })
-  }
 }
 
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 9f4c1f423e5bf..ef9cb528f3e64 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -819,6 +819,10 @@ private[spark] class TaskSetManager(
   private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled && !isZombie) {
+          // The task is skipped, its duration should be 0.
+          successfulTaskDurations.insert(0)
+        }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {
@@ -1035,11 +1039,7 @@ private[spark] class TaskSetManager(
     val minFinishedForSpeculation = (speculationQuantile * numTasks).floor.toInt
     logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
 
-    // It's possible that a task is marked as completed by the scheduler, then the size of
-    // `successfulTaskDurations` may not equal to `tasksSuccessful`. Here we should only count the
-    // tasks that are submitted by this `TaskSetManager` and are completed successfully.
-    val numSuccessfulTasks = successfulTaskDurations.size()
-    if (numSuccessfulTasks >= minFinishedForSpeculation && numSuccessfulTasks > 0) {
+    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
       val time = clock.getTimeMillis()
       val medianDuration = successfulTaskDurations.median
       val threshold = max(speculationMultiplier * medianDuration, minTimeToSpeculation)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 3a0467478f671..8d8d7994964b8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1387,9 +1387,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     sched.setDAGScheduler(dagScheduler)
 
     val taskSet = FakeTask.createTaskSet(10)
-    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
-      task.metrics.internalAccums
-    }
 
     sched.submitTasks(taskSet)
     sched.resourceOffers(
@@ -1398,6 +1395,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     val taskSetManager = sched.taskSetManagerForAttempt(0, 0).get
     assert(taskSetManager.runningTasks === 8)
     taskSetManager.markPartitionCompleted(8)
+    assert(!taskSetManager.successfulTaskDurations.isEmpty())
     taskSetManager.checkSpeculatableTasks(0)
   }
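
Note on how the two changes above fit together: when an earlier zombie attempt finishes a partition, the active TaskSetManager now counts it as a success with a recorded duration of 0 (when speculation is enabled), so `successfulTaskDurations` stays in step with `tasksSuccessful` and `checkSpeculatableTasks` can compare `tasksSuccessful` against the quantile while still taking the median over the recorded durations. The standalone Scala sketch below illustrates that bookkeeping; it is not Spark code, and `SpeculationSketch`, `SketchTaskSet`, `taskStarted`, `taskSucceeded`, `speculatableTasks`, `runningElapsed` and the sorted-buffer median are invented stand-ins for `TaskSetManager`, `checkSpeculatableTasks` and Spark's internal MedianHeap.

import scala.collection.mutable

object SpeculationSketch {

  // Simplified stand-in for a TaskSetManager's speculation bookkeeping.
  class SketchTaskSet(
      numTasks: Int,
      speculationQuantile: Double = 0.75,
      speculationMultiplier: Double = 1.5,
      minTimeToSpeculation: Long = 100L) {

    // Stand-in for Spark's MedianHeap: durations of every task counted as successful.
    private val successfulTaskDurations = mutable.ArrayBuffer.empty[Long]
    private val successful = Array.fill(numTasks)(false)
    // Elapsed running time of tasks launched by this attempt (simplified: stored directly).
    private val runningElapsed = mutable.Map.empty[Int, Long]
    private var tasksSuccessful = 0

    private def medianDuration: Long = {
      val sorted = successfulTaskDurations.sorted
      sorted(sorted.size / 2)
    }

    def taskStarted(index: Int, elapsedSoFar: Long): Unit =
      runningElapsed(index) = elapsedSoFar

    // A task launched by this attempt finished: record its real duration.
    def taskSucceeded(index: Int, duration: Long): Unit = {
      if (!successful(index)) {
        successful(index) = true
        tasksSuccessful += 1
        successfulTaskDurations += duration
        runningElapsed.remove(index)
      }
    }

    // An earlier (zombie) attempt finished this partition, so this attempt skips it.
    // Counting it with a 0 duration keeps successfulTaskDurations.size == tasksSuccessful.
    def markPartitionCompleted(index: Int): Unit = {
      if (!successful(index)) {
        successful(index) = true
        tasksSuccessful += 1
        successfulTaskDurations += 0L
      }
    }

    // Mirrors the patched check: compare tasksSuccessful against the quantile and take the
    // median over every recorded duration, including the zeros for skipped partitions.
    def speculatableTasks(): List[Int] = {
      val minFinishedForSpeculation = (speculationQuantile * numTasks).floor.toInt
      if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
        val threshold =
          math.max(speculationMultiplier * medianDuration, minTimeToSpeculation.toDouble)
        runningElapsed.collect { case (index, elapsed) if elapsed > threshold => index }.toList
      } else {
        List.empty[Int]
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val ts = new SketchTaskSet(numTasks = 4)
    ts.taskSucceeded(0, duration = 200L)
    ts.taskSucceeded(1, duration = 220L)
    ts.markPartitionCompleted(2)         // partition finished by a zombie attempt
    ts.taskStarted(3, elapsedSoFar = 10000L)
    // 3 of 4 tasks count as successful, so the 0.75 quantile is met and task 3 is flagged.
    println(ts.speculatableTasks())      // prints: List(3)
  }
}

The zeros recorded for skipped partitions do pull the median down, so speculation can fire somewhat earlier for an attempt that inherits many completed partitions from a zombie attempt; that is the price of keeping the success count and the duration heap consistent, which is what lets the speculation check use `tasksSuccessful` directly.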