Refactor codes as per TD's comments
zsxwing committed Apr 22, 2015
1 parent 35ffd80 commit 77a69ae
Showing 3 changed files with 158 additions and 101 deletions.
@@ -41,13 +41,19 @@ class Job(val time: Time, func: () => _) {
_result
}

/**
* @return the global unique id of this Job.
*/
def id: String = {
if (!isSet) {
throw new IllegalStateException("Cannot access id before calling setId")
}
_id
}

/**
* @return the output op id of this Job. Each Job has a unique output op id in the same JobSet.
*/
def outputOpId: Int = {
if (!isSet) {
throw new IllegalStateException("Cannot access number before calling setId")
@@ -60,7 +66,7 @@ class Job(val time: Time, func: () => _) {
throw new IllegalStateException("Cannot call setOutputOpId more than once")
}
isSet = true
_id = "streaming job " + time + "." + outputOpId
_id = s"streaming job $time.$outputOpId"
_outputOpId = outputOpId
}
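As context for the guards above, a minimal, self-contained sketch of the set-once accessor pattern Job uses (simplified names and types; an illustration, not the actual class):

// Sketch of the set-once accessor pattern: both accessors throw until
// setOutputOpId has been called, and it may be called exactly once.
class JobSketch(val time: String) {
  private var isSet = false
  private var _id: String = _
  private var _outputOpId: Int = _

  def id: String = {
    if (!isSet) throw new IllegalStateException("Cannot access id before calling setId")
    _id
  }

  def outputOpId: Int = {
    if (!isSet) throw new IllegalStateException("Cannot access outputOpId before calling setId")
    _outputOpId
  }

  def setOutputOpId(outputOpId: Int): Unit = {
    if (isSet) throw new IllegalStateException("Cannot call setOutputOpId more than once")
    isSet = true
    _id = s"streaming job $time.$outputOpId" // e.g. "streaming job 1429000000000 ms.0"
    _outputOpId = outputOpId
  }
}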

225 changes: 135 additions & 90 deletions streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -19,16 +19,24 @@ package org.apache.spark.streaming.ui

import javax.servlet.http.HttpServletRequest

import scala.collection.mutable.{ArrayBuffer, Map}
import scala.xml.{NodeSeq, Node}

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{JobId, OutputOpId}
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
import org.apache.spark.ui.jobs.UIData.JobUIData

import scala.xml.{NodeSeq, Node}
private[ui] case class BatchUIData(
var batchInfo: BatchInfo = null,
outputOpIdToSparkJobIds: Map[OutputOpId, ArrayBuffer[SparkJobId]] = Map()) {
}

class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
private val streaminglistener = parent.listener
private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
private val streamingListener = parent.listener
private val sparkListener = parent.ssc.sc.jobProgressListener

private def columns: Seq[Node] = {
@@ -42,75 +50,101 @@ class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<th>Error</th>
}

private def makeOutputOpIdRow(outputOpId: OutputOpId, jobs: Seq[JobUIData]): Seq[Node] = {
val jobDurations = jobs.map(job => {
job.submissionTime.map { start =>
val end = job.completionTime.getOrElse(System.currentTimeMillis())
end - start
}
})
val formattedOutputOpDuration =
if (jobDurations.exists(_ == None)) {
// If any job does not finish, set "formattedOutputOpDuration" to "-"
"-"
} else {
UIUtils.formatDuration(jobDurations.flatMap(x => x).sum)
}
/**
* Generate a row for a Spark job. Because duplicated output op infos need to be collapsed into
* one cell, we use "rowspan" for the first row of an output op.
*/
def generateJobRow(
outputOpId: OutputOpId,
formattedOutputOpDuration: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: JobUIData): Seq[Node] = {
val lastStageInfo = Option(sparkJob.stageIds)
.filter(_.nonEmpty)
.flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
val lastStageData = lastStageInfo.flatMap { s =>
sparkListener.stageIdToData.get((s.stageId, s.attemptId))
}

def makeJobRow(job: JobUIData, isFirstRow: Boolean): Seq[Node] = {
val lastStageInfo = Option(job.stageIds)
.filter(_.nonEmpty)
.flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
val lastStageData = lastStageInfo.flatMap { s =>
sparkListener.stageIdToData.get((s.stageId, s.attemptId))
val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
val duration: Option[Long] = {
sparkJob.submissionTime.map { start =>
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
end - start
}
}
val lastFailureReason =
sparkJob.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get).
dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
flatMap(info => info.failureReason).headOption.getOrElse("")
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-")
val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${sparkJob.jobId}"

val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
val duration: Option[Long] = {
job.submissionTime.map { start =>
val end = job.completionTime.getOrElse(System.currentTimeMillis())
end - start
}
}
val lastFailureReason = job.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get).
dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
flatMap(info => info.failureReason).headOption.getOrElse("")
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-")
val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${job.jobId}"
<tr>
{if(isFirstRow) {
<td rowspan={jobs.size.toString}>{outputOpId}</td>
<td rowspan={jobs.size.toString}>
// In the first row, the output op id and its information need to be shown. In other rows,
// these cells are covered by the "rowspan" of the first row.
val prefixCells =
if (isFirstRow) {
<td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>
<span class="description-input" title={lastStageDescription}>
{lastStageDescription}
</span>{lastStageName}
</td>
<td rowspan={jobs.size.toString}>{formattedOutputOpDuration}</td>}
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
} else {
Nil
}

<tr>
{prefixCells}
<td sorttable_customkey={sparkJob.jobId.toString}>
<a href={detailUrl}>
{sparkJob.jobId}{sparkJob.jobGroup.map(id => s"($id)").getOrElse("")}
</a>
</td>
<td sorttable_customkey={duration.getOrElse(Long.MaxValue).toString}>
{formattedDuration}
</td>
<td class="stage-progress-cell">
{sparkJob.completedStageIndices.size}/{sparkJob.stageIds.size - sparkJob.numSkippedStages}
{if (sparkJob.numFailedStages > 0) s"(${sparkJob.numFailedStages} failed)"}
{if (sparkJob.numSkippedStages > 0) s"(${sparkJob.numSkippedStages} skipped)"}
</td>
<td class="progress-cell">
{
UIUtils.makeProgressBar(
started = sparkJob.numActiveTasks,
completed = sparkJob.numCompletedTasks,
failed = sparkJob.numFailedTasks,
skipped = sparkJob.numSkippedTasks,
total = sparkJob.numTasks - sparkJob.numSkippedTasks)
}
<td sorttable_customkey={job.jobId.toString}>
<a href={detailUrl}>
{job.jobId}{job.jobGroup.map(id => s"($id)").getOrElse("")}
</a>
</td>
<td sorttable_customkey={duration.getOrElse(Long.MaxValue).toString}>
{formattedDuration}
</td>
<td class="stage-progress-cell">
{job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
{if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
{if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
</td>
<td class="progress-cell">
{UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
failed = job.numFailedTasks, skipped = job.numSkippedTasks,
total = job.numTasks - job.numSkippedTasks)}
</td>
{failureReasonCell(lastFailureReason)}
</tr>
}
</td>
{failureReasonCell(lastFailureReason)}
</tr>
}

makeJobRow(jobs.head, true) ++ (jobs.tail.map(job => makeJobRow(job, false)).flatMap(x => x))
private def generateOutputOpIdRow(
outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = {
val sparkJobDurations = sparkJobs.map(sparkJob => {
sparkJob.submissionTime.map { start =>
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
end - start
}
})
val formattedOutputOpDuration =
if (sparkJobDurations.exists(_.isEmpty)) {
// If any job does not finish, set "formattedOutputOpDuration" to "-"
"-"
} else {
UIUtils.formatDuration(sparkJobDurations.flatten.sum)
}
generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
sparkJobs.tail.map { sparkJob =>
generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
}.flatMap(x => x)
}
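The duration roll-up in generateOutputOpIdRow only reports a total when every Spark job in the output op has both a submission and a completion time; a standalone sketch of that Option-folding rule (the helper name formatTotalDuration is hypothetical):

// Sketch: sum per-job durations only when all of them are defined.
// A single None (an unfinished job) collapses the result to "-".
def formatTotalDuration(durations: Seq[Option[Long]]): String =
  if (durations.exists(_.isEmpty)) "-"
  else s"${durations.flatten.sum} ms"

// formatTotalDuration(Seq(Some(100L), Some(250L)))  // "350 ms"
// formatTotalDuration(Seq(Some(100L), None))        // "-"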

private def failureReasonCell(failureReason: String): Seq[Node] = {
@@ -138,34 +172,41 @@ class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
}

private def jobsTable(jobInfos: Seq[(OutputOpId, JobId)]): Seq[Node] = {
def getJobData(jobId: JobId): Option[JobUIData] = {
sparkListener.activeJobs.get(jobId).orElse {
sparkListener.completedJobs.find(_.jobId == jobId).orElse {
sparkListener.failedJobs.find(_.jobId == jobId)
}
private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
sparkListener.activeJobs.get(sparkJobId).orElse {
sparkListener.completedJobs.find(_.jobId == sparkJobId).orElse {
sparkListener.failedJobs.find(_.jobId == sparkJobId)
}
}
}
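getJobData falls through three listener collections; a minimal sketch of the lazy orElse chain it relies on (stand-in data and a hypothetical JobData type, not the real JobUIData):

// Sketch: resolve a job id by falling through active -> completed -> failed.
// orElse takes its argument by name, so later collections are only
// scanned on a miss.
case class JobData(jobId: Int, state: String)
val active    = Map(1 -> JobData(1, "active"))
val completed = Seq(JobData(2, "completed"))
val failed    = Seq(JobData(3, "failed"))

def getJobData(id: Int): Option[JobData] =
  active.get(id)
    .orElse(completed.find(_.jobId == id))
    .orElse(failed.find(_.jobId == id))
// getJobData(3) == Some(JobData(3, "failed")); getJobData(9) == None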

// Group jobInfos by OutputOpId first, then sort them.
// E.g., [(0, 1), (1, 3), (0, 2), (1, 4)] => [(0, [1, 2]), (1, [3, 4])]
val outputOpIdWithJobIds: Seq[(OutputOpId, Seq[JobId])] =
jobInfos.groupBy(_._1).toSeq.sortBy(_._1). // sorted by OutputOpId
/**
* Generate the job table for the batch.
*/
private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = {
val outputOpIdWithSparkJobIds: Seq[(OutputOpId, Seq[SparkJobId])] = {
batchUIData.outputOpIdToSparkJobIds.toSeq.sortBy(_._1). // sorted by OutputOpId
map { case (outputOpId, jobs) =>
(outputOpId, jobs.map(_._2).sortBy(x => x).toSeq)} // sort JobIds for each OutputOpId
(outputOpId, jobs.sorted.toSeq) // sort JobIds for each OutputOpId
}
}
sparkListener.synchronized {
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] = outputOpIdWithJobIds.map {
case (outputOpId, jobIds) =>
// Filter out JobIds that don't exist in sparkListener
(outputOpId, jobIds.flatMap(getJobData))
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] = outputOpIdWithSparkJobIds.map {
case (outputOpId, sparkJobIds) =>
// Filter out Spark job ids that don't exist in sparkListener
(outputOpId, sparkJobIds.flatMap(getJobData))
}

<table id="batch-job-table" class="table table-bordered table-striped table-condensed">
<thead>
{columns}
</thead>
<tbody>
{outputOpIdWithJobs.map { case (outputOpId, jobs) => makeOutputOpIdRow(outputOpId, jobs)}}
{
outputOpIdWithJobs.map {
case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, jobs)
}
}
</tbody>
</table>
}
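The removed comment's example still describes exactly what generateJobTable produces; a quick sketch showing the grouping and ordering on that example input (illustrative values only):

// Sketch: group flat (outputOpId, sparkJobId) pairs, sort output ops,
// then sort the Spark job ids within each output op.
val pairs = Seq((0, 1), (1, 3), (0, 2), (1, 4))
val grouped: Seq[(Int, Seq[Int])] =
  pairs.groupBy(_._1).toSeq.sortBy(_._1)
    .map { case (opId, jobs) => (opId, jobs.map(_._2).sorted) }
// grouped == Seq((0, Seq(1, 2)), (1, Seq(3, 4)))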
@@ -176,13 +217,11 @@ class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
throw new IllegalArgumentException(s"Missing id parameter")
}
val formattedBatchTime = UIUtils.formatDate(batchTime.milliseconds)
val (batchInfo, jobInfos) = streaminglistener.synchronized {
val _batchInfo = streaminglistener.getBatchInfo(batchTime).getOrElse {
throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
}
val _jobInfos = streaminglistener.getJobInfos(batchTime)
(_batchInfo, _jobInfos)

val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse {
throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
}
val batchInfo = batchUIData.batchInfo

val formattedSchedulingDelay =
batchInfo.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
@@ -195,7 +234,7 @@ class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<ul class="unstyled">
<li>
<strong>Batch Duration: </strong>
{UIUtils.formatDuration(streaminglistener.batchDuration)}
{UIUtils.formatDuration(streamingListener.batchDuration)}
</li>
<li>
<strong>Input data size: </strong>
@@ -216,9 +255,15 @@ class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
</ul>
</div>

val content = summary ++ jobInfos.map(jobsTable).getOrElse {
<div>Cannot find any job for Batch {formattedBatchTime}</div>
}
val jobTable =
if (batchUIData.outputOpIdToSparkJobIds.isEmpty) {
<div>Cannot find any job for Batch {formattedBatchTime}.</div>
} else {
generateJobTable(batchUIData)
}

val content = summary ++ jobTable

UIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
}
}
@@ -34,8 +34,6 @@ import org.apache.spark.util.Distribution
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
extends StreamingListener with SparkListener {

import StreamingJobProgressListener._

private val waitingBatchInfos = new HashMap[Time, BatchInfo]
private val runningBatchInfos = new HashMap[Time, BatchInfo]
private val completedBatchInfos = new Queue[BatchInfo]
@@ -45,7 +43,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
private var totalProcessedRecords = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]

private val batchTimeToJobIds = new HashMap[Time, ArrayBuffer[(OutputOpId, JobId)]]
private val batchTimeToBatchUIData = new HashMap[Time, BatchUIData]

val batchDuration = ssc.graph.batchDuration.milliseconds

@@ -87,7 +85,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
completedBatchInfos.enqueue(batchCompleted.batchInfo)
if (completedBatchInfos.size > batchInfoLimit) {
val removedBatch = completedBatchInfos.dequeue()
batchTimeToJobIds.remove(removedBatch.batchTime)
batchTimeToBatchUIData.remove(removedBatch.batchTime)
}
totalCompletedBatches += 1L

Expand All @@ -97,8 +95,12 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)

override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
getBatchTimeAndOutputOpId(jobStart.properties).foreach { case (batchTime, outputOpId) =>
batchTimeToJobIds.getOrElseUpdate(batchTime, ArrayBuffer[(OutputOpId, JobId)]()) +=
outputOpId -> jobStart.jobId
val batchUIData = batchTimeToBatchUIData.getOrElseUpdate(batchTime, BatchUIData())
// Because onJobStart and onBatchXXX messages are processed in different threads,
// we may not be able to get the corresponding BatchInfo yet. So here we only set
// batchUIData.outputOpIdToSparkJobIds; batchUIData.batchInfo will be set in "getBatchUIData".
batchUIData.outputOpIdToSparkJobIds.
getOrElseUpdate(outputOpId, ArrayBuffer()) += jobStart.jobId
}
}
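Because onJobStart may fire before the listener has seen any onBatch* event for that batch time, BatchUIData is created lazily and batchInfo is attached only when getBatchUIData is read; a simplified sketch of that handshake (types reduced to Long/String for illustration; the real listener keys by Time and stores BatchInfo):

import scala.collection.mutable

// Sketch: UI data is created by whichever event arrives first; the
// batch info field stays null until it is attached at read time.
case class UiData(var info: String = null,
                  jobIds: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer())

val byBatchTime = mutable.HashMap[Long, UiData]()

def onJobStart(batchTime: Long, sparkJobId: Int): Unit =
  byBatchTime.getOrElseUpdate(batchTime, UiData()).jobIds += sparkJobId

def getUiData(batchTime: Long, info: String): UiData = {
  val d = byBatchTime.getOrElse(batchTime, UiData(info))
  d.info = info // mirror getBatchUIData: always refresh the batch info on read
  d
}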

@@ -206,20 +208,24 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
}

def getBatchInfo(batchTime: Time): Option[BatchInfo] = synchronized {
private def getBatchInfo(batchTime: Time): Option[BatchInfo] = {
waitingBatchInfos.get(batchTime).orElse {
runningBatchInfos.get(batchTime).orElse {
completedBatchInfos.find(batch => batch.batchTime == batchTime)
}
}
}

def getJobInfos(batchTime: Time): Option[Seq[(OutputOpId, JobId)]] = synchronized {
batchTimeToJobIds.get(batchTime).map(_.toList)
def getBatchUIData(batchTime: Time): Option[BatchUIData] = synchronized {
for (batchInfo <- getBatchInfo(batchTime)) yield {
val batchUIData = batchTimeToBatchUIData.getOrElse(batchTime, BatchUIData(batchInfo))
batchUIData.batchInfo = batchInfo
batchUIData
}
}
}

private[streaming] object StreamingJobProgressListener {
type JobId = Int
type SparkJobId = Int
type OutputOpId = Int
}
