From 548c98c1f1e80f1f51a1b7b08356c13fd8ea25ec Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 10 Apr 2014 19:17:02 -0700 Subject: [PATCH] Wide refactoring of WebUI, UITab, and UIPage (see commit message) The biggest changes include (1) Decoupling the SparkListener from any member of the hierarchy. This was previously arbitrarily tied to the UITab. (2) Decoupling initializing a UITab from attaching it to a WebUI. This involves having each UITab initializing itself instead. (3) Add an abstract parent for each UITab. This allows us to move the access of header tabs of the UI into the UITab abstract class itself. (4) Abstract bind() logic into WebUI. (5) Renaming UITab -> WebUITab, and UIPage -> WebUIPage. --- .../scala/org/apache/spark/SparkContext.scala | 1 - .../spark/deploy/history/HistoryServer.scala | 32 +++------ .../spark/deploy/history/IndexPage.scala | 4 +- .../apache/spark/deploy/master/Master.scala | 2 - .../deploy/master/ui/ApplicationPage.scala | 4 +- .../spark/deploy/master/ui/IndexPage.scala | 4 +- .../spark/deploy/master/ui/MasterWebUI.scala | 22 ++---- .../apache/spark/deploy/worker/Worker.scala | 1 - .../spark/deploy/worker/ui/IndexPage.scala | 4 +- .../spark/deploy/worker/ui/LogPage.scala | 4 +- .../spark/deploy/worker/ui/WorkerWebUI.scala | 35 ++++------ .../scala/org/apache/spark/ui/SparkUI.scala | 45 +++++------- .../scala/org/apache/spark/ui/UIUtils.scala | 7 +- .../scala/org/apache/spark/ui/WebUI.scala | 69 +++++++++++-------- .../apache/spark/ui/env/EnvironmentTab.scala | 16 ++--- .../org/apache/spark/ui/env/IndexPage.scala | 6 +- .../apache/spark/ui/exec/ExecutorsTab.scala | 18 ++--- .../org/apache/spark/ui/exec/IndexPage.scala | 6 +- .../apache/spark/ui/jobs/ExecutorTable.scala | 2 +- .../org/apache/spark/ui/jobs/IndexPage.scala | 6 +- .../spark/ui/jobs/JobProgressListener.scala | 10 ++- .../apache/spark/ui/jobs/JobProgressTab.scala | 22 +++--- .../org/apache/spark/ui/jobs/PoolPage.scala | 6 +- .../org/apache/spark/ui/jobs/PoolTable.scala | 2 +- .../org/apache/spark/ui/jobs/StagePage.scala | 6 +- .../org/apache/spark/ui/jobs/StageTable.scala | 2 +- .../spark/ui/storage/BlockManagerTab.scala | 16 ++--- .../apache/spark/ui/storage/IndexPage.scala | 6 +- .../org/apache/spark/ui/storage/RDDPage.scala | 6 +- .../spark/streaming/StreamingContext.scala | 1 - .../spark/streaming/ui/StreamingPage.scala | 2 +- .../spark/streaming/ui/StreamingTab.scala | 12 ++-- 32 files changed, 164 insertions(+), 215 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c9af0778bdb29..28923a1d8c340 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -212,7 +212,6 @@ class SparkContext(config: SparkConf) extends Logging { // Initialize the Spark UI, registering all associated listeners private[spark] val ui = new SparkUI(this) - ui.start() ui.bind() // Optionally log Spark events diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 16abfe920da72..df3c394bacfa9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -17,12 +17,9 @@ package org.apache.spark.deploy.history -import javax.servlet.http.HttpServletRequest - import scala.collection.mutable import org.apache.hadoop.fs.{FileStatus, Path} -import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.scheduler._ @@ -45,15 +42,15 @@ import org.apache.spark.util.Utils */ class HistoryServer( val baseLogDir: String, + securityManager: SecurityManager, conf: SparkConf) - extends WebUI(new SecurityManager(conf)) with Logging { + extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging { import HistoryServer._ private val fileSystem = Utils.getHadoopFileSystem(baseLogDir) private val localHost = Utils.localHostName() private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost) - private val port = WEB_UI_PORT // A timestamp of when the disk was last accessed to check for log updates private var lastLogCheckTime = -1L @@ -90,30 +87,20 @@ class HistoryServer( // A mapping of application ID to its history information, which includes the rendered UI val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]() + initialize() + /** - * Start the history server. + * Initialize the history server. * * This starts a background thread that periodically synchronizes information displayed on * this UI with the event logs in the provided base directory. */ - def start() { + def initialize() { attachPage(new IndexPage(this)) attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static")) logCheckingThread.start() } - /** Bind to the HTTP server behind this web interface. */ - def bind() { - try { - serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf)) - logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort)) - } catch { - case e: Exception => - logError("Failed to bind HistoryServer", e) - System.exit(1) - } - } - /** * Check for any updates to event logs in the base directory. This is only effective once * the server has been bound. @@ -179,12 +166,11 @@ class HistoryServer( val path = logDir.getPath val appId = path.getName val replayBus = new ReplayListenerBus(logInfo.logPaths, fileSystem, logInfo.compressionCodec) - val ui = new SparkUI(conf, replayBus, appId, "/history/" + appId) val appListener = new ApplicationEventListener replayBus.addListener(appListener) + val ui = new SparkUI(conf, replayBus, appId, "/history/" + appId) // Do not call ui.bind() to avoid creating a new server for each application - ui.start() replayBus.replay() if (appListener.applicationStarted) { attachUI(ui) @@ -267,9 +253,9 @@ object HistoryServer { def main(argStrings: Array[String]) { val args = new HistoryServerArguments(argStrings) - val server = new HistoryServer(args.logDir, conf) + val securityManager = new SecurityManager(conf) + val server = new HistoryServer(args.logDir, securityManager, conf) server.bind() - server.start() // Wait until the end of the world... or if the HistoryServer process is manually stopped while(true) { Thread.sleep(Int.MaxValue) } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala index eeb22ab000558..69a6baa4aaeab 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala @@ -21,9 +21,9 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} -private[spark] class IndexPage(parent: HistoryServer) extends UIPage("") { +private[spark] class IndexPage(parent: HistoryServer) extends WebUIPage("") { override def render(request: HttpServletRequest): Seq[Node] = { val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 9c90c4b4d11ef..076bb92bf2a10 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -118,7 +118,6 @@ private[spark] class Master( logInfo("Starting Spark master at " + masterUrl) // Listen for remote client disconnection events, since they don't go through Akka's watch() context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) - webUi.start() webUi.bind() masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) @@ -670,7 +669,6 @@ private[spark] class Master( val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec) val ui = new SparkUI( new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id) - ui.start() replayBus.replay() app.desc.appUiUrl = ui.basePath appIdToUI(app.id) = ui diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 24282048b842e..d8c3321ea51ec 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -28,11 +28,11 @@ import org.json4s.JValue import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.ExecutorInfo -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils private[spark] class ApplicationPage(parent: MasterWebUI) - extends UIPage("app", includeJson = true) { + extends WebUIPage("app", includeJson = true) { private val master = parent.masterActorRef private val timeout = parent.timeout diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala index f011c830a02da..3d2ad04110b77 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala @@ -28,10 +28,10 @@ import org.json4s.JValue import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo} -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils -private[spark] class IndexPage(parent: MasterWebUI) extends UIPage("", includeJson = true) { +private[spark] class IndexPage(parent: MasterWebUI) extends WebUIPage("", includeJson = true) { private val master = parent.masterActorRef private val timeout = parent.timeout diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index d0f1a9bc9ffd1..965f7a0fac9e2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -17,8 +17,6 @@ package org.apache.spark.deploy.master.ui -import javax.servlet.http.HttpServletRequest - import org.apache.spark.Logging import org.apache.spark.deploy.master.Master import org.apache.spark.ui.{SparkUI, WebUI} @@ -30,15 +28,15 @@ import org.apache.spark.util.{AkkaUtils, Utils} */ private[spark] class MasterWebUI(val master: Master, requestedPort: Int) - extends WebUI(master.securityMgr) with Logging { + extends WebUI(master.securityMgr, requestedPort, master.conf) with Logging { - private val host = Utils.localHostName() - private val port = requestedPort val masterActorRef = master.self val timeout = AkkaUtils.askTimeout(master.conf) + initialize() + /** Initialize all components of the server. */ - def start() { + def initialize() { attachPage(new ApplicationPage(this)) attachPage(new IndexPage(this)) attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static")) @@ -46,18 +44,6 @@ class MasterWebUI(val master: Master, requestedPort: Int) master.applicationMetricsSystem.getServletHandlers.foreach(attachHandler) } - /** Bind to the HTTP server behind this web interface. */ - def bind() { - try { - serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, master.conf)) - logInfo("Started Master web UI at http://%s:%d".format(host, boundPort)) - } catch { - case e: Exception => - logError("Failed to create Master web UI", e) - System.exit(1) - } - } - /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */ def attachUI(ui: SparkUI) { assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs") diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 08ad87957c3d4..52c164ca3c574 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -130,7 +130,6 @@ private[spark] class Worker( createWorkDir() context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) - webUi.start() webUi.bind() registerWithMaster() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala index bf7d552101484..42ef8ed703779 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala @@ -28,10 +28,10 @@ import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse} import org.apache.spark.deploy.master.DriverState import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner} -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils -private[spark] class IndexPage(parent: WorkerWebUI) extends UIPage("", includeJson = true) { +private[spark] class IndexPage(parent: WorkerWebUI) extends WebUIPage("", includeJson = true) { val workerActor = parent.worker.self val worker = parent.worker val timeout = parent.timeout diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala index f57900c99ce3d..8f6b36faf85ee 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala @@ -22,10 +22,10 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils -private[spark] class LogPage(parent: WorkerWebUI) extends UIPage("logPage") { +private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") { private val worker = parent.worker private val workDir = parent.workDir diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index c1fdc5cea173c..34b5acd2f9b64 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -20,26 +20,29 @@ package org.apache.spark.deploy.worker.ui import java.io.File import javax.servlet.http.HttpServletRequest -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.deploy.worker.Worker import org.apache.spark.ui.{SparkUI, WebUI} import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.spark.util.AkkaUtils /** * Web UI server for the standalone worker. */ private[spark] -class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None) - extends WebUI(worker.securityMgr) with Logging { +class WorkerWebUI( + val worker: Worker, + val workDir: File, + port: Option[Int] = None) + extends WebUI(worker.securityMgr, WorkerWebUI.getUIPort(port, worker.conf), worker.conf) + with Logging { - private val host = Utils.localHostName() - private val port = requestedPort.getOrElse( - worker.conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT)) val timeout = AkkaUtils.askTimeout(worker.conf) + initialize() + /** Initialize all components of the server. */ - def start() { + def initialize() { val logPage = new LogPage(this) attachPage(logPage) attachPage(new IndexPage(this)) @@ -48,21 +51,13 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I (request: HttpServletRequest) => logPage.renderLog(request), worker.securityMgr)) worker.metricsSystem.getServletHandlers.foreach(attachHandler) } - - /** Bind to the HTTP server behind this web interface. */ - def bind() { - try { - serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, worker.conf)) - logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort)) - } catch { - case e: Exception => - logError("Failed to create Worker web UI", e) - System.exit(1) - } - } } private[spark] object WorkerWebUI { val DEFAULT_PORT = 8081 val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR + + def getUIPort(requestedPort: Option[Int], conf: SparkConf): Int = { + requestedPort.getOrElse(conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT)) + } } diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index f2633dfa8abd7..2eda1aff5ac73 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -25,19 +25,19 @@ import org.apache.spark.ui.env.EnvironmentTab import org.apache.spark.ui.exec.ExecutorsTab import org.apache.spark.ui.jobs.JobProgressTab import org.apache.spark.ui.storage.BlockManagerTab -import org.apache.spark.util.Utils /** * Top level user interface for Spark. */ private[spark] class SparkUI( val sc: SparkContext, - conf: SparkConf, + val conf: SparkConf, val securityManager: SecurityManager, val listenerBus: SparkListenerBus, var appName: String, val basePath: String = "") - extends WebUI(securityManager, basePath) with Logging { + extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath) + with Logging { def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName) def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) = @@ -46,21 +46,14 @@ private[spark] class SparkUI( // If SparkContext is not provided, assume the associated application is not live val live = sc != null - private val bindHost = Utils.localHostName() - private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost) - private val port = conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) - // Maintain executor storage status through Spark events val storageStatusListener = new StorageStatusListener - listenerBus.addListener(storageStatusListener) - /** Set the app name for this UI. */ - def setAppName(name: String) { - appName = name - } + initialize() /** Initialize all components of the server. */ - def start() { + def initialize() { + listenerBus.addListener(storageStatusListener) attachTab(new JobProgressTab(this)) attachTab(new BlockManagerTab(this)) attachTab(new EnvironmentTab(this)) @@ -72,22 +65,14 @@ private[spark] class SparkUI( } } - /** Bind to the HTTP server behind this web interface. */ - def bind() { - try { - serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, sc.conf)) - logInfo("Started Spark Web UI at http://%s:%d".format(publicHost, boundPort)) - } catch { - case e: Exception => - logError("Failed to create Spark web UI", e) - System.exit(1) - } + /** Set the app name for this UI. */ + def setAppName(name: String) { + appName = name } - /** Attach a tab to this UI, along with its corresponding listener if it exists. */ - override def attachTab(tab: UITab) { - super.attachTab(tab) - tab.listener.foreach(listenerBus.addListener) + /** Register the given listener with the listener bus. */ + def registerListener(listener: SparkListener) { + listenerBus.addListener(listener) } /** Stop the server behind this web interface. Only valid after bind(). */ @@ -96,10 +81,14 @@ private[spark] class SparkUI( logInfo("Stopped Spark web UI at %s".format(appUIAddress)) } - private[spark] def appUIAddress = "http://" + publicHost + ":" + boundPort + private[spark] def appUIAddress = "http://" + publicHostName + ":" + boundPort } private[spark] object SparkUI { val DEFAULT_PORT = 4040 val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static" + + def getUIPort(conf: SparkConf): Int = { + conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) + } } diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index fcda341ae5941..e7b756b2bd276 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -60,10 +60,9 @@ private[spark] object UIUtils { basePath: String, appName: String, title: String, - tabs: Seq[UITab], - activeTab: UITab, - refreshInterval: Option[Int] = None - ) : Seq[Node] = { + tabs: Seq[WebUITab], + activeTab: WebUITab, + refreshInterval: Option[Int] = None): Seq[Node] = { val header = tabs.map { tab =>
  • diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 126a7ff2f6080..655239089015c 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -25,8 +25,7 @@ import scala.xml.Node import org.eclipse.jetty.servlet.ServletContextHandler import org.json4s.JsonAST.{JNothing, JValue} -import org.apache.spark.SecurityManager -import org.apache.spark.scheduler.SparkListener +import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.Utils @@ -36,24 +35,31 @@ import org.apache.spark.util.Utils * Each WebUI represents a collection of tabs, each of which in turn represents a collection of * pages. The use of tabs is optional, however; a WebUI may choose to include pages directly. */ -private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: String = "") { - protected val tabs = ArrayBuffer[UITab]() +private[spark] abstract class WebUI( + securityManager: SecurityManager, + port: Int, + conf: SparkConf, + basePath: String = "") + extends Logging { + + protected val tabs = ArrayBuffer[WebUITab]() protected val handlers = ArrayBuffer[ServletContextHandler]() protected var serverInfo: Option[ServerInfo] = None + protected val localHostName = Utils.localHostName() + protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName) + private val className = Utils.getFormattedClassName(this) - def getTabs: Seq[UITab] = tabs.toSeq + def getTabs: Seq[WebUITab] = tabs.toSeq def getHandlers: Seq[ServletContextHandler] = handlers.toSeq - def getListeners: Seq[SparkListener] = tabs.flatMap(_.listener) /** Attach a tab to this UI, along with all of its attached pages. */ - def attachTab(tab: UITab) { - tab.start() + def attachTab(tab: WebUITab) { tab.pages.foreach(attachPage) tabs += tab } /** Attach a page to this UI. */ - def attachPage(page: UIPage) { + def attachPage(page: WebUIPage) { val pagePath = "/" + page.prefix attachHandler(createServletHandler(pagePath, (request: HttpServletRequest) => page.render(request), securityManager, basePath)) @@ -86,13 +92,20 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: } /** Initialize all components of the server. */ - def start() - - /** - * Bind to the HTTP server behind this web interface. - * Overridden implementation should set serverInfo. - */ - def bind() + def initialize() + + /** Bind to the HTTP server behind this web interface. */ + def bind() { + assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) + try { + serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf)) + logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort)) + } catch { + case e: Exception => + logError("Failed to bind %s".format(className), e) + System.exit(1) + } + } /** Return the actual port to which this server is bound. Only valid after bind(). */ def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) @@ -100,39 +113,41 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: /** Stop the server behind this web interface. Only valid after bind(). */ def stop() { assert(serverInfo.isDefined, - "Attempted to stop %s before binding to a server!".format(Utils.getFormattedClassName(this))) + "Attempted to stop %s before binding to a server!".format(className)) serverInfo.get.server.stop() } } /** - * A tab that represents a collection of pages and a unit of listening for Spark events. - * Associating each tab with a listener is arbitrary and need not be the case. + * A tab that represents a collection of pages. */ -private[spark] abstract class UITab(val prefix: String) { - val pages = ArrayBuffer[UIPage]() - var listener: Option[SparkListener] = None - var name = prefix.capitalize +private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) { + val pages = ArrayBuffer[WebUIPage]() + val name = prefix.capitalize /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */ - def attachPage(page: UIPage) { + def attachPage(page: WebUIPage) { page.prefix = (prefix + "/" + page.prefix).stripSuffix("/") pages += page } - /** Initialize listener and attach pages. */ - def start() + /** Initialize this tab and attach all relevant pages. */ + def initialize() + + /** Get a list of header tabs from the parent UI. */ + def headerTabs: Seq[WebUITab] = parent.getTabs } /** * A page that represents the leaf node in the UI hierarchy. * + * The direct parent of a WebUIPage is not specified as it can be either a WebUI or a WebUITab. * If includeJson is true, the parent WebUI (direct or indirect) creates handlers for both the * HTML and the JSON content, rather than just the former. */ -private[spark] abstract class UIPage(var prefix: String, val includeJson: Boolean = false) { +private[spark] abstract class WebUIPage(var prefix: String, val includeJson: Boolean = false) { def render(request: HttpServletRequest): Seq[Node] = Seq[Node]() def renderJson(request: HttpServletRequest): JValue = JNothing } diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala index 6a2304f1ad42f..0f1ea7fa8d44d 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala @@ -20,21 +20,17 @@ package org.apache.spark.ui.env import org.apache.spark.scheduler._ import org.apache.spark.ui._ -private[ui] class EnvironmentTab(parent: SparkUI) extends UITab("environment") { +private[ui] class EnvironmentTab(parent: SparkUI) extends WebUITab(parent, "environment") { val appName = parent.appName val basePath = parent.basePath + val listener = new EnvironmentListener - def start() { - listener = Some(new EnvironmentListener) - attachPage(new IndexPage(this)) - } + initialize() - def environmentListener: EnvironmentListener = { - assert(listener.isDefined, "EnvironmentTab has not started yet!") - listener.get.asInstanceOf[EnvironmentListener] + def initialize() { + attachPage(new IndexPage(this)) + parent.registerListener(listener) } - - def headerTabs: Seq[UITab] = parent.getTabs } /** diff --git a/core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala index bde672909bbcc..55a19774ed02d 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala @@ -21,12 +21,12 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.ui.{UIUtils, UIPage} +import org.apache.spark.ui.{UIUtils, WebUIPage} -private[ui] class IndexPage(parent: EnvironmentTab) extends UIPage("") { +private[ui] class IndexPage(parent: EnvironmentTab) extends WebUIPage("") { private val appName = parent.appName private val basePath = parent.basePath - private val listener = parent.environmentListener + private val listener = parent.listener override def render(request: HttpServletRequest): Seq[Node] = { val runtimeInformationTable = UIUtils.listingTable( diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index c1f5ca856ffe1..843db7c8d956d 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -22,23 +22,19 @@ import scala.collection.mutable.HashMap import org.apache.spark.ExceptionFailure import org.apache.spark.scheduler._ import org.apache.spark.storage.StorageStatusListener -import org.apache.spark.ui.{SparkUI, UITab} +import org.apache.spark.ui.{SparkUI, WebUITab} -private[ui] class ExecutorsTab(parent: SparkUI) extends UITab("executors") { +private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "executors") { val appName = parent.appName val basePath = parent.basePath + val listener = new ExecutorsListener(parent.storageStatusListener) - def start() { - listener = Some(new ExecutorsListener(parent.storageStatusListener)) - attachPage(new IndexPage(this)) - } + initialize() - def executorsListener: ExecutorsListener = { - assert(listener.isDefined, "ExecutorsTab has not started yet!") - listener.get.asInstanceOf[ExecutorsListener] + def initialize() { + attachPage(new IndexPage(this)) + parent.registerListener(listener) } - - def headerTabs: Seq[UITab] = parent.getTabs } /** diff --git a/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala index bc6a822b080c3..83c89c2fbca3e 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala @@ -21,13 +21,13 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils -private[ui] class IndexPage(parent: ExecutorsTab) extends UIPage("") { +private[ui] class IndexPage(parent: ExecutorsTab) extends WebUIPage("") { private val appName = parent.appName private val basePath = parent.basePath - private val listener = parent.executorsListener + private val listener = parent.listener override def render(request: HttpServletRequest): Seq[Node] = { val storageStatusList = listener.storageStatusList diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 31173e48d7a1e..c83e196c9c156 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -25,7 +25,7 @@ import org.apache.spark.util.Utils /** Page showing executor summary */ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { - private lazy val listener = parent.jobProgressListener + private val listener = parent.listener def toNodeSeq: Seq[Node] = { listener.synchronized { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index 2b54603af104e..f217965ea2053 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -22,15 +22,15 @@ import javax.servlet.http.HttpServletRequest import scala.xml.{Node, NodeSeq} import org.apache.spark.scheduler.Schedulable -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} /** Page showing list of all ongoing and recently finished stages and pools */ -private[ui] class IndexPage(parent: JobProgressTab) extends UIPage("") { +private[ui] class IndexPage(parent: JobProgressTab) extends WebUIPage("") { private val appName = parent.appName private val basePath = parent.basePath private val live = parent.live private val sc = parent.sc - private lazy val listener = parent.jobProgressListener + private val listener = parent.listener private lazy val isFairScheduler = parent.isFairScheduler override def render(request: HttpServletRequest): Seq[Node] = { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 5167e20ea3d7d..18559f732d2a3 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -222,12 +222,10 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener { override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { synchronized { - val schedulingModeName = - environmentUpdate.environmentDetails("Spark Properties").toMap.get("spark.scheduler.mode") - schedulingMode = schedulingModeName match { - case Some(name) => Some(SchedulingMode.withName(name)) - case None => None - } + environmentUpdate + .environmentDetails("Spark Properties").toMap + .get("spark.scheduler.mode") + .map(SchedulingMode.withName) } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala index 93d26f7dd3632..7fe06b39346f5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala @@ -19,29 +19,25 @@ package org.apache.spark.ui.jobs import org.apache.spark.SparkConf import org.apache.spark.scheduler.SchedulingMode -import org.apache.spark.ui.{SparkUI, UITab} +import org.apache.spark.ui.{SparkUI, WebUITab} /** Web UI showing progress status of all jobs in the given SparkContext. */ -private[ui] class JobProgressTab(parent: SparkUI) extends UITab("stages") { +private[ui] class JobProgressTab(parent: SparkUI) extends WebUITab(parent, "stages") { val appName = parent.appName val basePath = parent.basePath val live = parent.live val sc = parent.sc + val conf = if (live) sc.conf else new SparkConf + val listener = new JobProgressListener(conf) - def start() { - val conf = if (live) sc.conf else new SparkConf - listener = Some(new JobProgressListener(conf)) + initialize() + + def initialize() { attachPage(new IndexPage(this)) attachPage(new StagePage(this)) attachPage(new PoolPage(this)) + parent.registerListener(listener) } - def jobProgressListener: JobProgressListener = { - assert(listener.isDefined, "JobProgressTab has not started yet!") - listener.get.asInstanceOf[JobProgressListener] - } - - def isFairScheduler = jobProgressListener.schedulingMode.exists(_ == SchedulingMode.FAIR) - - def headerTabs: Seq[UITab] = parent.getTabs + def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR) } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index 7fffe2affb0f2..228bfb2881c53 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -22,15 +22,15 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.scheduler.{Schedulable, StageInfo} -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} /** Page showing specific pool details */ -private[ui] class PoolPage(parent: JobProgressTab) extends UIPage("pool") { +private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") { private val appName = parent.appName private val basePath = parent.basePath private val live = parent.live private val sc = parent.sc - private lazy val listener = parent.jobProgressListener + private val listener = parent.listener override def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala index bb7a9c14f7761..f4b68f241966d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala @@ -26,7 +26,7 @@ import org.apache.spark.ui.UIUtils /** Table showing list of pools */ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) { private val basePath = parent.basePath - private lazy val listener = parent.jobProgressListener + private val listener = parent.listener def toNodeSeq: Seq[Node] = { listener.synchronized { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 372210919cd91..71eda45d253e1 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -22,14 +22,14 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.{Utils, Distribution} /** Page showing statistics and task list for a given stage */ -private[ui] class StagePage(parent: JobProgressTab) extends UIPage("stage") { +private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { private val appName = parent.appName private val basePath = parent.basePath - private lazy val listener = parent.jobProgressListener + private val listener = parent.listener override def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index d918feafd97d0..5cc1fcd10a08d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -29,7 +29,7 @@ import org.apache.spark.util.Utils /** Page showing list of all ongoing and recently finished stages */ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressTab) { private val basePath = parent.basePath - private lazy val listener = parent.jobProgressListener + private val listener = parent.listener private lazy val isFairScheduler = parent.isFairScheduler def toNodeSeq: Seq[Node] = { diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala index ac83f71ed31de..492c223625e6b 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala @@ -24,22 +24,18 @@ import org.apache.spark.scheduler._ import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils} /** Web UI showing storage status of all RDD's in the given SparkContext. */ -private[ui] class BlockManagerTab(parent: SparkUI) extends UITab("storage") { +private[ui] class BlockManagerTab(parent: SparkUI) extends WebUITab(parent, "storage") { val appName = parent.appName val basePath = parent.basePath + val listener = new BlockManagerListener(parent.storageStatusListener) - def start() { - listener = Some(new BlockManagerListener(parent.storageStatusListener)) + initialize() + + def initialize() { attachPage(new IndexPage(this)) attachPage(new RddPage(this)) + parent.registerListener(listener) } - - def blockManagerListener: BlockManagerListener = { - assert(listener.isDefined, "BlockManagerTab has not started yet!") - listener.get.asInstanceOf[BlockManagerListener] - } - - def headerTabs: Seq[UITab] = parent.getTabs } /** diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala index cb1b0dc7574f8..054369bc4730c 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala @@ -22,14 +22,14 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.storage.RDDInfo -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils /** Page showing list of RDD's currently stored in the cluster */ -private[ui] class IndexPage(parent: BlockManagerTab) extends UIPage("") { +private[ui] class IndexPage(parent: BlockManagerTab) extends WebUIPage("") { private val appName = parent.appName private val basePath = parent.basePath - private lazy val listener = parent.blockManagerListener + private val listener = parent.listener override def render(request: HttpServletRequest): Seq[Node] = { val rdds = listener.rddInfoList diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index a65ba0a020bcd..5eaf41c985ecf 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -22,14 +22,14 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils} -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils /** Page showing storage details for a given RDD */ -private[ui] class RddPage(parent: BlockManagerTab) extends UIPage("rdd") { +private[ui] class RddPage(parent: BlockManagerTab) extends WebUIPage("rdd") { private val appName = parent.appName private val basePath = parent.basePath - private lazy val listener = parent.blockManagerListener + private val listener = parent.listener override def render(request: HttpServletRequest): Seq[Node] = { val rddId = request.getParameter("id").toInt diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 906d4067a14eb..cf4aca2ed8b62 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -158,7 +158,6 @@ class StreamingContext private[streaming] ( private[streaming] val waiter = new ContextWaiter private[streaming] val ui = new StreamingTab(this) - ui.start() /** Enumeration to identify current state of the StreamingContext */ private[streaming] object StreamingContextState extends Enumeration { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 5cd900c2f88f0..80bd364af4e10 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -28,7 +28,7 @@ import org.apache.spark.util.Distribution /** Page for Spark Web UI that shows statistics of a streaming job */ private[ui] class StreamingPage(parent: StreamingTab) - extends UIPage("") with Logging { + extends WebUIPage("") with Logging { private val ssc = parent.ssc private val sc = ssc.sparkContext diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala index 1aaf7764b5ceb..44f230976427a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala @@ -18,15 +18,17 @@ package org.apache.spark.streaming.ui import org.apache.spark.streaming.StreamingContext -import org.apache.spark.ui.UITab +import org.apache.spark.ui.WebUITab import org.apache.spark.Logging /** Spark Web UI tab that shows statistics of a streaming job */ private[spark] class StreamingTab(val ssc: StreamingContext) - extends UITab("streaming") with Logging { + extends WebUITab(ssc.sc.ui, "streaming") with Logging { - val streamingPage = new StreamingPage(this) - ssc.sc.ui.attachTab(this) + initialize() - def start() { } + def initialize() { + attachPage(new StreamingPage(this)) + ssc.sc.ui.attachTab(this) + } }