From 1f45d44dfa45d2a8174837d36aa31ec31e4a1e80 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 20 Nov 2014 13:51:21 -0800 Subject: [PATCH] Incorporate a bunch of minor review feedback. --- .../spark/ui/jobs/JobProgressListener.scala | 21 ++++++++++++------- .../org/apache/spark/ui/jobs/UIData.scala | 7 +++++-- .../org/apache/spark/ui/UISeleniumSuite.scala | 2 +- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index a8941fdb8e93a..31ed997ba4e89 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -49,8 +49,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { type PoolName = String type ExecutorId = String - // Define all of our state: - // Jobs: val activeJobs = new HashMap[JobId, JobUIData] val completedJobs = ListBuffer[JobUIData]() @@ -151,9 +149,18 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { group <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) ) yield group val jobData: JobUIData = - new JobUIData(jobStart.jobId, Some(System.currentTimeMillis), None, jobStart.stageIds, - jobGroup, JobExecutionStatus.RUNNING) - // Compute (a potential underestimate of) the number of tasks that will be run by this job: + new JobUIData( + jobId = jobStart.jobId, + startTime = Some(System.currentTimeMillis), + endTime = None, + stageIds = jobStart.stageIds, + jobGroup = jobGroup, + status = JobExecutionStatus.RUNNING) + // Compute (a potential underestimate of) the number of tasks that will be run by this job. + // This may be an underestimate because the job start event references all of the result + // stages's transitive stage dependencies, but some of these stages might be skipped if their + // output is available from earlier runs. + // See https://github.com/apache/spark/pull/3009 for a more extensive discussion. jobData.numTasks = { val allStages = jobStart.stageInfos val missingStages = allStages.filter(_.completionTime.isEmpty) @@ -195,8 +202,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { // If this is a pending stage and no other job depends on it, then it won't be run. // To prevent memory leaks, remove this data since it won't be cleaned up as stages // finish / fail: - if (stageInfo.submissionTime.isEmpty && stageInfo.completionTime.isEmpty - && jobsUsingStage.isEmpty) { + val isPendingStage = stageInfo.submissionTime.isEmpty && stageInfo.completionTime.isEmpty + if (isPendingStage && jobsUsingStage.isEmpty) { stageIdToInfo.remove(stageId) stageIdToData.remove((stageId, stageInfo.attemptId)) } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index f849792da4554..79ec72e8f273b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -46,8 +46,11 @@ private[jobs] object UIData { var jobGroup: Option[String] = None, var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN, /* Tasks */ - // `numTasks` is a potential underestimate of the true number of tasks that this job will run - // see https://github.com/apache/spark/pull/3009 for an extensive discussion of this + // `numTasks` is a potential underestimate of the true number of tasks that this job will run. + // This may be an underestimate because the job start event references all of the result + // stages's transitive stage dependencies, but some of these stages might be skipped if their + // output is available from earlier runs. + // See https://github.com/apache/spark/pull/3009 for a more extensive discussion. var numTasks: Int = 0, var numActiveTasks: Int = 0, var numCompletedTasks: Int = 0, diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index c0bb9345eee04..ee0d7433a3105 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -24,10 +24,10 @@ import org.scalatest.concurrent.Eventually._ import org.scalatest.selenium.WebBrowser import org.scalatest.time.SpanSugar._ -import org.apache.spark.api.java.StorageLevels import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.spark.LocalSparkContext._ +import org.apache.spark.api.java.StorageLevels import org.apache.spark.shuffle.FetchFailedException /**