Skip to content

Commit

Permalink
more printlns ...
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed Jun 11, 2015
1 parent 9601b47 commit 89a59b6
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,7 @@ class DAGScheduler(
stage.pendingTasks.clear()

// First figure out the indexes of partition ids to compute.
println(s"finding partitions to compute for $stage")
val partitionsToCompute: Seq[Int] = {
stage match {
case stage: ShuffleMapStage =>
Expand Down Expand Up @@ -928,6 +929,7 @@ class DAGScheduler(
s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
}
logDebug(debugString)
println(debugString)
}
}

Expand Down Expand Up @@ -1083,7 +1085,8 @@ class DAGScheduler(
{
newlyRunnable += shuffleStage
}
println(s"newly runnable stages = $newlyRunnable")
val newlyRunnableWithJob = newlyRunnable.map{x => x -> activeJobForStage(x)}
println(s"newly runnable stages = $newlyRunnableWithJob")
waitingStages --= newlyRunnable
runningStages ++= newlyRunnable
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,13 @@ class DAGSchedulerSuite
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
val jobId = submit(reduceRdd, Array(0, 1))
println(s"late fetch failure: jobId = $jobId")
println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}")
println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}")
println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}")
println(s"late fetch failure: runningStages = ${scheduler.runningStages}")
println(s"late fetch failure: failedStages = ${scheduler.failedStages}")

val mapStageId = 0
def countSubmittedMapStageAttempts(): Int = {
Expand All @@ -574,6 +580,11 @@ class DAGSchedulerSuite
assert(countSubmittedMapStageAttempts() === 1)

println("late fetch failure: taskSets = " + taskSets)
println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}")
println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}")
println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}")
println(s"late fetch failure: runningStages = ${scheduler.runningStages}")
println(s"late fetch failure: failedStages = ${scheduler.failedStages}")
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
Expand All @@ -582,6 +593,13 @@ class DAGSchedulerSuite
Array("hostA", "hostB"))

println("late fetch failure: taskSets = " + taskSets)
println("late fetch failure: submittedStages = " + sparkListener.submittedStageInfos)
println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}")
println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}")
println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}")
println(s"late fetch failure: runningStages = ${scheduler.runningStages}")
println(s"late fetch failure: failedStages = ${scheduler.failedStages}")

// The first result task fails, with a fetch failure for the output from the first mapper.
runEvent(CompletionEvent(
taskSets(1).tasks(0),
Expand Down

0 comments on commit 89a59b6

Please sign in to comment.