Skip to content

Commit

Permalink
Merge branch 'SPARK-2157' of github.com:ash211/spark into configure-ports
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewor14 committed Aug 4, 2014
2 parents 8e7d5ba + 038a579 commit ec676f4
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 50 deletions.
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/HttpFileServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,18 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo
var httpServer : HttpServer = null
var serverUri : String = null

// Set up the temp directories (files/, jars/) and start the embedded Jetty HTTP server.
// NOTE(review): this span interleaves the pre-change and post-change lines of a diff —
// both the old zero-arg signature and the new Option[Int] signature appear, and
// httpServer is assigned twice. Only the second form of each reflects the commit.
def initialize() {
def initialize(port: Option[Int]) {
baseDir = Utils.createTempDir()
fileDir = new File(baseDir, "files")
jarDir = new File(baseDir, "jars")
fileDir.mkdir()
jarDir.mkdir()
logInfo("HTTP File server directory is " + baseDir)
httpServer = new HttpServer(baseDir, securityManager)
// No port requested -> let HttpServer pick (its localPort defaults to 0, i.e. ephemeral);
// otherwise pass the caller-supplied port through.
httpServer = if (port.isEmpty) {
new HttpServer(baseDir, securityManager)
} else {
new HttpServer(baseDir, securityManager, port.get)
}
httpServer.start()
serverUri = httpServer.uri
logDebug("HTTP file server started at: " + serverUri)
Expand Down
74 changes: 42 additions & 32 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark

import java.io.File

import org.apache.spark.network.PortManager
import org.eclipse.jetty.util.security.{Constraint, Password}
import org.eclipse.jetty.security.authentication.DigestAuthenticator
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler}
Expand All @@ -41,45 +42,54 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
* as well as classes created by the interpreter when the user types in code. This is just a wrapper
* around a Jetty server.
*/
private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager)
extends Logging {
private[spark] class HttpServer(resourceBase: File,
securityManager: SecurityManager,
localPort: Int = 0) extends Logging {
private var server: Server = null
private var port: Int = -1
private var port: Int = localPort

// Build, configure, and start a Jetty Server on the requested port (0 = let the OS pick an
// ephemeral port), serving the contents of resourceBase. Returns the started server paired
// with the port it actually bound to. Expected to throw java.net.BindException (from
// server.start()) when startPort is already in use, which the caller retries.
private def startOnPort(startPort: Int): (Server, Int) = {
val server = new Server()
val connector = new SocketConnector
// 60s idle timeout; SO_LINGER disabled so closing connections does not block
connector.setMaxIdleTime(60*1000)
connector.setSoLingerTime(-1)
connector.setPort(startPort)
server.addConnector(connector)

// Daemon worker threads so this server never keeps the JVM alive on its own
val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)
server.setThreadPool(threadPool)
val resHandler = new ResourceHandler
resHandler.setResourceBase(resourceBase.getAbsolutePath)

val handlerList = new HandlerList
handlerList.setHandlers(Array(resHandler, new DefaultHandler))

if (securityManager.isAuthenticationEnabled()) {
logDebug("HttpServer is using security")
val sh = setupSecurityHandler(securityManager)
// make sure we go through security handler to get resources
sh.setHandler(handlerList)
server.setHandler(sh)
} else {
logDebug("HttpServer is not using security")
server.setHandler(handlerList)
}

server.start()
// Ask Jetty which port it actually bound (differs from startPort when startPort == 0)
val actualPort = server.getConnectors()(0).getLocalPort()

(server, actualPort)
}

// Start the HTTP server; throws ServerStateException if it is already running.
// NOTE(review): this span interleaves the removed and added bodies of a diff — the inline
// Jetty setup below was replaced in the commit by the PortManager.startWithIncrements call
// at the bottom (which delegates to startOnPort and retries up to 3 successive ports).
def start() {
if (server != null) {
throw new ServerStateException("Server is already started")
} else {
logInfo("Starting HTTP Server")
server = new Server()
val connector = new SocketConnector
connector.setMaxIdleTime(60*1000)
connector.setSoLingerTime(-1)
connector.setPort(0)
server.addConnector(connector)

val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)
server.setThreadPool(threadPool)
val resHandler = new ResourceHandler
resHandler.setResourceBase(resourceBase.getAbsolutePath)

val handlerList = new HandlerList
handlerList.setHandlers(Array(resHandler, new DefaultHandler))

if (securityManager.isAuthenticationEnabled()) {
logDebug("HttpServer is using security")
val sh = setupSecurityHandler(securityManager)
// make sure we go through security handler to get resources
sh.setHandler(handlerList)
server.setHandler(sh)
} else {
logDebug("HttpServer is not using security")
server.setHandler(handlerList)
}

server.start()
port = server.getConnectors()(0).getLocalPort()
// Post-change body: bind via PortManager (localPort + up to 3 increments) and record the
// server instance and the port actually bound.
val (actualServer, actualPort) = PortManager.startWithIncrements(localPort, 3, startOnPort)
server = actualServer
port = actualPort
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ object SparkEnv extends Logging {
val httpFileServer =
if (isDriver) {
val server = new HttpFileServer(securityManager)
server.initialize()
server.initialize(conf.getOption("spark.fileserver.port").map(_.toInt))
conf.set("spark.fileserver.uri", server.serverUri)
server
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ private[broadcast] object HttpBroadcast extends Logging {

private def createServer(conf: SparkConf) {
broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
server = new HttpServer(broadcastDir, securityManager)
val broadcastListenPort: Int = conf.getInt("spark.broadcast.port", 0)
server = new HttpServer(broadcastDir, securityManager, broadcastListenPort)
server.start()
serverUri = server.uri
logInfo("Broadcast server started at " + serverUri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
serverChannel.socket.setReuseAddress(true)
serverChannel.socket.setReceiveBufferSize(256 * 1024)

serverChannel.socket.bind(new InetSocketAddress(port))
private def startService(port: Int) = {
serverChannel.socket.bind(new InetSocketAddress(port))
(serverChannel, port)
}
PortManager.startWithIncrements(port, 3, startService)
serverChannel.register(selector, SelectionKey.OP_ACCEPT)

val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
Expand Down
60 changes: 60 additions & 0 deletions core/src/main/scala/org/apache/spark/network/PortManager.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network

import java.net.InetSocketAddress

import org.apache.spark.{Logging, SparkException}
import org.eclipse.jetty.server.Server

private[spark] object PortManager extends Logging {

  /**
   * Attempt to start a service on the given port, falling back to successive ports
   * (n+1, n+2, ...) while the port is already in use.
   *
   * @param startPort The first port to attempt.
   * @param maxRetries Maximum number of retries after the initial attempt. A value of e.g. 3
   *                   causes 4 total attempts, on ports n, n+1, n+2, and n+3.
   * @param startService Function to start a service on a given port. Expected to throw a
   *                     java.net.BindException if the port is already in use.
   * @tparam T The type of the service handle returned by startService.
   * @throws SparkException when the service cannot be started within the allowed attempts
   * @return The started service paired with the port it actually bound to.
   */
  def startWithIncrements[T](startPort: Int, maxRetries: Int, startService: Int => (T, Int)):
      (T, Int) = {
    for (offset <- 0 to maxRetries) {
      val tryPort = startPort + offset
      try {
        return startService(tryPort)
      } catch {
        case e: java.net.BindException =>
          // Rethrow bind failures that are not simple port collisions, and rethrow a
          // collision on the final attempt (offset == maxRetries) rather than logging a
          // retry that will never happen. (The original guard used maxRetries - 1, which
          // both skipped the documented last attempt and let a final-attempt collision
          // fall through to the generic SparkException below.)
          if (!e.getMessage.contains("Address already in use") || offset == maxRetries) {
            throw e
          }
          logInfo("Could not bind on port: " + tryPort + ". Attempting port " + (tryPort + 1))
      }
    }
    // Unreachable in practice (every failure path above throws), kept as a safety net.
    throw new SparkException(s"Couldn't start service on port $startPort")
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ private[spark] class BlockManager(
val shuffleBlockManager = new ShuffleBlockManager(this)
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
val connectionManager = new ConnectionManager(0, conf, securityManager)
val connectionManager = new ConnectionManager(conf.getInt("spark.blockManager.port", 0), conf,
securityManager)

implicit val futureExecContext = connectionManager.futureExecContext

Expand Down
41 changes: 30 additions & 11 deletions docs/spark-standalone.md
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ configure those ports.
</tr>
<tr>
<td>Browser</td>
<td>Driver</td>
<td>Application</td>
<td>4040</td>
<td>Web UI</td>
<td><code>spark.ui.port</code></td>
Expand Down Expand Up @@ -369,18 +369,37 @@ configure those ports.

<!-- Other misc stuff -->
<tr>
<td>Driver and other Workers</td>
<td>Worker</td>
<td>Application</td>
<td>(random)</td>
<td>
<ul>
<li>File server for file and jars</li>
<li>Http Broadcast</li>
<li>Class file server (Spark Shell only)</li>
</ul>
</td>
<td>None</td>
<td>Jetty-based. Each of these services starts on a random port that cannot be configured</td>
<td>File server for files and jars</td>
<td><code>spark.fileserver.port</code></td>
<td>Jetty-based</td>
</tr>
<tr>
<td>Worker</td>
<td>Application</td>
<td>(random)</td>
<td>HTTP Broadcast</td>
<td><code>spark.broadcast.port</code></td>
<td>Jetty-based. Not used by TorrentBroadcast, which sends data through the block manager
instead</td>
</tr>
<tr>
<td>Worker</td>
<td>Spark Shell</td>
<td>(random)</td>
<td>Class file server (Spark Shell only)</td>
<td><code>spark.replClassServer.port</code></td>
<td>Jetty-based</td>
</tr>
<tr>
<td>Worker</td>
<td>Other Workers</td>
<td>(random)</td>
<td>Block Manager port</td>
<td><code>spark.blockManager.port</code></td>
<td>Raw socket via ServerSocketChannel</td>
</tr>

</table>
Expand Down
3 changes: 2 additions & 1 deletion repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ import org.apache.spark.util.Utils

val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles
/** Jetty server that will serve our classes to worker nodes */
val classServer = new HttpServer(outputDir, new SecurityManager(conf))
val classServerListenPort = conf.getInt("spark.replClassServer.port", 0)
val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerListenPort)
private var currentSettings: Settings = initialSettings
var printResults = true // whether to print result lines
var totalSilence = false // whether to print anything
Expand Down

0 comments on commit ec676f4

Please sign in to comment.