diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 711a3697bda15..935c8a4f80e7b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -24,7 +24,7 @@ import org.apache.spark.util.collection.OpenHashSet
import scala.collection.mutable.HashMap
-private[jobs] object UIData {
+private[spark] object UIData {
class ExecutorSummary {
var taskTime : Long = 0
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 24f99a2b929f5..83d41f5762444 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -626,7 +626,7 @@ abstract class DStream[T: ClassTag] (
println("Time: " + time)
println("-------------------------------------------")
firstNum.take(num).foreach(println)
- if (firstNum.size > num) println("...")
+ if (firstNum.length > num) println("...")
println()
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 92dc113f397ca..9d21be50a566f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -58,4 +58,11 @@ case class BatchInfo(
*/
def totalDelay: Option[Long] = schedulingDelay.zip(processingDelay)
.map(x => x._1 + x._2).headOption
+
+ /**
+ * The number of records received by the receivers in this batch.
+ */
+ def numRecords: Long = receivedBlockInfo.map { case (_, infos) =>
+ infos.map(_.numRecords).sum
+ }.sum
}
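
For context, `numRecords` folds the per-receiver block counts into a single total for the batch. A minimal standalone sketch of the same two-level sum, with `ReceivedBlockInfo` reduced to the one field the aggregation reads (the simplified case class and the sample numbers are assumptions for illustration only):

```scala
// Illustrative sketch: ReceivedBlockInfo simplified to the single field
// that BatchInfo.numRecords reads.
case class ReceivedBlockInfo(numRecords: Long)

// Map from receiver stream id to the blocks it contributed to this batch.
val receivedBlockInfo: Map[Int, Seq[ReceivedBlockInfo]] = Map(
  0 -> Seq(ReceivedBlockInfo(10), ReceivedBlockInfo(5)), // receiver 0: two blocks
  1 -> Seq(ReceivedBlockInfo(7))                         // receiver 1: one block
)

// Same shape as the patch: sum within each receiver, then across receivers.
val numRecords: Long = receivedBlockInfo.map { case (_, infos) =>
  infos.map(_.numRecords).sum
}.sum

assert(numRecords == 22)
```
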
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 30cf87f5b7dd1..064315177e06f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -25,15 +25,43 @@ import scala.util.Try
*/
private[streaming]
class Job(val time: Time, func: () => _) {
- var id: String = _
- var result: Try[_] = null
+ private var _id: String = _
+ private var _outputOpId: Int = _
+ private var isSet = false
+ private var _result: Try[_] = null
def run() {
- result = Try(func())
+ _result = Try(func())
}
- def setId(number: Int) {
- id = "streaming job " + time + "." + number
+ def result: Try[_] = {
+ if (_result == null) {
+ throw new IllegalStateException("Cannot access result before job finishes")
+ }
+ _result
+ }
+
+ def id: String = {
+ if (!isSet) {
+ throw new IllegalStateException("Cannot access id before calling setId")
+ }
+ _id
+ }
+
+ def outputOpId: Int = {
+ if (!isSet) {
+ throw new IllegalStateException("Cannot access number before calling setId")
+ }
+ _outputOpId
+ }
+
+ def setOutputOpId(outputOpId: Int) {
+ if (isSet) {
+ throw new IllegalStateException("Cannot call setOutputOpId more than once")
+ }
+ isSet = true
+ _id = "streaming job " + time + "." + outputOpId
+ _outputOpId = outputOpId
}
override def toString: String = id
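
A hedged sketch of the new `Job` contract (ignoring the class's `private[streaming]` visibility, and with a no-op func), showing that the accessors guard against use before `setOutputOpId` and `run`:

```scala
val job = new Job(Time(1000), () => ())

// Before setOutputOpId: both accessors refuse to answer.
// job.id          // would throw IllegalStateException
// job.outputOpId  // would throw IllegalStateException

job.setOutputOpId(0)
assert(job.outputOpId == 0)
// job.setOutputOpId(1)  // a second call would throw IllegalStateException

// Before run(): result is also guarded.
// job.result  // would throw IllegalStateException
job.run()
assert(job.result.isSuccess)
```
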
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 95f1857b4c377..fa4daa9ade385 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -170,8 +170,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
ssc.waiter.notifyError(e)
}
- private class JobHandler(job: Job) extends Runnable {
+ private class JobHandler(job: Job) extends Runnable with Logging {
def run() {
+ ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
+ ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
eventActor ! JobStarted(job)
// Disable checks for existing output directories in jobs launched by the streaming scheduler,
// since we may need to write output to an existing directory during checkpoint recovery;
@@ -179,7 +181,14 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
job.run()
}
+ ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
+ ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
eventActor ! JobCompleted(job)
}
}
}
+
+private[streaming] object JobScheduler {
+ private[streaming] val BATCH_TIME_PROPERTY_KEY = "spark.streaming.internal.batchTime"
+ private[streaming] val OUTPUT_OP_ID_PROPERTY_KEY = "spark.streaming.internal.outputOpId"
+}
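
These two keys are the glue between the streaming scheduler and the Spark listener bus: properties set with `setLocalProperty` on the thread that submits a job are snapshotted into that job's `Properties` and delivered to listeners in `SparkListenerJobStart`. A minimal sketch of that round trip outside the scheduler (the local-mode context, app name, and inline listener are illustrative assumptions, not the patch's code):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))

sc.addSparkListener(new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // The local properties set on the submitting thread show up here.
    val batchTime = jobStart.properties.getProperty("spark.streaming.internal.batchTime")
    val outputOpId = jobStart.properties.getProperty("spark.streaming.internal.outputOpId")
    println(s"job ${jobStart.jobId}: batchTime=$batchTime, outputOpId=$outputOpId")
  }
})

// Mirror what JobHandler does around job.run().
sc.setLocalProperty("spark.streaming.internal.batchTime", "1000")
sc.setLocalProperty("spark.streaming.internal.outputOpId", "0")
sc.parallelize(1 to 4).count() // this job carries both properties
sc.setLocalProperty("spark.streaming.internal.batchTime", null)
sc.setLocalProperty("spark.streaming.internal.outputOpId", null)
```
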
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 5b134877d0b2d..24b3794236ea5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -35,7 +35,7 @@ case class JobSet(
private var processingStartTime = -1L // when the first job of this jobset started processing
private var processingEndTime = -1L // when the last job of this jobset finished processing
- jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) }
+ jobs.zipWithIndex.foreach { case (job, i) => job.setOutputOpId(i) }
incompleteJobs ++= jobs
def handleJobStart(job: Job) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
new file mode 100644
index 0000000000000..a539fdb9d6b16
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{Node, NodeSeq}
+
+import org.apache.commons.lang3.StringEscapeUtils
+
+import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.ui.StreamingJobProgressListener.{JobId, OutputOpId}
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.ui.jobs.UIData.JobUIData
+
+private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
+ private val streamingListener = parent.listener
+ private val sparkListener = parent.ssc.sc.jobProgressListener
+
+ private def columns: Seq[Node] = {
+ <th>Output Op Id</th>
+ <th>Description</th>
+ <th>Duration</th>
+ <th>Job Id</th>
+ <th>Duration</th>
+ <th>Stages: Succeeded/Total</th>
+ <th>Tasks (for all stages): Succeeded/Total</th>
+ <th>Last Error</th>
+ }
+
+ private def makeOutputOpIdRow(outputOpId: OutputOpId, jobs: Seq[JobUIData]): Seq[Node] = {
+ val jobDurations = jobs.map(job => {
+ job.submissionTime.map { start =>
+ val end = job.completionTime.getOrElse(System.currentTimeMillis())
+ end - start
+ }
+ })
+ val formattedOutputOpDuration =
+ if (jobDurations.exists(_.isEmpty)) {
+ // If any job has not finished, show "-" for the output op duration
+ "-"
+ } else {
+ UIUtils.formatDuration(jobDurations.flatten.sum)
+ }
+
+ def makeJobRow(job: JobUIData, isFirstRow: Boolean): Seq[Node] = {
+ val lastStageInfo = Option(job.stageIds)
+ .filter(_.nonEmpty)
+ .flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
+ val lastStageData = lastStageInfo.flatMap { s =>
+ sparkListener.stageIdToData.get((s.stageId, s.attemptId))
+ }
+
+ val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+ val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
+ val duration: Option[Long] = {
+ job.submissionTime.map { start =>
+ val end = job.completionTime.getOrElse(System.currentTimeMillis())
+ end - start
+ }
+ }
+ val lastFailureReason = job.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get).
+ dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
+ flatMap(info => info.failureReason).headOption.getOrElse("")
+ val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-")
+ val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${job.jobId}"
+
+ <tr>
+ {if (isFirstRow) {
+ <td rowspan={jobs.size.toString}>{outputOpId}</td>
+ <td rowspan={jobs.size.toString}>
+ <span class="description-input" title={lastStageDescription}>
+ {lastStageDescription}
+ </span>{lastStageName}
+ </td>
+ <td rowspan={jobs.size.toString}>{formattedOutputOpDuration}</td>}
+ }
+ <td sorttable_customkey={job.jobId.toString}>
+ <a href={detailUrl}>
+ {job.jobId}{job.jobGroup.map(id => s"($id)").getOrElse("")}
+ </a>
+ </td>
+ <td sorttable_customkey={duration.getOrElse(Long.MaxValue).toString}>
+ {formattedDuration}
+ </td>
+ <td class="stage-progress-cell">
+ {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
+ {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
+ {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
+ </td>
+ <td class="progress-cell">
+ {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
+ failed = job.numFailedTasks, skipped = job.numSkippedTasks,
+ total = job.numTasks - job.numSkippedTasks)}
+ </td>
+ {failureReasonCell(lastFailureReason)}
+ </tr>
+ }
+
+ makeJobRow(jobs.head, true) ++ jobs.tail.flatMap(job => makeJobRow(job, false))
+ }
+
+ private def failureReasonCell(failureReason: String): Seq[Node] = {
+ val isMultiline = failureReason.indexOf('\n') >= 0
+ // Display the first line by default
+ val failureReasonSummary = StringEscapeUtils.escapeHtml4(
+ if (isMultiline) {
+ failureReason.substring(0, failureReason.indexOf('\n'))
+ } else {
+ failureReason
+ })
+ val details = if (isMultiline) {
+ // scalastyle:off
+ <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+ class="expand-details">
+ +details
+ </span> ++
+ <div class="stacktrace-details collapsed">
+ <pre>{failureReason}</pre>
+ </div>
+ // scalastyle:on
+ } else {
+ ""
+ }
+ <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
+ }
+
+ private def jobsTable(jobInfos: Seq[(OutputOpId, JobId)]): Seq[Node] = {
+ def getJobData(jobId: JobId): Option[JobUIData] = {
+ sparkListener.activeJobs.get(jobId).orElse {
+ sparkListener.completedJobs.find(_.jobId == jobId).orElse {
+ sparkListener.failedJobs.find(_.jobId == jobId)
+ }
+ }
+ }
+
+ // Group jobInfos by OutputOpId, then sort the groups and the JobIds within each group.
+ // E.g., [(0, 1), (1, 3), (0, 2), (1, 4)] => [(0, [1, 2]), (1, [3, 4])]
+ val outputOpIdWithJobIds: Seq[(OutputOpId, Seq[JobId])] =
+ jobInfos.groupBy(_._1).toSeq.sortBy(_._1). // sorted by OutputOpId
+ map { case (outputOpId, jobs) =>
+ (outputOpId, jobs.map(_._2).sortBy(x => x).toSeq)} // sort JobIds for each OutputOpId
+ sparkListener.synchronized {
+ val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] = outputOpIdWithJobIds.map {
+ case (outputOpId, jobIds) =>
+ // Filter out JobIds that don't exist in sparkListener
+ (outputOpId, jobIds.flatMap(getJobData))
+ }
+
+ <table id="batch-job-table" class="table table-bordered table-striped table-condensed">
+ <thead>
+ {columns}
+ </thead>
+ <tbody>
+ {outputOpIdWithJobs.map { case (outputOpId, jobs) => makeOutputOpIdRow(outputOpId, jobs) }}
+ </tbody>
+ </table>
+ }
+ }
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse {
+ throw new IllegalArgumentException(s"Missing id parameter")
+ }
+ val formattedBatchTime = UIUtils.formatDate(batchTime.milliseconds)
+ val (batchInfo, jobInfos) = streamingListener.synchronized {
+ val _batchInfo = streamingListener.getBatchInfo(batchTime).getOrElse {
+ throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
+ }
+ val _jobInfos = streamingListener.getJobInfos(batchTime)
+ (_batchInfo, _jobInfos)
+ }
+
+ val formattedSchedulingDelay =
+ batchInfo.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
+ val formattedProcessingTime =
+ batchInfo.processingDelay.map(UIUtils.formatDuration).getOrElse("-")
+ val formattedTotalDelay = batchInfo.totalDelay.map(UIUtils.formatDuration).getOrElse("-")
+
+ val summary: NodeSeq =
+ <div>
+ <ul class="unstyled">
+ <li>
+ <strong>Batch Duration: </strong>
+ {UIUtils.formatDuration(streamingListener.batchDuration)}
+ </li>
+ <li>
+ <strong>Input data size: </strong>
+ {batchInfo.numRecords} records
+ </li>
+ <li>
+ <strong>Scheduling delay: </strong>
+ {formattedSchedulingDelay}
+ </li>
+ <li>
+ <strong>Processing time: </strong>
+ {formattedProcessingTime}
+ </li>
+ <li>
+ <strong>Total delay: </strong>
+ {formattedTotalDelay}
+ </li>
+ </ul>
+ </div>
+
+ val content = summary ++ jobInfos.map(jobsTable).getOrElse {
+ <div>Cannot find any job for Batch {formattedBatchTime}</div>
+ }
+ UIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
+ }
+}
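
The groupBy/sort pipeline in `jobsTable` is easy to sanity-check against the example in its comment; a plain-collections sketch (no Spark required):

```scala
// [(outputOpId, jobId)] pairs, deliberately out of order.
val jobInfos: Seq[(Int, Int)] = Seq((0, 1), (1, 3), (0, 2), (1, 4))

val grouped: Seq[(Int, Seq[Int])] =
  jobInfos.groupBy(_._1).toSeq.sortBy(_._1) // group and sort by outputOpId
    .map { case (outputOpId, pairs) => (outputOpId, pairs.map(_._2).sorted) }

// Matches the comment: [(0, [1, 2]), (1, [3, 4])]
assert(grouped == Seq((0, Seq(1, 2)), (1, Seq(3, 4))))
```
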
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 84f80e638f638..33bf21ee9e162 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -17,8 +17,11 @@
package org.apache.spark.streaming.ui
-import scala.collection.mutable.{Queue, HashMap}
+import java.util.Properties
+import scala.collection.mutable.{ArrayBuffer, HashMap, Queue}
+
+import org.apache.spark.scheduler._
import org.apache.spark.streaming.{Time, StreamingContext}
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
@@ -29,7 +32,9 @@ import org.apache.spark.util.Distribution
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
- extends StreamingListener {
+ extends StreamingListener with SparkListener {
+
+ import StreamingJobProgressListener._
private val waitingBatchInfos = new HashMap[Time, BatchInfo]
private val runningBatchInfos = new HashMap[Time, BatchInfo]
@@ -40,6 +45,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
private var totalProcessedRecords = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]
+ private val batchTimeToJobIds = new HashMap[Time, ArrayBuffer[(OutputOpId, JobId)]]
+
val batchDuration = ssc.graph.batchDuration.milliseconds
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
@@ -70,9 +77,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
- batchStarted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
- totalReceivedRecords += infos.map(_.numRecords).sum
- }
+ totalReceivedRecords += batchStarted.batchInfo.numRecords
}
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
@@ -80,12 +85,32 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
completedBatchInfos.enqueue(batchCompleted.batchInfo)
- if (completedBatchInfos.size > batchInfoLimit) completedBatchInfos.dequeue()
+ if (completedBatchInfos.size > batchInfoLimit) {
+ val removedBatch = completedBatchInfos.dequeue()
+ batchTimeToJobIds.remove(removedBatch.batchTime)
+ }
totalCompletedBatches += 1L
- batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
- totalProcessedRecords += infos.map(_.numRecords).sum
- }
+ totalProcessedRecords += batchCompleted.batchInfo.numRecords
+ }
+ }
+
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
+ getBatchTimeAndOutputOpId(jobStart.properties).foreach { case (batchTime, outputOpId) =>
+ batchTimeToJobIds.getOrElseUpdate(batchTime, ArrayBuffer[(OutputOpId, JobId)]()) +=
+ outputOpId -> jobStart.jobId
+ }
+ }
+
+ private def getBatchTimeAndOutputOpId(properties: Properties): Option[(Time, Int)] = {
+ val batchTime = properties.getProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY)
+ if (batchTime == null) {
+ // Not submitted from JobScheduler
+ None
+ } else {
+ val outputOpId = properties.getProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY)
+ assert(outputOpId != null)
+ Some(Time(batchTime.toLong) -> outputOpId.toInt)
}
}
@@ -180,4 +205,21 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
}
+
+ def getBatchInfo(batchTime: Time): Option[BatchInfo] = synchronized {
+ waitingBatchInfos.get(batchTime).orElse {
+ runningBatchInfos.get(batchTime).orElse {
+ completedBatchInfos.find(batch => batch.batchTime == batchTime)
+ }
+ }
+ }
+
+ def getJobInfos(batchTime: Time): Option[Seq[(OutputOpId, JobId)]] = synchronized {
+ batchTimeToJobIds.get(batchTime).map(_.toList)
+ }
+}
+
+private[streaming] object StreamingJobProgressListener {
+ type JobId = Int
+ type OutputOpId = Int
}
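
On the receiving side, `getBatchTimeAndOutputOpId` simply parses the two strings back out of the job's `Properties`. A standalone sketch of that parse under the same keys (the helper name is local to the sketch, not part of the patch):

```scala
import java.util.Properties

import org.apache.spark.streaming.Time

// Sketch-local helper mirroring getBatchTimeAndOutputOpId.
def parseBatchTimeAndOutputOpId(properties: Properties): Option[(Time, Int)] = {
  val batchTime = properties.getProperty("spark.streaming.internal.batchTime")
  if (batchTime == null) {
    None // the job was not submitted by JobScheduler
  } else {
    val outputOpId = properties.getProperty("spark.streaming.internal.outputOpId")
    assert(outputOpId != null)
    Some(Time(batchTime.toLong) -> outputOpId.toInt)
  }
}

val props = new Properties()
props.setProperty("spark.streaming.internal.batchTime", "1000")
props.setProperty("spark.streaming.internal.outputOpId", "0")
assert(parseBatchTimeAndOutputOpId(props) == Some(Time(1000L) -> 0))
assert(parseBatchTimeAndOutputOpId(new Properties()) == None)
```
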
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index 9a860ea4a6c68..e4039639adbad 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -27,14 +27,16 @@ import StreamingTab._
* Spark Web UI tab that shows statistics of a streaming job.
* This assumes the given SparkContext has enabled its SparkUI.
*/
-private[spark] class StreamingTab(ssc: StreamingContext)
+private[spark] class StreamingTab(val ssc: StreamingContext)
extends SparkUITab(getSparkUI(ssc), "streaming") with Logging {
val parent = getSparkUI(ssc)
val listener = ssc.progressListener
ssc.addStreamingListener(listener)
+ ssc.sc.addSparkListener(listener)
attachPage(new StreamingPage(this))
+ attachPage(new BatchPage(this))
parent.attachTab(this)
def detach() {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 998426ebb82e5..8f5fa8c672f33 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.streaming
+import scala.collection.mutable.Queue
+
import org.openqa.selenium.WebDriver
import org.openqa.selenium.htmlunit.HtmlUnitDriver
import org.scalatest._
@@ -60,8 +62,17 @@ class UISeleniumSuite
ssc
}
+ private def setupStreams(ssc: StreamingContext): Unit = {
+ val rdds = Queue(ssc.sc.parallelize(1 to 4, 4))
+ val inputStream = ssc.queueStream(rdds)
+ inputStream.foreachRDD(rdd => rdd.foreach(_ => {}))
+ }
+
test("attaching and detaching a Streaming tab") {
withStreamingContext(newSparkStreamingContext()) { ssc =>
+ setupStreams(ssc)
+ ssc.start()
+
val sparkUI = ssc.sparkContext.ui.get
eventually(timeout(10 seconds), interval(50 milliseconds)) {
@@ -75,6 +86,16 @@ class UISeleniumSuite
val statisticText = findAll(cssSelector("li strong")).map(_.text).toSeq
statisticText should contain("Network receivers:")
statisticText should contain("Batch interval:")
+
+ // TODO add tests once SPARK-6796 is merged
+
+ // Check a batch page without id
+ go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming/batch/")
+ webDriver.getPageSource should include ("Missing id parameter")
+
+ // Check a nonexistent batch
+ go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming/batch/?id=12345")
+ webDriver.getPageSource should include ("does not exist")
}
ssc.stop(false)