
Commit b376114
Fix bug that prevented jobs with inherited job group properties from being cancelled.
JoshRosen committed Mar 31, 2015
1 parent 5677557 commit b376114
Showing 2 changed files with 36 additions and 1 deletion.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -675,7 +675,7 @@ class DAGScheduler(
     // Cancel all jobs belonging to this job group.
     // First finds all active jobs with this group id, and then kill stages for them.
     val activeInGroup = activeJobs.filter(activeJob =>
-      groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
+      groupId == activeJob.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
     val jobIds = activeInGroup.map(_.jobId)
     jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
     submitWaitingStages()
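Why this one-method change matters: a job submitted from a child thread carries a java.util.Properties object whose *defaults* point at the parent thread's properties, so inherited keys live in the defaults table rather than in the Hashtable itself. Hashtable.get ignores defaults and returns null for such keys, while Properties.getProperty falls back to them. A minimal standalone sketch of the difference (plain JDK behavior, not the Spark sources; the key name mirrors SparkContext.SPARK_JOB_GROUP_ID):

    import java.util.Properties

    object PropertiesDefaultsDemo {
      def main(args: Array[String]): Unit = {
        // Parent thread's properties, with a job group set.
        val parent = new Properties()
        parent.setProperty("spark.jobGroup.id", "jobA")

        // Inherited view: the parent's entries are only *defaults* here.
        val child = new Properties(parent)

        // Hashtable.get consults only the child's own table.
        println(child.get("spark.jobGroup.id"))          // null
        // getProperty falls back to the defaults chain.
        println(child.getProperty("spark.jobGroup.id"))  // jobA
      }
    }

With the old get-based lookup, activeInGroup came back empty for any job whose group id was inherited, so cancelJobGroup silently cancelled nothing.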
35 changes: 35 additions & 0 deletions core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -141,6 +141,41 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(jobB.get() === 100)
   }

test("inherited job group") {
sc = new SparkContext("local[2]", "test")

// Add a listener to release the semaphore once any tasks are launched.
val sem = new Semaphore(0)
sc.addSparkListener(new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart) {
sem.release()
}
})

sc.setJobGroup("jobA", "this is a job to be cancelled")
@volatile var exception: Exception = null
val jobA = new Thread() {
// The job group should be inherited by this thread
override def run(): Unit = {
exception = intercept[SparkException] {
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
}
}
}
jobA.start()

// Block until both tasks of job A have started and cancel job A.
sem.acquire(2)
sc.cancelJobGroup("jobA")
jobA.join(10000)
assert(!jobA.isAlive)
assert(exception.getMessage contains "cancel")

// Once A is cancelled, job B should finish fairly quickly.
val jobB = sc.parallelize(1 to 100, 2).countAsync()
assert(jobB.get() === 100)
}

test("job group with interruption") {
sc = new SparkContext("local[2]", "test")

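For context on what "inherited" means in this test: SparkContext keeps per-thread local properties in an InheritableThreadLocal whose childValue wraps the parent thread's Properties as defaults, which is exactly the shape that defeated the old get-based lookup. A standalone sketch of that mechanism (field and key names assumed for illustration; not the Spark sources):

    import java.util.Properties

    object InheritedJobGroupDemo {
      // Assumed shape of SparkContext's localProperties: a child thread
      // receives a fresh Properties whose defaults are the parent's values.
      val localProperties = new InheritableThreadLocal[Properties] {
        override def childValue(parent: Properties): Properties = new Properties(parent)
      }

      def main(args: Array[String]): Unit = {
        localProperties.set(new Properties())
        localProperties.get().setProperty("spark.jobGroup.id", "jobA")

        // Inheritable thread locals are copied when the Thread is constructed.
        val child = new Thread() {
          override def run(): Unit = {
            val props = localProperties.get()
            assert(props.get("spark.jobGroup.id") == null)           // what the buggy lookup saw
            assert(props.getProperty("spark.jobGroup.id") == "jobA") // what the fix sees
          }
        }
        child.start()
        child.join()
      }
    }

This is why the test's jobA thread never calls setJobGroup itself: the group id set on the main thread reaches the child only through the defaults chain, and therefore only via getProperty.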
