
[SPARK-6880] add test of correct behavior #1

Merged
@@ -756,33 +756,124 @@ class DAGSchedulerSuite
* Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
* later, active job if they were previously run under a job that is no longer active
*/
test("stage used by two jobs, the first no longer active (SPARK-6880)") {
val baseRdd = new MyRDD(sc, 1, Nil)
val shuffleDep1 = new ShuffleDependency(baseRdd, null)
val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
val shuffleDep2 = new ShuffleDependency(intermediateRdd, null)
val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2))
val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2))
val job1Properties = new Properties()
val job2Properties = new Properties()
job1Properties.setProperty("testProperty", "job1")
job2Properties.setProperty("testProperty", "job2")

// run both job 1 & 2, referencing the same stage, then cancel job1
// Note that we have to submit job2 before we cancel job1, to have them actually share
// *Stages*, and not just shuffle dependencies, due to skipped stages. (at least until
// we address SPARK-10193)
val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties)
val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties)
assert(scheduler.activeJobs.nonEmpty)
val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty")

// remove job1 as an ActiveJob
cancel(jobId1)
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.contains(0))
assert(sparkListener.failedStages.size === 1)

// job2 should still be running
assert(scheduler.activeJobs.nonEmpty)
val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
assert(testProperty1 != testProperty2)
assert(taskSets(0).properties != null)
// NB: this next assert isn't necessarily the "desired" behavior; it's just to document
// the current behavior. We've already submitted the task set for stage 0 based on job1 --
// even though we have cancelled that job, and now we're running it b/c of job2, we haven't
// updated its properties. It might be desirable to have this actually change to "job2"

Review comment (Owner): Fix the grammar and stay within the 100-char bound:

// NB: this next assert isn't necessarily the "desired" behavior; it's just to document

Review comment (Owner): That's a good point. I guess we should do a follow-up JIRA and PR to tie off this little loose end.

assert(taskSets(0).properties.getProperty("testProperty") === "job1")
complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))

// the next two asserts are the key checks for SPARK-6880 -- they make sure that the stage which
// was shared by both jobs, but never submitted any tasks for the first job, takes the props
// of the second job
assert(taskSets(1).properties != null)
assert(taskSets(1).properties.getProperty("testProperty") === "job2")
complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
assert(taskSets(2).properties != null)
complete(taskSets(2), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assert(scheduler.activeJobs.isEmpty)

assertDataStructuresEmpty()
}

/**
* Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
* later, active job if they were previously run under a job that is no longer active, even when
* there are fetch failures
*/
test("stage used by two jobs, some fetch failures, and the first job no longer active " +
"(SPARK-6880)") {
val baseRdd = new MyRDD(sc, 1, Nil)
val shuffleDep1 = new ShuffleDependency(baseRdd, null)
val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
val shuffleDep2 = new ShuffleDependency(intermediateRdd, null)
val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2))
val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2))
val job1Properties = new Properties()
val job2Properties = new Properties()
job1Properties.setProperty("testProperty", "job1")
job2Properties.setProperty("testProperty", "job2")

def checkJobProperties(taskSet: TaskSet, expected: String): Unit = {
Review comment (Owner): This is a good thing, but please refactor so that the prior test also uses this helper.

Review comment (Owner): With the amount of code duplication between these two tests, it's probably worth having just one common function with a doFetchFailures kind of branch -- i.e.

test("stage used by two jobs, the first no longer active (SPARK-6880)") {
  someAppropriateNameForTheHelper(doFetchFailures = false)
}
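To make the reviewer's suggestion concrete, here is a hedged sketch of what that shared helper might look like. The helper name `runTwoJobsSharedStage` and the body outline are hypothetical, not code from this PR; the sketch only shows the shape of the refactor, assuming the suite's existing `submit`/`cancel`/`complete` helpers:

```scala
// Hypothetical sketch only -- helper name and structure are illustrative, not from the PR.
// Both tests would delegate to one parameterized body, as the review suggests.
private def runTwoJobsSharedStage(doFetchFailures: Boolean): Unit = {
  // build baseRdd -> shuffleDep1 -> intermediateRdd -> shuffleDep2 -> finalRdd1/finalRdd2,
  // submit job1 and job2 with distinct "testProperty" values, then cancel job1 (as above)
  if (doFetchFailures) {
    // inject a FetchFailed into the shared stage, resubmit failed stages, and check that
    // the re-attempted stage picks up job2's properties
  }
  // common assertions: final results, scheduler.activeJobs.isEmpty, assertDataStructuresEmpty()
}

test("stage used by two jobs, the first no longer active (SPARK-6880)") {
  runTwoJobsSharedStage(doFetchFailures = false)
}

test("stage used by two jobs, some fetch failures, and the first job no longer active " +
    "(SPARK-6880)") {
  runTwoJobsSharedStage(doFetchFailures = true)
}
```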

assert(taskSet.properties != null)
assert(taskSet.properties.getProperty("testProperty") === expected)
}

// run both job 1 & 2, referencing the same stage, then cancel job1
// Note that we have to submit job2 before we cancel job1, to have them actually share
// *Stages*, and not just shuffle dependencies, due to skipped stages. (at least until
// we address SPARK-10193)
val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties)
val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties)
assert(scheduler.activeJobs.nonEmpty)
val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty")

// job 1 finishes stage 0
assert(taskSets(0).properties.getProperty("testProperty") === "job1")
complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))

// remove job1 as an ActiveJob
cancel(jobId1)
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

// job2 should still be running, starts from stage 1
assert(scheduler.activeJobs.nonEmpty)
val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
assert(testProperty1 != testProperty2)
// NB: this next assert isn't necessarily the "desired" behavior; it's just to document
// the current behavior. We've already submitted the task set for stage 1 based on job1 --
// even though we have cancelled that job, and now we're running it b/c of job2, we haven't
// updated its properties. It might be desirable to have this actually change to "job2"
checkJobProperties(taskSets(1), "job1")

// let's say there is a fetch failure in this task set, which makes us go back and
// run stage 0, attempt 1
complete(taskSets(1), Seq(
(FetchFailed(makeBlockManagerId("hostA"), shuffleDep1.shuffleId, 0, 0, "ignored"), null)))
scheduler.resubmitFailedStages()

// stage 0, attempt 1 should have the properties of job2
assert(taskSets(2).stageId === 0)
assert(taskSets(2).stageAttemptId === 1)
checkJobProperties(taskSets(2), "job2")

// run the rest of the stages normally, checking they have the right properties
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
checkJobProperties(taskSets(3), "job2")
complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 1))))
checkJobProperties(taskSets(4), "job2")
complete(taskSets(4), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assert(scheduler.activeJobs.isEmpty)
