diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala
index b574cb103f766..545c5cb8e3f61 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala
@@ -29,14 +29,17 @@ import org.apache.spark.streaming.scheduler._
import org.apache.spark.ui.{ServerInfo, SparkUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{Distribution, Utils}
-import java.util.Locale
+import java.util.{Calendar, Locale}
-private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListener {
+private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingListener {
private val waitingBatchInfos = new HashMap[Time, BatchInfo]
private val runningBatchInfos = new HashMap[Time, BatchInfo]
private val completedaBatchInfos = new Queue[BatchInfo]
- private val batchInfoLimit = conf.getInt("spark.steaming.ui.maxBatches", 100)
+ private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.maxBatches", 100)
+ private var totalBatchesCompleted = 0L
+
+ val batchDuration = ssc.graph.batchDuration.milliseconds
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized {
runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
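As background for this hunk, a minimal sketch of how an application registers a custom StreamingListener against a StreamingContext, using the same onBatchCompleted hook the UI listener overrides. The object name and log line are illustrative; only the Spark listener API itself is taken from the source:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical demo; only addStreamingListener and the callback are Spark API.
object ListenerDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("listener-demo")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.addStreamingListener(new StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        val info = batchCompleted.batchInfo
        println(s"Batch ${info.batchTime}: processed in ${info.processingDelay.getOrElse(-1L)} ms")
      }
    })
    // define input streams and transformations here, then:
    // ssc.start(); ssc.awaitTermination()
  }
}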
@@ -52,6 +55,11 @@ private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListe
runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
completedaBatchInfos.enqueue(batchCompleted.batchInfo)
if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
+ totalBatchesCompleted += 1L
+ }
+
+ def numTotalBatchesCompleted: Long = synchronized {
+ totalBatchesCompleted
}
def numNetworkReceivers: Int = synchronized {
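The completed-batch bookkeeping above keeps a bounded, synchronized history: enqueue each finished batch, then drop the oldest entry once the queue exceeds spark.streaming.ui.maxBatches. A standalone sketch of that pattern under a hypothetical class name:

import scala.collection.mutable.Queue

// Illustrative fixed-capacity history with the same enqueue/dequeue-on-overflow
// behavior as the listener's completed-batch queue.
class BoundedHistory[T](limit: Int) {
  private val entries = new Queue[T]

  def add(entry: T): Unit = synchronized {
    entries.enqueue(entry)
    if (entries.size > limit) entries.dequeue()  // evict the oldest entry
  }

  def snapshot: Seq[T] = synchronized { entries.toSeq }
}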
@@ -89,7 +97,8 @@ private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListe
val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
(0 until numNetworkReceivers).map { receiverId =>
val blockInfoOfParticularReceiver = latestBlockInfos.map(_.get(receiverId).getOrElse(Array.empty))
- val distributionOption = Distribution(blockInfoOfParticularReceiver.map(_.map(_.numRecords).sum.toDouble))
+ val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map(_.map(_.numRecords).sum.toDouble * 1000 / batchDuration)
+ val distributionOption = Distribution(recordsOfParticularReceiver)
(receiverId, distributionOption)
}.toMap
}
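The change above turns per-batch record counts into rates by scaling with the batch interval: records * 1000 / batchDurationMs. A self-contained check of that arithmetic with made-up numbers:

object RateSketch {
  def main(args: Array[String]): Unit = {
    val batchDurationMs = 2000L                         // assume a 2-second batch interval
    val recordsPerBatch = Seq(4000.0, 5200.0, 3800.0)   // made-up per-batch counts

    // Same normalization as the patch: records per batch -> records per second.
    val recordsPerSecond = recordsPerBatch.map(_ * 1000 / batchDurationMs)
    println(recordsPerSecond.mkString(", "))            // 2000.0, 2600.0, 1900.0
  }
}

The resulting doubles are what the listener feeds into Distribution, so the quantiles shown in the UI are already in records/second.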
@@ -99,44 +108,42 @@ private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListe
}
}
-private[spark] class StreamingUI(ssc: StreamingContext) extends Logging {
- private val sc = ssc.sparkContext
- private val conf = sc.conf
- private val appName = sc.appName
- private val bindHost = Utils.localHostName()
- private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
- private val port = conf.getInt("spark.streaming.ui.port", StreamingUI.DEFAULT_PORT)
- private val securityManager = sc.env.securityManager
- private val listener = new StreamingUIListener(conf)
- private val handlers: Seq[ServletContextHandler] = {
- Seq(
- createServletHandler("/",
- (request: HttpServletRequest) => render(request), securityManager),
- createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")
- )
- }
+private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
- private var serverInfo: Option[ServerInfo] = None
+ private val listener = parent.listener
+ private val calendar = Calendar.getInstance()
+ private val startTime = calendar.getTime()
- ssc.addStreamingListener(listener)
- def bind() {
- try {
- serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf))
- logInfo("Started Spark Streaming Web UI at http://%s:%d".format(publicHost, boundPort))
- } catch {
- case e: Exception =>
- logError("Failed to create Spark JettyUtils", e)
- System.exit(1)
- }
- }
+ def render(request: HttpServletRequest): Seq[Node] = {
- def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
+ val content =
+ generateBasicStats() ++
+ <h4>Statistics over last {listener.completedBatches.size} processed batches</h4> ++
+ generateNetworkStatsTable() ++
+ generateBatchStatsTable()
+ UIUtils.headerStreamingPage(content, "", parent.appName, "Spark Streaming Overview")
+ }
- private def render(request: HttpServletRequest): Seq[Node] = {
- val content = generateBatchStatsTable() ++ generateNetworkStatsTable()
- UIUtils.headerStreamingPage(content, "", appName, "Spark Streaming Overview")
+ private def generateBasicStats(): Seq[Node] = {
+
+ val timeSinceStart = System.currentTimeMillis() - startTime.getTime
+ <ul class="unstyled">
+   <li><strong>Started at: </strong>{startTime.toString}</li>
+   <li><strong>Time since start: </strong>{msDurationToString(timeSinceStart)}</li>
+   <li><strong>Batch interval: </strong>{msDurationToString(listener.batchDuration)}</li>
+   <li><strong>Processed batches: </strong>{listener.numTotalBatchesCompleted}</li>
+ </ul>
}
private def generateBatchStatsTable(): Seq[Node] = {
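Both pages build their markup as Scala XML literals typed as Seq[Node] and stitched together with ++, exactly as render() does above. A toy sketch of the idiom with illustrative names, not the patch's API:

import scala.xml.Node

object XmlPageSketch {
  // Each section yields a node sequence; ++ concatenates sections into a page.
  private def basicStats(processed: Long): Seq[Node] =
    <ul class="unstyled">
      <li><strong>Processed batches: </strong>{processed}</li>
    </ul>

  private def page(appName: String, processed: Long): Seq[Node] =
    <h4>{appName} overview</h4> ++ basicStats(processed)

  def main(args: Array[String]): Unit =
    println(page("demo-app", 42).mkString)
}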
@@ -173,18 +180,12 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging {
val batchStats =
- <ul class="unstyled">
-   <li><h5>Statistics over last {numBatches} processed batches</h5></li>
-   <li>{table.getOrElse("No statistics have been generated yet.")}</li>
- </ul>
+ {table.getOrElse("No statistics have been generated yet.")}
val content =
- <h4>Batch Processing Statistics</h4> ++
- <div>{batchCounts}</div> ++
- <div>{batchStats}</div>
+ <h5>Batch Processing Statistics</h5> ++
+ <div>{batchStats}</div>
content
}
@@ -198,7 +199,7 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging {
val dataRows = (0 until numNetworkReceivers).map { receiverId =>
val receiverName = s"Receiver-$receiverId"
val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
- d.getQuantiles().map(r => numberToString(r.toLong) + " records/batch")
+ d.getQuantiles().map(r => numberToString(r.toLong) + " records/second")
}.getOrElse {
Seq("-", "-", "-", "-", "-")
}
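Distribution.getQuantiles() defaults to the five probabilities (0, 0.25, 0.5, 0.75, 1.0), which is why the fallback row above is exactly five dashes. Distribution itself is private[spark]; an equivalent nearest-rank sketch for experimenting outside Spark:

object QuantileSketch {
  /** Nearest-rank quantiles over sorted data at the UI's five default probabilities. */
  def quantiles(data: Seq[Double]): Seq[Double] = {
    val sorted = data.sorted
    Seq(0.0, 0.25, 0.5, 0.75, 1.0).map { p =>
      sorted(math.min((p * sorted.size).toInt, sorted.size - 1))
    }
  }

  def main(args: Array[String]): Unit = {
    val rates = Seq(1900.0, 2000.0, 2600.0, 2200.0)  // made-up records/second samples
    println(quantiles(rates).map(r => f"$r%.0f records/second").mkString(", "))
  }
}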
@@ -210,8 +211,8 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging {
}
val content =
- <h4>Network Input Statistics</h4> ++
- <div>{table.getOrElse("No network receivers")}</div>
+ <h5>Network Input Statistics</h5> ++
+ <div>{table.getOrElse("No network receivers")}</div>
content
}
@@ -241,6 +242,95 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging {
}
"%.1f%s".formatLocal(Locale.US, value, unit)
}
+
+ /**
+ * Returns a human-readable string representing a duration such as "5 seconds 35 ms"
+ */
+ private def msDurationToString(ms: Long): String = {
+ try {
+ val second = 1000L
+ val minute = 60 * second
+ val hour = 60 * minute
+ val day = 24 * hour
+ val week = 7 * day
+ val year = 365 * day
+
+ def toString(num: Long, unit: String): String = {
+ if (num == 0) {
+ ""
+ } else if (num == 1) {
+ s"$num $unit"
+ } else {
+ s"$num ${unit}s"
+ }
+ }
+
+ val millisecondsString = if (ms % second == 0) "" else s"${ms % second} ms"
+ val secondString = toString((ms % minute) / second, "second")
+ val minuteString = toString((ms % hour) / minute, "minute")
+ val hourString = toString((ms % day) / hour, "hour")
+ val dayString = toString((ms % week) / day, "day")
+ val weekString = toString((ms % year) / week, "week")
+ val yearString = toString(ms / year, "year")
+
+ Seq(
+ second -> millisecondsString,
+ minute -> s"$secondString $millisecondsString",
+ hour -> s"$minuteString $secondString",
+ day -> s"$hourString $minuteString $secondString",
+ week -> s"$dayString $hourString $minuteString",
+ year -> s"$weekString $dayString $hourString"
+ ).foreach {
+ case (durationLimit, durationString) if (ms < durationLimit) =>
+ return durationString
+ case e: Any => // a MatchError is thrown without this default case
+ }
+ return s"$yearString $weekString $dayString"
+ } catch {
+ case e: Exception =>
+ logError("Error converting time to string", e)
+ return ""
+ }
+ }
+}
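msDurationToString walks the unit boundaries from seconds up to years and returns at the first limit that exceeds the input, so each result shows only its few most significant units. A compact standalone re-implementation of the same cascade for experimentation (not the patch's code; it keeps two units where the patch sometimes keeps three):

object DurationFormatDemo {
  private val units =
    Seq(86400000L -> "day", 3600000L -> "hour", 60000L -> "minute", 1000L -> "second", 1L -> "ms")

  /** Formats a millisecond duration, keeping the two largest non-zero units. */
  def format(ms: Long): String = {
    val parts = units.foldLeft((ms, Seq.empty[String])) { case ((rest, acc), (size, name)) =>
      val n = rest / size
      val next =
        if (n == 0) acc
        else if (n == 1 || name == "ms") acc :+ s"$n $name"
        else acc :+ s"$n ${name}s"
      (rest % size, next)
    }._2
    parts.take(2).mkString(" ")
  }

  def main(args: Array[String]): Unit = {
    println(format(5035))   // 5 seconds 35 ms
    println(format(95000))  // 1 minute 35 seconds
  }
}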
+
+
+private[spark] class StreamingUI(val ssc: StreamingContext) extends Logging {
+
+ val sc = ssc.sparkContext
+ val conf = sc.conf
+ val appName = sc.appName
+ val listener = new StreamingUIListener(ssc)
+ val overviewPage = new StreamingPage(this)
+
+ private val bindHost = Utils.localHostName()
+ private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
+ private val port = conf.getInt("spark.streaming.ui.port", StreamingUI.DEFAULT_PORT)
+ private val securityManager = sc.env.securityManager
+ private val handlers: Seq[ServletContextHandler] = {
+ Seq(
+ createServletHandler("/",
+ (request: HttpServletRequest) => overviewPage.render(request), securityManager),
+ createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")
+ )
+ }
+
+ private var serverInfo: Option[ServerInfo] = None
+
+ ssc.addStreamingListener(listener)
+
+ def bind() {
+ try {
+ serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf))
+ logInfo("Started Spark Streaming Web UI at http://%s:%d".format(publicHost, boundPort))
+ } catch {
+ case e: Exception =>
+ logError("Failed to create Spark JettyUtils", e)
+ System.exit(1)
+ }
+ }
+
+ private def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
}
object StreamingUI {
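Stripped of Spark's JettyUtils helpers, bind() amounts to starting an embedded Jetty server with the servlet handlers attached; start() throws if the port is unavailable, which is what the try/catch guards. A minimal self-contained sketch against the plain Jetty servlet API, with an arbitrary port:

import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

object MiniUiServer {
  def main(args: Array[String]): Unit = {
    val port = 4041  // arbitrary; the patch reads spark.streaming.ui.port instead
    val context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
    context.setContextPath("/")
    context.addServlet(new ServletHolder(new HttpServlet {
      override def doGet(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
        resp.setContentType("text/html;charset=utf-8")
        resp.getWriter.print("<h4>Spark Streaming Overview</h4>")
      }
    }), "/")

    val server = new Server(port)
    server.setHandler(context)
    server.start()
    println(s"Listening on http://localhost:$port")
    server.join()
  }
}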
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
index 62e95135fa5c5..b87bba87129b5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
@@ -25,8 +25,15 @@ private[spark] object UIUtils {
type="text/css" />
<title>{appName} - {title}</title>
+ <script type="text/JavaScript">
+ <!--
+ function timedRefresh(timeoutPeriod) {
+   setTimeout("location.reload(true);", timeoutPeriod);
+ }
+ // -->
+ </script>
</head>
- <body>
+ <body onload="JavaScript:timedRefresh(1000);">
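The hunk above wires a timed-reload script to the page's onload handler. The same effect is commonly achieved with a refresh meta tag; a hedged Scala XML sketch of assembling such a header (illustrative, not the UIUtils code):

import scala.xml.Elem

object RefreshHeaderSketch {
  /** Builds a page head asking the browser to reload every `seconds` seconds. */
  def head(appName: String, title: String, seconds: Int): Elem =
    <head>
      <meta http-equiv="refresh" content={seconds.toString} />
      <title>{appName} - {title}</title>
    </head>

  def main(args: Array[String]): Unit =
    println(head("demo-app", "Spark Streaming Overview", 1))
}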