From 83af656cfafdc789ee514ea7ee704e5f40e74b3c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 18 Mar 2014 18:42:53 -0700 Subject: [PATCH] Scraps and pieces (no functionality change) The biggest changes would probably include (1) The refactoring of helper methods in StorageUtils (2) The renaming of SparkListenerBlockManagerLost to SparkListenerBlockManagerRemoved. (3) Rendering an empty default page if the user requests an RDD page for an RDD that doesn't exist (otherwise it just crashes) The rest are mostly formatting and comments. --- .../scala/org/apache/spark/SparkContext.scala | 45 ++--- .../scala/org/apache/spark/SparkEnv.scala | 28 +-- .../spark/deploy/master/ui/MasterWebUI.scala | 9 +- .../apache/spark/deploy/worker/Worker.scala | 13 +- .../spark/deploy/worker/ui/WorkerWebUI.scala | 67 ++++--- .../apache/spark/executor/TaskMetrics.scala | 2 +- .../apache/spark/scheduler/DAGScheduler.scala | 3 +- .../scheduler/EventLoggingListener.scala | 17 +- .../spark/scheduler/LiveListenerBus.scala | 4 +- .../spark/scheduler/ReplayListenerBus.scala | 29 +-- .../spark/scheduler/SparkListener.scala | 11 +- .../spark/scheduler/SparkListenerBus.scala | 12 +- .../apache/spark/storage/BlockManager.scala | 4 +- .../storage/BlockManagerMasterActor.scala | 12 +- .../apache/spark/storage/MemoryStore.scala | 6 +- .../org/apache/spark/storage/PutResult.scala | 2 +- .../spark/storage/StorageStatusListener.scala | 16 +- .../apache/spark/storage/StorageUtils.scala | 94 +++++----- .../org/apache/spark/ui/JettyUtils.scala | 14 +- .../scala/org/apache/spark/ui/SparkUI.scala | 4 +- .../apache/spark/ui/env/EnvironmentUI.scala | 8 +- .../apache/spark/ui/exec/ExecutorsUI.scala | 8 +- .../spark/ui/jobs/JobProgressListener.scala | 4 +- .../org/apache/spark/ui/jobs/StagePage.scala | 167 +++++++++--------- .../org/apache/spark/ui/jobs/StageTable.scala | 2 +- .../spark/ui/storage/BlockManagerUI.scala | 12 +- .../org/apache/spark/ui/storage/RDDPage.scala | 13 +- .../org/apache/spark/util/FileLogger.scala | 35 ++-- .../org/apache/spark/util/JsonProtocol.scala | 33 ++-- .../scala/org/apache/spark/util/Utils.scala | 2 + .../apache/spark/util/JsonProtocolSuite.scala | 15 +- 31 files changed, 346 insertions(+), 345 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 244e8db917812..a1003b7925715 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -155,6 +155,27 @@ class SparkContext( private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf) + // Initialize the Spark UI, registering all associated listeners + private[spark] val ui = new SparkUI(this) + ui.bind() + ui.start() + + // Optionally log Spark events + private[spark] val eventLogger: Option[EventLoggingListener] = { + if (conf.getBoolean("spark.eventLog.enabled", false)) { + val logger = new EventLoggingListener(appName, conf) + listenerBus.addListener(logger) + Some(logger) + } else None + } + + // Information needed to replay logged events, if any + private[spark] val eventLoggingInfo: Option[EventLoggingInfo] = + eventLogger.map { logger => Some(logger.info) }.getOrElse(None) + + // At this point, all relevant SparkListeners have been registered, so begin releasing events + listenerBus.start() + val startTime = System.currentTimeMillis() // Add each JAR given through the constructor @@ -199,27 +220,6 @@ class SparkContext( } 
executorEnvs("SPARK_USER") = sparkUser - // Start the UI before posting events to listener bus, because the UI listens for Spark events - private[spark] val ui = new SparkUI(this) - ui.bind() - ui.start() - - // Optionally log SparkListenerEvents - private[spark] val eventLogger: Option[EventLoggingListener] = { - if (conf.getBoolean("spark.eventLog.enabled", false)) { - val logger = new EventLoggingListener(appName, conf) - listenerBus.addListener(logger) - Some(logger) - } else None - } - - // Information needed to replay logged events, if any - private[spark] val eventLoggingInfo: Option[EventLoggingInfo] = - eventLogger.map { logger => Some(logger.info) }.getOrElse(None) - - // At this point, all relevant SparkListeners have been registered, so begin releasing events - listenerBus.start() - // Create and start the scheduler private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master) taskScheduler.start() @@ -835,6 +835,7 @@ class SparkContext( if (dagSchedulerCopy != null) { metadataCleaner.cancel() dagSchedulerCopy.stop() + listenerBus.stop() taskScheduler = null // TODO: Cache.stop()? env.stop() @@ -1065,7 +1066,7 @@ class SparkContext( /** Register a new RDD, returning its RDD ID */ private[spark] def newRddId(): Int = nextRddId.getAndIncrement() - /** Post the environment update event once the task scheduler is ready. */ + /** Post the environment update event once the task scheduler is ready */ private def postEnvironmentUpdate() { if (taskScheduler != null) { val schedulingMode = getSchedulingMode.toString diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index a99cf2675e1f5..a1af63fa4a391 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -180,10 +180,9 @@ object SparkEnv extends Logging { } } - val blockManagerMaster = new BlockManagerMaster( - registerOrLookup("BlockManagerMaster", - new BlockManagerMasterActor(isLocal, conf, listenerBus)), - conf) + val blockManagerMaster = new BlockManagerMaster(registerOrLookup( + "BlockManagerMaster", + new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf) val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf, securityManager) @@ -271,23 +270,26 @@ object SparkEnv extends Logging { ("Scala Home", Properties.scalaHome) ).sorted - // Spark properties, including scheduling mode whether or not it is configured - var additionalFields = Seq[(String, String)]() - conf.getOption("spark.scheduler.mode").getOrElse { - additionalFields ++= Seq(("spark.scheduler.mode", schedulingMode)) - } - val sparkProperties = conf.getAll.sorted ++ additionalFields + // Spark properties + // This includes the scheduling mode whether or not it is configured (used by SparkUI) + val schedulerMode = + if (!conf.contains("spark.scheduler.mode")) { + Seq(("spark.scheduler.mode", schedulingMode)) + } else { + Seq[(String, String)]() + } + val sparkProperties = (conf.getAll ++ schedulerMode).sorted // System properties that are not java classpaths val systemProperties = System.getProperties.iterator.toSeq - val classPathProperty = systemProperties.find { case (k, v) => - k == "java.class.path" - }.getOrElse(("", "")) val otherProperties = systemProperties.filter { case (k, v) => k != "java.class.path" && !k.startsWith("spark.") }.sorted // Class paths including all added jars and files + val classPathProperty = systemProperties.find { case (k, v) => + k == 
"java.class.path" + }.getOrElse(("", "")) val classPathEntries = classPathProperty._2 .split(conf.get("path.separator", ":")) .filterNot(e => e.isEmpty) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 4d94105ca5e1c..73c3b40fbe228 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.master.ui import javax.servlet.http.HttpServletRequest + import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.Logging @@ -33,12 +34,12 @@ private[spark] class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { val masterActorRef = master.self val timeout = AkkaUtils.askTimeout(master.conf) - var serverInfo: Option[ServerInfo] = None private val host = Utils.localHostName() private val port = requestedPort private val applicationPage = new ApplicationPage(this) private val indexPage = new IndexPage(this) + private var serverInfo: Option[ServerInfo] = None private val handlers: Seq[ServletContextHandler] = { master.masterMetricsSystem.getServletHandlers ++ @@ -71,7 +72,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */ def attachUI(ui: SparkUI) { - assert(serverInfo.isDefined, "Master UI must be initialized before attaching SparkUIs") + assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs") val rootHandler = serverInfo.get.rootHandler for (handler <- ui.handlers) { rootHandler.addHandler(handler) @@ -83,7 +84,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { /** Detach a reconstructed UI from this Master UI. Only valid after bind(). 
*/ def detachUI(ui: SparkUI) { - assert(serverInfo.isDefined, "Master UI must be initialized before detaching SparkUIs") + assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs") val rootHandler = serverInfo.get.rootHandler for (handler <- ui.handlers) { if (handler.isStarted) { @@ -94,7 +95,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { } def stop() { - assert(serverInfo.isDefined, "Attempted to stop a Master UI that was not initialized!") + assert(serverInfo.isDefined, "Attempted to stop a Master UI that was not bound to a server!") serverInfo.get.server.stop() } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 74cd4d96b0964..5e0fc31fff22f 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -339,10 +339,15 @@ private[spark] object Worker { actorSystem.awaitTermination() } - def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int, - masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None) - : (ActorSystem, Int) = - { + def startSystemAndActor( + host: String, + port: Int, + webUiPort: Int, + cores: Int, + memory: Int, + masterUrls: Array[String], + workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = { + // The LocalSparkCluster runs multiple local sparkWorkerX actor systems val conf = new SparkConf val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("") diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index 8ceaa5bbb0d1a..20ed5ef1dfe8b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.worker.ui import java.io.File import javax.servlet.http.HttpServletRequest + import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.Logging @@ -32,29 +33,30 @@ import org.apache.spark.util.{AkkaUtils, Utils} */ private[spark] class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None) - extends Logging { - val timeout = AkkaUtils.askTimeout(worker.conf) - val host = Utils.localHostName() - val port = requestedPort.getOrElse( - worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt) - - var serverInfo: Option[ServerInfo] = None + extends Logging { - val indexPage = new IndexPage(this) - - val metricsHandlers = worker.metricsSystem.getServletHandlers + val timeout = AkkaUtils.askTimeout(worker.conf) - val handlers = metricsHandlers ++ Seq[ServletContextHandler]( - createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static/*"), - createServletHandler("/log", - (request: HttpServletRequest) => log(request), worker.securityMgr), - createServletHandler("/logPage", - (request: HttpServletRequest) => logPage(request), worker.securityMgr), - createServletHandler("/json", - (request: HttpServletRequest) => indexPage.renderJson(request), worker.securityMgr), - createServletHandler("/", - (request: HttpServletRequest) => indexPage.render(request), worker.securityMgr) - ) + private val host = Utils.localHostName() + private val port = requestedPort.getOrElse( + worker.conf.get("worker.ui.port", 
WorkerWebUI.DEFAULT_PORT).toInt) + private val indexPage = new IndexPage(this) + private var serverInfo: Option[ServerInfo] = None + + private val handlers: Seq[ServletContextHandler] = { + worker.metricsSystem.getServletHandlers ++ + Seq[ServletContextHandler]( + createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static/*"), + createServletHandler("/log", + (request: HttpServletRequest) => log(request), worker.securityMgr), + createServletHandler("/logPage", + (request: HttpServletRequest) => logPage(request), worker.securityMgr), + createServletHandler("/json", + (request: HttpServletRequest) => indexPage.renderJson(request), worker.securityMgr), + createServletHandler("/", + (request: HttpServletRequest) => indexPage.render(request), worker.securityMgr) + ) + } def bind() { try { @@ -69,7 +71,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) - def log(request: HttpServletRequest): String = { + private def log(request: HttpServletRequest): String = { val defaultBytes = 100 * 1024 val appId = Option(request.getParameter("appId")) @@ -96,7 +98,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I pre + Utils.offsetBytes(path, startByte, endByte) } - def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = { + private def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = { val defaultBytes = 100 * 1024 val appId = Option(request.getParameter("appId")) val executorId = Option(request.getParameter("executorId")) @@ -117,17 +119,14 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I val (startByte, endByte) = getByteRange(path, offset, byteLength) val file = new File(path) val logLength = file.length - val logText = {Utils.offsetBytes(path, startByte, endByte)} - val linkToMaster =

Back to Master

- val range = Bytes {startByte.toString} - {endByte.toString} of {logLength} val backButton = if (startByte > 0) { + .format(params, logType, math.max(startByte - byteLength, 0), byteLength)}> @@ -144,7 +143,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I } @@ -173,29 +172,23 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I } /** Determine the byte range for a log or log page. */ - def getByteRange(path: String, offset: Option[Long], byteLength: Int) - : (Long, Long) = { + private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = { val defaultBytes = 100 * 1024 val maxBytes = 1024 * 1024 - val file = new File(path) val logLength = file.length() - val getOffset = offset.getOrElse(logLength-defaultBytes) - + val getOffset = offset.getOrElse(logLength - defaultBytes) val startByte = if (getOffset < 0) 0L else if (getOffset > logLength) logLength else getOffset - val logPageLength = math.min(byteLength, maxBytes) - val endByte = math.min(startByte + logPageLength, logLength) - (startByte, endByte) } def stop() { - assert(serverInfo.isDefined, "Attempted to stop a Worker UI that was not initialized!") + assert(serverInfo.isDefined, "Attempted to stop a Worker UI that was not bound to a server!") serverInfo.get.server.stop() } } diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 237f42b48a8c9..88625e79a5c68 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -72,7 +72,7 @@ class TaskMetrics extends Serializable { var shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None /** - * Statuses of any blocks that have been updated as a result of this task. + * Storage statuses of any blocks that have been updated as a result of this task. */ var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 11a8d848c8e70..77c558ac46f6f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1093,7 +1093,7 @@ class DAGScheduler( "stageToInfos" -> stageToInfos, "jobIdToStageIds" -> jobIdToStageIds, "stageIdToJobIds" -> stageIdToJobIds). 
- foreach { case(s, t) => + foreach { case (s, t) => val sizeBefore = t.size t.clearOldValues(cleanupTime) logInfo("%s %d --> %d".format(s, sizeBefore, t.size)) @@ -1106,7 +1106,6 @@ class DAGScheduler( } metadataCleaner.cancel() taskScheduler.stop() - listenerBus.stop() } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 4985f8c43e70d..217f8825c2ae9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -66,11 +66,16 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf) } // Events that do not trigger a flush - override def onStageSubmitted(event: SparkListenerStageSubmitted) = logEvent(event) - override def onTaskStart(event: SparkListenerTaskStart) = logEvent(event) - override def onTaskGettingResult(event: SparkListenerTaskGettingResult) = logEvent(event) - override def onTaskEnd(event: SparkListenerTaskEnd) = logEvent(event) - override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate) = logEvent(event) + override def onStageSubmitted(event: SparkListenerStageSubmitted) = + logEvent(event) + override def onTaskStart(event: SparkListenerTaskStart) = + logEvent(event) + override def onTaskGettingResult(event: SparkListenerTaskGettingResult) = + logEvent(event) + override def onTaskEnd(event: SparkListenerTaskEnd) = + logEvent(event) + override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate) = + logEvent(event) // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted) = @@ -81,7 +86,7 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf) logEvent(event, flushLogger = true) override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded) = logEvent(event, flushLogger = true) - override def onBlockManagerLost(event: SparkListenerBlockManagerLost) = + override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved) = logEvent(event, flushLogger = true) override def onUnpersistRDD(event: SparkListenerUnpersistRDD) = logEvent(event, flushLogger = true) diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 9aec4164f58b4..353a48661b0f7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -54,11 +54,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { override def run() { while (true) { val event = eventQueue.take - val shutdown = postToAll(event) - if (shutdown) { + if (event == SparkListenerShutdown) { // Get out of the while loop and shutdown the daemon thread return } + postToAll(event) } } }.start() diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index 4e27d959d4534..db76178b65501 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -67,28 +67,35 @@ private[spark] class ReplayListenerBus(conf: SparkConf) extends SparkListenerBus } logPaths.foreach { path => - // In case there is an exception, keep track of the highest level stream to close it later - var streamToClose: 
Option[InputStream] = None - var currentLine = "" + // Keep track of input streams at all levels to close them later + // This is necessary because an exception can occur in between stream initializations + var fileStream: Option[InputStream] = None + var bufferedStream: Option[InputStream] = None + var compressStream: Option[InputStream] = None + var currentLine = "" try { - val fstream = fileSystem.open(path) - val bstream = new FastBufferedInputStream(fstream) - val cstream = if (compressed) compressionCodec.compressedInputStream(bstream) else bstream - streamToClose = Some(cstream) + currentLine = "" + fileStream = Some(fileSystem.open(path)) + bufferedStream = Some(new FastBufferedInputStream(fileStream.get)) + compressStream = + if (compressed) { + Some(compressionCodec.compressedInputStream(bufferedStream.get)) + } else bufferedStream // Parse each line as an event and post it to all attached listeners - val lines = Source.fromInputStream(cstream).getLines() + val lines = Source.fromInputStream(compressStream.get).getLines() lines.foreach { line => currentLine = line - val event = JsonProtocol.sparkEventFromJson(parse(line)) - postToAll(event) + postToAll(JsonProtocol.sparkEventFromJson(parse(line))) } } catch { case e: Exception => logError("Exception in parsing Spark event log %s".format(path), e) logError("Malformed line: %s\n".format(currentLine)) } finally { - streamToClose.foreach(_.close()) + fileStream.foreach(_.close()) + bufferedStream.foreach(_.close()) + compressStream.foreach(_.close()) } } fileSystem.close() diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index e4391bc56664b..d4eb0ac88d8e8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -57,7 +57,8 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S case class SparkListenerBlockManagerAdded(blockManagerId: BlockManagerId, maxMem: Long) extends SparkListenerEvent -case class SparkListenerBlockManagerLost(blockManagerId: BlockManagerId) extends SparkListenerEvent +case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId) + extends SparkListenerEvent case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent @@ -116,9 +117,9 @@ trait SparkListener { def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded) { } /** - * Called when an existing block manager has been lost + * Called when an existing block manager has been removed */ - def onBlockManagerLost(blockManagerLost: SparkListenerBlockManagerLost) { } + def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) { } /** * Called when an RDD is manually unpersisted by the application @@ -130,6 +131,9 @@ trait SparkListener { * Simple SparkListener that logs a few summary statistics when each stage completes */ class StatsReportListener extends SparkListener with Logging { + + import org.apache.spark.scheduler.StatsReportListener._ + private val taskInfoMetrics = mutable.Buffer[(TaskInfo, TaskMetrics)]() override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { @@ -141,7 +145,6 @@ class StatsReportListener extends SparkListener with Logging { } override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { - import org.apache.spark.scheduler.StatsReportListener._ implicit val sc = stageCompleted this.logInfo("Finished stage: " + 
stageCompleted.stageInfo) showMillisDistribution("task runtime:", (info, _) => Some(info.duration), taskInfoMetrics) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index 67d39c40291f6..729e120497571 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -34,9 +34,10 @@ private[spark] trait SparkListenerBus { } /** - * Post an event to all attached listeners. Return true if the shutdown event is posted. + * Post an event to all attached listeners. This does nothing if the event is + * SparkListenerShutdown. */ - def postToAll(event: SparkListenerEvent): Boolean = { + protected def postToAll(event: SparkListenerEvent) { event match { case stageSubmitted: SparkListenerStageSubmitted => sparkListeners.foreach(_.onStageSubmitted(stageSubmitted)) @@ -56,14 +57,11 @@ private[spark] trait SparkListenerBus { sparkListeners.foreach(_.onEnvironmentUpdate(environmentUpdate)) case blockManagerAdded: SparkListenerBlockManagerAdded => sparkListeners.foreach(_.onBlockManagerAdded(blockManagerAdded)) - case blockManagerLost: SparkListenerBlockManagerLost => - sparkListeners.foreach(_.onBlockManagerLost(blockManagerLost)) + case blockManagerRemoved: SparkListenerBlockManagerRemoved => + sparkListeners.foreach(_.onBlockManagerRemoved(blockManagerRemoved)) case unpersistRDD: SparkListenerUnpersistRDD => sparkListeners.foreach(_.onUnpersistRDD(unpersistRDD)) case SparkListenerShutdown => - return true - case _ => } - false } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 690718832a3d0..71584b6eb102a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -250,9 +250,7 @@ private[spark] class BlockManager( val inMemSize = Math.max(status.memSize, droppedMemorySize) val onDiskSize = status.diskSize master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize) - } else { - true - } + } else true } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 7c479b17b46d9..ff2652b640272 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -37,10 +37,8 @@ import org.apache.spark.util.{AkkaUtils, Utils} * all slaves' block managers. */ private[spark] -class BlockManagerMasterActor( - val isLocal: Boolean, - conf: SparkConf, - listenerBus: LiveListenerBus) extends Actor with Logging { +class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus: LiveListenerBus) + extends Actor with Logging { // Mapping from block manager id to the block manager's information. 
private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo] @@ -163,8 +161,7 @@ class BlockManagerMasterActor( blockLocations.remove(locations) } } - val blockManagerLost = SparkListenerBlockManagerLost(blockManagerId) - listenerBus.post(blockManagerLost) + listenerBus.post(SparkListenerBlockManagerRemoved(blockManagerId)) } private def expireDeadHosts() { @@ -241,8 +238,7 @@ class BlockManagerMasterActor( blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor) } - val blockManagerAdded = SparkListenerBlockManagerAdded(id, maxMemSize) - listenerBus.post(blockManagerAdded) + listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize)) } private def updateBlockInfo( diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 7ed7cdd430e7a..488f1ea9628f5 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -73,11 +73,11 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) returnValues: Boolean): PutResult = { if (level.deserialized) { val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef]) - val putAttempt = tryToPut(blockId, values, sizeEstimate, true) + val putAttempt = tryToPut(blockId, values, sizeEstimate, deserialized = true) PutResult(sizeEstimate, Left(values.iterator), putAttempt.droppedBlocks) } else { val bytes = blockManager.dataSerialize(blockId, values.iterator) - val putAttempt = tryToPut(blockId, bytes, bytes.limit, false) + val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false) PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks) } } @@ -158,7 +158,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) * blocks to free memory for one block, another thread may use up the freed space for * another block. * - * Return whether put was successful, along with the blocks dropped in the process + * Return whether put was successful, along with the blocks dropped in the process. 
*/ private def tryToPut( blockId: BlockId, diff --git a/core/src/main/scala/org/apache/spark/storage/PutResult.scala b/core/src/main/scala/org/apache/spark/storage/PutResult.scala index cd3f61e75d574..f0eac7594ecf6 100644 --- a/core/src/main/scala/org/apache/spark/storage/PutResult.scala +++ b/core/src/main/scala/org/apache/spark/storage/PutResult.scala @@ -29,4 +29,4 @@ import java.nio.ByteBuffer private[spark] case class PutResult( size: Long, data: Either[Iterator[_], ByteBuffer], - droppedBlocks: Seq[(BlockId, BlockStatus)] = Seq()) + droppedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty) diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala index fd6b82f4a64d5..26565f56ad858 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala @@ -51,14 +51,12 @@ private[spark] class StorageStatusListener extends SparkListener { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { val info = taskEnd.taskInfo - if (info != null) { + val metrics = taskEnd.taskMetrics + if (info != null && metrics != null) { val execId = formatExecutorId(info.executorId) - val metrics = taskEnd.taskMetrics - if (metrics != null) { - val updatedBlocks = metrics.updatedBlocks.getOrElse(Seq()) - if (updatedBlocks.length > 0) { - updateStorageStatus(execId, updatedBlocks) - } + val updatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) + if (updatedBlocks.length > 0) { + updateStorageStatus(execId, updatedBlocks) } } } @@ -77,9 +75,9 @@ private[spark] class StorageStatusListener extends SparkListener { } } - override def onBlockManagerLost(blockManagerLost: SparkListenerBlockManagerLost) { + override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) { synchronized { - val executorId = blockManagerLost.blockManagerId.executorId + val executorId = blockManagerRemoved.blockManagerId.executorId executorIdToStorageStatus.remove(executorId) } } diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index a1d1393348d49..6153dfe0b7e13 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -70,31 +70,41 @@ class RDDInfo(val id: Int, val name: String, val numPartitions: Int, val storage private[spark] object StorageUtils { - /** Returns RDD-level information from a list of StorageStatus objects and SparkContext */ + /** + * Returns basic information of all RDDs persisted in the given SparkContext. This does not + * include storage information. + */ + def rddInfoFromSparkContext(sc: SparkContext): Array[RDDInfo] = { + sc.persistentRdds.values.map { rdd => + val rddName = Option(rdd.name).getOrElse(rdd.id.toString) + val rddNumPartitions = rdd.partitions.size + val rddStorageLevel = rdd.getStorageLevel + val rddInfo = new RDDInfo(rdd.id, rddName, rddNumPartitions, rddStorageLevel) + rddInfo + }.toArray + } + + /** Returns storage information of all RDDs persisted in the given SparkContext. 
*/ def rddInfoFromStorageStatus( - storageStatusList: Seq[StorageStatus], + storageStatuses: Seq[StorageStatus], sc: SparkContext): Array[RDDInfo] = { - val blockStatusMap = blockStatusMapFromStorageStatus(storageStatusList) - val rddInfoList = rddInfoFromSparkContext(blockStatusMap.keys.toSeq, sc) - val rddInfoMap = rddInfoList.map { info => (info.id, info) }.toMap - rddInfoFromBlockStatusMap(blockStatusMap, rddInfoMap) + rddInfoFromStorageStatus(storageStatuses, rddInfoFromSparkContext(sc)) } - /** - * Returns RDD-level information from a list of StorageStatus objects and an existing - * RDD ID to RDDInfo mapping - */ + /** Returns storage information of all RDDs in the given list. */ def rddInfoFromStorageStatus( - storageStatusList: Seq[StorageStatus], - rddInfoMap: Map[Int, RDDInfo]): Array[RDDInfo] = { - val blockStatusMap = blockStatusMapFromStorageStatus(storageStatusList) - rddInfoFromBlockStatusMap(blockStatusMap, rddInfoMap) - } + storageStatuses: Seq[StorageStatus], + rddInfos: Seq[RDDInfo]): Array[RDDInfo] = { + + // Mapping from RDD ID -> an array of associated BlockStatuses + val blockStatusMap = storageStatuses.flatMap(_.rddBlocks).toMap + .groupBy { case (k, _) => k.rddId } + .mapValues(_.values.toArray) - private def rddInfoFromBlockStatusMap( - blockStatusMap: Map[Int, Array[BlockStatus]], - rddInfoMap: Map[Int, RDDInfo]): Array[RDDInfo] = { - val rddInfos = blockStatusMap.map { case (rddId, blocks) => + // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information) + val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap + + val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) => // Add up memory and disk sizes val persistedBlocks = blocks.filter { status => status.memSize + status.diskSize > 0 } val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L) @@ -105,42 +115,28 @@ object StorageUtils { rddInfo.diskSize = diskSize rddInfo } - }.flatten.toArray - - scala.util.Sorting.quickSort(rddInfos) - rddInfos - } - - private def blockStatusMapFromStorageStatus(storageStatusList: Seq[StorageStatus]) - : Map[Int, Array[BlockStatus]] = { - val rddBlockMap = storageStatusList.flatMap(_.rddBlocks).toMap[RDDBlockId, BlockStatus] - rddBlockMap.groupBy { case (k, v) => k.rddId }.mapValues(_.values.toArray) - } - - private def rddInfoFromSparkContext(rddIds: Seq[Int], sc: SparkContext): Array[RDDInfo] = { - rddIds.flatMap { rddId => - sc.persistentRdds.get(rddId).map { r => - val rddName = Option(r.name).getOrElse(rddId.toString) - val rddNumPartitions = r.partitions.size - val rddStorageLevel = r.getStorageLevel - val rddInfo = new RDDInfo(rddId, rddName, rddNumPartitions, rddStorageLevel) - rddInfo - } }.toArray + + scala.util.Sorting.quickSort(rddStorageInfos) + rddStorageInfos } - /** Returns a map of blocks to their locations, compiled from a list of StorageStatus objects */ - def blockLocationsFromStorageStatus(storageStatusList: Seq[StorageStatus]) = { - val blockLocationPairs = - storageStatusList.flatMap(s => s.blocks.map(b => (b._1, s.blockManagerId.hostPort))) - blockLocationPairs.groupBy(_._1).map{case (k, v) => (k, v.unzip._2)}.toMap + /** Returns a mapping from BlockId to the locations of the associated block. 
*/ + def blockLocationsFromStorageStatus( + storageStatuses: Seq[StorageStatus]): Map[BlockId, Seq[String]] = { + val blockLocationPairs = storageStatuses.flatMap { storageStatus => + storageStatus.blocks.map { case (bid, _) => (bid, storageStatus.blockManagerId.hostPort) } + } + blockLocationPairs.toMap + .groupBy { case (blockId, _) => blockId } + .mapValues(_.values.toSeq) } - /** Filters storage status by a given RDD id. */ + /** Filters the given list of StorageStatus by the given RDD ID. */ def filterStorageStatusByRDD( - storageStatusList: Seq[StorageStatus], - rddId: Int) : Array[StorageStatus] = { - storageStatusList.map { status => + storageStatuses: Seq[StorageStatus], + rddId: Int): Array[StorageStatus] = { + storageStatuses.map { status => val filteredBlocks = status.rddBlocks.filterKeys(_.rddId == rddId).toSeq val filteredBlockMap = mutable.Map[BlockId, BlockStatus](filteredBlocks: _*) new StorageStatus(status.blockManagerId, status.maxMem, filteredBlockMap) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index a1bb6cabd3c9c..af7a737e01911 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -33,11 +33,13 @@ import org.json4s.jackson.JsonMethods.{pretty, render} import org.apache.spark.{Logging, SecurityManager, SparkConf} -/** Utilities for launching a web server using Jetty's HTTP Server class */ +/** + * Utilities for launching a web server using Jetty's HTTP Server class + */ private[spark] object JettyUtils extends Logging { + // Base type for a function that returns something based on an HTTP request. Allows for // implicit conversion from many types of functions to jetty Handlers. - type Responder[T] = HttpServletRequest => T class ServletParams[T <% AnyRef](val responder: Responder[T], @@ -102,7 +104,6 @@ private[spark] object JettyUtils extends Logging { srcPath: String, destPath: String, basePath: String = ""): ServletContextHandler = { - val prefixedSrcPath = attachPrefix(basePath, srcPath) val prefixedDestPath = attachPrefix(basePath, destPath) val servlet = new HttpServlet { override def doGet(request: HttpServletRequest, response: HttpServletResponse) { @@ -111,11 +112,7 @@ private[spark] object JettyUtils extends Logging { response.sendRedirect(newUrl) } } - val contextHandler = new ServletContextHandler - val holder = new ServletHolder(servlet) - contextHandler.setContextPath(prefixedSrcPath) - contextHandler.addServlet(holder, "/") - contextHandler + createServletHandler(srcPath, servlet, basePath) } /** Create a handler for serving files from a static directory */ @@ -133,6 +130,7 @@ private[spark] object JettyUtils extends Logging { contextHandler } + /** Add security filters, if any, do the given list of ServletContextHandlers */ private def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) { val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim()) filters.foreach { diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index f5bff6dbce523..5ef08e4a924a9 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -29,7 +29,7 @@ import org.apache.spark.ui.jobs.JobProgressUI import org.apache.spark.ui.storage.BlockManagerUI import org.apache.spark.util.Utils -/** Top level user interface for Spark. 
*/ +/** Top level user interface for Spark */ private[spark] class SparkUI( val sc: SparkContext, conf: SparkConf, @@ -106,7 +106,7 @@ private[spark] class SparkUI( } def stop() { - assert(serverInfo.isDefined, "Attempted to stop a SparkUI that was not initialized!") + assert(serverInfo.isDefined, "Attempted to stop a SparkUI that was not bound to a server!") serverInfo.get.server.stop() logInfo("Stopped Spark Web UI at %s".format(appUIAddress)) } diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala index acebaa68101a5..23e90c34d5b33 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala @@ -75,10 +75,10 @@ private[ui] class EnvironmentUI(parent: SparkUI) { * A SparkListener that prepares information to be displayed on the EnvironmentUI */ private[ui] class EnvironmentListener extends SparkListener { - var jvmInformation: Seq[(String, String)] = Seq() - var sparkProperties: Seq[(String, String)] = Seq() - var systemProperties: Seq[(String, String)] = Seq() - var classpathEntries: Seq[(String, String)] = Seq() + var jvmInformation = Seq[(String, String)]() + var sparkProperties = Seq[(String, String)]() + var systemProperties = Seq[(String, String)]() + var classpathEntries = Seq[(String, String)]() override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { synchronized { diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index 90604357d95b9..031ed88a493a8 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -151,11 +151,11 @@ private[ui] class ExecutorsUI(parent: SparkUI) { totalTasks, totalDuration, totalShuffleRead, - totalShuffleWrite - ) ++ Seq(maxMem) + totalShuffleWrite, + maxMem + ).map(_.toString) - val execValuesString = execValues.map(_.toString) - execFields.zip(execValuesString).toMap + execFields.zip(execValues).toMap } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index c2b2d41054e63..d10aa12b9ebca 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -247,9 +247,9 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener { } } - override def onBlockManagerLost(blockManagerLost: SparkListenerBlockManagerLost) { + override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) { synchronized { - val executorId = blockManagerLost.blockManagerId.executorId + val executorId = blockManagerRemoved.blockManagerId.executorId executorIdToBlockManagerId.remove(executorId) } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 1427b1f6e0458..da7f20233063e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -136,8 +136,9 @@ private[ui] class StagePage(parent: JobProgressUI) { } } val gettingResultQuantiles = "Time spent fetching task results" +: - Distribution(gettingResultTimes).get.getQuantiles() - .map(millis => parent.formatDuration(millis.toLong)) + 
Distribution(gettingResultTimes).get.getQuantiles().map { millis => + parent.formatDuration(millis.toLong) + } // The scheduler delay includes the network delay to send the task to the worker // machine and to send back the result (but not the time to fetch the task result, // if it needed to be fetched from the block manager on the worker). @@ -152,8 +153,9 @@ private[ui] class StagePage(parent: JobProgressUI) { totalExecutionTime - metrics.get.executorRunTime } val schedulerDelayQuantiles = "Scheduler delay" +: - Distribution(schedulerDelays).get.getQuantiles() - .map(millis => parent.formatDuration(millis.toLong)) + Distribution(schedulerDelays).get.getQuantiles().map { millis => + parent.formatDuration(millis.toLong) + } def getQuantileCols(data: Seq[Double]) = Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong)) @@ -209,89 +211,88 @@ private[ui] class StagePage(parent: JobProgressUI) { } def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean) - (taskData: TaskUIData): Seq[Node] = { + (taskData: TaskUIData): Seq[Node] = { def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] = trace.map(e => {e.toString}) - val info = taskData.taskInfo - val metrics = taskData.taskMetrics - val exception = taskData.exception - - val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis()) - else metrics.map(_.executorRunTime).getOrElse(1L) - val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration) - else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("") - val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) - val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) - - val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead) - val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("") - val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("") - - val maybeShuffleWrite = - metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten) - val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("") - val shuffleWriteReadable = maybeShuffleWrite.map(Utils.bytesToString).getOrElse("") - - val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime) - val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("") - val writeTimeReadable = maybeWriteTime.map( t => t / (1000 * 1000)).map { ms => - if (ms == 0) "" else parent.formatDuration(ms) - }.getOrElse("") - - val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled) - val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("") - val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("") - - val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled) - val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("") - val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("") - - - {info.index} - {info.taskId} - {info.status} - {info.taskLocality} - {info.host} - {dateFmt.format(new Date(info.launchTime))} - - {formatDuration} - - - {if (gcTime > 0) parent.formatDuration(gcTime) else ""} - - - {if (serializationTime > 0) parent.formatDuration(serializationTime) else ""} - - {if (shuffleRead) { - - {shuffleReadReadable} - - }} - {if (shuffleWrite) { - - {writeTimeReadable} - - - {shuffleWriteReadable} - - }} - {if (bytesSpilled) { - - {memoryBytesSpilledReadable} + taskData 
match { case TaskUIData(info, metrics, exception) => + val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis()) + else metrics.map(_.executorRunTime).getOrElse(1L) + val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration) + else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("") + val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) + val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) + + val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead) + val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("") + val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("") + + val maybeShuffleWrite = + metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten) + val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("") + val shuffleWriteReadable = maybeShuffleWrite.map(Utils.bytesToString).getOrElse("") + + val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime) + val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("") + val writeTimeReadable = maybeWriteTime.map( t => t / (1000 * 1000)).map { ms => + if (ms == 0) "" else parent.formatDuration(ms) + }.getOrElse("") + + val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled) + val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("") + val memoryBytesSpilledReadable = + maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("") + + val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled) + val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("") + val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("") + + + {info.index} + {info.taskId} + {info.status} + {info.taskLocality} + {info.host} + {dateFmt.format(new Date(info.launchTime))} + + {formatDuration} + + + {if (gcTime > 0) parent.formatDuration(gcTime) else ""} - - {diskBytesSpilledReadable} + + {if (serializationTime > 0) parent.formatDuration(serializationTime) else ""} - }} - - {exception.map { e => - - {e.className} ({e.description})
- {fmtStackTrace(e.stackTrace)} -
- }.getOrElse("")} - - + {if (shuffleRead) { + + {shuffleReadReadable} + + }} + {if (shuffleWrite) { + + {writeTimeReadable} + + + {shuffleWriteReadable} + + }} + {if (bytesSpilled) { + + {memoryBytesSpilledReadable} + + + {diskBytesSpilledReadable} + + }} + + {exception.map { e => + + {e.className} ({e.description})
+ {fmtStackTrace(e.stackTrace)} +
+ }.getOrElse("")} + + + } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 466c014dfdf36..063c4bfebfeeb 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -85,7 +85,7 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) { case Some(t) => dateFmt.format(new Date(t)) case None => "Unknown" } - val finishTime = s.completionTime.getOrElse(System.currentTimeMillis()) + val finishTime = s.completionTime.getOrElse(System.currentTimeMillis) val duration = s.submissionTime.map(t => finishTime - t) val formattedDuration = duration.map(d => parent.formatDuration(d)).getOrElse("Unknown") val startedTasks = diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala index a3c4f2d08be93..4d8b01dbe6e1b 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala @@ -66,13 +66,14 @@ private[ui] class BlockManagerListener(storageStatusListener: StorageStatusListe /** Update each RDD's info to reflect any updates to the RDD's storage status */ private def updateRDDInfo() { - val updatedRDDInfoList = StorageUtils.rddInfoFromStorageStatus(storageStatusList, _rddInfoMap) - updatedRDDInfoList.foreach { info => _rddInfoMap(info.id) = info } + val rddInfos = _rddInfoMap.values.toSeq + val updatedRddInfos = StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos) + updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info } } /** - * Assumes the storage status list is fully up-to-date. This implies that the corresponding - * StorageStatusSparkListener must + * Assumes the storage status list is fully up-to-date. This implies the corresponding + * StorageStatusSparkListener must process the SparkListenerTaskEnd event before this listener. 
*/ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { val metrics = taskEnd.taskMetrics @@ -88,11 +89,10 @@ private[ui] class BlockManagerListener(storageStatusListener: StorageStatusListe override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized { // Remove all partitions that are no longer cached - _rddInfoMap.retain { case (id, info) => info.numCachedPartitions > 0 } + _rddInfoMap.retain { case (_, info) => info.numCachedPartitions > 0 } } override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized { - super.onUnpersistRDD(unpersistRDD) updateRDDInfo() } } diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 6f82194a108a1..3f42eba4ece00 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -33,20 +33,23 @@ private[ui] class RDDPage(parent: BlockManagerUI) { private lazy val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { + val rddId = request.getParameter("id").toInt val storageStatusList = listener.storageStatusList - val id = request.getParameter("id").toInt - val rddInfo = listener.rddInfoList.filter(_.id == id).head + val rddInfo = listener.rddInfoList.find(_.id == rddId).getOrElse { + // Rather than crashing, render an "RDD Not Found" page + return UIUtils.headerSparkPage(Seq[Node](), basePath, appName, "RDD Not Found", Storage) + } // Worker table - val workers = storageStatusList.map((id, _)) + val workers = storageStatusList.map((rddId, _)) val workerTable = UIUtils.listingTable(workerHeader, workerRow, workers) // Block table - val filteredStorageStatusList = StorageUtils.filterStorageStatusByRDD(storageStatusList, id) + val filteredStorageStatusList = StorageUtils.filterStorageStatusByRDD(storageStatusList, rddId) val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).sortWith(_._1.name < _._1.name) val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList) val blocks = blockStatuses.map { case (blockId, status) => - (blockId, status, blockLocations.get(blockId).getOrElse(Seq("Unknown"))) + (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown"))) } val blockTable = UIUtils.listingTable(blockHeader, blockRow, blocks) diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala index 409223b67737e..f07962096a32c 100644 --- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala +++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala @@ -39,7 +39,7 @@ import org.apache.spark.io.CompressionCodec class FileLogger( logDir: String, conf: SparkConf = new SparkConf, - outputBufferSize: Int = 8 * 1024, + outputBufferSize: Int = 8 * 1024, // 8 KB compress: Boolean = false, overwrite: Boolean = true) extends Logging { @@ -59,7 +59,9 @@ class FileLogger( Some(createWriter()) } - /** Create a logging directory with the given path. */ + /** + * Create a logging directory with the given path. + */ private def createLogDir() { val path = new Path(logDir) if (fileSystem.exists(path)) { @@ -86,18 +88,14 @@ class FileLogger( /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). * Therefore, for local files, use FileOutputStream instead. 
*/ val dstream = uri.getScheme match { - case "hdfs" | "s3" => - val path = new Path(logPath) - hadoopDataStream = Some(fileSystem.create(path, overwrite)) - hadoopDataStream.get - case "file" | null => // Second parameter is whether to append new FileOutputStream(logPath, !overwrite) - case unsupportedScheme => - throw new UnsupportedOperationException("File system scheme %s is not supported!" - .format(unsupportedScheme)) + case _ => + val path = new Path(logPath) + hadoopDataStream = Some(fileSystem.create(path, overwrite)) + hadoopDataStream.get } val bstream = new FastBufferedOutputStream(dstream, outputBufferSize) @@ -106,21 +104,20 @@ class FileLogger( } /** - * Log the message to the given writer + * Log the message to the given writer. * @param msg The message to be logged * @param withTime Whether to prepend message with a timestamp */ def log(msg: String, withTime: Boolean = false) { - var writeInfo = msg - if (withTime) { + val writeInfo = if (!withTime) msg else { val date = new Date(System.currentTimeMillis()) - writeInfo = DATE_FORMAT.format(date) + ": " + msg + DATE_FORMAT.format(date) + ": " + msg } writer.foreach(_.print(writeInfo)) } /** - * Log the message to the given writer as a new line + * Log the message to the given writer as a new line. * @param msg The message to be logged * @param withTime Whether to prepend message with a timestamp */ @@ -138,13 +135,17 @@ class FileLogger( hadoopDataStream.foreach(_.sync()) } - /** Close the writer. Any subsequent calls to log or flush will have no effect. */ + /** + * Close the writer. Any subsequent calls to log or flush will have no effect. + */ def close() { writer.foreach(_.close()) writer = None } - /** Start a writer for a new file if one does not already exit */ + /** + * Start a writer for a new file if one does not already exit. 
+   */
   def start() {
     writer.getOrElse {
       fileIndex += 1
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 2b51597785b98..346f2b7856791 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -21,7 +21,6 @@ import java.util.{Properties, UUID}
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
-import scala.collection.mutable
 
 import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
@@ -59,12 +58,13 @@ private[spark] object JsonProtocol {
         environmentUpdateToJson(environmentUpdate)
       case blockManagerAdded: SparkListenerBlockManagerAdded =>
         blockManagerAddedToJson(blockManagerAdded)
-      case blockManagerLost: SparkListenerBlockManagerLost =>
-        blockManagerLostToJson(blockManagerLost)
+      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
+        blockManagerRemovedToJson(blockManagerRemoved)
       case unpersistRDD: SparkListenerUnpersistRDD =>
         unpersistRDDToJson(unpersistRDD)
-      case SparkListenerShutdown =>
-        shutdownToJson()
+
+      // Not used, but keeps compiler happy
+      case SparkListenerShutdown => JNothing
     }
   }
 
@@ -146,9 +146,9 @@ private[spark] object JsonProtocol {
     ("Maximum Memory" -> blockManagerAdded.maxMem)
   }
 
-  def blockManagerLostToJson(blockManagerLost: SparkListenerBlockManagerLost): JValue = {
-    val blockManagerId = blockManagerIdToJson(blockManagerLost.blockManagerId)
-    ("Event" -> Utils.getFormattedClassName(blockManagerLost)) ~
+  def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = {
+    val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId)
+    ("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~
     ("Block Manager ID" -> blockManagerId)
   }
 
@@ -157,9 +157,6 @@ private[spark] object JsonProtocol {
     ("RDD ID" -> unpersistRDD.rddId)
   }
 
-  def shutdownToJson(): JValue = {
-    "Event" -> Utils.getFormattedClassName(SparkListenerShutdown)
-  }
 
   /** ------------------------------------------------------------------- *
    * JSON serialization methods for classes SparkListenerEvents depend on |
@@ -323,6 +320,7 @@ private[spark] object JsonProtocol {
     ("Disk Size" -> blockStatus.diskSize)
   }
 
+
   /** ------------------------------ *
    * Util JSON serialization methods |
    * ------------------------------- */
@@ -357,6 +355,7 @@ private[spark] object JsonProtocol {
     ("Stack Trace" -> stackTraceToJson(exception.getStackTrace))
   }
 
+
   /** --------------------------------------------------- *
    * JSON deserialization methods for SparkListenerEvents |
    * ---------------------------------------------------- */
@@ -371,9 +370,8 @@ private[spark] object JsonProtocol {
     val jobEnd = Utils.getFormattedClassName(SparkListenerJobEnd)
     val environmentUpdate = Utils.getFormattedClassName(SparkListenerEnvironmentUpdate)
     val blockManagerAdded = Utils.getFormattedClassName(SparkListenerBlockManagerAdded)
-    val blockManagerLost = Utils.getFormattedClassName(SparkListenerBlockManagerLost)
+    val blockManagerRemoved = Utils.getFormattedClassName(SparkListenerBlockManagerRemoved)
     val unpersistRDD = Utils.getFormattedClassName(SparkListenerUnpersistRDD)
-    val shutdown = Utils.getFormattedClassName(SparkListenerShutdown)
 
     (json \ "Event").extract[String] match {
       case `stageSubmitted` => stageSubmittedFromJson(json)
@@ -385,9 +383,8 @@ private[spark] object JsonProtocol {
       case `jobEnd` => jobEndFromJson(json)
       case `environmentUpdate` => environmentUpdateFromJson(json)
       case `blockManagerAdded` => blockManagerAddedFromJson(json)
-      case `blockManagerLost` => blockManagerLostFromJson(json)
+      case `blockManagerRemoved` => blockManagerRemovedFromJson(json)
       case `unpersistRDD` => unpersistRDDFromJson(json)
-      case `shutdown` => SparkListenerShutdown
     }
   }
 
@@ -450,15 +447,16 @@ private[spark] object JsonProtocol {
     SparkListenerBlockManagerAdded(blockManagerId, maxMem)
   }
 
-  def blockManagerLostFromJson(json: JValue): SparkListenerBlockManagerLost = {
+  def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = {
     val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
-    SparkListenerBlockManagerLost(blockManagerId)
+    SparkListenerBlockManagerRemoved(blockManagerId)
  }
 
   def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = {
     SparkListenerUnpersistRDD((json \ "RDD ID").extract[Int])
   }
 
+
   /** --------------------------------------------------------------------- *
    * JSON deserialization methods for classes SparkListenerEvents depend on |
    * ---------------------------------------------------------------------- */
@@ -669,6 +667,7 @@ private[spark] object JsonProtocol {
     BlockStatus(storageLevel, memorySize, diskSize)
   }
 
+
   /** -------------------------------- *
    * Util JSON deserialization methods |
    * --------------------------------- */
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 26be4bfdae1c2..13d9dbdd9af2d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -900,6 +900,7 @@ private[spark] object Utils extends Logging {
     obj.getClass.getSimpleName.replace("$", "")
   }
 
+  /** Return an option that translates JNothing to None */
   def jsonOption(json: JValue): Option[JValue] = {
     json match {
       case JNothing => None
@@ -907,6 +908,7 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /** Return an empty JSON object */
   def emptyJson = JObject(List[JField]())
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index da3933cff04cc..67c0a434c9b52 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -49,7 +49,7 @@ class JsonProtocolSuite extends FunSuite {
     ))
     val blockManagerAdded = SparkListenerBlockManagerAdded(
       BlockManagerId("Stars", "In your multitude...", 300, 400), 500)
-    val blockManagerLost = SparkListenerBlockManagerLost(
+    val blockManagerRemoved = SparkListenerBlockManagerRemoved(
       BlockManagerId("Scarce", "to be counted...", 100, 200))
     val unpersistRdd = SparkListenerUnpersistRDD(12345)
 
@@ -62,9 +62,8 @@ class JsonProtocolSuite extends FunSuite {
     testEvent(jobEnd, jobEndJsonString)
     testEvent(environmentUpdate, environmentUpdateJsonString)
     testEvent(blockManagerAdded, blockManagerAddedJsonString)
-    testEvent(blockManagerLost, blockManagerLostJsonString)
+    testEvent(blockManagerRemoved, blockManagerRemovedJsonString)
     testEvent(unpersistRdd, unpersistRDDJsonString)
-    testEvent(SparkListenerShutdown, shutdownJsonString)
   }
 
   test("Dependent Classes") {
@@ -208,7 +207,7 @@ class JsonProtocolSuite extends FunSuite {
       case (e1: SparkListenerBlockManagerAdded, e2: SparkListenerBlockManagerAdded) =>
         assert(e1.maxMem == e2.maxMem)
         assertEquals(e1.blockManagerId, e2.blockManagerId)
-      case (e1: SparkListenerBlockManagerLost, e2: SparkListenerBlockManagerLost) =>
+      case (e1: SparkListenerBlockManagerRemoved, e2: SparkListenerBlockManagerRemoved) =>
         assertEquals(e1.blockManagerId, e2.blockManagerId)
       case (e1: SparkListenerUnpersistRDD, e2: SparkListenerUnpersistRDD) =>
         assert(e1.rddId == e2.rddId)
@@ -546,9 +545,9 @@ class JsonProtocolSuite extends FunSuite {
       "Host":"In your multitude...","Port":300,"Netty Port":400},"Maximum Memory":500}
     """
 
-  private val blockManagerLostJsonString =
+  private val blockManagerRemovedJsonString =
     """
-      {"Event":"SparkListenerBlockManagerLost","Block Manager ID":{"Executor ID":"Scarce",
+      {"Event":"SparkListenerBlockManagerRemoved","Block Manager ID":{"Executor ID":"Scarce",
      "Host":"to be counted...","Port":100,"Netty Port":200}}
     """
 
@@ -557,8 +556,4 @@ class JsonProtocolSuite extends FunSuite {
       {"Event":"SparkListenerUnpersistRDD","RDD ID":12345}
     """
 
-  private val shutdownJsonString =
-    """
-      {"Event":"SparkListenerShutdown"}
-    """
 }
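
For readers skimming the FileLogger hunk above: after this change the scheme match falls through to the Hadoop FileSystem API for anything that is not a local file, instead of rejecting unknown schemes. The following is a rough, self-contained sketch of that selection logic; LogStreamSketch and openLogStream are illustrative names (not part of FileLogger), and it omits the hadoopDataStream bookkeeping FileLogger keeps around for sync().

import java.io.{FileOutputStream, OutputStream}
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object LogStreamSketch {
  /** Scheme-based stream selection mirroring the hunk above. */
  def openLogStream(logPath: String, overwrite: Boolean): OutputStream = {
    val uri = new URI(logPath)
    uri.getScheme match {
      case "file" | null =>
        // Second parameter is whether to append
        new FileOutputStream(logPath, !overwrite)
      case _ =>
        // Anything else (hdfs, s3, ...) is handed to the Hadoop FileSystem API
        val path = new Path(logPath)
        FileSystem.get(uri, new Configuration()).create(path, overwrite)
    }
  }
}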
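
The JsonProtocol hunks rename SparkListenerBlockManagerLost to SparkListenerBlockManagerRemoved in both serialization directions. A minimal round-trip sketch follows; it reuses the BlockManagerId arguments from JsonProtocolSuite, the object name is hypothetical, and it has to live under the org.apache.spark package because JsonProtocol is private[spark].

package org.apache.spark

import org.json4s.jackson.JsonMethods.{compact, render}
import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.JsonProtocol

object BlockManagerRemovedRoundTrip {
  def main(args: Array[String]): Unit = {
    // Same constructor arguments the suite uses for the removed block manager
    val event = SparkListenerBlockManagerRemoved(
      BlockManagerId("Scarce", "to be counted...", 100, 200))
    val json = JsonProtocol.sparkEventToJson(event)
    // Prints {"Event":"SparkListenerBlockManagerRemoved","Block Manager ID":{...}}
    println(compact(render(json)))
    // Deserializing should give back an equal event
    assert(JsonProtocol.sparkEventFromJson(json) == event)
  }
}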
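
The Utils.scala hunk only adds doc comments, but the behavior they describe is easy to pin down. A small self-contained illustration: the two helpers are restated locally so the snippet does not depend on the private[spark] Utils object.

import org.json4s._

object JsonHelpersSketch {
  // Local restatements of Utils.jsonOption and Utils.emptyJson
  def jsonOption(json: JValue): Option[JValue] = json match {
    case JNothing => None
    case value => Some(value)
  }
  def emptyJson: JObject = JObject(List[JField]())

  def main(args: Array[String]): Unit = {
    val json: JValue = JObject(List(JField("RDD ID", JInt(12345))))
    assert(jsonOption(json \ "RDD ID") == Some(JInt(12345)))
    assert(jsonOption(json \ "No Such Field").isEmpty)  // JNothing becomes None
    assert(emptyJson == JObject(List[JField]()))
  }
}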