From 188180227a5d8f0d677f2b9df4c5526f4024cbf7 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sat, 16 May 2015 15:33:15 -0700 Subject: [PATCH] Refactor DStream scope names again Now it reads from the class name instead of us having to specify it wherever we instantiate it. This reduces some more duplicate code. --- .../spark/streaming/flume/FlumeUtils.scala | 4 +- .../kafka/DirectKafkaInputDStream.scala | 3 + .../spark/streaming/kafka/KafkaUtils.scala | 6 +- .../streaming/mqtt/MQTTInputDStream.scala | 3 +- .../spark/streaming/mqtt/MQTTUtils.scala | 2 +- .../streaming/twitter/TwitterUtils.scala | 2 +- .../spark/streaming/StreamingContext.scala | 14 ++- .../spark/streaming/dstream/DStream.scala | 14 +-- .../streaming/dstream/InputDStream.scala | 31 +++++-- .../dstream/PairDStreamFunctions.scala | 88 +++++++++---------- .../spark/streaming/DStreamScopeSuite.scala | 48 ++++++---- 11 files changed, 123 insertions(+), 92 deletions(-) diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index 0229bda2e68e0..d80984ba870d3 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -59,7 +59,7 @@ object FlumeUtils { port: Int, storageLevel: StorageLevel, enableDecompression: Boolean - ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withNamedScope("flume stream") { + ): ReceiverInputDStream[SparkFlumeEvent] = { new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel, enableDecompression) } @@ -159,7 +159,7 @@ object FlumeUtils { storageLevel: StorageLevel, maxBatchSize: Int, parallelism: Int - ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withNamedScope("flume polling stream") { + ): ReceiverInputDStream[SparkFlumeEvent] = { new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize, parallelism, storageLevel) } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index 6715aede7928a..060c2f23eded8 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -65,6 +65,9 @@ class DirectKafkaInputDStream[ val maxRetries = context.sparkContext.getConf.getInt( "spark.streaming.kafka.maxRetries", 1) + // Keep this consistent with how other streams are named (e.g. 
"Flume polling stream [2]") + private[streaming] override def name: String = s"Kafka direct stream [$id]" + protected[streaming] override val checkpointData = new DirectKafkaInputDStreamCheckpointData diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 703a639df067d..6b59c94aaec87 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -80,7 +80,7 @@ object KafkaUtils { kafkaParams: Map[String, String], topics: Map[String, Int], storageLevel: StorageLevel - ): ReceiverInputDStream[(K, V)] = ssc.withNamedScope("kafka stream") { + ): ReceiverInputDStream[(K, V)] = { val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf) new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel) } @@ -348,7 +348,7 @@ object KafkaUtils { kafkaParams: Map[String, String], fromOffsets: Map[TopicAndPartition, Long], messageHandler: MessageAndMetadata[K, V] => R - ): InputDStream[R] = ssc.withNamedScope("kafka direct stream") { + ): InputDStream[R] = { val cleanedHandler = ssc.sc.clean(messageHandler) new DirectKafkaInputDStream[K, V, KD, VD, R]( ssc, kafkaParams, fromOffsets, cleanedHandler) @@ -394,7 +394,7 @@ object KafkaUtils { ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String] - ): InputDStream[(K, V)] = ssc.withNamedScope("kafka direct stream") { + ): InputDStream[(K, V)] = { val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message) val kc = new KafkaCluster(kafkaParams) val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase) diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala index 3c0ef94cb0fab..40f5f18547236 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala @@ -35,7 +35,6 @@ import org.eclipse.paho.client.mqttv3.MqttMessage import org.eclipse.paho.client.mqttv3.MqttTopic import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence -import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream._ @@ -57,6 +56,8 @@ class MQTTInputDStream( storageLevel: StorageLevel ) extends ReceiverInputDStream[String](ssc_) { + private[streaming] override def name: String = s"MQTT stream [$id]" + def getReceiver(): Receiver[String] = { new MQTTReceiver(brokerUrl, topic, storageLevel) } diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index 139f5c1fb5bad..8d67670b05669 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -37,7 +37,7 @@ object MQTTUtils { brokerUrl: String, topic: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): ReceiverInputDStream[String] = ssc.withNamedScope("mqtt stream") { + ): ReceiverInputDStream[String] = { new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel) } diff --git 
a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala index ed1d77809231e..8cadfafbb8eb5 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala @@ -40,7 +40,7 @@ object TwitterUtils { twitterAuth: Option[Authorization], filters: Seq[String] = Nil, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): ReceiverInputDStream[Status] = ssc.withNamedScope("twitter stream") { + ): ReceiverInputDStream[Status] = { new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index c87dc6a6e9265..7f181bcecd4bf 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -335,7 +335,7 @@ class StreamingContext private[streaming] ( port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel - ): ReceiverInputDStream[T] = withNamedScope("socket stream") { + ): ReceiverInputDStream[T] = { new SocketInputDStream[T](this, hostname, port, converter, storageLevel) } @@ -372,7 +372,7 @@ class StreamingContext private[streaming] ( K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag - ] (directory: String): InputDStream[(K, V)] = withNamedScope("file stream") { + ] (directory: String): InputDStream[(K, V)] = { new FileInputDStream[K, V, F](this, directory) } @@ -393,9 +393,7 @@ class StreamingContext private[streaming] ( V: ClassTag, F <: NewInputFormat[K, V]: ClassTag ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = { - withNamedScope("file stream") { - new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) - } + new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) } /** @@ -418,7 +416,7 @@ class StreamingContext private[streaming] ( ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean, - conf: Configuration): InputDStream[(K, V)] = withNamedScope("file stream") { + conf: Configuration): InputDStream[(K, V)] = { new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf)) } @@ -475,7 +473,7 @@ class StreamingContext private[streaming] ( def queueStream[T: ClassTag]( queue: Queue[RDD[T]], oneAtATime: Boolean = true - ): InputDStream[T] = withNamedScope("queue stream") { + ): InputDStream[T] = { queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1)) } @@ -492,7 +490,7 @@ class StreamingContext private[streaming] ( queue: Queue[RDD[T]], oneAtATime: Boolean, defaultRDD: RDD[T] - ): InputDStream[T] = withNamedScope("queue stream") { + ): InputDStream[T] = { new QueueInputDStream(this, queue, oneAtATime, defaultRDD) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index fbad55becbaa6..be6a0d6ec125d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -121,16 +121,10 @@ abstract class DStream[T: ClassTag] ( * * This is not defined if the DStream is created outside of one of the public DStream 
operations. */ - private[streaming] val baseScope: Option[String] = { + protected[streaming] val baseScope: Option[String] = { Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)) } - /** - * Make a scope name based on the given one. - * Subclasses may optionally override this to provide custom scope names. - */ - protected[streaming] def makeScopeName(baseName: String): String = baseName - /** * Make a scope that groups RDDs created in the same DStream operation in the same batch. * @@ -142,8 +136,8 @@ abstract class DStream[T: ClassTag] ( baseScope.map { bsJson => val formattedBatchTime = UIUtils.formatBatchTime(time.milliseconds, ssc.graph.batchDuration.milliseconds) - val bscope = RDDOperationScope.fromJson(bsJson) - val baseName = makeScopeName(bscope.name) // e.g. countByWindow, "kafka stream [0]" + val bs = RDDOperationScope.fromJson(bsJson) + val baseName = bs.name // e.g. countByWindow, "kafka stream [0]" val scopeName = if (baseName.length > 10) { // If the operation name is too long, wrap the line @@ -151,7 +145,7 @@ abstract class DStream[T: ClassTag] ( } else { s"$baseName @ $formattedBatchTime" } - val scopeId = s"${bscope.id}_${time.milliseconds}" + val scopeId = s"${bs.id}_${time.milliseconds}" new RDDOperationScope(scopeName, id = scopeId) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index 6669dc8c65ba5..d58c99a8ff321 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -19,7 +19,10 @@ package org.apache.spark.streaming.dstream import scala.reflect.ClassTag +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDDOperationScope import org.apache.spark.streaming.{Time, Duration, StreamingContext} +import org.apache.spark.util.Utils /** * This is the abstract base class for all input streams. This class provides methods @@ -44,13 +47,31 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) /** This is an unique identifier for the input stream. */ val id = ssc.getNewInputStreamId() + /** A human-readable name of this InputDStream */ + private[streaming] def name: String = { + // e.g. FlumePollingDStream -> "Flume polling stream" + val newName = Utils.getFormattedClassName(this) + .replaceAll("InputDStream", "Stream") + .split("(?=[A-Z])") + .filter(_.nonEmpty) + .mkString(" ") + .toLowerCase + .capitalize + s"$newName [$id]" + } + /** - * The name of this InputDStream. By default, it's the class name with its id. + * The base scope associated with the operation that created this DStream. + * + * For InputDStreams, we use the name of this DStream as the scope name. + * If an outer scope is given, we assume that it includes an alternative name for this stream. */ - private[streaming] def name: String = s"${getClass.getSimpleName}-$id" - - /** Make a scope name based on the given one. This includes the ID of this stream. 
*/ - protected[streaming] override def makeScopeName(baseName: String): String = s"$baseName [$id]" + protected[streaming] override val baseScope: Option[String] = { + val scopeName = Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)) + .map { json => RDDOperationScope.fromJson(json).name + s" [$id]" } + .getOrElse(name.toLowerCase) + Some(new RDDOperationScope(scopeName).toJson) + } /** * Checks whether the 'time' is valid wrt slideDuration for generating RDD. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index 27a8a820cc74e..884a8e8b52289 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -46,7 +46,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to * generate the RDDs with Spark's default number of partitions. */ - def groupByKey(): DStream[(K, Iterable[V])] = { + def groupByKey(): DStream[(K, Iterable[V])] = ssc.withScope { groupByKey(defaultPartitioner()) } @@ -54,7 +54,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to * generate the RDDs with `numPartitions` partitions. */ - def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = { + def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = ssc.withScope { groupByKey(defaultPartitioner(numPartitions)) } @@ -62,7 +62,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new DStream by applying `groupByKey` on each RDD. The supplied * org.apache.spark.Partitioner is used to control the partitioning of each RDD. */ - def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = self.ssc.withScope { + def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = ssc.withScope { val createCombiner = (v: V) => ArrayBuffer[V](v) val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v) val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2) @@ -75,7 +75,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * merged using the associative reduce function. Hash partitioning is used to generate the RDDs * with Spark's default number of partitions. 
*/ - def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = { + def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope { reduceByKey(reduceFunc, defaultPartitioner()) } @@ -86,7 +86,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) */ def reduceByKey( reduceFunc: (V, V) => V, - numPartitions: Int): DStream[(K, V)] = { + numPartitions: Int): DStream[(K, V)] = ssc.withScope { reduceByKey(reduceFunc, defaultPartitioner(numPartitions)) } @@ -97,7 +97,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) */ def reduceByKey( reduceFunc: (V, V) => V, - partitioner: Partitioner): DStream[(K, V)] = self.ssc.withScope { + partitioner: Partitioner): DStream[(K, V)] = ssc.withScope { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner) } @@ -112,7 +112,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, partitioner: Partitioner, - mapSideCombine: Boolean = true): DStream[(K, C)] = self.ssc.withScope { + mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope { new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine) } @@ -125,7 +125,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval */ - def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = { + def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = ssc.withScope { groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner()) } @@ -140,7 +140,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * DStream's batching interval */ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration) - : DStream[(K, Iterable[V])] = { + : DStream[(K, Iterable[V])] = ssc.withScope { groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner()) } @@ -160,7 +160,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) windowDuration: Duration, slideDuration: Duration, numPartitions: Int - ): DStream[(K, Iterable[V])] = { + ): DStream[(K, Iterable[V])] = ssc.withScope { groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions)) } @@ -179,7 +179,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner - ): DStream[(K, Iterable[V])] = self.ssc.withScope { + ): DStream[(K, Iterable[V])] = ssc.withScope { val createCombiner = (v: Iterable[V]) => new ArrayBuffer[V] ++= v val mergeValue = (buf: ArrayBuffer[V], v: Iterable[V]) => buf ++= v val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2 @@ -201,7 +201,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def reduceByKeyAndWindow( reduceFunc: (V, V) => V, windowDuration: Duration - ): DStream[(K, V)] = { + ): DStream[(K, V)] = ssc.withScope { reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner()) } @@ -220,7 +220,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) reduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration - ): DStream[(K, V)] = { + ): DStream[(K, V)] = ssc.withScope { reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner()) } @@ -241,7 +241,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) windowDuration: Duration, slideDuration: 
Duration, numPartitions: Int - ): DStream[(K, V)] = { + ): DStream[(K, V)] = ssc.withScope { reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions)) } @@ -263,7 +263,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner - ): DStream[(K, V)] = self.ssc.withScope { + ): DStream[(K, V)] = ssc.withScope { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) self.reduceByKey(cleanedReduceFunc, partitioner) .window(windowDuration, slideDuration) @@ -297,7 +297,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) slideDuration: Duration = self.slideDuration, numPartitions: Int = ssc.sc.defaultParallelism, filterFunc: ((K, V)) => Boolean = null - ): DStream[(K, V)] = { + ): DStream[(K, V)] = ssc.withScope { reduceByKeyAndWindow( reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions), filterFunc @@ -330,7 +330,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) slideDuration: Duration, partitioner: Partitioner, filterFunc: ((K, V)) => Boolean - ): DStream[(K, V)] = self.ssc.withScope { + ): DStream[(K, V)] = ssc.withScope { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc) @@ -351,7 +351,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) */ def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S] - ): DStream[(K, S)] = { + ): DStream[(K, S)] = ssc.withScope { updateStateByKey(updateFunc, defaultPartitioner()) } @@ -367,7 +367,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S], numPartitions: Int - ): DStream[(K, S)] = { + ): DStream[(K, S)] = ssc.withScope { updateStateByKey(updateFunc, defaultPartitioner(numPartitions)) } @@ -384,7 +384,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner - ): DStream[(K, S)] = { + ): DStream[(K, S)] = ssc.withScope { val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => { iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s))) } @@ -408,7 +408,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean - ): DStream[(K, S)] = self.ssc.withScope { + ): DStream[(K, S)] = ssc.withScope { new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None) } @@ -427,7 +427,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner, initialRDD: RDD[(K, S)] - ): DStream[(K, S)] = { + ): DStream[(K, S)] = ssc.withScope { val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => { iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s))) } @@ -453,7 +453,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) partitioner: Partitioner, rememberPartitioner: Boolean, initialRDD: RDD[(K, S)] - ): DStream[(K, S)] = self.ssc.withScope { + ): DStream[(K, S)] = ssc.withScope { new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, Some(initialRDD)) } @@ -462,7 +462,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new DStream by applying a map function to the value of each key-value pairs in * 'this' DStream without 
changing the key. */ - def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = self.ssc.withScope { + def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = ssc.withScope { new MapValuedDStream[K, V, U](self, mapValuesFunc) } @@ -472,7 +472,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) */ def flatMapValues[U: ClassTag]( flatMapValuesFunc: V => TraversableOnce[U] - ): DStream[(K, U)] = self.ssc.withScope { + ): DStream[(K, U)] = ssc.withScope { new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc) } @@ -482,7 +482,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * of partitions. */ def cogroup[W: ClassTag]( - other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = { + other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope { cogroup(other, defaultPartitioner()) } @@ -492,7 +492,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) */ def cogroup[W: ClassTag]( other: DStream[(K, W)], - numPartitions: Int): DStream[(K, (Iterable[V], Iterable[W]))] = { + numPartitions: Int): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope { cogroup(other, defaultPartitioner(numPartitions)) } @@ -503,7 +503,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def cogroup[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner - ): DStream[(K, (Iterable[V], Iterable[W]))] = self.ssc.withScope { + ): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope { self.transformWith( other, (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner) @@ -514,7 +514,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ - def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = { + def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = ssc.withScope { join[W](other, defaultPartitioner()) } @@ -524,7 +524,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) */ def join[W: ClassTag]( other: DStream[(K, W)], - numPartitions: Int): DStream[(K, (V, W))] = { + numPartitions: Int): DStream[(K, (V, W))] = ssc.withScope { join[W](other, defaultPartitioner(numPartitions)) } @@ -535,7 +535,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def join[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner - ): DStream[(K, (V, W))] = self.ssc.withScope { + ): DStream[(K, (V, W))] = ssc.withScope { self.transformWith( other, (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner) @@ -548,7 +548,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * number of partitions. 
*/ def leftOuterJoin[W: ClassTag]( - other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = { + other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = ssc.withScope { leftOuterJoin[W](other, defaultPartitioner()) } @@ -560,7 +560,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def leftOuterJoin[W: ClassTag]( other: DStream[(K, W)], numPartitions: Int - ): DStream[(K, (V, Option[W]))] = { + ): DStream[(K, (V, Option[W]))] = ssc.withScope { leftOuterJoin[W](other, defaultPartitioner(numPartitions)) } @@ -572,7 +572,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def leftOuterJoin[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner - ): DStream[(K, (V, Option[W]))] = self.ssc.withScope { + ): DStream[(K, (V, Option[W]))] = ssc.withScope { self.transformWith( other, (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner) @@ -585,7 +585,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * number of partitions. */ def rightOuterJoin[W: ClassTag]( - other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = { + other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = ssc.withScope { rightOuterJoin[W](other, defaultPartitioner()) } @@ -597,7 +597,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def rightOuterJoin[W: ClassTag]( other: DStream[(K, W)], numPartitions: Int - ): DStream[(K, (Option[V], W))] = { + ): DStream[(K, (Option[V], W))] = ssc.withScope { rightOuterJoin[W](other, defaultPartitioner(numPartitions)) } @@ -609,7 +609,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def rightOuterJoin[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner - ): DStream[(K, (Option[V], W))] = self.ssc.withScope { + ): DStream[(K, (Option[V], W))] = ssc.withScope { self.transformWith( other, (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner) @@ -622,7 +622,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * number of partitions. 
*/ def fullOuterJoin[W: ClassTag]( - other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = { + other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = ssc.withScope { fullOuterJoin[W](other, defaultPartitioner()) } @@ -634,7 +634,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def fullOuterJoin[W: ClassTag]( other: DStream[(K, W)], numPartitions: Int - ): DStream[(K, (Option[V], Option[W]))] = { + ): DStream[(K, (Option[V], Option[W]))] = ssc.withScope { fullOuterJoin[W](other, defaultPartitioner(numPartitions)) } @@ -646,7 +646,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def fullOuterJoin[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner - ): DStream[(K, (Option[V], Option[W]))] = self.ssc.withScope { + ): DStream[(K, (Option[V], Option[W]))] = ssc.withScope { self.transformWith( other, (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.fullOuterJoin(rdd2, partitioner) @@ -660,7 +660,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def saveAsHadoopFiles[F <: OutputFormat[K, V]]( prefix: String, suffix: String - )(implicit fm: ClassTag[F]): Unit = { + )(implicit fm: ClassTag[F]): Unit = ssc.withScope { saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } @@ -676,7 +676,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration) - ): Unit = self.ssc.withScope { + ): Unit = ssc.withScope { // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints val serializableConf = new SerializableWritable(conf) val saveFunc = (rdd: RDD[(K, V)], time: Time) => { @@ -693,7 +693,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]]( prefix: String, suffix: String - )(implicit fm: ClassTag[F]): Unit = { + )(implicit fm: ClassTag[F]): Unit = ssc.withScope { saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } @@ -709,7 +709,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) valueClass: Class[_], outputFormatClass: Class[_ <: NewOutputFormat[_, _]], conf: Configuration = ssc.sparkContext.hadoopConfiguration - ): Unit = self.ssc.withScope { + ): Unit = ssc.withScope { // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints val serializableConf = new SerializableWritable(conf) val saveFunc = (rdd: RDD[(K, V)], time: Time) => { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala index 624aa393668e9..3eafe740ec22e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala @@ -43,15 +43,32 @@ class DStreamScopeSuite extends FunSuite with BeforeAndAfter with BeforeAndAfter after { assertPropertiesNotSet() } test("dstream without scope") { + val dummyStream = new DummyDStream(ssc) + dummyStream.initialize(Time(0)) + + // This DStream is not instantiated in any scope, so all RDDs + // created by this stream should similarly not have a scope + assert(dummyStream.baseScope === None) + assert(dummyStream.getOrCompute(Time(1000)).get.scope === None) + assert(dummyStream.getOrCompute(Time(2000)).get.scope === None) + 
assert(dummyStream.getOrCompute(Time(3000)).get.scope === None) + } + + test("input dstream without scope") { val inputStream = new DummyInputDStream(ssc) inputStream.initialize(Time(0)) + val baseScope = inputStream.baseScope.map(RDDOperationScope.fromJson) + val scope1 = inputStream.getOrCompute(Time(1000)).get.scope + val scope2 = inputStream.getOrCompute(Time(2000)).get.scope + val scope3 = inputStream.getOrCompute(Time(3000)).get.scope + // This DStream is not instantiated in any scope, so all RDDs - // created by this stream should similarly not have a scope - assert(inputStream.baseScope === None) - assert(inputStream.getOrCompute(Time(1000)).get.scope === None) - assert(inputStream.getOrCompute(Time(2000)).get.scope === None) - assert(inputStream.getOrCompute(Time(3000)).get.scope === None) + assertDefined(baseScope, scope1, scope2, scope3) + assert(baseScope.get.name.startsWith("dummy stream")) + assertScopeCorrect(baseScope.get, scope1.get, 1000) + assertScopeCorrect(baseScope.get, scope2.get, 2000) + assertScopeCorrect(baseScope.get, scope3.get, 3000) } test("scoping simple operations") { @@ -120,18 +137,6 @@ class DStreamScopeSuite extends FunSuite with BeforeAndAfter with BeforeAndAfter testStream(countStream) } - test("scoping input streams") { - ssc.withNamedScope("dummy stream") { - val stream = new DummyInputDStream(ssc) - stream.initialize(Time(0)) - val baseScope = stream.baseScope.map(RDDOperationScope.fromJson) - val rddScope = stream.getOrCompute(Time(1000)).get.scope - assertDefined(baseScope, rddScope) - assert(baseScope.get.name === "dummy stream") - assertScopeCorrect(baseScope.get.id, s"dummy stream [${stream.id}]", rddScope.get, 1000) - } - } - /** Assert that the RDD operation scope properties are not set in our SparkContext. */ private def assertPropertiesNotSet(): Unit = { assert(ssc != null) @@ -166,6 +171,15 @@ class DStreamScopeSuite extends FunSuite with BeforeAndAfter with BeforeAndAfter } +/** + * A dummy stream that does absolutely nothing. + */ +private class DummyDStream(ssc: StreamingContext) extends DStream[Int](ssc) { + override def dependencies: List[DStream[Int]] = List.empty + override def slideDuration: Duration = Seconds(1) + override def compute(time: Time): Option[RDD[Int]] = Some(ssc.sc.emptyRDD[Int]) +} + /** * A dummy input stream that does absolutely nothing. */
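---

Note for readers of this patch (not part of the diff): the core change is that InputDStream now derives its human-readable name from its class name. The sketch below mirrors the derivation added in InputDStream.scala, but takes a plain class-name String instead of calling Spark's internal Utils.getFormattedClassName, and the object and method names are illustrative only.

// Illustrative sketch, not part of the patch: replicates the class-name -> stream-name
// derivation that the new InputDStream.name performs.
object InputStreamNameSketch {
  def humanReadableName(className: String, id: Int): String = {
    val newName = className
      .replaceAll("InputDStream", "Stream") // FlumePollingInputDStream -> FlumePollingStream
      .split("(?=[A-Z])")                   // split before each capital letter
      .filter(_.nonEmpty)                   // drop any empty token from the zero-width split
      .mkString(" ")
      .toLowerCase
      .capitalize                           // "Flume polling stream"
    s"$newName [$id]"                       // "Flume polling stream [2]"
  }

  def main(args: Array[String]): Unit = {
    println(humanReadableName("FlumePollingInputDStream", 2)) // Flume polling stream [2]
    println(humanReadableName("MQTTInputDStream", 0))         // M q t t stream [0]
    println(humanReadableName("DirectKafkaInputDStream", 1))  // Direct kafka stream [1]
  }
}

The last two outputs show why the patch adds explicit name overrides in MQTTInputDStream ("MQTT stream [id]") and DirectKafkaInputDStream ("Kafka direct stream [id]"): the default derivation would yield "M q t t stream" and "Direct kafka stream". The new baseScope override in InputDStream then uses this name (lowercased) as the scope name when the stream is created outside any named scope, and otherwise appends " [id]" to the enclosing scope's name.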
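Similarly, a sketch of how DStream.makeScope (see the DStream.scala hunk above) combines the base scope name with the batch time to name each batch's RDD scope. Here formatBatchTime is a simplified stand-in for Spark's UIUtils.formatBatchTime, and the wrapped branch is reconstructed from the "wrap the line" comment, so treat the exact strings as assumptions rather than the patch's literal code.

// Illustrative sketch, not part of the patch: per-batch scope naming along the lines of
// DStream.makeScope. formatBatchTime is a simplified stand-in (assumption).
object BatchScopeNameSketch {
  import java.text.SimpleDateFormat
  import java.util.Date

  private def formatBatchTime(batchTimeMs: Long): String =
    new SimpleDateFormat("HH:mm:ss").format(new Date(batchTimeMs))

  /** Returns (scopeName, scopeId) for one batch of a DStream operation. */
  def makeScopeNames(baseName: String, baseScopeId: String, batchTimeMs: Long): (String, String) = {
    val formattedBatchTime = formatBatchTime(batchTimeMs)
    val scopeName =
      if (baseName.length > 10) {
        // Long operation names (e.g. "kafka direct stream [0]") are wrapped onto a second line
        s"$baseName\n@ $formattedBatchTime"
      } else {
        s"$baseName @ $formattedBatchTime"
      }
    val scopeId = s"${baseScopeId}_$batchTimeMs"
    (scopeName, scopeId)
  }
}

// e.g. makeScopeNames("countByWindow", "12", 1431817800000L)
//      -> ("countByWindow\n@ <formatted time>", "12_1431817800000")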