Skip to content

Commit

Permalink
Consolidate Executor use of akka frame size
Browse files Browse the repository at this point in the history
  • Loading branch information
pwendell committed Mar 16, 2014
1 parent c9b6109 commit 2b4e085
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 12 deletions.
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage

private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
extends Actor with Logging {
val maxAkkaFrameSize = AkkaUtils.maxFrameSize(conf) * 1024 * 1024 // MB
val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

def receive = {
case GetMapOutputStatuses(shuffleId: Int) =>
Expand All @@ -46,8 +46,8 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
val serializedSize = mapOutputStatuses.size
if (serializedSize > maxAkkaFrameSize) {
throw new SparkException(
"spark.akka.frameSize exceeded! Map output statuses were %d bytes".format(serializedSize))
throw new SparkException(s"Map output statuses were $serializedSize bytes which " +
s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes).")
}
sender ! mapOutputStatuses

Expand Down
6 changes: 2 additions & 4 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util.Utils
import org.apache.spark.util.{AkkaUtils, Utils}

/**
* Spark executor used with Mesos, YARN, and the standalone scheduler.
Expand Down Expand Up @@ -120,9 +120,7 @@ private[spark] class Executor(

// Akka's message frame size. If task result is bigger than this, we use the block manager
// to send the result back.
private val akkaFrameSize = {
env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
}
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

// Start worker thread pool
val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private[spark] object AkkaUtils extends Logging {

val akkaTimeout = conf.getInt("spark.akka.timeout", 100)

val akkaFrameSize = maxFrameSize(conf)
val akkaFrameSize = maxFrameSizeBytes(conf)
val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
if (!akkaLogLifecycleEvents) {
Expand Down Expand Up @@ -92,7 +92,7 @@ private[spark] object AkkaUtils extends Logging {
|akka.remote.netty.tcp.port = $port
|akka.remote.netty.tcp.tcp-nodelay = on
|akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
|akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
|akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
|akka.remote.netty.tcp.execution-pool-size = $akkaThreads
|akka.actor.default-dispatcher.throughput = $akkaBatchSize
|akka.log-config-on-start = $logAkkaConfig
Expand Down Expand Up @@ -122,8 +122,8 @@ private[spark] object AkkaUtils extends Logging {
Duration.create(conf.get("spark.akka.lookupTimeout", "30").toLong, "seconds")
}

/** Returns the default max frame size for Akka messages in MB. */
def maxFrameSize(conf: SparkConf): Int = {
conf.getInt("spark.akka.frameSize", 10)
/** Returns the configured max frame size for Akka messages in bytes. */
def maxFrameSizeBytes(conf: SparkConf): Int = {
conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
}
}

0 comments on commit 2b4e085

Please sign in to comment.