Skip to content

Commit

Permalink
Show the REST port on the Master UI
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed Jan 27, 2015
1 parent d8d3717 commit 837475b
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 18 deletions.
6 changes: 6 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ span.expand-details {
float: right;
}

span.stable-uri {
font-size: 10pt;
font-style: italic;
color: gray;
}

pre {
font-size: 0.8em;
}
Expand Down
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,22 @@ private[deploy] object DeployMessages {

// Master to MasterWebUI

case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
status: MasterState) {
case class MasterStateResponse(
host: String,
port: Int,
stablePort: Option[Int],
workers: Array[WorkerInfo],
activeApps: Array[ApplicationInfo],
completedApps: Array[ApplicationInfo],
activeDrivers: Array[DriverInfo],
completedDrivers: Array[DriverInfo],
status: MasterState) {

Utils.checkHost(host, "Required hostname")
assert (port > 0)

def uri = "spark://" + host + ":" + port
def stableUri: Option[String] = stablePort.map { p => "spark://" + host + ":" + p }
}

// WorkerWebUI to Worker
Expand Down
14 changes: 8 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,17 @@ object SparkSubmit {
* Second, we use this launch environment to invoke the main method of the child
* main class.
*
* Note that standalone cluster mode is an exception in that we do not invoke the
* main method of a child class. Instead, we pass the submit parameters directly to
* a REST client, which will submit the application using the stable REST protocol.
* As of Spark 1.3, a stable REST-based application submission gateway is introduced.
* If this is enabled, then we will run standalone cluster mode by passing the submit
* parameters directly to a REST client, which will submit the application using the
* REST protocol instead.
*/
private def submit(args: SparkSubmitArguments): Unit = {
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
// In standalone cluster mode, use the stable application submission REST protocol.
// Otherwise, just call the main method of the child class.
if (args.isStandaloneCluster) {
val restKey = "spark.submit.rest.enabled"
val restEnabled = args.sparkProperties.get(restKey).getOrElse("false").toBoolean
if (args.isStandaloneCluster && restEnabled) {
printStream.println("Running standalone cluster mode using the stable REST protocol.")
// NOTE: since we mutate the values of some configs in `prepareSubmitEnvironment`, we
// must update the corresponding fields in the original SparkSubmitArguments to reflect
// these changes.
Expand Down
19 changes: 12 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,14 @@ private[spark] class Master(

// Alternative application submission gateway that is stable across Spark versions
private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", true)
private val restServerPort = conf.getInt("spark.master.rest.port", 17077)
private val restServer = new StandaloneRestServer(this, host, restServerPort)
if (restServerEnabled) {
restServer.start()
}
private val restServer =
if (restServerEnabled) {
val port = conf.getInt("spark.master.rest.port", 17077)
Some(new StandaloneRestServer(this, host, port))
} else {
None
}
private val restServerBoundPort = restServer.map(_.start())

override def preStart() {
logInfo("Starting Spark master at " + masterUrl)
Expand Down Expand Up @@ -183,7 +186,7 @@ private[spark] class Master(
recoveryCompletionTask.cancel()
}
webUi.stop()
restServer.stop()
restServer.foreach(_.stop())
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
persistenceEngine.close()
Expand Down Expand Up @@ -431,7 +434,9 @@ private[spark] class Master(
}

case RequestMasterState => {
sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
sender ! MasterStateResponse(
host, port, restServerBoundPort,
workers.toArray, apps.toArray, completedApps.toArray,
drivers.toArray, completedDrivers.toArray, state)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
<div class="span12">
<ul class="unstyled">
<li><strong>URL:</strong> {state.uri}</li>
{
state.stableUri
.map { uri =>
<li>
<strong>Stable URL:</strong> {uri}
<span class="stable-uri"> (for standalone cluster mode in Spark 1.3+)</span>
</li> }
.getOrElse { Seq.empty }
}
<li><strong>Workers:</strong> {state.workers.size}</li>
<li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
{state.workers.map(_.coresUsed).sum} Used</li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ private[spark] abstract class SubmitRestServer(host: String, requestedPort: Int,
protected val handler: SubmitRestServerHandler
private var _server: Option[Server] = None

def start(): Unit = {
/** Start the server and return the bound port. */
def start(): Int = {
val (server, boundPort) = Utils.startServiceOnPort[Server](requestedPort, doStart, conf)
_server = Some(server)
logInfo(s"Started REST server for submitting applications on port $boundPort")
boundPort
}

def stop(): Unit = {
Expand Down

0 comments on commit 837475b

Please sign in to comment.