diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b59f562d05ead..cdde3762e3360 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -670,7 +670,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * * Note: Return statements are NOT allowed in the given body. */ - private def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body) + private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body) // Methods for creating RDDs diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index 44dec45c227ca..fe39c9be2ce9b 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -41,7 +41,7 @@ object FlumeUtils { hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): ReceiverInputDStream[SparkFlumeEvent] = { + ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope { createStream(ssc, hostname, port, storageLevel, false) } @@ -59,11 +59,8 @@ object FlumeUtils { port: Int, storageLevel: StorageLevel, enableDecompression: Boolean - ): ReceiverInputDStream[SparkFlumeEvent] = { - val inputStream = new FlumeInputDStream[SparkFlumeEvent]( - ssc, hostname, port, storageLevel, enableDecompression) - - inputStream + ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope { + new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel, enableDecompression) } /** @@ -76,7 +73,7 @@ object FlumeUtils { jssc: JavaStreamingContext, hostname: String, port: Int - ): JavaReceiverInputDStream[SparkFlumeEvent] = { + ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope { createStream(jssc.ssc, hostname, port) } @@ -91,7 +88,7 @@ object FlumeUtils { hostname: String, port: Int, storageLevel: StorageLevel - ): JavaReceiverInputDStream[SparkFlumeEvent] = { + ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope { createStream(jssc.ssc, hostname, port, storageLevel, false) } @@ -108,7 +105,7 @@ object FlumeUtils { port: Int, storageLevel: StorageLevel, enableDecompression: Boolean - ): JavaReceiverInputDStream[SparkFlumeEvent] = { + ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope { createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression) } @@ -125,7 +122,7 @@ object FlumeUtils { hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): ReceiverInputDStream[SparkFlumeEvent] = { + ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope { createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel) } @@ -140,7 +137,7 @@ object FlumeUtils { ssc: StreamingContext, addresses: Seq[InetSocketAddress], storageLevel: StorageLevel - ): ReceiverInputDStream[SparkFlumeEvent] = { + ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope { createPollingStream(ssc, addresses, storageLevel, DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM) } @@ -162,7 +159,7 @@ object FlumeUtils { storageLevel: StorageLevel, maxBatchSize: Int, parallelism: Int - ): ReceiverInputDStream[SparkFlumeEvent] = { + ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope { new 
FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize, parallelism, storageLevel) } @@ -178,7 +175,7 @@ object FlumeUtils { jssc: JavaStreamingContext, hostname: String, port: Int - ): JavaReceiverInputDStream[SparkFlumeEvent] = { + ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope { createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2) } @@ -195,7 +192,7 @@ object FlumeUtils { hostname: String, port: Int, storageLevel: StorageLevel - ): JavaReceiverInputDStream[SparkFlumeEvent] = { + ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope { createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel) } @@ -210,7 +207,7 @@ object FlumeUtils { jssc: JavaStreamingContext, addresses: Array[InetSocketAddress], storageLevel: StorageLevel - ): JavaReceiverInputDStream[SparkFlumeEvent] = { + ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope { createPollingStream(jssc, addresses, storageLevel, DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM) } @@ -232,7 +229,7 @@ object FlumeUtils { storageLevel: StorageLevel, maxBatchSize: Int, parallelism: Int - ): JavaReceiverInputDStream[SparkFlumeEvent] = { + ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope { createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism) } } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index d7cf500577c2a..bda7831d2bc38 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -58,7 +58,7 @@ object KafkaUtils { groupId: String, topics: Map[String, Int], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): ReceiverInputDStream[(String, String)] = { + ): ReceiverInputDStream[(String, String)] = ssc.withScope { val kafkaParams = Map[String, String]( "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, "zookeeper.connection.timeout.ms" -> "10000") @@ -80,7 +80,7 @@ object KafkaUtils { kafkaParams: Map[String, String], topics: Map[String, Int], storageLevel: StorageLevel - ): ReceiverInputDStream[(K, V)] = { + ): ReceiverInputDStream[(K, V)] = ssc.withScope { val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf) new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel) } @@ -99,7 +99,7 @@ object KafkaUtils { zkQuorum: String, groupId: String, topics: JMap[String, JInt] - ): JavaPairReceiverInputDStream[String, String] = { + ): JavaPairReceiverInputDStream[String, String] = jssc.ssc.withScope { createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) } @@ -118,7 +118,7 @@ object KafkaUtils { groupId: String, topics: JMap[String, JInt], storageLevel: StorageLevel - ): JavaPairReceiverInputDStream[String, String] = { + ): JavaPairReceiverInputDStream[String, String] = jssc.ssc.withScope { createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) } @@ -145,7 +145,7 @@ object KafkaUtils { kafkaParams: JMap[String, String], topics: JMap[String, JInt], storageLevel: StorageLevel - ): JavaPairReceiverInputDStream[K, V] = { + ): JavaPairReceiverInputDStream[K, V] = jssc.ssc.withScope { implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass) implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass) @@ 
-189,7 +189,7 @@ object KafkaUtils { sc: SparkContext, kafkaParams: Map[String, String], offsetRanges: Array[OffsetRange] - ): RDD[(K, V)] = { + ): RDD[(K, V)] = sc.withScope { val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message) val leaders = leadersForRanges(kafkaParams, offsetRanges) new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler) @@ -224,7 +224,7 @@ object KafkaUtils { offsetRanges: Array[OffsetRange], leaders: Map[TopicAndPartition, Broker], messageHandler: MessageAndMetadata[K, V] => R - ): RDD[R] = { + ): RDD[R] = sc.withScope { val leaderMap = if (leaders.isEmpty) { leadersForRanges(kafkaParams, offsetRanges) } else { @@ -256,7 +256,7 @@ object KafkaUtils { valueDecoderClass: Class[VD], kafkaParams: JMap[String, String], offsetRanges: Array[OffsetRange] - ): JavaPairRDD[K, V] = { + ): JavaPairRDD[K, V] = jsc.sc.withScope { implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) @@ -294,7 +294,7 @@ object KafkaUtils { offsetRanges: Array[OffsetRange], leaders: JMap[TopicAndPartition, Broker], messageHandler: JFunction[MessageAndMetadata[K, V], R] - ): JavaRDD[R] = { + ): JavaRDD[R] = jsc.sc.withScope { implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) @@ -347,7 +347,7 @@ object KafkaUtils { kafkaParams: Map[String, String], fromOffsets: Map[TopicAndPartition, Long], messageHandler: MessageAndMetadata[K, V] => R - ): InputDStream[R] = { + ): InputDStream[R] = ssc.withScope { new DirectKafkaInputDStream[K, V, KD, VD, R]( ssc, kafkaParams, fromOffsets, messageHandler) } @@ -392,7 +392,7 @@ object KafkaUtils { ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String] - ): InputDStream[(K, V)] = { + ): InputDStream[(K, V)] = ssc.withScope { val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message) val kc = new KafkaCluster(kafkaParams) val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase) @@ -463,7 +463,7 @@ object KafkaUtils { kafkaParams: JMap[String, String], fromOffsets: JMap[TopicAndPartition, JLong], messageHandler: JFunction[MessageAndMetadata[K, V], R] - ): JavaInputDStream[R] = { + ): JavaInputDStream[R] = jssc.ssc.withScope { implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) @@ -521,7 +521,7 @@ object KafkaUtils { valueDecoderClass: Class[VD], kafkaParams: JMap[String, String], topics: JSet[String] - ): JavaPairInputDStream[K, V] = { + ): JavaPairInputDStream[K, V] = jssc.ssc.withScope { implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index 1142d0f56ba34..42f1dfd1c601f 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -21,8 +21,8 @@ import scala.reflect.ClassTag import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext 
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream} -import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} +import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.dstream.ReceiverInputDStream object MQTTUtils { /** @@ -37,7 +37,7 @@ object MQTTUtils { brokerUrl: String, topic: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): ReceiverInputDStream[String] = { + ): ReceiverInputDStream[String] = ssc.withScope { new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel) } @@ -52,7 +52,7 @@ object MQTTUtils { jssc: JavaStreamingContext, brokerUrl: String, topic: String - ): JavaReceiverInputDStream[String] = { + ): JavaReceiverInputDStream[String] = jssc.ssc.withScope { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] createStream(jssc.ssc, brokerUrl, topic) } @@ -69,7 +69,7 @@ object MQTTUtils { brokerUrl: String, topic: String, storageLevel: StorageLevel - ): JavaReceiverInputDStream[String] = { + ): JavaReceiverInputDStream[String] = jssc.ssc.withScope { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] createStream(jssc.ssc, brokerUrl, topic, storageLevel) } diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala index c6a9a2b73714f..1df984ce0a1ba 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala @@ -21,8 +21,8 @@ import twitter4j.Status import twitter4j.auth.Authorization import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaDStream, JavaStreamingContext} -import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} +import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.dstream.ReceiverInputDStream object TwitterUtils { /** @@ -40,7 +40,7 @@ object TwitterUtils { twitterAuth: Option[Authorization], filters: Seq[String] = Nil, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): ReceiverInputDStream[Status] = { + ): ReceiverInputDStream[Status] = ssc.withScope { new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) } @@ -53,7 +53,9 @@ object TwitterUtils { * @param jssc JavaStreamingContext object */ def createStream(jssc: JavaStreamingContext): JavaReceiverInputDStream[Status] = { - createStream(jssc.ssc, None) + jssc.ssc.withScope { + createStream(jssc.ssc, None) + } } /** @@ -66,7 +68,7 @@ object TwitterUtils { * @param filters Set of filter strings to get only those tweets that match them */ def createStream(jssc: JavaStreamingContext, filters: Array[String] - ): JavaReceiverInputDStream[Status] = { + ): JavaReceiverInputDStream[Status] = jssc.ssc.withScope { createStream(jssc.ssc, None, filters) } @@ -83,7 +85,7 @@ object TwitterUtils { jssc: JavaStreamingContext, filters: Array[String], storageLevel: StorageLevel - ): JavaReceiverInputDStream[Status] = { + ): JavaReceiverInputDStream[Status] = jssc.ssc.withScope { createStream(jssc.ssc, None, filters, storageLevel) } @@ -94,7 +96,7 @@ object TwitterUtils { * @param twitterAuth Twitter4J Authorization */ def createStream(jssc: JavaStreamingContext, 
twitterAuth: Authorization - ): JavaReceiverInputDStream[Status] = { + ): JavaReceiverInputDStream[Status] = jssc.ssc.withScope { createStream(jssc.ssc, Some(twitterAuth)) } @@ -109,7 +111,7 @@ object TwitterUtils { jssc: JavaStreamingContext, twitterAuth: Authorization, filters: Array[String] - ): JavaReceiverInputDStream[Status] = { + ): JavaReceiverInputDStream[Status] = jssc.ssc.withScope { createStream(jssc.ssc, Some(twitterAuth), filters) } @@ -125,7 +127,7 @@ object TwitterUtils { twitterAuth: Authorization, filters: Array[String], storageLevel: StorageLevel - ): JavaReceiverInputDStream[Status] = { + ): JavaReceiverInputDStream[Status] = jssc.ssc.withScope { createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 407cab45ed4c6..7f747a8bd4712 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -241,14 +241,22 @@ class StreamingContext private[streaming] ( private[streaming] def getNewInputStreamId() = nextInputStreamId.getAndIncrement() + /** + * Execute a block of code in a scope such that all new DStreams created in this body will + * be part of the same scope. For more detail, see the comments in `doCompute`. + * + * Note: Return statements are NOT allowed in the given body. Also, this currently does + * not handle multiple StreamingContexts sharing the same SparkContext gracefully. + */ + private[streaming] def withScope[U](body: => U): U = sparkContext.withScope(body) + /** * Create an input stream with any arbitrary user implemented receiver. 
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html * @param receiver Custom implementation of Receiver */ @deprecated("Use receiverStream", "1.0.0") - def networkStream[T: ClassTag]( - receiver: Receiver[T]): ReceiverInputDStream[T] = { + def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = withScope { receiverStream(receiver) } @@ -257,8 +265,7 @@ class StreamingContext private[streaming] ( * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html * @param receiver Custom implementation of Receiver */ - def receiverStream[T: ClassTag]( - receiver: Receiver[T]): ReceiverInputDStream[T] = { + def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = withScope { new PluggableInputDStream[T](this, receiver) } @@ -279,7 +286,7 @@ class StreamingContext private[streaming] ( name: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy - ): ReceiverInputDStream[T] = { + ): ReceiverInputDStream[T] = withScope { receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy)) } @@ -296,7 +303,7 @@ class StreamingContext private[streaming] ( hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): ReceiverInputDStream[String] = { + ): ReceiverInputDStream[String] = withScope { socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) } @@ -315,7 +322,7 @@ class StreamingContext private[streaming] ( port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel - ): ReceiverInputDStream[T] = { + ): ReceiverInputDStream[T] = withScope { new SocketInputDStream[T](this, hostname, port, converter, storageLevel) } @@ -334,7 +341,7 @@ class StreamingContext private[streaming] ( hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): ReceiverInputDStream[T] = { + ): ReceiverInputDStream[T] = withScope { new RawInputDStream[T](this, hostname, port, storageLevel) } @@ -352,7 +359,7 @@ class StreamingContext private[streaming] ( K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag - ] (directory: String): InputDStream[(K, V)] = { + ] (directory: String): InputDStream[(K, V)] = withScope { new FileInputDStream[K, V, F](this, directory) } @@ -373,7 +380,9 @@ class StreamingContext private[streaming] ( V: ClassTag, F <: NewInputFormat[K, V]: ClassTag ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = { - new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) + withScope { + new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) + } } /** @@ -396,7 +405,7 @@ class StreamingContext private[streaming] ( ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean, - conf: Configuration): InputDStream[(K, V)] = { + conf: Configuration): InputDStream[(K, V)] = withScope { new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf)) } @@ -408,7 +417,7 @@ class StreamingContext private[streaming] ( * file system. File names starting with . are ignored. 
* @param directory HDFS directory to monitor for new file */ - def textFileStream(directory: String): DStream[String] = { + def textFileStream(directory: String): DStream[String] = withScope { fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) } @@ -430,7 +439,7 @@ class StreamingContext private[streaming] ( @Experimental def binaryRecordsStream( directory: String, - recordLength: Int): DStream[Array[Byte]] = { + recordLength: Int): DStream[Array[Byte]] = withScope { val conf = sc_.hadoopConfiguration conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength) val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat]( @@ -453,7 +462,7 @@ class StreamingContext private[streaming] ( def queueStream[T: ClassTag]( queue: Queue[RDD[T]], oneAtATime: Boolean = true - ): InputDStream[T] = { + ): InputDStream[T] = withScope { queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1)) } @@ -470,14 +479,14 @@ class StreamingContext private[streaming] ( queue: Queue[RDD[T]], oneAtATime: Boolean, defaultRDD: RDD[T] - ): InputDStream[T] = { + ): InputDStream[T] = withScope { new QueueInputDStream(this, queue, oneAtATime, defaultRDD) } /** * Create a unified DStream from multiple DStreams of the same type and same slide duration. */ - def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = { + def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = withScope { new UnionDStream[T](streams.toArray) } @@ -488,7 +497,7 @@ class StreamingContext private[streaming] ( def transform[T: ClassTag]( dstreams: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) => RDD[T] - ): DStream[T] = { + ): DStream[T] = withScope { new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc)) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index ee5d500305fcc..dabd3071de060 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -462,16 +462,6 @@ abstract class DStream[T: ClassTag] ( logInfo("Restored checkpoint data") } - /** - * Execute a block of code in a scope such that all new DStreams created in this body will - * be part of the same scope. For more detail, see the comments in `doCompute`. - * - * Note: Return statements are NOT allowed in the given body. - */ - private[streaming] def withScope[U](body: => U): U = { - RDDOperationScope.withScope[U](ssc.sparkContext)(body) - } - @throws(classOf[IOException]) private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { logDebug(this.getClass().getSimpleName + ".writeObject used") @@ -507,7 +497,7 @@ abstract class DStream[T: ClassTag] ( // ======================================================================= /** Return a new DStream by applying a function to all elements of this DStream.
*/ - def map[U: ClassTag](mapFunc: T => U): DStream[U] = withScope { + def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope { new MappedDStream(this, context.sparkContext.clean(mapFunc)) } @@ -515,12 +505,12 @@ abstract class DStream[T: ClassTag] ( * Return a new DStream by applying a function to all elements of this DStream, * and then flattening the results */ - def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = withScope { + def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope { new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) } /** Return a new DStream containing only the elements that satisfy a predicate. */ - def filter(filterFunc: T => Boolean): DStream[T] = withScope { + def filter(filterFunc: T => Boolean): DStream[T] = ssc.withScope { new FilteredDStream(this, filterFunc) } @@ -529,7 +519,7 @@ abstract class DStream[T: ClassTag] ( * this DStream. Applying glom() to an RDD coalesces all elements within each partition into * an array. */ - def glom(): DStream[Array[T]] = withScope { + def glom(): DStream[Array[T]] = ssc.withScope { new GlommedDStream(this) } @@ -537,7 +527,7 @@ abstract class DStream[T: ClassTag] ( * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the * returned DStream has exactly numPartitions partitions. */ - def repartition(numPartitions: Int): DStream[T] = withScope { + def repartition(numPartitions: Int): DStream[T] = ssc.withScope { this.transform(_.repartition(numPartitions)) } @@ -549,7 +539,7 @@ abstract class DStream[T: ClassTag] ( def mapPartitions[U: ClassTag]( mapPartFunc: Iterator[T] => Iterator[U], preservePartitioning: Boolean = false - ): DStream[U] = withScope { + ): DStream[U] = ssc.withScope { new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning) } @@ -557,7 +547,7 @@ abstract class DStream[T: ClassTag] ( * Return a new DStream in which each RDD has a single element generated by reducing each RDD * of this DStream. */ - def reduce(reduceFunc: (T, T) => T): DStream[T] = withScope { + def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope { this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2) } @@ -565,7 +555,7 @@ abstract class DStream[T: ClassTag] ( * Return a new DStream in which each RDD has a single element generated by counting each RDD * of this DStream. */ - def count(): DStream[Long] = withScope { + def count(): DStream[Long] = ssc.withScope { this.map(_ => (null, 1L)) .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1))) .reduceByKey(_ + _) @@ -579,7 +569,7 @@ abstract class DStream[T: ClassTag] ( * `numPartitions` not specified). */ def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null) - : DStream[(T, Long)] = withScope { + : DStream[(T, Long)] = ssc.withScope { this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions) } @@ -588,7 +578,7 @@ abstract class DStream[T: ClassTag] ( * 'this' DStream will be registered as an output stream and therefore materialized. */ @deprecated("use foreachRDD", "0.9.0") - def foreach(foreachFunc: RDD[T] => Unit): Unit = withScope { + def foreach(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope { this.foreachRDD(foreachFunc) } @@ -597,7 +587,7 @@ abstract class DStream[T: ClassTag] ( * 'this' DStream will be registered as an output stream and therefore materialized. 
*/ @deprecated("use foreachRDD", "0.9.0") - def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = withScope { + def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope { this.foreachRDD(foreachFunc) } @@ -605,7 +595,7 @@ abstract class DStream[T: ClassTag] ( * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. */ - def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = withScope { + def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope { this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r)) } @@ -613,7 +603,7 @@ abstract class DStream[T: ClassTag] ( * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. */ - def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = withScope { + def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope { // because the DStream is reachable from the outer object here, and because // DStreams can't be serialized with closures, we can't proactively check // it for serializability and so we pass the optional false to SparkContext.clean @@ -624,7 +614,7 @@ abstract class DStream[T: ClassTag] ( * Return a new DStream in which each RDD is generated by applying a function * on each RDD of 'this' DStream. */ - def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = withScope { + def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope { // because the DStream is reachable from the outer object here, and because // DStreams can't be serialized with closures, we can't proactively check // it for serializability and so we pass the optional false to SparkContext.clean @@ -636,7 +626,7 @@ abstract class DStream[T: ClassTag] ( * Return a new DStream in which each RDD is generated by applying a function * on each RDD of 'this' DStream. */ - def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = withScope { + def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = ssc.withScope { // because the DStream is reachable from the outer object here, and because // DStreams can't be serialized with closures, we can't proactively check // it for serializability and so we pass the optional false to SparkContext.clean @@ -654,7 +644,7 @@ abstract class DStream[T: ClassTag] ( */ def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] - ): DStream[V] = withScope { + ): DStream[V] = ssc.withScope { // because the DStream is reachable from the outer object here, and because // DStreams can't be serialized with closures, we can't proactively check // it for serializability and so we pass the optional false to SparkContext.clean @@ -668,7 +658,7 @@ abstract class DStream[T: ClassTag] ( */ def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] - ): DStream[V] = withScope { + ): DStream[V] = ssc.withScope { // because the DStream is reachable from the outer object here, and because // DStreams can't be serialized with closures, we can't proactively check // it for serializability and so we pass the optional false to SparkContext.clean @@ -686,7 +676,7 @@ abstract class DStream[T: ClassTag] ( * Print the first ten elements of each RDD generated in this DStream. 
This is an output * operator, so this DStream will be registered as an output stream and there materialized. */ - def print(): Unit = withScope { + def print(): Unit = ssc.withScope { print(10) } @@ -694,7 +684,7 @@ abstract class DStream[T: ClassTag] ( * Print the first num elements of each RDD generated in this DStream. This is an output * operator, so this DStream will be registered as an output stream and there materialized. */ - def print(num: Int): Unit = withScope { + def print(num: Int): Unit = ssc.withScope { def foreachFunc: (RDD[T], Time) => Unit = { (rdd: RDD[T], time: Time) => { val firstNum = rdd.take(num + 1) @@ -715,7 +705,7 @@ abstract class DStream[T: ClassTag] ( * the same interval as this DStream. * @param windowDuration width of the window; must be a multiple of this DStream's interval. */ - def window(windowDuration: Duration): DStream[T] = withScope { + def window(windowDuration: Duration): DStream[T] = ssc.withScope { window(windowDuration, this.slideDuration) } @@ -728,7 +718,7 @@ abstract class DStream[T: ClassTag] ( * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval */ - def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = withScope { + def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope { new WindowedDStream(this, windowDuration, slideDuration) } @@ -746,7 +736,7 @@ abstract class DStream[T: ClassTag] ( reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration - ): DStream[T] = withScope { + ): DStream[T] = ssc.withScope { this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc) } @@ -771,7 +761,7 @@ abstract class DStream[T: ClassTag] ( invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration - ): DStream[T] = withScope { + ): DStream[T] = ssc.withScope { this.map(x => (1, x)) .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1) .map(_._2) @@ -787,7 +777,9 @@ abstract class DStream[T: ClassTag] ( * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval */ - def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = withScope { + def countByWindow( + windowDuration: Duration, + slideDuration: Duration): DStream[Long] = ssc.withScope { this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration) } @@ -808,7 +800,7 @@ abstract class DStream[T: ClassTag] ( slideDuration: Duration, numPartitions: Int = ssc.sc.defaultParallelism) (implicit ord: Ordering[T] = null) - : DStream[(T, Long)] = withScope { + : DStream[(T, Long)] = ssc.withScope { this.map(x => (x, 1L)).reduceByKeyAndWindow( (x: Long, y: Long) => x + y, (x: Long, y: Long) => x - y, @@ -823,21 +815,21 @@ abstract class DStream[T: ClassTag] ( * Return a new DStream by unifying data of another DStream with this DStream. * @param that Another DStream having the same slideDuration as this DStream. 
*/ - def union(that: DStream[T]): DStream[T] = withScope { + def union(that: DStream[T]): DStream[T] = ssc.withScope { new UnionDStream[T](Array(this, that)) } /** * Return all the RDDs defined by the Interval object (both end times included) */ - def slice(interval: Interval): Seq[RDD[T]] = withScope { + def slice(interval: Interval): Seq[RDD[T]] = ssc.withScope { slice(interval.beginTime, interval.endTime) } /** * Return all the RDDs between 'fromTime' to 'toTime' (both included) */ - def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = withScope { + def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = ssc.withScope { if (!isInitialized) { throw new SparkException(this + " has not been initialized") } @@ -871,7 +863,7 @@ abstract class DStream[T: ClassTag] ( * The file name at each batch interval is generated based on `prefix` and * `suffix`: "prefix-TIME_IN_MS.suffix". */ - def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit = withScope { + def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope { val saveFunc = (rdd: RDD[T], time: Time) => { val file = rddToFileName(prefix, suffix, time) rdd.saveAsObjectFile(file) @@ -884,7 +876,7 @@ abstract class DStream[T: ClassTag] ( * of elements. The file name at each batch interval is generated based on * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ - def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = withScope { + def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope { val saveFunc = (rdd: RDD[T], time: Time) => { val file = rddToFileName(prefix, suffix, time) rdd.saveAsTextFile(file) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index b0c27db956298..93c6d9d76f33a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -46,7 +46,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to * generate the RDDs with Spark's default number of partitions. */ - def groupByKey(): DStream[(K, Iterable[V])] = self.withScope { + def groupByKey(): DStream[(K, Iterable[V])] = self.ssc.withScope { groupByKey(defaultPartitioner()) } @@ -54,7 +54,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to * generate the RDDs with `numPartitions` partitions. */ - def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = self.withScope { + def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = self.ssc.withScope { groupByKey(defaultPartitioner(numPartitions)) } @@ -62,7 +62,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new DStream by applying `groupByKey` on each RDD. The supplied * org.apache.spark.Partitioner is used to control the partitioning of each RDD. 
*/ - def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = self.withScope { + def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = self.ssc.withScope { val createCombiner = (v: V) => ArrayBuffer[V](v) val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v) val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2) @@ -75,7 +75,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * merged using the associative reduce function. Hash partitioning is used to generate the RDDs * with Spark's default number of partitions. */ - def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = self.withScope { + def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = self.ssc.withScope { reduceByKey(reduceFunc, defaultPartitioner()) } @@ -84,7 +84,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs * with `numPartitions` partitions. */ - def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = self.withScope { + def reduceByKey( + reduceFunc: (V, V) => V, + numPartitions: Int): DStream[(K, V)] = self.ssc.withScope { reduceByKey(reduceFunc, defaultPartitioner(numPartitions)) } @@ -95,7 +97,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) */ def reduceByKey( reduceFunc: (V, V) => V, - partitioner: Partitioner): DStream[(K, V)] = self.withScope { + partitioner: Partitioner): DStream[(K, V)] = self.ssc.withScope { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner) } @@ -110,7 +112,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, partitioner: Partitioner, - mapSideCombine: Boolean = true): DStream[(K, C)] = self.withScope { + mapSideCombine: Boolean = true): DStream[(K, C)] = self.ssc.withScope { new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine) } @@ -123,8 +125,10 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval */ - def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = self.withScope { - groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner()) + def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = { + self.ssc.withScope { + groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner()) + } } /** @@ -138,7 +142,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * DStream's batching interval */ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration) - : DStream[(K, Iterable[V])] = self.withScope { + : DStream[(K, Iterable[V])] = self.ssc.withScope { groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner()) } @@ -158,7 +162,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) windowDuration: Duration, slideDuration: Duration, numPartitions: Int - ): DStream[(K, Iterable[V])] = self.withScope { + ): DStream[(K, Iterable[V])] = self.ssc.withScope { groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions)) } @@ -177,7 +181,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner - ): DStream[(K, Iterable[V])] = self.withScope { + ): DStream[(K, Iterable[V])] = self.ssc.withScope { 
val createCombiner = (v: Iterable[V]) => new ArrayBuffer[V] ++= v val mergeValue = (buf: ArrayBuffer[V], v: Iterable[V]) => buf ++= v val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2 @@ -199,7 +203,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def reduceByKeyAndWindow( reduceFunc: (V, V) => V, windowDuration: Duration - ): DStream[(K, V)] = self.withScope { + ): DStream[(K, V)] = self.ssc.withScope { reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner()) } @@ -218,7 +222,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) reduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration - ): DStream[(K, V)] = self.withScope { + ): DStream[(K, V)] = self.ssc.withScope { reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner()) } @@ -239,7 +243,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) windowDuration: Duration, slideDuration: Duration, numPartitions: Int - ): DStream[(K, V)] = self.withScope { + ): DStream[(K, V)] = self.ssc.withScope { reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions)) } @@ -261,7 +265,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner - ): DStream[(K, V)] = self.withScope { + ): DStream[(K, V)] = self.ssc.withScope { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) self.reduceByKey(cleanedReduceFunc, partitioner) .window(windowDuration, slideDuration) @@ -295,7 +299,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) slideDuration: Duration = self.slideDuration, numPartitions: Int = ssc.sc.defaultParallelism, filterFunc: ((K, V)) => Boolean = null - ): DStream[(K, V)] = self.withScope { + ): DStream[(K, V)] = self.ssc.withScope { reduceByKeyAndWindow( reduceFunc, invReduceFunc, windowDuration, @@ -329,7 +333,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) slideDuration: Duration, partitioner: Partitioner, filterFunc: ((K, V)) => Boolean - ): DStream[(K, V)] = self.withScope { + ): DStream[(K, V)] = self.ssc.withScope { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc) @@ -350,7 +354,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) */ def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S] - ): DStream[(K, S)] = self.withScope { + ): DStream[(K, S)] = self.ssc.withScope { updateStateByKey(updateFunc, defaultPartitioner()) } @@ -366,7 +370,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S], numPartitions: Int - ): DStream[(K, S)] = self.withScope { + ): DStream[(K, S)] = self.ssc.withScope { updateStateByKey(updateFunc, defaultPartitioner(numPartitions)) } @@ -383,7 +387,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner - ): DStream[(K, S)] = self.withScope { + ): DStream[(K, S)] = self.ssc.withScope { val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => { iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s))) } @@ -407,7 +411,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean - ): DStream[(K, S)] = self.withScope { + ): DStream[(K, S)] = 
self.ssc.withScope { new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None) } @@ -426,7 +430,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner, initialRDD: RDD[(K, S)] - ): DStream[(K, S)] = self.withScope { + ): DStream[(K, S)] = self.ssc.withScope { val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => { iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s))) } @@ -452,7 +456,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) partitioner: Partitioner, rememberPartitioner: Boolean, initialRDD: RDD[(K, S)] - ): DStream[(K, S)] = self.withScope { + ): DStream[(K, S)] = self.ssc.withScope { new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, Some(initialRDD)) } @@ -461,7 +465,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new DStream by applying a map function to the value of each key-value pairs in * 'this' DStream without changing the key. */ - def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = self.withScope { + def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = self.ssc.withScope { new MapValuedDStream[K, V, U](self, mapValuesFunc) } @@ -471,7 +475,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) */ def flatMapValues[U: ClassTag]( flatMapValuesFunc: V => TraversableOnce[U] - ): DStream[(K, U)] = self.withScope { + ): DStream[(K, U)] = self.ssc.withScope { new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc) } @@ -481,7 +485,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * of partitions. */ def cogroup[W: ClassTag]( - other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = self.withScope { + other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = self.ssc.withScope { cogroup(other, defaultPartitioner()) } @@ -491,7 +495,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) */ def cogroup[W: ClassTag]( other: DStream[(K, W)], - numPartitions: Int): DStream[(K, (Iterable[V], Iterable[W]))] = self.withScope { + numPartitions: Int): DStream[(K, (Iterable[V], Iterable[W]))] = self.ssc.withScope { cogroup(other, defaultPartitioner(numPartitions)) } @@ -502,7 +506,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def cogroup[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner - ): DStream[(K, (Iterable[V], Iterable[W]))] = self.withScope { + ): DStream[(K, (Iterable[V], Iterable[W]))] = self.ssc.withScope { self.transformWith( other, (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner) @@ -513,7 +517,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. 
*/ - def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = self.withScope { + def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = self.ssc.withScope { join[W](other, defaultPartitioner()) } @@ -523,7 +527,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) */ def join[W: ClassTag]( other: DStream[(K, W)], - numPartitions: Int): DStream[(K, (V, W))] = self.withScope { + numPartitions: Int): DStream[(K, (V, W))] = self.ssc.withScope { join[W](other, defaultPartitioner(numPartitions)) } @@ -534,7 +538,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def join[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner - ): DStream[(K, (V, W))] = self.withScope { + ): DStream[(K, (V, W))] = self.ssc.withScope { self.transformWith( other, (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner) @@ -547,7 +551,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * number of partitions. */ def leftOuterJoin[W: ClassTag]( - other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = self.withScope { + other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = self.ssc.withScope { leftOuterJoin[W](other, defaultPartitioner()) } @@ -559,7 +563,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def leftOuterJoin[W: ClassTag]( other: DStream[(K, W)], numPartitions: Int - ): DStream[(K, (V, Option[W]))] = self.withScope { + ): DStream[(K, (V, Option[W]))] = self.ssc.withScope { leftOuterJoin[W](other, defaultPartitioner(numPartitions)) } @@ -571,7 +575,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def leftOuterJoin[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner - ): DStream[(K, (V, Option[W]))] = self.withScope { + ): DStream[(K, (V, Option[W]))] = self.ssc.withScope { self.transformWith( other, (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner) @@ -584,7 +588,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * number of partitions. */ def rightOuterJoin[W: ClassTag]( - other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = self.withScope { + other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = self.ssc.withScope { rightOuterJoin[W](other, defaultPartitioner()) } @@ -596,7 +600,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def rightOuterJoin[W: ClassTag]( other: DStream[(K, W)], numPartitions: Int - ): DStream[(K, (Option[V], W))] = self.withScope { + ): DStream[(K, (Option[V], W))] = self.ssc.withScope { rightOuterJoin[W](other, defaultPartitioner(numPartitions)) } @@ -608,7 +612,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def rightOuterJoin[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner - ): DStream[(K, (Option[V], W))] = self.withScope { + ): DStream[(K, (Option[V], W))] = self.ssc.withScope { self.transformWith( other, (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner) @@ -621,7 +625,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * number of partitions. 
*/ def fullOuterJoin[W: ClassTag]( - other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = self.withScope { + other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = self.ssc.withScope { fullOuterJoin[W](other, defaultPartitioner()) } @@ -633,7 +637,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def fullOuterJoin[W: ClassTag]( other: DStream[(K, W)], numPartitions: Int - ): DStream[(K, (Option[V], Option[W]))] = self.withScope { + ): DStream[(K, (Option[V], Option[W]))] = self.ssc.withScope { fullOuterJoin[W](other, defaultPartitioner(numPartitions)) } @@ -645,7 +649,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def fullOuterJoin[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner - ): DStream[(K, (Option[V], Option[W]))] = self.withScope { + ): DStream[(K, (Option[V], Option[W]))] = self.ssc.withScope { self.transformWith( other, (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.fullOuterJoin(rdd2, partitioner) @@ -659,7 +663,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def saveAsHadoopFiles[F <: OutputFormat[K, V]]( prefix: String, suffix: String - )(implicit fm: ClassTag[F]): Unit = self.withScope { + )(implicit fm: ClassTag[F]): Unit = self.ssc.withScope { saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } @@ -675,7 +679,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration) - ): Unit = self.withScope { + ): Unit = self.ssc.withScope { // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints val serializableConf = new SerializableWritable(conf) val saveFunc = (rdd: RDD[(K, V)], time: Time) => { @@ -692,7 +696,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]]( prefix: String, suffix: String - )(implicit fm: ClassTag[F]): Unit = self.withScope { + )(implicit fm: ClassTag[F]): Unit = self.ssc.withScope { saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } @@ -708,7 +712,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) valueClass: Class[_], outputFormatClass: Class[_ <: NewOutputFormat[_, _]], conf: Configuration = ssc.sparkContext.hadoopConfiguration - ): Unit = self.withScope { + ): Unit = self.ssc.withScope { // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints val serializableConf = new SerializableWritable(conf) val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
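For illustration, a minimal sketch (not part of this patch) of the pattern the changes above apply: a stream-creating helper wraps its entire body in the StreamingContext's withScope, so the DStream it returns, and the RDDs that DStream later produces, are grouped under one operation scope in the web UI. The package, object name, and input directory below are hypothetical; the helper must live under org.apache.spark.streaming because withScope is private[streaming].

package org.apache.spark.streaming.example  // hypothetical package; must sit under org.apache.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object ScopedStreamSketch {
  // Same shape as the FlumeUtils/KafkaUtils changes above: the whole body runs
  // inside ssc.withScope, so every DStream created here shares one named scope.
  def trimmedTextStream(ssc: StreamingContext, dir: String): DStream[String] = ssc.withScope {
    ssc.textFileStream(dir).map(_.trim)
  }

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("ScopedStreamSketch").setMaster("local[2]"), Seconds(1))
    trimmedTextStream(ssc, "/tmp/streaming-input").print()  // directory is illustrative
    ssc.start()
    ssc.awaitTermination()
  }
}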