@Ngone51's test comments
dagrawal3409 committed Aug 25, 2020
1 parent b0e45c2 commit 9389ed5
Showing 2 changed files with 66 additions and 101 deletions.
@@ -88,15 +88,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
}

def setupSchedulerWithMaster(master: String, confs: (String, String)*): TaskSchedulerImpl = {
setupSchedulerWithMasterAndClock(master, new SystemClock, confs: _*)
}

def setupSchedulerWithMasterAndClock(master: String, clock: Clock, confs: (String, String)*):
TaskSchedulerImpl = {
val conf = new SparkConf().setMaster(master).setAppName("TaskSchedulerImplSuite")
confs.foreach { case (k, v) => conf.set(k, v) }
sc = new SparkContext(conf)
taskScheduler = new TaskSchedulerImpl(sc, sc.conf.get(config.TASK_MAX_FAILURES), clock = clock)
taskScheduler = new TaskSchedulerImpl(sc, sc.conf.get(config.TASK_MAX_FAILURES))
setupHelper()
}

@@ -1834,22 +1829,41 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(2 == taskDescriptions.head.resources(GPU).addresses.size)
}

private def setupSchedulerForDecommissionTests(clock: Clock): TaskSchedulerImpl = {
val taskScheduler = setupSchedulerWithMasterAndClock(
s"local[2]",
clock,
config.CPUS_PER_TASK.key -> 1.toString)
taskScheduler.submitTasks(FakeTask.createTaskSet(2))
val multiCoreWorkerOffers = IndexedSeq(WorkerOffer("executor0", "host0", 1),
WorkerOffer("executor1", "host1", 1))
val taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
assert(taskDescriptions.map(_.executorId).sorted === Seq("executor0", "executor1"))
private def setupSchedulerForDecommissionTests(clock: Clock, numTasks: Int): TaskSchedulerImpl = {
// one task per host
val numHosts = numTasks
val conf = new SparkConf()
.setMaster(s"local[$numHosts]")
.setAppName("TaskSchedulerImplSuite")
.set(config.CPUS_PER_TASK.key, "1")
sc = new SparkContext(conf)
val maxTaskFailures = sc.conf.get(config.TASK_MAX_FAILURES)
taskScheduler = new TaskSchedulerImpl(sc, maxTaskFailures, clock = clock) {
override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): TaskSetManager = {
val tsm = super.createTaskSetManager(taskSet, maxFailures)
// we need to create a spied tsm so that we can see the copies running
val tsmSpy = spy(tsm)
stageToMockTaskSetManager(taskSet.stageId) = tsmSpy
tsmSpy
}
}
setupHelper()
// Spawn the tasks on different executors/hosts
taskScheduler.submitTasks(FakeTask.createTaskSet(numTasks))
for (i <- 0 until numTasks) {
val executorId = s"executor$i"
val taskDescriptions = taskScheduler.resourceOffers(IndexedSeq(WorkerOffer(
executorId, s"host$i", 1))).flatten
assert(taskDescriptions.size === 1)
assert(taskDescriptions(0).executorId == executorId)
assert(taskDescriptions(0).index === i)
}
taskScheduler
}

test("scheduler should keep the decommission state where host was decommissioned") {
val clock = new ManualClock(10000L)
val scheduler = setupSchedulerForDecommissionTests(clock)
val scheduler = setupSchedulerForDecommissionTests(clock, 2)
val oldTime = clock.getTimeMillis()
scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0", false))
scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1", true))
@@ -1865,9 +1879,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(scheduler.getExecutorDecommissionState("executor2").isEmpty)
}

test("scheduler should eventually purge removed and decommissioned executors") {
test("test full decommissioning flow") {
val clock = new ManualClock(10000L)
val scheduler = setupSchedulerForDecommissionTests(clock)
val scheduler = setupSchedulerForDecommissionTests(clock, 2)
val manager = stageToMockTaskSetManager(0)
// The tasks that were just started should be running.
assert(manager.copiesRunning.take(2) === Array(1, 1))

// executor 0 is decommissioned after being lost
assert(scheduler.getExecutorDecommissionState("executor0").isEmpty)
@@ -1876,28 +1893,47 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("", false))
assert(scheduler.getExecutorDecommissionState("executor0").isEmpty)

// 0th task just died above
assert(manager.copiesRunning.take(2) === Array(0, 1))

assert(scheduler.executorsPendingDecommission.isEmpty)
clock.advance(5000)

// executor 1 is decommissioned before being lost
// executor1 hasn't been decommissioned yet
assert(scheduler.getExecutorDecommissionState("executor1").isEmpty)

// executor 1 is decommissioned before being lost
scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", false))
assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
clock.advance(2000)

// executor1 is eventually lost
scheduler.executorLost("executor1", ExecutorExited(0, false, "normal"))
assert(scheduler.decommissionedExecutorsRemoved.size === 1)
assert(scheduler.executorsPendingDecommission.isEmpty)
// So now both tasks are no longer running
assert(manager.copiesRunning.take(2) === Array(0, 0))
clock.advance(2000)

// Decommission state should hang around a bit after removal ...
assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", false))
clock.advance(2000)
assert(scheduler.decommissionedExecutorsRemoved.size === 1)
assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
// The default timeout is 5 minutes which completes now.
clock.advance(301000)

// The default expiry timeout is 300,000 ms (5 minutes), which elapses now,
// and executor1's decommission state should finally be purged.
clock.advance(300000)
assert(scheduler.getExecutorDecommissionState("executor1").isEmpty)
assert(scheduler.decommissionedExecutorsRemoved.isEmpty)

// Now give it some resources and both tasks should be rerun
val taskDescriptions = taskScheduler.resourceOffers(IndexedSeq(
WorkerOffer("executor2", "host2", 1), WorkerOffer("executor3", "host3", 1))).flatten
assert(taskDescriptions.size === 2)
assert(taskDescriptions.map(_.index).sorted == Seq(0, 1))
assert(manager.copiesRunning.take(2) === Array(1, 1))
}

