Skip to content

Commit

Permalink
Refactoring the UI interface to add flexibility
Browse files Browse the repository at this point in the history
This commit introduces three (abstract) classes: WebUI, UITab, and UIPage.
The top of the hierarchy is the WebUI, which contains many tabs and pages.
Each tab in turn contains many pages.

When a UITab is attached to a WebUI, the WebUI creates a handler for each
of the tab's pages. Similarly, when a UIPage is attached to a WebUI, its
handler is created. The server in WebUI is then ready to be bound to a host
and a port.

This commit also breaks down a couple of unnecessarily large files by
moving certain classes to their own files.
  • Loading branch information
andrewor14 committed Apr 2, 2014
1 parent ada310a commit 7d57444
Show file tree
Hide file tree
Showing 26 changed files with 583 additions and 501 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ class SparkContext(

// Initialize the Spark UI, registering all associated listeners
private[spark] val ui = new SparkUI(this)
ui.bind()
ui.start()
ui.bind()

// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ private[spark] class Master(
val masterSource = new MasterSource(this)

val webUi = new MasterWebUI(this, webUiPort)
webUi.start()

val masterPublicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@ import org.json4s.JValue
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorInfo
import org.apache.spark.ui.UIUtils
import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class ApplicationPage(parent: MasterWebUI) {
val master = parent.masterActorRef
val timeout = parent.timeout
private[spark] class ApplicationPage(parent: MasterWebUI)
extends UIPage("app", includeJson = true) {

private val master = parent.masterActorRef
private val timeout = parent.timeout

/** Executor details for a particular application */
def renderJson(request: HttpServletRequest): JValue = {
override def renderJson(request: HttpServletRequest): JValue = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)
Expand All @@ -47,7 +49,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
}

/** Executor details for a particular application */
def render(request: HttpServletRequest): Seq[Node] = {
override def render(request: HttpServletRequest): Seq[Node] = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)
Expand Down Expand Up @@ -96,7 +98,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
}

def executorRow(executor: ExecutorInfo): Seq[Node] = {
private def executorRow(executor: ExecutorInfo): Seq[Node] = {
<tr>
<td>{executor.id}</td>
<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,24 @@ import scala.xml.Node
import akka.pattern.ask
import org.json4s.JValue

import org.apache.spark.deploy.{JsonProtocol}
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.ui.{WebUI, UIUtils}
import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class IndexPage(parent: MasterWebUI) {
val master = parent.masterActorRef
val timeout = parent.timeout
private[spark] class IndexPage(parent: MasterWebUI) extends UIPage("", includeJson = true) {
private val master = parent.masterActorRef
private val timeout = parent.timeout

def renderJson(request: HttpServletRequest): JValue = {
override def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)
JsonProtocol.writeMasterState(state)
}

/** Index view listing applications and executors */
def render(request: HttpServletRequest): Seq[Node] = {
override def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)

Expand Down Expand Up @@ -139,7 +139,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
}

def workerRow(worker: WorkerInfo): Seq[Node] = {
private def workerRow(worker: WorkerInfo): Seq[Node] = {
<tr>
<td>
<a href={worker.webUiAddress}>{worker.id}</a>
Expand All @@ -154,8 +154,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
</tr>
}


def appRow(app: ApplicationInfo): Seq[Node] = {
private def appRow(app: ApplicationInfo): Seq[Node] = {
<tr>
<td>
<a href={"app?appId=" + app.id}>{app.id}</a>
Expand All @@ -169,14 +168,14 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<td sorttable_customkey={app.desc.memoryPerSlave.toString}>
{Utils.megabytesToString(app.desc.memoryPerSlave)}
</td>
<td>{WebUI.formatDate(app.submitDate)}</td>
<td>{UIUtils.formatDate(app.submitDate)}</td>
<td>{app.desc.user}</td>
<td>{app.state.toString}</td>
<td>{WebUI.formatDuration(app.duration)}</td>
<td>{UIUtils.formatDuration(app.duration)}</td>
</tr>
}

def driverRow(driver: DriverInfo): Seq[Node] = {
private def driverRow(driver: DriverInfo): Seq[Node] = {
<tr>
<td>{driver.id} </td>
<td>{driver.submitDate}</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,64 +17,49 @@

package org.apache.spark.deploy.master.ui

import javax.servlet.http.HttpServletRequest

import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.ui.{ServerInfo, SparkUI}
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{AkkaUtils, Utils}

/**
* Web UI server for the standalone master.
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)
class MasterWebUI(val master: Master, requestedPort: Int)
extends WebUI(master.securityMgr) with Logging {

private val host = Utils.localHostName()
private val port = requestedPort
private val applicationPage = new ApplicationPage(this)
private val indexPage = new IndexPage(this)
private var serverInfo: Option[ServerInfo] = None
val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)

private val handlers: Seq[ServletContextHandler] = {
master.masterMetricsSystem.getServletHandlers ++
master.applicationMetricsSystem.getServletHandlers ++
Seq[ServletContextHandler](
createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"),
createServletHandler("/app/json",
(request: HttpServletRequest) => applicationPage.renderJson(request), master.securityMgr),
createServletHandler("/app",
(request: HttpServletRequest) => applicationPage.render(request), master.securityMgr),
createServletHandler("/json",
(request: HttpServletRequest) => indexPage.renderJson(request), master.securityMgr),
createServletHandler("/",
(request: HttpServletRequest) => indexPage.render(request), master.securityMgr)
)
def start() {
attachPage(new ApplicationPage(this))
attachPage(new IndexPage(this))
attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
master.masterMetricsSystem.getServletHandlers.foreach(attachHandler)
master.applicationMetricsSystem.getServletHandlers.foreach(attachHandler)
}

/** Bind to the HTTP server behind this web interface. */
def bind() {
try {
serverInfo = Some(startJettyServer(host, port, handlers, master.conf))
logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
} catch {
case e: Exception =>
logError("Failed to create Master JettyUtils", e)
logError("Failed to create Master web UI", e)
System.exit(1)
}
}

def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)

/** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
def attachUI(ui: SparkUI) {
assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
val rootHandler = serverInfo.get.rootHandler
for (handler <- ui.handlers) {
for (handler <- ui.getHandlers) {
rootHandler.addHandler(handler)
if (!handler.isStarted) {
handler.start()
Expand All @@ -86,18 +71,13 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
def detachUI(ui: SparkUI) {
assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
val rootHandler = serverInfo.get.rootHandler
for (handler <- ui.handlers) {
for (handler <- ui.getHandlers) {
if (handler.isStarted) {
handler.stop()
}
rootHandler.removeHandler(handler)
}
}

def stop() {
assert(serverInfo.isDefined, "Attempted to stop a Master UI that was not bound to a server!")
serverInfo.get.server.stop()
}
}

private[spark] object MasterWebUI {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ private[spark] class Worker(
logInfo("Spark home: " + sparkHome)
createWorkDir()
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.start()
webUi.bind()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
registerWithMaster()

metricsSystem.registerSource(workerSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@ import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.ui.UIUtils
import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class IndexPage(parent: WorkerWebUI) {
private[spark] class IndexPage(parent: WorkerWebUI) extends UIPage("", includeJson = true) {
val workerActor = parent.worker.self
val worker = parent.worker
val timeout = parent.timeout

def renderJson(request: HttpServletRequest): JValue = {
override def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, timeout)
JsonProtocol.writeWorkerState(workerState)
}

def render(request: HttpServletRequest): Seq[Node] = {
override def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, timeout)

Expand Down
Loading

0 comments on commit 7d57444

Please sign in to comment.