diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 127d1195226b7..beab420f6d67f 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -37,7 +37,7 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
 
 private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
   extends Actor with Logging {
-  val maxAkkaFrameSize = AkkaUtils.maxFrameSize(conf) * 1024 * 1024 // MB
+  val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
   def receive = {
     case GetMapOutputStatuses(shuffleId: Int) =>
@@ -46,8 +46,8 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
       val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
       val serializedSize = mapOutputStatuses.size
       if (serializedSize > maxAkkaFrameSize) {
-        throw new SparkException(
-          "spark.akka.frameSize exceeded! Map output statuses were %d bytes".format(serializedSize))
+        throw new SparkException(s"Map output statuses were $serializedSize bytes which " +
+          s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes).")
       }
       sender ! mapOutputStatuses
 
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e69f6f72d3275..2ea2ec29f59f5 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler.
@@ -120,9 +120,7 @@ private[spark] class Executor(
 
   // Akka's message frame size. If task result is bigger than this, we use the block manager
   // to send the result back.
-  private val akkaFrameSize = {
-    env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
-  }
+  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
   // Start worker thread pool
   val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index a466102333267..d0ff17db632c1 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -49,7 +49,7 @@ private[spark] object AkkaUtils extends Logging {
 
     val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
 
-    val akkaFrameSize = maxFrameSize(conf)
+    val akkaFrameSize = maxFrameSizeBytes(conf)
     val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
     val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
     if (!akkaLogLifecycleEvents) {
@@ -92,7 +92,7 @@ private[spark] object AkkaUtils extends Logging {
       |akka.remote.netty.tcp.port = $port
       |akka.remote.netty.tcp.tcp-nodelay = on
       |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
-      |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
+      |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
       |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
       |akka.actor.default-dispatcher.throughput = $akkaBatchSize
       |akka.log-config-on-start = $logAkkaConfig
@@ -122,8 +122,8 @@ private[spark] object AkkaUtils extends Logging {
     Duration.create(conf.get("spark.akka.lookupTimeout", "30").toLong, "seconds")
   }
 
-  /** Returns the default max frame size for Akka messages in MB. */
-  def maxFrameSize(conf: SparkConf): Int = {
-    conf.getInt("spark.akka.frameSize", 10)
+  /** Returns the configured max frame size for Akka messages in bytes. */
+  def maxFrameSizeBytes(conf: SparkConf): Int = {
+    conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
  }
 }
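For reviewers, a self-contained sketch (not part of the patch) of the unit convention this change establishes: spark.akka.frameSize stays configured in whole MB, while every internal consumer now works in bytes through a single conversion helper. FrameSizeSketch, its main harness, and the 16 MB payload below are illustrative; the 10 MB default, the MB-to-bytes conversion, and the error message mirror the diff.

// Sketch only: mirrors AkkaUtils.maxFrameSizeBytes without depending on Spark.
object FrameSizeSketch {
  // spark.akka.frameSize is configured in MB; all internal comparisons use bytes.
  def maxFrameSizeBytes(frameSizeMb: Int): Int = frameSizeMb * 1024 * 1024

  def main(args: Array[String]): Unit = {
    val maxBytes = maxFrameSizeBytes(10)    // default of 10 MB -> 10485760 bytes
    val serializedSize = 16 * 1024 * 1024   // hypothetical 16 MB map-output payload
    if (serializedSize > maxBytes) {
      // Same shape as the new MapOutputTrackerMasterActor error message.
      println(s"Map output statuses were $serializedSize bytes which " +
        s"exceeds spark.akka.frameSize ($maxBytes bytes).")
    }
  }
}

Centralizing the conversion in one helper removes the earlier inconsistency where MapOutputTrackerMasterActor multiplied the MB value by 1024 * 1024 itself while AkkaUtils passed the raw MB value into the Akka config, and Executor read the limit back out of the Akka settings.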