Merge remote-tracking branch 'upstream/master'
dskrvk committed Nov 25, 2015
2 parents 1b1a5ab + 0a5aef7 commit c773e90
Showing 2 changed files with 109 additions and 4 deletions.
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

@@ -946,7 +946,9 @@ class DAGScheduler(
       stage.resetInternalAccumulators()
     }

-    val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
+    // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
+    // with this Stage
+    val properties = jobIdToActiveJob(jobId).properties

     runningStages += stage
     // SparkListenerStageSubmitted should be posted before testing whether tasks are
@@ -1047,7 +1049,7 @@ class DAGScheduler(
       stage.pendingPartitions ++= tasks.map(_.partitionId)
       logDebug("New pending partitions: " + stage.pendingPartitions)
       taskScheduler.submitTasks(new TaskSet(
-        tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
+        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should mark
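The substance of the first file's change: the old code resolved the TaskSet's properties through stage.firstJobId, which can name a job that has since been cancelled, so the Option-based lookup silently degraded to null; the new code indexes jobIdToActiveJob with the jobId the stage is actually being submitted for. A minimal self-contained Scala sketch contrasting the two lookup styles (the ActiveJob case class here is a simplified stub, not Spark's):

import java.util.Properties
import scala.collection.mutable

// Stub with only the field that matters for the lookup; Spark's ActiveJob has more.
case class ActiveJob(jobId: Int, properties: Properties)

object LookupSketch {
  def main(args: Array[String]): Unit = {
    val jobIdToActiveJob = mutable.HashMap[Int, ActiveJob]()
    jobIdToActiveJob(2) = ActiveJob(2, new Properties())

    val firstJobId = 1 // the job that first needed the stage, since cancelled
    val jobId = 2      // the active job the stage is now being submitted for

    // Old lookup: silently degrades to null once job 1 is gone.
    val oldProps = jobIdToActiveJob.get(firstJobId).map(_.properties).orNull
    assert(oldProps == null)

    // New lookup: takes the properties of the job actually driving the stage
    // (throws if jobId is not active, which the scheduler rules out at this call site).
    val newProps = jobIdToActiveJob(jobId).properties
    assert(newProps != null)
  }
}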
107 changes: 105 additions & 2 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,6 +17,8 @@

 package org.apache.spark.scheduler

+import java.util.Properties
+
 import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
 import scala.language.reflectiveCalls
 import scala.util.control.NonFatal
@@ -262,9 +264,10 @@ class DAGSchedulerSuite
       rdd: RDD[_],
       partitions: Array[Int],
       func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
-      listener: JobListener = jobListener): Int = {
+      listener: JobListener = jobListener,
+      properties: Properties = null): Int = {
     val jobId = scheduler.nextJobId.getAndIncrement()
-    runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener))
+    runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener, properties))
     jobId
   }

@@ -1322,6 +1325,106 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }

+  def checkJobPropertiesAndPriority(taskSet: TaskSet, expected: String, priority: Int): Unit = {
+    assert(taskSet.properties != null)
+    assert(taskSet.properties.getProperty("testProperty") === expected)
+    assert(taskSet.priority === priority)
+  }
+
+  def launchJobsThatShareStageAndCancelFirst(): ShuffleDependency[Int, Int, Nothing] = {
+    val baseRdd = new MyRDD(sc, 1, Nil)
+    val shuffleDep1 = new ShuffleDependency(baseRdd, new HashPartitioner(1))
+    val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
+    val shuffleDep2 = new ShuffleDependency(intermediateRdd, new HashPartitioner(1))
+    val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2))
+    val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2))
+    val job1Properties = new Properties()
+    val job2Properties = new Properties()
+    job1Properties.setProperty("testProperty", "job1")
+    job2Properties.setProperty("testProperty", "job2")
+
+    // Run jobs 1 & 2, both referencing the same stage, then cancel job1.
+    // Note that we have to submit job2 before we cancel job1 to have them actually share
+    // *Stages*, and not just shuffle dependencies, due to skipped stages (at least until
+    // we address SPARK-10193.)
+    val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties)
+    val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties)
+    assert(scheduler.activeJobs.nonEmpty)
+    val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty")
+
+    // remove job1 as an ActiveJob
+    cancel(jobId1)
+
+    // job2 should still be running
+    assert(scheduler.activeJobs.nonEmpty)
+    val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
+    assert(testProperty1 != testProperty2)
+    // NB: This next assert isn't necessarily the "desired" behavior; it's just to document
+    // the current behavior. We've already submitted the TaskSet for stage 0 based on job1, but
+    // even though we have cancelled that job and are now running it because of job2, we haven't
+    // updated the TaskSet's properties. Changing the properties to "job2" is likely the more
+    // correct behavior.
+    val job1Id = 0 // TaskSet priority for Stages run with "job1" as the ActiveJob
+    checkJobPropertiesAndPriority(taskSets(0), "job1", job1Id)
+    complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
+
+    shuffleDep1
+  }
+
+  /**
+   * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
+   * later, active job if they were previously run under a job that is no longer active
+   */
+  test("stage used by two jobs, the first no longer active (SPARK-6880)") {
+    launchJobsThatShareStageAndCancelFirst()
+
+    // The next check is the key for SPARK-6880. For the stage which was shared by both job1 and
+    // job2 but never had any tasks submitted for job1, the properties of job2 are now used to run
+    // the stage.
+    checkJobPropertiesAndPriority(taskSets(1), "job2", 1)
+
+    complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
+    assert(taskSets(2).properties != null)
+    complete(taskSets(2), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assert(scheduler.activeJobs.isEmpty)
+
+    assertDataStructuresEmpty()
+  }
+
+  /**
+   * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
+   * later, active job if they were previously run under a job that is no longer active, even when
+   * there are fetch failures
+   */
+  test("stage used by two jobs, some fetch failures, and the first job no longer active " +
+    "(SPARK-6880)") {
+    val shuffleDep1 = launchJobsThatShareStageAndCancelFirst()
+    val job2Id = 1 // TaskSet priority for Stages run with "job2" as the ActiveJob
+
+    // lets say there is a fetch failure in this task set, which makes us go back and
+    // run stage 0, attempt 1
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleDep1.shuffleId, 0, 0, "ignored"), null)))
+    scheduler.resubmitFailedStages()
+
+    // stage 0, attempt 1 should have the properties of job2
+    assert(taskSets(2).stageId === 0)
+    assert(taskSets(2).stageAttemptId === 1)
+    checkJobPropertiesAndPriority(taskSets(2), "job2", job2Id)
+
+    // run the rest of the stages normally, checking that they have the correct properties
+    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+    checkJobPropertiesAndPriority(taskSets(3), "job2", job2Id)
+    complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 1))))
+    checkJobPropertiesAndPriority(taskSets(4), "job2", job2Id)
+    complete(taskSets(4), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assert(scheduler.activeJobs.isEmpty)
+
+    assertDataStructuresEmpty()
+  }
+
   test("run trivial shuffle with out-of-band failure and retry") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
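For context on what these tests exercise: per-job properties reach the DAGScheduler because SparkContext captures the caller's local properties at job submission, and this commit makes sure a shared stage is run with the properties of a still-active job. A hedged sketch of the user-facing side, using the real SparkContext.setLocalProperty and setJobGroup APIs; the property values, pool name, and app name are illustrative assumptions, not taken from this commit:

import org.apache.spark.{SparkConf, SparkContext}

object JobPropertiesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("props-sketch").setMaster("local[2]"))

    // Local properties set here are captured per job at submission time and end
    // up on the TaskSets the DAGScheduler creates -- the same plumbing the tests
    // above assert on via taskSet.properties.
    sc.setLocalProperty("testProperty", "job1") // mirrors the tests' property key
    sc.setLocalProperty("spark.scheduler.pool", "poolA") // pool name is an assumption
    sc.setJobGroup("group1", "first of two jobs sharing a stage")

    val result = sc.parallelize(1 to 100, 2).map(_ * 2).sum() // triggers one job
    println(result)
    sc.stop()
  }
}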
