[SPARK-10666][SPARK-6880][CORE] Use properties from ActiveJob associated with a Stage

This issue was addressed in apache/spark#5494, but the fix in that PR, while safe in the sense that it will prevent the SparkContext from shutting down, misses the actual bug.  The intent of `submitMissingTasks` should be understood as "submit the Tasks that are missing for the Stage, and run them as part of the ActiveJob identified by jobId".  Because of a long-standing bug, the `jobId` parameter was never being used.  Instead, we were trying to use the jobId with which the Stage was created -- which may no longer exist as an ActiveJob, hence the crash reported in SPARK-6880.

The correct fix is to use the ActiveJob specified by the supplied jobId parameter, which is guaranteed to exist at the call sites of submitMissingTasks.

This fix should be applied to all maintenance branches, since it has existed since 1.0.
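For reference, here is a small, self-contained Scala sketch of the behavioral difference described above. It is not Spark code: the names (Spark6880Sketch, activeJobProps) are made up for illustration, and a plain Map of Properties stands in for DAGScheduler.jobIdToActiveJob. The point is that looking up the stage's first job can silently return null once that job is no longer active, while looking up the job actually being submitted cannot.

import java.util.Properties

// Toy illustration only; names are invented for this sketch.
object Spark6880Sketch {
  def main(args: Array[String]): Unit = {
    val liveJobProps = new Properties()
    liveJobProps.setProperty("spark.scheduler.pool", "pool-for-live-job")

    // Stand-in for jobIdToActiveJob after job 0 has been cancelled:
    // only job 1 is still active, but the shared stage remembers firstJobId = 0.
    val activeJobProps = Map(1 -> liveJobProps)

    val firstJobId = 0  // the job that originally created the stage (no longer active)
    val jobId = 1       // the ActiveJob we are actually submitting missing tasks for

    // Old lookup: falls back to null because job 0 is gone,
    // dropping the scheduling pool, job group, and description.
    val oldProperties = activeJobProps.get(firstJobId).orNull
    // New lookup: uses the live ActiveJob named by the jobId parameter.
    val newProperties = activeJobProps(jobId)

    println(s"old = $oldProperties, new = $newProperties")
  }
}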

kayousterhout pankajarora12

Author: Mark Hamstra <markhamstra@gmail.com>
Author: Imran Rashid <irashid@cloudera.com>

Closes #6291 from markhamstra/SPARK-6880.
markhamstra authored and kiszk committed Dec 26, 2015
1 parent a38e179 commit 37bd9a1
Showing 2 changed files with 109 additions and 4 deletions.
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (4 additions, 2 deletions)
@@ -946,7 +946,9 @@ class DAGScheduler(
stage.resetInternalAccumulators()
}

- val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
+ // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
+ // with this Stage
+ val properties = jobIdToActiveJob(jobId).properties

runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
@@ -1047,7 +1049,7 @@ class DAGScheduler(
stage.pendingPartitions ++= tasks.map(_.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
taskScheduler.submitTasks(new TaskSet(
- tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
+ tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
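The second hunk threads that same jobId into the TaskSet handed to the TaskScheduler. As a rough sketch (simplified and renamed; the fields are inferred from the constructor call above and the assertions in the new tests below, not copied from the full Spark class), the relevant shape is:

import java.util.Properties

// Simplified stand-in for the TaskSet fields touched by this change and checked by the
// new tests: the fourth constructor argument is the set's scheduling priority, so after
// this change it is the id of the ActiveJob the tasks run for, and `properties` carries
// that job's scheduling pool, job group, and description.
class TaskSetSketch(
    val stageId: Int,
    val stageAttemptId: Int,
    val priority: Int,          // the jobId passed through submitMissingTasks
    val properties: Properties)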
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala (105 additions, 2 deletions)
@@ -17,6 +17,8 @@

package org.apache.spark.scheduler

import java.util.Properties

import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
import scala.language.reflectiveCalls
import scala.util.control.NonFatal
@@ -262,9 +264,10 @@ class DAGSchedulerSuite
rdd: RDD[_],
partitions: Array[Int],
func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
- listener: JobListener = jobListener): Int = {
+ listener: JobListener = jobListener,
+ properties: Properties = null): Int = {
val jobId = scheduler.nextJobId.getAndIncrement()
runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener))
runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener, properties))
jobId
}

@@ -1322,6 +1325,106 @@ class DAGSchedulerSuite
assertDataStructuresEmpty()
}

def checkJobPropertiesAndPriority(taskSet: TaskSet, expected: String, priority: Int): Unit = {
assert(taskSet.properties != null)
assert(taskSet.properties.getProperty("testProperty") === expected)
assert(taskSet.priority === priority)
}

def launchJobsThatShareStageAndCancelFirst(): ShuffleDependency[Int, Int, Nothing] = {
val baseRdd = new MyRDD(sc, 1, Nil)
val shuffleDep1 = new ShuffleDependency(baseRdd, new HashPartitioner(1))
val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
val shuffleDep2 = new ShuffleDependency(intermediateRdd, new HashPartitioner(1))
val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2))
val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2))
val job1Properties = new Properties()
val job2Properties = new Properties()
job1Properties.setProperty("testProperty", "job1")
job2Properties.setProperty("testProperty", "job2")

// Run jobs 1 & 2, both referencing the same stage, then cancel job1.
// Note that we have to submit job2 before we cancel job1 to have them actually share
// *Stages*, and not just shuffle dependencies, due to skipped stages (at least until
// we address SPARK-10193.)
val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties)
val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties)
assert(scheduler.activeJobs.nonEmpty)
val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty")

// remove job1 as an ActiveJob
cancel(jobId1)

// job2 should still be running
assert(scheduler.activeJobs.nonEmpty)
val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
assert(testProperty1 != testProperty2)
// NB: This next assert isn't necessarily the "desired" behavior; it's just to document
// the current behavior. We've already submitted the TaskSet for stage 0 based on job1, but
// even though we have cancelled that job and are now running it because of job2, we haven't
// updated the TaskSet's properties. Changing the properties to "job2" is likely the more
// correct behavior.
val job1Id = 0 // TaskSet priority for Stages run with "job1" as the ActiveJob
checkJobPropertiesAndPriority(taskSets(0), "job1", job1Id)
complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))

shuffleDep1
}

/**
* Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
* later, active job if they were previously run under a job that is no longer active
*/
test("stage used by two jobs, the first no longer active (SPARK-6880)") {
launchJobsThatShareStageAndCancelFirst()

// The next check is the key for SPARK-6880. For the stage which was shared by both job1 and
// job2 but never had any tasks submitted for job1, the properties of job2 are now used to run
// the stage.
checkJobPropertiesAndPriority(taskSets(1), "job2", 1)

complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
assert(taskSets(2).properties != null)
complete(taskSets(2), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assert(scheduler.activeJobs.isEmpty)

assertDataStructuresEmpty()
}

/**
* Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
* later, active job if they were previously run under a job that is no longer active, even when
* there are fetch failures
*/
test("stage used by two jobs, some fetch failures, and the first job no longer active " +
"(SPARK-6880)") {
val shuffleDep1 = launchJobsThatShareStageAndCancelFirst()
val job2Id = 1 // TaskSet priority for Stages run with "job2" as the ActiveJob

// let's say there is a fetch failure in this task set, which makes us go back and
// run stage 0, attempt 1
complete(taskSets(1), Seq(
(FetchFailed(makeBlockManagerId("hostA"), shuffleDep1.shuffleId, 0, 0, "ignored"), null)))
scheduler.resubmitFailedStages()

// stage 0, attempt 1 should have the properties of job2
assert(taskSets(2).stageId === 0)
assert(taskSets(2).stageAttemptId === 1)
checkJobPropertiesAndPriority(taskSets(2), "job2", job2Id)

// run the rest of the stages normally, checking that they have the correct properties
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
checkJobPropertiesAndPriority(taskSets(3), "job2", job2Id)
complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 1))))
checkJobPropertiesAndPriority(taskSets(4), "job2", job2Id)
complete(taskSets(4), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assert(scheduler.activeJobs.isEmpty)

assertDataStructuresEmpty()
}

test("run trivial shuffle with out-of-band failure and retry") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
