Skip to content

Commit

Permalink
Breaking out into two tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pwendell committed Mar 17, 2014
1 parent 2b4e085 commit 8045103
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 10 deletions.
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,15 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
val serializedSize = mapOutputStatuses.size
if (serializedSize > maxAkkaFrameSize) {
throw new SparkException(s"Map output statuses were $serializedSize bytes which " +
s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes).")
val msg = s"Map output statuses were $serializedSize bytes which " +
s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes)."
/**
* For SPARK-1244 we'll opt for just logging an error and then throwing an exception.
* Note that on exception the actor will just restart. A bigger refactoring (SPARK-1239)
* will utlimately remove this entire code path.
*/
logError(msg)
throw new SparkException(msg)
}
sender ! mapOutputStatuses

Expand Down
30 changes: 22 additions & 8 deletions core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
}

test("remote fetch exceeding akka frame size") {
test("remote fetch exceeds akka frame size") {
val newConf = new SparkConf
newConf.set("spark.akka.frameSize", "1")
newConf.set("spark.akka.askTimeout", "1") // Fail fast
Expand All @@ -154,18 +154,32 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
val masterActor = actorRef.underlyingActor

// Frame size should be ~123B, and no exception should be thrown
masterTracker.registerShuffle(10, 1)
masterTracker.registerMapOutput(10, 0, new MapStatus(
BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0)))
masterActor.receive(GetMapOutputStatuses(10))

// Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception
// Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
// Note that the size is hand-selected here because map output statuses are compressed before
// being sent.
masterTracker.registerShuffle(20, 100)
(0 until 100).foreach { i =>
masterTracker.registerMapOutput(20, i, new MapStatus(
BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
}
intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
}

test("remote fetch below akka frame size") {
val newConf = new SparkConf
newConf.set("spark.akka.frameSize", "1")
newConf.set("spark.akka.askTimeout", "1") // Fail fast

val masterTracker = new MapOutputTrackerMaster(conf)
val actorSystem = ActorSystem("test")
val actorRef = TestActorRef[MapOutputTrackerMasterActor](
new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
val masterActor = actorRef.underlyingActor

// Frame size should be ~123B, and no exception should be thrown
masterTracker.registerShuffle(10, 1)
masterTracker.registerMapOutput(10, 0, new MapStatus(
BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0)))
masterActor.receive(GetMapOutputStatuses(10))
}
}

0 comments on commit 8045103

Please sign in to comment.