Skip to content

Commit

Permalink
regression test for SPARK-19868
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed Mar 15, 2017
1 parent 6c40b9f commit aac8d98
Showing 1 changed file with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import java.util.{Properties, Random}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.mockito.Matchers.{anyInt, anyString}
import org.mockito.Matchers.{any, anyInt, anyString}
import org.mockito.Mockito.{mock, never, spy, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer

import org.apache.spark._
import org.apache.spark.internal.config
Expand Down Expand Up @@ -1042,6 +1044,28 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(manager.isZombie)
}

test("SPARK-19868: DagScheduler only notified of taskEnd when state is ready") {
// dagScheduler.taskEnded() is async, so it may *seem* ok to call it before we've set all
// appropriate state, eg. isZombie. However, this sets up a race that could go the wrong way.
// This is a super-focused regression test which checks the zombie state as soon as
// dagScheduler.taskEnded() is called, to ensure we haven't introduced a race.
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val mockDAGScheduler = mock(classOf[DAGScheduler])
sched.dagScheduler = mockDAGScheduler
val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock)
when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any())).then(new Answer[Unit] {
override def answer(invocationOnMock: InvocationOnMock): Unit = {
assert(manager.isZombie === true)
}
})
val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption.isDefined)
// this would fail, inside our mock dag scheduler, if it calls dagScheduler.taskEnded() too soon
manager.handleSuccessfulTask(0, createTaskResult(0))
}

test("SPARK-17894: Verify TaskSetManagers for different stage attempts have unique names") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
Expand Down

0 comments on commit aac8d98

Please sign in to comment.