diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4ebe567c4a5c1..5230703fad234 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -756,33 +756,124 @@ class DAGSchedulerSuite
    * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
    * later, active job if they were previously run under a job that is no longer active
    */
-  test("stage used by two jobs, the first no longer active") {
+  test("stage used by two jobs, the first no longer active (SPARK-6880)") {
     val baseRdd = new MyRDD(sc, 1, Nil)
-    val finalRdd1 = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
-    val finalRdd2 = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
+    val shuffleDep1 = new ShuffleDependency(baseRdd, null)
+    val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
+    val shuffleDep2 = new ShuffleDependency(intermediateRdd, null)
+    val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2))
+    val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2))
     val job1Properties = new Properties()
     val job2Properties = new Properties()
     job1Properties.setProperty("testProperty", "job1")
     job2Properties.setProperty("testProperty", "job2")
-    // run job1
+    // run both job 1 & 2, referencing the same stage, then cancel job1
+    // Note that we have to submit job2 before we cancel job1, to have them actually share
+    // *Stages*, and not just shuffle dependencies, due to skipped stages. (at least until
+    // we address SPARK-10193)
     val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties)
+    val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties)
     assert(scheduler.activeJobs.nonEmpty)
     val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty")
     // remove job1 as an ActiveJob
     cancel(jobId1)
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.failedStages.contains(0))
-    assert(sparkListener.failedStages.size === 1)
+
+    // job2 should still be running
+    assert(scheduler.activeJobs.nonEmpty)
+    val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
+    assert(testProperty1 != testProperty2)
+    assert(taskSets(0).properties != null)
+    // NB: this next assert isn't necessarily the "desired" behavior; it's here to document
+    // the current behavior. We've already submitted the task set for stage 0 based on job1 --
+    // even though we have cancelled that job, and now we're running it because of job2, we haven't
+    // updated its properties. It might be desirable to have this actually change to "job2"
+    assert(taskSets(0).properties.getProperty("testProperty") === "job1")
+    complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
+
+    // the next two asserts are the key checks for SPARK-6880 -- they make sure that the stage which
+    // was shared by both jobs, but never submitted any tasks for the first job, takes the props
+    // of the second job
+    assert(taskSets(1).properties != null)
+    assert(taskSets(1).properties.getProperty("testProperty") === "job2")
+    complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
+    assert(taskSets(2).properties != null)
+    complete(taskSets(2), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
     assert(scheduler.activeJobs.isEmpty)
-    // run job2
+    assertDataStructuresEmpty()
+  }
+
+  /**
+   * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
+   * later, active job if they were previously run under a job that is no longer active, even when
+   * there are fetch failures
+   */
+  test("stage used by two jobs, some fetch failures, and the first job no longer active " +
+    "(SPARK-6880)") {
+    val baseRdd = new MyRDD(sc, 1, Nil)
+    val shuffleDep1 = new ShuffleDependency(baseRdd, null)
+    val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
+    val shuffleDep2 = new ShuffleDependency(intermediateRdd, null)
+    val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2))
+    val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2))
+    val job1Properties = new Properties()
+    val job2Properties = new Properties()
+    job1Properties.setProperty("testProperty", "job1")
+    job2Properties.setProperty("testProperty", "job2")
+
+    def checkJobProperties(taskSet: TaskSet, expected: String): Unit = {
+      assert(taskSet.properties != null)
+      assert(taskSet.properties.getProperty("testProperty") === expected)
+    }
+
+    // run both job 1 & 2, referencing the same stage, then cancel job1
+    // Note that we have to submit job2 before we cancel job1, to have them actually share
+    // *Stages*, and not just shuffle dependencies, due to skipped stages. (at least until
+    // we address SPARK-10193)
+    val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties)
     val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties)
     assert(scheduler.activeJobs.nonEmpty)
+    val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty")
+
+    // job 1 finishes stage 0
+    assert(taskSets(0).properties.getProperty("testProperty") === "job1")
+    complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
+
+    // remove job1 as an ActiveJob
+    cancel(jobId1)
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+
+    // job2 should still be running, starting from stage 1
+    assert(scheduler.activeJobs.nonEmpty)
     val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
     assert(testProperty1 != testProperty2)
-    complete(taskSets(1), Seq((Success, 42)))
+    // NB: this next assert isn't necessarily the "desired" behavior; it's here to document
+    // the current behavior. We've already submitted the task set for stage 0 based on job1 --
+    // even though we have cancelled that job, and now we're running it because of job2, we haven't
+    // updated its properties. It might be desirable to have this actually change to "job2"
+    checkJobProperties(taskSets(1), "job1")
+
+    // let's say there is a fetch failure in this task set, which makes us go back and
+    // run stage 0, attempt 1
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleDep1.shuffleId, 0, 0, "ignored"), null)))
+    scheduler.resubmitFailedStages()
+
+    // stage 0, attempt 1 should have the properties of job2
+    assert(taskSets(2).stageId === 0)
+    assert(taskSets(2).stageAttemptId === 1)
+    checkJobProperties(taskSets(2), "job2")
+
+    // run the rest of the stages normally, checking they have the right properties
+    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+    checkJobProperties(taskSets(3), "job2")
+    complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 1))))
+    checkJobProperties(taskSets(4), "job2")
+    complete(taskSets(4), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
     assert(scheduler.activeJobs.isEmpty)