From 115c072384d6e05fdcfc3f28efe12d83712170f5 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 7 May 2019 00:55:02 +0800 Subject: [PATCH] address comments --- .../org/apache/spark/scheduler/TaskSetManager.scala | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 25f9da5961cb5..52323b3331d7e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -62,14 +62,8 @@ private[spark] class TaskSetManager( private val addedJars = HashMap[String, Long](sched.sc.addedJars.toSeq: _*) private val addedFiles = HashMap[String, Long](sched.sc.addedFiles.toSeq: _*) - // Quantile of tasks at which to start speculation - val speculationQuantile = conf.get(SPECULATION_QUANTILE) - val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER) - val maxResultSize = conf.get(config.MAX_RESULT_SIZE) - val speculationEnabled = conf.get(SPECULATION_ENABLED) - // Serializer for closures and tasks. val env = SparkEnv.get val ser = env.closureSerializer.newInstance() @@ -80,6 +74,12 @@ private[spark] class TaskSetManager( val numTasks = tasks.length val copiesRunning = new Array[Int](numTasks) + val speculationEnabled = conf.get(SPECULATION_ENABLED) + // Quantile of tasks at which to start speculation + val speculationQuantile = conf.get(SPECULATION_QUANTILE) + val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER) + val minFinishedForSpeculation = math.max((speculationQuantile * numTasks).floor.toInt, 1) + // For each task, tracks whether a copy of the task has succeeded. A task will also be // marked as "succeeded" if it failed with a fetch failure, in which case it should not // be re-run because the missing map data needs to be regenerated first. @@ -1032,7 +1032,6 @@ private[spark] class TaskSetManager( return false } var foundTasks = false - val minFinishedForSpeculation = math.max((speculationQuantile * numTasks).floor.toInt, 1) logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation) // It's possible that a task is marked as completed by the scheduler, then the size of