diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index feafd654e9e71..d6afb73b74242 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConversions.mapAsJavaMap
 import scala.concurrent.Await
 import scala.concurrent.duration.{Duration, FiniteDuration}
 
-import akka.actor.{Actor, ActorRef, ActorSystem, ExtendedActorSystem}
+import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
 import akka.pattern.ask
 
 import com.typesafe.config.ConfigFactory
@@ -44,14 +44,28 @@ private[spark] object AkkaUtils extends Logging {
    * If indestructible is set to true, the Actor System will continue running in the event
    * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
    */
-  def createActorSystem(name: String, host: String, port: Int,
-    conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = {
+  def createActorSystem(
+      name: String,
+      host: String,
+      port: Int,
+      conf: SparkConf,
+      securityManager: SecurityManager): (ActorSystem, Int) = {
+    // Only the port varies: startServiceOnPort decides which one doCreateActorSystem gets.
+    val launchOn: Int => (ActorSystem, Int) =
+      doCreateActorSystem(name, host, _, conf, securityManager)
+    Utils.startServiceOnPort(port, launchOn, name)
+  }
+
+  private def doCreateActorSystem(
+      name: String,
+      host: String,
+      port: Int,
+      conf: SparkConf,
+      securityManager: SecurityManager): (ActorSystem, Int) = {
 
     val akkaThreads = conf.getInt("spark.akka.threads", 4)
     val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
-
     val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
-
     val akkaFrameSize = maxFrameSizeBytes(conf)
     val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
     val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f4efca7ce825f..accdf2e4439a3 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1367,10 +1367,7 @@ private[spark] object Utils extends Logging {
         logInfo(s"Successfully started service$serviceString on port $port.")
         return (service, port)
       } catch {
-        case e: BindException =>
-          if (!e.getMessage.contains("Address already in use")) {
-            throw e
-          }
+        case e: Exception if isBindCollision(e) =>
           if (offset >= maxRetries) {
             val exceptionMessage =
               s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
@@ -1387,4 +1384,24 @@ private[spark] object Utils extends Logging {
     throw new SparkException(s"Failed to start service on port $startPort")
   }
 
+  /**
+   * Return whether the exception is caused by an address-port collision when binding.
+   *
+   * Walks the cause chain starting at `exception` and reports true as soon as it finds a
+   * BindException whose message contains "Address already in use". Only the causes of
+   * Exceptions are followed; a null link or any other Throwable ends the search with false.
+   */
+  @scala.annotation.tailrec
+  private def isBindCollision(exception: Throwable): Boolean = {
+    exception match {
+      case e: BindException
+          if e.getMessage != null && e.getMessage.contains("Address already in use") =>
+        true
+      // Also reached by a BindException with a different (or missing) message: keep unwrapping.
+      case e: Exception => isBindCollision(e.getCause)
+      // null (end of the cause chain) or a Throwable that is not an Exception.
+      case _ => false
+    }
+  }
+
 }