[SPARK-6862][Streaming][WebUI] Add BatchPage to display details of a batch #5473

Closed
wants to merge 15 commits
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -24,7 +24,7 @@ import org.apache.spark.util.collection.OpenHashSet
 
 import scala.collection.mutable.HashMap
 
-private[jobs] object UIData {
+private[spark] object UIData {
 
   class ExecutorSummary {
     var taskTime : Long = 0
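
Note (not part of the PR): `private[jobs]` restricts UIData to the org.apache.spark.ui.jobs package, so the streaming UI added by this PR (under org.apache.spark.streaming.ui) could not see it; `private[spark]` opens it to everything under org.apache.spark while keeping it out of the public API. A minimal sketch of the scoping rule — the BatchPageSketch caller is hypothetical, for illustration only:

    package org.apache.spark.ui.jobs {
      // Visible anywhere under org.apache.spark, but not to user code.
      private[spark] object UIData {
        class ExecutorSummary { var taskTime: Long = 0 }
      }
    }

    package org.apache.spark.streaming.ui {
      object BatchPageSketch {  // hypothetical caller, for illustration only
        // Compiles because private[spark] includes this package;
        // with private[jobs] this reference would not compile.
        val summary = new org.apache.spark.ui.jobs.UIData.ExecutorSummary
      }
    }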
streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -626,7 +626,7 @@ abstract class DStream[T: ClassTag] (
         println("Time: " + time)
         println("-------------------------------------------")
         firstNum.take(num).foreach(println)
-        if (firstNum.size > num) println("...")
+        if (firstNum.length > num) println("...")
         println()
       }
     }
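
A note on the one-character change above (not part of the PR): `firstNum` is the `Array[T]` returned by `rdd.take(num + 1)`. On an Array, `length` reads the JVM array's length field directly, while `size` is only available through the implicit ArrayOps wrapper, so each call allocates a wrapper object before delegating to `length`. A minimal sketch:

    // Behaviorally identical on Array; `length` avoids the implicit wrapper.
    val firstNum: Array[Int] = (1 to 11).toArray  // stand-in for rdd.take(num + 1)
    val num = 10
    if (firstNum.length > num) println("...")  // direct field access
    if (firstNum.size > num) println("...")    // implicit conversion to ArrayOps, then .length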
streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -25,15 +25,49 @@ import scala.util.Try
  */
 private[streaming]
 class Job(val time: Time, func: () => _) {
-  var id: String = _
-  var result: Try[_] = null
+  private var _id: String = _
+  private var _outputOpId: Int = _

[Contributor] Could you define what `id` and `outputOpId` are — especially that the output op id is unique only within the JobSet?

[Author] Added comments on `def outputOpId: Int` and `def id: String`.

+  private var isSet = false
+  private var _result: Try[_] = null
 
   def run() {
-    result = Try(func())
+    _result = Try(func())
   }
 
-  def setId(number: Int) {
-    id = "streaming job " + time + "." + number
+  def result: Try[_] = {
+    if (_result == null) {
+      throw new IllegalStateException("Cannot access result before job finishes")
+    }
+    _result
+  }
+
+  /**
+   * @return the global unique id of this Job.
+   */
+  def id: String = {
+    if (!isSet) {
+      throw new IllegalStateException("Cannot access id before calling setId")
+    }
+    _id
+  }
+
+  /**
+   * @return the output op id of this Job. Each Job has a unique output op id in the same JobSet.
+   */
+  def outputOpId: Int = {
+    if (!isSet) {
+      throw new IllegalStateException("Cannot access number before calling setId")
+    }
+    _outputOpId
+  }
+
+  def setOutputOpId(outputOpId: Int) {
+    if (isSet) {
+      throw new IllegalStateException("Cannot call setOutputOpId more than once")
+    }
+    isSet = true
+    _id = s"streaming job $time.$outputOpId"
+    _outputOpId = outputOpId
   }
 
   override def toString: String = id
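
To make the guarded lifecycle concrete, here is a usage sketch (not from the PR; it assumes code living in the org.apache.spark.streaming package, since Job is private[streaming], and the Time value is arbitrary):

    import org.apache.spark.streaming.Time
    import org.apache.spark.streaming.scheduler.Job

    // Illustrative only: accessors throw until the JobSet assigns an output op id.
    val job = new Job(Time(1429000000000L), () => println("output op body"))

    job.setOutputOpId(0)                       // done by JobSet; allowed at most once
    assert(job.outputOpId == 0)                // unique within this batch's JobSet
    assert(job.id == "streaming job 1429000000000 ms.0")  // globally unique

    job.run()                                  // runs func and captures the outcome
    assert(job.result.isSuccess)               // safe only after run() completes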
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -172,16 +172,28 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     ssc.waiter.notifyError(e)
   }
 
-  private class JobHandler(job: Job) extends Runnable {
+  private class JobHandler(job: Job) extends Runnable with Logging {
     def run() {
-      eventLoop.post(JobStarted(job))
-      // Disable checks for existing output directories in jobs launched by the streaming scheduler,
-      // since we may need to write output to an existing directory during checkpoint recovery;
-      // see SPARK-4835 for more details.
-      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
-        job.run()
+      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
+      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

[Contributor] This setting and clearing of the property needs to be done in a try...finally so that it gets cleared in case of any exception.

[Author] Put them in a finally block.

+      try {
+        eventLoop.post(JobStarted(job))
+        // Disable checks for existing output directories in jobs launched by the streaming
+        // scheduler, since we may need to write output to an existing directory during checkpoint
+        // recovery; see SPARK-4835 for more details.
+        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+          job.run()
+        }
+        eventLoop.post(JobCompleted(job))
+      } finally {
+        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
+        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
       }
-      eventLoop.post(JobCompleted(job))
     }
   }
 }
+
+private[streaming] object JobScheduler {
+  val BATCH_TIME_PROPERTY_KEY = "spark.streaming.internal.batchTime"
+  val OUTPUT_OP_ID_PROPERTY_KEY = "spark.streaming.internal.outputOpId"
+}
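
These two local properties tag every Spark job submitted for an output operation, which is what lets the new BatchPage group core jobs by batch and output op; elsewhere in the PR the streaming listener reads them back from SparkListenerJobStart. A simplified sketch of that consuming side (illustrative, not the PR's actual listener):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

    // Recovers the batch time and output op id that JobHandler attached
    // as local properties when the job was submitted.
    class BatchGroupingListener extends SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        val props = jobStart.properties
        if (props != null) {
          val batchTime = Option(props.getProperty("spark.streaming.internal.batchTime"))
          val outputOpId = Option(props.getProperty("spark.streaming.internal.outputOpId"))
          for (t <- batchTime; op <- outputOpId) {
            println(s"Spark job ${jobStart.jobId} -> batch $t ms, output op $op")
          }
        }
      }
    }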
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -35,7 +35,7 @@ case class JobSet(
   private var processingStartTime = -1L // when the first job of this jobset started processing
   private var processingEndTime = -1L // when the last job of this jobset finished processing
 
-  jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) }
+  jobs.zipWithIndex.foreach { case (job, i) => job.setOutputOpId(i) }
   incompleteJobs ++= jobs
 
   def handleJobStart(job: Job) {
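
The renamed setter makes the numbering scheme explicit: each output operation gets its position in the batch's job list, so the id is unique only within this JobSet, not globally. A toy illustration (not from the PR):

    // Position in the job list becomes the output op id for this batch;
    // the globally unique job id combines the batch time with this index.
    val outputOps = Seq("updateStateByKey -> print", "saveAsTextFiles", "foreachRDD")
    outputOps.zipWithIndex.foreach { case (op, i) =>
      println(s"output op id $i within this JobSet: $op")
    }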
streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -19,7 +19,6 @@ package org.apache.spark.streaming.ui
 
 import scala.xml.Node
 
-import org.apache.spark.streaming.scheduler.BatchInfo
 import org.apache.spark.ui.UIUtils
 
 private[ui] abstract class BatchTableBase(tableId: String) {
@@ -31,18 +30,20 @@ private[ui] abstract class BatchTableBase(tableId: String) {
     <th>Processing Time</th>
   }
 
-  protected def baseRow(batch: BatchInfo): Seq[Node] = {
+  protected def baseRow(batch: BatchUIData): Seq[Node] = {
     val batchTime = batch.batchTime.milliseconds
     val formattedBatchTime = UIUtils.formatDate(batch.batchTime.milliseconds)
-    val eventCount = batch.receivedBlockInfo.values.map {
-      receivers => receivers.map(_.numRecords).sum
-    }.sum
+    val eventCount = batch.numRecords
     val schedulingDelay = batch.schedulingDelay
     val formattedSchedulingDelay = schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
     val processingTime = batch.processingDelay
     val formattedProcessingTime = processingTime.map(UIUtils.formatDuration).getOrElse("-")
 
-    <td sorttable_customkey={batchTime.toString}>{formattedBatchTime}</td>
+    <td sorttable_customkey={batchTime.toString}>
+      <a href={s"batch?id=$batchTime"}>
+        {formattedBatchTime}
+      </a>
+    </td>
     <td sorttable_customkey={eventCount.toString}>{eventCount.toString} events</td>
     <td sorttable_customkey={schedulingDelay.getOrElse(Long.MaxValue).toString}>
       {formattedSchedulingDelay}
@@ -73,8 +74,9 @@ private[ui] abstract class BatchTableBase(tableId: String) {
   protected def renderRows: Seq[Node]
 }
 
-private[ui] class ActiveBatchTable(runningBatches: Seq[BatchInfo], waitingBatches: Seq[BatchInfo])
-  extends BatchTableBase("active-batches-table") {
+private[ui] class ActiveBatchTable(
+    runningBatches: Seq[BatchUIData],
+    waitingBatches: Seq[BatchUIData]) extends BatchTableBase("active-batches-table") {
 
   override protected def columns: Seq[Node] = super.columns ++ <th>Status</th>
 
@@ -85,16 +87,16 @@ private[ui] class ActiveBatchTable(runningBatches: Seq[BatchInfo], waitingBatches: Seq[BatchInfo])
     runningBatches.flatMap(batch => <tr>{runningBatchRow(batch)}</tr>)
   }
 
-  private def runningBatchRow(batch: BatchInfo): Seq[Node] = {
+  private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
     baseRow(batch) ++ <td>processing</td>
   }
 
-  private def waitingBatchRow(batch: BatchInfo): Seq[Node] = {
+  private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
     baseRow(batch) ++ <td>queued</td>
   }
 }
 
-private[ui] class CompletedBatchTable(batches: Seq[BatchInfo])
+private[ui] class CompletedBatchTable(batches: Seq[BatchUIData])
   extends BatchTableBase("completed-batches-table") {
 
   override protected def columns: Seq[Node] = super.columns ++ <th>Total Delay</th>
 
@@ -103,7 +105,7 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchInfo])
     batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
   }
 
-  private def completedBatchRow(batch: BatchInfo): Seq[Node] = {
+  private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
     val totalDelay = batch.totalDelay
     val formattedTotalDelay = totalDelay.map(UIUtils.formatDuration).getOrElse("-")
     baseRow(batch) ++
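
Note on the type swap in these tables (not part of the PR): rows are now built from BatchUIData, a UI-side record introduced by this PR, instead of the scheduler's BatchInfo. The event count arrives pre-aggregated as numRecords rather than being summed from receivedBlockInfo in every row, and the batch time cell now links to the new BatchPage at batch?id=<batch time in ms>. The diff shows only the fields consumed here; a minimal shape consistent with that usage would be (a sketch, not the PR's actual definition):

    import org.apache.spark.streaming.Time

    // Hypothetical minimal shape; the real BatchUIData in the PR carries more state.
    case class BatchUIData(
        batchTime: Time,                // cell links to s"batch?id=${batchTime.milliseconds}"
        numRecords: Long,               // pre-aggregated event count for the batch
        schedulingDelay: Option[Long],  // None until the first job starts
        processingDelay: Option[Long],  // None until the last job finishes
        totalDelay: Option[Long])       // scheduling + processing, once both are known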