Commit 0ca1801

Remove a few unnecessary withScopes on aliases

Andrew Or committed May 15, 2015
1 parent fa4e5fb commit 0ca1801
Showing 5 changed files with 49 additions and 59 deletions.
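For context on what is being removed: in Spark Streaming, ssc.withScope { ... } tags the RDDs created inside the block with a named operation scope so they group together in the web UI's DAG visualization. Each public factory method already wraps its own body this way, so an alias that merely delegates to another public overload gains nothing from a second withScope; it only opens a redundant outer scope. Below is a minimal, self-contained sketch of that idea, with an invented scope recorder standing in for Spark's scoping machinery (all names here are illustrative, not Spark's actual implementation):

import scala.collection.mutable.ArrayBuffer

object ScopeSketch {
  // Records every scope that gets opened, so the duplication is visible.
  private val openedScopes = ArrayBuffer.empty[String]

  // Hypothetical stand-in for ssc.withScope: tag the body with a scope name.
  private def withScope[T](name: String)(body: => T): T = {
    openedScopes += name
    body
  }

  // Primary method: owns its scope.
  def createStream(port: Int, enableDecompression: Boolean): String =
    withScope("createStream") {
      s"stream:$port:decompress=$enableDecompression"
    }

  // Alias before this commit: opens a second, redundant scope.
  def createStreamOld(port: Int): String =
    withScope("createStream") {
      createStream(port, enableDecompression = false)
    }

  // Alias after this commit: plain delegation; the callee scopes itself.
  def createStreamNew(port: Int): String =
    createStream(port, enableDecompression = false)

  def main(args: Array[String]): Unit = {
    createStreamOld(9999)
    println(openedScopes.mkString(", "))  // createStream, createStream
    openedScopes.clear()
    createStreamNew(9999)
    println(openedScopes.mkString(", "))  // createStream
  }
}

In Spark itself, nested scopes are managed by RDDOperationScope rather than naively stacked, but the outer scope on a pure alias buys nothing either way, which is why every hunk below simply drops the wrapper and keeps the delegation.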
FlumeUtils.scala
@@ -41,7 +41,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
+): ReceiverInputDStream[SparkFlumeEvent] = {
createStream(ssc, hostname, port, storageLevel, false)
}

@@ -73,7 +73,7 @@ object FlumeUtils {
jssc: JavaStreamingContext,
hostname: String,
port: Int
-): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
+): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port)
}

@@ -88,7 +88,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel
-): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
+): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port, storageLevel, false)
}

@@ -105,7 +105,7 @@ object FlumeUtils {
port: Int,
storageLevel: StorageLevel,
enableDecompression: Boolean
-): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
+): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
}

@@ -122,7 +122,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
+): ReceiverInputDStream[SparkFlumeEvent] = {
createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel)
}

@@ -137,7 +137,7 @@ object FlumeUtils {
ssc: StreamingContext,
addresses: Seq[InetSocketAddress],
storageLevel: StorageLevel
-): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
+): ReceiverInputDStream[SparkFlumeEvent] = {
createPollingStream(ssc, addresses, storageLevel,
DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
}
@@ -175,7 +175,7 @@ object FlumeUtils {
jssc: JavaStreamingContext,
hostname: String,
port: Int
-): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
+): JavaReceiverInputDStream[SparkFlumeEvent] = {
createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2)
}

@@ -192,7 +192,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel
-): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
+): JavaReceiverInputDStream[SparkFlumeEvent] = {
createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel)
}

@@ -207,7 +207,7 @@ object FlumeUtils {
jssc: JavaStreamingContext,
addresses: Array[InetSocketAddress],
storageLevel: StorageLevel
-): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
+): JavaReceiverInputDStream[SparkFlumeEvent] = {
createPollingStream(jssc, addresses, storageLevel,
DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
}
@@ -229,7 +229,7 @@ object FlumeUtils {
storageLevel: StorageLevel,
maxBatchSize: Int,
parallelism: Int
-): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
+): JavaReceiverInputDStream[SparkFlumeEvent] = {
createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
}
}
MQTTUtils.scala
@@ -52,7 +52,7 @@ object MQTTUtils {
jssc: JavaStreamingContext,
brokerUrl: String,
topic: String
-): JavaReceiverInputDStream[String] = jssc.ssc.withScope {
+): JavaReceiverInputDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, brokerUrl, topic)
}
@@ -69,7 +69,7 @@ object MQTTUtils {
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
-): JavaReceiverInputDStream[String] = jssc.ssc.withScope {
+): JavaReceiverInputDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, brokerUrl, topic, storageLevel)
}
TwitterUtils.scala
@@ -53,9 +53,7 @@ object TwitterUtils {
* @param jssc JavaStreamingContext object
*/
def createStream(jssc: JavaStreamingContext): JavaReceiverInputDStream[Status] = {
-jssc.ssc.withScope {
-createStream(jssc.ssc, None)
-}
+createStream(jssc.ssc, None)
}

/**
@@ -68,7 +66,7 @@ object TwitterUtils {
* @param filters Set of filter strings to get only those tweets that match them
*/
def createStream(jssc: JavaStreamingContext, filters: Array[String]
-): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
+): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, None, filters)
}

@@ -85,7 +83,7 @@ object TwitterUtils {
jssc: JavaStreamingContext,
filters: Array[String],
storageLevel: StorageLevel
-): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
+): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, None, filters, storageLevel)
}

@@ -96,7 +94,7 @@ object TwitterUtils {
* @param twitterAuth Twitter4J Authorization
*/
def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization
-): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
+): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth))
}

@@ -111,7 +109,7 @@ object TwitterUtils {
jssc: JavaStreamingContext,
twitterAuth: Authorization,
filters: Array[String]
-): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
+): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth), filters)
}

@@ -127,7 +125,7 @@ object TwitterUtils {
twitterAuth: Authorization,
filters: Array[String],
storageLevel: StorageLevel
-): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
+): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
}
}
DStream.scala
@@ -25,7 +25,7 @@ import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.util.matching.Regex

-import org.apache.spark.{SparkContext, Logging, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkException}
import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD, RDDOperationScope}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
@@ -127,9 +127,7 @@ abstract class DStream[T: ClassTag] (

/**
* Make a scope name based on the given one.
-*
-* By default, this just returns the base name. Subclasses
-* may optionally override this to provide custom scope names.
+* Subclasses may optionally override this to provide custom scope names.
*/
protected[streaming] def makeScopeName(baseName: String): String = baseName

@@ -351,8 +349,8 @@ abstract class DStream[T: ClassTag] (
if (isTimeValid(time)) {
val rddOption = doCompute(time)

-// Register the generated RDD for caching and checkpointing
rddOption.foreach { case newRDD =>
+// Register the generated RDD for caching and checkpointing
if (storageLevel != StorageLevel.NONE) {
newRDD.persist(storageLevel)
logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
@@ -363,7 +361,6 @@ abstract class DStream[T: ClassTag] (
}
generatedRDDs.put(time, newRDD)
}
-
rddOption
} else {
None
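For readers without the full file open, the two hunks above sit inside DStream.getOrCompute, whose job is: compute the RDD for a batch time at most once, persist it if the DStream has a storage level set, mark it for checkpointing when due, and memoize it in generatedRDDs. A condensed, hypothetical model of that flow, using a plain HashMap and Strings in place of batch times and RDDs (MiniDStream and everything in it is invented for illustration, not Spark's exact code):

import scala.collection.mutable

class MiniDStream(compute: Long => Option[String], persist: Boolean) {
  // Stand-in for DStream.generatedRDDs: one generated "RDD" per batch time.
  private val generatedRDDs = mutable.HashMap.empty[Long, String]

  def getOrCompute(time: Long): Option[String] = {
    generatedRDDs.get(time) match {
      case cached @ Some(_) => cached  // already generated for this batch time
      case None =>
        val rddOption = compute(time)
        rddOption.foreach { newRDD =>
          // Register the generated RDD for caching and checkpointing
          if (persist) println(s"persisting $newRDD for time $time")
          generatedRDDs.put(time, newRDD)
        }
        rddOption
    }
  }
}

Note that the hunks above only move the registration comment inside the foreach and drop a blank line; the registration logic itself is unchanged by this commit.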
@@ -739,9 +736,7 @@ abstract class DStream[T: ClassTag] (
* the same interval as this DStream.
* @param windowDuration width of the window; must be a multiple of this DStream's interval.
*/
-def window(windowDuration: Duration): DStream[T] = ssc.withScope {
-window(windowDuration, this.slideDuration)
-}
+def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)

/**
* Return a new DStream in which each RDD contains all the elements seen in a