-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-6862][Streaming][WebUI] Add BatchPage to display details of a batch #5473
Changes from 1 commit
0c7b2eb
fc98a43
0b226f9
7168807
4bf66b6
15bdf9b
35ffd80
77a69ae
1282b10
72f8e7e
cb62e4f
087ba98
9a3083d
b380cfb
0727d35
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,15 +25,43 @@ import scala.util.Try | |
*/ | ||
private[streaming]
class Job(val time: Time, func: () => _) {
  // `_id` and `_outputOpId` are assigned together by `setOutputOpId`; `isSet`
  // guards both against being read before assignment.
  private var _id: String = _
  // Id of the output operation (e.g. a foreachRDD call) of the batch that
  // generated this job.
  private var _outputOpId: Int = _
  private var isSet = false
  private var _result: Try[_] = null

  /** Runs the job and captures its outcome (success or failure) in `_result`. */
  def run() {
    _result = Try(func())
  }

  /**
   * Result of this job. Only valid after `run` has completed.
   *
   * @throws IllegalStateException if the job has not finished yet.
   */
  def result: Try[_] = {
    if (_result == null) {
      throw new IllegalStateException("Cannot access result before job finishes")
    }
    _result
  }

  /**
   * Unique identifier of this job within its batch, derived from the batch time
   * and the output operation id.
   *
   * @throws IllegalStateException if `setOutputOpId` has not been called yet.
   */
  def id: String = {
    if (!isSet) {
      // NOTE: message must name `setOutputOpId` — `setId` no longer exists.
      throw new IllegalStateException("Cannot access id before calling setOutputOpId")
    }
    _id
  }

  /**
   * Id of the output operation that generated this job.
   *
   * @throws IllegalStateException if `setOutputOpId` has not been called yet.
   */
  def outputOpId: Int = {
    if (!isSet) {
      throw new IllegalStateException("Cannot access outputOpId before calling setOutputOpId")
    }
    _outputOpId
  }

  /** Sets the output operation id (and derives `id` from it). May only be called once. */
  def setOutputOpId(outputOpId: Int) {
    if (isSet) {
      throw new IllegalStateException("Cannot call setOutputOpId more than once")
    }
    isSet = true
    _id = s"streaming job $time.$outputOpId"
    _outputOpId = outputOpId
  }

  override def toString: String = id
}
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -170,16 +170,25 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { | |
ssc.waiter.notifyError(e) | ||
} | ||
|
||
/**
 * Runnable wrapper that executes one streaming [[Job]] on the job executor,
 * tagging the Spark jobs it spawns with the batch time and output op id so the
 * streaming UI can map them back to their batch.
 */
