Skip to content

Commit

Permalink
Handle port collisions when creating Akka systems
Browse files Browse the repository at this point in the history
This requires us to handle thrown exceptions more carefully, because
Akka throws its own exceptions, which are not java.net.BindException.
We work around this by traversing the exception's cause chain to find
a java.net.BindException with an "Address already in use" message.
  • Loading branch information
andrewor14 committed Aug 5, 2014
1 parent a2dd05c commit d502e5f
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 9 deletions.
24 changes: 19 additions & 5 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.Await
import scala.concurrent.duration.{Duration, FiniteDuration}

import akka.actor.{Actor, ActorRef, ActorSystem, ExtendedActorSystem}
import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
import akka.pattern.ask

import com.typesafe.config.ConfigFactory
Expand All @@ -44,14 +44,28 @@ private[spark] object AkkaUtils extends Logging {
* If indestructible is set to true, the Actor System will continue running in the event
* of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
*/
def createActorSystem(name: String, host: String, port: Int,
conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = {
def createActorSystem(
    name: String,
    host: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager): (ActorSystem, Int) = {
  // Builds the actor system on one specific port. This function is handed to
  // Utils.startServiceOnPort together with the requested port and the service name.
  def launchOn(candidatePort: Int): (ActorSystem, Int) =
    doCreateActorSystem(name, host, candidatePort, conf, securityManager)

  Utils.startServiceOnPort(port, launchOn _, name)
}

private def doCreateActorSystem(
name: String,
host: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager): (ActorSystem, Int) = {

val akkaThreads = conf.getInt("spark.akka.threads", 4)
val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)

val akkaTimeout = conf.getInt("spark.akka.timeout", 100)

val akkaFrameSize = maxFrameSizeBytes(conf)
val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
Expand Down
20 changes: 16 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1367,10 +1367,7 @@ private[spark] object Utils extends Logging {
logInfo(s"Successfully started service$serviceString on port $port.")
return (service, port)
} catch {
case e: BindException =>
if (!e.getMessage.contains("Address already in use")) {
throw e
}
case e: Exception if isBindCollision(e) =>
if (offset >= maxRetries) {
val exceptionMessage =
s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
Expand All @@ -1387,4 +1384,19 @@ private[spark] object Utils extends Logging {
throw new SparkException(s"Failed to start service on port $startPort")
}

/**
 * Return whether the exception is caused by an address-port collision when binding.
 *
 * Walks the cause chain starting at `exception` and reports true as soon as it reaches a
 * [[java.net.BindException]] whose message contains "Address already in use". A
 * BindException with a null or different message is not a match by itself, but its own
 * cause is still examined.
 *
 * Only links that are `Exception`s are followed; the walk ends with false at a null cause
 * or at any other kind of `Throwable`.
 *
 * @param exception the throwable to inspect; may be null
 * @return true if a bind collision is found in the cause chain, false otherwise
 */
@scala.annotation.tailrec
private def isBindCollision(exception: Throwable): Boolean = {
  exception match {
    // getMessage may be null, so wrap it in Option before looking at the text.
    case e: BindException
        if Option(e.getMessage).exists(_.contains("Address already in use")) =>
      true
    // Covers non-matching BindExceptions too: keep descending through the cause.
    case e: Exception => isBindCollision(e.getCause)
    // null (end of the chain) or a non-Exception Throwable.
    case _ => false
  }
}

}

0 comments on commit d502e5f

Please sign in to comment.