Skip to content

Commit

Permalink
SPARK-1929 DAGScheduler suspended by local task OOM
Browse files Browse the repository at this point in the history
  • Loading branch information
zhpengg committed May 26, 2014
1 parent d6395d8 commit aa63161
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.scheduler

import java.io.NotSerializableException
import java.io.{NotSerializableException, PrintWriter, StringWriter}
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger

Expand Down Expand Up @@ -580,6 +580,13 @@ class DAGScheduler(
case e: Exception =>
jobResult = JobFailed(e)
job.listener.jobFailed(e)
case oom: OutOfMemoryError =>
val errors: StringWriter = new StringWriter()
oom.printStackTrace(new PrintWriter(errors))
val exception = new SparkException("job failed for: " + oom.getMessage() +
"\n" + errors.toString())
jobResult = JobFailed(exception)
job.listener.jobFailed(exception)
} finally {
val s = job.finalStage
stageIdToJobIds -= s.id // clean up data structures that were populated for a local job,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,20 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assertDataStructuresEmpty
}

test("local job oom") {
  // RDD whose compute always throws OutOfMemoryError, exercising the
  // local-job failure path added for SPARK-1929: the DAGScheduler must
  // surface the OOM as a job failure instead of hanging.
  val oomRdd = new MyRDD(sc, Nil) {
    override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
      throw new java.lang.OutOfMemoryError("test local job oom")
    override def getPartitions = {
      // Single dummy partition is enough to trigger one local task.
      val onlyPartition = new Partition { override def index = 0 }
      Array(onlyPartition)
    }
    override def getPreferredLocations(split: Partition) = Nil
    override def toString = "DAGSchedulerSuite Local RDD"
  }
  val jobId = scheduler.nextJobId.getAndIncrement()
  runEvent(JobSubmitted(jobId, oomRdd, jobComputeFunc, Array(0), true, null, jobListener))
  // The job must fail: no results produced, and all scheduler bookkeeping cleaned up.
  assert(results.isEmpty)
  assertDataStructuresEmpty
}

test("run trivial job w/ dependency") {
val baseRdd = makeRdd(1, Nil)
val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
Expand Down

0 comments on commit aa63161

Please sign in to comment.