diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 244e8db917812..a1003b7925715 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -155,6 +155,27 @@ class SparkContext( private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf) + // Initialize the Spark UI, registering all associated listeners + private[spark] val ui = new SparkUI(this) + ui.bind() + ui.start() + + // Optionally log Spark events + private[spark] val eventLogger: Option[EventLoggingListener] = { + if (conf.getBoolean("spark.eventLog.enabled", false)) { + val logger = new EventLoggingListener(appName, conf) + listenerBus.addListener(logger) + Some(logger) + } else None + } + + // Information needed to replay logged events, if any + private[spark] val eventLoggingInfo: Option[EventLoggingInfo] = + eventLogger.map { logger => Some(logger.info) }.getOrElse(None) + + // At this point, all relevant SparkListeners have been registered, so begin releasing events + listenerBus.start() + val startTime = System.currentTimeMillis() // Add each JAR given through the constructor @@ -199,27 +220,6 @@ class SparkContext( } executorEnvs("SPARK_USER") = sparkUser - // Start the UI before posting events to listener bus, because the UI listens for Spark events - private[spark] val ui = new SparkUI(this) - ui.bind() - ui.start() - - // Optionally log SparkListenerEvents - private[spark] val eventLogger: Option[EventLoggingListener] = { - if (conf.getBoolean("spark.eventLog.enabled", false)) { - val logger = new EventLoggingListener(appName, conf) - listenerBus.addListener(logger) - Some(logger) - } else None - } - - // Information needed to replay logged events, if any - private[spark] val eventLoggingInfo: Option[EventLoggingInfo] = - eventLogger.map { logger => Some(logger.info) }.getOrElse(None) - - // At this point, all relevant SparkListeners have been registered, so begin releasing events - listenerBus.start() - // Create and start the scheduler private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master) taskScheduler.start() @@ -835,6 +835,7 @@ class SparkContext( if (dagSchedulerCopy != null) { metadataCleaner.cancel() dagSchedulerCopy.stop() + listenerBus.stop() taskScheduler = null // TODO: Cache.stop()? env.stop() @@ -1065,7 +1066,7 @@ class SparkContext( /** Register a new RDD, returning its RDD ID */ private[spark] def newRddId(): Int = nextRddId.getAndIncrement() - /** Post the environment update event once the task scheduler is ready. 
*/ + /** Post the environment update event once the task scheduler is ready */ private def postEnvironmentUpdate() { if (taskScheduler != null) { val schedulingMode = getSchedulingMode.toString diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index a99cf2675e1f5..a1af63fa4a391 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -180,10 +180,9 @@ object SparkEnv extends Logging { } } - val blockManagerMaster = new BlockManagerMaster( - registerOrLookup("BlockManagerMaster", - new BlockManagerMasterActor(isLocal, conf, listenerBus)), - conf) + val blockManagerMaster = new BlockManagerMaster(registerOrLookup( + "BlockManagerMaster", + new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf) val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf, securityManager) @@ -271,23 +270,26 @@ object SparkEnv extends Logging { ("Scala Home", Properties.scalaHome) ).sorted - // Spark properties, including scheduling mode whether or not it is configured - var additionalFields = Seq[(String, String)]() - conf.getOption("spark.scheduler.mode").getOrElse { - additionalFields ++= Seq(("spark.scheduler.mode", schedulingMode)) - } - val sparkProperties = conf.getAll.sorted ++ additionalFields + // Spark properties + // This includes the scheduling mode whether or not it is configured (used by SparkUI) + val schedulerMode = + if (!conf.contains("spark.scheduler.mode")) { + Seq(("spark.scheduler.mode", schedulingMode)) + } else { + Seq[(String, String)]() + } + val sparkProperties = (conf.getAll ++ schedulerMode).sorted // System properties that are not java classpaths val systemProperties = System.getProperties.iterator.toSeq - val classPathProperty = systemProperties.find { case (k, v) => - k == "java.class.path" - }.getOrElse(("", "")) val otherProperties = systemProperties.filter { case (k, v) => k != "java.class.path" && !k.startsWith("spark.") }.sorted // Class paths including all added jars and files + val classPathProperty = systemProperties.find { case (k, v) => + k == "java.class.path" + }.getOrElse(("", "")) val classPathEntries = classPathProperty._2 .split(conf.get("path.separator", ":")) .filterNot(e => e.isEmpty) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 4d94105ca5e1c..73c3b40fbe228 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.master.ui import javax.servlet.http.HttpServletRequest + import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.Logging @@ -33,12 +34,12 @@ private[spark] class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { val masterActorRef = master.self val timeout = AkkaUtils.askTimeout(master.conf) - var serverInfo: Option[ServerInfo] = None private val host = Utils.localHostName() private val port = requestedPort private val applicationPage = new ApplicationPage(this) private val indexPage = new IndexPage(this) + private var serverInfo: Option[ServerInfo] = None private val handlers: Seq[ServletContextHandler] = { master.masterMetricsSystem.getServletHandlers ++ @@ -71,7 +72,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { /** 
Attach a reconstructed UI to this Master UI. Only valid after bind(). */ def attachUI(ui: SparkUI) { - assert(serverInfo.isDefined, "Master UI must be initialized before attaching SparkUIs") + assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs") val rootHandler = serverInfo.get.rootHandler for (handler <- ui.handlers) { rootHandler.addHandler(handler) @@ -83,7 +84,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { /** Detach a reconstructed UI from this Master UI. Only valid after bind(). */ def detachUI(ui: SparkUI) { - assert(serverInfo.isDefined, "Master UI must be initialized before detaching SparkUIs") + assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs") val rootHandler = serverInfo.get.rootHandler for (handler <- ui.handlers) { if (handler.isStarted) { @@ -94,7 +95,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { } def stop() { - assert(serverInfo.isDefined, "Attempted to stop a Master UI that was not initialized!") + assert(serverInfo.isDefined, "Attempted to stop a Master UI that was not bound to a server!") serverInfo.get.server.stop() } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 74cd4d96b0964..5e0fc31fff22f 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -339,10 +339,15 @@ private[spark] object Worker { actorSystem.awaitTermination() } - def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int, - masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None) - : (ActorSystem, Int) = - { + def startSystemAndActor( + host: String, + port: Int, + webUiPort: Int, + cores: Int, + memory: Int, + masterUrls: Array[String], + workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = { + // The LocalSparkCluster runs multiple local sparkWorkerX actor systems val conf = new SparkConf val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("") diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index 8ceaa5bbb0d1a..20ed5ef1dfe8b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.worker.ui import java.io.File import javax.servlet.http.HttpServletRequest + import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.Logging @@ -32,29 +33,30 @@ import org.apache.spark.util.{AkkaUtils, Utils} */ private[spark] class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None) - extends Logging { - val timeout = AkkaUtils.askTimeout(worker.conf) - val host = Utils.localHostName() - val port = requestedPort.getOrElse( - worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt) - - var serverInfo: Option[ServerInfo] = None + extends Logging { - val indexPage = new IndexPage(this) - - val metricsHandlers = worker.metricsSystem.getServletHandlers + val timeout = AkkaUtils.askTimeout(worker.conf) - val handlers = metricsHandlers ++ Seq[ServletContextHandler]( - createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static/*"), - createServletHandler("/log", - 
(request: HttpServletRequest) => log(request), worker.securityMgr), - createServletHandler("/logPage", - (request: HttpServletRequest) => logPage(request), worker.securityMgr), - createServletHandler("/json", - (request: HttpServletRequest) => indexPage.renderJson(request), worker.securityMgr), - createServletHandler("/", - (request: HttpServletRequest) => indexPage.render(request), worker.securityMgr) - ) + private val host = Utils.localHostName() + private val port = requestedPort.getOrElse( + worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt) + private val indexPage = new IndexPage(this) + private var serverInfo: Option[ServerInfo] = None + + private val handlers: Seq[ServletContextHandler] = { + worker.metricsSystem.getServletHandlers ++ + Seq[ServletContextHandler]( + createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static/*"), + createServletHandler("/log", + (request: HttpServletRequest) => log(request), worker.securityMgr), + createServletHandler("/logPage", + (request: HttpServletRequest) => logPage(request), worker.securityMgr), + createServletHandler("/json", + (request: HttpServletRequest) => indexPage.renderJson(request), worker.securityMgr), + createServletHandler("/", + (request: HttpServletRequest) => indexPage.render(request), worker.securityMgr) + ) + } def bind() { try { @@ -69,7 +71,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) - def log(request: HttpServletRequest): String = { + private def log(request: HttpServletRequest): String = { val defaultBytes = 100 * 1024 val appId = Option(request.getParameter("appId")) @@ -96,7 +98,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I pre + Utils.offsetBytes(path, startByte, endByte) } - def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = { + private def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = { val defaultBytes = 100 * 1024 val appId = Option(request.getParameter("appId")) val executorId = Option(request.getParameter("executorId")) @@ -117,17 +119,14 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I val (startByte, endByte) = getByteRange(path, offset, byteLength) val file = new File(path) val logLength = file.length - val logText = {Utils.offsetBytes(path, startByte, endByte)} - val linkToMaster =

<p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>
- val range = Bytes {startByte.toString} - {endByte.toString} of {logLength} val backButton = if (startByte > 0) { + .format(params, logType, math.max(startByte - byteLength, 0), byteLength)}> @@ -144,7 +143,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I } @@ -173,29 +172,23 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I } /** Determine the byte range for a log or log page. */ - def getByteRange(path: String, offset: Option[Long], byteLength: Int) - : (Long, Long) = { + private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = { val defaultBytes = 100 * 1024 val maxBytes = 1024 * 1024 - val file = new File(path) val logLength = file.length() - val getOffset = offset.getOrElse(logLength-defaultBytes) - + val getOffset = offset.getOrElse(logLength - defaultBytes) val startByte = if (getOffset < 0) 0L else if (getOffset > logLength) logLength else getOffset - val logPageLength = math.min(byteLength, maxBytes) - val endByte = math.min(startByte + logPageLength, logLength) - (startByte, endByte) } def stop() { - assert(serverInfo.isDefined, "Attempted to stop a Worker UI that was not initialized!") + assert(serverInfo.isDefined, "Attempted to stop a Worker UI that was not bound to a server!") serverInfo.get.server.stop() } } diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 237f42b48a8c9..88625e79a5c68 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -72,7 +72,7 @@ class TaskMetrics extends Serializable { var shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None /** - * Statuses of any blocks that have been updated as a result of this task. + * Storage statuses of any blocks that have been updated as a result of this task. */ var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 11a8d848c8e70..77c558ac46f6f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1093,7 +1093,7 @@ class DAGScheduler( "stageToInfos" -> stageToInfos, "jobIdToStageIds" -> jobIdToStageIds, "stageIdToJobIds" -> stageIdToJobIds). 
- foreach { case(s, t) => + foreach { case (s, t) => val sizeBefore = t.size t.clearOldValues(cleanupTime) logInfo("%s %d --> %d".format(s, sizeBefore, t.size)) @@ -1106,7 +1106,6 @@ class DAGScheduler( } metadataCleaner.cancel() taskScheduler.stop() - listenerBus.stop() } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 4985f8c43e70d..217f8825c2ae9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -66,11 +66,16 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf) } // Events that do not trigger a flush - override def onStageSubmitted(event: SparkListenerStageSubmitted) = logEvent(event) - override def onTaskStart(event: SparkListenerTaskStart) = logEvent(event) - override def onTaskGettingResult(event: SparkListenerTaskGettingResult) = logEvent(event) - override def onTaskEnd(event: SparkListenerTaskEnd) = logEvent(event) - override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate) = logEvent(event) + override def onStageSubmitted(event: SparkListenerStageSubmitted) = + logEvent(event) + override def onTaskStart(event: SparkListenerTaskStart) = + logEvent(event) + override def onTaskGettingResult(event: SparkListenerTaskGettingResult) = + logEvent(event) + override def onTaskEnd(event: SparkListenerTaskEnd) = + logEvent(event) + override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate) = + logEvent(event) // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted) = @@ -81,7 +86,7 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf) logEvent(event, flushLogger = true) override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded) = logEvent(event, flushLogger = true) - override def onBlockManagerLost(event: SparkListenerBlockManagerLost) = + override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved) = logEvent(event, flushLogger = true) override def onUnpersistRDD(event: SparkListenerUnpersistRDD) = logEvent(event, flushLogger = true) diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 9aec4164f58b4..353a48661b0f7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -54,11 +54,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { override def run() { while (true) { val event = eventQueue.take - val shutdown = postToAll(event) - if (shutdown) { + if (event == SparkListenerShutdown) { // Get out of the while loop and shutdown the daemon thread return } + postToAll(event) } } }.start() diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index 4e27d959d4534..db76178b65501 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -67,28 +67,35 @@ private[spark] class ReplayListenerBus(conf: SparkConf) extends SparkListenerBus } logPaths.foreach { path => - // In case there is an exception, keep track of the highest level stream to close it later - var streamToClose: 
Option[InputStream] = None - var currentLine = "" + // Keep track of input streams at all levels to close them later + // This is necessary because an exception can occur in between stream initializations + var fileStream: Option[InputStream] = None + var bufferedStream: Option[InputStream] = None + var compressStream: Option[InputStream] = None + var currentLine = "" try { - val fstream = fileSystem.open(path) - val bstream = new FastBufferedInputStream(fstream) - val cstream = if (compressed) compressionCodec.compressedInputStream(bstream) else bstream - streamToClose = Some(cstream) + currentLine = "" + fileStream = Some(fileSystem.open(path)) + bufferedStream = Some(new FastBufferedInputStream(fileStream.get)) + compressStream = + if (compressed) { + Some(compressionCodec.compressedInputStream(bufferedStream.get)) + } else bufferedStream // Parse each line as an event and post it to all attached listeners - val lines = Source.fromInputStream(cstream).getLines() + val lines = Source.fromInputStream(compressStream.get).getLines() lines.foreach { line => currentLine = line - val event = JsonProtocol.sparkEventFromJson(parse(line)) - postToAll(event) + postToAll(JsonProtocol.sparkEventFromJson(parse(line))) } } catch { case e: Exception => logError("Exception in parsing Spark event log %s".format(path), e) logError("Malformed line: %s\n".format(currentLine)) } finally { - streamToClose.foreach(_.close()) + fileStream.foreach(_.close()) + bufferedStream.foreach(_.close()) + compressStream.foreach(_.close()) } } fileSystem.close() diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index e4391bc56664b..d4eb0ac88d8e8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -57,7 +57,8 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S case class SparkListenerBlockManagerAdded(blockManagerId: BlockManagerId, maxMem: Long) extends SparkListenerEvent -case class SparkListenerBlockManagerLost(blockManagerId: BlockManagerId) extends SparkListenerEvent +case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId) + extends SparkListenerEvent case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent @@ -116,9 +117,9 @@ trait SparkListener { def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded) { } /** - * Called when an existing block manager has been lost + * Called when an existing block manager has been removed */ - def onBlockManagerLost(blockManagerLost: SparkListenerBlockManagerLost) { } + def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) { } /** * Called when an RDD is manually unpersisted by the application @@ -130,6 +131,9 @@ trait SparkListener { * Simple SparkListener that logs a few summary statistics when each stage completes */ class StatsReportListener extends SparkListener with Logging { + + import org.apache.spark.scheduler.StatsReportListener._ + private val taskInfoMetrics = mutable.Buffer[(TaskInfo, TaskMetrics)]() override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { @@ -141,7 +145,6 @@ class StatsReportListener extends SparkListener with Logging { } override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { - import org.apache.spark.scheduler.StatsReportListener._ implicit val sc = stageCompleted this.logInfo("Finished stage: " + 
stageCompleted.stageInfo) showMillisDistribution("task runtime:", (info, _) => Some(info.duration), taskInfoMetrics) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index 67d39c40291f6..729e120497571 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -34,9 +34,10 @@ private[spark] trait SparkListenerBus { } /** - * Post an event to all attached listeners. Return true if the shutdown event is posted. + * Post an event to all attached listeners. This does nothing if the event is + * SparkListenerShutdown. */ - def postToAll(event: SparkListenerEvent): Boolean = { + protected def postToAll(event: SparkListenerEvent) { event match { case stageSubmitted: SparkListenerStageSubmitted => sparkListeners.foreach(_.onStageSubmitted(stageSubmitted)) @@ -56,14 +57,11 @@ private[spark] trait SparkListenerBus { sparkListeners.foreach(_.onEnvironmentUpdate(environmentUpdate)) case blockManagerAdded: SparkListenerBlockManagerAdded => sparkListeners.foreach(_.onBlockManagerAdded(blockManagerAdded)) - case blockManagerLost: SparkListenerBlockManagerLost => - sparkListeners.foreach(_.onBlockManagerLost(blockManagerLost)) + case blockManagerRemoved: SparkListenerBlockManagerRemoved => + sparkListeners.foreach(_.onBlockManagerRemoved(blockManagerRemoved)) case unpersistRDD: SparkListenerUnpersistRDD => sparkListeners.foreach(_.onUnpersistRDD(unpersistRDD)) case SparkListenerShutdown => - return true - case _ => } - false } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 690718832a3d0..71584b6eb102a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -250,9 +250,7 @@ private[spark] class BlockManager( val inMemSize = Math.max(status.memSize, droppedMemorySize) val onDiskSize = status.diskSize master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize) - } else { - true - } + } else true } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 7c479b17b46d9..ff2652b640272 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -37,10 +37,8 @@ import org.apache.spark.util.{AkkaUtils, Utils} * all slaves' block managers. */ private[spark] -class BlockManagerMasterActor( - val isLocal: Boolean, - conf: SparkConf, - listenerBus: LiveListenerBus) extends Actor with Logging { +class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus: LiveListenerBus) + extends Actor with Logging { // Mapping from block manager id to the block manager's information. 
private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo] @@ -163,8 +161,7 @@ class BlockManagerMasterActor( blockLocations.remove(locations) } } - val blockManagerLost = SparkListenerBlockManagerLost(blockManagerId) - listenerBus.post(blockManagerLost) + listenerBus.post(SparkListenerBlockManagerRemoved(blockManagerId)) } private def expireDeadHosts() { @@ -241,8 +238,7 @@ class BlockManagerMasterActor( blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor) } - val blockManagerAdded = SparkListenerBlockManagerAdded(id, maxMemSize) - listenerBus.post(blockManagerAdded) + listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize)) } private def updateBlockInfo( diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 7ed7cdd430e7a..488f1ea9628f5 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -73,11 +73,11 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) returnValues: Boolean): PutResult = { if (level.deserialized) { val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef]) - val putAttempt = tryToPut(blockId, values, sizeEstimate, true) + val putAttempt = tryToPut(blockId, values, sizeEstimate, deserialized = true) PutResult(sizeEstimate, Left(values.iterator), putAttempt.droppedBlocks) } else { val bytes = blockManager.dataSerialize(blockId, values.iterator) - val putAttempt = tryToPut(blockId, bytes, bytes.limit, false) + val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false) PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks) } } @@ -158,7 +158,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) * blocks to free memory for one block, another thread may use up the freed space for * another block. * - * Return whether put was successful, along with the blocks dropped in the process + * Return whether put was successful, along with the blocks dropped in the process. 
*/ private def tryToPut( blockId: BlockId, diff --git a/core/src/main/scala/org/apache/spark/storage/PutResult.scala b/core/src/main/scala/org/apache/spark/storage/PutResult.scala index cd3f61e75d574..f0eac7594ecf6 100644 --- a/core/src/main/scala/org/apache/spark/storage/PutResult.scala +++ b/core/src/main/scala/org/apache/spark/storage/PutResult.scala @@ -29,4 +29,4 @@ import java.nio.ByteBuffer private[spark] case class PutResult( size: Long, data: Either[Iterator[_], ByteBuffer], - droppedBlocks: Seq[(BlockId, BlockStatus)] = Seq()) + droppedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty) diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala index fd6b82f4a64d5..26565f56ad858 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala @@ -51,14 +51,12 @@ private[spark] class StorageStatusListener extends SparkListener { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { val info = taskEnd.taskInfo - if (info != null) { + val metrics = taskEnd.taskMetrics + if (info != null && metrics != null) { val execId = formatExecutorId(info.executorId) - val metrics = taskEnd.taskMetrics - if (metrics != null) { - val updatedBlocks = metrics.updatedBlocks.getOrElse(Seq()) - if (updatedBlocks.length > 0) { - updateStorageStatus(execId, updatedBlocks) - } + val updatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) + if (updatedBlocks.length > 0) { + updateStorageStatus(execId, updatedBlocks) } } } @@ -77,9 +75,9 @@ private[spark] class StorageStatusListener extends SparkListener { } } - override def onBlockManagerLost(blockManagerLost: SparkListenerBlockManagerLost) { + override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) { synchronized { - val executorId = blockManagerLost.blockManagerId.executorId + val executorId = blockManagerRemoved.blockManagerId.executorId executorIdToStorageStatus.remove(executorId) } } diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index a1d1393348d49..6153dfe0b7e13 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -70,31 +70,41 @@ class RDDInfo(val id: Int, val name: String, val numPartitions: Int, val storage private[spark] object StorageUtils { - /** Returns RDD-level information from a list of StorageStatus objects and SparkContext */ + /** + * Returns basic information of all RDDs persisted in the given SparkContext. This does not + * include storage information. + */ + def rddInfoFromSparkContext(sc: SparkContext): Array[RDDInfo] = { + sc.persistentRdds.values.map { rdd => + val rddName = Option(rdd.name).getOrElse(rdd.id.toString) + val rddNumPartitions = rdd.partitions.size + val rddStorageLevel = rdd.getStorageLevel + val rddInfo = new RDDInfo(rdd.id, rddName, rddNumPartitions, rddStorageLevel) + rddInfo + }.toArray + } + + /** Returns storage information of all RDDs persisted in the given SparkContext. 
*/ def rddInfoFromStorageStatus( - storageStatusList: Seq[StorageStatus], + storageStatuses: Seq[StorageStatus], sc: SparkContext): Array[RDDInfo] = { - val blockStatusMap = blockStatusMapFromStorageStatus(storageStatusList) - val rddInfoList = rddInfoFromSparkContext(blockStatusMap.keys.toSeq, sc) - val rddInfoMap = rddInfoList.map { info => (info.id, info) }.toMap - rddInfoFromBlockStatusMap(blockStatusMap, rddInfoMap) + rddInfoFromStorageStatus(storageStatuses, rddInfoFromSparkContext(sc)) } - /** - * Returns RDD-level information from a list of StorageStatus objects and an existing - * RDD ID to RDDInfo mapping - */ + /** Returns storage information of all RDDs in the given list. */ def rddInfoFromStorageStatus( - storageStatusList: Seq[StorageStatus], - rddInfoMap: Map[Int, RDDInfo]): Array[RDDInfo] = { - val blockStatusMap = blockStatusMapFromStorageStatus(storageStatusList) - rddInfoFromBlockStatusMap(blockStatusMap, rddInfoMap) - } + storageStatuses: Seq[StorageStatus], + rddInfos: Seq[RDDInfo]): Array[RDDInfo] = { + + // Mapping from RDD ID -> an array of associated BlockStatuses + val blockStatusMap = storageStatuses.flatMap(_.rddBlocks).toMap + .groupBy { case (k, _) => k.rddId } + .mapValues(_.values.toArray) - private def rddInfoFromBlockStatusMap( - blockStatusMap: Map[Int, Array[BlockStatus]], - rddInfoMap: Map[Int, RDDInfo]): Array[RDDInfo] = { - val rddInfos = blockStatusMap.map { case (rddId, blocks) => + // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information) + val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap + + val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) => // Add up memory and disk sizes val persistedBlocks = blocks.filter { status => status.memSize + status.diskSize > 0 } val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L) @@ -105,42 +115,28 @@ object StorageUtils { rddInfo.diskSize = diskSize rddInfo } - }.flatten.toArray - - scala.util.Sorting.quickSort(rddInfos) - rddInfos - } - - private def blockStatusMapFromStorageStatus(storageStatusList: Seq[StorageStatus]) - : Map[Int, Array[BlockStatus]] = { - val rddBlockMap = storageStatusList.flatMap(_.rddBlocks).toMap[RDDBlockId, BlockStatus] - rddBlockMap.groupBy { case (k, v) => k.rddId }.mapValues(_.values.toArray) - } - - private def rddInfoFromSparkContext(rddIds: Seq[Int], sc: SparkContext): Array[RDDInfo] = { - rddIds.flatMap { rddId => - sc.persistentRdds.get(rddId).map { r => - val rddName = Option(r.name).getOrElse(rddId.toString) - val rddNumPartitions = r.partitions.size - val rddStorageLevel = r.getStorageLevel - val rddInfo = new RDDInfo(rddId, rddName, rddNumPartitions, rddStorageLevel) - rddInfo - } }.toArray + + scala.util.Sorting.quickSort(rddStorageInfos) + rddStorageInfos } - /** Returns a map of blocks to their locations, compiled from a list of StorageStatus objects */ - def blockLocationsFromStorageStatus(storageStatusList: Seq[StorageStatus]) = { - val blockLocationPairs = - storageStatusList.flatMap(s => s.blocks.map(b => (b._1, s.blockManagerId.hostPort))) - blockLocationPairs.groupBy(_._1).map{case (k, v) => (k, v.unzip._2)}.toMap + /** Returns a mapping from BlockId to the locations of the associated block. 
*/ + def blockLocationsFromStorageStatus( + storageStatuses: Seq[StorageStatus]): Map[BlockId, Seq[String]] = { + val blockLocationPairs = storageStatuses.flatMap { storageStatus => + storageStatus.blocks.map { case (bid, _) => (bid, storageStatus.blockManagerId.hostPort) } + } + blockLocationPairs.toMap + .groupBy { case (blockId, _) => blockId } + .mapValues(_.values.toSeq) } - /** Filters storage status by a given RDD id. */ + /** Filters the given list of StorageStatus by the given RDD ID. */ def filterStorageStatusByRDD( - storageStatusList: Seq[StorageStatus], - rddId: Int) : Array[StorageStatus] = { - storageStatusList.map { status => + storageStatuses: Seq[StorageStatus], + rddId: Int): Array[StorageStatus] = { + storageStatuses.map { status => val filteredBlocks = status.rddBlocks.filterKeys(_.rddId == rddId).toSeq val filteredBlockMap = mutable.Map[BlockId, BlockStatus](filteredBlocks: _*) new StorageStatus(status.blockManagerId, status.maxMem, filteredBlockMap) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index a1bb6cabd3c9c..af7a737e01911 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -33,11 +33,13 @@ import org.json4s.jackson.JsonMethods.{pretty, render} import org.apache.spark.{Logging, SecurityManager, SparkConf} -/** Utilities for launching a web server using Jetty's HTTP Server class */ +/** + * Utilities for launching a web server using Jetty's HTTP Server class + */ private[spark] object JettyUtils extends Logging { + // Base type for a function that returns something based on an HTTP request. Allows for // implicit conversion from many types of functions to jetty Handlers. - type Responder[T] = HttpServletRequest => T class ServletParams[T <% AnyRef](val responder: Responder[T], @@ -102,7 +104,6 @@ private[spark] object JettyUtils extends Logging { srcPath: String, destPath: String, basePath: String = ""): ServletContextHandler = { - val prefixedSrcPath = attachPrefix(basePath, srcPath) val prefixedDestPath = attachPrefix(basePath, destPath) val servlet = new HttpServlet { override def doGet(request: HttpServletRequest, response: HttpServletResponse) { @@ -111,11 +112,7 @@ private[spark] object JettyUtils extends Logging { response.sendRedirect(newUrl) } } - val contextHandler = new ServletContextHandler - val holder = new ServletHolder(servlet) - contextHandler.setContextPath(prefixedSrcPath) - contextHandler.addServlet(holder, "/") - contextHandler + createServletHandler(srcPath, servlet, basePath) } /** Create a handler for serving files from a static directory */ @@ -133,6 +130,7 @@ private[spark] object JettyUtils extends Logging { contextHandler } + /** Add security filters, if any, do the given list of ServletContextHandlers */ private def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) { val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim()) filters.foreach { diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index f5bff6dbce523..5ef08e4a924a9 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -29,7 +29,7 @@ import org.apache.spark.ui.jobs.JobProgressUI import org.apache.spark.ui.storage.BlockManagerUI import org.apache.spark.util.Utils -/** Top level user interface for Spark. 
*/ +/** Top level user interface for Spark */ private[spark] class SparkUI( val sc: SparkContext, conf: SparkConf, @@ -106,7 +106,7 @@ private[spark] class SparkUI( } def stop() { - assert(serverInfo.isDefined, "Attempted to stop a SparkUI that was not initialized!") + assert(serverInfo.isDefined, "Attempted to stop a SparkUI that was not bound to a server!") serverInfo.get.server.stop() logInfo("Stopped Spark Web UI at %s".format(appUIAddress)) } diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala index acebaa68101a5..23e90c34d5b33 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala @@ -75,10 +75,10 @@ private[ui] class EnvironmentUI(parent: SparkUI) { * A SparkListener that prepares information to be displayed on the EnvironmentUI */ private[ui] class EnvironmentListener extends SparkListener { - var jvmInformation: Seq[(String, String)] = Seq() - var sparkProperties: Seq[(String, String)] = Seq() - var systemProperties: Seq[(String, String)] = Seq() - var classpathEntries: Seq[(String, String)] = Seq() + var jvmInformation = Seq[(String, String)]() + var sparkProperties = Seq[(String, String)]() + var systemProperties = Seq[(String, String)]() + var classpathEntries = Seq[(String, String)]() override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { synchronized { diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index 90604357d95b9..031ed88a493a8 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -151,11 +151,11 @@ private[ui] class ExecutorsUI(parent: SparkUI) { totalTasks, totalDuration, totalShuffleRead, - totalShuffleWrite - ) ++ Seq(maxMem) + totalShuffleWrite, + maxMem + ).map(_.toString) - val execValuesString = execValues.map(_.toString) - execFields.zip(execValuesString).toMap + execFields.zip(execValues).toMap } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index c2b2d41054e63..d10aa12b9ebca 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -247,9 +247,9 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener { } } - override def onBlockManagerLost(blockManagerLost: SparkListenerBlockManagerLost) { + override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) { synchronized { - val executorId = blockManagerLost.blockManagerId.executorId + val executorId = blockManagerRemoved.blockManagerId.executorId executorIdToBlockManagerId.remove(executorId) } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 1427b1f6e0458..da7f20233063e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -136,8 +136,9 @@ private[ui] class StagePage(parent: JobProgressUI) { } } val gettingResultQuantiles = "Time spent fetching task results" +: - Distribution(gettingResultTimes).get.getQuantiles() - .map(millis => parent.formatDuration(millis.toLong)) + 
Distribution(gettingResultTimes).get.getQuantiles().map { millis => + parent.formatDuration(millis.toLong) + } // The scheduler delay includes the network delay to send the task to the worker // machine and to send back the result (but not the time to fetch the task result, // if it needed to be fetched from the block manager on the worker). @@ -152,8 +153,9 @@ private[ui] class StagePage(parent: JobProgressUI) { totalExecutionTime - metrics.get.executorRunTime } val schedulerDelayQuantiles = "Scheduler delay" +: - Distribution(schedulerDelays).get.getQuantiles() - .map(millis => parent.formatDuration(millis.toLong)) + Distribution(schedulerDelays).get.getQuantiles().map { millis => + parent.formatDuration(millis.toLong) + } def getQuantileCols(data: Seq[Double]) = Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong)) @@ -209,89 +211,88 @@ private[ui] class StagePage(parent: JobProgressUI) { } def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean) - (taskData: TaskUIData): Seq[Node] = { + (taskData: TaskUIData): Seq[Node] = { def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] = trace.map(e => {e.toString}) - val info = taskData.taskInfo - val metrics = taskData.taskMetrics - val exception = taskData.exception - - val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis()) - else metrics.map(_.executorRunTime).getOrElse(1L) - val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration) - else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("") - val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) - val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) - - val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead) - val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("") - val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("") - - val maybeShuffleWrite = - metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten) - val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("") - val shuffleWriteReadable = maybeShuffleWrite.map(Utils.bytesToString).getOrElse("") - - val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime) - val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("") - val writeTimeReadable = maybeWriteTime.map( t => t / (1000 * 1000)).map { ms => - if (ms == 0) "" else parent.formatDuration(ms) - }.getOrElse("") - - val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled) - val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("") - val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("") - - val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled) - val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("") - val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("") - - - {info.index} - {info.taskId} - {info.status} - {info.taskLocality} - {info.host} - {dateFmt.format(new Date(info.launchTime))} - - {formatDuration} - - - {if (gcTime > 0) parent.formatDuration(gcTime) else ""} - - - {if (serializationTime > 0) parent.formatDuration(serializationTime) else ""} - - {if (shuffleRead) { - - {shuffleReadReadable} - - }} - {if (shuffleWrite) { - - {writeTimeReadable} - - - {shuffleWriteReadable} - - }} - {if (bytesSpilled) { - - {memoryBytesSpilledReadable} + taskData 
match { case TaskUIData(info, metrics, exception) => + val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis()) + else metrics.map(_.executorRunTime).getOrElse(1L) + val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration) + else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("") + val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) + val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) + + val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead) + val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("") + val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("") + + val maybeShuffleWrite = + metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten) + val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("") + val shuffleWriteReadable = maybeShuffleWrite.map(Utils.bytesToString).getOrElse("") + + val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime) + val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("") + val writeTimeReadable = maybeWriteTime.map( t => t / (1000 * 1000)).map { ms => + if (ms == 0) "" else parent.formatDuration(ms) + }.getOrElse("") + + val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled) + val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("") + val memoryBytesSpilledReadable = + maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("") + + val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled) + val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("") + val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("") + + + {info.index} + {info.taskId} + {info.status} + {info.taskLocality} + {info.host} + {dateFmt.format(new Date(info.launchTime))} + + {formatDuration} + + + {if (gcTime > 0) parent.formatDuration(gcTime) else ""} - - {diskBytesSpilledReadable} + + {if (serializationTime > 0) parent.formatDuration(serializationTime) else ""} - }} - - {exception.map { e => - - {e.className} ({e.description})
- {fmtStackTrace(e.stackTrace)} - </span>
- }.getOrElse("")} - - + {if (shuffleRead) { + + {shuffleReadReadable} + + }} + {if (shuffleWrite) { + + {writeTimeReadable} + + + {shuffleWriteReadable} + + }} + {if (bytesSpilled) { + + {memoryBytesSpilledReadable} + + + {diskBytesSpilledReadable} + + }} + + {exception.map { e => + + {e.className} ({e.description})
+ {fmtStackTrace(e.stackTrace)} + </span>
+ }.getOrElse("")} + + + } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 466c014dfdf36..063c4bfebfeeb 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -85,7 +85,7 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) { case Some(t) => dateFmt.format(new Date(t)) case None => "Unknown" } - val finishTime = s.completionTime.getOrElse(System.currentTimeMillis()) + val finishTime = s.completionTime.getOrElse(System.currentTimeMillis) val duration = s.submissionTime.map(t => finishTime - t) val formattedDuration = duration.map(d => parent.formatDuration(d)).getOrElse("Unknown") val startedTasks = diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala index a3c4f2d08be93..4d8b01dbe6e1b 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala @@ -66,13 +66,14 @@ private[ui] class BlockManagerListener(storageStatusListener: StorageStatusListe /** Update each RDD's info to reflect any updates to the RDD's storage status */ private def updateRDDInfo() { - val updatedRDDInfoList = StorageUtils.rddInfoFromStorageStatus(storageStatusList, _rddInfoMap) - updatedRDDInfoList.foreach { info => _rddInfoMap(info.id) = info } + val rddInfos = _rddInfoMap.values.toSeq + val updatedRddInfos = StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos) + updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info } } /** - * Assumes the storage status list is fully up-to-date. This implies that the corresponding - * StorageStatusSparkListener must + * Assumes the storage status list is fully up-to-date. This implies the corresponding + * StorageStatusSparkListener must process the SparkListenerTaskEnd event before this listener. 
*/ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { val metrics = taskEnd.taskMetrics @@ -88,11 +89,10 @@ private[ui] class BlockManagerListener(storageStatusListener: StorageStatusListe override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized { // Remove all partitions that are no longer cached - _rddInfoMap.retain { case (id, info) => info.numCachedPartitions > 0 } + _rddInfoMap.retain { case (_, info) => info.numCachedPartitions > 0 } } override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized { - super.onUnpersistRDD(unpersistRDD) updateRDDInfo() } } diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 6f82194a108a1..3f42eba4ece00 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -33,20 +33,23 @@ private[ui] class RDDPage(parent: BlockManagerUI) { private lazy val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { + val rddId = request.getParameter("id").toInt val storageStatusList = listener.storageStatusList - val id = request.getParameter("id").toInt - val rddInfo = listener.rddInfoList.filter(_.id == id).head + val rddInfo = listener.rddInfoList.find(_.id == rddId).getOrElse { + // Rather than crashing, render an "RDD Not Found" page + return UIUtils.headerSparkPage(Seq[Node](), basePath, appName, "RDD Not Found", Storage) + } // Worker table - val workers = storageStatusList.map((id, _)) + val workers = storageStatusList.map((rddId, _)) val workerTable = UIUtils.listingTable(workerHeader, workerRow, workers) // Block table - val filteredStorageStatusList = StorageUtils.filterStorageStatusByRDD(storageStatusList, id) + val filteredStorageStatusList = StorageUtils.filterStorageStatusByRDD(storageStatusList, rddId) val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).sortWith(_._1.name < _._1.name) val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList) val blocks = blockStatuses.map { case (blockId, status) => - (blockId, status, blockLocations.get(blockId).getOrElse(Seq("Unknown"))) + (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown"))) } val blockTable = UIUtils.listingTable(blockHeader, blockRow, blocks) diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala index 409223b67737e..f07962096a32c 100644 --- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala +++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala @@ -39,7 +39,7 @@ import org.apache.spark.io.CompressionCodec class FileLogger( logDir: String, conf: SparkConf = new SparkConf, - outputBufferSize: Int = 8 * 1024, + outputBufferSize: Int = 8 * 1024, // 8 KB compress: Boolean = false, overwrite: Boolean = true) extends Logging { @@ -59,7 +59,9 @@ class FileLogger( Some(createWriter()) } - /** Create a logging directory with the given path. */ + /** + * Create a logging directory with the given path. + */ private def createLogDir() { val path = new Path(logDir) if (fileSystem.exists(path)) { @@ -86,18 +88,14 @@ class FileLogger( /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). * Therefore, for local files, use FileOutputStream instead. 
*/ val dstream = uri.getScheme match { - case "hdfs" | "s3" => - val path = new Path(logPath) - hadoopDataStream = Some(fileSystem.create(path, overwrite)) - hadoopDataStream.get - case "file" | null => // Second parameter is whether to append new FileOutputStream(logPath, !overwrite) - case unsupportedScheme => - throw new UnsupportedOperationException("File system scheme %s is not supported!" - .format(unsupportedScheme)) + case _ => + val path = new Path(logPath) + hadoopDataStream = Some(fileSystem.create(path, overwrite)) + hadoopDataStream.get } val bstream = new FastBufferedOutputStream(dstream, outputBufferSize) @@ -106,21 +104,20 @@ class FileLogger( } /** - * Log the message to the given writer + * Log the message to the given writer. * @param msg The message to be logged * @param withTime Whether to prepend message with a timestamp */ def log(msg: String, withTime: Boolean = false) { - var writeInfo = msg - if (withTime) { + val writeInfo = if (!withTime) msg else { val date = new Date(System.currentTimeMillis()) - writeInfo = DATE_FORMAT.format(date) + ": " + msg + DATE_FORMAT.format(date) + ": " + msg } writer.foreach(_.print(writeInfo)) } /** - * Log the message to the given writer as a new line + * Log the message to the given writer as a new line. * @param msg The message to be logged * @param withTime Whether to prepend message with a timestamp */ @@ -138,13 +135,17 @@ class FileLogger( hadoopDataStream.foreach(_.sync()) } - /** Close the writer. Any subsequent calls to log or flush will have no effect. */ + /** + * Close the writer. Any subsequent calls to log or flush will have no effect. + */ def close() { writer.foreach(_.close()) writer = None } - /** Start a writer for a new file if one does not already exit */ + /** + * Start a writer for a new file if one does not already exit. 
+ */ def start() { writer.getOrElse { fileIndex += 1 diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 2b51597785b98..346f2b7856791 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -21,7 +21,6 @@ import java.util.{Properties, UUID} import scala.collection.JavaConverters._ import scala.collection.Map -import scala.collection.mutable import org.json4s.DefaultFormats import org.json4s.JsonDSL._ @@ -59,12 +58,13 @@ private[spark] object JsonProtocol { environmentUpdateToJson(environmentUpdate) case blockManagerAdded: SparkListenerBlockManagerAdded => blockManagerAddedToJson(blockManagerAdded) - case blockManagerLost: SparkListenerBlockManagerLost => - blockManagerLostToJson(blockManagerLost) + case blockManagerRemoved: SparkListenerBlockManagerRemoved => + blockManagerRemovedToJson(blockManagerRemoved) case unpersistRDD: SparkListenerUnpersistRDD => unpersistRDDToJson(unpersistRDD) - case SparkListenerShutdown => - shutdownToJson() + + // Not used, but keeps compiler happy + case SparkListenerShutdown => JNothing } } @@ -146,9 +146,9 @@ private[spark] object JsonProtocol { ("Maximum Memory" -> blockManagerAdded.maxMem) } - def blockManagerLostToJson(blockManagerLost: SparkListenerBlockManagerLost): JValue = { - val blockManagerId = blockManagerIdToJson(blockManagerLost.blockManagerId) - ("Event" -> Utils.getFormattedClassName(blockManagerLost)) ~ + def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = { + val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId) + ("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~ ("Block Manager ID" -> blockManagerId) } @@ -157,9 +157,6 @@ private[spark] object JsonProtocol { ("RDD ID" -> unpersistRDD.rddId) } - def shutdownToJson(): JValue = { - "Event" -> Utils.getFormattedClassName(SparkListenerShutdown) - } /** ------------------------------------------------------------------- * * JSON serialization methods for classes SparkListenerEvents depend on | @@ -323,6 +320,7 @@ private[spark] object JsonProtocol { ("Disk Size" -> blockStatus.diskSize) } + /** ------------------------------ * * Util JSON serialization methods | * ------------------------------- */ @@ -357,6 +355,7 @@ private[spark] object JsonProtocol { ("Stack Trace" -> stackTraceToJson(exception.getStackTrace)) } + /** --------------------------------------------------- * * JSON deserialization methods for SparkListenerEvents | * ---------------------------------------------------- */ @@ -371,9 +370,8 @@ private[spark] object JsonProtocol { val jobEnd = Utils.getFormattedClassName(SparkListenerJobEnd) val environmentUpdate = Utils.getFormattedClassName(SparkListenerEnvironmentUpdate) val blockManagerAdded = Utils.getFormattedClassName(SparkListenerBlockManagerAdded) - val blockManagerLost = Utils.getFormattedClassName(SparkListenerBlockManagerLost) + val blockManagerRemoved = Utils.getFormattedClassName(SparkListenerBlockManagerRemoved) val unpersistRDD = Utils.getFormattedClassName(SparkListenerUnpersistRDD) - val shutdown = Utils.getFormattedClassName(SparkListenerShutdown) (json \ "Event").extract[String] match { case `stageSubmitted` => stageSubmittedFromJson(json) @@ -385,9 +383,8 @@ private[spark] object JsonProtocol { case `jobEnd` => jobEndFromJson(json) case `environmentUpdate` => environmentUpdateFromJson(json) case 
`blockManagerAdded` => blockManagerAddedFromJson(json) - case `blockManagerLost` => blockManagerLostFromJson(json) + case `blockManagerRemoved` => blockManagerRemovedFromJson(json) case `unpersistRDD` => unpersistRDDFromJson(json) - case `shutdown` => SparkListenerShutdown } } @@ -450,15 +447,16 @@ private[spark] object JsonProtocol { SparkListenerBlockManagerAdded(blockManagerId, maxMem) } - def blockManagerLostFromJson(json: JValue): SparkListenerBlockManagerLost = { + def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = { val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID") - SparkListenerBlockManagerLost(blockManagerId) + SparkListenerBlockManagerRemoved(blockManagerId) } def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = { SparkListenerUnpersistRDD((json \ "RDD ID").extract[Int]) } + /** --------------------------------------------------------------------- * * JSON deserialization methods for classes SparkListenerEvents depend on | * ---------------------------------------------------------------------- */ @@ -669,6 +667,7 @@ private[spark] object JsonProtocol { BlockStatus(storageLevel, memorySize, diskSize) } + /** -------------------------------- * * Util JSON deserialization methods | * --------------------------------- */ diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 26be4bfdae1c2..13d9dbdd9af2d 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -900,6 +900,7 @@ private[spark] object Utils extends Logging { obj.getClass.getSimpleName.replace("$", "") } + /** Return an option that translates JNothing to None */ def jsonOption(json: JValue): Option[JValue] = { json match { case JNothing => None @@ -907,6 +908,7 @@ private[spark] object Utils extends Logging { } } + /** Return an empty JSON object */ def emptyJson = JObject(List[JField]()) /** diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index da3933cff04cc..67c0a434c9b52 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -49,7 +49,7 @@ class JsonProtocolSuite extends FunSuite { )) val blockManagerAdded = SparkListenerBlockManagerAdded( BlockManagerId("Stars", "In your multitude...", 300, 400), 500) - val blockManagerLost = SparkListenerBlockManagerLost( + val blockManagerRemoved = SparkListenerBlockManagerRemoved( BlockManagerId("Scarce", "to be counted...", 100, 200)) val unpersistRdd = SparkListenerUnpersistRDD(12345) @@ -62,9 +62,8 @@ class JsonProtocolSuite extends FunSuite { testEvent(jobEnd, jobEndJsonString) testEvent(environmentUpdate, environmentUpdateJsonString) testEvent(blockManagerAdded, blockManagerAddedJsonString) - testEvent(blockManagerLost, blockManagerLostJsonString) + testEvent(blockManagerRemoved, blockManagerRemovedJsonString) testEvent(unpersistRdd, unpersistRDDJsonString) - testEvent(SparkListenerShutdown, shutdownJsonString) } test("Dependent Classes") { @@ -208,7 +207,7 @@ class JsonProtocolSuite extends FunSuite { case (e1: SparkListenerBlockManagerAdded, e2: SparkListenerBlockManagerAdded) => assert(e1.maxMem == e2.maxMem) assertEquals(e1.blockManagerId, e2.blockManagerId) - case (e1: SparkListenerBlockManagerLost, e2: SparkListenerBlockManagerLost) => + case (e1: 
SparkListenerBlockManagerRemoved, e2: SparkListenerBlockManagerRemoved) => assertEquals(e1.blockManagerId, e2.blockManagerId) case (e1: SparkListenerUnpersistRDD, e2: SparkListenerUnpersistRDD) => assert(e1.rddId == e2.rddId) @@ -546,9 +545,9 @@ class JsonProtocolSuite extends FunSuite { "Host":"In your multitude...","Port":300,"Netty Port":400},"Maximum Memory":500} """ - private val blockManagerLostJsonString = + private val blockManagerRemovedJsonString = """ - {"Event":"SparkListenerBlockManagerLost","Block Manager ID":{"Executor ID":"Scarce", + {"Event":"SparkListenerBlockManagerRemoved","Block Manager ID":{"Executor ID":"Scarce", "Host":"to be counted...","Port":100,"Netty Port":200}} """ @@ -557,8 +556,4 @@ class JsonProtocolSuite extends FunSuite { {"Event":"SparkListenerUnpersistRDD","RDD ID":12345} """ - private val shutdownJsonString = - """ - {"Event":"SparkListenerShutdown"} - """ }
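Not part of the patch: a minimal sketch of how an application might consume the renamed block manager events and the event logging flag introduced by this change. The listener and object names (BlockManagerTracker, BlockManagerTrackerApp) are hypothetical; the sketch only assumes the SparkListener callbacks, the SparkListenerBlockManagerAdded/Removed events, and the spark.eventLog.enabled setting shown in the diff above.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockManagerAdded, SparkListenerBlockManagerRemoved}

// Hypothetical listener that reports block manager registrations and removals,
// using the renamed onBlockManagerRemoved callback from this patch.
class BlockManagerTracker extends SparkListener {
  override def onBlockManagerAdded(added: SparkListenerBlockManagerAdded) {
    println("Block manager added: %s (maxMem = %d)".format(added.blockManagerId, added.maxMem))
  }
  override def onBlockManagerRemoved(removed: SparkListenerBlockManagerRemoved) {
    println("Block manager removed: %s".format(removed.blockManagerId))
  }
}

object BlockManagerTrackerApp {
  def main(args: Array[String]) {
    // Enabling spark.eventLog.enabled causes the EventLoggingListener created in
    // SparkContext to persist these events for later replay by the ReplayListenerBus.
    val conf = new SparkConf()
      .setAppName("block-manager-tracker")
      .setMaster("local[2]")
      .set("spark.eventLog.enabled", "true")
    val sc = new SparkContext(conf)
    sc.addSparkListener(new BlockManagerTracker)
    sc.parallelize(1 to 100).cache().count()
    sc.stop()
  }
}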