From 9a48fa1de7b357f6ffdaad8e93af7b2b7e39bc06 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 3 Apr 2014 15:53:48 -0700 Subject: [PATCH] Allow adding tabs to SparkUI dynamically + add example An example of how this is done is in org.apache.spark.ui.FooTab. Run it through bin/spark-class to see what it looks like (which should more or less match your expectations...). --- .../spark/deploy/master/ui/MasterWebUI.scala | 18 +-- .../spark/deploy/worker/ui/WorkerWebUI.scala | 2 +- .../scala/org/apache/spark/ui/FooTab.scala | 105 ++++++++++++++++++ .../scala/org/apache/spark/ui/SparkUI.scala | 13 ++- .../scala/org/apache/spark/ui/WebUI.scala | 26 ++++- 5 files changed, 139 insertions(+), 25 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/ui/FooTab.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index c8d51e44a4dff..3a30919a70584 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -35,7 +35,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) val masterActorRef = master.self val timeout = AkkaUtils.askTimeout(master.conf) - /** Initialize all components of the server. Must be called before bind(). */ + /** Initialize all components of the server. */ def start() { attachPage(new ApplicationPage(this)) attachPage(new IndexPage(this)) @@ -59,25 +59,13 @@ class MasterWebUI(val master: Master, requestedPort: Int) /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */ def attachUI(ui: SparkUI) { assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs") - val rootHandler = serverInfo.get.rootHandler - for (handler <- ui.getHandlers) { - rootHandler.addHandler(handler) - if (!handler.isStarted) { - handler.start() - } - } + ui.getHandlers.foreach(attachHandler) } /** Detach a reconstructed UI from this Master UI. Only valid after bind(). */ def detachUI(ui: SparkUI) { assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs") - val rootHandler = serverInfo.get.rootHandler - for (handler <- ui.getHandlers) { - if (handler.isStarted) { - handler.stop() - } - rootHandler.removeHandler(handler) - } + ui.getHandlers.foreach(detachHandler) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index ae1b7ab014e6e..490a383be42e1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -38,7 +38,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt) val timeout = AkkaUtils.askTimeout(worker.conf) - /** Initialize all components of the server. Must be called before bind(). */ + /** Initialize all components of the server. */ def start() { val logPage = new LogPage(this) attachPage(logPage) diff --git a/core/src/main/scala/org/apache/spark/ui/FooTab.scala b/core/src/main/scala/org/apache/spark/ui/FooTab.scala new file mode 100644 index 0000000000000..1e30fa75a263d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/FooTab.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui + +import javax.servlet.http.HttpServletRequest + +import scala.collection.mutable +import scala.xml.Node + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.SparkContext._ +import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd} + +/* + * This is an example of how to extend the SparkUI by adding new tabs to it. It is intended + * only as a demonstration and should be removed before merging into master! + * + * bin/spark-class org.apache.spark.ui.FooTab + */ + +/** A tab that displays basic information about jobs seen so far. */ +private[spark] class FooTab(parent: SparkUI) extends UITab("foo") { + val appName = parent.appName + val basePath = parent.basePath + + def start() { + listener = Some(new FooListener) + attachPage(new IndexPage(this)) + } + + def fooListener: FooListener = { + assert(listener.isDefined, "ExecutorsTab has not started yet!") + listener.get.asInstanceOf[FooListener] + } + + def headerTabs: Seq[UITab] = parent.getTabs +} + +/** A foo page. Enough said. */ +private[spark] class IndexPage(parent: FooTab) extends UIPage("") { + private val appName = parent.appName + private val basePath = parent.basePath + private val listener = parent.fooListener + + override def render(request: HttpServletRequest): Seq[Node] = { + val results = listener.jobResultMap.toSeq.sortBy { case (k, _) => k } + val content = +
+
+ Foo Jobs: +
    + {results.map { case (k, v) =>
  • Job {k}: {v}
  • }} +
+
+
+ UIUtils.headerSparkPage(content, basePath, appName, "Foo", parent.headerTabs, parent) + } +} + +/** A listener that maintains a mapping between job IDs and job results. */ +private[spark] class FooListener extends SparkListener { + val jobResultMap = mutable.Map[Int, String]() + + override def onJobEnd(end: SparkListenerJobEnd) { + jobResultMap(end.jobId) = end.jobResult.toString + } +} + + +/** + * Start a SparkContext and a SparkUI with a FooTab attached. + */ +private[spark] object FooTab { + def main(args: Array[String]) { + val sc = new SparkContext("local", "Foo Tab", new SparkConf) + val fooTab = new FooTab(sc.ui) + sc.ui.attachTab(fooTab) + + // Run a few jobs + sc.parallelize(1 to 1000).count() + sc.parallelize(1 to 2000).persist().count() + sc.parallelize(1 to 3000).map(i => (i/2, i)).groupByKey().count() + sc.parallelize(1 to 4000).map(i => (i/2, i)).groupByKey().persist().count() + sc.parallelize(1 to 5000).map(i => (i/2, i)).groupByKey().persist().count() + sc.parallelize(1 to 6000).map(i => (i/2, i)).groupByKey().persist().count() + sc.parallelize(1 to 7000).map(i => (i/2, i)).groupByKey().persist().count() + + readLine("\n> Started SparkUI with a Foo tab...") + } +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index c333dd3784bb7..ac22189f9f04f 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -52,8 +52,9 @@ private[spark] class SparkUI( // Maintain executor storage status through Spark events val storageStatusListener = new StorageStatusListener + listenerBus.addListener(storageStatusListener) - /** Initialize all components of the server. Must be called before bind(). */ + /** Initialize all components of the server. */ def start() { attachTab(new JobProgressTab(this)) attachTab(new BlockManagerTab(this)) @@ -64,14 +65,10 @@ private[spark] class SparkUI( if (live) { sc.env.metricsSystem.getServletHandlers.foreach(attachHandler) } - // Storage status listener must receive events first, as other listeners depend on its state - listenerBus.addListener(storageStatusListener) - getListeners.foreach(listenerBus.addListener) } /** Bind to the HTTP server behind this web interface. */ def bind() { - assert(!handlers.isEmpty, "SparkUI has not started yet!") try { serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf)) logInfo("Started Spark web UI at http://%s:%d".format(publicHost, boundPort)) @@ -82,6 +79,12 @@ private[spark] class SparkUI( } } + /** Attach a tab to this UI, along with its corresponding listener if it exists. */ + override def attachTab(tab: UITab) { + super.attachTab(tab) + tab.listener.foreach(listenerBus.addListener) + } + /** Stop the server behind this web interface. Only valid after bind(). */ override def stop() { super.stop() diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 0b847a9a471f0..4392814fd1b39 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -35,7 +35,6 @@ import org.apache.spark.util.Utils * * Each WebUI represents a collection of tabs, each of which in turn represents a collection of * pages. The use of tabs is optional, however; a WebUI may choose to include pages directly. - * All tabs and pages must be attached before bind()'ing the server. */ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: String = "") { protected val tabs = ArrayBuffer[UITab]() @@ -46,14 +45,14 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: def getHandlers: Seq[ServletContextHandler] = handlers.toSeq def getListeners: Seq[SparkListener] = tabs.flatMap(_.listener) - /** Attach a tab to this UI, along with all of its attached pages. Only valid before bind(). */ + /** Attach a tab to this UI, along with all of its attached pages. */ def attachTab(tab: UITab) { tab.start() tab.pages.foreach(attachPage) tabs += tab } - /** Attach a page to this UI. Only valid before bind(). */ + /** Attach a page to this UI. */ def attachPage(page: UIPage) { val pagePath = "/" + page.prefix attachHandler(createServletHandler(pagePath, @@ -64,9 +63,26 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: } } - /** Attach a handler to this UI. Only valid before bind(). */ + /** Attach a handler to this UI. */ def attachHandler(handler: ServletContextHandler) { handlers += handler + serverInfo.foreach { info => + info.rootHandler.addHandler(handler) + if (!handler.isStarted) { + handler.start() + } + } + } + + /** Detach a handler from this UI. */ + def detachHandler(handler: ServletContextHandler) { + handlers -= handler + serverInfo.foreach { info => + info.rootHandler.removeHandler(handler) + if (handler.isStarted) { + handler.stop() + } + } } /** Initialize all components of the server. Must be called before bind(). */ @@ -89,6 +105,7 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: } } + /** * A tab that represents a collection of pages and a unit of listening for Spark events. * Associating each tab with a listener is arbitrary and need not be the case. @@ -108,6 +125,7 @@ private[spark] abstract class UITab(val prefix: String) { def start() } + /** * A page that represents the leaf node in the UI hierarchy. *