diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index a40e351155b9e..26c9d9130e56a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -88,15 +88,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   }

   def setupSchedulerWithMaster(master: String, confs: (String, String)*): TaskSchedulerImpl = {
-    setupSchedulerWithMasterAndClock(master, new SystemClock, confs: _*)
-  }
-
-  def setupSchedulerWithMasterAndClock(master: String, clock: Clock, confs: (String, String)*):
-      TaskSchedulerImpl = {
     val conf = new SparkConf().setMaster(master).setAppName("TaskSchedulerImplSuite")
     confs.foreach { case (k, v) => conf.set(k, v) }
     sc = new SparkContext(conf)
-    taskScheduler = new TaskSchedulerImpl(sc, sc.conf.get(config.TASK_MAX_FAILURES), clock = clock)
+    taskScheduler = new TaskSchedulerImpl(sc, sc.conf.get(config.TASK_MAX_FAILURES))
     setupHelper()
   }

@@ -1834,22 +1829,41 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(2 == taskDescriptions.head.resources(GPU).addresses.size)
   }

-  private def setupSchedulerForDecommissionTests(clock: Clock): TaskSchedulerImpl = {
-    val taskScheduler = setupSchedulerWithMasterAndClock(
-      s"local[2]",
-      clock,
-      config.CPUS_PER_TASK.key -> 1.toString)
-    taskScheduler.submitTasks(FakeTask.createTaskSet(2))
-    val multiCoreWorkerOffers = IndexedSeq(WorkerOffer("executor0", "host0", 1),
-      WorkerOffer("executor1", "host1", 1))
-    val taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
-    assert(taskDescriptions.map(_.executorId).sorted === Seq("executor0", "executor1"))
+  private def setupSchedulerForDecommissionTests(clock: Clock, numTasks: Int): TaskSchedulerImpl = {
+    // one task per host
+    val numHosts = numTasks
+    val conf = new SparkConf()
+      .setMaster(s"local[$numHosts]")
+      .setAppName("TaskSchedulerImplSuite")
+      .set(config.CPUS_PER_TASK.key, "1")
+    sc = new SparkContext(conf)
+    val maxTaskFailures = sc.conf.get(config.TASK_MAX_FAILURES)
+    taskScheduler = new TaskSchedulerImpl(sc, maxTaskFailures, clock = clock) {
+      override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): TaskSetManager = {
+        val tsm = super.createTaskSetManager(taskSet, maxFailures)
+        // we need a spied TaskSetManager so that we can observe the copies running
+        val tsmSpy = spy(tsm)
+        stageToMockTaskSetManager(taskSet.stageId) = tsmSpy
+        tsmSpy
+      }
+    }
+    setupHelper()
+    // Spawn the tasks on different executors/hosts
+    taskScheduler.submitTasks(FakeTask.createTaskSet(numTasks))
+    for (i <- 0 until numTasks) {
+      val executorId = s"executor$i"
+      val taskDescriptions = taskScheduler.resourceOffers(IndexedSeq(WorkerOffer(
+        executorId, s"host$i", 1))).flatten
+      assert(taskDescriptions.size === 1)
+      assert(taskDescriptions(0).executorId == executorId)
+      assert(taskDescriptions(0).index === i)
+    }
     taskScheduler
   }

   test("scheduler should keep the decommission state where host was decommissioned") {
     val clock = new ManualClock(10000L)
-    val scheduler = setupSchedulerForDecommissionTests(clock)
+    val scheduler = setupSchedulerForDecommissionTests(clock, 2)
     val oldTime = clock.getTimeMillis()
     scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0", false))
     scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1", true))
@@ -1865,9 +1879,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(scheduler.getExecutorDecommissionState("executor2").isEmpty)
   }

