From 3631815a7149ecb74ca8ca4c15ff3e159229a57a Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 3 Sep 2015 11:33:58 -0500 Subject: [PATCH 1/4] add test of correct behavior --- .../spark/scheduler/DAGSchedulerSuite.scala | 35 +++++++++++++------ 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 4ebe567c4a5c1..1e7fc9bb327ca 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -756,33 +756,48 @@ class DAGSchedulerSuite * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a * later, active job if they were previously run under a job that is no longer active */ - test("stage used by two jobs, the first no longer active") { + test("stage used by two jobs, the first no longer active (SPARK-6880)") { val baseRdd = new MyRDD(sc, 1, Nil) - val finalRdd1 = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd))) - val finalRdd2 = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd))) + val shuffleDep1 = new ShuffleDependency(baseRdd, null) + val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1)) + val shuffleDep2 = new ShuffleDependency(intermediateRdd, null) + val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2)) + val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2)) val job1Properties = new Properties() val job2Properties = new Properties() job1Properties.setProperty("testProperty", "job1") job2Properties.setProperty("testProperty", "job2") - // run job1 + // run both job 1 & 2, referencing the same stage, then cancel job1 val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties) + val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties) assert(scheduler.activeJobs.nonEmpty) val testProperty1 = 
scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty") // remove job1 as an ActiveJob cancel(jobId1) sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(sparkListener.failedStages.contains(0)) - assert(sparkListener.failedStages.size === 1) - assert(scheduler.activeJobs.isEmpty) - // run job2 - val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties) + // job2 should still be running assert(scheduler.activeJobs.nonEmpty) val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty") assert(testProperty1 != testProperty2) + assert(taskSets(0).properties != null) + // NB: this next assert isn't necessarily the "desired" behavior; it's more so to just document + // the current behavior. We've already submitted the task set for stage 0 based on job1 -- + // even though we have cancelled that job, and now we're running it b/c of job2, we haven't + // updated its properties. It might be desirable to have this actually change to "job2" + assert(taskSets(0).properties.getProperty("testProperty") === "job1") + complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1)))) + + // the next two asserts are the key checks for SPARK-6880 -- they make sure that the stage which + // was shared by both jobs, but never submitted any tasks for the first job, takes the props + // of the second job + assert(taskSets(1).properties != null) + assert(taskSets(1).properties.getProperty("testProperty") === "job2") + complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1)))) + assert(taskSets(2).properties != null) + complete(taskSets(2), Seq((Success, 42))) assert(results === Map(0 -> 42)) assert(scheduler.activeJobs.isEmpty) From cc565b18899a849b61f79ae118bacacc291096e0 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 3 Sep 2015 11:44:55 -0500 Subject: [PATCH 2/4] expand comment --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 3 +++ 1 file 
changed, 3 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 1e7fc9bb327ca..8b0314f3eab8a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -769,6 +769,9 @@ class DAGSchedulerSuite job2Properties.setProperty("testProperty", "job2") // run both job 1 & 2, referencing the same stage, then cancel job1 + // Note that we have to submit job2 before we cancel job1, to have them actually share + // *Stages*, and not just shuffle dependencies, due to skipped stages. (at least until + // we address SPARK-10193) val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties) val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties) assert(scheduler.activeJobs.nonEmpty) From 5fb878767f7362a25627bf5170611e923812b6a1 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 3 Sep 2015 16:52:44 -0500 Subject: [PATCH 3/4] add a test which includes a fetch failure --- .../spark/scheduler/DAGSchedulerSuite.scala | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 8b0314f3eab8a..f3f42b3220be5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -807,6 +807,79 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } + /** + * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a + * later, active job if they were previously run under a job that is no longer active, even when + * there are fetch failures + */ + test("stage used by two jobs, some fetch failures, and the first job no longer active " + + "(SPARK-6880)") { 
+ val baseRdd = new MyRDD(sc, 1, Nil) + val shuffleDep1 = new ShuffleDependency(baseRdd, null) + val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1)) + val shuffleDep2 = new ShuffleDependency(intermediateRdd, null) + val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2)) + val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2)) + val job1Properties = new Properties() + val job2Properties = new Properties() + job1Properties.setProperty("testProperty", "job1") + job2Properties.setProperty("testProperty", "job2") + + def checkJobProperties(taskSet: TaskSet, expected: String): Unit = { + assert(taskSet.properties != null) + assert(taskSet.properties.getProperty("testProperty") === expected) + } + + // run both job 1 & 2, referencing the same stage, then cancel job1 + // Note that we have to submit job2 before we cancel job1, to have them actually share + // *Stages*, and not just shuffle dependencies, due to skipped stages. (at least until + // we address SPARK-10193) + val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties) + val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties) + assert(scheduler.activeJobs.nonEmpty) + val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty") + + // job 1 finishes stage 0 + assert(taskSets(0).properties.getProperty("testProperty") === "job1") + complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1)))) + + // remove job1 as an ActiveJob + cancel(jobId1) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + + // job2 should still be running, starts from stage 1 + assert(scheduler.activeJobs.nonEmpty) + val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty") + assert(testProperty1 != testProperty2) + // NB: this next assert isn't necessarily the "desired" behavior; it's more so to just document + // the current behavior. 
We've already submitted the task set for stage 0 based on job1 -- + // even though we have cancelled that job, and now we're running it b/c of job2, we haven't + // updated its properties. It might be desirable to have this actually change to "job2" + checkJobProperties(taskSets(1), "job1") + + // but lets say there is a fetch failure in this task set, which makes us go back and + // run stage 0, attempt 1 + complete(taskSets(1), Seq( + (FetchFailed(makeBlockManagerId("hostA"), shuffleDep1.shuffleId, 0, 0, "ignored"), null))) + scheduler.resubmitFailedStages() + + // but stage 0, attempt 1 should have the properties of job2 + assert(taskSets(2).stageId === 0) + assert(taskSets(2).stageAttemptId === 1) + checkJobProperties(taskSets(2), "job2") + + // run the rest of the stages normally, checking they have the right properties + complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1)))) + checkJobProperties(taskSets(3), "job2") + complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 1)))) + checkJobProperties(taskSets(4), "job2") + complete(taskSets(4), Seq((Success, 42))) + assert(results === Map(0 -> 42)) + assert(scheduler.activeJobs.isEmpty) + + assertDataStructuresEmpty() + } + test("run trivial shuffle with out-of-band failure and retry") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) From 2cd132b1d79ec5b65fcd595c56c6e1bf38bdf789 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 3 Sep 2015 16:55:37 -0500 Subject: [PATCH 4/4] cleanup comments --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index f3f42b3220be5..5230703fad234 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -857,13 +857,13 @@ class DAGSchedulerSuite // updated its properties. It might be desirable to have this actually change to "job2" checkJobProperties(taskSets(1), "job1") - // but lets say there is a fetch failure in this task set, which makes us go back and + // let's say there is a fetch failure in this task set, which makes us go back and // run stage 0, attempt 1 complete(taskSets(1), Seq( (FetchFailed(makeBlockManagerId("hostA"), shuffleDep1.shuffleId, 0, 0, "ignored"), null))) scheduler.resubmitFailedStages() - // but stage 0, attempt 1 should have the properties of job2 + // stage 0, attempt 1 should have the properties of job2 assert(taskSets(2).stageId === 0) assert(taskSets(2).stageAttemptId === 1) checkJobProperties(taskSets(2), "job2")