private class JobHandler(job: Job) extends Runnable with Logging {
  def run() {
    // Expose batch metadata to the jobs launched below via Spark local properties.
    ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
    ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
    try {
      eventActor ! JobStarted(job)
      // Disable checks for existing output directories in jobs launched by the streaming
      // scheduler, since we may need to write output to an existing directory during
      // checkpoint recovery; see SPARK-4835 for more details.
      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
        job.run()
      }
      eventActor ! JobCompleted(job)
    } finally {
      // Always clear the properties — even if the job throws — so they cannot
      // leak into unrelated jobs submitted later on this thread.
      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
    }
  }
}
|
||
/** Keys for the Spark local properties that carry streaming batch metadata. */
private[streaming] object JobScheduler {
  // The enclosing object is already private[streaming], so its members need no
  // extra access modifier.
  val BATCH_TIME_PROPERTY_KEY = "spark.streaming.internal.batchTime"
  val OUTPUT_OP_ID_PROPERTY_KEY = "spark.streaming.internal.outputOpId"
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,224 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.streaming.ui | ||
|
||
import javax.servlet.http.HttpServletRequest | ||
|
||
import org.apache.commons.lang3.StringEscapeUtils | ||
import org.apache.spark.streaming.Time | ||
import org.apache.spark.ui.{UIUtils, WebUIPage} | ||
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{JobId, OutputOpId} | ||
import org.apache.spark.ui.jobs.UIData.JobUIData | ||
|
||
import scala.xml.{NodeSeq, Node} | ||
|
||
/** Page showing the details (output operations and their Spark jobs) of one batch. */
class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
  private val streamingListener = parent.listener
  private val sparkListener = parent.ssc.sc.jobProgressListener

  /** Header cells of the batch job table. */
  private def columns: Seq[Node] = {
    <th>Output Op Id</th>
      <th>Description</th>
      <th>Duration</th>
      <th>Job Id</th>
      <th>Duration</th>
      <th class="sorttable_nosort">Stages: Succeeded/Total</th>
      <th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
      <th>Last Error</th>
  }

  /**
   * Generate the rows for one output operation: the first row carries the
   * output-op cells (with rowspan covering all of its jobs), and the remaining
   * rows carry only the per-job cells.
   */
  private def makeOutputOpIdRow(outputOpId: OutputOpId, jobs: Seq[JobUIData]): Seq[Node] = {
    val jobDurations: Seq[Option[Long]] = jobs.map { job =>
      job.submissionTime.map { start =>
        val end = job.completionTime.getOrElse(System.currentTimeMillis())
        end - start
      }
    }
    // If any job has not finished, the total duration of the output op is unknown.
    val formattedOutputOpDuration =
      if (jobDurations.exists(_.isEmpty)) {
        "-"
      } else {
        UIUtils.formatDuration(jobDurations.flatten.sum)
      }

    def makeJobRow(job: JobUIData, isFirstRow: Boolean): Seq[Node] = {
      // Use the name/description of the job's last stage as the job description.
      val lastStageInfo = Option(job.stageIds)
        .filter(_.nonEmpty)
        .flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
      val lastStageData = lastStageInfo.flatMap { s =>
        sparkListener.stageIdToData.get((s.stageId, s.attemptId))
      }

      val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
      val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
      val duration: Option[Long] = job.submissionTime.map { start =>
        val end = job.completionTime.getOrElse(System.currentTimeMillis())
        end - start
      }
      // First failure reason among this job's stages, scanning from the latest stage.
      val lastFailureReason = job.stageIds.sorted.reverse
        .flatMap(sparkListener.stageIdToInfo.get)
        .dropWhile(_.failureReason.isEmpty)
        .take(1)
        .flatMap(_.failureReason)
        .headOption
        .getOrElse("")
      val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-")
      val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${job.jobId}"
      <tr>
        {
          if (isFirstRow) {
            <td rowspan={jobs.size.toString}>{outputOpId}</td>
            <td rowspan={jobs.size.toString}>
              <span class="description-input" title={lastStageDescription}>
                {lastStageDescription}
              </span>{lastStageName}
            </td>
            <td rowspan={jobs.size.toString}>{formattedOutputOpDuration}</td>
          }
        }
        <td sorttable_customkey={job.jobId.toString}>
          <a href={detailUrl}>
            {job.jobId}{job.jobGroup.map(id => s"($id)").getOrElse("")}
          </a>
        </td>
        <td sorttable_customkey={duration.getOrElse(Long.MaxValue).toString}>
          {formattedDuration}
        </td>
        <td class="stage-progress-cell">
          {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
          {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
          {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
        </td>
        <td class="progress-cell">
          {UIUtils.makeProgressBar(
            started = job.numActiveTasks,
            completed = job.numCompletedTasks,
            failed = job.numFailedTasks,
            skipped = job.numSkippedTasks,
            total = job.numTasks - job.numSkippedTasks)}
        </td>
        {failureReasonCell(lastFailureReason)}
      </tr>
    }

    // Guard against an empty job list (all job ids may have been filtered out by
    // the caller): `jobs.head` would throw on an empty Seq.
    jobs.headOption.map { firstJob =>
      makeJobRow(firstJob, isFirstRow = true) ++
        jobs.tail.flatMap(job => makeJobRow(job, isFirstRow = false))
    }.getOrElse(Nil)
  }

  /** Cell showing the (possibly multi-line) failure reason, collapsed by default. */
  private def failureReasonCell(failureReason: String): Seq[Node] = {
    val isMultiline = failureReason.indexOf('\n') >= 0
    // Display the first line by default
    val failureReasonSummary = StringEscapeUtils.escapeHtml4(
      if (isMultiline) {
        failureReason.substring(0, failureReason.indexOf('\n'))
      } else {
        failureReason
      })
    val details = if (isMultiline) {
      // scalastyle:off
      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
            class="expand-details">
        +details
      </span> ++
      <div class="stacktrace-details collapsed">
        <pre>{failureReason}</pre>
      </div>
      // scalastyle:on
    } else {
      ""
    }
    <td valign="middle">{failureReasonSummary}{details}</td>
  }

  /** Table of all Spark jobs of this batch, grouped by output operation. */
  private def jobsTable(jobInfos: Seq[(OutputOpId, JobId)]): Seq[Node] = {
    def getJobData(jobId: JobId): Option[JobUIData] = {
      sparkListener.activeJobs.get(jobId).orElse {
        sparkListener.completedJobs.find(_.jobId == jobId).orElse {
          sparkListener.failedJobs.find(_.jobId == jobId)
        }
      }
    }

    // Group jobInfos by OutputOpId, sort the groups by OutputOpId, and sort the
    // JobIds within each group.
    // E.g., [(0, 1), (1, 3), (0, 2), (1, 4)] => [(0, [1, 2]), (1, [3, 4])]
    val outputOpIdWithJobIds: Seq[(OutputOpId, Seq[JobId])] =
      jobInfos.groupBy(_._1).toSeq.sortBy(_._1).map { case (outputOpId, pairs) =>
        (outputOpId, pairs.map(_._2).sorted)
      }

    sparkListener.synchronized {
      val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] =
        outputOpIdWithJobIds.map { case (outputOpId, jobIds) =>
          // Filter out JobIds that don't exist in sparkListener
          (outputOpId, jobIds.flatMap(getJobData))
        }

      <table id="batch-job-table" class="table table-bordered table-striped table-condensed">
        <thead>
          {columns}
        </thead>
        <tbody>
          {outputOpIdWithJobs.map { case (outputOpId, jobs) => makeOutputOpIdRow(outputOpId, jobs) }}
        </tbody>
      </table>
    }
  }

  def render(request: HttpServletRequest): Seq[Node] = {
    val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse {
      throw new IllegalArgumentException("Missing id parameter")
    }
    val formattedBatchTime = UIUtils.formatDate(batchTime.milliseconds)
    // Read the batch info and the output-op -> job-id mapping atomically, so the
    // page does not mix state from two different updates of the listener.
    val (batchInfo, jobInfos) = streamingListener.synchronized {
      val _batchInfo = streamingListener.getBatchInfo(batchTime).getOrElse {
        throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
      }
      (_batchInfo, streamingListener.getJobInfos(batchTime))
    }

    val formattedSchedulingDelay =
      batchInfo.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
    val formattedProcessingTime =
      batchInfo.processingDelay.map(UIUtils.formatDuration).getOrElse("-")
    val formattedTotalDelay = batchInfo.totalDelay.map(UIUtils.formatDuration).getOrElse("-")

    val summary: NodeSeq =
      <div>
        <ul class="unstyled">
          <li>
            <strong>Batch Duration: </strong>
            {UIUtils.formatDuration(streamingListener.batchDuration)}
          </li>
          <li>
            <strong>Input data size: </strong>
            {batchInfo.numRecords} records
          </li>
          <li>
            <strong>Scheduling delay: </strong>
            {formattedSchedulingDelay}
          </li>
          <li>
            <strong>Processing time: </strong>
            {formattedProcessingTime}
          </li>
          <li>
            <strong>Total delay: </strong>
            {formattedTotalDelay}
          </li>
        </ul>
      </div>

    val content = summary ++ jobInfos.map(jobsTable).getOrElse {
      <div>Cannot find any job for Batch {formattedBatchTime}</div>
    }
    UIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
  }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 on exposing this.