Incorporate a bunch of minor review feedback.
JoshRosen committed Nov 20, 2014
1 parent 0b77e3e commit 1f45d44
Showing 3 changed files with 20 additions and 10 deletions.
core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -49,8 +49,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   type PoolName = String
   type ExecutorId = String
 
-  // Define all of our state:
-
   // Jobs:
   val activeJobs = new HashMap[JobId, JobUIData]
   val completedJobs = ListBuffer[JobUIData]()
@@ -151,9 +149,18 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       group <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
     ) yield group
     val jobData: JobUIData =
-      new JobUIData(jobStart.jobId, Some(System.currentTimeMillis), None, jobStart.stageIds,
-        jobGroup, JobExecutionStatus.RUNNING)
-    // Compute (a potential underestimate of) the number of tasks that will be run by this job:
+      new JobUIData(
+        jobId = jobStart.jobId,
+        startTime = Some(System.currentTimeMillis),
+        endTime = None,
+        stageIds = jobStart.stageIds,
+        jobGroup = jobGroup,
+        status = JobExecutionStatus.RUNNING)
+    // Compute (a potential underestimate of) the number of tasks that will be run by this job.
+    // This may be an underestimate because the job start event references all of the result
+    // stages' transitive stage dependencies, but some of these stages might be skipped if their
+    // output is available from earlier runs.
+    // See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
     jobData.numTasks = {
       val allStages = jobStart.stageInfos
       val missingStages = allStages.filter(_.completionTime.isEmpty)
@@ -195,8 +202,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
           // If this is a pending stage and no other job depends on it, then it won't be run.
           // To prevent memory leaks, remove this data since it won't be cleaned up as stages
           // finish / fail:
-          if (stageInfo.submissionTime.isEmpty && stageInfo.completionTime.isEmpty
-              && jobsUsingStage.isEmpty) {
+          val isPendingStage = stageInfo.submissionTime.isEmpty && stageInfo.completionTime.isEmpty
+          if (isPendingStage && jobsUsingStage.isEmpty) {
             stageIdToInfo.remove(stageId)
             stageIdToData.remove((stageId, stageInfo.attemptId))
           }
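The second hunk above is truncated inside the numTasks block. A minimal sketch of how that computation plausibly completes, given the surrounding comment: allStages and missingStages appear in the diff, while the final sum over each stage's numTasks is an assumption rather than something the visible lines confirm.

    // Sketch of the truncated block, under the assumptions stated above.
    jobData.numTasks = {
      // Every stage referenced by the job start event, including the result
      // stages' transitive dependencies.
      val allStages = jobStart.stageInfos
      // Stages that already have a completion time may be skipped because
      // their output survives from an earlier run, so exclude them.
      val missingStages = allStages.filter(_.completionTime.isEmpty)
      // Summing over only the incomplete stages is what makes numTasks a
      // potential underestimate.  (Assumed completion of the truncated hunk.)
      missingStages.map(_.numTasks).sum
    }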
core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
7 changes: 5 additions & 2 deletions
@@ -46,8 +46,11 @@ private[jobs] object UIData {
     var jobGroup: Option[String] = None,
     var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN,
     /* Tasks */
-    // `numTasks` is a potential underestimate of the true number of tasks that this job will run
-    // see https://github.com/apache/spark/pull/3009 for an extensive discussion of this
+    // `numTasks` is a potential underestimate of the true number of tasks that this job will run.
+    // This may be an underestimate because the job start event references all of the result
+    // stages' transitive stage dependencies, but some of these stages might be skipped if their
+    // output is available from earlier runs.
+    // See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
     var numTasks: Int = 0,
     var numActiveTasks: Int = 0,
     var numCompletedTasks: Int = 0,
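The comment repeated in both files refers to Spark's stage-skipping behavior. As a hypothetical illustration (none of these names come from the commit), the second action below reuses the first job's shuffle output, so its map stage is skipped; counting tasks from every stage referenced by the job start event would overcount here, which is why the listener instead filters to incomplete stages and accepts a possible underestimate.

    // Hypothetical spark-shell example, not part of this commit.
    val pairs = sc.parallelize(1 to 1000).map(x => (x % 10, x))
    val sums = pairs.reduceByKey(_ + _)  // introduces a shuffle map stage

    sums.count()  // job 1: runs the shuffle map stage and the result stage
    sums.count()  // job 2: references both stages, but the map stage's shuffle
                  // output already exists, so that stage is skipped entirely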
core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -24,10 +24,10 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.selenium.WebBrowser
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.api.java.StorageLevels
 import org.apache.spark._
 import org.apache.spark.SparkContext._
 import org.apache.spark.LocalSparkContext._
+import org.apache.spark.api.java.StorageLevels
 import org.apache.spark.shuffle.FetchFailedException
 
 /**
