Skip to content

Commit

Permalink
Make BatchPage show friendly information when jobs are dropped by Spa…
Browse files Browse the repository at this point in the history
…rkListener
  • Loading branch information
zsxwing committed May 1, 2015
1 parent 7630213 commit 83dec11
Showing 1 changed file with 107 additions and 30 deletions.
137 changes: 107 additions & 30 deletions streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.streaming.ui

import javax.servlet.http.HttpServletRequest

import scala.xml.{NodeSeq, Node}
import scala.xml.{NodeSeq, Node, Text}

import org.apache.commons.lang3.StringEscapeUtils

Expand All @@ -28,6 +28,7 @@ import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
import org.apache.spark.ui.jobs.UIData.JobUIData

/**
 * Pairs a Spark job id with the UI data looked up for that job.
 *
 * @param sparkJobId the id of the Spark job
 * @param jobUIData the UI data for the job; `None` when no data is available for this id, in
 *                  which case only the id itself can be displayed
 */
private case class SparkJobIdWithUIData(
    sparkJobId: SparkJobId,
    jobUIData: Option[JobUIData])

private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
private val streamingListener = parent.listener
Expand All @@ -44,25 +45,33 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<th>Error</th>
}

/**
 * Generate the table row for one Spark job of an output op, choosing the renderer based on
 * whether UI data is available for the job.
 *
 * When `sparkJob.jobUIData` is defined the row is produced by `generateNormalJobRow`; otherwise
 * only the job id is known and the row is produced by `generateDroppedJobRow`. All other
 * arguments are passed through unchanged to whichever renderer is selected.
 */
private def generateJobRow(
    outputOpId: OutputOpId,
    outputOpDescription: Seq[Node],
    formattedOutputOpDuration: String,
    numSparkJobRowsInOutputOp: Int,
    isFirstRow: Boolean,
    sparkJob: SparkJobIdWithUIData): Seq[Node] = sparkJob.jobUIData match {
  case Some(uiData) =>
    generateNormalJobRow(
      outputOpId,
      outputOpDescription,
      formattedOutputOpDuration,
      numSparkJobRowsInOutputOp,
      isFirstRow,
      uiData)
  case None =>
    generateDroppedJobRow(
      outputOpId,
      outputOpDescription,
      formattedOutputOpDuration,
      numSparkJobRowsInOutputOp,
      isFirstRow,
      sparkJob.sparkJobId)
}

