From 5ad6efde552fc541330386f5124c6b4055f82256 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Tue, 23 Oct 2018 09:31:27 -0500 Subject: [PATCH 01/21] [SPARK-25250] : On successful completion of a task attempt on a partition id, kill other running task attempts on that same partition The fix that this PR implements is as follows: Whenever any Result Task gets successfully completed, we simply mark the corresponding partition id as completed in all attempts for that particular stage. As a result, we do not see any Killed tasks due to TaskCommitDenied Exceptions showing up in the UI. Also, since the method uses hash maps and arrays for efficient searching and processing, its time complexity does not depend on the number of tasks. --- .../apache/spark/scheduler/DAGScheduler.scala | 2 ++ .../spark/scheduler/TaskScheduler.scala | 3 ++ .../spark/scheduler/TaskSchedulerImpl.scala | 17 ++++++++++ .../spark/scheduler/TaskSetManager.scala | 4 +++ .../spark/scheduler/DAGSchedulerSuite.scala | 4 +++ .../ExternalClusterManagerSuite.scala | 2 ++ .../scheduler/TaskSchedulerImplSuite.scala | 31 +++++++++++++++++++ 7 files changed, 63 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index f93d8a8d5de55..fa86ed02a3b09 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1360,6 +1360,8 @@ private[spark] class DAGScheduler( if (!job.finished(rt.outputId)) { job.finished(rt.outputId) = true job.numFinished += 1 + taskScheduler.markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts( + task.partitionId, task.stageId) // If the whole job has finished, remove it if (job.numFinished == job.numPartitions) { markStageAsFinished(resultStage) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 94221eb0d5515..0b36ec0445966 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -109,4 +109,7 @@ private[spark] trait TaskScheduler { */ def applicationAttemptId(): Option[String] + def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts( + partitionId: Int, stageId: Int): Unit + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 4f870e85ad38d..bd23ece2b946b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -281,6 +281,23 @@ private[spark] class TaskSchedulerImpl( } } + override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts( + partitionId: Int, stageId: Int): Unit = { + taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => + val index: Option[Int] = tsm.partitionToIndex.get(partitionId) + if (!index.isEmpty) { + tsm.markPartitionIdAsCompletedForTaskAttempt(index.get) + val taskInfoList = tsm.taskAttempts(index.get) + taskInfoList.foreach { taskInfo => + if (taskInfo.running) { + killTaskAttempt(taskInfo.taskId, false, "Corresponding Partition Id " + partitionId + + " has been marked as Completed") + } + } + } + } + } + /** * Called to indicate that all task attempts (including speculated tasks)
associated with the * given TaskSetManager have completed, so state associated with the TaskSetManager should be diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index d5e85a11cb279..b459de020b9a0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1091,6 +1091,10 @@ private[spark] class TaskSetManager( def executorAdded() { recomputeLocality() } + + def markPartitionIdAsCompletedForTaskAttempt(index: Int): Unit = { + successful(index) = true + } } private[spark] object TaskSetManager { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index b41d2acab7152..ccb786577ef1a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -160,6 +160,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None + override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts( + partitionId: Int, stageId: Int): Unit = {} } /** Length of time to wait while draining listener events. */ @@ -667,6 +669,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None + override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts( + partitionId: Int, stageId: Int): Unit = {} } val noKillScheduler = new DAGScheduler( sc, diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 0621c98d41184..65ef5f789ed44 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -95,4 +95,6 @@ private class DummyTaskScheduler extends TaskScheduler { accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId, executorMetrics: ExecutorMetrics): Boolean = true + override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts( + partitionId: Int, stageId: Int): Unit = {} } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 9e1d13e369ad9..c16b5ba26ea0f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import java.nio.ByteBuffer +import java.util.HashSet import scala.collection.mutable.HashMap @@ -37,6 +38,14 @@ class FakeSchedulerBackend extends SchedulerBackend { def reviveOffers() {} def defaultParallelism(): Int = 1 def maxNumConcurrentTasks(): Int = 0 + val killedTaskIds: HashSet[Long] = new HashSet[Long]() + override def 
killTask( + taskId: Long, + executorId: String, + interruptThread: Boolean, + reason: String): Unit = { + killedTaskIds.add(taskId) + } } class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach @@ -1136,4 +1145,26 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, TaskKilled("test")) assert(tsm.isZombie) } + test("SPARK-25250 On successful completion of a task attempt on a partition id, kill other" + + " running task attempts on that same partition") { + val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() + val firstAttempt = FakeTask.createTaskSet(10, stageAttemptId = 0) + taskScheduler.submitTasks(firstAttempt) + val offersFirstAttempt = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offersFirstAttempt) + val tsm0 = taskScheduler.taskSetManagerForAttempt(0, 0).get + val matchingTaskInfoFirstAttempt = tsm0.taskAttempts(0).head + tsm0.handleFailedTask(matchingTaskInfoFirstAttempt.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) + val secondAttempt = FakeTask.createTaskSet(10, stageAttemptId = 1) + taskScheduler.submitTasks(secondAttempt) + val offersSecondAttempt = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offersSecondAttempt) + taskScheduler.markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(2, 0) + val tsm1 = taskScheduler.taskSetManagerForAttempt(0, 1).get + val indexInTsm = tsm1.partitionToIndex(2) + val matchingTaskInfoSecondAttempt = tsm1.taskAttempts.flatten.filter(_.index == indexInTsm).head + assert(taskScheduler.backend.asInstanceOf[FakeSchedulerBackend].killedTaskIds.contains( + matchingTaskInfoSecondAttempt.taskId)) + } } From a73f61987ef0a4b979349b43c58afd7418889015 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Fri, 28 Dec 2018 10:55:07 -0600 Subject: [PATCH 02/21] [SPARK-25250] : Calling maybeFinishTaskSet() from method and adding comment --- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 5 +++++ .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 1 + 2 files changed, 6 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index bd23ece2b946b..385bedb298f20 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -281,6 +281,11 @@ private[spark] class TaskSchedulerImpl( } } + /** + * SPARK-25250: Whenever any Result Task gets successfully completed, we simply mark the + * corresponding partition id as completed in all attempts for that particular stage. As a + * result, we do not see any Killed tasks due to TaskCommitDenied Exceptions showing up in the UI. 
+ */ override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts( partitionId: Int, stageId: Int): Unit = { taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index b459de020b9a0..4f44ebe63e3bb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1094,6 +1094,7 @@ private[spark] class TaskSetManager( def markPartitionIdAsCompletedForTaskAttempt(index: Int): Unit = { successful(index) = true + maybeFinishTaskSet() } } From ee5bc68bfeb533a223f94fab3409d51559bc095a Mon Sep 17 00:00:00 2001 From: pgandhi Date: Fri, 28 Dec 2018 11:10:10 -0600 Subject: [PATCH 03/21] [SPARK-25250] : Fixing scalastyle tests Multiline comment indentation --- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 13c386a191af7..ac39199317c76 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -287,10 +287,11 @@ private[spark] class TaskSchedulerImpl( } /** - * SPARK-25250: Whenever any Result Task gets successfully completed, we simply mark the - * corresponding partition id as completed in all attempts for that particular stage. As a - * result, we do not see any Killed tasks due to TaskCommitDenied Exceptions showing up in the UI. - */ + * SPARK-25250: Whenever any Result Task gets successfully completed, we simply mark the + * corresponding partition id as completed in all attempts for that particular stage. As a + * result, we do not see any Killed tasks due to TaskCommitDenied Exceptions showing up + * in the UI. 
+ */ override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts( partitionId: Int, stageId: Int): Unit = { taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => From 7677aece85f8c82818034228c3b2a0c86febc67c Mon Sep 17 00:00:00 2001 From: pgandhi Date: Wed, 2 Jan 2019 09:38:15 -0600 Subject: [PATCH 04/21] [SPARK-25250] : Addressing Reviews January 2, 2019 --- .../spark/scheduler/TaskSchedulerImpl.scala | 19 ++++++++++--------- .../scheduler/TaskSchedulerImplSuite.scala | 12 ++++++++++-- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index ac39199317c76..3692dc9dc3a8b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -295,16 +295,17 @@ private[spark] class TaskSchedulerImpl( override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts( partitionId: Int, stageId: Int): Unit = { taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => - val index: Option[Int] = tsm.partitionToIndex.get(partitionId) - if (!index.isEmpty) { - tsm.markPartitionIdAsCompletedForTaskAttempt(index.get) - val taskInfoList = tsm.taskAttempts(index.get) - taskInfoList.foreach { taskInfo => - if (taskInfo.running) { - killTaskAttempt(taskInfo.taskId, false, "Corresponding Partition Id " + partitionId + - " has been marked as Completed") + tsm.partitionToIndex.get(partitionId) match { + case Some(index) => + tsm.markPartitionIdAsCompletedForTaskAttempt(index) + val taskInfoList = tsm.taskAttempts(index) + taskInfoList.filter(_.running).foreach { taskInfo => + killTaskAttempt(taskInfo.taskId, false, + s"Corresponding Partition ID $partitionId has been marked as Completed") } - } + + case None => + logError(s"No corresponding index found for partition ID $partitionId") } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 266b3f9a9a907..9d93a8d1da2cd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -18,9 +18,9 @@ package org.apache.spark.scheduler import java.nio.ByteBuffer -import java.util.HashSet import scala.collection.mutable.HashMap +import scala.collection.mutable.Set import scala.concurrent.duration._ import org.mockito.Matchers.{anyInt, anyObject, anyString, eq => meq} @@ -40,7 +40,7 @@ class FakeSchedulerBackend extends SchedulerBackend { def reviveOffers() {} def defaultParallelism(): Int = 1 def maxNumConcurrentTasks(): Int = 0 - val killedTaskIds: HashSet[Long] = new HashSet[Long]() + val killedTaskIds: Set[Long] = Set[Long]() override def killTask( taskId: Long, executorId: String, @@ -1328,22 +1328,30 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, TaskKilled("test")) assert(tsm.isZombie) } + test("SPARK-25250 On successful completion of a task attempt on a partition id, kill other" + " running task attempts on that same partition") { val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() + val firstAttempt = FakeTask.createTaskSet(10, stageAttemptId = 0) taskScheduler.submitTasks(firstAttempt) + val 
offersFirstAttempt = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } taskScheduler.resourceOffers(offersFirstAttempt) + val tsm0 = taskScheduler.taskSetManagerForAttempt(0, 0).get val matchingTaskInfoFirstAttempt = tsm0.taskAttempts(0).head tsm0.handleFailedTask(matchingTaskInfoFirstAttempt.taskId, TaskState.FAILED, FetchFailed(null, 0, 0, 0, "fetch failed")) + val secondAttempt = FakeTask.createTaskSet(10, stageAttemptId = 1) taskScheduler.submitTasks(secondAttempt) + val offersSecondAttempt = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } taskScheduler.resourceOffers(offersSecondAttempt) + taskScheduler.markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(2, 0) + val tsm1 = taskScheduler.taskSetManagerForAttempt(0, 1).get val indexInTsm = tsm1.partitionToIndex(2) val matchingTaskInfoSecondAttempt = tsm1.taskAttempts.flatten.filter(_.index == indexInTsm).head From f395b6551732d67656676be9289f4436713c7ca6 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Tue, 8 Jan 2019 12:56:41 -0600 Subject: [PATCH 05/21] [SPARK-25250] : Addressing Reviews January 8, 2019 Refactoring method name to completeTasks, also calling same method from task completion in ShuffleMapStage but not killing them. --- .../apache/spark/scheduler/DAGScheduler.scala | 4 ++-- .../spark/scheduler/TaskScheduler.scala | 3 +-- .../spark/scheduler/TaskSchedulerImpl.scala | 20 ++++++++++--------- .../spark/scheduler/DAGSchedulerSuite.scala | 6 ++---- .../ExternalClusterManagerSuite.scala | 3 +-- .../scheduler/TaskSchedulerImplSuite.scala | 2 +- 6 files changed, 18 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 2e91ec56b1239..96e4a007dc34e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1384,8 +1384,7 @@ private[spark] class DAGScheduler( if (!job.finished(rt.outputId)) { job.finished(rt.outputId) = true job.numFinished += 1 - taskScheduler.markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts( - task.partitionId, task.stageId) + taskScheduler.completeTasks(task.partitionId, task.stageId, true) // If the whole job has finished, remove it if (job.numFinished == job.numPartitions) { markStageAsFinished(resultStage) @@ -1429,6 +1428,7 @@ private[spark] class DAGScheduler( val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) + taskScheduler.completeTasks(task.partitionId, task.stageId, false) if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 0b36ec0445966..e70ecd3328115 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -109,7 +109,6 @@ private[spark] trait TaskScheduler { */ def applicationAttemptId(): Option[String] - def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts( - partitionId: Int, stageId: Int): Unit + def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 3692dc9dc3a8b..f8e70b6eda2b9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -288,20 +288,22 @@ private[spark] class TaskSchedulerImpl( /** * SPARK-25250: Whenever any Result Task gets successfully completed, we simply mark the - * corresponding partition id as completed in all attempts for that particular stage. As a - * result, we do not see any Killed tasks due to TaskCommitDenied Exceptions showing up - * in the UI. + * corresponding partition id as completed in all attempts for that particular stage and + * additionally, for a Result Stage, we also kill the remaining task attempts running on the + * same partition. As a result, we do not see any Killed tasks due to + * TaskCommitDenied Exceptions showing up in the UI. */ - override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts( - partitionId: Int, stageId: Int): Unit = { + override def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit = { taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => tsm.partitionToIndex.get(partitionId) match { case Some(index) => tsm.markPartitionIdAsCompletedForTaskAttempt(index) - val taskInfoList = tsm.taskAttempts(index) - taskInfoList.filter(_.running).foreach { taskInfo => - killTaskAttempt(taskInfo.taskId, false, - s"Corresponding Partition ID $partitionId has been marked as Completed") + if (killTasks) { + val taskInfoList = tsm.taskAttempts(index) + taskInfoList.filter(_.running).foreach { taskInfo => + killTaskAttempt(taskInfo.taskId, false, + s"Corresponding Partition ID $partitionId has been marked as Completed") + } } case None => diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index b78a42fb52ec0..602200f2fcfae 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -160,8 +160,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None - override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts( - partitionId: Int, stageId: Int): Unit = {} + override def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit = {} } /** Length of time to wait while draining listener events. 
*/ @@ -669,8 +668,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None - override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts( - partitionId: Int, stageId: Int): Unit = {} + override def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit = {} } val noKillScheduler = new DAGScheduler( sc, diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index d0a25a19fd1f1..d4d7b62177f3a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -94,6 +94,5 @@ private class DummyTaskScheduler extends TaskScheduler { accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId, executorMetrics: ExecutorMetrics): Boolean = true - override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts( - partitionId: Int, stageId: Int): Unit = {} + override def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit = {} } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 0b51567861e1a..ca6feffe937b7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1350,7 +1350,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B val offersSecondAttempt = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } taskScheduler.resourceOffers(offersSecondAttempt) - taskScheduler.markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(2, 0) + taskScheduler.completeTasks(2, 0, true) val tsm1 = taskScheduler.taskSetManagerForAttempt(0, 1).get val indexInTsm = tsm1.partitionToIndex(2) From f7102ca293aad61477821783b09f1c7ac079451d Mon Sep 17 00:00:00 2001 From: pgandhi Date: Wed, 9 Jan 2019 16:10:59 -0600 Subject: [PATCH 06/21] [SPARK-25250] : Addressing Reviews January 9, 2019 Refactoring method name and throwing exception instead of logging error. 
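As an illustration of the new failure handling (not part of the patch itself), the sketch below models the lookup-and-fail-fast pattern this change adopts: look up the task index for a partition and throw instead of merely logging when it is missing. MiniTaskSetManager, SchedulerBugException and CompleteTasksSketch are invented stand-ins for this sketch, not Spark classes.

import scala.collection.mutable

// Invented stand-ins for the sketch; they only model the pieces the patch touches.
class SchedulerBugException(msg: String) extends Exception(msg)

class MiniTaskSetManager(val name: String, partitions: Seq[Int]) {
  // partition id -> index of the task computing it, loosely like TaskSetManager.partitionToIndex
  val partitionToIndex: Map[Int, Int] = partitions.zipWithIndex.toMap
  val successful: mutable.Set[Int] = mutable.Set.empty[Int]
  def markPartitionCompleted(index: Int): Unit = successful += index
}

object CompleteTasksSketch {
  // Mark the partition done if the task set knows it; otherwise fail loudly
  // instead of just logging an error and moving on.
  def completeTasks(tsm: MiniTaskSetManager, partitionId: Int): Unit = {
    tsm.partitionToIndex.get(partitionId) match {
      case Some(index) => tsm.markPartitionCompleted(index)
      case None => throw new SchedulerBugException(
        s"No corresponding index found for partition ID $partitionId in TaskSet ${tsm.name}")
    }
  }

  def main(args: Array[String]): Unit = {
    val tsm = new MiniTaskSetManager("taskSet_0.1", partitions = Seq(0, 1, 2))
    completeTasks(tsm, partitionId = 2)
    println(tsm.successful)        // Set(2)
    // completeTasks(tsm, 7)       // would throw: partition 7 is unknown to this task set
  }
}

The actual change below applies the same idea inside the scheduler's completeTasks path.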
--- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 6 ++++-- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index f8e70b6eda2b9..95eeac275719f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -297,7 +297,7 @@ private[spark] class TaskSchedulerImpl( taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => tsm.partitionToIndex.get(partitionId) match { case Some(index) => - tsm.markPartitionIdAsCompletedForTaskAttempt(index) + tsm.markPartitionCompletedForRedundantTaskAttempts(index) if (killTasks) { val taskInfoList = tsm.taskAttempts(index) taskInfoList.filter(_.running).foreach { taskInfo => @@ -307,7 +307,9 @@ private[spark] class TaskSchedulerImpl( } case None => - logError(s"No corresponding index found for partition ID $partitionId") + throw new SparkException(s"No corresponding index found for" + + s" partition ID $partitionId. This is likely a bug in the Spark Scheduler" + + s" implementation. Please file a bug report") } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index ea791a0033259..8efe8f29a36af 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1097,7 +1097,7 @@ private[spark] class TaskSetManager( recomputeLocality() } - def markPartitionIdAsCompletedForTaskAttempt(index: Int): Unit = { + def markPartitionCompletedForRedundantTaskAttempts(index: Int): Unit = { successful(index) = true maybeFinishTaskSet() } From 6709fe109940b9f92f70f28ec4078f7d6414783e Mon Sep 17 00:00:00 2001 From: pgandhi Date: Thu, 10 Jan 2019 14:57:33 -0600 Subject: [PATCH 07/21] [SPARK-25250] : Addressing Reviews January 10, 2019 Updating comment in code, adding try catch block and refactoring method --- .../spark/scheduler/TaskSchedulerImpl.scala | 24 +++++++++++++------ .../spark/scheduler/TaskSetManager.scala | 2 +- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 95eeac275719f..10d96298bf5ff 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -287,29 +287,39 @@ private[spark] class TaskSchedulerImpl( } /** - * SPARK-25250: Whenever any Result Task gets successfully completed, we simply mark the + * SPARK-25250: Whenever any Task gets successfully completed, we simply mark the * corresponding partition id as completed in all attempts for that particular stage and * additionally, for a Result Stage, we also kill the remaining task attempts running on the * same partition. As a result, we do not see any Killed tasks due to - * TaskCommitDenied Exceptions showing up in the UI. + * TaskCommitDenied Exceptions showing up in the UI. When this method is called from + * DAGScheduler.scala on a task completion event being fired, it is assumed that the new + * TaskSet has already been created and registered. 
However, a small possibility does exist + * that when this method gets called, possibly the new TaskSet might have not been added + * to taskSetsByStageIdAndAttempt. In such a case, we might still hit the same issue. However, + * the above scenario has not yet been reproduced. */ override def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit = { taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => tsm.partitionToIndex.get(partitionId) match { case Some(index) => - tsm.markPartitionCompletedForRedundantTaskAttempts(index) + tsm.markPartitionAsAlreadyCompleted(index) if (killTasks) { val taskInfoList = tsm.taskAttempts(index) taskInfoList.filter(_.running).foreach { taskInfo => - killTaskAttempt(taskInfo.taskId, false, - s"Corresponding Partition ID $partitionId has been marked as Completed") + try { + killTaskAttempt(taskInfo.taskId, false, + s"Partition $partitionId is already completed") + } catch { + case e: Exception => + logWarning(s"Unable to kill Task ID ${taskInfo.taskId}.") + } } } case None => throw new SparkException(s"No corresponding index found for" + - s" partition ID $partitionId. This is likely a bug in the Spark Scheduler" + - s" implementation. Please file a bug report") + s" partition ID $partitionId in TaskSet ${tsm.name}. This is likely a bug" + + s" in the Spark TaskScheduler implementation. Please file a bug report") } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 8efe8f29a36af..f5e80f086ae26 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1097,7 +1097,7 @@ private[spark] class TaskSetManager( recomputeLocality() } - def markPartitionCompletedForRedundantTaskAttempts(index: Int): Unit = { + def markPartitionAsAlreadyCompleted(index: Int): Unit = { successful(index) = true maybeFinishTaskSet() } From 9efbc58b0d2e10a71848188fa447e16130666009 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Tue, 15 Jan 2019 12:52:50 -0600 Subject: [PATCH 08/21] [SPARK-25250] : Addressing Reviews January 15, 2019 Moving comment to base class and fixing spaces --- .../org/apache/spark/scheduler/TaskScheduler.scala | 7 +++++++ .../apache/spark/scheduler/TaskSchedulerImpl.scala | 12 ------------ .../spark/scheduler/TaskSchedulerImplSuite.scala | 8 ++++---- 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index e70ecd3328115..59a58daa945c5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -109,6 +109,13 @@ private[spark] trait TaskScheduler { */ def applicationAttemptId(): Option[String] + /** + * SPARK-25250: Whenever any Task gets successfully completed, we simply mark the + * corresponding partition id as completed in all attempts for that particular stage and + * additionally, for a Result Stage, we also kill the remaining task attempts running on the + * same partition. As a result, we do not see any Killed tasks due to + * TaskCommitDenied Exceptions showing up in the UI. 
+ */ def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 10d96298bf5ff..e7a9ca2b767ab 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -286,18 +286,6 @@ private[spark] class TaskSchedulerImpl( } } - /** - * SPARK-25250: Whenever any Task gets successfully completed, we simply mark the - * corresponding partition id as completed in all attempts for that particular stage and - * additionally, for a Result Stage, we also kill the remaining task attempts running on the - * same partition. As a result, we do not see any Killed tasks due to - * TaskCommitDenied Exceptions showing up in the UI. When this method is called from - * DAGScheduler.scala on a task completion event being fired, it is assumed that the new - * TaskSet has already been created and registered. However, a small possibility does exist - * that when this method gets called, possibly the new TaskSet might have not been added - * to taskSetsByStageIdAndAttempt. In such a case, we might still hit the same issue. However, - * the above scenario has not yet been reproduced. - */ override def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit = { taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => tsm.partitionToIndex.get(partitionId) match { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index ca6feffe937b7..4cd33db9475eb 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -42,10 +42,10 @@ class FakeSchedulerBackend extends SchedulerBackend { def maxNumConcurrentTasks(): Int = 0 val killedTaskIds: Set[Long] = Set[Long]() override def killTask( - taskId: Long, - executorId: String, - interruptThread: Boolean, - reason: String): Unit = { + taskId: Long, + executorId: String, + interruptThread: Boolean, + reason: String): Unit = { killedTaskIds.add(taskId) } } From 6abd52cd258c98015b3043c0dad99552db60cb1a Mon Sep 17 00:00:00 2001 From: pgandhi Date: Tue, 15 Jan 2019 14:55:26 -0600 Subject: [PATCH 09/21] [SPARK-25250] : Addressing Reviews January 15, 2019 - 2 Using method "markPartitionCompleted()" instead of "markPartitionAsAlreadyCompleted()" --- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 8 ++++---- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 5 ----- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index e7a9ca2b767ab..fc69af7fecf85 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -290,10 +290,10 @@ private[spark] class TaskSchedulerImpl( taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => tsm.partitionToIndex.get(partitionId) match { case Some(index) => - tsm.markPartitionAsAlreadyCompleted(index) - if (killTasks) { - val taskInfoList = tsm.taskAttempts(index) - taskInfoList.filter(_.running).foreach { taskInfo => + val 
taskInfoList = tsm.taskAttempts(index) + taskInfoList.foreach { taskInfo => + tsm.markPartitionCompleted(partitionId, taskInfo) + if (killTasks && taskInfo.running) { try { killTaskAttempt(taskInfo.taskId, false, s"Partition $partitionId is already completed") diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index f5e80f086ae26..41f032ccf82bf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1096,11 +1096,6 @@ private[spark] class TaskSetManager( def executorAdded() { recomputeLocality() } - - def markPartitionAsAlreadyCompleted(index: Int): Unit = { - successful(index) = true - maybeFinishTaskSet() - } } private[spark] object TaskSetManager { From 231c51b4c48a6ab9f53e37cb8931d6eb65dd6872 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Wed, 16 Jan 2019 15:45:53 -0600 Subject: [PATCH 10/21] [SPARK-25250] : Addressing Reviews January 16, 2019 Calling "markPartitionCompleted()" only for the task that succeeded. --- .../org/apache/spark/scheduler/DAGScheduler.scala | 5 +++-- .../apache/spark/scheduler/TaskScheduler.scala | 2 +- .../spark/scheduler/TaskSchedulerImpl.scala | 15 ++++++++------- .../spark/scheduler/DAGSchedulerSuite.scala | 6 ++++-- .../scheduler/ExternalClusterManagerSuite.scala | 3 ++- .../spark/scheduler/TaskSchedulerImplSuite.scala | 2 +- 6 files changed, 19 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 96e4a007dc34e..5d6bf7c2e2d51 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1384,7 +1384,8 @@ private[spark] class DAGScheduler( if (!job.finished(rt.outputId)) { job.finished(rt.outputId) = true job.numFinished += 1 - taskScheduler.completeTasks(task.partitionId, task.stageId, true) + taskScheduler.completeTasks( + task.partitionId, task.stageId, event.taskInfo, true) // If the whole job has finished, remove it if (job.numFinished == job.numPartitions) { markStageAsFinished(resultStage) @@ -1428,7 +1429,7 @@ private[spark] class DAGScheduler( val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) - taskScheduler.completeTasks(task.partitionId, task.stageId, false) + taskScheduler.completeTasks(task.partitionId, task.stageId, event.taskInfo, false) if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 59a58daa945c5..2a53c800fe9ee 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -116,6 +116,6 @@ private[spark] trait TaskScheduler { * same partition. As a result, we do not see any Killed tasks due to * TaskCommitDenied Exceptions showing up in the UI. 
*/ - def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit + def completeTasks(partitionId: Int, stageId: Int, taskInfo: TaskInfo, killTasks: Boolean): Unit } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index fc69af7fecf85..7fb8d3688fd71 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -286,20 +286,21 @@ private[spark] class TaskSchedulerImpl( } } - override def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit = { + override def completeTasks( + partitionId: Int, stageId: Int, taskInfo: TaskInfo, killTasks: Boolean): Unit = { taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => tsm.partitionToIndex.get(partitionId) match { case Some(index) => - val taskInfoList = tsm.taskAttempts(index) - taskInfoList.foreach { taskInfo => - tsm.markPartitionCompleted(partitionId, taskInfo) - if (killTasks && taskInfo.running) { + tsm.markPartitionCompleted(index, taskInfo) + if (killTasks) { + val taskInfoList = tsm.taskAttempts(index) + taskInfoList.filter(_.running).foreach { tInfo => try { - killTaskAttempt(taskInfo.taskId, false, + killTaskAttempt(tInfo.taskId, false, s"Partition $partitionId is already completed") } catch { case e: Exception => - logWarning(s"Unable to kill Task ID ${taskInfo.taskId}.") + logWarning(s"Unable to kill Task ID ${tInfo.taskId}.") } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 602200f2fcfae..004a25c791251 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -160,7 +160,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None - override def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit = {} + override def completeTasks( + partitionId: Int, stageId: Int, taskInfo: TaskInfo, killTasks: Boolean): Unit = {} } /** Length of time to wait while draining listener events. 
*/ @@ -668,7 +669,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None - override def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit = {} + override def completeTasks( + partitionId: Int, stageId: Int, taskInfo: TaskInfo, killTasks: Boolean): Unit = {} } val noKillScheduler = new DAGScheduler( sc, diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index d4d7b62177f3a..08471ebe451e3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -94,5 +94,6 @@ private class DummyTaskScheduler extends TaskScheduler { accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId, executorMetrics: ExecutorMetrics): Boolean = true - override def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit = {} + override def completeTasks( + partitionId: Int, stageId: Int, taskInfo: TaskInfo, killTasks: Boolean): Unit = {} } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 6b6f690c19fde..11650ca0989ba 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1350,7 +1350,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B val offersSecondAttempt = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } taskScheduler.resourceOffers(offersSecondAttempt) - taskScheduler.completeTasks(2, 0, true) + taskScheduler.completeTasks(2, 0, matchingTaskInfoFirstAttempt, true) val tsm1 = taskScheduler.taskSetManagerForAttempt(0, 1).get val indexInTsm = tsm1.partitionToIndex(2) From fcfe9f555f6f0d60eb4ad8690d94bda7df620e33 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Fri, 18 Jan 2019 11:14:46 -0600 Subject: [PATCH 11/21] [SPARK-25250] : Addressing Reviews January 18, 2019 Not killing tasks anymore, modifying comment and removing redundant test --- .../apache/spark/scheduler/DAGScheduler.scala | 5 ++-- .../spark/scheduler/TaskScheduler.scala | 10 +++---- .../spark/scheduler/TaskSchedulerImpl.scala | 18 +---------- .../spark/scheduler/DAGSchedulerSuite.scala | 6 ++-- .../ExternalClusterManagerSuite.scala | 3 +- .../scheduler/TaskSchedulerImplSuite.scala | 30 ------------------- 6 files changed, 11 insertions(+), 61 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 5d6bf7c2e2d51..dd682a90e8833 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1384,8 +1384,7 @@ private[spark] class DAGScheduler( if (!job.finished(rt.outputId)) { job.finished(rt.outputId) = true job.numFinished += 1 - taskScheduler.completeTasks( - task.partitionId, task.stageId, event.taskInfo, true) + taskScheduler.completeTasks(task.partitionId, task.stageId, event.taskInfo) // If the 
whole job has finished, remove it if (job.numFinished == job.numPartitions) { markStageAsFinished(resultStage) @@ -1429,7 +1428,7 @@ private[spark] class DAGScheduler( val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) - taskScheduler.completeTasks(task.partitionId, task.stageId, event.taskInfo, false) + taskScheduler.completeTasks(task.partitionId, task.stageId, event.taskInfo) if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 2a53c800fe9ee..b41c93ec34b64 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -111,11 +111,11 @@ private[spark] trait TaskScheduler { /** * SPARK-25250: Whenever any Task gets successfully completed, we simply mark the - * corresponding partition id as completed in all attempts for that particular stage and - * additionally, for a Result Stage, we also kill the remaining task attempts running on the - * same partition. As a result, we do not see any Killed tasks due to - * TaskCommitDenied Exceptions showing up in the UI. + * corresponding partition id as completed in all attempts for that particular stage. + * This ensures that multiple attempts of the same task do not keep running even when the + * corresponding partition is completed. This method must be called from inside the DAGScheduler + * event loop, to ensure a consistent view of all task sets for the given stage. */ - def completeTasks(partitionId: Int, stageId: Int, taskInfo: TaskInfo, killTasks: Boolean): Unit + def completeTasks(partitionId: Int, stageId: Int, taskInfo: TaskInfo): Unit } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 7fb8d3688fd71..71f81bcf12c35 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -286,29 +286,13 @@ private[spark] class TaskSchedulerImpl( } } - override def completeTasks( - partitionId: Int, stageId: Int, taskInfo: TaskInfo, killTasks: Boolean): Unit = { + override def completeTasks(partitionId: Int, stageId: Int, taskInfo: TaskInfo): Unit = { taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => tsm.partitionToIndex.get(partitionId) match { case Some(index) => tsm.markPartitionCompleted(index, taskInfo) - if (killTasks) { - val taskInfoList = tsm.taskAttempts(index) - taskInfoList.filter(_.running).foreach { tInfo => - try { - killTaskAttempt(tInfo.taskId, false, - s"Partition $partitionId is already completed") - } catch { - case e: Exception => - logWarning(s"Unable to kill Task ID ${tInfo.taskId}.") - } - } - } case None => - throw new SparkException(s"No corresponding index found for" + - s" partition ID $partitionId in TaskSet ${tsm.name}. This is likely a bug" + - s" in the Spark TaskScheduler implementation. 
Please file a bug report") } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 004a25c791251..1486259a5dcf5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -160,8 +160,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None - override def completeTasks( - partitionId: Int, stageId: Int, taskInfo: TaskInfo, killTasks: Boolean): Unit = {} + override def completeTasks(partitionId: Int, stageId: Int, taskInfo: TaskInfo): Unit = {} } /** Length of time to wait while draining listener events. */ @@ -669,8 +668,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None - override def completeTasks( - partitionId: Int, stageId: Int, taskInfo: TaskInfo, killTasks: Boolean): Unit = {} + override def completeTasks(partitionId: Int, stageId: Int, taskInfo: TaskInfo): Unit = {} } val noKillScheduler = new DAGScheduler( sc, diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 08471ebe451e3..9e08995998943 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -94,6 +94,5 @@ private class DummyTaskScheduler extends TaskScheduler { accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId, executorMetrics: ExecutorMetrics): Boolean = true - override def completeTasks( - partitionId: Int, stageId: Int, taskInfo: TaskInfo, killTasks: Boolean): Unit = {} + override def completeTasks(partitionId: Int, stageId: Int, taskInfo: TaskInfo): Unit = {} } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 11650ca0989ba..53f35b7da6712 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1328,34 +1328,4 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, TaskKilled("test")) assert(tsm.isZombie) } - - test("SPARK-25250 On successful completion of a task attempt on a partition id, kill other" + - " running task attempts on that same partition") { - val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() - - val firstAttempt = FakeTask.createTaskSet(10, stageAttemptId = 0) - taskScheduler.submitTasks(firstAttempt) - - val offersFirstAttempt = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } - taskScheduler.resourceOffers(offersFirstAttempt) - - val tsm0 = taskScheduler.taskSetManagerForAttempt(0, 0).get - val 
matchingTaskInfoFirstAttempt = tsm0.taskAttempts(0).head - tsm0.handleFailedTask(matchingTaskInfoFirstAttempt.taskId, TaskState.FAILED, - FetchFailed(null, 0, 0, 0, "fetch failed")) - - val secondAttempt = FakeTask.createTaskSet(10, stageAttemptId = 1) - taskScheduler.submitTasks(secondAttempt) - - val offersSecondAttempt = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } - taskScheduler.resourceOffers(offersSecondAttempt) - - taskScheduler.completeTasks(2, 0, matchingTaskInfoFirstAttempt, true) - - val tsm1 = taskScheduler.taskSetManagerForAttempt(0, 1).get - val indexInTsm = tsm1.partitionToIndex(2) - val matchingTaskInfoSecondAttempt = tsm1.taskAttempts.flatten.filter(_.index == indexInTsm).head - assert(taskScheduler.backend.asInstanceOf[FakeSchedulerBackend].killedTaskIds.contains( - matchingTaskInfoSecondAttempt.taskId)) - } } From 7ce6f104980a0cbb0a3d017d40003983f09b0c58 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Tue, 22 Jan 2019 17:24:27 -0600 Subject: [PATCH 12/21] [SPARK-25250] : Adding unit test --- .../spark/scheduler/DAGSchedulerSuite.scala | 47 ++++++++++++++++++- .../scheduler/TaskSchedulerImplSuite.scala | 9 ---- 2 files changed, 46 insertions(+), 10 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 1486259a5dcf5..91734bed11ae3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -133,6 +133,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi /** Stages for which the DAGScheduler has called TaskScheduler.cancelTasks(). */ val cancelledStages = new HashSet[Int]() + val completedPartitions = new HashMap[Int, HashSet[Int]]() + val taskScheduler = new TaskScheduler() { override def schedulingMode: SchedulingMode = SchedulingMode.FIFO override def rootPool: Pool = new Pool("", schedulingMode, 0, 0) @@ -160,7 +162,15 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None - override def completeTasks(partitionId: Int, stageId: Int, taskInfo: TaskInfo): Unit = {} + // Since, the method completeTasks in TaskSchedulerImpl.scala marks the partition complete + // for all stage attempts in the particular stage id, it does not need any info about + // stageAttemptId. Hence, completed partition id's are stored only for stage id's to mock + // the method implementation here. + override def completeTasks(partitionId: Int, stageId: Int, taskInfo: TaskInfo): Unit = { + val partitionIds = completedPartitions.getOrElseUpdate(stageId, new HashSet[Int]) + partitionIds.add(partitionId) + completedPartitions.put(stageId, partitionIds) + } } /** Length of time to wait while draining listener events. 
*/ @@ -249,6 +259,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi cancelledStages.clear() cacheLocations.clear() results.clear() + completedPartitions.clear() securityMgr = new SecurityManager(conf) broadcastManager = new BroadcastManager(true, conf, securityMgr) mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) { @@ -2851,6 +2862,40 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + test("SPARK-25250: Late zombie task completions handled correctly even before" + + " new taskset launched") { + val shuffleMapRdd = new MyRDD(sc, 4, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(4)) + val reduceRdd = new MyRDD(sc, 4, List(shuffleDep), tracker = mapOutputTracker) + submit(reduceRdd, Array(0, 1, 2, 3)) + + completeShuffleMapStageSuccessfully(0, 0, numShufflePartitions = 4) + + // Fail Stage 1 Attempt 0 with Fetch Failure + runEvent(makeCompletionEvent( + taskSets(1).tasks(0), + FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0, 0, "ignored"), + null)) + + // this will trigger a resubmission of stage 0, since we've lost some of its + // map output, for the next iteration through the loop + scheduler.resubmitFailedStages() + completeShuffleMapStageSuccessfully(0, 1, numShufflePartitions = 4) + + runEvent(makeCompletionEvent( + taskSets(1).tasks(3), Success, Nil, Nil)) + assert(completedPartitions.get(taskSets(3).stageId).get.contains( + taskSets(3).tasks(1).partitionId) == false, "Corresponding partition id for" + + " stage 1 attempt 1 is not complete yet") + + // this will mark partition id 1 of stage 1 attempt 0 as complete. So we expect the status + // of that partition id to be reflected for stage 1 attempt 1 as well. + runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, Nil, Nil)) + assert(completedPartitions.get(taskSets(3).stageId).get.contains( + taskSets(3).tasks(1).partitionId) == true) + } + /** * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations. * Note that this checks only the host and not the executor ID. 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 53f35b7da6712..85c87b95f7c2e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.scheduler import java.nio.ByteBuffer import scala.collection.mutable.HashMap -import scala.collection.mutable.Set import scala.concurrent.duration._ import org.mockito.ArgumentMatchers.{any, anyInt, anyString, eq => meq} @@ -40,14 +39,6 @@ class FakeSchedulerBackend extends SchedulerBackend { def reviveOffers() {} def defaultParallelism(): Int = 1 def maxNumConcurrentTasks(): Int = 0 - val killedTaskIds: Set[Long] = Set[Long]() - override def killTask( - taskId: Long, - executorId: String, - interruptThread: Boolean, - reason: String): Unit = { - killedTaskIds.add(taskId) - } } class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach From 929fbf9a32190944db57948c0da358d98a7f4d4f Mon Sep 17 00:00:00 2001 From: pgandhi Date: Thu, 24 Jan 2019 12:54:51 -0600 Subject: [PATCH 13/21] [SPARK-25250] : Addressing Reviews January 24, 2019 Reverting redundant method call from PR #21131, adding test setup code in test, changing from index to partition id etc. --- .../spark/scheduler/TaskSchedulerImpl.scala | 7 +------ .../spark/scheduler/TaskSetManager.scala | 3 --- .../spark/scheduler/DAGSchedulerSuite.scala | 19 ++++++++++++++++--- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index f3f3d4fe54553..26c113ab57cc8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -289,12 +289,7 @@ private[spark] class TaskSchedulerImpl( override def completeTasks(partitionId: Int, stageId: Int, taskInfo: TaskInfo): Unit = { taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => - tsm.partitionToIndex.get(partitionId) match { - case Some(index) => - tsm.markPartitionCompleted(index, taskInfo) - - case None => - } + tsm.markPartitionCompleted(partitionId, taskInfo) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 6f3f77c304dc7..7a53cbb2b631e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -784,9 +784,6 @@ private[spark] class TaskSetManager( logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + " because task " + index + " has already completed successfully") } - // There may be multiple tasksets for this stage -- we let all of them know that the partition - // was completed. This may result in some of the tasksets getting completed. - sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info) // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not // "deserialize" the value when holding a lock to avoid blocking other threads. 
So we call
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 91734bed11ae3..b5454b17433b4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -169,7 +169,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     override def completeTasks(partitionId: Int, stageId: Int, taskInfo: TaskInfo): Unit = {
       val partitionIds = completedPartitions.getOrElseUpdate(stageId, new HashSet[Int])
       partitionIds.add(partitionId)
-      completedPartitions.put(stageId, partitionIds)
+      completedPartitions(stageId) = partitionIds
     }
   }
@@ -2862,6 +2862,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
   }
+  // This test is similar to, and goes along with, "Completions in zombie tasksets update
+  // status of non-zombie taskset" in TaskSchedulerImplSuite.scala.
   test("SPARK-25250: Late zombie task completions handled correctly even before" +
     " new taskset launched") {
     val shuffleMapRdd = new MyRDD(sc, 4, Nil)
@@ -2882,8 +2884,19 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     scheduler.resubmitFailedStages()
     completeShuffleMapStageSuccessfully(0, 1, numShufflePartitions = 4)
-    runEvent(makeCompletionEvent(
-      taskSets(1).tasks(3), Success, Nil, Nil))
+    // tasksets 1 & 3 should be two different attempts for our reduce stage -- let's
+    // double-check the test setup
+    val reduceStage = taskSets(1).stageId
+    assert(taskSets(3).stageId === reduceStage)
+
+    // complete one task from the original taskset, and make sure we update the taskSchedulerImpl
+    // so it can notify all taskSetManagers. Some of that is mocked here, so just check that
+    // the right event is generated.
+    val taskToComplete = taskSets(1).tasks(3)
+
+    runEvent(makeCompletionEvent(taskToComplete, Success, Nil, Nil))
+    assert(completedPartitions.getOrElse(reduceStage, Set()) === Set(taskToComplete.partitionId))
+
     assert(completedPartitions.get(taskSets(3).stageId).get.contains(
       taskSets(3).tasks(1).partitionId) == false, "Corresponding partition id for" +
       " stage 1 attempt 1 is not complete yet")

From 393f901040fa1bf55220c2e2eb1510492286f470 Mon Sep 17 00:00:00 2001
From: pgandhi
Date: Fri, 25 Jan 2019 11:01:43 -0600
Subject: [PATCH 14/21] [SPARK-25250] : Fixing Unit Tests

Fixed the failing unit test in TaskSetManagerSuite and reverted the test in TaskSchedulerImplSuite, as it now seems redundant.
--- .../scheduler/TaskSchedulerImplSuite.scala | 104 ------------------ .../spark/scheduler/TaskSetManagerSuite.scala | 1 + 2 files changed, 1 insertion(+), 104 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 016adb8b70e78..c9fb1020a192e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1102,110 +1102,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } } - test("Completions in zombie tasksets update status of non-zombie taskset") { - val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() - val valueSer = SparkEnv.get.serializer.newInstance() - - def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { - val indexInTsm = tsm.partitionToIndex(partition) - val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head - val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) - tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) - } - - // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, - // two times, so we have three active task sets for one stage. (For this to really happen, - // you'd need the previous stage to also get restarted, and then succeed, in between each - // attempt, but that happens outside what we're mocking here.) - val zombieAttempts = (0 until 2).map { stageAttempt => - val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) - taskScheduler.submitTasks(attempt) - val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get - val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } - taskScheduler.resourceOffers(offers) - assert(tsm.runningTasks === 10) - // fail attempt - tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, - FetchFailed(null, 0, 0, 0, "fetch failed")) - // the attempt is a zombie, but the tasks are still running (this could be true even if - // we actively killed those tasks, as killing is best-effort) - assert(tsm.isZombie) - assert(tsm.runningTasks === 9) - tsm - } - - // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for - // the stage, but this time with insufficient resources so not all tasks are active. - - val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) - taskScheduler.submitTasks(finalAttempt) - val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get - val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } - val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => - finalAttempt.tasks(task.index).partitionId - }.toSet - assert(finalTsm.runningTasks === 5) - assert(!finalTsm.isZombie) - - // We simulate late completions from our zombie tasksets, corresponding to all the pending - // partitions in our final attempt. This means we're only waiting on the tasks we've already - // launched. - val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions) - finalAttemptPendingPartitions.foreach { partition => - completeTaskSuccessfully(zombieAttempts(0), partition) - } - - // If there is another resource offer, we shouldn't run anything. 
Though our final attempt - // used to have pending tasks, now those tasks have been completed by zombie attempts. The - // remaining tasks to compute are already active in the non-zombie attempt. - assert( - taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty) - - val remainingTasks = finalAttemptLaunchedPartitions.toIndexedSeq.sorted - - // finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be - // marked as zombie. - // for each of the remaining tasks, find the tasksets with an active copy of the task, and - // finish the task. - remainingTasks.foreach { partition => - val tsm = if (partition == 0) { - // we failed this task on both zombie attempts, this one is only present in the latest - // taskset - finalTsm - } else { - // should be active in every taskset. We choose a zombie taskset just to make sure that - // we transition the active taskset correctly even if the final completion comes - // from a zombie. - zombieAttempts(partition % 2) - } - completeTaskSuccessfully(tsm, partition) - } - - assert(finalTsm.isZombie) - - // no taskset has completed all of its tasks, so no updates to the blacklist tracker yet - verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(anyInt(), anyInt(), any()) - - // finally, lets complete all the tasks. We simulate failures in attempt 1, but everything - // else succeeds, to make sure we get the right updates to the blacklist in all cases. - (zombieAttempts ++ Seq(finalTsm)).foreach { tsm => - val stageAttempt = tsm.taskSet.stageAttemptId - tsm.runningTasksSet.foreach { index => - if (stageAttempt == 1) { - tsm.handleFailedTask(tsm.taskInfos(index).taskId, TaskState.FAILED, TaskResultLost) - } else { - val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) - tsm.handleSuccessfulTask(tsm.taskInfos(index).taskId, result) - } - } - - // we update the blacklist for the stage attempts with all successful tasks. Even though - // some tasksets had failures, we still consider them all successful from a blacklisting - // perspective, as the failures weren't from a problem w/ the tasks themselves. - verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), any()) - } - } - test("don't schedule for a barrier taskSet if available slots are less than pending tasks") { val taskCpus = 2 val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index d0f98b5e4eabd..5c4f06e0bf373 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -48,6 +48,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler) accumUpdates: Seq[AccumulatorV2[_, _]], taskInfo: TaskInfo) { taskScheduler.endedTasks(taskInfo.index) = reason + taskScheduler.completeTasks(task.partitionId, task.stageId, taskInfo) } override def executorAdded(execId: String, host: String) {} From 52e832a800ca8af49876dfbd2ffdf2d49a448377 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Wed, 30 Jan 2019 11:09:29 -0600 Subject: [PATCH 15/21] [SPARK-25250] : Addressing Reviews January 30, 2019 Refactoring method name, fixing unit tests, removing redundant method etc. 
--- .../apache/spark/scheduler/DAGScheduler.scala | 6 +- .../spark/scheduler/TaskScheduler.scala | 16 ++- .../spark/scheduler/TaskSchedulerImpl.scala | 22 +--- .../spark/scheduler/DAGSchedulerSuite.scala | 15 ++- .../ExternalClusterManagerSuite.scala | 5 +- .../scheduler/TaskSchedulerImplSuite.scala | 120 +++++++++++++++++- .../spark/scheduler/TaskSetManagerSuite.scala | 2 +- 7 files changed, 154 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index dd682a90e8833..97e885de77dd1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1384,7 +1384,8 @@ private[spark] class DAGScheduler( if (!job.finished(rt.outputId)) { job.finished(rt.outputId) = true job.numFinished += 1 - taskScheduler.completeTasks(task.partitionId, task.stageId, event.taskInfo) + taskScheduler.markPartitionCompletedInAllTaskSets( + task.partitionId, task.stageId, event.taskInfo) // If the whole job has finished, remove it if (job.numFinished == job.numPartitions) { markStageAsFinished(resultStage) @@ -1428,7 +1429,8 @@ private[spark] class DAGScheduler( val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) - taskScheduler.completeTasks(task.partitionId, task.stageId, event.taskInfo) + taskScheduler.markPartitionCompletedInAllTaskSets( + task.partitionId, task.stageId, event.taskInfo) if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index b41c93ec34b64..2fe95a6c50f5b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -110,12 +110,18 @@ private[spark] trait TaskScheduler { def applicationAttemptId(): Option[String] /** - * SPARK-25250: Whenever any Task gets successfully completed, we simply mark the - * corresponding partition id as completed in all attempts for that particular stage. - * This ensures that multiple attempts of the same task do not keep running even when the - * corresponding partition is completed. This method must be called from inside the DAGScheduler + * SPARK-25250: Marks the task has completed in all TaskSetManagers for the given stage. + * After stage failure and retry, there may be multiple TaskSetManagers for the stage. If an + * earlier attempt of a stage completes a task, we should ensure that the later attempts do not + * also submit those same tasks. That also means that a task completion from an earlier attempt + * can lead to the entire stage getting marked as successful. Whenever any Task gets + * successfully completed, we simply mark the corresponding partition id as completed in all + * attempts for that particular stage. This method must be called from inside the DAGScheduler * event loop, to ensure a consistent view of all task sets for the given stage. 
*/ - def completeTasks(partitionId: Int, stageId: Int, taskInfo: TaskInfo): Unit + def markPartitionCompletedInAllTaskSets( + partitionId: Int, + stageId: Int, + taskInfo: TaskInfo): Unit } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 26c113ab57cc8..3c0fc0215ac06 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -287,7 +287,10 @@ private[spark] class TaskSchedulerImpl( } } - override def completeTasks(partitionId: Int, stageId: Int, taskInfo: TaskInfo): Unit = { + override def markPartitionCompletedInAllTaskSets( + partitionId: Int, + stageId: Int, + taskInfo: TaskInfo): Unit = { taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => tsm.markPartitionCompleted(partitionId, taskInfo) } @@ -842,23 +845,6 @@ private[spark] class TaskSchedulerImpl( } } - /** - * Marks the task has completed in all TaskSetManagers for the given stage. - * - * After stage failure and retry, there may be multiple TaskSetManagers for the stage. - * If an earlier attempt of a stage completes a task, we should ensure that the later attempts - * do not also submit those same tasks. That also means that a task completion from an earlier - * attempt can lead to the entire stage getting marked as successful. - */ - private[scheduler] def markPartitionCompletedInAllTaskSets( - stageId: Int, - partitionId: Int, - taskInfo: TaskInfo) = { - taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => - tsm.markPartitionCompleted(partitionId, taskInfo) - } - } - } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index b5454b17433b4..05477c022ddf7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -166,10 +166,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // for all stage attempts in the particular stage id, it does not need any info about // stageAttemptId. Hence, completed partition id's are stored only for stage id's to mock // the method implementation here. 
- override def completeTasks(partitionId: Int, stageId: Int, taskInfo: TaskInfo): Unit = { + override def markPartitionCompletedInAllTaskSets( + partitionId: Int, + stageId: Int, + taskInfo: TaskInfo): Unit = { val partitionIds = completedPartitions.getOrElseUpdate(stageId, new HashSet[Int]) partitionIds.add(partitionId) - completedPartitions(stageId) = partitionIds } } @@ -679,7 +681,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None - override def completeTasks(partitionId: Int, stageId: Int, taskInfo: TaskInfo): Unit = {} + override def markPartitionCompletedInAllTaskSets( + partitionId: Int, + stageId: Int, + taskInfo: TaskInfo): Unit = {} } val noKillScheduler = new DAGScheduler( sc, @@ -2897,7 +2902,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi runEvent(makeCompletionEvent(taskToComplete, Success, Nil, Nil)) assert(completedPartitions.getOrElse(reduceStage, Set()) === Set(taskToComplete.partitionId)) - assert(completedPartitions.get(taskSets(3).stageId).get.contains( + assert(completedPartitions(taskSets(3).stageId).contains( taskSets(3).tasks(1).partitionId) == false, "Corresponding partition id for" + " stage 1 attempt 1 is not complete yet") @@ -2907,6 +2912,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi taskSets(1).tasks(1), Success, Nil, Nil)) assert(completedPartitions.get(taskSets(3).stageId).get.contains( taskSets(3).tasks(1).partitionId) == true) + assert(completedPartitions(reduceStage) === Set( + taskSets(3).tasks(1).partitionId, taskSets(3).tasks(3).partitionId)) } /** diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 9e08995998943..d1950b0924610 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -94,5 +94,8 @@ private class DummyTaskScheduler extends TaskScheduler { accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId, executorMetrics: ExecutorMetrics): Boolean = true - override def completeTasks(partitionId: Int, stageId: Int, taskInfo: TaskInfo): Unit = {} + override def markPartitionCompletedInAllTaskSets( + partitionId: Int, + stageId: Int, + taskInfo: TaskInfo): Unit = {} } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index c9fb1020a192e..d86fe7add71e8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -31,7 +31,7 @@ import org.scalatest.mockito.MockitoSugar import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config -import org.apache.spark.util.ManualClock +import org.apache.spark.util.{AccumulatorV2, ManualClock} class FakeSchedulerBackend extends SchedulerBackend { def start() {} @@ -125,6 +125,20 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B failedTaskSetReason = reason failedTaskSetException = exception 
} + override def taskEnded( + task: Task[_], + reason: TaskEndReason, + result: Any, + accumUpdates: Seq[AccumulatorV2[_, _]], + taskInfo: TaskInfo): Unit = { + if (reason == Success) { + // For SPARK-23433 / SPARK-25250, need to make DAGScheduler lets all tasksets know + // about complete partitions. Super implementation is not enough, because we've mocked + // out too much of the rest of the DAGScheduler. + taskScheduler.markPartitionCompletedInAllTaskSets(task.partitionId, task.stageId, taskInfo) + } + super.taskEnded(task, reason, result, accumUpdates, taskInfo) + } } taskScheduler } @@ -1102,6 +1116,110 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } } + test("Completions in zombie tasksets update status of non-zombie taskset") { + val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() + val valueSer = SparkEnv.get.serializer.newInstance() + + def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) + } + + // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, + // two times, so we have three active task sets for one stage. (For this to really happen, + // you'd need the previous stage to also get restarted, and then succeed, in between each + // attempt, but that happens outside what we're mocking here.) + val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + // fail attempt + tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) + // the attempt is a zombie, but the tasks are still running (this could be true even if + // we actively killed those tasks, as killing is best-effort) + assert(tsm.isZombie) + assert(tsm.runningTasks === 9) + tsm + } + + // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for + // the stage, but this time with insufficient resources so not all tasks are active. + + val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) + taskScheduler.submitTasks(finalAttempt) + val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get + val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => + finalAttempt.tasks(task.index).partitionId + }.toSet + assert(finalTsm.runningTasks === 5) + assert(!finalTsm.isZombie) + + // We simulate late completions from our zombie tasksets, corresponding to all the pending + // partitions in our final attempt. This means we're only waiting on the tasks we've already + // launched. + val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions) + finalAttemptPendingPartitions.foreach { partition => + completeTaskSuccessfully(zombieAttempts(0), partition) + } + + // If there is another resource offer, we shouldn't run anything. 
Though our final attempt + // used to have pending tasks, now those tasks have been completed by zombie attempts. The + // remaining tasks to compute are already active in the non-zombie attempt. + assert( + taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty) + + val remainingTasks = finalAttemptLaunchedPartitions.toIndexedSeq.sorted + + // finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be + // marked as zombie. + // for each of the remaining tasks, find the tasksets with an active copy of the task, and + // finish the task. + remainingTasks.foreach { partition => + val tsm = if (partition == 0) { + // we failed this task on both zombie attempts, this one is only present in the latest + // taskset + finalTsm + } else { + // should be active in every taskset. We choose a zombie taskset just to make sure that + // we transition the active taskset correctly even if the final completion comes + // from a zombie. + zombieAttempts(partition % 2) + } + completeTaskSuccessfully(tsm, partition) + } + + assert(finalTsm.isZombie) + + // no taskset has completed all of its tasks, so no updates to the blacklist tracker yet + verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(anyInt(), anyInt(), any()) + + // finally, lets complete all the tasks. We simulate failures in attempt 1, but everything + // else succeeds, to make sure we get the right updates to the blacklist in all cases. + (zombieAttempts ++ Seq(finalTsm)).foreach { tsm => + val stageAttempt = tsm.taskSet.stageAttemptId + tsm.runningTasksSet.foreach { index => + if (stageAttempt == 1) { + tsm.handleFailedTask(tsm.taskInfos(index).taskId, TaskState.FAILED, TaskResultLost) + } else { + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(tsm.taskInfos(index).taskId, result) + } + } + + // we update the blacklist for the stage attempts with all successful tasks. Even though + // some tasksets had failures, we still consider them all successful from a blacklisting + // perspective, as the failures weren't from a problem w/ the tasks themselves. 
+ verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), any()) + } + } + test("don't schedule for a barrier taskSet if available slots are less than pending tasks") { val taskCpus = 2 val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 5c4f06e0bf373..28e96d1712368 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -48,7 +48,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler) accumUpdates: Seq[AccumulatorV2[_, _]], taskInfo: TaskInfo) { taskScheduler.endedTasks(taskInfo.index) = reason - taskScheduler.completeTasks(task.partitionId, task.stageId, taskInfo) + taskScheduler.markPartitionCompletedInAllTaskSets(task.partitionId, task.stageId, taskInfo) } override def executorAdded(execId: String, host: String) {} From 024ec53b6161303c927d76ba402ef275177dee04 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Wed, 30 Jan 2019 11:22:33 -0600 Subject: [PATCH 16/21] [SPARK-25250] : Fixing Scalastyle Checks --- .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index d86fe7add71e8..e29c0a42229d2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -135,7 +135,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B // For SPARK-23433 / SPARK-25250, need to make DAGScheduler lets all tasksets know // about complete partitions. Super implementation is not enough, because we've mocked // out too much of the rest of the DAGScheduler. - taskScheduler.markPartitionCompletedInAllTaskSets(task.partitionId, task.stageId, taskInfo) + taskScheduler.markPartitionCompletedInAllTaskSets( + task.partitionId, task.stageId, taskInfo) } super.taskEnded(task, reason, result, accumUpdates, taskInfo) } From d2b70446a143bdda06299f5c56c09910bd8d4010 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Wed, 30 Jan 2019 15:45:02 -0600 Subject: [PATCH 17/21] [SPARK-25250] : Addressing Minor Reviews January 30, 2019 Moving method call higher up and making it common, indentation, removing unnecessary assert statements etc. 
--- .../org/apache/spark/scheduler/DAGScheduler.scala | 6 ++---- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 6 ------ .../spark/scheduler/TaskSchedulerImplSuite.scala | 10 +++++----- 3 files changed, 7 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index ff46d837d1362..5b2eb433d5a4b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1378,6 +1378,8 @@ private[spark] class DAGScheduler( event.reason match { case Success => + taskScheduler.markPartitionCompletedInAllTaskSets( + task.partitionId, task.stageId, event.taskInfo) task match { case rt: ResultTask[_, _] => // Cast to ResultStage here because it's part of the ResultTask @@ -1388,8 +1390,6 @@ private[spark] class DAGScheduler( if (!job.finished(rt.outputId)) { job.finished(rt.outputId) = true job.numFinished += 1 - taskScheduler.markPartitionCompletedInAllTaskSets( - task.partitionId, task.stageId, event.taskInfo) // If the whole job has finished, remove it if (job.numFinished == job.numPartitions) { markStageAsFinished(resultStage) @@ -1433,8 +1433,6 @@ private[spark] class DAGScheduler( val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) - taskScheduler.markPartitionCompletedInAllTaskSets( - task.partitionId, task.stageId, event.taskInfo) if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") } else { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 05477c022ddf7..441f3ffbc0566 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -2902,16 +2902,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi runEvent(makeCompletionEvent(taskToComplete, Success, Nil, Nil)) assert(completedPartitions.getOrElse(reduceStage, Set()) === Set(taskToComplete.partitionId)) - assert(completedPartitions(taskSets(3).stageId).contains( - taskSets(3).tasks(1).partitionId) == false, "Corresponding partition id for" + - " stage 1 attempt 1 is not complete yet") - // this will mark partition id 1 of stage 1 attempt 0 as complete. So we expect the status // of that partition id to be reflected for stage 1 attempt 1 as well. 
runEvent(makeCompletionEvent( taskSets(1).tasks(1), Success, Nil, Nil)) - assert(completedPartitions.get(taskSets(3).stageId).get.contains( - taskSets(3).tasks(1).partitionId) == true) assert(completedPartitions(reduceStage) === Set( taskSets(3).tasks(1).partitionId, taskSets(3).tasks(3).partitionId)) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index e29c0a42229d2..93fd54321cfda 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -126,11 +126,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B failedTaskSetException = exception } override def taskEnded( - task: Task[_], - reason: TaskEndReason, - result: Any, - accumUpdates: Seq[AccumulatorV2[_, _]], - taskInfo: TaskInfo): Unit = { + task: Task[_], + reason: TaskEndReason, + result: Any, + accumUpdates: Seq[AccumulatorV2[_, _]], + taskInfo: TaskInfo): Unit = { if (reason == Success) { // For SPARK-23433 / SPARK-25250, need to make DAGScheduler lets all tasksets know // about complete partitions. Super implementation is not enough, because we've mocked From d6ac4a9267c690409d54176a4e815c3e9a64af78 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Thu, 31 Jan 2019 09:01:50 -0600 Subject: [PATCH 18/21] [SPARK-25250] : Addressing Reviews January 31, 2019 Removing call to maybeFinishTaskSet(). --- .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index f21c4e9cb1975..932a58ded13e6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -805,7 +805,6 @@ private[spark] class TaskSetManager( if (tasksSuccessful == numTasks) { isZombie = true } - maybeFinishTaskSet() } } } From b55dbb02ce4974feb4b002ab2a8c6220bff8395a Mon Sep 17 00:00:00 2001 From: pgandhi Date: Mon, 18 Feb 2019 11:04:41 -0600 Subject: [PATCH 19/21] [SPARK-25250] : Restructuring PR and trying out a different solution Have created a new HashMap "stageIdToFinishedPartitions" that maintains a set of completed partitions for the corresponding stage. It is updated from the DAGScheduler Event Loop and is checked by TaskSetManager each time a task fails to check whether corresponding partition has been completed. If it is completed, then TSM will call markPartitionCompleted(). All this happens within the synchronized block of handleFailedTask() method. 
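To make the flow above concrete, here is a minimal, self-contained sketch of the bookkeeping: the event loop records finished partitions per stage, and a failed task whose partition is already finished is not re-queued. The field and method names mirror the diff below, but SketchScheduler and SketchTaskSetManager are simplified stand-ins, not the real Spark classes, and the locking and re-queuing logic are reduced to their essentials.

    import scala.collection.mutable.{HashMap, HashSet}

    // Stand-in for TaskSchedulerImpl: the DAGScheduler event loop records which
    // partitions of a stage have already been completed by some task attempt.
    class SketchScheduler {
      // stageId -> partition ids already completed by any attempt of that stage
      val stageIdToFinishedPartitions = new HashMap[Int, HashSet[Int]]

      // Called from the DAGScheduler event loop on every successful task.
      def markPartitionCompletedFromEventLoop(partitionId: Int, stageId: Int): Unit = {
        stageIdToFinishedPartitions.getOrElseUpdate(stageId, new HashSet[Int]) += partitionId
      }
    }

    // Stand-in for the failure path in TaskSetManager.handleFailedTask: if another
    // attempt already finished this partition, mark it completed locally instead of
    // re-queuing the task.
    class SketchTaskSetManager(sched: SketchScheduler, stageId: Int) {
      def handleFailedTask(partitionId: Int): Unit = synchronized {
        val alreadyFinished =
          sched.stageIdToFinishedPartitions.get(stageId).exists(_.contains(partitionId))
        if (alreadyFinished) {
          // real code would call sched.markPartitionCompletedInAllTaskSets(...) here
          println(s"partition $partitionId already finished by another attempt; not re-queuing")
        } else {
          println(s"re-queuing partition $partitionId")
        }
      }
    }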
--- .../apache/spark/scheduler/DAGScheduler.scala | 3 +- .../spark/scheduler/TaskScheduler.scala | 5 +--- .../spark/scheduler/TaskSchedulerImpl.scala | 28 ++++++++++++++----- .../spark/scheduler/TaskSetManager.scala | 6 ++++ .../spark/scheduler/DAGSchedulerSuite.scala | 10 ++----- .../ExternalClusterManagerSuite.scala | 5 +--- .../scheduler/TaskSchedulerImplSuite.scala | 3 +- .../spark/scheduler/TaskSetManagerSuite.scala | 1 - 8 files changed, 33 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 195df601eac02..3d1ead7eb40ed 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1383,8 +1383,7 @@ private[spark] class DAGScheduler( event.reason match { case Success => - taskScheduler.markPartitionCompletedInAllTaskSets( - task.partitionId, task.stageId, event.taskInfo) + taskScheduler.markPartitionCompletedFromEventLoop(task.partitionId, task.stageId) task match { case rt: ResultTask[_, _] => // Cast to ResultStage here because it's part of the ResultTask diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 2fe95a6c50f5b..98ff9b8632fec 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -119,9 +119,6 @@ private[spark] trait TaskScheduler { * attempts for that particular stage. This method must be called from inside the DAGScheduler * event loop, to ensure a consistent view of all task sets for the given stage. */ - def markPartitionCompletedInAllTaskSets( - partitionId: Int, - stageId: Int, - taskInfo: TaskInfo): Unit + def markPartitionCompletedFromEventLoop(partitionId: Int, stageId: Int): Unit } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 3c0fc0215ac06..d883223f0b50a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -150,6 +150,8 @@ private[spark] class TaskSchedulerImpl( private[scheduler] var barrierCoordinator: RpcEndpoint = null + private[scheduler] val stageIdToFinishedPartitions = new HashMap[Int, HashSet[Int]] + private def maybeInitBarrierCoordinator(): Unit = { if (barrierCoordinator == null) { barrierCoordinator = new BarrierCoordinator(barrierSyncTimeout, sc.listenerBus, @@ -287,13 +289,8 @@ private[spark] class TaskSchedulerImpl( } } - override def markPartitionCompletedInAllTaskSets( - partitionId: Int, - stageId: Int, - taskInfo: TaskInfo): Unit = { - taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => - tsm.markPartitionCompleted(partitionId, taskInfo) - } + override def markPartitionCompletedFromEventLoop(partitionId: Int, stageId: Int): Unit = { + stageIdToFinishedPartitions.getOrElseUpdate(stageId, new HashSet[Int]).add(partitionId) } /** @@ -845,6 +842,23 @@ private[spark] class TaskSchedulerImpl( } } + /** + * Marks the task has completed in all TaskSetManagers for the given stage. + * + * After stage failure and retry, there may be multiple TaskSetManagers for the stage. 
+ * If an earlier attempt of a stage completes a task, we should ensure that the later attempts + * do not also submit those same tasks. That also means that a task completion from an earlier + * attempt can lead to the entire stage getting marked as successful. + */ + private[scheduler] def markPartitionCompletedInAllTaskSets( + stageId: Int, + partitionId: Int, + taskInfo: TaskInfo) = { + taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => + tsm.markPartitionCompleted(partitionId, taskInfo) + } + } + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 932a58ded13e6..f4329edb622e3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -784,6 +784,9 @@ private[spark] class TaskSetManager( logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + " because task " + index + " has already completed successfully") } + // There may be multiple tasksets for this stage -- we let all of them know that the partition + // was completed. This may result in some of the tasksets getting completed. + sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info) // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not // "deserialize" the value when holding a lock to avoid blocking other threads. So we call @@ -920,6 +923,9 @@ private[spark] class TaskSetManager( s" be re-executed (either because the task failed with a shuffle data fetch failure," + s" so the previous stage needs to be re-run, or because a different copy of the task" + s" has already succeeded).") + } else if (sched.stageIdToFinishedPartitions.getOrElse( + stageId, new HashSet[Int]).contains(tasks(index).partitionId)) { + sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info) } else { addPendingTask(index) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 441f3ffbc0566..aecd4ba1605fc 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -166,10 +166,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // for all stage attempts in the particular stage id, it does not need any info about // stageAttemptId. Hence, completed partition id's are stored only for stage id's to mock // the method implementation here. 
- override def markPartitionCompletedInAllTaskSets( - partitionId: Int, - stageId: Int, - taskInfo: TaskInfo): Unit = { + override def markPartitionCompletedFromEventLoop(partitionId: Int, stageId: Int): Unit = { val partitionIds = completedPartitions.getOrElseUpdate(stageId, new HashSet[Int]) partitionIds.add(partitionId) } @@ -681,10 +678,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None - override def markPartitionCompletedInAllTaskSets( - partitionId: Int, - stageId: Int, - taskInfo: TaskInfo): Unit = {} + override def markPartitionCompletedFromEventLoop(partitionId: Int, stageId: Int): Unit = {} } val noKillScheduler = new DAGScheduler( sc, diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index d1950b0924610..8436e6928973d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -94,8 +94,5 @@ private class DummyTaskScheduler extends TaskScheduler { accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId, executorMetrics: ExecutorMetrics): Boolean = true - override def markPartitionCompletedInAllTaskSets( - partitionId: Int, - stageId: Int, - taskInfo: TaskInfo): Unit = {} + override def markPartitionCompletedFromEventLoop(partitionId: Int, stageId: Int): Unit = {} } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 93fd54321cfda..13efdf53a152b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -135,8 +135,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B // For SPARK-23433 / SPARK-25250, need to make DAGScheduler lets all tasksets know // about complete partitions. Super implementation is not enough, because we've mocked // out too much of the rest of the DAGScheduler. 
- taskScheduler.markPartitionCompletedInAllTaskSets( - task.partitionId, task.stageId, taskInfo) + taskScheduler.markPartitionCompletedFromEventLoop(task.partitionId, task.stageId) } super.taskEnded(task, reason, result, accumUpdates, taskInfo) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index a9d2259ceae63..60acd3ed4cd49 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -48,7 +48,6 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler) accumUpdates: Seq[AccumulatorV2[_, _]], taskInfo: TaskInfo) { taskScheduler.endedTasks(taskInfo.index) = reason - taskScheduler.markPartitionCompletedInAllTaskSets(task.partitionId, task.stageId, taskInfo) } override def executorAdded(execId: String, host: String) {} From 551f412554c623a99cbd7f33fbceea5da48beb65 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Mon, 18 Feb 2019 11:08:52 -0600 Subject: [PATCH 20/21] [SPARK-25250] : Fixing indentation --- .../spark/scheduler/TaskSchedulerImpl.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index d883223f0b50a..1d12954438843 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -843,17 +843,17 @@ private[spark] class TaskSchedulerImpl( } /** - * Marks the task has completed in all TaskSetManagers for the given stage. - * - * After stage failure and retry, there may be multiple TaskSetManagers for the stage. - * If an earlier attempt of a stage completes a task, we should ensure that the later attempts - * do not also submit those same tasks. That also means that a task completion from an earlier - * attempt can lead to the entire stage getting marked as successful. - */ + * Marks the task has completed in all TaskSetManagers for the given stage. + * + * After stage failure and retry, there may be multiple TaskSetManagers for the stage. + * If an earlier attempt of a stage completes a task, we should ensure that the later attempts + * do not also submit those same tasks. That also means that a task completion from an earlier + * attempt can lead to the entire stage getting marked as successful. + */ private[scheduler] def markPartitionCompletedInAllTaskSets( - stageId: Int, - partitionId: Int, - taskInfo: TaskInfo) = { + stageId: Int, + partitionId: Int, + taskInfo: TaskInfo) = { taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => tsm.markPartitionCompleted(partitionId, taskInfo) } From 28017ed0e336fea9a9685a50d0db9c893c603c55 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Tue, 19 Feb 2019 09:17:43 -0600 Subject: [PATCH 21/21] [SPARK-25250] : Addressing Reviews February 19, 2019 Changing else if condition to not create unnecessary hash set. 
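As a quick illustration of why the new condition avoids an allocation, consider the two lookup styles side by side. The map and ids below are placeholders, not real scheduler state; the point is only that the Option-based form never builds a throwaway HashSet.

    import scala.collection.mutable.{HashMap, HashSet}

    object FinishedPartitionLookup {
      def main(args: Array[String]): Unit = {
        val stageIdToFinishedPartitions = new HashMap[Int, HashSet[Int]]
        val stageId = 1
        val partitionId = 7

        // Old form: allocates an empty HashSet whenever the stage has no entry yet.
        val oldForm = stageIdToFinishedPartitions
          .getOrElse(stageId, new HashSet[Int]).contains(partitionId)

        // New form: Option-based lookup, no allocation when the stage has no entry.
        val newForm = stageIdToFinishedPartitions
          .get(stageId).exists(partitions => partitions.contains(partitionId))

        assert(oldForm == newForm) // both false here; observable behavior is unchanged
      }
    }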
--- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index f4329edb622e3..b1f53beebd67c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -923,8 +923,8 @@ private[spark] class TaskSetManager( s" be re-executed (either because the task failed with a shuffle data fetch failure," + s" so the previous stage needs to be re-run, or because a different copy of the task" + s" has already succeeded).") - } else if (sched.stageIdToFinishedPartitions.getOrElse( - stageId, new HashSet[Int]).contains(tasks(index).partitionId)) { + } else if (sched.stageIdToFinishedPartitions.get(stageId).exists( + partitions => partitions.contains(tasks(index).partitionId))) { sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info) } else { addPendingTask(index)