From 281d7c9f83344cf107ed946635eda7be922e0267 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 14 Mar 2014 20:18:35 -0700
Subject: [PATCH 1/6] Throw exception on spark.akka.frameSize exceeded + Unit
 tests

---
 .../org/apache/spark/MapOutputTracker.scala   | 12 +++-
 .../scala/org/apache/spark/SparkEnv.scala     |  2 +-
 .../org/apache/spark/AkkaUtilsSuite.scala     | 10 +--
 .../apache/spark/MapOutputTrackerSuite.scala  | 70 +++++++++++++------
 4 files changed, 66 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 5968973132942..4676fb0a930aa 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -35,13 +35,21 @@ private[spark] case class GetMapOutputStatuses(shuffleId: Int)
   extends MapOutputTrackerMessage
 private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
 
-private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster)
+private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
   extends Actor with Logging {
+  val maxAkkaFrameSize = conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024 // Convert MB to bytes
+
   def receive = {
     case GetMapOutputStatuses(shuffleId: Int) =>
       val hostPort = sender.path.address.hostPort
       logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
-      sender ! tracker.getSerializedMapOutputStatuses(shuffleId)
+      val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
+      val serializedSize = mapOutputStatuses.size
+      if (serializedSize > maxAkkaFrameSize) {
+        throw new SparkException(
+          "spark.akka.frameSize exceeded! Map output statuses were %d bytes".format(serializedSize))
+      }
+      sender ! 
mapOutputStatuses case StopMapOutputTracker => logInfo("MapOutputTrackerActor stopped!") diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 5e43b5198422c..26c362d0fea7b 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -186,7 +186,7 @@ object SparkEnv extends Logging { } mapOutputTracker.trackerActor = registerOrLookup( "MapOutputTracker", - new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster])) + new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf)) val shuffleFetcher = instantiateClass[ShuffleFetcher]( "spark.shuffle.fetcher", "org.apache.spark.BlockStoreShuffleFetcher") diff --git a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala index cd054c1f684ab..d2e303d81c4c8 100644 --- a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala @@ -45,12 +45,12 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val masterTracker = new MapOutputTrackerMaster(conf) masterTracker.trackerActor = actorSystem.actorOf( - Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker") + Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") val badconf = new SparkConf badconf.set("spark.authenticate", "true") badconf.set("spark.authenticate.secret", "bad") - val securityManagerBad = new SecurityManager(badconf); + val securityManagerBad = new SecurityManager(badconf) assert(securityManagerBad.isAuthenticationEnabled() === true) @@ -84,7 +84,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val masterTracker = new MapOutputTrackerMaster(conf) masterTracker.trackerActor = actorSystem.actorOf( - Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker") + Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") val badconf = new SparkConf badconf.set("spark.authenticate", "false") @@ -136,7 +136,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val masterTracker = new MapOutputTrackerMaster(conf) masterTracker.trackerActor = actorSystem.actorOf( - Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker") + Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") val goodconf = new SparkConf goodconf.set("spark.authenticate", "true") @@ -189,7 +189,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val masterTracker = new MapOutputTrackerMaster(conf) masterTracker.trackerActor = actorSystem.actorOf( - Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker") + Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") val badconf = new SparkConf badconf.set("spark.authenticate", "false") diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 8efa072a97911..d043d556d99da 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -51,14 +51,16 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { test("master start and stop") { val actorSystem = ActorSystem("test") val tracker = new MapOutputTrackerMaster(conf) - tracker.trackerActor = 
actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))) + tracker.trackerActor = + actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf))) tracker.stop() } test("master register and fetch") { val actorSystem = ActorSystem("test") val tracker = new MapOutputTrackerMaster(conf) - tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))) + tracker.trackerActor = + actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf))) tracker.registerShuffle(10, 2) val compressedSize1000 = MapOutputTracker.compressSize(1000L) val compressedSize10000 = MapOutputTracker.compressSize(10000L) @@ -77,7 +79,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { test("master register and unregister and fetch") { val actorSystem = ActorSystem("test") val tracker = new MapOutputTrackerMaster(conf) - tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))) + tracker.trackerActor = + actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf))) tracker.registerShuffle(10, 2) val compressedSize1000 = MapOutputTracker.compressSize(1000L) val compressedSize10000 = MapOutputTracker.compressSize(10000L) @@ -97,23 +100,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { } test("remote fetch") { - val hostname = "localhost" - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, - securityManager = new SecurityManager(conf)) - System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext - - val masterTracker = new MapOutputTrackerMaster(conf) - masterTracker.trackerActor = actorSystem.actorOf( - Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker") - - val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf, - securityManager = new SecurityManager(conf)) - val slaveTracker = new MapOutputTracker(conf) - val selection = slaveSystem.actorSelection( - s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker") - val timeout = AkkaUtils.lookupTimeout(conf) - slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) - + val (masterTracker, slaveTracker) = setUpMasterSlaveSystem(conf) masterTracker.registerShuffle(10, 1) masterTracker.incrementEpoch() slaveTracker.updateEpoch(masterTracker.getEpoch) @@ -136,4 +123,47 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { // failure should be cached intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) } } + + test("remote fetch exceeding akka frame size") { + val newConf = new SparkConf + newConf.set("spark.akka.frameSize", "1") + newConf.set("spark.akka.askTimeout", "1") // Fail fast + val (masterTracker, slaveTracker) = setUpMasterSlaveSystem(newConf) + + // Frame size should be ~123B, and no exception should be thrown + masterTracker.registerShuffle(10, 1) + masterTracker.registerMapOutput(10, 0, new MapStatus( + BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0))) + slaveTracker.getServerStatuses(10, 0) + + // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception + masterTracker.registerShuffle(20, 100) + (0 until 100).foreach { i => + masterTracker.registerMapOutput(20, i, new MapStatus( + BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0))) + } + intercept[SparkException] { slaveTracker.getServerStatuses(20, 0) } + } + + private def 
setUpMasterSlaveSystem(conf: SparkConf) = {
+    val hostname = "localhost"
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf,
+      securityManager = new SecurityManager(conf))
+
+    // Will be cleared by LocalSparkContext
+    System.setProperty("spark.driver.port", boundPort.toString)
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    masterTracker.trackerActor = actorSystem.actorOf(
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+
+    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf,
+      securityManager = new SecurityManager(conf))
+    val slaveTracker = new MapOutputTracker(conf)
+    val selection = slaveSystem.actorSelection(
+      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+    (masterTracker, slaveTracker)
+  }
 }

From c9b610958da9fa01b34936ee4283bf266aec66bc Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Sat, 15 Mar 2014 11:31:48 -0700
Subject: [PATCH 2/6] Simplify test + make access to akka frame size more
 modular

---
 .../org/apache/spark/MapOutputTracker.scala   |  2 +-
 .../org/apache/spark/util/AkkaUtils.scala     |  7 ++-
 .../apache/spark/MapOutputTrackerSuite.scala  | 56 ++++++++++---------
 3 files changed, 36 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 4676fb0a930aa..127d1195226b7 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -37,7 +37,7 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
 private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
   extends Actor with Logging {
-  val maxAkkaFrameSize = conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024 // Convert MB to bytes
+  val maxAkkaFrameSize = AkkaUtils.maxFrameSize(conf) * 1024 * 1024 // Convert MB to bytes
 
   def receive = {
     case GetMapOutputStatuses(shuffleId: Int) =>
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index a6c9a9aaba8eb..a466102333267 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -49,7 +49,7 @@ private[spark] object AkkaUtils extends Logging {
 
   val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
 
-  val akkaFrameSize = conf.getInt("spark.akka.frameSize", 10)
+  val akkaFrameSize = maxFrameSize(conf)
   val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
   val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
   if (!akkaLogLifecycleEvents) {
@@ -121,4 +121,9 @@ private[spark] object AkkaUtils extends Logging {
   def lookupTimeout(conf: SparkConf): FiniteDuration = {
     Duration.create(conf.get("spark.akka.lookupTimeout", "30").toLong, "seconds")
   }
+
+  /** Returns the default max frame size for Akka messages in MB. 
*/ + def maxFrameSize(conf: SparkConf): Int = { + conf.getInt("spark.akka.frameSize", 10) + } } diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index d043d556d99da..dc70576d82419 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark import scala.concurrent.Await import akka.actor._ +import akka.testkit.TestActorRef import org.scalatest.FunSuite import org.apache.spark.scheduler.MapStatus @@ -100,7 +101,25 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { } test("remote fetch") { - val (masterTracker, slaveTracker) = setUpMasterSlaveSystem(conf) + val hostname = "localhost" + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, + securityManager = new SecurityManager(conf)) + + // Will be cleared by LocalSparkContext + System.setProperty("spark.driver.port", boundPort.toString) + + val masterTracker = new MapOutputTrackerMaster(conf) + masterTracker.trackerActor = actorSystem.actorOf( + Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") + + val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf, + securityManager = new SecurityManager(conf)) + val slaveTracker = new MapOutputTracker(conf) + val selection = slaveSystem.actorSelection( + s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker") + val timeout = AkkaUtils.lookupTimeout(conf) + slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) + masterTracker.registerShuffle(10, 1) masterTracker.incrementEpoch() slaveTracker.updateEpoch(masterTracker.getEpoch) @@ -113,7 +132,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { masterTracker.incrementEpoch() slaveTracker.updateEpoch(masterTracker.getEpoch) assert(slaveTracker.getServerStatuses(10, 0).toSeq === - Seq((BlockManagerId("a", "hostA", 1000, 0), size1000))) + Seq((BlockManagerId("a", "hostA", 1000, 0), size1000))) masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0)) masterTracker.incrementEpoch() @@ -128,13 +147,18 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { val newConf = new SparkConf newConf.set("spark.akka.frameSize", "1") newConf.set("spark.akka.askTimeout", "1") // Fail fast - val (masterTracker, slaveTracker) = setUpMasterSlaveSystem(newConf) + + val masterTracker = new MapOutputTrackerMaster(conf) + val actorSystem = ActorSystem("test") + val actorRef = TestActorRef[MapOutputTrackerMasterActor]( + new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem) + val masterActor = actorRef.underlyingActor // Frame size should be ~123B, and no exception should be thrown masterTracker.registerShuffle(10, 1) masterTracker.registerMapOutput(10, 0, new MapStatus( BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0))) - slaveTracker.getServerStatuses(10, 0) + masterActor.receive(GetMapOutputStatuses(10)) // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception masterTracker.registerShuffle(20, 100) @@ -142,28 +166,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { masterTracker.registerMapOutput(20, i, new MapStatus( BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0))) } - intercept[SparkException] { 
slaveTracker.getServerStatuses(20, 0) }
-  }
-
-  private def setUpMasterSlaveSystem(conf: SparkConf) = {
-    val hostname = "localhost"
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf,
-      securityManager = new SecurityManager(conf))
-
-    // Will be cleared by LocalSparkContext
-    System.setProperty("spark.driver.port", boundPort.toString)
-
-    val masterTracker = new MapOutputTrackerMaster(conf)
-    masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
-
-    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf,
-      securityManager = new SecurityManager(conf))
-    val slaveTracker = new MapOutputTracker(conf)
-    val selection = slaveSystem.actorSelection(
-      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
-    val timeout = AkkaUtils.lookupTimeout(conf)
-    slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
-    (masterTracker, slaveTracker)
+    intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
   }
 }

From 2b4e085e057ce19bb2bce109d5108567f91ccb11 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sat, 15 Mar 2014 17:05:32 -0700
Subject: [PATCH 3/6] Consolidate Executor use of akka frame size

---
 .../main/scala/org/apache/spark/MapOutputTracker.scala |  6 +++---
 .../scala/org/apache/spark/executor/Executor.scala     |  6 ++----
 .../main/scala/org/apache/spark/util/AkkaUtils.scala   | 10 +++++-----
 3 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 127d1195226b7..beab420f6d67f 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -37,7 +37,7 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
 private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
   extends Actor with Logging {
 
-  val maxAkkaFrameSize = AkkaUtils.maxFrameSize(conf) * 1024 * 1024 // Convert MB to bytes
+  val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
   def receive = {
     case GetMapOutputStatuses(shuffleId: Int) =>
@@ -46,8 +46,8 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
       val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
       val serializedSize = mapOutputStatuses.size
       if (serializedSize > maxAkkaFrameSize) {
-        throw new SparkException(
-          "spark.akka.frameSize exceeded! Map output statuses were %d bytes".format(serializedSize))
+        throw new SparkException(s"Map output statuses were $serializedSize bytes which " +
+          s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes).")
       }
       sender ! mapOutputStatuses
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e69f6f72d3275..2ea2ec29f59f5 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler. 
@@ -120,9 +120,7 @@ private[spark] class Executor( // Akka's message frame size. If task result is bigger than this, we use the block manager // to send the result back. - private val akkaFrameSize = { - env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size") - } + private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) // Start worker thread pool val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker") diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index a466102333267..d0ff17db632c1 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -49,7 +49,7 @@ private[spark] object AkkaUtils extends Logging { val akkaTimeout = conf.getInt("spark.akka.timeout", 100) - val akkaFrameSize = maxFrameSize(conf) + val akkaFrameSize = maxFrameSizeBytes(conf) val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false) val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off" if (!akkaLogLifecycleEvents) { @@ -92,7 +92,7 @@ private[spark] object AkkaUtils extends Logging { |akka.remote.netty.tcp.port = $port |akka.remote.netty.tcp.tcp-nodelay = on |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s - |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB + |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B |akka.remote.netty.tcp.execution-pool-size = $akkaThreads |akka.actor.default-dispatcher.throughput = $akkaBatchSize |akka.log-config-on-start = $logAkkaConfig @@ -122,8 +122,8 @@ private[spark] object AkkaUtils extends Logging { Duration.create(conf.get("spark.akka.lookupTimeout", "30").toLong, "seconds") } - /** Returns the default max frame size for Akka messages in MB. */ - def maxFrameSize(conf: SparkConf): Int = { - conf.getInt("spark.akka.frameSize", 10) + /** Returns the configured max frame size for Akka messages in bytes. */ + def maxFrameSizeBytes(conf: SparkConf): Int = { + conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024 } } From 8045103265d96d565ec427056aa8ab668f2b1951 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sat, 15 Mar 2014 17:32:13 -0700 Subject: [PATCH 4/6] Breaking out into two tests --- .../org/apache/spark/MapOutputTracker.scala | 11 +++++-- .../apache/spark/MapOutputTrackerSuite.scala | 30 ++++++++++++++----- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index beab420f6d67f..0e3c6304c44f1 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -46,8 +46,15 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId) val serializedSize = mapOutputStatuses.size if (serializedSize > maxAkkaFrameSize) { - throw new SparkException(s"Map output statuses were $serializedSize bytes which " + - s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes).") + val msg = s"Map output statuses were $serializedSize bytes which " + + s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes)." + /** + * For SPARK-1244 we'll opt for just logging an error and then throwing an exception. + * Note that on exception the actor will just restart. 
A bigger refactoring (SPARK-1239)
+       * will ultimately remove this entire code path.
+       */
+      logError(msg)
+      throw new SparkException(msg)
     }
     sender ! mapOutputStatuses

diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index dc70576d82419..8357101c4553b 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -143,7 +143,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
   }
 
-  test("remote fetch exceeding akka frame size") {
+  test("remote fetch exceeds akka frame size") {
     val newConf = new SparkConf
     newConf.set("spark.akka.frameSize", "1")
     newConf.set("spark.akka.askTimeout", "1") // Fail fast
@@ -154,13 +154,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
       new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
     val masterActor = actorRef.underlyingActor
 
-    // Frame size should be ~123B, and no exception should be thrown
-    masterTracker.registerShuffle(10, 1)
-    masterTracker.registerMapOutput(10, 0, new MapStatus(
-      BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0)))
-    masterActor.receive(GetMapOutputStatuses(10))
-
-    // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception
+    // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
+    // Note that the size is hand-selected here because map output statuses are compressed before
+    // being sent.
     masterTracker.registerShuffle(20, 100)
     (0 until 100).foreach { i =>
       masterTracker.registerMapOutput(20, i, new MapStatus(
@@ -168,4 +164,22 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
       BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
     }
     intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
   }
+
+  test("remote fetch below akka frame size") {
+    val newConf = new SparkConf
+    newConf.set("spark.akka.frameSize", "1")
+    newConf.set("spark.akka.askTimeout", "1") // Fail fast
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    val actorSystem = ActorSystem("test")
+    val actorRef = TestActorRef[MapOutputTrackerMasterActor](
+      new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
+    val masterActor = actorRef.underlyingActor
+
+    // Frame size should be ~123B, and no exception should be thrown
+    masterTracker.registerShuffle(10, 1)
+    masterTracker.registerMapOutput(10, 0, new MapStatus(
+      BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0)))
+    masterActor.receive(GetMapOutputStatuses(10))
+  }
 }

From 393af4c17c431f99086809f9de783381df91f4bd Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Mon, 17 Mar 2014 10:32:56 -0700
Subject: [PATCH 5/6] Small improvement suggested by Andrew Or

---
 .../scala/org/apache/spark/MapOutputTracker.scala  | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 0e3c6304c44f1..80cbf951cb70e 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -48,13 +48,13 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
       if (serializedSize > maxAkkaFrameSize) {
         val msg = s"Map output statuses were $serializedSize bytes which " +
           s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes)."
-      /**
-       * For SPARK-1244 we'll opt for just logging an error and then throwing an exception.
-       * Note that on exception the actor will just restart. A bigger refactoring (SPARK-1239)
-       * will ultimately remove this entire code path.
-       */
-      logError(msg)
-      throw new SparkException(msg)
+
+      /* For SPARK-1244 we'll opt for just logging an error and then throwing an exception.
+       * Note that on exception the actor will just restart. A bigger refactoring (SPARK-1239)
+       * will ultimately remove this entire code path. */
+      val exception = new SparkException(msg)
+      logError(msg, exception)
+      throw exception

From e5fb3ff24062a5fe0163301b328b2a201f0e703c Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Mon, 17 Mar 2014 11:58:18 -0700
Subject: [PATCH 6/6] Reversing test order

---
 .../apache/spark/MapOutputTrackerSuite.scala  | 32 +++++++++----------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 8357101c4553b..a5bd72eb0a122 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -143,7 +143,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
   }
 
-  test("remote fetch exceeds akka frame size") {
+  test("remote fetch below akka frame size") {
     val newConf = new SparkConf
     newConf.set("spark.akka.frameSize", "1")
     newConf.set("spark.akka.askTimeout", "1") // Fail fast
@@ -154,18 +154,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
       new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
     val masterActor = actorRef.underlyingActor
 
-    // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
-    // Note that the size is hand-selected here because map output statuses are compressed before
-    // being sent.
-    masterTracker.registerShuffle(20, 100)
-    (0 until 100).foreach { i =>
-      masterTracker.registerMapOutput(20, i, new MapStatus(
-        BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
-    }
-    intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
+    // Frame size should be ~123B, and no exception should be thrown
+    masterTracker.registerShuffle(10, 1)
+    masterTracker.registerMapOutput(10, 0, new MapStatus(
+      BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0)))
+    masterActor.receive(GetMapOutputStatuses(10))
   }
 
-  test("remote fetch below akka frame size") {
+  test("remote fetch exceeds akka frame size") {
     val newConf = new SparkConf
     newConf.set("spark.akka.frameSize", "1")
     newConf.set("spark.akka.askTimeout", "1") // Fail fast
@@ -176,10 +172,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
       new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
     val masterActor = actorRef.underlyingActor
 
-    // Frame size should be ~123B, and no exception should be thrown
-    masterTracker.registerShuffle(10, 1)
-    masterTracker.registerMapOutput(10, 0, new MapStatus(
-      BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0)))
-    masterActor.receive(GetMapOutputStatuses(10))
+    // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
+    // Note that the size is hand-selected here because map output statuses are compressed before
+    // being sent. 
+ masterTracker.registerShuffle(20, 100) + (0 until 100).foreach { i => + masterTracker.registerMapOutput(20, i, new MapStatus( + BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0))) + } + intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) } } }
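
A note on the frame-size arithmetic this series relies on: spark.akka.frameSize is
configured in MB, while serialized map output statuses are measured in bytes, so the
guard has to convert before comparing. For scale, the failing test above registers 100
map outputs carrying 4,000,000 compressed-size bytes each; per the test's own comment,
even after compression the serialized statuses come to roughly 1.1 MB, over the 1 MB
frame configured in the test. The following standalone sketch (plain Scala, no Spark or
Akka dependencies; FrameSizeGuardSketch and checkedReply are invented names that mirror,
but are not, the patched AkkaUtils.maxFrameSizeBytes and actor guard) illustrates the
check:

    object FrameSizeGuardSketch {
      // Mirrors AkkaUtils.maxFrameSizeBytes: the frame size is configured in MB
      // and converted to bytes; 10 MB is the default the patches assume.
      def maxFrameSizeBytes(frameSizeMB: Int = 10): Int = frameSizeMB * 1024 * 1024

      // Mirrors the guard added to MapOutputTrackerMasterActor.receive: refuse to
      // send a reply that Akka would drop for exceeding the maximum frame size.
      def checkedReply(payload: Array[Byte], frameSizeMB: Int): Array[Byte] = {
        val max = maxFrameSizeBytes(frameSizeMB)
        if (payload.length > max) {
          throw new RuntimeException(
            s"Map output statuses were ${payload.length} bytes which " +
              s"exceeds spark.akka.frameSize ($max bytes).")
        }
        payload
      }

      def main(args: Array[String]): Unit = {
        checkedReply(Array.fill[Byte](123)(0), frameSizeMB = 1)    // ~123 B: passes
        try checkedReply(Array.fill[Byte](2 * 1024 * 1024)(0), 1)  // 2 MB: rejected
        catch { case e: RuntimeException => println(e.getMessage) }
      }
    }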
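
The test rework in patches 2 through 6 hinges on akka.testkit.TestActorRef: applying
the actor's receive function through underlyingActor runs the handler synchronously on
the calling thread, so the SparkException surfaces in the test itself instead of being
hidden behind the ask timeout of a remote fetch. A minimal sketch of that pattern
(classic Akka 2.x testkit, as used by the suite; GuardedActor and its 1024-byte budget
are invented here for illustration and are not part of the patches):

    import akka.actor.{Actor, ActorSystem}
    import akka.testkit.TestActorRef

    // A toy stand-in for MapOutputTrackerMasterActor: rejects oversized payloads.
    class GuardedActor extends Actor {
      def receive = {
        case payload: Array[Byte] if payload.length > 1024 =>
          throw new IllegalStateException(
            s"payload of ${payload.length} bytes exceeds the frame budget")
        case _: Array[Byte] => // small enough; accepted
      }
    }

    object TestActorRefSketch {
      def main(args: Array[String]): Unit = {
        implicit val system = ActorSystem("sketch")
        val ref = TestActorRef[GuardedActor](new GuardedActor)
        // Applying receive directly is synchronous, so the exception is observable
        // here rather than being handled by actor supervision.
        ref.underlyingActor.receive(Array.fill[Byte](10)(0))        // fine
        try ref.underlyingActor.receive(Array.fill[Byte](4096)(0))  // throws
        catch { case e: IllegalStateException => println(e.getMessage) }
        system.shutdown()
      }
    }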