From fa4e5fbb3aaca6fcfbdf358bf08371f0146b24a1 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 15 May 2015 01:38:29 -0700
Subject: [PATCH] Pass in input stream name rather than defining it from within

---
 .../apache/spark/rdd/RDDOperationScope.scala  |  2 +-
 .../streaming/flume/FlumeInputDStream.scala   |  2 -
 .../flume/FlumePollingInputDStream.scala      |  4 --
 .../spark/streaming/flume/FlumeUtils.scala    |  4 +-
 .../kafka/DirectKafkaInputDStream.scala       |  4 --
 .../streaming/kafka/KafkaInputDStream.scala   |  2 -
 .../spark/streaming/kafka/KafkaUtils.scala    | 20 ++++----
 .../streaming/mqtt/MQTTInputDStream.scala     |  2 -
 .../spark/streaming/mqtt/MQTTUtils.scala      |  2 +-
 .../twitter/TwitterInputDStream.scala         |  2 -
 .../streaming/twitter/TwitterUtils.scala      |  2 +-
 .../spark/streaming/StreamingContext.scala    | 49 ++++++++++++-------
 .../dstream/ConstantInputDStream.scala        |  2 -
 .../spark/streaming/dstream/DStream.scala     | 10 ++--
 .../streaming/dstream/FileInputDStream.scala  |  2 -
 .../streaming/dstream/InputDStream.scala      |  4 +-
 .../streaming/dstream/QueueInputDStream.scala |  2 -
 .../streaming/dstream/RawInputDStream.scala   |  2 -
 .../dstream/ReceiverInputDStream.scala        |  2 -
 .../dstream/SocketInputDStream.scala          |  2 -
 .../spark/streaming/DStreamScopeSuite.scala   | 38 ++++----------
 21 files changed, 63 insertions(+), 96 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
index 369702977650d..6b09dfafc889c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
@@ -96,7 +96,7 @@ private[spark] object RDDOperationScope extends Logging {
       sc: SparkContext,
       allowNesting: Boolean = false)(body: => T): T = {
     val stackTrace = Thread.currentThread.getStackTrace().tail // ignore "Thread#getStackTrace"
-    val ourMethodName = stackTrace(1).getMethodName //
+    val ourMethodName = stackTrace(1).getMethodName // i.e. withScope
     // Climb upwards to find the first method that's called something different
     val callerMethodName = stackTrace
       .find(_.getMethodName != ourMethodName)
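For context, the comment adjusted above describes how RDDOperationScope.withScope picks a display name: it walks the current stack trace past its own frames and takes the first differently named caller. A minimal standalone sketch of that technique (the object and method names here are illustrative, not part of this patch):

object CallerNameSketch {
  // Walk the stack past our own frame and take the first differently named caller,
  // mirroring the naming approach the comment above refers to.
  def findCallerName(): String = {
    val stackTrace = Thread.currentThread.getStackTrace.tail // drop "Thread#getStackTrace"
    val ourMethodName = stackTrace.head.getMethodName        // i.e. findCallerName
    stackTrace
      .find(_.getMethodName != ourMethodName)
      .map(_.getMethodName)
      .getOrElse("unknown")
  }

  def main(args: Array[String]): Unit = {
    println(findCallerName()) // prints "main"
  }
}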
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 7fec636d749f8..60e2994431b38 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -50,8 +50,6 @@ class FlumeInputDStream[T: ClassTag](
     enableDecompression: Boolean
   ) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {

-  protected[streaming] override val customScopeName: Option[String] = Some(s"input stream [$id]")
-
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumeReceiver(host, port, storageLevel, enableDecompression)
   }
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index e942053c6ede5..92fa5b41be89e 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -53,10 +53,6 @@ private[streaming] class FlumePollingInputDStream[T: ClassTag](
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {

-  protected[streaming] override val customScopeName: Option[String] = {
-    Some(s"flume polling stream [$id]")
-  }
-
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
   }
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index fe39c9be2ce9b..56b917f3cde58 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -59,7 +59,7 @@ object FlumeUtils {
       port: Int,
       storageLevel: StorageLevel,
       enableDecompression: Boolean
-    ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
+    ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withNamedScope("flume stream") {
     new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel, enableDecompression)
   }

@@ -159,7 +159,7 @@ object FlumeUtils {
       storageLevel: StorageLevel,
       maxBatchSize: Int,
       parallelism: Int
-    ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
+    ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withNamedScope("flume polling stream") {
     new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
       parallelism, storageLevel)
   }
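The Flume changes above show the new pattern: the stream class no longer hard-codes a customScopeName; instead FlumeUtils names the scope when it creates the stream, and InputDStream appends the stream ID. Nothing changes for callers; a hedged usage sketch (host and port are placeholders) whose RDDs would now be grouped under "flume stream [0]" in the streaming UI:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeScopeExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumeScopeExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // The RDDs behind this stream are grouped under "flume stream [<id>]"
    // rather than a scope named after the createStream method itself.
    val events = FlumeUtils.createStream(ssc, "localhost", 41414, StorageLevel.MEMORY_AND_DISK_SER_2)
    events.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}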
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index bc9093f0b8a44..6715aede7928a 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -65,10 +65,6 @@ class DirectKafkaInputDStream[
   val maxRetries = context.sparkContext.getConf.getInt(
     "spark.streaming.kafka.maxRetries", 1)

-  protected[streaming] override val customScopeName: Option[String] = {
-    Some(s"kafka direct stream [$id]")
-  }
-
   protected[streaming] override val checkpointData =
     new DirectKafkaInputDStreamCheckpointData

diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 65fa789c5cc4c..cca0fac0234e1 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -55,8 +55,6 @@ class KafkaInputDStream[
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {

-  protected[streaming] override val customScopeName: Option[String] = Some(s"kafka stream [$id]")
-
   def getReceiver(): Receiver[(K, V)] = {
     if (!useReliableReceiver) {
       new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 8f25c1c7b8e82..703a639df067d 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -58,7 +58,7 @@ object KafkaUtils {
       groupId: String,
       topics: Map[String, Int],
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[(String, String)] = ssc.withScope {
+    ): ReceiverInputDStream[(String, String)] = {
     val kafkaParams = Map[String, String](
       "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
       "zookeeper.connection.timeout.ms" -> "10000")
@@ -80,7 +80,7 @@ object KafkaUtils {
       kafkaParams: Map[String, String],
       topics: Map[String, Int],
       storageLevel: StorageLevel
-    ): ReceiverInputDStream[(K, V)] = ssc.withScope {
+    ): ReceiverInputDStream[(K, V)] = ssc.withNamedScope("kafka stream") {
     val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
     new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
   }
@@ -99,7 +99,7 @@ object KafkaUtils {
       zkQuorum: String,
       groupId: String,
       topics: JMap[String, JInt]
-    ): JavaPairReceiverInputDStream[String, String] = jssc.ssc.withScope {
+    ): JavaPairReceiverInputDStream[String, String] = {
     createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
   }

@@ -118,7 +118,7 @@ object KafkaUtils {
       groupId: String,
       topics: JMap[String, JInt],
       storageLevel: StorageLevel
-    ): JavaPairReceiverInputDStream[String, String] = jssc.ssc.withScope {
+    ): JavaPairReceiverInputDStream[String, String] = {
     createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
       storageLevel)
   }
@@ -145,7 +145,7 @@ object KafkaUtils {
       kafkaParams: JMap[String, String],
       topics: JMap[String, JInt],
       storageLevel: StorageLevel
-    ): JavaPairReceiverInputDStream[K, V] = jssc.ssc.withScope {
+    ): JavaPairReceiverInputDStream[K, V] = {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)

@@ -295,7 +295,7 @@ object KafkaUtils {
       offsetRanges: Array[OffsetRange],
       leaders: JMap[TopicAndPartition, Broker],
       messageHandler: JFunction[MessageAndMetadata[K, V], R]
-    ): JavaRDD[R] = jsc.sc.withScope {
+    ): JavaRDD[R] = {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -348,7 +348,7 @@ object KafkaUtils {
       kafkaParams: Map[String, String],
       fromOffsets: Map[TopicAndPartition, Long],
       messageHandler: MessageAndMetadata[K, V] => R
-  ): InputDStream[R] = ssc.withScope {
+  ): InputDStream[R] = ssc.withNamedScope("kafka direct stream") {
     val cleanedHandler = ssc.sc.clean(messageHandler)
     new DirectKafkaInputDStream[K, V, KD, VD, R](
       ssc, kafkaParams, fromOffsets, cleanedHandler)
@@ -394,7 +394,7 @@ object KafkaUtils {
       ssc: StreamingContext,
       kafkaParams: Map[String, String],
       topics: Set[String]
-  ): InputDStream[(K, V)] = ssc.withScope {
+  ): InputDStream[(K, V)] = ssc.withNamedScope("kafka direct stream") {
     val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
     val kc = new KafkaCluster(kafkaParams)
     val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
@@ -465,7 +465,7 @@ object KafkaUtils {
       kafkaParams: JMap[String, String],
       fromOffsets: JMap[TopicAndPartition, JLong],
       messageHandler: JFunction[MessageAndMetadata[K, V], R]
-    ): JavaInputDStream[R] = jssc.ssc.withScope {
+    ): JavaInputDStream[R] = {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -524,7 +524,7 @@ object KafkaUtils {
       valueDecoderClass: Class[VD],
       kafkaParams: JMap[String, String],
       topics: JSet[String]
-    ): JavaPairInputDStream[K, V] = jssc.ssc.withScope {
+    ): JavaPairInputDStream[K, V] = {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index d47ff268271a9..3c0ef94cb0fab 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -57,8 +57,6 @@ class MQTTInputDStream(
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[String](ssc_) {

-  protected[streaming] override val customScopeName: Option[String] = Some(s"MQTT stream [$id]")
-
   def getReceiver(): Receiver[String] = {
     new MQTTReceiver(brokerUrl, topic, storageLevel)
   }
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 42f1dfd1c601f..1b465c823472a 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -37,7 +37,7 @@ object MQTTUtils {
       brokerUrl: String,
       topic: String,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[String] = ssc.withScope {
+    ): ReceiverInputDStream[String] = ssc.withNamedScope("MQTT stream") {
     new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
   }
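A note on the Java-facing KafkaUtils overloads above: they delegate to the Scala variants, which now name the scope themselves, so their own ssc.withScope wrappers were dropped rather than nesting an extra anonymous scope. For reference, a hedged usage sketch of the two public Scala entry points (broker and ZooKeeper addresses are placeholders); the receiver-based stream is grouped under "kafka stream [<id>]" and the direct stream under "kafka direct stream [<id>]":

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaScopeExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaScopeExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Receiver-based stream: shown as "kafka stream [<id>]" in the DAG visualization.
    val receiverBased = KafkaUtils.createStream(ssc, "zkhost:2181", "my-group", Map("events" -> 1))

    // Direct stream: shown as "kafka direct stream [<id>]".
    val direct = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "broker:9092"), Set("events"))

    receiverBased.count().print()
    direct.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}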
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 9d742de802b00..7cf02d85d73d3 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -45,8 +45,6 @@ class TwitterInputDStream(
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[Status](ssc_) {

-  protected[streaming] override val customScopeName: Option[String] = Some(s"twitter stream [$id]")
-
   private def createOAuthAuthorization(): Authorization = {
     new OAuthAuthorization(new ConfigurationBuilder().build())
   }
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index 1df984ce0a1ba..8ed9076a9123f 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -40,7 +40,7 @@ object TwitterUtils {
       twitterAuth: Option[Authorization],
       filters: Seq[String] = Nil,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[Status] = ssc.withScope {
+    ): ReceiverInputDStream[Status] = ssc.withNamedScope("twitter stream") {
     new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b49b3ad7d6f3a..c87dc6a6e9265 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.spark._
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.input.FixedLengthBinaryInputFormat
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContextState._
 import org.apache.spark.streaming.dstream._
@@ -245,19 +245,30 @@ class StreamingContext private[streaming] (
    * Execute a block of code in a scope such that all new DStreams created in this body will
    * be part of the same scope. For more detail, see the comments in `doCompute`.
    *
-   * Note: Return statements are NOT allowed in the given body. Also, this currently does
-   * not handle multiple StreamingContexts sharing the same SparkContext gracefully.
+   * Note: Return statements are NOT allowed in the given body.
    */
   private[streaming] def withScope[U](body: => U): U = sparkContext.withScope(body)

+  /**
+   * Execute a block of code in a scope such that all new DStreams created in this body will
+   * be part of the same scope. For more detail, see the comments in `doCompute`.
+   *
+   * Note: Return statements are NOT allowed in the given body.
+   */
+  private[streaming] def withNamedScope[U](name: String)(body: => U): U = {
+    RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
+  }
+
   /**
    * Create an input stream with any arbitrary user implemented receiver.
   * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
   * @param receiver Custom implementation of Receiver
   */
  @deprecated("Use receiverStream", "1.0.0")
-  def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = withScope {
-    receiverStream(receiver)
+  def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+    withNamedScope("network stream") {
+      receiverStream(receiver)
+    }
   }

  /**
@@ -265,8 +276,10 @@ class StreamingContext private[streaming] (
    * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
    * @param receiver Custom implementation of Receiver
    */
-  def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = withScope {
-    new PluggableInputDStream[T](this, receiver)
+  def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+    withNamedScope("receiver stream") {
+      new PluggableInputDStream[T](this, receiver)
+    }
   }

  /**
@@ -286,7 +299,7 @@ class StreamingContext private[streaming] (
       name: String,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
       supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
-    ): ReceiverInputDStream[T] = withScope {
+    ): ReceiverInputDStream[T] = withNamedScope("actor stream") {
     receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
   }

@@ -303,7 +316,7 @@ class StreamingContext private[streaming] (
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[String] = withScope {
+    ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
     socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
   }

@@ -322,7 +335,7 @@ class StreamingContext private[streaming] (
       port: Int,
       converter: (InputStream) => Iterator[T],
       storageLevel: StorageLevel
-    ): ReceiverInputDStream[T] = withScope {
+    ): ReceiverInputDStream[T] = withNamedScope("socket stream") {
     new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
   }

@@ -341,7 +354,7 @@ class StreamingContext private[streaming] (
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[T] = withScope {
+    ): ReceiverInputDStream[T] = withNamedScope("raw socket stream") {
     new RawInputDStream[T](this, hostname, port, storageLevel)
   }

@@ -359,7 +372,7 @@ class StreamingContext private[streaming] (
     K: ClassTag,
     V: ClassTag,
     F <: NewInputFormat[K, V]: ClassTag
-  ] (directory: String): InputDStream[(K, V)] = withScope {
+  ] (directory: String): InputDStream[(K, V)] = withNamedScope("file stream") {
     new FileInputDStream[K, V, F](this, directory)
   }

@@ -380,7 +393,7 @@ class StreamingContext private[streaming] (
     V: ClassTag,
     F <: NewInputFormat[K, V]: ClassTag
   ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
-    withScope {
+    withNamedScope("file stream") {
       new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
     }
   }
@@ -405,7 +418,7 @@ class StreamingContext private[streaming] (
   ] (directory: String,
      filter: Path => Boolean,
      newFilesOnly: Boolean,
-     conf: Configuration): InputDStream[(K, V)] = withScope {
+     conf: Configuration): InputDStream[(K, V)] = withNamedScope("file stream") {
     new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf))
   }
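Each StreamingContext factory method above now supplies a human-readable scope name ("socket text stream", "raw socket stream", "file stream", and so on), and the stream ID is appended later by InputDStream.makeScopeName. A hedged end-user sketch; the API behavior is unchanged, only the DAG visualization label differs:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketScopeExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SocketScopeExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Shown as "socket text stream [0]" in the UI instead of the generic "socketTextStream".
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}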
@@ -417,7 +430,7 @@ class StreamingContext private[streaming] (
   * file system. File names starting with . are ignored.
   * @param directory HDFS directory to monitor for new file
   */
-  def textFileStream(directory: String): DStream[String] = withScope {
+  def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
     fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
   }

@@ -439,7 +452,7 @@ class StreamingContext private[streaming] (
   @Experimental
   def binaryRecordsStream(
       directory: String,
-      recordLength: Int): DStream[Array[Byte]] = withScope {
+      recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
     val conf = sc_.hadoopConfiguration
     conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
     val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
@@ -462,7 +475,7 @@ class StreamingContext private[streaming] (
   def queueStream[T: ClassTag](
       queue: Queue[RDD[T]],
       oneAtATime: Boolean = true
-    ): InputDStream[T] = withScope {
+    ): InputDStream[T] = withNamedScope("queue stream") {
     queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1))
   }

@@ -479,7 +492,7 @@ class StreamingContext private[streaming] (
       queue: Queue[RDD[T]],
       oneAtATime: Boolean,
       defaultRDD: RDD[T]
-    ): InputDStream[T] = withScope {
+    ): InputDStream[T] = withNamedScope("queue stream") {
     new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
   }

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
index f659b9b1e608b..f396c347581ce 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
@@ -27,8 +27,6 @@ import scala.reflect.ClassTag
 class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
   extends InputDStream[T](ssc_) {

-  protected[streaming] override val customScopeName: Option[String] = Some(s"constant stream [$id]")
-
   override def start() {}

   override def stop() {}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index af195190130fc..28c58487f726d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -126,10 +126,12 @@ abstract class DStream[T: ClassTag] (
   }

   /**
-   * An optional custom name for all scopes generated by this DStream.
-   * If None, the name of the operation that created this DStream will be used.
+   * Make a scope name based on the given one.
+   *
+   * By default, this just returns the base name. Subclasses
+   * may optionally override this to provide custom scope names.
    */
-  protected[streaming] val customScopeName: Option[String] = None
+  protected[streaming] def makeScopeName(baseName: String): String = baseName

   /**
    * Make a scope that groups RDDs created in the same DStream operation in the same batch.
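The makeScopeName hook introduced above is a small template-method pattern: DStream turns an operation name into a display name, and subclasses may refine it. A standalone sketch of the idea (these classes are illustrative, not Spark's):

// The base class maps an operation name to a display name; a subclass refines it,
// the way InputDStream appends its stream ID in the hunk above.
abstract class Node {
  protected def makeScopeName(baseName: String): String = baseName
  def displayName(operationName: String): String = makeScopeName(operationName)
}

class InputNode(val id: Int) extends Node {
  override protected def makeScopeName(baseName: String): String = s"$baseName [$id]"
}

object ScopeNameDemo extends App {
  val plain = new Node {}
  val input = new InputNode(0)
  println(plain.displayName("countByWindow")) // countByWindow
  println(input.displayName("kafka stream"))  // kafka stream [0]
}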
@@ -143,7 +145,7 @@ abstract class DStream[T: ClassTag] (
       val formattedBatchTime =
         UIUtils.formatBatchTime(time.milliseconds, ssc.graph.batchDuration.milliseconds)
       val bscope = RDDOperationScope.fromJson(bsJson)
-      val baseName = customScopeName.getOrElse(bscope.name) // e.g. countByWindow
+      val baseName = makeScopeName(bscope.name) // e.g. countByWindow, "kafka stream [0]"
       val scopeName =
         if (baseName.length > 10) {
           // If the operation name is too long, wrap the line
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 4bd08de76158c..eca69f00188e4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -126,8 +126,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
   @transient private var path_ : Path = null
   @transient private var fs_ : FileSystem = null

-  protected[streaming] override val customScopeName: Option[String] = Some(s"file stream [$id]")
-
   override def start() { }

   override def stop() { }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index ed89a5b44268b..6669dc8c65ba5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -49,8 +49,8 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
    */
   private[streaming] def name: String = s"${getClass.getSimpleName}-$id"

-  /** Human-friendly scope name to use in place of generic operation names (e.g. createStream). */
-  protected[streaming] override val customScopeName: Option[String] = Some(s"input stream [$id]")
+  /** Make a scope name based on the given one. This includes the ID of this stream. */
+  protected[streaming] override def makeScopeName(baseName: String): String = s"$baseName [$id]"

   /**
    * Checks whether the 'time' is valid wrt slideDuration for generating RDD.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index ae40c4b8f2fca..ed7da6dc1315e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -32,8 +32,6 @@ class QueueInputDStream[T: ClassTag](
     defaultRDD: RDD[T]
   ) extends InputDStream[T](ssc) {

-  protected[streaming] override val customScopeName: Option[String] = Some(s"queue stream [$id]")
-
   override def start() { }

   override def stop() { }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index cccfb8368a5d6..e2925b9e03ec3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -45,8 +45,6 @@ class RawInputDStream[T: ClassTag](
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[T](ssc_ ) with Logging {

-  protected[streaming] override val customScopeName: Option[String] = Some(s"raw stream [$id]")
-
   def getReceiver(): Receiver[T] = {
     new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[Receiver[T]]
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 435c9623485cd..5cfe43a1ce726 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -40,8 +40,6 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils
 abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
   extends InputDStream[T](ssc_) {

-  protected[streaming] override val customScopeName: Option[String] = Some(s"receiver stream [$id]")
-
   /**
    * Gets the receiver object that will be sent to the worker nodes
    * to receive data. This method needs to defined by any specific implementation
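With the generic fallback now living in InputDStream.makeScopeName, receiver-based streams created through receiverStream pick up a "receiver stream [<id>]" label without any per-class override. A hedged sketch with a do-nothing custom receiver:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

// A receiver that produces nothing; it only exists to show where the scope name comes from.
class NoopReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  override def onStart(): Unit = { }
  override def onStop(): Unit = { }
}

object ReceiverScopeExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("ReceiverScopeExample").setMaster("local[2]"), Seconds(1))
    // receiverStream names the scope "receiver stream" and InputDStream appends the ID,
    // so this stream shows up as "receiver stream [0]" in the UI.
    val stream = ssc.receiverStream(new NoopReceiver)
    stream.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}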
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 4d79a3f116534..8b72bcf20653d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -37,8 +37,6 @@ class SocketInputDStream[T: ClassTag](
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[T](ssc_) {

-  protected[streaming] override val customScopeName: Option[String] = Some(s"socket stream [$id]")
-
   def getReceiver(): Receiver[T] = {
     new SocketReceiver(host, port, bytesToObjects, storageLevel)
   }
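The test-suite changes that follow exercise the renaming end to end: a stream constructed inside withNamedScope should report that name as its base scope, with the stream ID appended on the generated RDDs. A condensed, hypothetical helper in that spirit (it assumes code living in the org.apache.spark.streaming package, where withNamedScope, baseScope and getOrCompute are visible; the assertions are illustrative):

package org.apache.spark.streaming

import org.apache.spark.rdd.RDDOperationScope
import org.apache.spark.streaming.dstream.InputDStream

object NamedScopeCheck {
  // The stream must be constructed inside the named scope, since DStream captures
  // its base scope at construction time.
  def check(ssc: StreamingContext, makeStream: StreamingContext => InputDStream[Int]): Unit = {
    ssc.withNamedScope("dummy stream") {
      val stream = makeStream(ssc)
      stream.initialize(Time(0))
      val baseScope = stream.baseScope.map(RDDOperationScope.fromJson)
      val rddScope = stream.getOrCompute(Time(1000)).get.scope
      // The base scope carries the name passed to withNamedScope; the RDD's scope
      // name is then derived from it via makeScopeName, e.g. "dummy stream [0]".
      assert(baseScope.exists(_.name == "dummy stream"))
      assert(rddScope.isDefined)
    }
  }
}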
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
index c202254314973..624aa393668e9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
@@ -120,31 +120,16 @@ class DStreamScopeSuite extends FunSuite with BeforeAndAfter with BeforeAndAfter
     testStream(countStream)
   }

-  test("scoping with custom names") {
-    var baseScope: RDDOperationScope = null
-    var rddScope: RDDOperationScope = null
-
-    /** Make a stream in our own scoped DStream operation. */
-    def makeStream(customName: Option[String]): Unit = ssc.withScope {
-      val stream = new DummyInputDStream(ssc, customName)
+  test("scoping input streams") {
+    ssc.withNamedScope("dummy stream") {
+      val stream = new DummyInputDStream(ssc)
       stream.initialize(Time(0))
-      val _baseScope = stream.baseScope.map(RDDOperationScope.fromJson)
-      val _rddScope = stream.getOrCompute(Time(1000)).get.scope
-      assertDefined(_baseScope, _rddScope)
-      baseScope = _baseScope.get
-      rddScope = _rddScope.get
+      val baseScope = stream.baseScope.map(RDDOperationScope.fromJson)
+      val rddScope = stream.getOrCompute(Time(1000)).get.scope
+      assertDefined(baseScope, rddScope)
+      assert(baseScope.get.name === "dummy stream")
+      assertScopeCorrect(baseScope.get.id, s"dummy stream [${stream.id}]", rddScope.get, 1000)
     }
-
-    // By default, a DStream gets its scope name from the operation that created it
-    makeStream(customName = None)
-    assert(baseScope.name.startsWith("makeStream"))
-    assertScopeCorrect(baseScope, rddScope, 1000)
-    // If the DStream defines a custom scope name, however, use that instead of deriving it
-    // from the method. Custom scope names are used extensively by real InputDStreams, which
-    // are frequently created from methods with generic names (e.g. createStream)
-    makeStream(customName = Some("dummy stream"))
-    assert(baseScope.name.startsWith("makeStream")) // not used by RDDs
-    assertScopeCorrect(baseScope.id, "dummy stream", rddScope, 1000)
   }

   /** Assert that the RDD operation scope properties are not set in our SparkContext. */
@@ -184,12 +169,7 @@ class DStreamScopeSuite extends FunSuite with BeforeAndAfter with BeforeAndAfter
 /**
  * A dummy input stream that does absolutely nothing.
  */
-private class DummyInputDStream(
-    ssc: StreamingContext,
-    customName: Option[String] = None)
-  extends InputDStream[Int](ssc) {
-
-  protected[streaming] override val customScopeName: Option[String] = customName
+private class DummyInputDStream(ssc: StreamingContext) extends InputDStream[Int](ssc) {
   override def start(): Unit = { }
   override def stop(): Unit = { }
   override def compute(time: Time): Option[RDD[Int]] = Some(ssc.sc.emptyRDD[Int])