Handle some corner cases and add tests for StreamingJobProgressListener
zsxwing committed Apr 27, 2015
1 parent 77a69ae commit 1282b10
Showing 4 changed files with 185 additions and 27 deletions.
streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -30,10 +30,6 @@ import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
import org.apache.spark.ui.jobs.UIData.JobUIData

private[ui] case class BatchUIData(
var batchInfo: BatchInfo = null,
outputOpIdToSparkJobIds: Map[OutputOpId, ArrayBuffer[SparkJobId]] = Map()) {
}

private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
private val streamingListener = parent.listener
@@ -184,18 +180,12 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
* Generate the job table for the batch.
*/
private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = {
val outputOpIdWithSparkJobIds: Seq[(OutputOpId, Seq[SparkJobId])] = {
batchUIData.outputOpIdToSparkJobIds.toSeq.sortBy(_._1). // sorted by OutputOpId
map { case (outputOpId, jobs) =>
(outputOpId, jobs.sorted.toSeq) // sort JobIds for each OutputOpId
}
}
sparkListener.synchronized {
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] = outputOpIdWithSparkJobIds.map {
case (outputOpId, sparkJobIds) =>
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] =
batchUIData.outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
// Filter out Spark job ids that don't exist in sparkListener
(outputOpId, sparkJobIds.flatMap(getJobData))
}
}
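// A tiny illustration (hypothetical data, not part of the page logic) of the
// flatMap-over-Option filtering used above: ids without a value simply
// disappear, which is how sparkJobIds.flatMap(getJobData) drops Spark jobs
// that sparkListener no longer tracks.
val sketchKnown = Map(0 -> "job-0", 2 -> "job-2")
def sketchLookup(id: Int): Option[String] = sketchKnown.get(id)
val sketchFound = Seq(0, 1, 2).flatMap(sketchLookup) // Seq("job-0", "job-2")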

<table id="batch-job-table" class="table table-bordered table-striped table-condensed">
<thead>
streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala (new file)
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package org.apache.spark.streaming.ui

import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.streaming.ui.StreamingJobProgressListener._

/**
* The data in `outputOpIdToSparkJobIds` is sorted by `OutputOpId` in ascending order, and for
* each `OutputOpId`, the corresponding `SparkJobId`s are sorted in ascending order.
*/
private[ui] case class BatchUIData(
batchInfo: BatchInfo,
outputOpIdToSparkJobIds: Seq[(OutputOpId, Seq[SparkJobId])]) {
}
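As a usage sketch, assuming the usual Time and BatchInfo imports and mirroring the BatchInfo arguments used in the test suite below (the job ids here are illustrative):

val info = BatchInfo(Time(1000), Map.empty, 1000, Some(2000), None)
// Output op ids ascending; within each op, Spark job ids ascending.
val batchUI = BatchUIData(info, Seq(0 -> Seq(0, 1), 1 -> Seq(2, 3)))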
streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -17,6 +17,8 @@

package org.apache.spark.streaming.ui

import java.util.LinkedHashMap
import java.util.{Map => JMap}
import java.util.Properties

import scala.collection.mutable.{ArrayBuffer, Queue, HashMap}
@@ -34,6 +36,8 @@ import org.apache.spark.util.Distribution
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
extends StreamingListener with SparkListener {

import StreamingJobProgressListener._

private val waitingBatchInfos = new HashMap[Time, BatchInfo]
private val runningBatchInfos = new HashMap[Time, BatchInfo]
private val completedBatchInfos = new Queue[BatchInfo]
@@ -43,7 +47,28 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
private var totalProcessedRecords = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]

private val batchTimeToBatchUIData = new HashMap[Time, BatchUIData]
// Because onJobStart and onBatchXXX messages are processed in different threads,
// we may not be able to get the corresponding BatchInfo when receiving onJobStart. So here we
// cannot use a map of (Time, BatchUIData).
private[ui] val batchTimeToOutputOpIdToSparkJobIds =
new LinkedHashMap[Time, OutputOpIdToSparkJobIds] {
override def removeEldestEntry(p1: JMap.Entry[Time, OutputOpIdToSparkJobIds]): Boolean = {
// If a lot of "onBatchCompleted"s happen before "onJobStart" (imagine that
// SparkContext.listenerBus is very slow), "batchTimeToOutputOpIdToSparkJobIds"
// may add information for an already-removed batch when processing "onJobStart",
// which would be a memory leak.
//
// To avoid the memory leak, we bound the size of "batchTimeToOutputOpIdToSparkJobIds"
// and evict the eldest entry.
//
// Note: if "onJobStart" happens before "onBatchSubmitted", the size of
// "batchTimeToOutputOpIdToSparkJobIds" may temporarily exceed the number of retained
// batches, so we add a slack of "10" to handle such cases. This is not a perfect
// solution, but it covers most cases.
size() > waitingBatchInfos.size + runningBatchInfos.size + completedBatchInfos.size + 10
}
}
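// An illustrative, self-contained sketch (names and capacity are hypothetical,
// not part of the listener): overriding removeEldestEntry turns a LinkedHashMap
// into a bounded, insertion-ordered cache, which is exactly the technique above.
locally {
  val bounded = new LinkedHashMap[Int, String] {
    override def removeEldestEntry(eldest: JMap.Entry[Int, String]): Boolean = {
      size() > 3 // illustrative capacity
    }
  }
  (1 to 5).foreach(i => bounded.put(i, "batch-" + i))
  // bounded.keySet() now contains [3, 4, 5]: 1 and 2 were evicted eldest-first.
}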


val batchDuration = ssc.graph.batchDuration.milliseconds

@@ -85,7 +110,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
completedBatchInfos.enqueue(batchCompleted.batchInfo)
if (completedBatchInfos.size > batchInfoLimit) {
val removedBatch = completedBatchInfos.dequeue()
batchTimeToBatchUIData.remove(removedBatch.batchTime)
batchTimeToOutputOpIdToSparkJobIds.remove(removedBatch.batchTime)
}
totalCompletedBatches += 1L

@@ -95,12 +120,12 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)

override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
getBatchTimeAndOutputOpId(jobStart.properties).foreach { case (batchTime, outputOpId) =>
val batchUIData = batchTimeToBatchUIData.getOrElseUpdate(batchTime, BatchUIData())
// Because onJobStart and onBatchXXX messages are processed in different threads,
// we may not be able to get the corresponding BatchInfo now. So here we only set
// batchUIData.outputOpIdToSparkJobIds, batchUIData.batchInfo will be set in "getBatchUIData".
batchUIData.outputOpIdToSparkJobIds.
getOrElseUpdate(outputOpId, ArrayBuffer()) += jobStart.jobId
var outputOpIdToSparkJobIds = batchTimeToOutputOpIdToSparkJobIds.get(batchTime)
if (outputOpIdToSparkJobIds == null) {
outputOpIdToSparkJobIds = new OutputOpIdToSparkJobIds()
batchTimeToOutputOpIdToSparkJobIds.put(batchTime, outputOpIdToSparkJobIds)
}
outputOpIdToSparkJobIds.getOrElseUpdate(outputOpId, ArrayBuffer()) += jobStart.jobId
}
}
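// A compact sketch (hypothetical helper, mirroring onJobStart above) of the
// two-level get-or-create: the outer java.util map returns null for a missing
// key, hence the explicit null check, while the inner Scala HashMap offers
// getOrElseUpdate.
private def recordJobSketch(
    outer: JMap[Time, HashMap[Int, ArrayBuffer[Int]]],
    batchTime: Time,
    outputOpId: Int,
    jobId: Int): Unit = {
  var inner = outer.get(batchTime)
  if (inner == null) {
    inner = new HashMap[Int, ArrayBuffer[Int]]()
    outer.put(batchTime, inner)
  }
  inner.getOrElseUpdate(outputOpId, ArrayBuffer()) += jobId
}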

@@ -116,9 +141,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}

def numReceivers: Int = synchronized {
ssc.graph.getReceiverInputStreams().size
}
def numReceivers: Int = ssc.graph.getReceiverInputStreams().size

def numTotalCompletedBatches: Long = synchronized {
totalCompletedBatches
@@ -218,14 +241,20 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)

def getBatchUIData(batchTime: Time): Option[BatchUIData] = synchronized {
for (batchInfo <- getBatchInfo(batchTime)) yield {
val batchUIData = batchTimeToBatchUIData.getOrElse(batchTime, BatchUIData(batchInfo))
batchUIData.batchInfo = batchInfo
batchUIData
// outputOpIdToSparkJobIds is a sorted copy of the original one, so the caller
// can safely use the data in BatchUIData.
val outputOpIdToSparkJobIds = Option(batchTimeToOutputOpIdToSparkJobIds.get(batchTime)).
getOrElse(Map.empty).toSeq.sortWith(_._1 < _._1). // sorted by OutputOpId
map { case (outputOpId, jobs) =>
(outputOpId, jobs.sorted.toSeq) // sort JobIds for each OutputOpId
}
BatchUIData(batchInfo, outputOpIdToSparkJobIds)
}
}
}

private[streaming] object StreamingJobProgressListener {
type SparkJobId = Int
type OutputOpId = Int
private type OutputOpIdToSparkJobIds = HashMap[OutputOpId, ArrayBuffer[SparkJobId]]
}
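The defensive copy in getBatchUIData snapshots the mutable per-batch map into an immutable, doubly sorted sequence before it leaves the listener's lock. A standalone sketch of that step, with illustrative data (sortBy(_._1) is equivalent to the sortWith(_._1 < _._1) used above):

import scala.collection.mutable.{ArrayBuffer, HashMap}
val ops = HashMap(1 -> ArrayBuffer(3, 1), 0 -> ArrayBuffer(2, 0))
val snapshot: Seq[(Int, Seq[Int])] =
  ops.toSeq.sortBy(_._1).map { case (opId, jobs) => (opId, jobs.sorted.toSeq) }
// snapshot == Seq((0, Seq(0, 2)), (1, Seq(1, 3)))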
streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -17,8 +17,11 @@

package org.apache.spark.streaming.ui

import java.util.Properties

import org.scalatest.Matchers

import org.apache.spark.scheduler.SparkListenerJobStart
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.{Duration, Time, Milliseconds, TestSuiteBase}
@@ -64,6 +67,48 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.numTotalProcessedRecords should be (0)
listener.numTotalReceivedRecords should be (600)

// onJobStart
val properties1 = new Properties()
properties1.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, Time(1000).milliseconds.toString)
properties1.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, 0.toString)
val jobStart1 = SparkListenerJobStart(jobId = 0,
0L, // unused
Nil, // unused
properties1)
listener.onJobStart(jobStart1)

val properties2 = new Properties()
properties2.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, Time(1000).milliseconds.toString)
properties2.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, 0.toString)
val jobStart2 = SparkListenerJobStart(jobId = 1,
0L, // unused
Nil, // unused
properties2)
listener.onJobStart(jobStart2)

val properties3 = new Properties()
properties3.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, Time(1000).milliseconds.toString)
properties3.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, 1.toString)
val jobStart3 = SparkListenerJobStart(jobId = 0,
0L, // unused
Nil, // unused
properties3)
listener.onJobStart(jobStart3)

val properties4 = new Properties()
properties4.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, Time(1000).milliseconds.toString)
properties4.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, 1.toString)
val jobStart4 = SparkListenerJobStart(jobId = 1,
0L, // unused
Nil, // unused
properties4)
listener.onJobStart(jobStart4)

val batchUIData = listener.getBatchUIData(Time(1000))
assert(batchUIData != None)
assert(batchUIData.get.batchInfo === batchInfoStarted)
assert(batchUIData.get.outputOpIdToSparkJobIds === Seq(0 -> Seq(0, 1), 1 -> Seq(0, 1)))

// onBatchCompleted
val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
@@ -116,4 +161,67 @@
listener.retainedCompletedBatches.size should be (limit)
listener.numTotalCompletedBatches should be(limit + 10)
}

test("disorder onJobStart and onBatchXXX") {
val ssc = setupStreams(input, operation)
val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
val listener = new StreamingJobProgressListener(ssc)

// fill up completedBatchInfos
for (i <- 0 until limit) {
val batchInfoCompleted =
BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None)
val properties = new Properties()
properties.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, (1000 + i * 100).toString)
properties.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, "0")
val jobStart = SparkListenerJobStart(jobId = 1,
0L, // unused
Nil, // unused
properties)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
listener.onJobStart(jobStart)
}

// onJobStart happens before onBatchSubmitted
val properties = new Properties()
properties.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, (1000 + limit * 100).toString)
properties.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, "0")
val jobStart = SparkListenerJobStart(jobId = 0,
0L, // unused
Nil, // unused
properties)
listener.onJobStart(jobStart)

val batchInfoSubmitted =
BatchInfo(Time(1000 + limit * 100), Map.empty, (1000 + limit * 100), None, None)
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))

// We can still see the info retrieved from onJobStart
listener.getBatchUIData(Time(1000 + limit * 100)) should be (
Some(BatchUIData(batchInfoSubmitted, Seq((0, Seq(0))))))


// A lot of "onBatchCompleted"s happen before "onJobStart"
for (i <- limit + 1 to limit * 2) {
val batchInfoCompleted =
BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
}

for (i <- limit + 1 to limit * 2) {
val properties = new Properties()
properties.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, (1000 + i * 100).toString)
properties.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, "0")
val jobStart = SparkListenerJobStart(jobId = 1,
0L, // unused
Nil, // unused
properties)
listener.onJobStart(jobStart)
}

// We should not leak memory
listener.batchTimeToOutputOpIdToSparkJobIds.size() should be <=
(listener.waitingBatches.size + listener.runningBatches.size +
listener.retainedCompletedBatches.size + 10)
}
}
