Throw exception on spark.akka.frameSize exceeded + Unit tests
andrewor14 committed Mar 15, 2014
1 parent e19044c · commit 281d7c9
Showing 4 changed files with 66 additions and 28 deletions.
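
In short: MapOutputTrackerMasterActor now throws a SparkException when the serialized map output statuses exceed the configured Akka frame size, rather than handing Akka a reply it cannot deliver (as I read the prior behavior, the oversized message was dropped and the fetching side waited until timeout). A minimal spark-shell sketch of the knob involved; the value 100 is illustrative, not from the commit:

    import org.apache.spark.SparkConf

    // Illustrative only: raise the frame limit if map output statuses are large.
    // The setting is in MB; the new guard converts it to bytes before comparing.
    val conf = new SparkConf().set("spark.akka.frameSize", "100") // default is 10
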
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -35,13 +35,21 @@ private[spark] case class GetMapOutputStatuses(shuffleId: Int)
   extends MapOutputTrackerMessage
 private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
 
-private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster)
+private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
   extends Actor with Logging {
+  val maxAkkaFrameSize = conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024 // MB
+
   def receive = {
     case GetMapOutputStatuses(shuffleId: Int) =>
       val hostPort = sender.path.address.hostPort
       logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
-      sender ! tracker.getSerializedMapOutputStatuses(shuffleId)
+      val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
+      val serializedSize = mapOutputStatuses.size
+      if (serializedSize > maxAkkaFrameSize) {
+        throw new SparkException(
+          "spark.akka.frameSize exceeded! Map output statuses were %d bytes".format(serializedSize))
+      }
+      sender ! mapOutputStatuses
 
     case StopMapOutputTracker =>
       logInfo("MapOutputTrackerActor stopped!")
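
A note on units, since the // MB comment above is terse: spark.akka.frameSize is read in megabytes and the guard compares byte counts. The same arithmetic spelled out (values taken from the diff's defaults):

    // spark.akka.frameSize is configured in MB; the comparison is against bytes.
    val frameSizeMb = conf.getInt("spark.akka.frameSize", 10)  // default 10 MB
    val maxAkkaFrameSize = frameSizeMb * 1024 * 1024           // 10 * 1024 * 1024 = 10485760 bytes
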
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -186,7 +186,7 @@ object SparkEnv extends Logging {
     }
     mapOutputTracker.trackerActor = registerOrLookup(
       "MapOutputTracker",
-      new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]))
+      new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
 
     val shuffleFetcher = instantiateClass[ShuffleFetcher](
       "spark.shuffle.fetcher", "org.apache.spark.BlockStoreShuffleFetcher")
10 changes: 5 additions & 5 deletions core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
@@ -45,12 +45,12 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
 
     val badconf = new SparkConf
     badconf.set("spark.authenticate", "true")
     badconf.set("spark.authenticate.secret", "bad")
-    val securityManagerBad = new SecurityManager(badconf);
+    val securityManagerBad = new SecurityManager(badconf)
 
     assert(securityManagerBad.isAuthenticationEnabled() === true)
 
@@ -84,7 +84,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
 
     val badconf = new SparkConf
     badconf.set("spark.authenticate", "false")
@@ -136,7 +136,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
 
     val goodconf = new SparkConf
     goodconf.set("spark.authenticate", "true")
@@ -189,7 +189,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
 
     val badconf = new SparkConf
     badconf.set("spark.authenticate", "false")
70 changes: 50 additions & 20 deletions core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -51,14 +51,16 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   test("master start and stop") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
-    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
+    tracker.trackerActor =
+      actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
     tracker.stop()
   }
 
   test("master register and fetch") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
-    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
+    tracker.trackerActor =
+      actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
     tracker.registerShuffle(10, 2)
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -77,7 +79,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   test("master register and unregister and fetch") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
-    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
+    tracker.trackerActor =
+      actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
     tracker.registerShuffle(10, 2)
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -97,23 +100,7 @@
   }
 
   test("remote fetch") {
-    val hostname = "localhost"
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf,
-      securityManager = new SecurityManager(conf))
-    System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
-
-    val masterTracker = new MapOutputTrackerMaster(conf)
-    masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
-
-    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf,
-      securityManager = new SecurityManager(conf))
-    val slaveTracker = new MapOutputTracker(conf)
-    val selection = slaveSystem.actorSelection(
-      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
-    val timeout = AkkaUtils.lookupTimeout(conf)
-    slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
-
+    val (masterTracker, slaveTracker) = setUpMasterSlaveSystem(conf)
     masterTracker.registerShuffle(10, 1)
     masterTracker.incrementEpoch()
     slaveTracker.updateEpoch(masterTracker.getEpoch)
@@ -136,4 +123,47 @@
     // failure should be cached
     intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
   }
+
+  test("remote fetch exceeding akka frame size") {
+    val newConf = new SparkConf
+    newConf.set("spark.akka.frameSize", "1")
+    newConf.set("spark.akka.askTimeout", "1") // Fail fast
+    val (masterTracker, slaveTracker) = setUpMasterSlaveSystem(newConf)
+
+    // Frame size should be ~123B, and no exception should be thrown
+    masterTracker.registerShuffle(10, 1)
+    masterTracker.registerMapOutput(10, 0, new MapStatus(
+      BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0)))
+    slaveTracker.getServerStatuses(10, 0)
+
+    // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception
+    masterTracker.registerShuffle(20, 100)
+    (0 until 100).foreach { i =>
+      masterTracker.registerMapOutput(20, i, new MapStatus(
+        BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
+    }
+    intercept[SparkException] { slaveTracker.getServerStatuses(20, 0) }
+  }
+
+  private def setUpMasterSlaveSystem(conf: SparkConf) = {
+    val hostname = "localhost"
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf,
+      securityManager = new SecurityManager(conf))
+
+    // Will be cleared by LocalSparkContext
+    System.setProperty("spark.driver.port", boundPort.toString)
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    masterTracker.trackerActor = actorSystem.actorOf(
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+
+    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf,
+      securityManager = new SecurityManager(conf))
+    val slaveTracker = new MapOutputTracker(conf)
+    val selection = slaveSystem.actorSelection(
+      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+    (masterTracker, slaveTracker)
+  }
 }
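
Why the second fetch above should trip the guard while the first does not (my reading, anchored to the in-diff comments): with spark.akka.frameSize set to 1, the limit is 1048576 bytes; the single 10-byte status serializes to roughly 123 B, while 100 statuses carrying 4,000,000-entry size arrays come to roughly 1.1 MB even after compression. Because the master actor throws before replying, the slave's blocking ask times out, which is presumably why the test sets spark.akka.askTimeout to 1 second and intercepts a SparkException. A script-style sketch of the comparison:

    // Back-of-envelope for the new test (figures from the in-diff comments):
    val frameSizeBytes  = 1 * 1024 * 1024            // spark.akka.frameSize = "1" (MB)
    val smallFetchBytes = 123L                       // first fetch: ~123 B, fits
    val largeFetchBytes = (1.1 * 1024 * 1024).toLong // second fetch: ~1.1 MB, does not
    assert(smallFetchBytes < frameSizeBytes && largeFetchBytes > frameSizeBytes)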
