diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index 8da4309daf4ca..8309b82d0bbbd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -34,6 +34,7 @@ import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} import org.apache.spark.streaming._ import org.apache.spark.streaming.scheduler.{ReceivedBlockInfo, AddBlocks, DeregisterReceiver, RegisterReceiver} import org.apache.spark.streaming.util.{RecurringTimer, SystemClock} +import org.apache.spark.util.Utils /** * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] @@ -206,7 +207,9 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging val timeout = 5.seconds override def preStart() { - val future = tracker.ask(RegisterReceiver(streamId, self))(timeout) + val msg = RegisterReceiver( + streamId, NetworkReceiver.this.getClass.getSimpleName, Utils.localHostName(), self) + val future = tracker.ask(msg)(timeout) Await.result(future, timeout) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala index 74a7644d1c7ad..6551535f876a1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala @@ -28,7 +28,12 @@ import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.dstream.{NetworkReceiver, StopReceiver} import org.apache.spark.util.AkkaUtils -/** Information about block received by the network receiver */ +/** Information about receiver */ +case class ReceiverInfo(streamId: Int, typ: String, location: String) { + override def toString = s"$typ-$streamId" +} + +/** Information about blocks received by the network receiver */ case class ReceivedBlockInfo( streamId: Int, blockId: StreamBlockId, @@ -41,8 +46,12 @@ case class ReceivedBlockInfo( * with each other. */ private[streaming] sealed trait NetworkInputTrackerMessage -private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) - extends NetworkInputTrackerMessage +private[streaming] case class RegisterReceiver( + streamId: Int, + typ: String, + host: String, + receiverActor: ActorRef + ) extends NetworkInputTrackerMessage private[streaming] case class AddBlocks(receivedBlockInfo: ReceivedBlockInfo) extends NetworkInputTrackerMessage private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) @@ -108,11 +117,14 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { /** Actor to receive messages from the receivers. */ private class NetworkInputTrackerActor extends Actor { def receive = { - case RegisterReceiver(streamId, receiverActor) => { + case RegisterReceiver(streamId, typ, host, receiverActor) => { if (!networkInputStreamMap.contains(streamId)) { throw new Exception("Register received for unexpected id " + streamId) } receiverInfo += ((streamId, receiverActor)) + ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted( + ReceiverInfo(streamId, typ, host) + )) logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address) sender ! true diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 0c1edff9c8616..5db40ebbeb1de 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -26,6 +26,8 @@ sealed trait StreamingListenerEvent case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent +case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo) + extends StreamingListenerEvent /** An event used in the listener to shutdown the listener daemon thread. */ private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent @@ -36,6 +38,9 @@ private[scheduler] case object StreamingListenerShutdown extends StreamingListen */ trait StreamingListener { + /** Called when a receiver has been started */ + def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { } + /** Called when a batch of jobs has been submitted for processing. */ def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index 18811fc2b01d8..ea03dfc7bfeea 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -38,6 +38,10 @@ private[spark] class StreamingListenerBus() extends Logging { while (true) { val event = eventQueue.take event match { + case receiverStarted: StreamingListenerReceiverStarted => + listeners.foreach(_.onReceiverStarted(receiverStarted)) + case batchSubmitted: StreamingListenerBatchSubmitted => + listeners.foreach(_.onBatchSubmitted(batchSubmitted)) case batchStarted: StreamingListenerBatchStarted => listeners.foreach(_.onBatchStarted(batchStarted)) case batchCompleted: StreamingListenerBatchCompleted => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala index 545c5cb8e3f61..86427ca171489 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala @@ -37,10 +37,17 @@ private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingLi private val runningBatchInfos = new HashMap[Time, BatchInfo] private val completedaBatchInfos = new Queue[BatchInfo] private val batchInfoLimit = ssc.conf.getInt("spark.steaming.ui.maxBatches", 100) - private var totalBatchesCompleted = 0L + private var totalCompletedBatches = 0L + private val receiverInfos = new HashMap[Int, ReceiverInfo] val batchDuration = ssc.graph.batchDuration.milliseconds + override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) = { + synchronized { + receiverInfos.put(receiverStarted.receiverInfo.streamId, receiverStarted.receiverInfo) + } + } + override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized { runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo } @@ -55,15 +62,19 @@ private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingLi runningBatchInfos.remove(batchCompleted.batchInfo.batchTime) completedaBatchInfos.enqueue(batchCompleted.batchInfo) if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue() - totalBatchesCompleted += 1L + totalCompletedBatches += 1L } - def numTotalBatchesCompleted: Long = synchronized { - totalBatchesCompleted + def numNetworkReceivers = synchronized { + ssc.graph.getNetworkInputStreams().size } - def numNetworkReceivers: Int = synchronized { - completedaBatchInfos.headOption.map(_.receivedBlockInfo.size).getOrElse(0) + def numTotalCompletedBatches: Long = synchronized { + totalCompletedBatches + } + + def numUnprocessedBatches: Long = synchronized { + waitingBatchInfos.size + runningBatchInfos.size } def waitingBatches: Seq[BatchInfo] = synchronized { @@ -91,9 +102,7 @@ private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingLi } def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized { - val allBatcheInfos = waitingBatchInfos.values.toSeq ++ - runningBatchInfos.values.toSeq ++ completedaBatchInfos - val latestBatchInfos = allBatcheInfos.sortBy(_.batchTime)(Time.ordering).reverse.take(batchInfoLimit) + val latestBatchInfos = allBatches.reverse.take(batchInfoLimit) val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo) (0 until numNetworkReceivers).map { receiverId => val blockInfoOfParticularReceiver = latestBlockInfos.map(_.get(receiverId).getOrElse(Array.empty)) @@ -103,6 +112,34 @@ private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingLi }.toMap } + def lastReceivedBatchRecords: Map[Int, Long] = { + val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo) + lastReceivedBlockInfoOption.map { lastReceivedBlockInfo => + (0 until numNetworkReceivers).map { receiverId => + (receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum) + }.toMap + }.getOrElse { + (0 until numNetworkReceivers).map(receiverId => (receiverId, 0L)).toMap + } + } + + def receiverInfo(receiverId: Int): Option[ReceiverInfo] = { + receiverInfos.get(receiverId) + } + + def lastCompletedBatch: Option[BatchInfo] = { + completedaBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption + } + + def lastReceivedBatch: Option[BatchInfo] = { + allBatches.lastOption + } + + private def allBatches: Seq[BatchInfo] = synchronized { + (waitingBatchInfos.values.toSeq ++ + runningBatchInfos.values.toSeq ++ completedaBatchInfos).sortBy(_.batchTime)(Time.ordering) + } + private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = { Distribution(completedaBatchInfos.flatMap(getMetric(_)).map(_.toDouble)) } @@ -114,13 +151,13 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging { private val listener = parent.listener private val calendar = Calendar.getInstance() private val startTime = calendar.getTime() - + private val emptyCellTest = "-" def render(request: HttpServletRequest): Seq[Node] = { val content = generateBasicStats() ++ -

Statistics over last {listener.completedBatches.size} processed batches

++ +

Statistics over last {listener.completedBatches.size} processed batches

++ generateNetworkStatsTable() ++ generateBatchStatsTable() UIUtils.headerStreamingPage(content, "", parent.appName, "Spark Streaming Overview") @@ -136,28 +173,76 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
  • Time since start: {msDurationToString(timeSinceStart)}
  • +
  • + Network receivers: {listener.numNetworkReceivers} +
  • Batch interval: {msDurationToString(listener.batchDuration)}
  • - Processed batches: {listener.numTotalBatchesCompleted} + Processed batches: {listener.numTotalCompletedBatches} +
  • +
  • + Waiting batches: {listener.numUnprocessedBatches}
  • -
  • } + private def generateNetworkStatsTable(): Seq[Node] = { + val receivedRecordDistributions = listener.receivedRecordsDistributions + val lastBatchReceivedRecord = listener.lastReceivedBatchRecords + val table = if (receivedRecordDistributions.size > 0) { + val headerRow = Seq( + "Receiver", + "Location", + s"Records in last batch", + "Minimum rate [records/sec]", + "25th percentile rate [records/sec]", + "Median rate [records/sec]", + "75th percentile rate [records/sec]", + "Maximum rate [records/sec]" + ) + val dataRows = (0 until listener.numNetworkReceivers).map { receiverId => + val receiverInfo = listener.receiverInfo(receiverId) + val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId") + val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCellTest) + val receiverLastBatchRecords = numberToString(lastBatchReceivedRecord(receiverId)) + val receivedRecordStats = receivedRecordDistributions(receiverId).map { d => + d.getQuantiles().map(r => numberToString(r.toLong)) + }.getOrElse { + Seq(emptyCellTest, emptyCellTest, emptyCellTest, emptyCellTest, emptyCellTest) + } + Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++ + receivedRecordStats + } + Some(UIUtils.listingTable(headerRow, dataRows, fixedWidth = true)) + } else { + None + } + + val content = +
    Network Input Statistics
    ++ +
    {table.getOrElse("No network receivers")}
    + + content + } + private def generateBatchStatsTable(): Seq[Node] = { val numBatches = listener.completedBatches.size + val lastCompletedBatch = listener.lastCompletedBatch val table = if (numBatches > 0) { val processingDelayQuantilesRow = - "Processing Times" +: getQuantiles(listener.processingDelayDistribution) + Seq("Processing Time", msDurationToString(lastCompletedBatch.flatMap(_.processingDelay))) ++ + getQuantiles(listener.processingDelayDistribution) val schedulingDelayQuantilesRow = - "Scheduling Delay:" +: getQuantiles(listener.schedulingDelayDistribution) + Seq("Scheduling Delay", msDurationToString(lastCompletedBatch.flatMap(_.schedulingDelay))) ++ + getQuantiles(listener.schedulingDelayDistribution) val totalDelayQuantilesRow = - "End-to-end Delay:" +: getQuantiles(listener.totalDelayDistribution) + Seq("Total Delay", msDurationToString(lastCompletedBatch.flatMap(_.totalDelay))) ++ + getQuantiles(listener.totalDelayDistribution) - val headerRow = Seq("Metric", "Min", "25th percentile", - "Median", "75th percentile", "Max") + val headerRow = Seq("Metric", "Last batch", "Minimum", "25th percentile", + "Median", "75th percentile", "Maximum") val dataRows: Seq[Seq[String]] = Seq( processingDelayQuantilesRow, schedulingDelayQuantilesRow, @@ -168,57 +253,19 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging { None } - val batchCounts = - - - val batchStats = - - val content =
    Batch Processing Statistics
    ++ -
    {batchStats}
    - - content - } - - private def generateNetworkStatsTable(): Seq[Node] = { - val receivedRecordDistributions = listener.receivedRecordsDistributions - val numNetworkReceivers = receivedRecordDistributions.size - val table = if (receivedRecordDistributions.size > 0) { - val headerRow = Seq("Receiver", "Min", "25th percentile", - "Median", "75th percentile", "Max") - val dataRows = (0 until numNetworkReceivers).map { receiverId => - val receiverName = s"Receiver-$receiverId" - val receivedRecordStats = receivedRecordDistributions(receiverId).map { d => - d.getQuantiles().map(r => numberToString(r.toLong) + " records/second") - }.getOrElse { - Seq("-", "-", "-", "-", "-") - } - receiverName +: receivedRecordStats - } - Some(UIUtils.listingTable(headerRow, dataRows, fixedWidth = true)) - } else { - None - } - - val content = -
    Network Input Statistics
    ++ -
    {table.getOrElse("No network receivers")}
    +
    + +
    content } private def getQuantiles(timeDistributionOption: Option[Distribution]) = { - timeDistributionOption.get.getQuantiles().map { ms => Utils.msDurationToString(ms.toLong) } + timeDistributionOption.get.getQuantiles().map { ms => msDurationToString(ms.toLong) } } private def numberToString(records: Double): String = { @@ -229,13 +276,13 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging { val (value, unit) = { if (records >= 2*trillion) { - (records / trillion, "T") + (records / trillion, " T") } else if (records >= 2*billion) { - (records / billion, "B") + (records / billion, " B") } else if (records >= 2*million) { - (records / million, "M") + (records / million, " M") } else if (records >= 2*thousand) { - (records / thousand, "K") + (records / thousand, " K") } else { (records, "") } @@ -265,7 +312,7 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging { } } - val millisecondsString = if (ms % second == 0) "" else s"${ms % second} ms" + val millisecondsString = if (ms >= second && ms % second == 0) "" else s"${ms % second} ms" val secondString = toString((ms % minute) / second, "second") val minuteString = toString((ms % hour) / minute, "minute") val hourString = toString((ms % day) / hour, "hour") @@ -292,6 +339,10 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging { return "" } } + + private def msDurationToString(msOption: Option[Long]): String = { + msOption.map(msDurationToString).getOrElse(emptyCellTest) + } }