Skip to content

Commit

Permalink
some minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
WangTaoTheTonic committed Jan 13, 2015
1 parent bc6e1ec commit 61a370d
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 19 deletions.
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/HttpFileServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.google.common.io.Files
import org.apache.spark.util.Utils

private[spark] class HttpFileServer(
conf: SparkConf,
securityManager: SecurityManager,
requestedPort: Int = 0)
extends Logging {
Expand All @@ -41,7 +42,7 @@ private[spark] class HttpFileServer(
fileDir.mkdir()
jarDir.mkdir()
logInfo("HTTP File server directory is " + baseDir)
httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server")
httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
httpServer.start()
serverUri = httpServer.uri
logDebug("HTTP file server started at: " + serverUri)
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/HttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
* around a Jetty server.
*/
private[spark] class HttpServer(
conf: SparkConf,
resourceBase: File,
securityManager: SecurityManager,
requestedPort: Int = 0,
Expand All @@ -57,7 +58,7 @@ private[spark] class HttpServer(
} else {
logInfo("Starting HTTP Server")
val (actualServer, actualPort) =
Utils.startServiceOnPort[Server](requestedPort, doStart, new SparkConf(), serverName)
Utils.startServiceOnPort[Server](requestedPort, doStart, conf, serverName)
server = actualServer
port = actualPort
}
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,9 @@ private[spark] object SparkConf {
}

/**
* Return whether the given config is a Spark port config.
* Return true if the given config matches either `spark.*.port` or `spark.port.*`.
*/
def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.contains(".port")
def isSparkPortConf(name: String): Boolean = {
(name.startsWith("spark.") && name.endsWith(".port")) | name.startsWith("spark.port.")
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ object SparkEnv extends Logging {
val httpFileServer =
if (isDriver) {
val fileServerPort = conf.getInt("spark.fileserver.port", 0)
val server = new HttpFileServer(securityManager, fileServerPort)
val server = new HttpFileServer(conf, securityManager, fileServerPort)
server.initialize()
conf.set("spark.fileserver.uri", server.serverUri)
server
Expand Down
13 changes: 6 additions & 7 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1693,11 +1693,12 @@ private[spark] object Utils extends Logging {
* Default maximum number of retries when binding to a port before giving up.
*/
def portMaxRetries(conf: SparkConf): Int = {
if (sys.props.contains("spark.testing")) {
val maxRetries = conf.getOption("spark.port.maxRetries").map(_.toInt)
if (conf.contains("spark.testing")) {
// Set a higher number of retries for tests...
sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100)
maxRetries.getOrElse(100)
} else {
conf.getOption("spark.port.maxRetries").map(_.toInt).getOrElse(16)
maxRetries.getOrElse(16)
}
}

Expand All @@ -1708,18 +1709,16 @@ private[spark] object Utils extends Logging {
* @param startPort The initial port to start the service on.
* @param startService Function to start service on a given port.
* This is expected to throw java.net.BindException on port collision.
* @param conf Used to get maximum number of retries.
* @param conf A SparkConf used to get the maximum number of retries when binding to a port.
* @param serviceName Name of the service.
*/
def startServiceOnPort[T](
startPort: Int,
startService: Int => (T, Int),
conf: SparkConf,
serviceName: String = ""
): (T, Int) = {
serviceName: String = ""): (T, Int) = {
val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
val maxRetries = portMaxRetries(conf)
logInfo(s"Starting service$serviceString on port $startPort with maximum $maxRetries retries. ")
for (offset <- 0 to maxRetries) {
// Do not increment port if startPort is 0, which is treated as a special port
val tryPort = if (startPort == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,9 @@ class ExecutorRunnable(
// registers with the Scheduler and transfers the spark configs. Since the Executor backend
// uses Akka to connect to the scheduler, the akka settings are needed as well as the
// authentication settings.
sparkConf.getAll.filter { case (k, v) =>
k.startsWith("spark.auth") || k.startsWith("spark.akka") || k.equals("spark.port.maxRetries")
}.
foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

sparkConf.getAkkaConf.
foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
sparkConf.getAll
.filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }
.foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

// Commenting it out for now - so that people can refer to the properties if required. Remove
// it once cpuset version is pushed out.
Expand Down

0 comments on commit 61a370d

Please sign in to comment.