Commit 9a3083d
Rename XxxDatas -> XxxData
zsxwing committed Apr 29, 2015
1 parent 087ba98 commit 9a3083d
Showing 1 changed file with 23 additions and 23 deletions.
@@ -35,9 +35,9 @@ import org.apache.spark.util.Distribution
 private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   extends StreamingListener with SparkListener {
 
-  private val waitingBatchUIDatas = new HashMap[Time, BatchUIData]
-  private val runningBatchUIDatas = new HashMap[Time, BatchUIData]
-  private val completedBatchUIDatas = new Queue[BatchUIData]
+  private val waitingBatchUIData = new HashMap[Time, BatchUIData]
+  private val runningBatchUIData = new HashMap[Time, BatchUIData]
+  private val completedBatchUIData = new Queue[BatchUIData]
   private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
   private var totalCompletedBatches = 0L
   private var totalReceivedRecords = 0L
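
The hunk above renames the three stage-tracking collections (waiting, running, completed) from the ungrammatical plural "XxxDatas" to "XxxData", and shows that the completed-batch history is capped by spark.streaming.ui.retainedBatches (default 100). As a minimal sketch, this is how an application might raise that limit; the master, app name, and batch interval below are illustrative, not part of this commit:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Retain 1000 completed batches in the Streaming UI instead of the default 100.
    val conf = new SparkConf()
      .setMaster("local[2]")            // illustrative local master
      .setAppName("ui-retention-demo")  // hypothetical app name
      .set("spark.streaming.ui.retainedBatches", "1000")
    val ssc = new StreamingContext(conf, Seconds(1))
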
@@ -64,7 +64,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
       // batches temporarily, so here we use "10" to handle such case. This is not a perfect
       // solution, but at least it can handle most of cases.
       size() >
-        waitingBatchUIDatas.size + runningBatchUIDatas.size + completedBatchUIDatas.size + 10
+        waitingBatchUIData.size + runningBatchUIData.size + completedBatchUIData.size + 10
     }
   }
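
The size() check above appears to live in a removeEldestEntry-style eviction hook on batchTimeToOutputOpIdSparkJobIdPair (its declaration sits outside this hunk), and the "+ 10" slack keeps entries for batches whose completion events have not yet propagated to the three tracking collections. A standalone sketch of that technique, assuming a java.util.LinkedHashMap bounded by a caller-supplied moving target size:

    import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

    // Evicts the eldest entry once the map outgrows a moving target
    // size plus a fixed slack, mirroring the "+ 10" heuristic above.
    class SlackBoundedMap[K, V](targetSize: () => Int, slack: Int = 10)
        extends JLinkedHashMap[K, V] {
      override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
        size() > targetSize() + slack
    }

Here targetSize would return the combined size of the waiting, running, and completed collections.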

@@ -91,27 +91,27 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
 
   override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
     synchronized {
-      waitingBatchUIDatas(batchSubmitted.batchInfo.batchTime) =
+      waitingBatchUIData(batchSubmitted.batchInfo.batchTime) =
         BatchUIData(batchSubmitted.batchInfo)
     }
   }
 
   override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = synchronized {
     val batchUIData = BatchUIData(batchStarted.batchInfo)
-    runningBatchUIDatas(batchStarted.batchInfo.batchTime) = BatchUIData(batchStarted.batchInfo)
-    waitingBatchUIDatas.remove(batchStarted.batchInfo.batchTime)
+    runningBatchUIData(batchStarted.batchInfo.batchTime) = BatchUIData(batchStarted.batchInfo)
+    waitingBatchUIData.remove(batchStarted.batchInfo.batchTime)
 
     totalReceivedRecords += batchUIData.numRecords
   }
 
   override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
     synchronized {
-      waitingBatchUIDatas.remove(batchCompleted.batchInfo.batchTime)
-      runningBatchUIDatas.remove(batchCompleted.batchInfo.batchTime)
+      waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
+      runningBatchUIData.remove(batchCompleted.batchInfo.batchTime)
       val batchUIData = BatchUIData(batchCompleted.batchInfo)
-      completedBatchUIDatas.enqueue(batchUIData)
-      if (completedBatchUIDatas.size > batchUIDataLimit) {
-        val removedBatch = completedBatchUIDatas.dequeue()
+      completedBatchUIData.enqueue(batchUIData)
+      if (completedBatchUIData.size > batchUIDataLimit) {
+        val removedBatch = completedBatchUIData.dequeue()
         batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
       }
       totalCompletedBatches += 1L
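
onBatchCompleted is where the retention limit bites: the completed queue grows by one, and once it exceeds batchUIDataLimit the oldest batch is dequeued and its entry in batchTimeToOutputOpIdSparkJobIdPair dropped, keeping the two structures in step. The same bounded-FIFO pattern in isolation, with illustrative types standing in for BatchUIData:

    import scala.collection.mutable

    // Keep at most `limit` completed items, evicting oldest-first and
    // cleaning up side state keyed by the evicted item's key.
    class RetentionBuffer[K, V](limit: Int) {
      private val completed = new mutable.Queue[(K, V)]
      private val sideState = new mutable.HashMap[K, V]

      def complete(key: K, value: V): Unit = synchronized {
        sideState(key) = value
        completed.enqueue(key -> value)
        if (completed.size > limit) {
          val (evictedKey, _) = completed.dequeue()
          sideState.remove(evictedKey) // mirror the eviction in the side map
        }
      }

      def retained: Seq[(K, V)] = synchronized { completed.toList }
    }
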
@@ -160,19 +160,19 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def numUnprocessedBatches: Long = synchronized {
-    waitingBatchUIDatas.size + runningBatchUIDatas.size
+    waitingBatchUIData.size + runningBatchUIData.size
   }
 
   def waitingBatches: Seq[BatchUIData] = synchronized {
-    waitingBatchUIDatas.values.toSeq
+    waitingBatchUIData.values.toSeq
   }
 
   def runningBatches: Seq[BatchUIData] = synchronized {
-    runningBatchUIDatas.values.toSeq
+    runningBatchUIData.values.toSeq
   }
 
   def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
-    completedBatchUIDatas.toSeq
+    completedBatchUIData.toSeq
   }
 
   def processingDelayDistribution: Option[Distribution] = synchronized {
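
The accessors in this hunk all copy state out while holding the lock (values.toSeq, toSeq), so callers can iterate the result without racing the listener's mutable collections. One subtlety a sketch can make explicit: a strict copy such as toList is guaranteed to materialize inside synchronized, whereas a lazy sequence could still read the underlying map after the lock is released. Illustrative names throughout:

    import scala.collection.mutable

    class SnapshotTracker[T] {
      private val running = new mutable.HashMap[Long, T]

      def put(id: Long, value: T): Unit = synchronized { running(id) = value }

      // toList forces the copy inside the lock; the returned Seq can
      // never observe later mutations of `running`.
      def runningSnapshot: Seq[T] = synchronized { running.values.toList }
    }
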
@@ -215,26 +215,26 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def lastCompletedBatch: Option[BatchUIData] = synchronized {
-    completedBatchUIDatas.sortBy(_.batchTime)(Time.ordering).lastOption
+    completedBatchUIData.sortBy(_.batchTime)(Time.ordering).lastOption
   }
 
   def lastReceivedBatch: Option[BatchUIData] = synchronized {
     retainedBatches.lastOption
   }
 
   private def retainedBatches: Seq[BatchUIData] = {
-    (waitingBatchUIDatas.values.toSeq ++
-      runningBatchUIDatas.values.toSeq ++ completedBatchUIDatas).sortBy(_.batchTime)(Time.ordering)
+    (waitingBatchUIData.values.toSeq ++
+      runningBatchUIData.values.toSeq ++ completedBatchUIData).sortBy(_.batchTime)(Time.ordering)
   }
 
   private def extractDistribution(getMetric: BatchUIData => Option[Long]): Option[Distribution] = {
-    Distribution(completedBatchUIDatas.flatMap(getMetric(_)).map(_.toDouble))
+    Distribution(completedBatchUIData.flatMap(getMetric(_)).map(_.toDouble))
   }
 
   def getBatchUIData(batchTime: Time): Option[BatchUIData] = synchronized {
-    val batchUIData = waitingBatchUIDatas.get(batchTime).orElse {
-      runningBatchUIDatas.get(batchTime).orElse {
-        completedBatchUIDatas.find(batch => batch.batchTime == batchTime)
+    val batchUIData = waitingBatchUIData.get(batchTime).orElse {
+      runningBatchUIData.get(batchTime).orElse {
+        completedBatchUIData.find(batch => batch.batchTime == batchTime)
       }
     }
     batchUIData.foreach { _batchUIData =>
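
Finally, getBatchUIData resolves a batch time by probing the three stages in priority order: waiting, then running, then completed. Option.orElse takes its argument by name, so each fallback lookup runs only when the previous stage misses. The same chaining pattern reduced to its essentials, with an illustrative signature:

    import scala.collection.mutable

    // Probe two maps and a sequence in priority order; later lookups
    // are evaluated only if the earlier ones return None.
    def lookup[K, V](
        key: K,
        waiting: mutable.Map[K, V],
        running: mutable.Map[K, V],
        completed: Seq[(K, V)]): Option[V] = {
      waiting.get(key).orElse {
        running.get(key).orElse {
          completed.find(_._1 == key).map(_._2)
        }
      }
    }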