/**
* Generate a row for a Spark Job. Because the duplicated output op info needs to be collapsed
* into one cell, we use "rowspan" for the first row of an output op.
*/
def generateJobRow(
def generateNormalJobRow(
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: JobUIData): Seq[Node] = {
val lastStageInfo = Option(sparkJob.stageIds)
.filter(_.nonEmpty)
.flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
val lastStageData = lastStageInfo.flatMap { s =>
sparkListener.stageIdToData.get((s.stageId, s.attemptId))
}

val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
val duration: Option[Long] = {
sparkJob.submissionTime.map { start =>
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
Expand All @@ -83,9 +92,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
if (isFirstRow) {
<td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>
<span class="description-input" title={lastStageDescription}>
{lastStageDescription}
</span>{lastStageName}
{outputOpDescription}
</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
} else {
Expand Down Expand Up @@ -122,27 +129,97 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
</tr>
}

private def generateOutputOpIdRow(
outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = {
val sparkjobDurations = sparkJobs.map(sparkJob => {
sparkJob.submissionTime.map { start =>
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
end - start
/**
* Generate a placeholder row for a Spark job that has no UI data. If a job is dropped by
* sparkListener due to exceeding the limitation, we only show the job id with "-" cells.
*
* @param outputOpId id of the output op; rendered only when `isFirstRow` is true
* @param outputOpDescription description nodes of the output op; rendered only when `isFirstRow`
*                            is true
* @param formattedOutputOpDuration already-formatted duration text of the output op; rendered
*                                  only when `isFirstRow` is true
* @param numSparkJobRowsInOutputOp used as the "rowspan" of the three output op cells
* @param isFirstRow whether this row is the first row of its output op
* @param jobId id of the dropped Spark job; shown in the job id cell and used as its sort key
* @return a single `<tr>` holding the optional output op cells, the job id cell and four "-" cells
*/
private def generateDroppedJobRow(
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
jobId: Int): Seq[Node] = {
// In the first row, the output op id and its information need to be shown. In other rows, these
// cells are already covered by the first row's "rowspan", so nothing is emitted for them.
// scalastyle:off
val prefixCells =
if (isFirstRow) {
<td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
} else {
Nil
}
})
// scalastyle:on

// Only the job id is known for a dropped job; every other column is rendered as "-".
<tr>
{prefixCells}
<td sorttable_customkey={jobId.toString}>
{jobId.toString}
</td>
<!-- Duration -->
<td>-</td>
<!-- Stages: Succeeded/Total -->
<td>-</td>
<!-- Tasks (for all stages): Succeeded/Total -->
<td>-</td>
<!-- Error -->
<td>-</td>
</tr>
}

/**
* Generate all table rows for one output op: one row per Spark job in `sparkJobs`.
*
* The first job is rendered with `isFirstRow = true` so that it carries the output op cells; the
* remaining jobs are rendered with `isFirstRow = false`. The same description, formatted duration
* and row count (`sparkJobs.size`) are passed to every row.
*
* NOTE(review): `sparkJobs.head` / `sparkJobs.tail` are called unconditionally, so this assumes
* `sparkJobs` is non-empty -- confirm against the caller.
*
* @param outputOpId id of the output op the jobs belong to
* @param sparkJobs the Spark jobs of this output op, each with optional UI data
* @return the concatenated rows for all jobs of the output op
*/
private def generateOutputOpIdRow(
outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
// We don't count the durations of dropped jobs
val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
map(sparkJob => {
// The duration is None when the job has no submission time. A job without a completion time
// is measured up to the current time.
sparkJob.submissionTime.map { start =>
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
end - start
}
})
// The output op duration is the sum of the job durations, shown only when every counted job has
// a duration.
val formattedOutputOpDuration =
if (sparkjobDurations.exists(_ == None)) {
// If any job does not finish, set "formattedOutputOpDuration" to "-"
if (sparkJobDurations.isEmpty || sparkJobDurations.exists(_ == None)) {
// If no job or any job does not finish, set "formattedOutputOpDuration" to "-"
"-"
} else {
UIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
UIUtils.formatDuration(sparkJobDurations.flatMap(x => x).sum)
}
generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++

// The description is computed once and shared by every row of this output op.
val description = generateOutputOpDescription(sparkJobs)

generateJobRow(
outputOpId, description, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
sparkJobs.tail.map { sparkJob =>
generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
generateJobRow(
outputOpId, description, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
}.flatMap(x => x)
}

/**
 * Build the description nodes shown for an output op.
 *
 * The description is taken from the first job in `sparkJobs` that has UI data: among that job's
 * stage ids the largest one is looked up in `sparkListener`. The result is a
 * "description-input" span holding the stage description (empty when unavailable) followed by
 * the stage name (or "(Unknown Stage Name)" when the stage info cannot be found).
 */
private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
  // Only the first job that still has UI data is consulted.
  val firstJobWithData = sparkJobs.flatMap(_.jobUIData).headOption

  // Stage info of the stage with the largest id, if the job has any stages at all.
  val stageInfo = for {
    job <- firstJobWithData
    if job.stageIds.nonEmpty
    info <- sparkListener.stageIdToInfo.get(job.stageIds.max)
  } yield info

  val stageData = for {
    info <- stageInfo
    data <- sparkListener.stageIdToData.get((info.stageId, info.attemptId))
  } yield data

  val stageName = stageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
  val stageDescription = stageData.flatMap(_.description).getOrElse("")

  <span class="description-input" title={stageDescription}>
    {stageDescription}
  </span> ++ Text(stageName)
}

private def failureReasonCell(failureReason: String): Seq[Node] = {
val isMultiline = failureReason.indexOf('\n') >= 0
// Display the first line by default
Expand Down Expand Up @@ -187,10 +264,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
(outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
}
sparkListener.synchronized {
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] =
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] =
outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
// Filter out spark Job ids that don't exist in sparkListener
(outputOpId, sparkJobIds.flatMap(getJobData))
(outputOpId,
sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
}

<table id="batch-job-table" class="table table-bordered table-striped table-condensed">
Expand All @@ -200,7 +277,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<tbody>
{
outputOpIdWithJobs.map {
case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, jobs)
case (outputOpId, sparkJobIds) => generateOutputOpIdRow(outputOpId, sparkJobIds)
}
}
</tbody>
Expand Down

0 comments on commit 83dec11

Please sign in to comment.