Skip to content

Commit

Permalink
more debug printlns
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed Jun 11, 2015
1 parent ecb4e7d commit 9601b47
Showing 1 changed file with 6 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ class DAGSchedulerSuite
/** This tests the case where another FetchFailed comes in while the map stage is getting
* re-run. */
test("late fetch failures don't cause multiple concurrent attempts for the same map stage") {
println("begin late fetch failure")
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
Expand All @@ -572,13 +573,15 @@ class DAGSchedulerSuite
// The map stage should have been submitted.
assert(countSubmittedMapStageAttempts() === 1)

println("late fetch failure: taskSets = " + taskSets)
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
// The MapOutputTracker should know about both map output locations.
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
Array("hostA", "hostB"))

println("late fetch failure: taskSets = " + taskSets)
// The first result task fails, with a fetch failure for the output from the first mapper.
runEvent(CompletionEvent(
taskSets(1).tasks(0),
Expand Down Expand Up @@ -622,6 +625,7 @@ class DAGSchedulerSuite
*/
test("extremely late fetch failures don't cause multiple concurrent attempts for " +
"the same stage") {
println("begin extremely late fetch failure")
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
Expand All @@ -639,6 +643,7 @@ class DAGSchedulerSuite
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(countSubmittedMapStageAttempts() === 1)

println("extremely late fetch failure: taskSets = " + taskSets)
// Complete the map stage.
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
Expand All @@ -648,6 +653,7 @@ class DAGSchedulerSuite
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(countSubmittedReduceStageAttempts() === 1)

println("extremely late fetch failure: taskSets = " + taskSets)
// The first result task fails, with a fetch failure for the output from the first mapper.
runEvent(CompletionEvent(
taskSets(1).tasks(0),
Expand Down

0 comments on commit 9601b47

Please sign in to comment.