
Commit a207eb7

Fixing code review comments

mubarak committed Aug 18, 2014
1 parent ccde038 commit a207eb7
Showing 19 changed files with 21 additions and 87 deletions.
6 changes: 1 addition & 5 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -818,8 +818,6 @@ private[spark] object Utils extends Logging {
    */
   private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.streaming)?(\.streaming\.dstream)?(\.streaming\.scheduler)?(\.streaming\.twitter)?(\.streaming\.kafka)?(\.streaming\.flume)?(\.streaming\.mqtt)?(\.streaming\.zeromq)?\.[A-Z]""".r
   private val SCALA_CLASS_REGEX = """^scala(\.util)?(\.collection)?(\.collection\.mutable)?(\.collection\.immutable)?(\.concurrent\.forkjoin)?\.[A-Z]""".r
-  private val AKKA_CLASS_REGEX = """^akka(\.actor)?(\.dispatch)?\.[A-Z]""".r
-  private val JAVA_CLASS_REGEX = """^java(\.util\.concurrent)?(\.lang)?\.[A-Z]""".r
 
   /**
    * When called inside a class in the spark package, returns the name of the user code class
@@ -848,9 +846,7 @@ private[spark] object Utils extends Logging {
     for (el <- trace) {
       if (insideSpark) {
         if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined ||
-          SCALA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined ||
-          AKKA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined ||
-          JAVA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
+          SCALA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
           lastSparkMethod = if (el.getMethodName == "<init>") {
             // Spark method is a constructor; get its class name
             el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
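For context, these regexes back Utils.getCallSite, which walks the current stack trace and skips framework frames until it reaches user code; narrowing the pattern set changes which frame is reported as the call site. A minimal, self-contained Scala sketch of that matching logic (simplified patterns and hypothetical names, not Spark's actual implementation):

import scala.util.matching.Regex

object CallSiteSketch {
  // Simplified stand-ins for the patterns this commit keeps: frames from
  // Spark or Scala internals count as framework frames and are skipped.
  private val SPARK_CLASS_REGEX: Regex = """^org\.apache\.spark(\.streaming)?\.[A-Z]""".r
  private val SCALA_CLASS_REGEX: Regex = """^scala(\.util)?(\.collection)?\.[A-Z]""".r

  private def isFrameworkClass(className: String): Boolean =
    SPARK_CLASS_REGEX.findFirstIn(className).isDefined ||
      SCALA_CLASS_REGEX.findFirstIn(className).isDefined

  // Returns "method at file:line" for the first frame that is not a
  // framework frame, i.e. the user call site.
  def firstUserFrame(trace: Array[StackTraceElement]): String =
    trace.find(el => !isFrameworkClass(el.getClassName))
      .map(el => s"${el.getMethodName} at ${el.getFileName}:${el.getLineNumber}")
      .getOrElse("<unknown>")

  def main(args: Array[String]): Unit =
    println(firstUserFrame(Thread.currentThread.getStackTrace))
}

With the AKKA and JAVA patterns dropped above, fewer classes count as framework frames, so traces that pass through akka.* or java.* classes resolve to those frames instead of skipping past them.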
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -38,7 +38,6 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver}
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.ui.StreamingTab
-import org.apache.spark.util.{Utils, MetadataCleaner}
 
 /**
  * Main entry point for Spark Streaming functionality. It provides methods used to create
streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -122,7 +122,7 @@ abstract class DStream[T: ClassTag] (
   }
 
   /* Return the current callSite */
-  private[streaming] def getCallSite() = {
+  private[streaming] def getCallSite(): CallSite = {
     CallSite(ssc.sparkContext.getLocalProperty(Utils.CALL_SITE_SHORT),
       ssc.sparkContext.getLocalProperty(Utils.CALL_SITE_LONG))
   }
@@ -309,6 +309,8 @@ abstract class DStream[T: ClassTag] (
       // (based on sliding time of this DStream), then generate the RDD
       case None => {
         if (isTimeValid(time)) {
+          val prevCallSite = getCallSite
+          setCreationCallSite
           compute(time) match {
             case Some(newRDD) =>
               if (storageLevel != StorageLevel.NONE) {
@@ -323,8 +325,10 @@ abstract class DStream[T: ClassTag] (
                   " for checkpointing at time " + time)
               }
               generatedRDDs.put(time, newRDD)
+              setCallSite(prevCallSite)
               Some(newRDD)
             case None =>
+              setCallSite(prevCallSite)
               None
           }
         } else {
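The remaining files in this commit all follow from the two getOrCompute hunks above: the save/set/restore choreography around compute(time) now lives in one place, so each DStream subclass below can shed its own copy. A compact Scala sketch of the pattern, using a hypothetical thread-local in place of the SparkContext local properties that back the real getCallSite/setCallSite:

object CallSiteScope {
  // Hypothetical stand-in for SparkContext local properties; for
  // illustration only.
  private val current = new ThreadLocal[String] {
    override def initialValue(): String = ""
  }

  def getCallSite: String = current.get()
  def setCallSite(callSite: String): Unit = current.set(callSite)

  // Mirrors the shape of the patched getOrCompute: remember the caller's
  // call site, install this DStream's creation site, run the computation,
  // then restore the previous value. The commit restores it explicitly on
  // both the Some and None branches; try/finally is used here only to
  // keep the sketch compact.
  def withCreationSite[A](creationSite: String)(body: => A): A = {
    val prevCallSite = getCallSite
    setCallSite(creationSite)
    try body
    finally setCallSite(prevCallSite)
  }
}

Centralizing the bookkeeping this way removes the repeated prologue/epilogue that each compute implementation in the diffs below previously carried.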
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -70,8 +70,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   override def compute(validTime: Time): Option[RDD[(K, V)]] = {
     assert(validTime.milliseconds >= ignoreTime,
       "Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]")
-    val prevCallSite = getCallSite
-    setCreationCallSite
+
     // Find new files
     val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds)
     logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
@@ -81,7 +80,6 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
       ignoreTime = minNewFileModTime
     }
     files += ((validTime, newFiles.toArray))
-    setCallSite(prevCallSite)
     Some(filesToRDD(newFiles))
   }
 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
@@ -32,11 +32,7 @@ class FilteredDStream[T: ClassTag](
   override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[T]] = {
-    val prevCallSite = getCallSite
-    setCreationCallSite
-    val rdd: Option[RDD[T]] = parent.getOrCompute(validTime).map(_.filter(filterFunc))
-    setCallSite(prevCallSite)
-    return rdd
+    parent.getOrCompute(validTime).map(_.filter(filterFunc))
   }
 }
 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -33,10 +33,6 @@ class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
   override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[(K, U)]] = {
-    val prevCallSite = getCallSite
-    setCreationCallSite
-    val rdd: Option[RDD[(K, U)]] = parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
-    setCallSite(prevCallSite)
-    return rdd
+    parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
   }
 }
streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -32,11 +32,7 @@ class FlatMappedDStream[T: ClassTag, U: ClassTag](
   override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[U]] = {
-    val prevCallSite = getCallSite
-    setCreationCallSite
-    val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
-    setCallSite(prevCallSite)
-    return rdd
+    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
   }
 }
 
streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
@@ -30,10 +30,6 @@ class GlommedDStream[T: ClassTag](parent: DStream[T])
   override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[Array[T]]] = {
-    val prevCallSite = getCallSite
-    setCreationCallSite
-    val rdd: Option[RDD[Array[T]]] = parent.getOrCompute(validTime).map(_.glom())
-    setCallSite(prevCallSite)
-    return rdd
+    parent.getOrCompute(validTime).map(_.glom())
   }
 }
streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -33,11 +33,7 @@ class MapPartitionedDStream[T: ClassTag, U: ClassTag](
   override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[U]] = {
-    val prevCallSite = getCallSite
-    setCreationCallSite
-    val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
-    setCallSite(prevCallSite)
-    return rdd
+    parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
   }
 }
 
streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
@@ -33,11 +33,7 @@ class MapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
   override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[(K, U)]] = {
-    val prevCallSite = getCallSite
-    setCreationCallSite
-    val rdd: Option[RDD[(K, U)]] = parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
-    setCallSite(prevCallSite)
-    return rdd
+    parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
   }
 }
 
streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
@@ -32,11 +32,7 @@ class MappedDStream[T: ClassTag, U: ClassTag] (
   override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[U]] = {
-    val prevCallSite = getCallSite
-    setCreationCallSite
-    val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.map[U](mapFunc))
-    setCallSite(prevCallSite)
-    return rdd
+    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
   }
 }
 
streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -37,15 +37,12 @@ class QueueInputDStream[T: ClassTag](
   override def stop() { }
 
   override def compute(validTime: Time): Option[RDD[T]] = {
-    val prevCallSite = getCallSite
-    setCreationCallSite
     val buffer = new ArrayBuffer[RDD[T]]()
     if (oneAtATime && queue.size > 0) {
       buffer += queue.dequeue()
     } else {
       buffer ++= queue.dequeueAll(_ => true)
     }
-    setCallSite(prevCallSite)
     if (buffer.size > 0) {
       if (oneAtATime) {
         Some(buffer.head)
streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -59,22 +59,17 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
 
   /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
   override def compute(validTime: Time): Option[RDD[T]] = {
-    val prevCallSite = getCallSite
-    setCreationCallSite
     // If this is called for any time before the start time of the context,
     // then this returns an empty RDD. This may happen when recovering from a
     // master failure
-    var blockRDD: Option[RDD[T]] = None
     if (validTime >= graph.startTime) {
       val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
       receivedBlockInfo(validTime) = blockInfo
       val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
-      blockRDD = Some(new BlockRDD[T](ssc.sc, blockIds))
+      Some(new BlockRDD[T](ssc.sc, blockIds))
     } else {
-      blockRDD = Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
+      Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
     }
-    setCallSite(prevCallSite)
-    blockRDD
   }
 
   /** Get information on received blocks. */
streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -52,8 +52,6 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
     "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
   )
 
-
-
   // Reduce each batch of data using reduceByKey which will be further reduced by window
   // by ReducedWindowedDStream
   val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
@@ -85,8 +83,6 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
   }
 
   override def compute(validTime: Time): Option[RDD[(K, V)]] = {
-    val prevCallSite = getCallSite
-    setCreationCallSite
     val reduceF = reduceFunc
     val invReduceF = invReduceFunc
 
@@ -170,16 +166,13 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
       }
     }
 
-    var returnRDD: Option[RDD[(K, V)]] = None
     val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]]
       .mapValues(mergeValues)
 
    if (filterFunc.isDefined) {
-      returnRDD = Some(mergedValuesRDD.filter(filterFunc.get))
+      Some(mergedValuesRDD.filter(filterFunc.get))
    } else {
-      returnRDD = Some(mergedValuesRDD)
+      Some(mergedValuesRDD)
    }
-    setCallSite(prevCallSite)
-    returnRDD
   }
 }
streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
@@ -38,14 +38,10 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
   override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[(K,C)]] = {
-    val prevCallSite = getCallSite
-    setCreationCallSite
-    val rdd: Option[RDD[(K,C)]] = parent.getOrCompute(validTime) match {
+    parent.getOrCompute(validTime) match {
       case Some(rdd) => Some(rdd.combineByKey[C](
         createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
       case None => None
     }
-    setCallSite(prevCallSite)
-    return rdd
   }
 }
streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -34,15 +34,14 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
   ) extends DStream[(K, S)](parent.ssc) {
 
   super.persist(StorageLevel.MEMORY_ONLY_SER)
 
   override def dependencies = List(parent)
 
   override def slideDuration: Duration = parent.slideDuration
 
   override val mustCheckpoint = true
 
   override def compute(validTime: Time): Option[RDD[(K, S)]] = {
-    val prevCallSite = getCallSite
-    setCreationCallSite
+
     // Try to get the previous state RDD
     getOrCompute(validTime - slideDuration) match {
@@ -70,7 +69,6 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
         }
         val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
         val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
-        setCallSite(prevCallSite)
         Some(stateRDD)
       }
       case None => { // If parent RDD does not exist
@@ -82,7 +80,6 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
           updateFuncLocal(i)
         }
         val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
-        setCallSite(prevCallSite)
         Some(stateRDD)
       }
     }
@@ -105,12 +102,10 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
         val groupedRDD = parentRDD.groupByKey(partitioner)
         val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
         // logDebug("Generating state RDD for time " + validTime + " (first)")
-        setCallSite(prevCallSite)
         Some(sessionRDD)
       }
      case None => { // If parent RDD does not exist, then nothing to do!
         // logDebug("Not generating state RDD (no previous state, no parent)")
-        setCallSite(prevCallSite)
         None
       }
     }
streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -37,11 +37,7 @@ class TransformedDStream[U: ClassTag] (
   override def slideDuration: Duration = parents.head.slideDuration
 
   override def compute(validTime: Time): Option[RDD[U]] = {
-    val prevCallSite = getCallSite
-    setCreationCallSite
     val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
-    val rdd: Option[RDD[U]] = Some(transformFunc(parentRDDs, validTime))
-    setCallSite(prevCallSite)
-    return rdd
+    Some(transformFunc(parentRDDs, validTime))
   }
 }
streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -45,19 +45,15 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
   override def slideDuration: Duration = parents.head.slideDuration
 
   override def compute(validTime: Time): Option[RDD[T]] = {
-    val prevCallSite = getCallSite
-    setCreationCallSite
     val rdds = new ArrayBuffer[RDD[T]]()
     parents.map(_.getOrCompute(validTime)).foreach(_ match {
       case Some(rdd) => rdds += rdd
      case None => throw new Exception("Could not generate RDD from a parent for unifying at time "
         + validTime)
     })
     if (rdds.size > 0) {
-      setCallSite(prevCallSite)
       Some(new UnionRDD(ssc.sc, rdds))
     } else {
-      setCallSite(prevCallSite)
       None
     }
   }
streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -61,8 +61,6 @@ class WindowedDStream[T: ClassTag](
   }
 
   override def compute(validTime: Time): Option[RDD[T]] = {
-    val prevCallSite = getCallSite
-    setCreationCallSite
     val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
     val rddsInWindow = parent.slice(currentWindow)
     val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
@@ -72,7 +70,6 @@ class WindowedDStream[T: ClassTag](
       logDebug("Using normal union for windowing at " + validTime)
       new UnionRDD(ssc.sc,rddsInWindow)
     }
-    setCallSite(prevCallSite)
     Some(windowRDD)
   }
 }