diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 064315177e06f..3c481bf3491f9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -41,6 +41,9 @@ class Job(val time: Time, func: () => _) {
_result
}
+ /**
+ * @return the globally unique id of this Job.
+ */
def id: String = {
if (!isSet) {
throw new IllegalStateException("Cannot access id before calling setId")
@@ -48,6 +51,9 @@ class Job(val time: Time, func: () => _) {
_id
}
+ /**
+ * @return the output op id of this Job. Each Job has a unique output op id in the same JobSet.
+ */
def outputOpId: Int = {
if (!isSet) {
throw new IllegalStateException("Cannot access number before calling setId")
@@ -60,7 +66,7 @@ class Job(val time: Time, func: () => _) {
throw new IllegalStateException("Cannot call setOutputOpId more than once")
}
isSet = true
- _id = "streaming job " + time + "." + outputOpId
+ _id = s"streaming job $time.$outputOpId"
_outputOpId = outputOpId
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index e876157fb4b00..f88ac66b8d77c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -19,16 +19,24 @@ package org.apache.spark.streaming.ui
import javax.servlet.http.HttpServletRequest
+import scala.collection.mutable.{ArrayBuffer, Map}
+import scala.xml.{NodeSeq, Node}
+
import org.apache.commons.lang3.StringEscapeUtils
+
import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.ui.{UIUtils, WebUIPage}
-import org.apache.spark.streaming.ui.StreamingJobProgressListener.{JobId, OutputOpId}
+import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
import org.apache.spark.ui.jobs.UIData.JobUIData
-import scala.xml.{NodeSeq, Node}
+private[ui] case class BatchUIData(
+ var batchInfo: BatchInfo = null,
+ outputOpIdToSparkJobIds: Map[OutputOpId, ArrayBuffer[SparkJobId]] = Map()) {
+}
-class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
- private val streaminglistener = parent.listener
+private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
+ private val streamingListener = parent.listener
private val sparkListener = parent.ssc.sc.jobProgressListener
private def columns: Seq[Node] = {
@@ -42,75 +50,101 @@ class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
Error |
}
- private def makeOutputOpIdRow(outputOpId: OutputOpId, jobs: Seq[JobUIData]): Seq[Node] = {
- val jobDurations = jobs.map(job => {
- job.submissionTime.map { start =>
- val end = job.completionTime.getOrElse(System.currentTimeMillis())
- end - start
- }
- })
- val formattedOutputOpDuration =
- if (jobDurations.exists(_ == None)) {
- // If any job does not finish, set "formattedOutputOpDuration" to "-"
- "-"
- } else {
- UIUtils.formatDuration(jobDurations.flatMap(x => x).sum)
- }
+ /**
+ * Generate a row for a Spark Job. Because duplicated output op infos need to be collapsed into
+ * one cell, we use "rowspan" for the first row of an output op.
+ */
+ def generateJobRow(
+ outputOpId: OutputOpId,
+ formattedOutputOpDuration: String,
+ numSparkJobRowsInOutputOp: Int,
+ isFirstRow: Boolean,
+ sparkJob: JobUIData): Seq[Node] = {
+ val lastStageInfo = Option(sparkJob.stageIds)
+ .filter(_.nonEmpty)
+ .flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
+ val lastStageData = lastStageInfo.flatMap { s =>
+ sparkListener.stageIdToData.get((s.stageId, s.attemptId))
+ }
- def makeJobRow(job: JobUIData, isFirstRow: Boolean): Seq[Node] = {
- val lastStageInfo = Option(job.stageIds)
- .filter(_.nonEmpty)
- .flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
- val lastStageData = lastStageInfo.flatMap { s =>
- sparkListener.stageIdToData.get((s.stageId, s.attemptId))
+ val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+ val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
+ val duration: Option[Long] = {
+ sparkJob.submissionTime.map { start =>
+ val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
+ end - start
}
+ }
+ val lastFailureReason =
+ sparkJob.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get).
+ dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
+ flatMap(info => info.failureReason).headOption.getOrElse("")
+ val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-")
+ val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${sparkJob.jobId}"
- val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
- val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
- val duration: Option[Long] = {
- job.submissionTime.map { start =>
- val end = job.completionTime.getOrElse(System.currentTimeMillis())
- end - start
- }
- }
- val lastFailureReason = job.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get).
- dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
- flatMap(info => info.failureReason).headOption.getOrElse("")
- val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-")
- val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${job.jobId}"
-
- {if(isFirstRow) {
- {outputOpId} |
-
+ // In the first row, output op id and its information need to be shown. In other rows, these
+ // cells will be taken up due to "rowspan".
+ val prefixCells =
+ if (isFirstRow) {
+ | {outputOpId.toString} |
+
{lastStageDescription}
{lastStageName}
|
- {formattedOutputOpDuration} | }
+ {formattedOutputOpDuration} |
+ } else {
+ Nil
+ }
+
+
+ {prefixCells}
+
+
+ {sparkJob.jobId}{sparkJob.jobGroup.map(id => s"($id)").getOrElse("")}
+
+ |
+
+ {formattedDuration}
+ |
+
+ {sparkJob.completedStageIndices.size}/{sparkJob.stageIds.size - sparkJob.numSkippedStages}
+ {if (sparkJob.numFailedStages > 0) s"(${sparkJob.numFailedStages} failed)"}
+ {if (sparkJob.numSkippedStages > 0) s"(${sparkJob.numSkippedStages} skipped)"}
+ |
+
+ {
+ UIUtils.makeProgressBar(
+ started = sparkJob.numActiveTasks,
+ completed = sparkJob.numCompletedTasks,
+ failed = sparkJob.numFailedTasks,
+ skipped = sparkJob.numSkippedTasks,
+ total = sparkJob.numTasks - sparkJob.numSkippedTasks)
}
- |
-
- {job.jobId}{job.jobGroup.map(id => s"($id)").getOrElse("")}
-
- |
-
- {formattedDuration}
- |
-
- {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
- {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
- {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
- |
-
- {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
- failed = job.numFailedTasks, skipped = job.numSkippedTasks,
- total = job.numTasks - job.numSkippedTasks)}
- |
- {failureReasonCell(lastFailureReason)}
-
- }
+
+ {failureReasonCell(lastFailureReason)}
+
+ }
- makeJobRow(jobs.head, true) ++ (jobs.tail.map(job => makeJobRow(job, false)).flatMap(x => x))
+ private def generateOutputOpIdRow(
+ outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = {
+ val sparkjobDurations = sparkJobs.map(sparkJob => {
+ sparkJob.submissionTime.map { start =>
+ val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
+ end - start
+ }
+ })
+ val formattedOutputOpDuration =
+ if (sparkjobDurations.exists(_ == None)) {
+ // If any job does not finish, set "formattedOutputOpDuration" to "-"
+ "-"
+ } else {
+ UIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
+ }
+ generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
+ sparkJobs.tail.map { sparkJob =>
+ generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
+ }.flatMap(x => x)
}
private def failureReasonCell(failureReason: String): Seq[Node] = {
@@ -138,26 +172,29 @@ class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
{failureReasonSummary}{details} |
}
- private def jobsTable(jobInfos: Seq[(OutputOpId, JobId)]): Seq[Node] = {
- def getJobData(jobId: JobId): Option[JobUIData] = {
- sparkListener.activeJobs.get(jobId).orElse {
- sparkListener.completedJobs.find(_.jobId == jobId).orElse {
- sparkListener.failedJobs.find(_.jobId == jobId)
- }
+ private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
+ sparkListener.activeJobs.get(sparkJobId).orElse {
+ sparkListener.completedJobs.find(_.jobId == sparkJobId).orElse {
+ sparkListener.failedJobs.find(_.jobId == sparkJobId)
}
}
+ }
- // Group jobInfos by OutputOpId firstly, then sort them.
- // E.g., [(0, 1), (1, 3), (0, 2), (1, 4)] => [(0, [1, 2]), (1, [3, 4])]
- val outputOpIdWithJobIds: Seq[(OutputOpId, Seq[JobId])] =
- jobInfos.groupBy(_._1).toSeq.sortBy(_._1). // sorted by OutputOpId
+ /**
+ * Generate the job table for the batch.
+ */
+ private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = {
+ val outputOpIdWithSparkJobIds: Seq[(OutputOpId, Seq[SparkJobId])] = {
+ batchUIData.outputOpIdToSparkJobIds.toSeq.sortBy(_._1). // sorted by OutputOpId
map { case (outputOpId, jobs) =>
- (outputOpId, jobs.map(_._2).sortBy(x => x).toSeq)} // sort JobIds for each OutputOpId
+ (outputOpId, jobs.sorted.toSeq) // sort JobIds for each OutputOpId
+ }
+ }
sparkListener.synchronized {
- val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] = outputOpIdWithJobIds.map {
- case (outputOpId, jobIds) =>
- // Filter out JobIds that don't exist in sparkListener
- (outputOpId, jobIds.flatMap(getJobData))
+ val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] = outputOpIdWithSparkJobIds.map {
+ case (outputOpId, sparkJobIds) =>
+ // Filter out Spark job ids that don't exist in sparkListener
+ (outputOpId, sparkJobIds.flatMap(getJobData))
}
@@ -165,7 +202,11 @@ class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
{columns}
- {outputOpIdWithJobs.map { case (outputOpId, jobs) => makeOutputOpIdRow(outputOpId, jobs)}}
+ {
+ outputOpIdWithJobs.map {
+ case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, jobs)
+ }
+ }
}
@@ -176,13 +217,11 @@ class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
throw new IllegalArgumentException(s"Missing id parameter")
}
val formattedBatchTime = UIUtils.formatDate(batchTime.milliseconds)
- val (batchInfo, jobInfos) = streaminglistener.synchronized {
- val _batchInfo = streaminglistener.getBatchInfo(batchTime).getOrElse {
- throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
- }
- val _jobInfos = streaminglistener.getJobInfos(batchTime)
- (_batchInfo, _jobInfos)
+
+ val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse {
+ throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
}
+ val batchInfo = batchUIData.batchInfo
val formattedSchedulingDelay =
batchInfo.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
@@ -195,7 +234,7 @@ class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
-
Batch Duration:
- {UIUtils.formatDuration(streaminglistener.batchDuration)}
+ {UIUtils.formatDuration(streamingListener.batchDuration)}
-
Input data size:
@@ -216,9 +255,15 @@ class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
- val content = summary ++ jobInfos.map(jobsTable).getOrElse {
- Cannot find any job for Batch {formattedBatchTime}
- }
+ val jobTable =
+ if (batchUIData.outputOpIdToSparkJobIds.isEmpty) {
+ Cannot find any job for Batch {formattedBatchTime}.
+ } else {
+ generateJobTable(batchUIData)
+ }
+
+ val content = summary ++ jobTable
+
UIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index a7dd41f0443c4..d9f9602bb13c9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -34,8 +34,6 @@ import org.apache.spark.util.Distribution
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
extends StreamingListener with SparkListener {
- import StreamingJobProgressListener._
-
private val waitingBatchInfos = new HashMap[Time, BatchInfo]
private val runningBatchInfos = new HashMap[Time, BatchInfo]
private val completedBatchInfos = new Queue[BatchInfo]
@@ -45,7 +43,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
private var totalProcessedRecords = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]
- private val batchTimeToJobIds = new HashMap[Time, ArrayBuffer[(OutputOpId, JobId)]]
+ private val batchTimeToBatchUIData = new HashMap[Time, BatchUIData]
val batchDuration = ssc.graph.batchDuration.milliseconds
@@ -87,7 +85,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
completedBatchInfos.enqueue(batchCompleted.batchInfo)
if (completedBatchInfos.size > batchInfoLimit) {
val removedBatch = completedBatchInfos.dequeue()
- batchTimeToJobIds.remove(removedBatch.batchTime)
+ batchTimeToBatchUIData.remove(removedBatch.batchTime)
}
totalCompletedBatches += 1L
@@ -97,8 +95,12 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
getBatchTimeAndOutputOpId(jobStart.properties).foreach { case (batchTime, outputOpId) =>
- batchTimeToJobIds.getOrElseUpdate(batchTime, ArrayBuffer[(OutputOpId, JobId)]()) +=
- outputOpId -> jobStart.jobId
+ val batchUIData = batchTimeToBatchUIData.getOrElseUpdate(batchTime, BatchUIData())
+ // Because onJobStart and onBatchXXX messages are processed in different threads,
+ // we may not be able to get the corresponding BatchInfo now. So here we only set
+ // batchUIData.outputOpIdToSparkJobIds; batchUIData.batchInfo will be set in "getBatchUIData".
+ batchUIData.outputOpIdToSparkJobIds.
+ getOrElseUpdate(outputOpId, ArrayBuffer()) += jobStart.jobId
}
}
@@ -206,7 +208,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
}
- def getBatchInfo(batchTime: Time): Option[BatchInfo] = synchronized {
+ private def getBatchInfo(batchTime: Time): Option[BatchInfo] = {
waitingBatchInfos.get(batchTime).orElse {
runningBatchInfos.get(batchTime).orElse {
completedBatchInfos.find(batch => batch.batchTime == batchTime)
@@ -214,12 +216,16 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}
- def getJobInfos(batchTime: Time): Option[Seq[(OutputOpId, JobId)]] = synchronized {
- batchTimeToJobIds.get(batchTime).map(_.toList)
+ def getBatchUIData(batchTime: Time): Option[BatchUIData] = synchronized {
+ for (batchInfo <- getBatchInfo(batchTime)) yield {
+ val batchUIData = batchTimeToBatchUIData.getOrElse(batchTime, BatchUIData(batchInfo))
+ batchUIData.batchInfo = batchInfo
+ batchUIData
+ }
}
}
private[streaming] object StreamingJobProgressListener {
- type JobId = Int
+ type SparkJobId = Int
type OutputOpId = Int
}