diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 3312671b6f885..5b9db86c03aa1 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -283,4 +283,17 @@ private[spark] object UIUtils extends Logging { } + + def makeProgressBar(started: Int, completed: Int, failed: Int, total: Int): Seq[Node] = { + val completeWidth = "width: %s%%".format((completed.toDouble/total)*100) + val startWidth = "width: %s%%".format((started.toDouble/total)*100) + +
+ + {completed}/{total} { if (failed > 0) s"($failed failed)" else "" } + +
+
+
+ } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index 170ee5af56f5d..c116d063714d4 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -44,6 +44,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { Description Submitted Duration + Tasks: Succeeded/Total } def makeRow(job: JobUIData): Seq[Node] = { @@ -74,6 +75,10 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { {formattedSubmissionTime} {formattedDuration} + + {UIUtils.makeProgressBar(job.numActiveTasks, job.numCompletedTasks, + job.numFailedTasks, job.numTasks)} + } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 72bc8d08864af..d63722ceec36a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -17,7 +17,7 @@ package org.apache.spark.ui.jobs -import scala.collection.mutable.{HashMap, ListBuffer} +import scala.collection.mutable.{HashMap, HashSet, ListBuffer} import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi @@ -59,7 +59,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val failedStages = ListBuffer[StageInfo]() val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData] val stageIdToInfo = new HashMap[StageId, StageInfo] - + val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]] + // Number of completed and failed stages, may not actually equal to completedStages.size and // failedStages.size respectively due to completedStage and failedStages only maintain the latest // part of the stages, the earlier ones will be removed when there are too many stages for @@ -86,6 +87,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { jobGroup, JobExecutionStatus.RUNNING) jobIdToData(jobStart.jobId) = jobData activeJobs(jobStart.jobId) = jobData + for (stageId <- jobStart.stageIds) { + stageIdToActiveJobIds.getOrElseUpdate(stageId, new HashSet[StageId]).add(jobStart.jobId) + } } override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized { @@ -102,6 +106,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { failedJobs += jobData jobData.status = JobExecutionStatus.FAILED } + for (stageId <- jobData.stageIds) { + stageIdToActiveJobIds.get(stageId).foreach(_.remove(jobEnd.jobId)) + } } override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized { @@ -138,6 +145,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stages.take(toRemove).foreach { s => stageIdToData.remove((s.stageId, s.attemptId)) stageIdToInfo.remove(s.stageId) + stageIdToActiveJobIds.remove(s.stageId) } stages.trimStart(toRemove) } @@ -162,6 +170,14 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]) stages(stage.stageId) = stage + + for ( + activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId); + jobId <- activeJobsDependentOnStage; + jobData <- jobIdToData.get(jobId) + ) { + jobData.numTasks += stage.numTasks + } } override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized { @@ -174,6 +190,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageData.numActiveTasks += 1 stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo)) } + for ( + activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId); + jobId <- activeJobsDependentOnStage; + jobData <- jobIdToData.get(jobId) + ) { + jobData.numActiveTasks += 1 + } } override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { @@ -208,6 +231,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { execSummary.taskTime += info.duration stageData.numActiveTasks -= 1 + val isRecomputation = stageData.completedIndices.contains(info.index) + val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) = taskEnd.reason match { case org.apache.spark.Success => @@ -231,6 +256,22 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { taskData.taskInfo = info taskData.taskMetrics = metrics taskData.errorMessage = errorMessage + + for ( + activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskEnd.stageId); + jobId <- activeJobsDependentOnStage; + jobData <- jobIdToData.get(jobId) + ) { + jobData.numActiveTasks -= 1 + taskEnd.reason match { + case Success => + if (!isRecomputation) { + jobData.numCompletedTasks += 1 + } + case _ => + jobData.numFailedTasks += 1 + } + } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 1546b26c0fc27..88b687d51a537 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -69,20 +69,6 @@ private[ui] class StageTableBase( } - private def makeProgressBar(started: Int, completed: Int, failed: Int, total: Int): Seq[Node] = - { - val completeWidth = "width: %s%%".format((completed.toDouble/total)*100) - val startWidth = "width: %s%%".format((started.toDouble/total)*100) - -
- - {completed}/{total} { if (failed > 0) s"($failed failed)" else "" } - -
-
-
- } - private def makeDescription(s: StageInfo): Seq[Node] = { // scalastyle:off val killLink = if (killEnabled) { @@ -172,7 +158,7 @@ private[ui] class StageTableBase( {submissionTime} {formattedDuration} - {makeProgressBar(stageData.numActiveTasks, stageData.completedIndices.size, + {UIUtils.makeProgressBar(stageData.numActiveTasks, stageData.completedIndices.size, stageData.numFailedTasks, s.numTasks)} {inputReadWithUnit} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 7ab4bf4712a4e..829500e254db9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -43,7 +43,11 @@ private[jobs] object UIData { var endTime: Option[Long] = None, var stageIds: Seq[Int] = Seq.empty, var jobGroup: Option[String] = None, - var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN + var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN, + var numTasks: Int = 0, + var numActiveTasks: Int = 0, + var numCompletedTasks: Int = 0, + var numFailedTasks: Int = 0 ) class StageUIData {