Add BatchPage to display details of a batch
zsxwing committed Apr 11, 2015
1 parent 3ceb810 commit 0c7b2eb
Showing 10 changed files with 352 additions and 19 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -24,7 +24,7 @@ import org.apache.spark.util.collection.OpenHashSet

import scala.collection.mutable.HashMap

private[jobs] object UIData {
private[spark] object UIData {

class ExecutorSummary {
var taskTime : Long = 0
streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -626,7 +626,7 @@ abstract class DStream[T: ClassTag] (
println("Time: " + time)
println("-------------------------------------------")
firstNum.take(num).foreach(println)
if (firstNum.size > num) println("...")
if (firstNum.length > num) println("...")
println()
}
}
streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -58,4 +58,11 @@ case class BatchInfo(
*/
def totalDelay: Option[Long] = schedulingDelay.zip(processingDelay)
.map(x => x._1 + x._2).headOption

/**
* The number of records received by the receivers in this batch.
*/
def numRecords: Long = receivedBlockInfo.map { case (_, infos) =>
infos.map(_.numRecords).sum
}.sum
}
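
For reference, the new numRecords field flattens the map from receiver (stream) id to its received blocks and sums the per-block record counts. A minimal standalone sketch of the same two-level sum, using a hypothetical simplified stand-in for ReceivedBlockInfo (only the numRecords field is modeled):

// Hypothetical stand-in for ReceivedBlockInfo; the real class carries more fields.
case class BlockRecords(numRecords: Long)

val receivedBlockInfo: Map[Int, Seq[BlockRecords]] = Map(
  0 -> Seq(BlockRecords(10), BlockRecords(5)), // receiver 0 reported two blocks
  1 -> Seq(BlockRecords(7))                    // receiver 1 reported one block
)

// Same shape as BatchInfo.numRecords: sum within each receiver, then across receivers.
val numRecords: Long = receivedBlockInfo.map { case (_, infos) =>
  infos.map(_.numRecords).sum
}.sum // 22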
streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -25,15 +25,43 @@
*/
private[streaming]
class Job(val time: Time, func: () => _) {
var id: String = _
var result: Try[_] = null
private var _id: String = _
private var _outputOpId: Int = _
private var isSet = false
private var _result: Try[_] = null

def run() {
result = Try(func())
_result = Try(func())
}

def setId(number: Int) {
id = "streaming job " + time + "." + number
def result: Try[_] = {
if (_result == null) {
throw new IllegalStateException("Cannot access result before job finishes")
}
_result
}

def id: String = {
if (!isSet) {
throw new IllegalStateException("Cannot access id before calling setOutputOpId")
}
_id
}

def outputOpId: Int = {
if (!isSet) {
throw new IllegalStateException("Cannot access outputOpId before calling setOutputOpId")
}
_outputOpId
}

def setOutputOpId(outputOpId: Int) {
if (isSet) {
throw new IllegalStateException("Cannot call setOutputOpId more than once")
}
isSet = true
_id = "streaming job " + time + "." + outputOpId
_outputOpId = outputOpId
}

override def toString: String = id
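
The accessors above deliberately fail fast when read before JobSet has numbered the jobs or before run() has produced a result. A hedged sketch of the intended call order (Job is private[streaming], so this only compiles from inside that package; the Time value is arbitrary):

import scala.util.Try
import org.apache.spark.streaming.Time

val job = new Job(Time(1000L), () => println("output op body"))
// Reading job.id or job.outputOpId here would throw IllegalStateException.
job.setOutputOpId(0)              // JobSet assigns these indices via jobs.zipWithIndex
assert(job.outputOpId == 0)
println(job.id)                   // "streaming job <time>.0"
job.run()
val outcome: Try[_] = job.result  // safe only after run() has completed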
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -170,16 +170,25 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
ssc.waiter.notifyError(e)
}

private class JobHandler(job: Job) extends Runnable {
private class JobHandler(job: Job) extends Runnable with Logging {
def run() {
ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
eventActor ! JobStarted(job)
// Disable checks for existing output directories in jobs launched by the streaming scheduler,
// since we may need to write output to an existing directory during checkpoint recovery;
// see SPARK-4835 for more details.
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
job.run()
}
ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
eventActor ! JobCompleted(job)
}
}
}

private[streaming] object JobScheduler {
private[streaming] val BATCH_TIME_PROPERTY_KEY = "spark.streaming.internal.batchTime"
private[streaming] val OUTPUT_OP_ID_PROPERTY_KEY = "spark.streaming.internal.outputOpId"
}
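
Setting these two local properties just before job.run() is what lets a listener on the SparkContext side map each Spark job back to the streaming batch and output operation that produced it: local properties set on the submitting thread are handed to SparkListener.onJobStart via SparkListenerJobStart.properties. A hedged sketch of reading them back (the listener class name is illustrative, not part of this commit, and because the keys are private[streaming] the code is assumed to live under org.apache.spark.streaming):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.JobScheduler

// Illustrative listener, not the one shipped in this commit.
class BatchMappingListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // Properties captured from the submitting thread; JobHandler sets both keys there.
    val props = Option(jobStart.properties).getOrElse(new java.util.Properties)
    val batchTime = Option(props.getProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY))
      .map(ms => Time(ms.toLong))
    val outputOpId = Option(props.getProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY))
      .map(_.toInt)
    for (time <- batchTime; opId <- outputOpId) {
      println(s"Spark job ${jobStart.jobId} belongs to batch $time, output op $opId")
    }
  }
}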
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -35,7 +35,7 @@ case class JobSet(
private var processingStartTime = -1L // when the first job of this jobset started processing
private var processingEndTime = -1L // when the last job of this jobset finished processing

jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) }
jobs.zipWithIndex.foreach { case (job, i) => job.setOutputOpId(i) }
incompleteJobs ++= jobs

def handleJobStart(job: Job) {
224 changes: 224 additions & 0 deletions streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.ui

import javax.servlet.http.HttpServletRequest

import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.streaming.Time
import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{JobId, OutputOpId}
import org.apache.spark.ui.jobs.UIData.JobUIData

import scala.xml.{NodeSeq, Node}

class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
private val streamingListener = parent.listener
private val sparkListener = parent.ssc.sc.jobProgressListener

private def columns: Seq[Node] = {
<th>Output Op Id</th>
<th>Description</th>
<th>Duration</th>
<th>Job Id</th>
<th>Duration</th>
<th class="sorttable_nosort">Stages: Succeeded/Total</th>
<th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
<th>Last Error</th>
}

// Rows for one output operation: the first Spark job row carries the output op id,
// description and duration cells (with rowspan), and each job gets its own row.
private def makeOutputOpIdRow(outputOpId: OutputOpId, jobs: Seq[JobUIData]): Seq[Node] = {
val jobDurations = jobs.map(job => {
job.submissionTime.map { start =>
val end = job.completionTime.getOrElse(System.currentTimeMillis())
end - start
}
})
val formattedOutputOpDuration =
if (jobDurations.exists(_ == None)) {
// If any job does not finish, set "formattedOutputOpDuration" to "-"
"-"
} else {
UIUtils.formatDuration(jobDurations.flatMap(x => x).sum)
}

def makeJobRow(job: JobUIData, isFirstRow: Boolean): Seq[Node] = {
val lastStageInfo = Option(job.stageIds)
.filter(_.nonEmpty)
.flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
val lastStageData = lastStageInfo.flatMap { s =>
sparkListener.stageIdToData.get((s.stageId, s.attemptId))
}

val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
val duration: Option[Long] = {
job.submissionTime.map { start =>
val end = job.completionTime.getOrElse(System.currentTimeMillis())
end - start
}
}
val lastFailureReason = job.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get).
dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
flatMap(info => info.failureReason).headOption.getOrElse("")
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-")
val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${job.jobId}"
<tr>
{if (isFirstRow) {
<td rowspan={jobs.size.toString}>{outputOpId}</td>
<td rowspan={jobs.size.toString}>
<span class="description-input" title={lastStageDescription}>
{lastStageDescription}
</span>{lastStageName}
</td>
<td rowspan={jobs.size.toString}>{formattedOutputOpDuration}</td>}
}
<td sorttable_customkey={job.jobId.toString}>
<a href={detailUrl}>
{job.jobId}{job.jobGroup.map(id => s"($id)").getOrElse("")}
</a>
</td>
<td sorttable_customkey={duration.getOrElse(Long.MaxValue).toString}>
{formattedDuration}
</td>
<td class="stage-progress-cell">
{job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
{if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
{if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
</td>
<td class="progress-cell">
{UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
failed = job.numFailedTasks, skipped = job.numSkippedTasks,
total = job.numTasks - job.numSkippedTasks)}
</td>
{failureReasonCell(lastFailureReason)}
</tr>
}

makeJobRow(jobs.head, true) ++ (jobs.tail.map(job => makeJobRow(job, false)).flatMap(x => x))
}

// "Last Error" cell: show the first line of the failure reason, with a click-to-expand
// block for the full multi-line stack trace.
private def failureReasonCell(failureReason: String): Seq[Node] = {
val isMultiline = failureReason.indexOf('\n') >= 0
// Display the first line by default
val failureReasonSummary = StringEscapeUtils.escapeHtml4(
if (isMultiline) {
failureReason.substring(0, failureReason.indexOf('\n'))
} else {
failureReason
})
val details = if (isMultiline) {
// scalastyle:off
<span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
class="expand-details">
+details
</span> ++
<div class="stacktrace-details collapsed">
<pre>{failureReason}</pre>
</div>
// scalastyle:on
} else {
""
}
<td valign="middle">{failureReasonSummary}{details}</td>
}

// Build the job table for this batch: resolve each JobId against the Spark listener
// (active, completed or failed), group the jobs by output op id, and render one row
// group per output operation.
private def jobsTable(jobInfos: Seq[(OutputOpId, JobId)]): Seq[Node] = {
def getJobData(jobId: JobId): Option[JobUIData] = {
sparkListener.activeJobs.get(jobId).orElse {
sparkListener.completedJobs.find(_.jobId == jobId).orElse {
sparkListener.failedJobs.find(_.jobId == jobId)
}
}
}

// Group jobInfos by OutputOpId first, then sort them.
// E.g., [(0, 1), (1, 3), (0, 2), (1, 4)] => [(0, [1, 2]), (1, [3, 4])]
val outputOpIdWithJobIds: Seq[(OutputOpId, Seq[JobId])] =
jobInfos.groupBy(_._1).toSeq.sortBy(_._1). // sorted by OutputOpId
map { case (outputOpId, jobs) =>
(outputOpId, jobs.map(_._2).sortBy(x => x).toSeq)} // sort JobIds for each OutputOpId
sparkListener.synchronized {
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] = outputOpIdWithJobIds.map {
case (outputOpId, jobIds) =>
// Filter out JobIds that don't exist in sparkListener
(outputOpId, jobIds.flatMap(getJobData))
}

<table id="batch-job-table" class="table table-bordered table-striped table-condensed">
<thead>
{columns}
</thead>
<tbody>
{outputOpIdWithJobs.map { case (outputOpId, jobs) => makeOutputOpIdRow(outputOpId, jobs)}}
</tbody>
</table>
}
}

// Parse the "id" request parameter (batch time in ms), look up the batch and its jobs
// in the streaming listener, and render the summary followed by the job table.
def render(request: HttpServletRequest): Seq[Node] = {
val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse {
throw new IllegalArgumentException(s"Missing id parameter")
}
val formattedBatchTime = UIUtils.formatDate(batchTime.milliseconds)
val (batchInfo, jobInfos) = streamingListener.synchronized {
val _batchInfo = streamingListener.getBatchInfo(batchTime).getOrElse {
throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
}
val _jobInfos = streamingListener.getJobInfos(batchTime)
(_batchInfo, _jobInfos)
}

val formattedSchedulingDelay =
batchInfo.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
val formattedProcessingTime =
batchInfo.processingDelay.map(UIUtils.formatDuration).getOrElse("-")
val formattedTotalDelay = batchInfo.totalDelay.map(UIUtils.formatDuration).getOrElse("-")

val summary: NodeSeq =
<div>
<ul class="unstyled">
<li>
<strong>Batch Duration: </strong>
{UIUtils.formatDuration(streamingListener.batchDuration)}
</li>
<li>
<strong>Input data size: </strong>
{batchInfo.numRecords} records
</li>
<li>
<strong>Scheduling delay: </strong>
{formattedSchedulingDelay}
</li>
<li>
<strong>Processing time: </strong>
{formattedProcessingTime}
</li>
<li>
<strong>Total delay: </strong>
{formattedTotalDelay}
</li>
</ul>
</div>

val content = summary ++ jobInfos.map(jobsTable).getOrElse {
<div>Cannot find any job for Batch {formattedBatchTime}</div>
}
UIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
}
}
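
The grouping step in jobsTable (group job ids by output op id, then sort within each group) can be checked in isolation. A small self-contained sketch using the same example data as the code comment above, with local stand-ins for the JobId and OutputOpId type aliases:

type OutputOpId = Int
type JobId = Int

val jobInfos: Seq[(OutputOpId, JobId)] = Seq((0, 1), (1, 3), (0, 2), (1, 4))

// Group by output op id, order the groups, and sort the job ids inside each group.
val grouped: Seq[(OutputOpId, Seq[JobId])] =
  jobInfos.groupBy(_._1).toSeq.sortBy(_._1)
    .map { case (outputOpId, pairs) => (outputOpId, pairs.map(_._2).sorted) }

assert(grouped == Seq((0, Seq(1, 2)), (1, Seq(3, 4))))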
