From bbe3501c63029ffa9c1fd9053e7ab868d0f28b10 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 26 Feb 2014 15:27:43 -0800
Subject: [PATCH] Embed storage status and RDD info in Task events

This commit achieves three main things.

First and foremost, it embeds the information from the SparkListenerStorageStatusFetch and SparkListenerGetRDDInfo events into events that are more descriptive of the SparkListenerInterface. In particular, every Task now maintains a list of blocks whose storage statuses have been updated as a result of that task. Previously, this information was retrieved by fetching storage status from the driver, an action arbitrarily associated with a stage. This change involves keeping track of which blocks are dropped during each call to an RDD persist. A big TODO is to also capture the behavior of an RDD unpersist in a SparkListenerEvent.

Second, the SparkListenerEvent interface now handles the dynamic nature of executors. In particular, a new event, SparkListenerExecutorsStateChange, is introduced, which triggers a storage status fetch from the driver. The main purpose of this is to decouple fetching storage status from the driver from any particular Stage. Note that storage status is not ready until the remote BlockManagers have registered, so this involves attaching a registration listener to the BlockManagerMasterActor.

Third, changes in environment properties are now supported. This accounts for the fact that the user can invoke sc.addFile and sc.addJar in their own application, which should be reflected appropriately in the EnvironmentUI. In the previous implementation, coupling this information with application start prevented this from happening.

Other relatively minor changes include: 1) refactoring BlockStatus and BlockManagerInfo so they are no longer part of the BlockManagerMasterActor object, 2) formatting changes, especially those involving multi-line arguments, and 3) making all UI widgets and listeners private[ui] instead of private[spark].
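To make the reworked listener surface concrete, here is a minimal sketch (not part of the patch) of a SparkListener written against the event shapes this diff introduces: SparkListenerTaskEnd carrying TaskMetrics.updatedBlocks, SparkListenerEnvironmentUpdate, and SparkListenerExecutorsStateChange. The class name and the println reporting are purely illustrative, and the field accesses assume the case-class signatures shown in the diff below.

```scala
import org.apache.spark.scheduler._

// Hypothetical listener name; not part of this patch.
class StorageTrackingListener extends SparkListener {

  // Each completed task now carries the statuses of any blocks it updated,
  // via TaskMetrics.updatedBlocks (populated in CacheManager and BlockManager.put).
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    for (metrics <- Option(taskEnd.taskMetrics);
         updatedBlocks <- metrics.updatedBlocks;
         (blockId, status) <- updatedBlocks) {
      println("Stage %d: block %s is now at level %s (mem: %d B, disk: %d B)".format(
        taskEnd.stageId, blockId, status.storageLevel, status.memSize, status.diskSize))
    }
  }

  // Posted on application start and whenever addFile/addJar/clearFiles/clearJars is called.
  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
    println("Environment updated: " + environmentUpdate.environmentDetails.keys.mkString(", "))
  }

  // Posted when a BlockManager registers with the master or when an executor is lost.
  override def onExecutorsStateChange(executorsStateChange: SparkListenerExecutorsStateChange) {
    println("Storage status reported for %d block managers".format(
      executorsStateChange.storageStatusList.size))
  }
}
```

With the patch applied, such a listener could be registered through sc.addSparkListener and would start receiving executor-state events once the DAG scheduler wires the BlockManagerRegistrationListener to the listener bus, including any events buffered before the bus was ready.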
--- .../scala/org/apache/spark/CacheManager.scala | 16 +- .../scala/org/apache/spark/SparkContext.scala | 49 +++- .../scala/org/apache/spark/SparkEnv.scala | 48 ++-- .../apache/spark/deploy/worker/Worker.scala | 1 - .../apache/spark/executor/TaskMetrics.scala | 7 + .../apache/spark/scheduler/DAGScheduler.scala | 32 ++- .../spark/scheduler/SchedulerBackend.scala | 2 - .../spark/scheduler/SparkListener.scala | 22 +- .../spark/scheduler/SparkListenerBus.scala | 8 +- .../apache/spark/scheduler/StageInfo.scala | 9 +- .../cluster/SparkDeploySchedulerBackend.scala | 2 - .../apache/spark/storage/BlockManager.scala | 227 +++++++++++------- .../spark/storage/BlockManagerMaster.scala | 4 +- .../storage/BlockManagerMasterActor.scala | 157 ++++++------ .../BlockManagerRegistrationListener.scala | 51 ++++ .../apache/spark/storage/MemoryStore.scala | 82 ++++--- .../org/apache/spark/storage/PutResult.scala | 12 +- .../apache/spark/storage/StorageUtils.scala | 114 ++++++--- .../org/apache/spark/ui/UISparkListener.scala | 137 ++++++----- .../apache/spark/ui/env/EnvironmentUI.scala | 11 +- .../apache/spark/ui/exec/ExecutorsUI.scala | 14 +- .../spark/ui/jobs/ExecutorSummary.scala | 2 +- .../apache/spark/ui/jobs/ExecutorTable.scala | 2 +- .../org/apache/spark/ui/jobs/IndexPage.scala | 2 +- .../spark/ui/jobs/JobProgressListener.scala | 32 ++- .../apache/spark/ui/jobs/JobProgressUI.scala | 4 +- .../org/apache/spark/ui/jobs/PoolPage.scala | 2 +- .../org/apache/spark/ui/jobs/PoolTable.scala | 2 +- .../org/apache/spark/ui/jobs/StagePage.scala | 2 +- .../org/apache/spark/ui/jobs/StageTable.scala | 2 +- .../spark/ui/storage/BlockManagerUI.scala | 37 +-- .../apache/spark/ui/storage/IndexPage.scala | 4 +- .../org/apache/spark/ui/storage/RDDPage.scala | 19 +- .../org/apache/spark/util/JsonProtocol.scala | 171 ++++++------- .../spark/scheduler/SparkListenerSuite.scala | 10 +- .../ui/jobs/JobProgressListenerSuite.scala | 2 +- 36 files changed, 754 insertions(+), 544 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/storage/BlockManagerRegistrationListener.scala diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index bca74825efd28..85abfd8e65045 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -18,13 +18,15 @@ package org.apache.spark import scala.collection.mutable.{ArrayBuffer, HashSet} + import org.apache.spark.storage.{BlockManager, StorageLevel, RDDBlockId} import org.apache.spark.rdd.RDD -/** Spark class responsible for passing RDDs split contents to the BlockManager and making - sure a node doesn't load two copies of an RDD at once. - */ +/** + * Spark class responsible for passing RDDs split contents to the BlockManager and making + * sure a node doesn't load two copies of an RDD at once. + */ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { /** Keys of RDD splits that are being computed/loaded. 
*/ @@ -69,11 +71,17 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { // If we got here, we have to load the split logInfo("Partition %s not found, computing it".format(key)) val computedValues = rdd.computeOrReadCheckpoint(split, context) + // Persist the result, so long as the task is not running locally if (context.runningLocally) { return computedValues } val elements = new ArrayBuffer[Any] elements ++= computedValues - blockManager.put(key, elements, storageLevel, tellMaster = true) + val updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true) + + // Update task metrics to include any updated blocks + val metrics = context.taskMetrics + metrics.updatedBlocks = Some(updatedBlocks ++ metrics.updatedBlocks.getOrElse(Seq())) + elements.iterator.asInstanceOf[Iterator[T]] } finally { loading.synchronized { diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d024bcddb21f4..a43fd6b2f0c63 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -161,7 +161,7 @@ class SparkContext( // Add each JAR given through the constructor if (jars != null) { - jars.foreach(addJar) + jars.foreach { jar => addJar(jar, updateEnvironment = false) } } private[spark] val executorMemory = conf.getOption("spark.executor.memory") @@ -204,14 +204,14 @@ class SparkContext( taskScheduler.start() @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler) - dagScheduler.start() + // Start the UI before the DAG scheduler, because the UI listens for Spark events ui.start() - // Trigger application start - val environmentDetails = SparkEnv.environmentDetails(this) - val applicationStart = new SparkListenerApplicationStart(environmentDetails) - dagScheduler.post(applicationStart) + dagScheduler.start() + dagScheduler.post(new SparkListenerApplicationStart(appName)) + + updateEnvironmentProperties() /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ val hadoopConfiguration = { @@ -631,7 +631,7 @@ class SparkContext( * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, * use `SparkFiles.get(path)` to find its download location. */ - def addFile(path: String) { + def addFile(path: String, updateEnvironment: Boolean = true) { val uri = new URI(path) val key = uri.getScheme match { case null | "file" => env.httpFileServer.addFile(new File(uri.getPath)) @@ -644,6 +644,9 @@ class SparkContext( Utils.fetchFile(path, new File(SparkFiles.getRootDirectory), conf) logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key)) + if (updateEnvironment) { + updateEnvironmentProperties() + } } def addSparkListener(listener: SparkListener) { @@ -711,8 +714,11 @@ class SparkContext( * Clear the job's list of files added by `addFile` so that they do not get downloaded to * any new nodes. */ - def clearFiles() { + def clearFiles(updateEnvironment: Boolean = true) { addedFiles.clear() + if (updateEnvironment) { + updateEnvironmentProperties() + } } /** @@ -730,7 +736,7 @@ class SparkContext( * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. 
*/ - def addJar(path: String) { + def addJar(path: String, updateEnvironment: Boolean = true) { if (path == null) { logWarning("null specified as parameter to addJar") } else { @@ -774,14 +780,20 @@ class SparkContext( logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key)) } } + if (updateEnvironment) { + updateEnvironmentProperties() + } } /** * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to * any new nodes. */ - def clearJars() { + def clearJars(updateEnvironment: Boolean = true) { addedJars.clear() + if (updateEnvironment) { + updateEnvironmentProperties() + } } /** Shut down the SparkContext. */ @@ -798,8 +810,8 @@ class SparkContext( // TODO: Cache.stop()? env.stop() // Clean up locally linked files - clearFiles() - clearJars() + clearFiles(updateEnvironment = false) + clearJars(updateEnvironment = false) SparkEnv.set(null) ShuffleMapTask.clearCache() ResultTask.clearCache() @@ -1022,6 +1034,19 @@ class SparkContext( /** Register a new RDD, returning its RDD ID */ private[spark] def newRddId(): Int = nextRddId.getAndIncrement() + /** Update environment properties and post the corresponding event to the DAG scheduler */ + private def updateEnvironmentProperties() { + val schedulingMode = getSchedulingMode.toString + val addedJarPaths = addedJars.keys.toSeq + val addedFilePaths = addedFiles.keys.toSeq + val environmentDetails = + SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths) + val environmentUpdate = new SparkListenerEnvironmentUpdate(environmentDetails) + + // In case the DAG scheduler is not ready yet, first check whether its reference is valid + Option(dagScheduler).foreach(_.post(environmentUpdate)) + } + /** Called by MetadataCleaner to clean up the persistentRdds map periodically */ private[spark] def cleanup(cleanupTime: Long) { persistentRdds.clearOldValues(cleanupTime) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index ff3f25388a631..86db2d0eb7f67 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -24,15 +24,15 @@ import scala.util.Properties import akka.actor._ +import com.google.common.collect.MapMaker + +import org.apache.spark.api.python.PythonWorkerFactory import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.storage.{BlockManagerMasterActor, BlockManager, BlockManagerMaster} import org.apache.spark.network.ConnectionManager import org.apache.spark.serializer.{Serializer, SerializerManager} +import org.apache.spark.storage._ import org.apache.spark.util.{Utils, AkkaUtils} -import org.apache.spark.api.python.PythonWorkerFactory - -import com.google.common.collect.MapMaker /** * Holds all the runtime environment objects for a running Spark instance (either master or worker), @@ -167,9 +167,18 @@ object SparkEnv extends Logging { } } - val blockManagerMaster = new BlockManagerMaster(registerOrLookup( - "BlockManagerMaster", - new BlockManagerMasterActor(isLocal, conf)), conf) + // Listen for block manager registration + val blockManagerListener = new BlockManagerRegistrationListener + lazy val blockManagerMasterActor = { + val actor = new BlockManagerMasterActor(isLocal, conf) + actor.registerListener(blockManagerListener) + actor + } + + val blockManagerMaster = + new BlockManagerMaster(registerOrLookup("BlockManagerMaster", blockManagerMasterActor), conf) + 
blockManagerMaster.registrationListener = Some(blockManagerListener) + val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf) @@ -243,7 +252,12 @@ object SparkEnv extends Logging { * attributes as a sequence of KV pairs. */ private[spark] - def environmentDetails(sc: SparkContext): Map[String, Seq[(String, String)]] = { + def environmentDetails( + conf: SparkConf, + schedulingMode: String, + addedJars: Seq[String], + addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = { + val jvmInformation = Seq( ("Java Version", "%s (%s)".format(Properties.javaVersion, Properties.javaVendor)), ("Java Home", Properties.javaHome), @@ -251,15 +265,12 @@ object SparkEnv extends Logging { ("Scala Home", Properties.scalaHome) ).sorted - // Spark properties, including scheduling mode and app name whether or not they are configured + // Spark properties, including scheduling mode whether or not it is configured var additionalFields = Seq[(String, String)]() - sc.conf.getOption("spark.scheduler.mode").getOrElse { - additionalFields ++= Seq(("spark.scheduler.mode", sc.getSchedulingMode.toString)) - } - sc.conf.getOption("spark.app.name").getOrElse { - additionalFields ++= Seq(("spark.app.name", sc.appName)) + conf.getOption("spark.scheduler.mode").getOrElse { + additionalFields ++= Seq(("spark.scheduler.mode", schedulingMode)) } - val sparkProperties = sc.conf.getAll.sorted ++ additionalFields + val sparkProperties = conf.getAll.sorted ++ additionalFields val systemProperties = System.getProperties.iterator.toSeq val classPathProperty = systemProperties.find { case (k, v) => @@ -273,12 +284,11 @@ object SparkEnv extends Logging { // Class paths including all added jars and files val classPathEntries = classPathProperty._2 - .split(sc.conf.get("path.separator", ":")) + .split(conf.get("path.separator", ":")) .filterNot(e => e.isEmpty) .map(e => (e, "System Classpath")) - val addedJars = sc.addedJars.iterator.toSeq.map{ case (path, _) => (path, "Added By User") } - val addedFiles = sc.addedFiles.iterator.toSeq.map{ case (path, _) => (path, "Added By User") } - val classPaths = (addedJars ++ addedFiles ++ classPathEntries).sorted + val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User")) + val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted Map[String, Seq[(String, String)]]( "JVM Information" -> jvmInformation, diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index f4ee0e2343849..7b0b7861b76e1 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -31,7 +31,6 @@ import org.apache.spark.{Logging, SparkConf, SparkException} import org.apache.spark.deploy.{ExecutorDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} -import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.deploy.worker.ui.WorkerWebUI import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{AkkaUtils, Utils} diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index bf421da04d6fe..8b6ed9e38e99f 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -17,6 +17,8 @@ package 
org.apache.spark.executor +import org.apache.spark.storage.{BlockId, BlockStatus} + class TaskMetrics extends Serializable { /** * Host's name the task runs on @@ -68,6 +70,11 @@ class TaskMetrics extends Serializable { * here */ var shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None + + /** + * If blocks have been updated as a result of this task, collect the statuses of this blocks here + */ + var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None } object TaskMetrics { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 74b7e501954ba..b9c1c236f45eb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -194,6 +194,9 @@ class DAGScheduler( } } })) + + // Start listening for block manager registration + blockManagerMaster.registrationListener.foreach(_.setListenerBus(listenerBus)) } def addSparkListener(listener: SparkListener) { @@ -545,14 +548,13 @@ class DAGScheduler( logInfo("Missing parents: " + getMissingParentStages(finalStage)) if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { // Compute very short actions like first() or take() with no parent stages locally. - listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties)) + post(SparkListenerJobStart(job.jobId, Array[Int](), properties)) runLocally(job) } else { idToActiveJob(jobId) = job activeJobs += job resultStageToJob(finalStage) = job - listenerBus.post( - SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, properties)) + post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, properties)) submitStage(finalStage) } @@ -593,15 +595,15 @@ class DAGScheduler( task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, TASK_SIZE_TO_WARN)) } } - listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo)) + post(SparkListenerTaskStart(task.stageId, taskInfo)) case GettingResultEvent(task, taskInfo) => - listenerBus.post(SparkListenerTaskGettingResult(taskInfo)) + post(SparkListenerTaskGettingResult(taskInfo)) case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => val stageId = task.stageId val taskType = Utils.getFormattedClassName(task) - listenerBus.post(SparkListenerTaskEnd(stageId, taskType, reason, taskInfo, taskMetrics)) + post(SparkListenerTaskEnd(stageId, taskType, reason, taskInfo, taskMetrics)) handleTaskCompletion(completion) case TaskSetFailed(taskSet, reason) => @@ -619,7 +621,7 @@ class DAGScheduler( for (job <- activeJobs) { val error = new SparkException("Job cancelled because SparkContext was shut down") job.listener.jobFailed(error) - listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, -1))) + post(SparkListenerJobEnd(job.jobId, JobFailed(error, -1))) } return true } @@ -697,7 +699,7 @@ class DAGScheduler( stageIdToStage -= s.id // but that won't get cleaned up via the normal paths through stageToInfos -= s // completion events or stage abort jobIdToStageIds -= job.jobId - listenerBus.post(SparkListenerJobEnd(job.jobId, jobResult)) + post(SparkListenerJobEnd(job.jobId, jobResult)) } } @@ -771,7 +773,7 @@ class DAGScheduler( // must be run listener before possible NotSerializableException // should be "StageSubmitted" first and then "JobEnded" - listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties)) + post(SparkListenerStageSubmitted(stageToInfos(stage), properties)) if 
(tasks.size > 0) { // Preemptively serialize a task to make sure it can be serialized. We are catching this @@ -820,7 +822,7 @@ class DAGScheduler( } logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) stageToInfos(stage).completionTime = Some(System.currentTimeMillis()) - listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) + post(SparkListenerStageCompleted(stageToInfos(stage))) running -= stage } event.reason match { @@ -845,7 +847,7 @@ class DAGScheduler( resultStageToJob -= stage markStageAsFinished(stage) jobIdToStageIdsRemove(job.jobId) - listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded)) + post(SparkListenerJobEnd(job.jobId, JobSucceeded)) } job.listener.taskSucceeded(rt.outputId, event.result) } @@ -982,6 +984,8 @@ class DAGScheduler( logDebug("Additional executor lost message for " + execId + "(epoch " + currentEpoch + ")") } + val storageStatusList = blockManagerMaster.getStorageStatus + post(new SparkListenerExecutorsStateChange(storageStatusList)) } private def handleExecutorGained(execId: String, host: String) { @@ -990,6 +994,8 @@ class DAGScheduler( logInfo("Host gained which was in lost list earlier: " + host) failedEpoch -= execId } + // Do not trigger SparkListenerExecutorsStateChange, because it is already triggered in + // blockManagerMaster.registrationListener when a new BlockManager registers with the master } private def handleJobCancellation(jobId: Int) { @@ -1004,7 +1010,7 @@ class DAGScheduler( jobIdToStageIds -= jobId activeJobs -= job idToActiveJob -= jobId - listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id))) + post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id))) } } @@ -1027,7 +1033,7 @@ class DAGScheduler( idToActiveJob -= resultStage.jobId activeJobs -= job resultStageToJob -= resultStage - listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, failedStage.id))) + post(SparkListenerJobEnd(job.jobId, JobFailed(error, failedStage.id))) } if (dependentStages.isEmpty) { logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done") diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index 02bdbba825781..eefc8c232b564 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -17,8 +17,6 @@ package org.apache.spark.scheduler -import org.apache.spark.SparkContext - /** * A backend interface for scheduling systems that allows plugging in different ones under * ClusterScheduler. 
We assume a Mesos-like model where the application gets resource offers as diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index a492cfc07cd78..6ae05e75b6387 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -24,7 +24,7 @@ import scala.collection.Map import org.apache.spark.util.{Utils, Distribution} import org.apache.spark.{Logging, TaskEndReason} import org.apache.spark.executor.TaskMetrics -import org.apache.spark.storage.{RDDInfo, StorageStatus} +import org.apache.spark.storage.StorageStatus sealed trait SparkListenerEvent @@ -45,19 +45,16 @@ case class SparkListenerJobStart(jobId: Int, stageIds: Seq[Int], properties: Pro case class SparkListenerJobEnd(jobId: Int, jobResult: JobResult) extends SparkListenerEvent -case class SparkListenerApplicationStart(environmentDetails: Map[String, Seq[(String, String)]]) - extends SparkListenerEvent +case class SparkListenerApplicationStart(appName: String) extends SparkListenerEvent -/** An event used in the ExecutorsUI and BlockManagerUI to fetch storage status from SparkEnv */ -private[spark] case class SparkListenerStorageStatusFetch(storageStatusList: Seq[StorageStatus]) +case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]]) extends SparkListenerEvent -/** An event used in the BlockManagerUI to query information of persisted RDDs */ -private[spark] case class SparkListenerGetRDDInfo(rddInfoList: Seq[RDDInfo]) +case class SparkListenerExecutorsStateChange(storageStatusList: Seq[StorageStatus]) extends SparkListenerEvent /** An event used in the listener to shutdown the listener daemon thread. 
*/ -private[scheduler] case object SparkListenerShutdown extends SparkListenerEvent +private[spark] case object SparkListenerShutdown extends SparkListenerEvent /** @@ -106,14 +103,15 @@ trait SparkListener { def onApplicationStart(applicationStart: SparkListenerApplicationStart) { } /** - * Called when Spark fetches storage statuses from the driver + * Called when environment properties have been updated */ - def onStorageStatusFetch(storageStatusFetch: SparkListenerStorageStatusFetch) { } + def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { } /** - * Called when Spark queries statuses of persisted RDD's + * Called when a new executor has joined, or an existing executor is lost */ - def onGetRDDInfo(getRDDInfo: SparkListenerGetRDDInfo) { } + def onExecutorsStateChange(executorsStateChange: SparkListenerExecutorsStateChange) { } + } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index 2eb57996b4feb..3a7d0ff9408a7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -71,10 +71,10 @@ private[spark] class SparkListenerBus extends Logging { listeners.foreach(_.onTaskEnd(taskEnd)) case applicationStart: SparkListenerApplicationStart => listeners.foreach(_.onApplicationStart(applicationStart)) - case storageStatusFetch: SparkListenerStorageStatusFetch => - listeners.foreach(_.onStorageStatusFetch(storageStatusFetch)) - case getRDDInfo: SparkListenerGetRDDInfo => - listeners.foreach(_.onGetRDDInfo(getRDDInfo)) + case environmentUpdate: SparkListenerEnvironmentUpdate => + listeners.foreach(_.onEnvironmentUpdate(environmentUpdate)) + case executorsStateChange: SparkListenerExecutorsStateChange => + listeners.foreach(_.onExecutorsStateChange(executorsStateChange)) case SparkListenerShutdown => return true case _ => diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 6cfe34e836f74..577932474c1fc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler import scala.collection.mutable import org.apache.spark.executor.TaskMetrics +import org.apache.spark.storage.RDDInfo /** * Stores information about a stage to pass from the scheduler to SparkListeners. 
Also @@ -29,9 +30,8 @@ private[spark] class StageInfo( val stageId: Int, val name: String, - val rddName: String, - val numPartitions: Int, val numTasks: Int, + val rddInfo: RDDInfo, val taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] = mutable.Buffer[(TaskInfo, TaskMetrics)]()) { @@ -44,6 +44,9 @@ class StageInfo( private[spark] object StageInfo { def fromStage(stage: Stage): StageInfo = { - new StageInfo(stage.id, stage.name, stage.rdd.name, stage.numPartitions, stage.numTasks) + val rdd = stage.rdd + val rddName = Option(rdd.name).getOrElse(rdd.id.toString) + val rddInfo = new RDDInfo(rdd.id, rddName, rdd.partitions.size, rdd.getStorageLevel) + new StageInfo(stage.id, stage.name, stage.numTasks, rddInfo) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 04f35cca08262..016145dfde57f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -17,8 +17,6 @@ package org.apache.spark.scheduler.cluster -import scala.collection.mutable.HashMap - import org.apache.spark.{Logging, SparkContext} import org.apache.spark.deploy.client.{AppClient, AppClientListener} import org.apache.spark.deploy.{Command, ApplicationDescription} diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 47ea144c0c799..8d32ceda32ae1 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -21,22 +21,22 @@ import java.io.{File, InputStream, OutputStream} import java.nio.{ByteBuffer, MappedByteBuffer} import scala.collection.mutable.{HashMap, ArrayBuffer} +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ import scala.util.Random import akka.actor.{ActorSystem, Cancellable, Props} -import scala.concurrent.{Await, Future} -import scala.concurrent.duration._ import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream} +import sun.nio.ch.DirectBuffer + import org.apache.spark.{SparkConf, Logging, SparkEnv, SparkException} import org.apache.spark.io.CompressionCodec import org.apache.spark.network._ import org.apache.spark.serializer.Serializer import org.apache.spark.util._ -import sun.nio.ch.DirectBuffer - private[spark] class BlockManager( executorId: String, actorSystem: ActorSystem, @@ -87,7 +87,7 @@ private[spark] class BlockManager( val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)), name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next) - // Pending reregistration action being executed asynchronously or null if none + // Pending re-registration action being executed asynchronously or null if none // is pending. Accesses should synchronize on asyncReregisterLock. var asyncReregisterTask: Future[Unit] = null val asyncReregisterLock = new Object @@ -117,8 +117,12 @@ private[spark] class BlockManager( /** * Construct a BlockManager with a memory limit set based on system properties. 
*/ - def this(execId: String, actorSystem: ActorSystem, master: BlockManagerMaster, - serializer: Serializer, conf: SparkConf) = { + def this( + execId: String, + actorSystem: ActorSystem, + master: BlockManagerMaster, + serializer: Serializer, + conf: SparkConf) = { this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf), conf) } @@ -142,14 +146,15 @@ private[spark] class BlockManager( * an executor crash. * * This function deliberately fails silently if the master returns false (indicating that - * the slave needs to reregister). The error condition will be detected again by the next - * heart beat attempt or new block registration and another try to reregister all blocks + * the slave needs to re-register). The error condition will be detected again by the next + * heart beat attempt or new block registration and another try to re-register all blocks * will be made then. */ private def reportAllBlocks() { logInfo("Reporting " + blockInfo.size + " blocks to the master.") for ((blockId, info) <- blockInfo) { - if (!tryToReportBlockStatus(blockId, info)) { + val status = getUpdatedBlockStatus(blockId, info) + if (!tryToReportBlockStatus(blockId, info, status)) { logError("Failed to report " + blockId + " to master; giving up.") return } @@ -157,20 +162,20 @@ private[spark] class BlockManager( } /** - * Reregister with the master and report all blocks to it. This will be called by the heart beat + * Re-register with the master and report all blocks to it. This will be called by the heart beat * thread if our heartbeat to the block manager indicates that we were not registered. * * Note that this method must be called without any BlockInfo locks held. */ def reregister() { - // TODO: We might need to rate limit reregistering. - logInfo("BlockManager reregistering with master") + // TODO: We might need to rate limit re-registering. + logInfo("BlockManager re-registering with master") master.registerBlockManager(blockManagerId, maxMemory, slaveActor) reportAllBlocks() } /** - * Reregister with the master sometime soon. + * Re-register with the master sometime soon. */ def asyncReregister() { asyncReregisterLock.synchronized { @@ -186,7 +191,7 @@ private[spark] class BlockManager( } /** - * For testing. Wait for any pending asynchronous reregistration; otherwise, do nothing. + * For testing. Wait for any pending asynchronous re-registration; otherwise, do nothing. */ def waitForAsyncReregister() { val task = asyncReregisterTask @@ -205,15 +210,19 @@ private[spark] class BlockManager( * message reflecting the current status, *not* the desired storage level in its block info. * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk. * - * droppedMemorySize exists to account for when block is dropped from memory to disk (so it - * is still valid). This ensures that update in master will compensate for the increase in + * droppedMemorySize exists to account for when the block is dropped from memory to disk (so + * it is still valid). This ensures that update in master will compensate for the increase in * memory on slave. 
*/ - def reportBlockStatus(blockId: BlockId, info: BlockInfo, droppedMemorySize: Long = 0L) { - val needReregister = !tryToReportBlockStatus(blockId, info, droppedMemorySize) + def reportBlockStatus( + blockId: BlockId, + info: BlockInfo, + status: BlockStatus, + droppedMemorySize: Long = 0L) { + val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize) if (needReregister) { - logInfo("Got told to reregister updating block " + blockId) - // Reregistering will report our new block for free. + logInfo("Got told to re-register updating block " + blockId) + // Re-registering will report our new block for free. asyncReregister() } logDebug("Told master about block " + blockId) @@ -224,27 +233,41 @@ private[spark] class BlockManager( * which will be true if the block was successfully recorded and false if * the slave needs to re-register. */ - private def tryToReportBlockStatus(blockId: BlockId, info: BlockInfo, + private def tryToReportBlockStatus( + blockId: BlockId, + info: BlockInfo, + status: BlockStatus, droppedMemorySize: Long = 0L): Boolean = { - val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized { + val storageLevel = status.storageLevel + val inMemSize = Math.max(status.memSize, droppedMemorySize) + val onDiskSize = status.diskSize + if (info.tellMaster) { + master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize) + } else { + true + } + } + + /** + * Return the updated storage status of the block with the given ID. More specifically, if + * the block is dropped from memory and possibly added to disk, return the new storage level + * and the updated in-memory and on-disk sizes. + */ + private def getUpdatedBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = { + val (newLevel, inMemSize, onDiskSize) = info.synchronized { info.level match { case null => - (StorageLevel.NONE, 0L, 0L, false) + (StorageLevel.NONE, 0L, 0L) case level => val inMem = level.useMemory && memoryStore.contains(blockId) val onDisk = level.useDisk && diskStore.contains(blockId) val storageLevel = StorageLevel(onDisk, inMem, level.deserialized, level.replication) - val memSize = if (inMem) memoryStore.getSize(blockId) else droppedMemorySize + val memSize = if (inMem) memoryStore.getSize(blockId) else 0L val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L - (storageLevel, memSize, diskSize, info.tellMaster) + (storageLevel, memSize, diskSize) } } - - if (tellMaster) { - master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize) - } else { - true - } + BlockStatus(newLevel, inMemSize, onDiskSize) } /** @@ -392,10 +415,10 @@ private[spark] class BlockManager( /** * Get block from remote block managers as serialized bytes. */ - def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = { + def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = { logDebug("Getting remote block " + blockId + " as bytes") doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]] - } + } private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = { require(blockId != null, "BlockId is null") @@ -441,9 +464,8 @@ private[spark] class BlockManager( * so that we can control the maxMegabytesInFlight for the fetch. 
*/ def getMultiple( - blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])], serializer: Serializer) - : BlockFetcherIterator = { - + blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])], + serializer: Serializer): BlockFetcherIterator = { val iter = if (conf.getBoolean("spark.shuffle.use.netty", false)) { new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer) @@ -455,8 +477,11 @@ private[spark] class BlockManager( iter } - def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean) - : Long = { + def put( + blockId: BlockId, + values: Iterator[Any], + level: StorageLevel, + tellMaster: Boolean): Seq[(BlockId, BlockStatus)] = { val elements = new ArrayBuffer[Any] elements ++= values put(blockId, elements, level, tellMaster) @@ -476,32 +501,47 @@ private[spark] class BlockManager( } /** - * Put a new block of values to the block manager. Returns its (estimated) size in bytes. + * Put a new block of values to the block manager. Return a list of blocks updated as a + * result of this put. */ - def put(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel, - tellMaster: Boolean = true) : Long = { + def put( + blockId: BlockId, + values: ArrayBuffer[Any], + level: StorageLevel, + tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = { require(values != null, "Values is null") doPut(blockId, Left(values), level, tellMaster) } /** - * Put a new block of serialized bytes to the block manager. + * Put a new block of serialized bytes to the block manager. Return a list of blocks updated + * as a result of this put. */ - def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel, - tellMaster: Boolean = true) { + def putBytes( + blockId: BlockId, + bytes: ByteBuffer, + level: StorageLevel, + tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = { require(bytes != null, "Bytes is null") doPut(blockId, Right(bytes), level, tellMaster) } - private def doPut(blockId: BlockId, data: Either[ArrayBuffer[Any], ByteBuffer], - level: StorageLevel, tellMaster: Boolean = true): Long = { + private def doPut( + blockId: BlockId, + data: Either[ArrayBuffer[Any], ByteBuffer], + level: StorageLevel, + tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = { + require(blockId != null, "BlockId is null") require(level != null && level.isValid, "StorageLevel is null or invalid") + // Return value + val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + // Remember the block's storage level so that we can correctly drop it to disk if it needs // to be dropped right after it got put into memory. Note, however, that other threads will // not be able to get() this block until we call markReady on its BlockInfo. - val myInfo = { + val putBlockInfo = { val tinfo = new BlockInfo(level, tellMaster) // Do atomically ! val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) @@ -509,7 +549,7 @@ private[spark] class BlockManager( if (oldBlockOpt.isDefined) { if (oldBlockOpt.get.waitForReady()) { logWarning("Block " + blockId + " already exists on this machine; not re-adding it") - return oldBlockOpt.get.size + return updatedBlocks } // TODO: So the block info exists - but previous attempt to load it (?) failed. 
@@ -531,7 +571,7 @@ private[spark] class BlockManager( // Ditto for the bytes after the put var bytesAfterPut: ByteBuffer = null - // Size of the block in bytes (to return to caller) + // Size of the block in bytes var size = 0L // If we're storing bytes, then initiate the replication before storing them locally. @@ -545,7 +585,7 @@ private[spark] class BlockManager( null } - myInfo.synchronized { + putBlockInfo.synchronized { logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs) + " to get into synchronized block") @@ -562,6 +602,8 @@ private[spark] class BlockManager( case Right(newBytes) => bytesAfterPut = newBytes case Left(newIterator) => valuesAfterPut = newIterator } + // Keep track of which blocks are dropped from memory + res.droppedBlocks.foreach { block => updatedBlocks += block } } else { // Save directly to disk. // Don't get back the bytes unless we replicate them. @@ -585,20 +627,23 @@ private[spark] class BlockManager( // Now that the block is in either the memory or disk store, let other threads read it, // and tell the master about it. marked = true - myInfo.markReady(size) + putBlockInfo.markReady(size) + val putBlockStatus = getUpdatedBlockStatus(blockId, putBlockInfo) if (tellMaster) { - reportBlockStatus(blockId, myInfo) - } - } finally { - // If we failed at putting the block to memory/disk, notify other possible readers - // that it has failed, and then remove it from the block info map. - if (! marked) { - // Note that the remove must happen before markFailure otherwise another thread - // could've inserted a new BlockInfo before we remove it. - blockInfo.remove(blockId) - myInfo.markFailure() - logWarning("Putting block " + blockId + " failed") + reportBlockStatus(blockId, putBlockInfo, putBlockStatus) } + updatedBlocks += ((blockId, putBlockStatus)) + } catch { + case e: Exception => + // If we failed in putting the block to memory/disk, notify other possible readers + // that it has failed, and then remove it from the block info map. + if (!marked) { + // Note that the remove must happen before markFailure otherwise another thread + // could've inserted a new BlockInfo before we remove it. + blockInfo.remove(blockId) + putBlockInfo.markFailure() + logWarning("Putting block " + blockId + " failed") + } } } logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs)) @@ -635,7 +680,7 @@ private[spark] class BlockManager( Utils.getUsedTimeMs(startTimeMs)) } - size + updatedBlocks } /** @@ -679,21 +724,31 @@ private[spark] class BlockManager( /** * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory * store reaches its limit and needs to free up space. + * + * Return the block status if the given block has been updated, else None. */ - def dropFromMemory(blockId: BlockId, data: Either[ArrayBuffer[Any], ByteBuffer]) { + def dropFromMemory( + blockId: BlockId, + data: Either[ArrayBuffer[Any], ByteBuffer]): Option[BlockStatus] = { + logInfo("Dropping block " + blockId + " from memory") val info = blockInfo.get(blockId).orNull + + // If the block has not already been dropped if (info != null) { info.synchronized { // required ? As of now, this will be invoked only for blocks which are ready // But in case this changes in future, adding for consistency sake. - if (! info.waitForReady() ) { + if (!info.waitForReady()) { // If we get here, the block write failed. logWarning("Block " + blockId + " was marked as failure. 
Nothing to drop") - return + return None } + var blockIsUpdated = false val level = info.level + + // Drop to disk, if storage level requires if (level.useDisk && !diskStore.contains(blockId)) { logInfo("Writing block " + blockId + " to disk") data match { @@ -702,24 +757,33 @@ private[spark] class BlockManager( case Right(bytes) => diskStore.putBytes(blockId, bytes, level) } + blockIsUpdated = true } + + // Actually drop from memory store val droppedMemorySize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L - val blockWasRemoved = memoryStore.remove(blockId) - if (!blockWasRemoved) { + val blockIsRemoved = memoryStore.remove(blockId) + if (blockIsRemoved) { + blockIsUpdated = true + } else { logWarning("Block " + blockId + " could not be dropped from memory as it does not exist") } + + val status = getUpdatedBlockStatus(blockId, info) if (info.tellMaster) { - reportBlockStatus(blockId, info, droppedMemorySize) + reportBlockStatus(blockId, info, status, droppedMemorySize) } if (!level.useDisk) { // The block is completely gone from this node; forget it so we can put() it again later. blockInfo.remove(blockId) } + if (blockIsUpdated) { + return Some(status) + } } - } else { - // The block has already been dropped } + None } /** @@ -751,7 +815,8 @@ private[spark] class BlockManager( } blockInfo.remove(blockId) if (tellMaster && info.tellMaster) { - reportBlockStatus(blockId, info) + val status = getUpdatedBlockStatus(blockId, info) + reportBlockStatus(blockId, info, status) } } else { // The block has already been removed; do nothing. @@ -786,7 +851,8 @@ private[spark] class BlockManager( iterator.remove() logInfo("Dropped block " + id) } - reportBlockStatus(id, info) + val status = getUpdatedBlockStatus(id, info) + reportBlockStatus(id, info, status) } } } @@ -896,9 +962,8 @@ private[spark] object BlockManager extends Logging { def blockIdsToBlockManagers( blockIds: Array[BlockId], env: SparkEnv, - blockManagerMaster: BlockManagerMaster = null) - : Map[BlockId, Seq[BlockManagerId]] = - { + blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[BlockManagerId]] = { + // blockManagerMaster != null is used in tests assert (env != null || blockManagerMaster != null) val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) { @@ -917,18 +982,14 @@ private[spark] object BlockManager extends Logging { def blockIdsToExecutorIds( blockIds: Array[BlockId], env: SparkEnv, - blockManagerMaster: BlockManagerMaster = null) - : Map[BlockId, Seq[String]] = - { + blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[String]] = { blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.executorId)) } def blockIdsToHosts( blockIds: Array[BlockId], env: SparkEnv, - blockManagerMaster: BlockManagerMaster = null) - : Map[BlockId, Seq[String]] = - { + blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[String]] = { blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.host)) } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index c54e4f2664753..f1c363caed581 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -28,7 +28,7 @@ import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util.AkkaUtils private[spark] -class BlockManagerMaster(var 
driverActor : ActorRef, conf: SparkConf) extends Logging { +class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging { val AKKA_RETRY_ATTEMPTS: Int = conf.getInt("spark.akka.num.retries", 3) val AKKA_RETRY_INTERVAL_MS: Int = conf.getInt("spark.akka.retry.wait", 3000) @@ -37,6 +37,8 @@ class BlockManagerMaster(var driverActor : ActorRef, conf: SparkConf) extends Lo val timeout = AkkaUtils.askTimeout(conf) + var registrationListener: Option[BlockManagerRegistrationListener] = None + /** Remove a dead executor from the driver actor. This is only called on the driver side. */ def removeExecutor(execId: String) { tell(RemoveExecutor(execId)) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 893418fb8cad9..6a84a1217212f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -20,6 +20,7 @@ package org.apache.spark.storage import java.util.{HashMap => JHashMap} import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ import scala.concurrent.Future import scala.concurrent.duration._ @@ -39,8 +40,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Actor with Logging { // Mapping from block manager id to the block manager's information. - private val blockManagerInfo = - new mutable.HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo] + private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo] // Mapping from executor ID to block manager ID. private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId] @@ -50,6 +50,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act private val akkaTimeout = AkkaUtils.askTimeout(conf) + private val listeners = new ArrayBuffer[BlockManagerRegistrationListener] + val slaveTimeout = conf.get("spark.storage.blockManagerSlaveTimeoutMs", "" + (BlockManager.getHeartBeatFrequency(conf) * 3)).toLong @@ -67,6 +69,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act super.preStart() } + def registerListener(listener: BlockManagerRegistrationListener) = listeners += listener + def receive = { case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) => register(blockManagerId, maxMemSize, slaveActor) @@ -217,8 +221,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act private def storageStatus: Array[StorageStatus] = { blockManagerInfo.map { case(blockManagerId, info) => - import collection.JavaConverters._ - StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala.toMap) + val blockMap = mutable.Map[BlockId, BlockStatus](info.blocks.toSeq: _*) + new StorageStatus(blockManagerId, info.maxMem, blockMap) }.toArray } @@ -233,9 +237,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act case None => blockManagerIdByExecutor(id.executorId) = id } - blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo( - id, System.currentTimeMillis(), maxMemSize, slaveActor) + blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), + maxMemSize, slaveActor) } + listeners.foreach(_.onBlockManagerRegister(storageStatus)) } private def updateBlockInfo( @@ -307,97 +312,93 @@ class 
BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act } -private[spark] -object BlockManagerMasterActor { +private[spark] case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) - case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) +private[spark] class BlockManagerInfo( + val blockManagerId: BlockManagerId, + timeMs: Long, + val maxMem: Long, + val slaveActor: ActorRef) + extends Logging { - class BlockManagerInfo( - val blockManagerId: BlockManagerId, - timeMs: Long, - val maxMem: Long, - val slaveActor: ActorRef) - extends Logging { + private var _lastSeenMs: Long = timeMs + private var _remainingMem: Long = maxMem - private var _lastSeenMs: Long = timeMs - private var _remainingMem: Long = maxMem + // Mapping from block id to its status. + private val _blocks = new JHashMap[BlockId, BlockStatus] - // Mapping from block id to its status. - private val _blocks = new JHashMap[BlockId, BlockStatus] + logInfo("Registering block manager %s with %s RAM".format( + blockManagerId.hostPort, Utils.bytesToString(maxMem))) - logInfo("Registering block manager %s with %s RAM".format( - blockManagerId.hostPort, Utils.bytesToString(maxMem))) - - def updateLastSeenMs() { - _lastSeenMs = System.currentTimeMillis() - } + def updateLastSeenMs() { + _lastSeenMs = System.currentTimeMillis() + } - def updateBlockInfo(blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long) { + def updateBlockInfo(blockId: BlockId, storageLevel: StorageLevel, memSize: Long, + diskSize: Long) { - updateLastSeenMs() + updateLastSeenMs() - if (_blocks.containsKey(blockId)) { - // The block exists on the slave already. - val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel + if (_blocks.containsKey(blockId)) { + // The block exists on the slave already. + val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel - if (originalLevel.useMemory) { - _remainingMem += memSize - } + if (originalLevel.useMemory) { + _remainingMem += memSize } + } - if (storageLevel.isValid) { - // isValid means it is either stored in-memory or on-disk. - // But the memSize here indicates the data size in or dropped from memory, - // and the diskSize here indicates the data size in or dropped to disk. - // They can be both larger than 0, when a block is dropped from memory to disk. - // Therefore, a safe way to set BlockStatus is to set its info in accurate modes. - if (storageLevel.useMemory) { - _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0)) - _remainingMem -= memSize - logInfo("Added %s in memory on %s (size: %s, free: %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(memSize), - Utils.bytesToString(_remainingMem))) - } - if (storageLevel.useDisk) { - _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize)) - logInfo("Added %s on disk on %s (size: %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize))) - } - } else if (_blocks.containsKey(blockId)) { - // If isValid is not true, drop the block. 
- val blockStatus: BlockStatus = _blocks.get(blockId) - _blocks.remove(blockId) - if (blockStatus.storageLevel.useMemory) { - _remainingMem += blockStatus.memSize - logInfo("Removed %s on %s in memory (size: %s, free: %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize), - Utils.bytesToString(_remainingMem))) - } - if (blockStatus.storageLevel.useDisk) { - logInfo("Removed %s on %s on disk (size: %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize))) - } + if (storageLevel.isValid) { + // isValid means it is either stored in-memory or on-disk. + // But the memSize here indicates the data size in or dropped from memory, + // and the diskSize here indicates the data size in or dropped to disk. + // They can be both larger than 0, when a block is dropped from memory to disk. + // Therefore, a safe way to set BlockStatus is to set its info in accurate modes. + if (storageLevel.useMemory) { + _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0)) + _remainingMem -= memSize + logInfo("Added %s in memory on %s (size: %s, free: %s)".format( + blockId, blockManagerId.hostPort, Utils.bytesToString(memSize), + Utils.bytesToString(_remainingMem))) + } + if (storageLevel.useDisk) { + _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize)) + logInfo("Added %s on disk on %s (size: %s)".format( + blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize))) + } + } else if (_blocks.containsKey(blockId)) { + // If isValid is not true, drop the block. + val blockStatus: BlockStatus = _blocks.get(blockId) + _blocks.remove(blockId) + if (blockStatus.storageLevel.useMemory) { + _remainingMem += blockStatus.memSize + logInfo("Removed %s on %s in memory (size: %s, free: %s)".format( + blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize), + Utils.bytesToString(_remainingMem))) + } + if (blockStatus.storageLevel.useDisk) { + logInfo("Removed %s on %s on disk (size: %s)".format( + blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize))) } } + } - def removeBlock(blockId: BlockId) { - if (_blocks.containsKey(blockId)) { - _remainingMem += _blocks.get(blockId).memSize - _blocks.remove(blockId) - } + def removeBlock(blockId: BlockId) { + if (_blocks.containsKey(blockId)) { + _remainingMem += _blocks.get(blockId).memSize + _blocks.remove(blockId) } + } - def remainingMem: Long = _remainingMem + def remainingMem: Long = _remainingMem - def lastSeenMs: Long = _lastSeenMs + def lastSeenMs: Long = _lastSeenMs - def blocks: JHashMap[BlockId, BlockStatus] = _blocks + def blocks: JHashMap[BlockId, BlockStatus] = _blocks - override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem + override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem - def clear() { - _blocks.clear() - } + def clear() { + _blocks.clear() } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerRegistrationListener.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerRegistrationListener.scala new file mode 100644 index 0000000000000..1825198c7741f --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerRegistrationListener.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import org.apache.spark.scheduler._ +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable + +/** A listener for block manager state changes */ +private[spark] class BlockManagerRegistrationListener { + + private var _listenerBus: Option[SparkListenerBus] = None + + // Buffer any events received before the listener bus is ready + private val bufferedEvents = new ArrayBuffer[SparkListenerEvent] + with mutable.SynchronizedBuffer[SparkListenerEvent] + + /** + * Set the listener bus. If there are buffered events, post them all to the listener bus at once. + */ + def setListenerBus(listenerBus: SparkListenerBus) = { + _listenerBus = Some(listenerBus) + bufferedEvents.map(listenerBus.post) + } + + /** + * Called when a new BlockManager is registered with the master. If the listener bus is ready, + * post the event; otherwise, buffer it. + */ + def onBlockManagerRegister(storageStatus: Array[StorageStatus]) { + val executorsStateChange = new SparkListenerExecutorsStateChange(storageStatus) + _listenerBus.map(_.post(executorsStateChange)).getOrElse { + bufferedEvents += executorsStateChange + } + } +} diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 82089b923d190..c596c81446e81 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -19,7 +19,9 @@ package org.apache.spark.storage import java.util.LinkedHashMap import java.nio.ByteBuffer -import collection.mutable.ArrayBuffer + +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.util.{SizeEstimator, Utils} /** @@ -66,17 +68,15 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel, - returnValues: Boolean) - : PutResult = { - + returnValues: Boolean): PutResult = { if (level.deserialized) { val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef]) - tryToPut(blockId, values, sizeEstimate, true) - PutResult(sizeEstimate, Left(values.iterator)) + val putAttempt = tryToPut(blockId, values, sizeEstimate, true) + PutResult(sizeEstimate, Left(values.iterator), putAttempt.droppedBlocks) } else { val bytes = blockManager.dataSerialize(blockId, values.iterator) - tryToPut(blockId, bytes, bytes.limit, false) - PutResult(bytes.limit(), Right(bytes.duplicate())) + val putAttempt = tryToPut(blockId, bytes, bytes.limit, false) + PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks) } } @@ -141,19 +141,34 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) * size must also be passed by the caller. 
* - * Locks on the object putLock to ensure that all the put requests and its associated block + * Lock on the object putLock to ensure that all the put requests and its associated block * dropping is done by only on thread at a time. Otherwise while one thread is dropping * blocks to free memory for one block, another thread may use up the freed space for * another block. + * + * Return whether put was successful, along with the blocks dropped in the process */ - private def tryToPut(blockId: BlockId, value: Any, size: Long, deserialized: Boolean): Boolean = { + private def tryToPut( + blockId: BlockId, + value: Any, + size: Long, + deserialized: Boolean): ResultWithDroppedBlocks = { + // TODO: Its possible to optimize the locking by locking entries only when selecting blocks - // to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has been - // released, it must be ensured that those to-be-dropped blocks are not double counted for - // freeing up more space for another block that needs to be put. Only then the actually dropping - // of blocks (and writing to disk if necessary) can proceed in parallel. + // to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has + // been released, it must be ensured that those to-be-dropped blocks are not double counted + // for freeing up more space for another block that needs to be put. Only then the actually + // dropping of blocks (and writing to disk if necessary) can proceed in parallel. + + var putSuccess = false + val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + putLock.synchronized { - if (ensureFreeSpace(blockId, size)) { + val freeSpaceResult = ensureFreeSpace(blockId, size) + val enoughFreeSpace = freeSpaceResult.success + droppedBlocks ++= freeSpaceResult.droppedBlocks + + if (enoughFreeSpace) { val entry = new Entry(value, size, deserialized) entries.synchronized { entries.put(blockId, entry) @@ -166,7 +181,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) logInfo("Block %s stored as bytes to memory (size %s, free %s)".format( blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory))) } - true + putSuccess = true } else { // Tell the block manager that we couldn't put it in memory so that it can drop it to // disk if the block allows disk storage. @@ -175,29 +190,33 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } else { Right(value.asInstanceOf[ByteBuffer].duplicate()) } - blockManager.dropFromMemory(blockId, data) - false + val droppedBlockStatus = blockManager.dropFromMemory(blockId, data) + droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } } } + ResultWithDroppedBlocks(putSuccess, droppedBlocks) } /** - * Tries to free up a given amount of space to store a particular block, but can fail and return - * false if either the block is bigger than our memory or it would require replacing another - * block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that + * Try to free up a given amount of space to store a particular block, but can fail if + * either the block is bigger than our memory or it would require replacing another block + * from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that * don't fit into memory that we want to avoid). * - * Assumes that a lock is held by the caller to ensure only one thread is dropping blocks. 
+ * Assume that a lock is held by the caller to ensure only one thread is dropping blocks. * Otherwise, the freed space may fill up before the caller puts in their new value. + * + * Return whether there is enough free space, along with the blocks dropped in the process. */ - private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): Boolean = { - + private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks = { logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format( space, currentMemory, maxMemory)) + val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + if (space > maxMemory) { logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit") - return false + return ResultWithDroppedBlocks(success = false, droppedBlocks) } if (maxMemory - currentMemory < space) { @@ -216,7 +235,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) if (rddToAdd.isDefined && rddToAdd == getRddId(blockId)) { logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " + "block from the same RDD") - return false + return ResultWithDroppedBlocks(success = false, droppedBlocks) } selectedBlocks += blockId selectedMemory += pair.getValue.size @@ -236,15 +255,16 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } else { Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) } - blockManager.dropFromMemory(blockId, data) + val droppedBlockStatus = blockManager.dropFromMemory(blockId, data) + droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } } } - return true + return ResultWithDroppedBlocks(success = true, droppedBlocks) } else { - return false + return ResultWithDroppedBlocks(success = false, droppedBlocks) } } - true + ResultWithDroppedBlocks(success = true, droppedBlocks) } override def contains(blockId: BlockId): Boolean = { @@ -252,3 +272,5 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } } +private case class ResultWithDroppedBlocks(success: Boolean, + droppedBlocks: Seq[(BlockId, BlockStatus)]) diff --git a/core/src/main/scala/org/apache/spark/storage/PutResult.scala b/core/src/main/scala/org/apache/spark/storage/PutResult.scala index 2eba2f06b5bfd..cd3f61e75d574 100644 --- a/core/src/main/scala/org/apache/spark/storage/PutResult.scala +++ b/core/src/main/scala/org/apache/spark/storage/PutResult.scala @@ -20,7 +20,13 @@ package org.apache.spark.storage import java.nio.ByteBuffer /** - * Result of adding a block into a BlockStore. Contains its estimated size, and possibly the - * values put if the caller asked for them to be returned (e.g. for chaining replication) + * Result of adding a block into a BlockStore. This case class contains a few things: + * (1) The estimated size of the put, + * (2) The values put if the caller asked for them to be returned (e.g. for chaining + * replication), and + * (3) A list of blocks dropped as a result of this put. This is always empty for DiskStore. 
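For illustration only, not part of this patch: the ResultWithDroppedBlocks and PutResult changes follow one pattern, an operation that may evict blocks returns, alongside its success flag, the (blockId, status) pairs it dropped, so callers can propagate them (ultimately into TaskMetrics.updatedBlocks). A simplified eviction sketch with made-up names (PutOutcome and this ensureFreeSpace signature are illustrative, not the MemoryStore API):

  import scala.collection.mutable.ArrayBuffer

  case class SimpleStatus(memSize: Long, diskSize: Long)
  case class PutOutcome(success: Boolean, droppedBlocks: Seq[(String, SimpleStatus)])

  // Evict oldest entries until `needed` bytes fit, reporting what was dropped.
  def ensureFreeSpace(
      entries: ArrayBuffer[(String, Long)],  // (blockId, size), oldest first
      currentMem: Long,
      maxMem: Long,
      needed: Long): PutOutcome = {
    if (needed > maxMem) return PutOutcome(success = false, Seq.empty)
    var freed = 0L
    val dropped = new ArrayBuffer[(String, SimpleStatus)]
    while (maxMem - (currentMem - freed) < needed && entries.nonEmpty) {
      val (id, size) = entries.remove(0)
      freed += size
      dropped += ((id, SimpleStatus(memSize = 0L, diskSize = size)))  // assumes the block moved to disk
    }
    PutOutcome(maxMem - (currentMem - freed) >= needed, dropped.toSeq)
  }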
*/ -private[spark] case class PutResult(size: Long, data: Either[Iterator[_], ByteBuffer]) +private[spark] case class PutResult( + size: Long, + data: Either[Iterator[_], ByteBuffer], + droppedBlocks: Seq[(BlockId, BlockStatus)] = Seq()) diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 0126229686aa2..d89d5061e4849 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -17,14 +17,18 @@ package org.apache.spark.storage +import scala.collection.mutable +import scala.collection.Map + import org.apache.spark.SparkContext import org.apache.spark.util.Utils -import BlockManagerMasterActor.BlockStatus private[spark] -case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long, - blocks: Map[BlockId, BlockStatus]) { +class StorageStatus( + val blockManagerId: BlockManagerId, + val maxMem: Long, + val blocks: mutable.Map[BlockId, BlockStatus]) { def memUsed() = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L) @@ -44,9 +48,14 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long, } } -case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, - numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long) +private[spark] +class RDDInfo(val id: Int, val name: String, val numPartitions: Int, val storageLevel: StorageLevel) extends Ordered[RDDInfo] { + + var numCachedPartitions = 0 + var memSize = 0L + var diskSize = 0L + override def toString = { ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " + "DiskSize: %s").format(name, id, storageLevel.toString, numCachedPartitions, @@ -62,55 +71,80 @@ case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, private[spark] object StorageUtils { - /* Returns RDD-level information, compiled from a list of StorageStatus objects */ - def rddInfoFromStorageStatus(storageStatusList: Seq[StorageStatus], - sc: SparkContext) : Array[RDDInfo] = { - rddInfoFromBlockStatusList( - storageStatusList.flatMap(_.rddBlocks).toMap[RDDBlockId, BlockStatus], sc) + /** Returns RDD-level information from a list of StorageStatus objects and SparkContext */ + def rddInfoFromStorageStatus( + storageStatusList: Seq[StorageStatus], + sc: SparkContext) : Array[RDDInfo] = { + val blockStatusMap = blockStatusMapFromStorageStatus(storageStatusList) + val rddInfoList = rddInfoFromSparkContext(blockStatusMap.keys.toSeq, sc) + val rddInfoMap = rddInfoList.map { info => (info.id, info) }.toMap + rddInfoFromBlockStatusMap(blockStatusMap, rddInfoMap) } - /* Returns a map of blocks to their locations, compiled from a list of StorageStatus objects */ - def blockLocationsFromStorageStatus(storageStatusList: Seq[StorageStatus]) = { - val blockLocationPairs = storageStatusList - .flatMap(s => s.blocks.map(b => (b._1, s.blockManagerId.hostPort))) - blockLocationPairs.groupBy(_._1).map{case (k, v) => (k, v.unzip._2)}.toMap + /** + * Returns RDD-level information from a list of StorageStatus objects and an existing + * RDD ID to RDDInfo mapping + */ + def rddInfoFromStorageStatus( + storageStatusList: Seq[StorageStatus], + rddInfoMap: Map[Int, RDDInfo]): Array[RDDInfo] = { + val blockStatusMap = blockStatusMapFromStorageStatus(storageStatusList) + rddInfoFromBlockStatusMap(blockStatusMap, rddInfoMap) } - /* Given a list of BlockStatus objects, returns information for each RDD */ - def 
rddInfoFromBlockStatusList(infos: Map[RDDBlockId, BlockStatus], - sc: SparkContext) : Array[RDDInfo] = { + private def rddInfoFromBlockStatusMap( + blockStatusMap: Map[Int, Array[BlockStatus]], + rddInfoMap: Map[Int, RDDInfo]): Array[RDDInfo] = { + val rddInfos = blockStatusMap.map { case (rddId, blocks) => + // Add up memory and disk sizes + val persistedBlocks = blocks.filter { status => status.memSize + status.diskSize > 0 } + val memSize = persistedBlocks.map(_.memSize).reduceOption(_+_).getOrElse(0L) + val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_+_).getOrElse(0L) + rddInfoMap.get(rddId).map { rddInfo => + rddInfo.numCachedPartitions = persistedBlocks.length + rddInfo.memSize = memSize + rddInfo.diskSize = diskSize + rddInfo + } + }.flatten.toArray - // Group by rddId, ignore the partition name - val groupedRddBlocks = infos.groupBy { case (k, v) => k.rddId }.mapValues(_.values.toArray) + scala.util.Sorting.quickSort(rddInfos) + rddInfos + } - // For each RDD, generate an RDDInfo object - val rddInfos = groupedRddBlocks.map { case (rddId, rddBlocks) => - // Add up memory and disk sizes - val memSize = rddBlocks.map(_.memSize).reduce(_ + _) - val diskSize = rddBlocks.map(_.diskSize).reduce(_ + _) + private def blockStatusMapFromStorageStatus(storageStatusList: Seq[StorageStatus]) + : Map[Int, Array[BlockStatus]] = { + val rddBlockMap = storageStatusList.flatMap(_.rddBlocks).toMap[RDDBlockId, BlockStatus] + rddBlockMap.groupBy { case (k, v) => k.rddId }.mapValues(_.values.toArray) + } - // Get the friendly name and storage level for the RDD, if available + private def rddInfoFromSparkContext(rddIds: Seq[Int], sc: SparkContext): Array[RDDInfo] = { + rddIds.flatMap { rddId => sc.persistentRdds.get(rddId).map { r => val rddName = Option(r.name).getOrElse(rddId.toString) + val rddNumPartitions = r.partitions.size val rddStorageLevel = r.getStorageLevel - RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, r.partitions.size, - memSize, diskSize) + val rddInfo = new RDDInfo(rddId, rddName, rddNumPartitions, rddStorageLevel) + rddInfo } - }.flatten.toArray - - scala.util.Sorting.quickSort(rddInfos) - - rddInfos + }.toArray } - /* Filters storage status by a given RDD id. */ - def filterStorageStatusByRDD(storageStatusList: Array[StorageStatus], rddId: Int) - : Array[StorageStatus] = { + /** Returns a map of blocks to their locations, compiled from a list of StorageStatus objects */ + def blockLocationsFromStorageStatus(storageStatusList: Seq[StorageStatus]) = { + val blockLocationPairs = + storageStatusList.flatMap(s => s.blocks.map(b => (b._1, s.blockManagerId.hostPort))) + blockLocationPairs.groupBy(_._1).map{case (k, v) => (k, v.unzip._2)}.toMap + } + /** Filters storage status by a given RDD id. 
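For illustration only, not part of this patch: the RDD-info helpers above group block statuses by RDD id and sum the memory and disk sizes of the persisted blocks. The grouping idea in a self-contained form (SimpleBlockStatus and RDDSummary are illustrative types, not the Spark classes):

  case class SimpleBlockStatus(rddId: Int, memSize: Long, diskSize: Long)
  case class RDDSummary(rddId: Int, cachedBlocks: Int, memSize: Long, diskSize: Long)

  def summarize(blocks: Seq[SimpleBlockStatus]): Seq[RDDSummary] = {
    blocks
      .filter(b => b.memSize + b.diskSize > 0)          // count only persisted blocks
      .groupBy(_.rddId)
      .map { case (rddId, bs) =>
        RDDSummary(rddId, bs.size, bs.map(_.memSize).sum, bs.map(_.diskSize).sum)
      }
      .toSeq
      .sortBy(_.rddId)
  }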
*/ + def filterStorageStatusByRDD( + storageStatusList: Seq[StorageStatus], + rddId: Int) : Array[StorageStatus] = { storageStatusList.map { status => - val newBlocks = status.rddBlocks.filterKeys(_.rddId == rddId).toMap[BlockId, BlockStatus] - //val newRemainingMem = status.maxMem - newBlocks.values.map(_.memSize).reduce(_ + _) - StorageStatus(status.blockManagerId, status.maxMem, newBlocks) - } + val filteredBlocks = status.rddBlocks.filterKeys(_.rddId == rddId).toSeq + val filteredBlockMap = mutable.Map[BlockId, BlockStatus](filteredBlocks: _*) + new StorageStatus(status.blockManagerId, status.maxMem, filteredBlockMap) + }.toArray } } diff --git a/core/src/main/scala/org/apache/spark/ui/UISparkListener.scala b/core/src/main/scala/org/apache/spark/ui/UISparkListener.scala index 91e461c6fc7f1..25d8c3d5d001a 100644 --- a/core/src/main/scala/org/apache/spark/ui/UISparkListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/UISparkListener.scala @@ -17,17 +17,17 @@ package org.apache.spark.ui +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import net.liftweb.json.JsonAST._ + import org.apache.spark.scheduler._ -import org.apache.spark.SparkContext -import org.apache.spark.storage.StorageStatus +import org.apache.spark.storage._ import org.apache.spark.util.FileLogger import org.apache.spark.util.JsonProtocol -import net.liftweb.json.JsonAST._ - -private[spark] trait UISparkListener extends SparkListener +private[ui] trait UISparkListener extends SparkListener /** * A SparkListener that serves as an entry point for all events posted to the UI. @@ -42,7 +42,7 @@ private[spark] trait UISparkListener extends SparkListener * (2) If the UI is rendered from disk, GatewayUISparkListener replays each event deserialized * from the event logs to all attached listeners. 
*/ -private[spark] class GatewayUISparkListener(parent: SparkUI, live: Boolean) extends SparkListener { +private[ui] class GatewayUISparkListener(parent: SparkUI, live: Boolean) extends SparkListener { // Log events only if the UI is live private val logger: Option[FileLogger] = if (live) Some(new FileLogger()) else None @@ -50,26 +50,28 @@ private[spark] class GatewayUISparkListener(parent: SparkUI, live: Boolean) exte // Children listeners for which this gateway is responsible private val listeners = ArrayBuffer[UISparkListener]() - def registerSparkListener(listener: UISparkListener) = { - listeners += listener - } + def registerSparkListener(listener: UISparkListener) = listeners += listener /** Log the event as JSON */ - private def logEvent(event: SparkListenerEvent) { + private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) { val eventJson = JsonProtocol.sparkEventToJson(event) logger.foreach(_.logLine(compactRender(eventJson))) + if (flushLogger) { + logger.foreach(_.flush()) + } } + private def closeLogger() = logger.foreach(_.close()) + private def restartLogger() = logger.foreach(_.start()) + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { listeners.foreach(_.onStageSubmitted(stageSubmitted)) - logEvent(stageSubmitted) - logger.foreach(_.flush()) + logEvent(stageSubmitted, flushLogger = true) } override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { listeners.foreach(_.onStageCompleted(stageCompleted)) - logEvent(stageCompleted) - logger.foreach(_.flush()) + logEvent(stageCompleted, flushLogger = true) } override def onTaskStart(taskStart: SparkListenerTaskStart) { @@ -87,72 +89,93 @@ private[spark] class GatewayUISparkListener(parent: SparkUI, live: Boolean) exte override def onJobStart(jobStart: SparkListenerJobStart) { listeners.foreach(_.onJobStart(jobStart)) - logger.foreach(_.start()) + restartLogger() logEvent(jobStart) } override def onJobEnd(jobEnd: SparkListenerJobEnd) { listeners.foreach(_.onJobEnd(jobEnd)) logEvent(jobEnd) - logger.foreach(_.close()) + closeLogger() } override def onApplicationStart(applicationStart: SparkListenerApplicationStart) { - // Retrieve app name from the application start event // For live UI's, this should be equivalent to sc.appName - val sparkProperties = applicationStart.environmentDetails("Spark Properties").toMap - val appName = sparkProperties.get("spark.app.name") - appName.foreach(parent.setAppName) - + parent.setAppName(applicationStart.appName) listeners.foreach(_.onApplicationStart(applicationStart)) - logEvent(applicationStart) - logger.foreach(_.flush()) + logEvent(applicationStart, flushLogger = true) } - override def onStorageStatusFetch(storageStatusFetch: SparkListenerStorageStatusFetch) { - listeners.foreach(_.onStorageStatusFetch(storageStatusFetch)) - logEvent(storageStatusFetch) - logger.foreach(_.flush()) + override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { + listeners.foreach(_.onEnvironmentUpdate(environmentUpdate)) + logEvent(environmentUpdate) } - override def onGetRDDInfo(getRDDInfo: SparkListenerGetRDDInfo) { - listeners.foreach(_.onGetRDDInfo(getRDDInfo)) - logEvent(getRDDInfo) - logger.foreach(_.flush()) + override def onExecutorsStateChange(executorsStateChange: SparkListenerExecutorsStateChange) { + listeners.foreach(_.onExecutorsStateChange(executorsStateChange)) + logEvent(executorsStateChange, flushLogger = true) } } /** - * A SparkListener that fetches storage information from SparkEnv. 
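For illustration only, not part of this patch: GatewayUISparkListener above acts as a single fan-out point, forwarding each event to its registered child listeners and writing it to the event log, flushing eagerly after coarse-grained events. The shape of that gateway, with stand-in types (Event, Listener and Gateway are illustrative, not the Spark API):

  import scala.collection.mutable.ArrayBuffer

  trait Event
  trait Listener { def onEvent(e: Event): Unit }

  class Gateway(logLine: String => Unit, flush: () => Unit) {
    private val children = ArrayBuffer[Listener]()

    def register(l: Listener): Unit = children += l

    def post(e: Event, flushLog: Boolean = false): Unit = {
      children.foreach(_.onEvent(e))  // fan out to the UI listeners first
      logLine(e.toString)             // the real gateway serializes the event to JSON here
      if (flushLog) flush()           // flush after stage/application-level events
    }
  }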
- * - * The frequency at which this occurs is by default every time a stage event is triggered. - * This needs not be the case, however; a stage can be arbitrarily long, so any failure - * in the middle of a stage causes the storage status for that stage to be lost. + * A UISparkListener that maintains executor storage status */ -private[spark] class StorageStatusFetchSparkListener( - sc: SparkContext, - gateway: GatewayUISparkListener, - live: Boolean) - extends UISparkListener { - var storageStatusList: Seq[StorageStatus] = Seq() - - /** - * Fetch storage information from SparkEnv, which involves a query to the driver. This is - * expensive and should be invoked sparingly. - */ - def fetchStorageStatus() { - if (live) { - // Fetch only this is a live UI - val storageStatus = sc.getExecutorStorageStatus - val event = new SparkListenerStorageStatusFetch(storageStatus) - gateway.onStorageStatusFetch(event) +private[ui] class StorageStatusSparkListener extends UISparkListener { + var storageStatusList = Seq[StorageStatus]() + + /** Update storage status list to reflect updated block statuses */ + def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) { + val filteredStatus = storageStatusList.find(_.blockManagerId.executorId == execId) + filteredStatus.foreach { storageStatus => + updatedBlocks.foreach { case (blockId, updatedStatus) => + storageStatus.blocks(blockId) = updatedStatus + } } } - override def onStorageStatusFetch(storageStatusFetch: SparkListenerStorageStatusFetch) { - storageStatusList = storageStatusFetch.storageStatusList + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + val execId = taskEnd.taskInfo.executorId + val updatedBlocks = taskEnd.taskMetrics.updatedBlocks.getOrElse(Seq()) + if (updatedBlocks.length > 0) { + updateStorageStatus(execId, updatedBlocks) + } } - override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = fetchStorageStatus() - override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = fetchStorageStatus() + override def onExecutorsStateChange(executorsStateChange: SparkListenerExecutorsStateChange) { + storageStatusList = executorsStateChange.storageStatusList + } +} + +/** + * A UISparkListener that maintains RDD information + */ +private[ui] class RDDInfoSparkListener extends StorageStatusSparkListener { + private val _rddInfoMap = mutable.Map[Int, RDDInfo]() + + /** Filter RDD info to include only those with cached partitions */ + def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq + + /** Update each RDD's info to reflect any updates to the RDD's storage status */ + private def updateRDDInfo() { + val updatedRDDInfoList = StorageUtils.rddInfoFromStorageStatus(storageStatusList, _rddInfoMap) + updatedRDDInfoList.foreach { info => _rddInfoMap(info.id) = info } + } + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + super.onTaskEnd(taskEnd) + if (taskEnd.taskMetrics.updatedBlocks.isDefined) { + updateRDDInfo() + } + } + + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { + val rddInfo = stageSubmitted.stageInfo.rddInfo + _rddInfoMap(rddInfo.id) = rddInfo + } + + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { + // Remove all partitions that are no longer cached + // TODO(aor): Handle unpersist + _rddInfoMap.retain { case (id, info) => info.numCachedPartitions > 0 } + } } diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala 
b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala index 3cc85f6629b76..798c13f65b862 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala @@ -23,13 +23,12 @@ import scala.xml.Node import org.eclipse.jetty.server.Handler -import org.apache.spark.SparkContext import org.apache.spark.scheduler._ import org.apache.spark.ui.JettyUtils._ import org.apache.spark.ui.Page.Environment import org.apache.spark.ui._ -private[spark] class EnvironmentUI(parent: SparkUI) { +private[ui] class EnvironmentUI(parent: SparkUI) { val live = parent.live val sc = parent.sc @@ -40,7 +39,7 @@ private[spark] class EnvironmentUI(parent: SparkUI) { def start() { val gateway = parent.gatewayListener - _listener = Some(new EnvironmentListener(sc)) + _listener = Some(new EnvironmentListener()) gateway.registerSparkListener(listener) } @@ -78,14 +77,14 @@ private[spark] class EnvironmentUI(parent: SparkUI) { /** * A SparkListener that prepares information to be displayed on the EnvironmentUI */ -private[spark] class EnvironmentListener(sc: SparkContext) extends UISparkListener { +private[ui] class EnvironmentListener extends UISparkListener { var jvmInformation: Seq[(String, String)] = Seq() var sparkProperties: Seq[(String, String)] = Seq() var systemProperties: Seq[(String, String)] = Seq() var classpathEntries: Seq[(String, String)] = Seq() - override def onApplicationStart(applicationStart: SparkListenerApplicationStart) { - val environmentDetails = applicationStart.environmentDetails + override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { + val environmentDetails = environmentUpdate.environmentDetails jvmInformation = environmentDetails("JVM Information") sparkProperties = environmentDetails("Spark Properties") systemProperties = environmentDetails("System Properties") diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index 13e332e1e20a0..58f79e1435e75 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -24,14 +24,14 @@ import scala.xml.Node import org.eclipse.jetty.server.Handler -import org.apache.spark.{SparkContext, ExceptionFailure} +import org.apache.spark.ExceptionFailure import org.apache.spark.scheduler._ import org.apache.spark.ui.JettyUtils._ import org.apache.spark.ui.Page.Executors import org.apache.spark.ui._ import org.apache.spark.util.Utils -private[spark] class ExecutorsUI(parent: SparkUI) { +private[ui] class ExecutorsUI(parent: SparkUI) { val live = parent.live val sc = parent.sc @@ -42,7 +42,7 @@ private[spark] class ExecutorsUI(parent: SparkUI) { def start() { val gateway = parent.gatewayListener - _listener = Some(new ExecutorsListener(sc, gateway, live)) + _listener = Some(new ExecutorsListener()) gateway.registerSparkListener(listener) } @@ -51,7 +51,6 @@ private[spark] class ExecutorsUI(parent: SparkUI) { ) def render(request: HttpServletRequest): Seq[Node] = { - listener.fetchStorageStatus() val storageStatusList = listener.storageStatusList val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _) val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_ + _) @@ -164,11 +163,7 @@ private[spark] class ExecutorsUI(parent: SparkUI) { /** * A SparkListener that prepares information to be displayed on the ExecutorsUI */ -private[spark] class ExecutorsListener( - sc: 
SparkContext, - gateway: GatewayUISparkListener, - live: Boolean) - extends StorageStatusFetchSparkListener(sc, gateway, live) { +private[ui] class ExecutorsListener extends StorageStatusSparkListener { val executorToTasksActive = HashMap[String, Int]() val executorToTasksComplete = HashMap[String, Int]() val executorToTasksFailed = HashMap[String, Int]() @@ -203,6 +198,7 @@ private[spark] class ExecutorsListener( executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten } } + super.onTaskEnd(taskEnd) } /** diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala index 64e22a30b48f9..1dfe1d4f1fa11 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala @@ -18,7 +18,7 @@ package org.apache.spark.ui.jobs /** class for reporting aggregated metrics for each executors in stageUI */ -private[spark] class ExecutorSummary { +private[ui] class ExecutorSummary { var taskTime : Long = 0 var failedTasks : Int = 0 var succeededTasks : Int = 0 diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index a37bc7f9aeb9d..49581bc6beb1e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -23,7 +23,7 @@ import scala.xml.Node import org.apache.spark.util.Utils /** Page showing executor summary */ -private[spark] class ExecutorTable(stageId: Int, parent: JobProgressUI) { +private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) { private def listener = parent.listener def toNodeSeq: Seq[Node] = { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index f753741cb26ac..e9657cff34151 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -25,7 +25,7 @@ import org.apache.spark.ui.Page._ import org.apache.spark.ui.UIUtils /** Page showing list of all ongoing and recently finished stages and pools*/ -private[spark] class IndexPage(parent: JobProgressUI) { +private[ui] class IndexPage(parent: JobProgressUI) { private val live = parent.live private val sc = parent.sc private def appName = parent.appName diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 2f4a3fd52f6cf..495ea62d75739 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -21,9 +21,9 @@ import scala.collection.mutable.{ListBuffer, HashMap} import org.apache.spark.{ExceptionFailure, SparkContext, Success} import org.apache.spark.executor.TaskMetrics -import org.apache.spark.scheduler._ -import org.apache.spark.ui.{GatewayUISparkListener, StorageStatusFetchSparkListener} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler._ +import org.apache.spark.ui.StorageStatusSparkListener /** * Tracks task-level information to be displayed in the UI. 
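For illustration only, not part of this patch: the listeners above (StorageStatusSparkListener and its subclasses) keep per-executor block state current by folding in the updated blocks that each finished task reports. A minimal sketch of that fold, with simplified keys and types (plain String ids stand in for BlockManagerId and BlockId):

  import scala.collection.mutable

  case class SimpleBlockStatus(memSize: Long, diskSize: Long)

  class ExecutorStorageState {
    // executorId -> (blockId -> latest status)
    private val byExecutor = mutable.Map[String, mutable.Map[String, SimpleBlockStatus]]()

    // Apply the blocks a finished task reported as updated on its executor.
    def onTaskEnd(execId: String, updatedBlocks: Seq[(String, SimpleBlockStatus)]): Unit = {
      val blocks = byExecutor.getOrElseUpdate(execId, mutable.Map.empty)
      updatedBlocks.foreach { case (blockId, status) => blocks(blockId) = status }
    }

    def memUsed(execId: String): Long =
      byExecutor.get(execId).map(_.values.map(_.memSize).sum).getOrElse(0L)
  }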
@@ -32,11 +32,9 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode * class, since the UI thread and the DAGScheduler event loop may otherwise * be reading/updating the internal data structures concurrently. */ -private[spark] class JobProgressListener( - sc: SparkContext, - gateway: GatewayUISparkListener, - live: Boolean) - extends StorageStatusFetchSparkListener(sc, gateway, live) { +private[ui] class JobProgressListener(sc: SparkContext, live: Boolean) + extends StorageStatusSparkListener { + import JobProgressListener._ // How many stages to remember @@ -75,7 +73,7 @@ private[spark] class JobProgressListener( override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized { val stage = stageCompleted.stageInfo val stageId = stage.stageId - // Remove by stageId, rather than by StageInfo, in case the StageInfo is from disk + // Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage poolToActiveStages(stageIdToPool(stageId)).remove(stageId) activeStages.remove(stageId) completedStages += stage @@ -172,7 +170,7 @@ private[spark] class JobProgressListener( } val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]()) - // Remove by taskId, rather than by TaskInfo, in case the TaskInfo is from disk + // Remove by taskId, rather than by TaskInfo, in case the TaskInfo is from storage tasksActive.remove(taskEnd.taskInfo.taskId) val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = @@ -220,7 +218,7 @@ private[spark] class JobProgressListener( jobEnd.jobResult match { case JobFailed(_, stageId) => activeStages.get(stageId).foreach { s => - // Remove by stageId, rather than by StageInfo, in case the StageInfo is from disk + // Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage activeStages.remove(s.stageId) poolToActiveStages(stageIdToPool(stageId)).remove(s.stageId) failedStages += s @@ -230,9 +228,9 @@ private[spark] class JobProgressListener( } } - override def onApplicationStart(applicationStart: SparkListenerApplicationStart) = synchronized { + override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) = { val schedulingModeName = - applicationStart.environmentDetails("Spark Properties").toMap.get("spark.scheduler.mode") + environmentUpdate.environmentDetails("Spark Properties").toMap.get("spark.scheduler.mode") schedulingMode = schedulingModeName match { case Some(name) => Some(SchedulingMode.withName(name)) case None => None @@ -240,12 +238,12 @@ private[spark] class JobProgressListener( } } -private[spark] case class TaskUIData( - taskInfo: TaskInfo, - taskMetrics: Option[TaskMetrics] = None, - exception: Option[ExceptionFailure] = None) +private[ui] case class TaskUIData( + taskInfo: TaskInfo, + taskMetrics: Option[TaskMetrics] = None, + exception: Option[ExceptionFailure] = None) -private[spark] object JobProgressListener { +private object JobProgressListener { val DEFAULT_RETAINED_STAGES = 1000 val DEFAULT_POOL_NAME = "default" } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala index a0e681c777198..2e693caa2c0f3 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala @@ -29,7 +29,7 @@ import org.apache.spark.ui.SparkUI import org.apache.spark.util.Utils /** Web UI showing progress status of all jobs in the given 
SparkContext. */ -private[spark] class JobProgressUI(parent: SparkUI) { +private[ui] class JobProgressUI(parent: SparkUI) { val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") val live = parent.live val sc = parent.sc @@ -45,7 +45,7 @@ private[spark] class JobProgressUI(parent: SparkUI) { def start() { val gateway = parent.gatewayListener - _listener = Some(new JobProgressListener(sc, gateway, live)) + _listener = Some(new JobProgressListener(sc, live)) gateway.registerSparkListener(listener) } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index 29bef73ed9b50..431b4515c11f7 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -25,7 +25,7 @@ import org.apache.spark.ui.Page._ import org.apache.spark.ui.UIUtils /** Page showing specific pool details */ -private[spark] class PoolPage(parent: JobProgressUI) { +private[ui] class PoolPage(parent: JobProgressUI) { private val live = parent.live private val sc = parent.sc private def appName = parent.appName diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala index c9897d4fa4ca9..45e647da9968a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala @@ -24,7 +24,7 @@ import org.apache.spark.scheduler.{Schedulable, StageInfo} import org.apache.spark.ui.UIUtils /** Table showing list of pools */ -private[spark] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) { +private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) { private val poolToActiveStages = listener.poolToActiveStages private def listener = parent.listener diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 37e7e6302599f..81ea66f639495 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -28,7 +28,7 @@ import org.apache.spark.ui.UIUtils import org.apache.spark.util.{Utils, Distribution} /** Page showing statistics and task list for a given stage */ -private[spark] class StagePage(parent: JobProgressUI) { +private[ui] class StagePage(parent: JobProgressUI) { private val dateFmt = parent.dateFmt private def appName = parent.appName private def listener = parent.listener diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 3983bb00184d5..7785c6fffd5ee 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -27,7 +27,7 @@ import org.apache.spark.ui.UIUtils import org.apache.spark.util.Utils /** Page showing list of all ongoing and recently finished stages */ -private[spark] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) { +private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) { private val dateFmt = parent.dateFmt private def isFairScheduler = parent.isFairScheduler private def listener = parent.listener diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala index a9c2d956f0b6a..cee7426b9a9e5 100644 --- 
a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala @@ -21,14 +21,11 @@ import javax.servlet.http.HttpServletRequest import org.eclipse.jetty.server.Handler -import org.apache.spark.SparkContext -import org.apache.spark.scheduler._ -import org.apache.spark.storage.{StorageUtils, RDDInfo} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.ui._ /** Web UI showing storage status of all RDD's in the given SparkContext. */ -private[spark] class BlockManagerUI(parent: SparkUI) { +private[ui] class BlockManagerUI(parent: SparkUI) { val live = parent.live val sc = parent.sc @@ -41,7 +38,7 @@ private[spark] class BlockManagerUI(parent: SparkUI) { def start() { val gateway = parent.gatewayListener - _listener = Some(new BlockManagerListener(sc, gateway, live)) + _listener = Some(new BlockManagerListener) gateway.registerSparkListener(listener) } @@ -54,32 +51,4 @@ private[spark] class BlockManagerUI(parent: SparkUI) { /** * A SparkListener that prepares information to be displayed on the BlockManagerUI */ -private[spark] class BlockManagerListener( - sc: SparkContext, - gateway: GatewayUISparkListener, - live: Boolean) - extends StorageStatusFetchSparkListener(sc, gateway, live) { - var rddInfoList: Seq[RDDInfo] = Seq() - - def getRDDInfo() { - if (live) { - val rddInfo = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc) - val getRDDInfo = new SparkListenerGetRDDInfo(rddInfo) - gateway.onGetRDDInfo(getRDDInfo) - } - } - - override def onGetRDDInfo(getRDDInfo: SparkListenerGetRDDInfo) { - rddInfoList = getRDDInfo.rddInfoList - } - - override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = { - super.onStageSubmitted(stageSubmitted) - getRDDInfo() - } - - override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = { - super.onStageCompleted(stageCompleted) - getRDDInfo() - } -} +private[ui] class BlockManagerListener extends RDDInfoSparkListener diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala index 3c4cb119fc05b..f24bdf366d94b 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala @@ -27,14 +27,12 @@ import org.apache.spark.ui.UIUtils import org.apache.spark.util.Utils /** Page showing list of RDD's currently stored in the cluster */ -private[spark] class IndexPage(parent: BlockManagerUI) { +private[ui] class IndexPage(parent: BlockManagerUI) { private def appName = parent.appName private def listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { // Calculate macro-level statistics - listener.fetchStorageStatus() - listener.getRDDInfo() val rdds = listener.rddInfoList val content = UIUtils.listingTable(rddHeader, rddRow, rdds) UIUtils.headerSparkPage(content, appName, "Storage ", Storage) diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 0f8ac552ab052..290520a015fa0 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -21,36 +21,31 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.storage.{BlockId, StorageStatus, StorageUtils} -import org.apache.spark.storage.BlockManagerMasterActor.BlockStatus +import 
org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils} import org.apache.spark.ui.Page._ import org.apache.spark.ui.UIUtils import org.apache.spark.util.Utils /** Page showing storage details for a given RDD */ -private[spark] class RDDPage(parent: BlockManagerUI) { +private[ui] class RDDPage(parent: BlockManagerUI) { private def appName = parent.appName private def listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { - listener.fetchStorageStatus() - listener.getRDDInfo() val storageStatusList = listener.storageStatusList val id = request.getParameter("id").toInt - val filteredStorageStatusList = - StorageUtils.filterStorageStatusByRDD(storageStatusList.toArray, id) val rddInfo = listener.rddInfoList.filter(_.id == id).head // Worker table - val workers = filteredStorageStatusList.map((id, _)) + val workers = storageStatusList.map((id, _)) val workerTable = UIUtils.listingTable(workerHeader, workerRow, workers) // Block table - val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).toArray. - sortWith(_._1.name < _._1.name) + val filteredStorageStatusList = StorageUtils.filterStorageStatusByRDD(storageStatusList, id) + val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).sortWith(_._1.name < _._1.name) val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList) - val blocks = blockStatuses.map { case (id, status) => - (id, status, blockLocations.get(id).getOrElse(Seq("Unknown"))) + val blocks = blockStatuses.map { case (blockId, status) => + (blockId, status, blockLocations.get(blockId).getOrElse(Seq("Unknown"))) } val blockTable = UIUtils.listingTable(blockHeader, blockRow, blocks) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index f3af976cf1098..5a26a77bee728 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -21,18 +21,20 @@ import java.util.{UUID, Properties} import scala.collection.JavaConverters._ import scala.collection.Map +import scala.collection.mutable + +import net.liftweb.json.JsonDSL._ +import net.liftweb.json.JsonAST._ +import net.liftweb.json.DefaultFormats + import org.apache.spark.executor.{ShuffleWriteMetrics, ShuffleReadMetrics, TaskMetrics} import org.apache.spark.scheduler._ -import org.apache.spark.storage.BlockManagerMasterActor.BlockStatus import org.apache.spark.storage._ import org.apache.spark._ -import net.liftweb.json.JsonDSL._ -import net.liftweb.json.JsonAST._ -import net.liftweb.json.DefaultFormats - private[spark] object JsonProtocol { + private implicit val format = DefaultFormats /** * JSON serialization methods for SparkListenerEvent's @@ -41,30 +43,31 @@ private[spark] object JsonProtocol { def sparkEventToJson(event: SparkListenerEvent): JValue = { event match { case stageSubmitted: SparkListenerStageSubmitted => - stageSubmittedEventToJson(stageSubmitted) + stageSubmittedToJson(stageSubmitted) case stageCompleted: SparkListenerStageCompleted => - stageCompletedEventToJson(stageCompleted) + stageCompletedToJson(stageCompleted) case taskStart: SparkListenerTaskStart => - taskStartEventToJson(taskStart) + taskStartToJson(taskStart) case taskGettingResult: SparkListenerTaskGettingResult => - taskGettingResultEventToJson(taskGettingResult) + taskGettingResultToJson(taskGettingResult) case taskEnd: SparkListenerTaskEnd => - taskEndEventToJson(taskEnd) + 
taskEndToJson(taskEnd) case jobStart: SparkListenerJobStart => - jobStartEventToJson(jobStart) + jobStartToJson(jobStart) case jobEnd: SparkListenerJobEnd => - jobEndEventToJson(jobEnd) + jobEndToJson(jobEnd) case applicationStart: SparkListenerApplicationStart => - applicationStartEventToJson(applicationStart) - case storageStatusFetch: SparkListenerStorageStatusFetch => - storageStatusFetchEventToJson(storageStatusFetch) - case getRDDInfo: SparkListenerGetRDDInfo => - getRDDInfoEventToJson(getRDDInfo) - // SparkListenerShutdown is not supported + applicationStartToJson(applicationStart) + case environmentUpdate: SparkListenerEnvironmentUpdate => + environmentUpdateToJson(environmentUpdate) + case executorsStateChange: SparkListenerExecutorsStateChange => + executorsStateChangeToJson(executorsStateChange) + case SparkListenerShutdown => + shutdownToJson() } } - def stageSubmittedEventToJson(stageSubmitted: SparkListenerStageSubmitted): JValue = { + def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted): JValue = { val stageInfo = stageInfoToJson(stageSubmitted.stageInfo) val properties = propertiesToJson(stageSubmitted.properties) ("Event" -> Utils.getFormattedClassName(stageSubmitted)) ~ @@ -72,26 +75,26 @@ private[spark] object JsonProtocol { ("Properties" -> properties) } - def stageCompletedEventToJson(stageCompleted: SparkListenerStageCompleted): JValue = { + def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted): JValue = { val stageInfo = stageInfoToJson(stageCompleted.stageInfo) ("Event" -> Utils.getFormattedClassName(stageCompleted)) ~ ("Stage Info" -> stageInfo) } - def taskStartEventToJson(taskStart: SparkListenerTaskStart): JValue = { + def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = { val taskInfo = taskInfoToJson(taskStart.taskInfo) ("Event" -> Utils.getFormattedClassName(taskStart)) ~ ("Stage ID" -> taskStart.stageId) ~ ("Task Info" -> taskInfo) } - def taskGettingResultEventToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = { + def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = { val taskInfo = taskInfoToJson(taskGettingResult.taskInfo) ("Event" -> Utils.getFormattedClassName(taskGettingResult)) ~ ("Task Info" -> taskInfo) } - def taskEndEventToJson(taskEnd: SparkListenerTaskEnd): JValue = { + def taskEndToJson(taskEnd: SparkListenerTaskEnd): JValue = { val taskEndReason = taskEndReasonToJson(taskEnd.reason) val taskInfo = taskInfoToJson(taskEnd.taskInfo) val taskMetrics = taskMetricsToJson(taskEnd.taskMetrics) @@ -103,7 +106,7 @@ private[spark] object JsonProtocol { ("Task Metrics" -> taskMetrics) } - def jobStartEventToJson(jobStart: SparkListenerJobStart): JValue = { + def jobStartToJson(jobStart: SparkListenerJobStart): JValue = { val properties = propertiesToJson(jobStart.properties) ("Event" -> Utils.getFormattedClassName(jobStart)) ~ ("Job ID" -> jobStart.jobId) ~ @@ -111,37 +114,41 @@ private[spark] object JsonProtocol { ("Properties" -> properties) } - def jobEndEventToJson(jobEnd: SparkListenerJobEnd): JValue = { + def jobEndToJson(jobEnd: SparkListenerJobEnd): JValue = { val jobResult = jobResultToJson(jobEnd.jobResult) ("Event" -> Utils.getFormattedClassName(jobEnd)) ~ ("Job ID" -> jobEnd.jobId) ~ ("Job Result" -> jobResult) } - def applicationStartEventToJson(applicationStart: SparkListenerApplicationStart): JValue = { - val environmentDetails = applicationStart.environmentDetails + def applicationStartToJson(applicationStart: SparkListenerApplicationStart): 
JValue = { + ("Event" -> Utils.getFormattedClassName(applicationStart)) ~ + ("App Name" -> applicationStart.appName) + } + + def environmentUpdateToJson(environmentUpdate: SparkListenerEnvironmentUpdate): JValue = { + val environmentDetails = environmentUpdate.environmentDetails val jvmInformation = mapToJson(environmentDetails("JVM Information").toMap) val sparkProperties = mapToJson(environmentDetails("Spark Properties").toMap) val systemProperties = mapToJson(environmentDetails("System Properties").toMap) val classpathEntries = mapToJson(environmentDetails("Classpath Entries").toMap) - ("Event" -> Utils.getFormattedClassName(applicationStart)) ~ + ("Event" -> Utils.getFormattedClassName(environmentUpdate)) ~ ("JVM Information" -> jvmInformation) ~ ("Spark Properties" -> sparkProperties) ~ ("System Properties" -> systemProperties) ~ ("Classpath Entries" -> classpathEntries) } - def storageStatusFetchEventToJson(storageStatusFetch: SparkListenerStorageStatusFetch): JValue = { + def executorsStateChangeToJson(executorsStateChange: SparkListenerExecutorsStateChange) + : JValue = { val storageStatusList = - JArray(storageStatusFetch.storageStatusList.map(storageStatusToJson).toList) - ("Event" -> Utils.getFormattedClassName(storageStatusFetch)) ~ - ("Storage Status List" -> storageStatusList) + JArray(executorsStateChange.storageStatusList.map(storageStatusToJson).toList) + ("Event" -> Utils.getFormattedClassName(executorsStateChange)) ~ + ("Storage Status List" -> storageStatusList) } - def getRDDInfoEventToJson(getRDDInfo: SparkListenerGetRDDInfo): JValue = { - val rddInfoList = JArray(getRDDInfo.rddInfoList.map(rddInfoToJson).toList) - ("Event" -> Utils.getFormattedClassName(getRDDInfo)) ~ - ("RDD Info List" -> rddInfoList) + def shutdownToJson(): JValue = { + "Event" -> Utils.getFormattedClassName(SparkListenerShutdown) } /** @@ -149,6 +156,7 @@ private[spark] object JsonProtocol { */ def stageInfoToJson(stageInfo: StageInfo): JValue = { + val rddInfo = rddInfoToJson(stageInfo.rddInfo) val taskInfos = JArray(stageInfo.taskInfos.map { case (info, metrics) => ("Task Info" -> taskInfoToJson(info)) ~ ("Task Metrics" -> taskMetricsToJson(metrics)) @@ -157,9 +165,8 @@ private[spark] object JsonProtocol { val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing) ("Stage ID" -> stageInfo.stageId) ~ ("Stage Name" -> stageInfo.name) ~ - ("RDD Name" -> stageInfo.rddName) ~ - ("Number of Partitions" -> stageInfo.numPartitions) ~ ("Number of Tasks" -> stageInfo.numTasks) ~ + ("RDD Info" -> rddInfo) ~ ("Task Infos" -> taskInfos) ~ ("Submission Time" -> submissionTime) ~ ("Completion Time" -> completionTime) ~ @@ -184,6 +191,12 @@ private[spark] object JsonProtocol { taskMetrics.shuffleReadMetrics.map(shuffleReadMetricsToJson).getOrElse(JNothing) val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing) + val updatedBlocks = taskMetrics.updatedBlocks.map { blocks => + JArray(blocks.toList.map { case (id, status) => + ("Block ID" -> blockIdToJson(id)) ~ + ("Status" -> blockStatusToJson(status)) + }) + }.getOrElse(JNothing) ("Host Name" -> taskMetrics.hostname) ~ ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~ ("Executor Run Time" -> taskMetrics.executorRunTime) ~ @@ -193,7 +206,8 @@ private[spark] object JsonProtocol { ("Memory Bytes Spilled" -> taskMetrics.memoryBytesSpilled) ~ ("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~ ("Shuffle Read Metrics" -> shuffleReadMetrics) ~ - ("Shuffle Write Metrics" 
-> shuffleWriteMetrics)
+    ("Shuffle Write Metrics" -> shuffleWriteMetrics) ~
+    ("Updated Blocks" -> updatedBlocks)
   }
 
   def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = {
@@ -268,8 +282,8 @@ private[spark] object JsonProtocol {
     ("RDD ID" -> rddInfo.id) ~
     ("Name" -> rddInfo.name) ~
     ("Storage Level" -> storageLevel) ~
-    ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
     ("Number of Partitions" -> rddInfo.numPartitions) ~
+    ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
     ("Memory Size" -> rddInfo.memSize) ~
     ("Disk Size" -> rddInfo.diskSize)
   }
@@ -364,11 +378,10 @@ private[spark] object JsonProtocol {
     val jobStart = Utils.getFormattedClassName(SparkListenerJobStart)
     val jobEnd = Utils.getFormattedClassName(SparkListenerJobEnd)
     val applicationStart = Utils.getFormattedClassName(SparkListenerApplicationStart)
-    val storageStatusFetch = Utils.getFormattedClassName(SparkListenerStorageStatusFetch)
-    val getRDDInfo = Utils.getFormattedClassName(SparkListenerGetRDDInfo)
-    // SparkListenerShutdown is not supported
+    val environmentUpdate = Utils.getFormattedClassName(SparkListenerEnvironmentUpdate)
+    val executorsStateChanged = Utils.getFormattedClassName(SparkListenerExecutorsStateChange)
+    val shutdown = Utils.getFormattedClassName(SparkListenerShutdown)
 
-    implicit val format = DefaultFormats
     (json \ "Event").extract[String] match {
       case `stageSubmitted` => stageSubmittedFromJson(json)
       case `stageCompleted` => stageCompletedFromJson(json)
@@ -378,8 +391,9 @@ private[spark] object JsonProtocol {
       case `jobStart` => jobStartFromJson(json)
       case `jobEnd` => jobEndFromJson(json)
       case `applicationStart` => applicationStartFromJson(json)
-      case `storageStatusFetch` => storageStatusFetchFromJson(json)
-      case `getRDDInfo` => getRDDInfoFromJson(json)
+      case `environmentUpdate` => environmentUpdateFromJson(json)
+      case `executorsStateChanged` => executorsStateChangeFromJson(json)
+      case `shutdown` => SparkListenerShutdown
     }
   }
 
@@ -395,7 +409,6 @@ private[spark] object JsonProtocol {
   }
 
   def taskStartFromJson(json: JValue): SparkListenerTaskStart = {
-    implicit val format = DefaultFormats
     val stageId = (json \ "Stage ID").extract[Int]
     val taskInfo = taskInfoFromJson(json \ "Task Info")
     new SparkListenerTaskStart(stageId, taskInfo)
@@ -407,7 +420,6 @@ private[spark] object JsonProtocol {
   }
 
   def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
-    implicit val format = DefaultFormats
     val stageId = (json \ "Stage ID").extract[Int]
     val taskType = (json \ "Task Type").extract[String]
     val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason")
@@ -417,7 +429,6 @@ private[spark] object JsonProtocol {
   }
 
   def jobStartFromJson(json: JValue): SparkListenerJobStart = {
-    implicit val format = DefaultFormats
     val jobId = (json \ "Job ID").extract[Int]
     val stageIds = (json \ "Stage IDs").extract[List[JValue]].map(_.extract[Int])
     val properties = propertiesFromJson(json \ "Properties")
@@ -425,32 +436,28 @@ private[spark] object JsonProtocol {
   }
 
   def jobEndFromJson(json: JValue): SparkListenerJobEnd = {
-    implicit val format = DefaultFormats
     val jobId = (json \ "Job ID").extract[Int]
     val jobResult = jobResultFromJson(json \ "Job Result")
     new SparkListenerJobEnd(jobId, jobResult)
   }
 
   def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = {
+    new SparkListenerApplicationStart((json \ "App Name").extract[String])
+  }
+
+  def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate = {
     val environmentDetails = Map[String, Seq[(String, String)]](
       "JVM Information" -> mapFromJson(json \ "JVM Information").toSeq,
       "Spark Properties" -> mapFromJson(json \ "Spark Properties").toSeq,
       "System Properties" -> mapFromJson(json \ "System Properties").toSeq,
       "Classpath Entries" -> mapFromJson(json \ "Classpath Entries").toSeq)
-    new SparkListenerApplicationStart(environmentDetails)
+    new SparkListenerEnvironmentUpdate(environmentDetails)
   }
 
-  def storageStatusFetchFromJson(json: JValue): SparkListenerStorageStatusFetch = {
-    implicit val format = DefaultFormats
+  def executorsStateChangeFromJson(json: JValue): SparkListenerExecutorsStateChange = {
     val storageStatusList =
       (json \ "Storage Status List").extract[List[JValue]].map(storageStatusFromJson)
-    new SparkListenerStorageStatusFetch(storageStatusList)
-  }
-
-  def getRDDInfoFromJson(json: JValue): SparkListenerGetRDDInfo = {
-    implicit val format = DefaultFormats
-    val rddInfoList = (json \ "RDD Info List").extract[List[JValue]].map(rddInfoFromJson)
-    new SparkListenerGetRDDInfo(rddInfoList)
+    new SparkListenerExecutorsStateChange(storageStatusList)
  }
 
   /**
@@ -458,12 +465,10 @@ private[spark] object JsonProtocol {
   */
 
   def stageInfoFromJson(json: JValue): StageInfo = {
-    implicit val format = DefaultFormats
     val stageId = (json \ "Stage ID").extract[Int]
     val stageName = (json \ "Stage Name").extract[String]
-    val rddName = (json \ "RDD Name").extract[String]
-    val numPartitions = (json \ "Number of Partitions").extract[Int]
     val numTasks = (json \ "Number of Tasks").extract[Int]
+    val rddInfo = rddInfoFromJson(json \ "RDD Info")
     val taskInfos = (json \ "Task Infos").extract[List[JValue]].map { value =>
       (taskInfoFromJson(value \ "Task Info"), taskMetricsFromJson(value \ "Task Metrics"))
     }.toBuffer
@@ -471,7 +476,7 @@ private[spark] object JsonProtocol {
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
     val emittedTaskSizeWarning = (json \ "Emitted Task Size Warning").extract[Boolean]
 
-    val stageInfo = new StageInfo(stageId, stageName, rddName, numPartitions, numTasks, taskInfos)
+    val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfo, taskInfos)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.emittedTaskSizeWarning = emittedTaskSizeWarning
@@ -479,7 +484,6 @@ private[spark] object JsonProtocol {
   }
 
   def taskInfoFromJson(json: JValue): TaskInfo = {
-    implicit val format = DefaultFormats
     val taskId = (json \ "Task ID").extract[Long]
     val index = (json \ "Index").extract[Int]
     val launchTime = (json \ "Launch Time").extract[Long]
@@ -500,7 +504,6 @@ private[spark] object JsonProtocol {
   }
 
   def taskMetricsFromJson(json: JValue): TaskMetrics = {
-    implicit val format = DefaultFormats
     val metrics = new TaskMetrics
     metrics.hostname = (json \ "Host Name").extract[String]
     metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long]
@@ -513,11 +516,17 @@ private[spark] object JsonProtocol {
       Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)
     metrics.shuffleWriteMetrics =
       Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
+    metrics.updatedBlocks = Utils.jsonOption(json \ "Updated Blocks").map { value =>
+      value.extract[List[JValue]].map { block =>
+        val id = blockIdFromJson(block \ "Block ID")
+        val status = blockStatusFromJson(block \ "Status")
+        (id, status)
+      }
+    }
     metrics
   }
 
   def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = {
-    implicit val format = DefaultFormats
     val metrics = new ShuffleReadMetrics
     metrics.shuffleFinishTime = (json \ "Shuffle Finish Time").extract[Long]
     metrics.totalBlocksFetched = (json \ "Total Blocks Fetched").extract[Int]
@@ -530,7 +539,6 @@ private[spark] object JsonProtocol {
   }
 
   def shuffleWriteMetricsFromJson(json: JValue): ShuffleWriteMetrics = {
-    implicit val format = DefaultFormats
     val metrics = new ShuffleWriteMetrics
     metrics.shuffleBytesWritten = (json \ "Shuffle Bytes Written").extract[Long]
     metrics.shuffleWriteTime = (json \ "Shuffle Write Time").extract[Long]
@@ -538,7 +546,6 @@ private[spark] object JsonProtocol {
   }
 
   def taskEndReasonFromJson(json: JValue): TaskEndReason = {
-    implicit val format = DefaultFormats
     val success = Utils.getFormattedClassName(Success)
     val resubmitted = Utils.getFormattedClassName(Resubmitted)
     val fetchFailed = Utils.getFormattedClassName(FetchFailed)
@@ -571,7 +578,6 @@ private[spark] object JsonProtocol {
   }
 
   def blockManagerIdFromJson(json: JValue): BlockManagerId = {
-    implicit val format = DefaultFormats
     val executorId = (json \ "Executor ID").extract[String]
     val host = (json \ "Host").extract[String]
     val port = (json \ "Port").extract[Int]
@@ -580,7 +586,6 @@ private[spark] object JsonProtocol {
   }
 
   def jobResultFromJson(json: JValue): JobResult = {
-    implicit val format = DefaultFormats
     val jobSucceeded = Utils.getFormattedClassName(JobSucceeded)
     val jobFailed = Utils.getFormattedClassName(JobFailed)
 
@@ -594,31 +599,34 @@ private[spark] object JsonProtocol {
   }
 
   def storageStatusFromJson(json: JValue): StorageStatus = {
-    implicit val format = DefaultFormats
     val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
     val maxMem = (json \ "Maximum Memory").extract[Long]
     val blocks = (json \ "Blocks").extract[List[JValue]].map { block =>
       val id = blockIdFromJson(block \ "Block ID")
       val status = blockStatusFromJson(block \ "Status")
       (id, status)
-    }.toMap
-    new StorageStatus(blockManagerId, maxMem, blocks)
+    }
+    val blockMap = mutable.Map[BlockId, BlockStatus](blocks: _*)
+    new StorageStatus(blockManagerId, maxMem, blockMap)
   }
 
   def rddInfoFromJson(json: JValue): RDDInfo = {
-    implicit val format = DefaultFormats
     val rddId = (json \ "RDD ID").extract[Int]
     val name = (json \ "Name").extract[String]
     val storageLevel = storageLevelFromJson(json \ "Storage Level")
-    val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
     val numPartitions = (json \ "Number of Partitions").extract[Int]
+    val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
     val memSize = (json \ "Memory Size").extract[Long]
     val diskSize = (json \ "Disk Size").extract[Long]
-    new RDDInfo(rddId, name, storageLevel, numCachedPartitions, numPartitions, memSize, diskSize)
+
+    val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel)
+    rddInfo.numCachedPartitions = numCachedPartitions
+    rddInfo.memSize = memSize
+    rddInfo.diskSize = diskSize
+    rddInfo
   }
 
   def storageLevelFromJson(json: JValue): StorageLevel = {
-    implicit val format = DefaultFormats
     val useDisk = (json \ "Use Disk").extract[Boolean]
     val useMemory = (json \ "Use Memory").extract[Boolean]
     val deserialized = (json \ "Deserialized").extract[Boolean]
@@ -627,7 +635,6 @@ private[spark] object JsonProtocol {
   }
 
   def blockIdFromJson(json: JValue): BlockId = {
-    implicit val format = DefaultFormats
     val rddBlockId = Utils.getFormattedClassName(RDDBlockId)
     val shuffleBlockId = Utils.getFormattedClassName(ShuffleBlockId)
     val broadcastBlockId = Utils.getFormattedClassName(BroadcastBlockId)
@@ -672,11 +679,10 @@ private[spark] object JsonProtocol {
   }
 
   def blockStatusFromJson(json: JValue): BlockStatus = {
-    implicit val format = DefaultFormats
     val storageLevel = storageLevelFromJson(json \ "Storage Level")
     val memorySize = (json \ "Memory Size").extract[Long]
     val diskSize = (json \ "Disk Size").extract[Long]
-    new BlockStatus(storageLevel, memorySize, diskSize)
+    BlockStatus(storageLevel, memorySize, diskSize)
   }
 
   /**
@@ -697,14 +703,12 @@ private[spark] object JsonProtocol {
   }
 
   def UUIDFromJson(json: JValue): UUID = {
-    implicit val format = DefaultFormats
     val leastSignificantBits = (json \ "Least Significant Bits").extract[Long]
     val mostSignificantBits = (json \ "Most Significant Bits").extract[Long]
     new UUID(leastSignificantBits, mostSignificantBits)
   }
 
   def stackTraceFromJson(json: JValue): Array[StackTraceElement] = {
-    implicit val format = DefaultFormats
     json.extract[List[JValue]].map { line =>
       val declaringClass = (line \ "Declaring Class").extract[String]
       val methodName = (line \ "Method Name").extract[String]
@@ -715,7 +719,6 @@ private[spark] object JsonProtocol {
   }
 
   def exceptionFromJson(json: JValue): Exception = {
-    implicit val format = DefaultFormats
     val e = new Exception((json \ "Message").extract[String])
     e.setStackTrace(stackTraceFromJson(json \ "Stack Trace"))
     e
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index a34976af44963..d88343de75cb1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -50,9 +50,9 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 
     listener.stageInfos.size should be {1}
     val first = listener.stageInfos.head
-    first.rddName should be {"Target RDD"}
+    first.rddInfo.name should be {"Target RDD"}
     first.numTasks should be {4}
-    first.numPartitions should be {4}
+    first.rddInfo.numPartitions should be {4}
     first.submissionTime should be ('defined)
     first.completionTime should be ('defined)
     first.taskInfos.length should be {4}
@@ -110,7 +110,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
       checkNonZeroAvg(
         stageInfo.taskInfos.map{_._2.executorDeserializeTime},
         stageInfo + " executorDeserializeTime")
-      if (stageInfo.rddName == d4.name) {
+      if (stageInfo.rddInfo.name == d4.name) {
        checkNonZeroAvg(
          stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime},
          stageInfo + " fetchWaitTime")
@@ -118,11 +118,11 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 
      stageInfo.taskInfos.foreach { case (taskInfo, taskMetrics) =>
        taskMetrics.resultSize should be > (0l)
-        if (stageInfo.rddName == d2.name || stageInfo.rddName == d3.name) {
+        if (stageInfo.rddInfo.name == d2.name || stageInfo.rddInfo.name == d3.name) {
          taskMetrics.shuffleWriteMetrics should be ('defined)
          taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l)
        }
-        if (stageInfo.rddName == d4.name) {
+        if (stageInfo.rddInfo.name == d4.name) {
          taskMetrics.shuffleReadMetrics should be ('defined)
          val sm = taskMetrics.shuffleReadMetrics.get
          sm.totalBlocksFetched should be > (0)
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 9483fd8fb10f2..006353b705e32 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.util.Utils
 class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
   test("test executor id to summary") {
     val sc = new SparkContext("local", "test")
-    val listener = new JobProgressListener(sc)
+    val listener = new JobProgressListener(sc, true)
     val taskMetrics = new TaskMetrics()
     val shuffleReadMetrics = new ShuffleReadMetrics()
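Reviewer note (not part of the patch): the sketch below is a minimal, hypothetical round-trip of the new "Updated Blocks" field through the taskMetricsToJson / taskMetricsFromJson pair changed above. It assumes only the shapes visible in this diff: TaskMetrics.updatedBlocks holds (BlockId, BlockStatus) pairs, BlockStatus is a case class, and RDDBlockId / StorageLevel live in org.apache.spark.storage. The concrete values and the hostname field setting are illustrative only.

    // Hedged sketch: exercises only what the diff above shows.
    import org.apache.spark.executor.TaskMetrics
    import org.apache.spark.storage.{BlockStatus, RDDBlockId, StorageLevel}
    import org.apache.spark.util.JsonProtocol

    val metrics = new TaskMetrics
    metrics.hostname = "localhost"
    // One block whose storage status changed as a result of this task.
    metrics.updatedBlocks = Some(List(
      (RDDBlockId(1, 0), BlockStatus(StorageLevel.MEMORY_ONLY, 100L, 0L))))

    val json = JsonProtocol.taskMetricsToJson(metrics)      // now carries "Updated Blocks"
    val recovered = JsonProtocol.taskMetricsFromJson(json)  // parsed via blockIdFromJson / blockStatusFromJson
    assert(recovered.updatedBlocks == metrics.updatedBlocks)

Assuming Utils.jsonOption behaves the same way for "Updated Blocks" as for the other optional metrics fields shown above, a missing entry would leave updatedBlocks as None, so event logs written before this change should still parse.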