Skip to content

Commit

Permalink
Add stages progress bar; fix bug where active stages show as completed.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Nov 12, 2014
1 parent 4846ce4 commit b7bf30e
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
<th>Description</th>
<th>Submitted</th>
<th>Duration</th>
<th>Tasks (for all stages): Succeeded/Total</th>
<th class="sorttable_nosort">Stages: Succeeded/Total</th>
<th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
}

def makeRow(job: JobUIData): Seq[Node] = {
Expand Down Expand Up @@ -79,6 +80,10 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
{formattedSubmissionTime}
</td>
<td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
<td class="progress-cell">
{UIUtils.makeProgressBar(job.numActiveStages, job.numCompletedStages,
job.numFailedStages, job.stageIds.size)}
</td>
<td class="progress-cell">
{UIUtils.makeProgressBar(job.numActiveTasks, job.numCompletedTasks,
job.numFailedTasks, job.numTasks)}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, "Unknown"))
}

val (activeStages, completedOrFailedStages) = stages.partition(_.completionTime.isDefined)
val (completedOrFailedStages, activeStages) = stages.partition(_.completionTime.isDefined)
val (failedStages, completedStages) =
completedOrFailedStages.partition(_.failureReason.isDefined)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,19 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
numFailedStages += 1
trimIfNecessary(failedStages)
}

for (
activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId);
jobId <- activeJobsDependentOnStage;
jobData <- jobIdToData.get(jobId)
) {
jobData.numActiveStages -= 1
if (stage.failureReason.isEmpty) {
jobData.numCompletedStages += 1
} else {
jobData.numFailedStages += 1
}
}
}

/** If stages is too large, remove and garbage collect old stages */
Expand Down Expand Up @@ -177,6 +190,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
jobData <- jobIdToData.get(jobId)
) {
jobData.numTasks += stage.numTasks
jobData.numActiveStages += 1
}
}

Expand Down
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,15 @@ private[jobs] object UIData {
var stageIds: Seq[Int] = Seq.empty,
var jobGroup: Option[String] = None,
var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN,
/* Tasks */
var numTasks: Int = 0,
var numActiveTasks: Int = 0,
var numCompletedTasks: Int = 0,
var numFailedTasks: Int = 0
var numFailedTasks: Int = 0,
/* Stages */
var numActiveStages: Int = 0,
var numCompletedStages: Int = 0,
var numFailedStages: Int = 0
)

class StageUIData {
Expand Down

0 comments on commit b7bf30e

Please sign in to comment.