/**
@@ -2086,96 +2086,25 @@ class TaskSetManagerSuite
assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
}

test("SPARK-21040: Tasks are not speculated on decommissioning if speculation is disabled") {
test("Check that speculation does not happen when disabled") {
sc = new SparkContext("local", "test")
val clock = new ManualClock()
sched = new FakeTaskScheduler(sc, clock,
("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
sched.backend = mock(classOf[SchedulerBackend])
val taskSet = FakeTask.createTaskSet(4)
val taskSet = FakeTask.createTaskSet(1)
sc.conf.set(config.SPECULATION_ENABLED, false)
sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5s")
val manager = sched.createTaskSetManager(taskSet, MAX_TASK_FAILURES)
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
task.metrics.internalAccums
}

// Start TASK 0,1 on exec1, TASK 2 on exec2
(0 until 2).foreach { _ =>
val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
assert(taskOption.isDefined)
assert(taskOption.get.executorId === "exec1")
}
val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
assert(taskOption2.isDefined)
assert(taskOption2.get.executorId === "exec2")

clock.advance(6*1000) // time = 6s
// Start TASK 3 on exec2 after some delay
val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
assert(taskOption3.isDefined)
assert(taskOption3.get.executorId === "exec2")

assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))

clock.advance(4*1000) // time = 10s
// Complete the first 2 tasks and leave the other 2 tasks in running
for (id <- Set(0, 1)) {
manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
assert(sched.endedTasks(id) === Success)
}

// Speculation is disabled so nothing should be speculated.
assert(!manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set())

// decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will now be
// checked to see if they should be speculated.
// (TASK 2 -> 15, TASK 3 -> 15)
sched.executorDecommission("exec2", ExecutorDecommissionInfo("decom",
isHostDecommissioned = false))
assert(sched.getExecutorDecommissionState("exec2").map(_.startTime) ===
Some(clock.getTimeMillis()))
val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
assert(taskOption.isDefined)
assert(taskOption.get.executorId === "exec1")
assert(taskOption.get.index === 0)
assert(sched.startedTasks.toSet === Set(0))
assert(manager.copiesRunning(0) === 1)

// Speculation is disabled so nothing should be speculated.
assert(!manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set())
// Old copy should continue to be running.
assert(manager.copiesRunning(3) === 1)

// Offer resource to start the speculative attempt for the running task
assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)

clock.advance(1*1000) // time = 11s
// Running checkSpeculatableTasks again should return false
assert(!manager.checkSpeculatableTasks(0))
assert(manager.copiesRunning(2) === 1)
assert(manager.copiesRunning(3) === 1)

clock.advance(5*1000) // time = 16s
// The decommissioned executor will finally die and take down the running tasks 2 and 3 with it
manager.executorLost("exec2", "host2", ExecutorProcessLost("decommissioned"))
// Still no speculation
assert(!manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set())
assert(manager.copiesRunning(2) === 0)
assert(manager.copiesRunning(3) === 0)
// Give it 2 new resources and it should spawn the tasks 2 and 3 lost on exec2
val indicesRerun = (1 to 2).map { _ =>
val taskRerunOpt = manager.resourceOffer("exec3", "host3", NO_PREF)._1
assert(taskRerunOpt.isDefined)
val taskRerun = taskRerunOpt.get
assert(taskRerun.executorId === "exec3")
assert(taskRerun.attemptNumber === 1)
taskRerun.index
}.toSet
assert(indicesRerun === Set(2, 3))
assert(manager.copiesRunning(2) === 1)
assert(manager.copiesRunning(3) === 1)

// Offering additional resources should not lead to any more tasks being respawned
assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
}

test("SPARK-29976 Regular speculation configs should still take effect even when a " +