-  test("scheduler should eventually purge removed and decommissioned executors") {
+  test("test full decommissioning flow") {
     val clock = new ManualClock(10000L)
-    val scheduler = setupSchedulerForDecommissionTests(clock)
+    val scheduler = setupSchedulerForDecommissionTests(clock, 2)
+    val manager = stageToMockTaskSetManager(0)
+    // The tasks just started should be running.
+    assert(manager.copiesRunning.take(2) === Array(1, 1))

     // executor 0 is decommissioned after loosing
     assert(scheduler.getExecutorDecommissionState("executor0").isEmpty)
@@ -1876,28 +1893,47 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B

     scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("", false))
     assert(scheduler.getExecutorDecommissionState("executor0").isEmpty)

+    // Task 0 just died above
+    assert(manager.copiesRunning.take(2) === Array(0, 1))
+    assert(scheduler.executorsPendingDecommission.isEmpty)
     clock.advance(5000)

-    // executor 1 is decommissioned before loosing
+    // executor1 hasn't been decommissioned yet
     assert(scheduler.getExecutorDecommissionState("executor1").isEmpty)
+
+    // executor 1 is decommissioned before being lost
     scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", false))
     assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
     clock.advance(2000)
+
+    // executor1 is eventually lost
     scheduler.executorLost("executor1", ExecutorExited(0, false, "normal"))
     assert(scheduler.decommissionedExecutorsRemoved.size === 1)
     assert(scheduler.executorsPendingDecommission.isEmpty)
+    // So now neither task is running
+    assert(manager.copiesRunning.take(2) === Array(0, 0))

     clock.advance(2000)
+    // Decommission state should hang around a bit after removal ...
     assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
     scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", false))
     clock.advance(2000)
     assert(scheduler.decommissionedExecutorsRemoved.size === 1)
     assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
-    // The default timeout is 5 minutes which completes now.
-    clock.advance(301000)
+
+    // The default expiry timeout is 300000 milliseconds (5 minutes), which elapses now,
+    // and executor1's decommission state should finally be purged.
+    clock.advance(300000)
     assert(scheduler.getExecutorDecommissionState("executor1").isEmpty)
     assert(scheduler.decommissionedExecutorsRemoved.isEmpty)
+
+    // Now give it some resources and both tasks should be rerun
+    val taskDescriptions = taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor2", "host2", 1), WorkerOffer("executor3", "host3", 1))).flatten
+    assert(taskDescriptions.size === 2)
+    assert(taskDescriptions.map(_.index).sorted == Seq(0, 1))
+    assert(manager.copiesRunning.take(2) === Array(1, 1))
   }

   /**
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 0beb5a837ac65..e4a358d38af95 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -2086,96 +2086,25 @@ class TaskSetManagerSuite
     assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
   }

-  test("SPARK-21040: Tasks are not speculated on decommissioning if speculation is disabled") {
+  test("Check that speculation does not happen when disabled") {
     sc = new SparkContext("local", "test")
     val clock = new ManualClock()
     sched = new FakeTaskScheduler(sc, clock,
       ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
     sched.backend = mock(classOf[SchedulerBackend])
-    val taskSet = FakeTask.createTaskSet(4)
+    val taskSet = FakeTask.createTaskSet(1)
     sc.conf.set(config.SPECULATION_ENABLED, false)
-    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5s")
     val manager = sched.createTaskSetManager(taskSet, MAX_TASK_FAILURES)
-    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
-      task.metrics.internalAccums
-    }
-
-    // Start TASK 0,1 on exec1, TASK 2 on exec2
-    (0 until 2).foreach { _ =>
-      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
-      assert(taskOption.isDefined)
-      assert(taskOption.get.executorId === "exec1")
-    }
-    val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
-    assert(taskOption2.isDefined)
-    assert(taskOption2.get.executorId === "exec2")
-
-    clock.advance(6*1000) // time = 6s
-    // Start TASK 3 on exec2 after some delay
-    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
-    assert(taskOption3.isDefined)
-    assert(taskOption3.get.executorId === "exec2")
-
-    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
-
-    clock.advance(4*1000) // time = 10s
-    // Complete the first 2 tasks and leave the other 2 tasks in running
-    for (id <- Set(0, 1)) {
-      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
-      assert(sched.endedTasks(id) === Success)
-    }
-
-    // Speculation is disabled so nothing should be speculated.
-    assert(!manager.checkSpeculatableTasks(0))
-    assert(sched.speculativeTasks.toSet === Set())
-
-    // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be now
-    // checked if they should be speculated.
-    // (TASK 2 -> 15, TASK 3 -> 15)
-    sched.executorDecommission("exec2", ExecutorDecommissionInfo("decom",
-      isHostDecommissioned = false))
-    assert(sched.getExecutorDecommissionState("exec2").map(_.startTime) ===
-      Some(clock.getTimeMillis()))
+    val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+    assert(taskOption.isDefined)
+    assert(taskOption.get.executorId === "exec1")
+    assert(taskOption.get.index === 0)
+    assert(sched.startedTasks.toSet === Set(0))
+    assert(manager.copiesRunning(0) === 1)

     // Speculation is disabled so nothing should be speculated.
     assert(!manager.checkSpeculatableTasks(0))
     assert(sched.speculativeTasks.toSet === Set())
-    // Old copy should continue to be running.
-    assert(manager.copiesRunning(3) === 1)
-
-    // Offer resource to start the speculative attempt for the running task
-    assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
-    assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
-
-    clock.advance(1*1000) // time = 11s
-    // Running checkSpeculatableTasks again should return false
-    assert(!manager.checkSpeculatableTasks(0))
-    assert(manager.copiesRunning(2) === 1)
-    assert(manager.copiesRunning(3) === 1)
-
-    clock.advance(5*1000) // time = 16s
-    // The decommissioned executor will finally die and take down the running tasks 2 and 3 with it
-    manager.executorLost("exec2", "host2", ExecutorProcessLost("decommissioned"))
-    // Still no speculation
-    assert(!manager.checkSpeculatableTasks(0))
-    assert(sched.speculativeTasks.toSet === Set())
-    assert(manager.copiesRunning(2) === 0)
-    assert(manager.copiesRunning(3) === 0)
-    // Give it 2 new resources and it should spawn the tasks 2 and 3 lost on exec2
-    val indicesRerun = (1 to 2).map { _ =>
-      val taskRerunOpt = manager.resourceOffer("exec3", "host3", NO_PREF)._1
-      assert(taskRerunOpt.isDefined)
-      val taskRerun = taskRerunOpt.get
-      assert(taskRerun.executorId === "exec3")
-      assert(taskRerun.attemptNumber === 1)
-      taskRerun.index
-    }.toSet
-    assert(indicesRerun === Set(2, 3))
-    assert(manager.copiesRunning(2) === 1)
-    assert(manager.copiesRunning(3) === 1)
-
-    // Offering additional resources should not lead to any more tasks being respawned
-    assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
   }

   test("SPARK-29976 Regular speculation configs should still take effect even when a " +