Pass in input stream name rather than defining it from within
Andrew Or committed May 15, 2015
1 parent 1af0b0e commit fa4e5fb
Showing 21 changed files with 63 additions and 96 deletions.
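
The change follows one pattern across these files: instead of each input DStream naming its own scope through a customScopeName override, the method that creates the stream passes the name in through withNamedScope. Below is a minimal sketch of the resulting shape; FooReceiver, FooInputDStream, FooUtils and the package name are hypothetical and not part of this commit, and since withNamedScope is private[streaming] the sketch assumes code living under the org.apache.spark.streaming package, as the Flume/Kafka/MQTT/Twitter utilities do.

package org.apache.spark.streaming.foo

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver that does nothing; only here to make the sketch self-contained.
class FooReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  override def onStart(): Unit = { }
  override def onStop(): Unit = { }
}

// The input DStream no longer declares its own scope name.
class FooInputDStream(ssc_ : StreamingContext) extends ReceiverInputDStream[String](ssc_) {
  override def getReceiver(): Receiver[String] = new FooReceiver
}

object FooUtils {
  // The creating method names the scope from the outside, mirroring
  // FlumeUtils.createStream and KafkaUtils.createStream in this commit.
  def createStream(ssc: StreamingContext): ReceiverInputDStream[String] = {
    ssc.withNamedScope("foo stream") {
      new FooInputDStream(ssc)
    }
  }
}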
@@ -96,7 +96,7 @@ private[spark] object RDDOperationScope extends Logging {
       sc: SparkContext,
       allowNesting: Boolean = false)(body: => T): T = {
     val stackTrace = Thread.currentThread.getStackTrace().tail // ignore "Thread#getStackTrace"
-    val ourMethodName = stackTrace(1).getMethodName //
+    val ourMethodName = stackTrace(1).getMethodName // i.e. withScope
     // Climb upwards to find the first method that's called something different
     val callerMethodName = stackTrace
       .find(_.getMethodName != ourMethodName)
@@ -50,8 +50,6 @@ class FlumeInputDStream[T: ClassTag](
     enableDecompression: Boolean
   ) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
 
-  protected[streaming] override val customScopeName: Option[String] = Some(s"input stream [$id]")
-
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumeReceiver(host, port, storageLevel, enableDecompression)
   }
@@ -53,10 +53,6 @@ private[streaming] class FlumePollingInputDStream[T: ClassTag](
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
 
-  protected[streaming] override val customScopeName: Option[String] = {
-    Some(s"flume polling stream [$id]")
-  }
-
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
   }
@@ -59,7 +59,7 @@ object FlumeUtils {
       port: Int,
       storageLevel: StorageLevel,
       enableDecompression: Boolean
-    ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
+    ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withNamedScope("flume stream") {
     new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel, enableDecompression)
   }
 
@@ -159,7 +159,7 @@ object FlumeUtils {
       storageLevel: StorageLevel,
       maxBatchSize: Int,
       parallelism: Int
-    ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
+    ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withNamedScope("flume polling stream") {
     new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
       parallelism, storageLevel)
   }
@@ -65,10 +65,6 @@ class DirectKafkaInputDStream[
   val maxRetries = context.sparkContext.getConf.getInt(
     "spark.streaming.kafka.maxRetries", 1)
 
-  protected[streaming] override val customScopeName: Option[String] = {
-    Some(s"kafka direct stream [$id]")
-  }
-
   protected[streaming] override val checkpointData =
     new DirectKafkaInputDStreamCheckpointData
 
@@ -55,8 +55,6 @@ class KafkaInputDStream[
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
-  protected[streaming] override val customScopeName: Option[String] = Some(s"kafka stream [$id]")
-
   def getReceiver(): Receiver[(K, V)] = {
     if (!useReliableReceiver) {
       new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
@@ -58,7 +58,7 @@ object KafkaUtils {
       groupId: String,
       topics: Map[String, Int],
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[(String, String)] = ssc.withScope {
+    ): ReceiverInputDStream[(String, String)] = {
     val kafkaParams = Map[String, String](
       "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
       "zookeeper.connection.timeout.ms" -> "10000")
@@ -80,7 +80,7 @@ object KafkaUtils {
       kafkaParams: Map[String, String],
       topics: Map[String, Int],
       storageLevel: StorageLevel
-    ): ReceiverInputDStream[(K, V)] = ssc.withScope {
+    ): ReceiverInputDStream[(K, V)] = ssc.withNamedScope("kafka stream") {
     val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
     new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
   }
@@ -99,7 +99,7 @@ object KafkaUtils {
       zkQuorum: String,
       groupId: String,
       topics: JMap[String, JInt]
-    ): JavaPairReceiverInputDStream[String, String] = jssc.ssc.withScope {
+    ): JavaPairReceiverInputDStream[String, String] = {
     createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
   }
 
@@ -118,7 +118,7 @@ object KafkaUtils {
       groupId: String,
       topics: JMap[String, JInt],
       storageLevel: StorageLevel
-    ): JavaPairReceiverInputDStream[String, String] = jssc.ssc.withScope {
+    ): JavaPairReceiverInputDStream[String, String] = {
     createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
       storageLevel)
   }
@@ -145,7 +145,7 @@ object KafkaUtils {
       kafkaParams: JMap[String, String],
       topics: JMap[String, JInt],
       storageLevel: StorageLevel
-    ): JavaPairReceiverInputDStream[K, V] = jssc.ssc.withScope {
+    ): JavaPairReceiverInputDStream[K, V] = {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)
 
@@ -295,7 +295,7 @@ object KafkaUtils {
       offsetRanges: Array[OffsetRange],
       leaders: JMap[TopicAndPartition, Broker],
       messageHandler: JFunction[MessageAndMetadata[K, V], R]
-    ): JavaRDD[R] = jsc.sc.withScope {
+    ): JavaRDD[R] = {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -348,7 +348,7 @@ object KafkaUtils {
       kafkaParams: Map[String, String],
       fromOffsets: Map[TopicAndPartition, Long],
       messageHandler: MessageAndMetadata[K, V] => R
-    ): InputDStream[R] = ssc.withScope {
+    ): InputDStream[R] = ssc.withNamedScope("kafka direct stream") {
     val cleanedHandler = ssc.sc.clean(messageHandler)
     new DirectKafkaInputDStream[K, V, KD, VD, R](
       ssc, kafkaParams, fromOffsets, cleanedHandler)
@@ -394,7 +394,7 @@ object KafkaUtils {
       ssc: StreamingContext,
       kafkaParams: Map[String, String],
       topics: Set[String]
-    ): InputDStream[(K, V)] = ssc.withScope {
+    ): InputDStream[(K, V)] = ssc.withNamedScope("kafka direct stream") {
     val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
     val kc = new KafkaCluster(kafkaParams)
     val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
@@ -465,7 +465,7 @@ object KafkaUtils {
       kafkaParams: JMap[String, String],
       fromOffsets: JMap[TopicAndPartition, JLong],
       messageHandler: JFunction[MessageAndMetadata[K, V], R]
-    ): JavaInputDStream[R] = jssc.ssc.withScope {
+    ): JavaInputDStream[R] = {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -524,7 +524,7 @@ object KafkaUtils {
       valueDecoderClass: Class[VD],
       kafkaParams: JMap[String, String],
       topics: JSet[String]
-    ): JavaPairInputDStream[K, V] = jssc.ssc.withScope {
+    ): JavaPairInputDStream[K, V] = {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
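
For context, a small driver sketch, not taken from this commit, showing where the new names surface: the stream returned by createDirectStream is created inside withNamedScope("kafka direct stream"), so the RDDs it generates are grouped under that scope in the UI. The application name, broker address, and topic below are placeholders.

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaScopeSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectKafkaScopeSketch")
    val ssc = new StreamingContext(conf, Seconds(2))
    // Placeholder broker list and topic set.
    val kafkaParams = Map("metadata.broker.list" -> "broker-1:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))
    // RDDs generated by `messages` fall under the "kafka direct stream" scope.
    messages.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}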
@@ -57,8 +57,6 @@ class MQTTInputDStream(
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[String](ssc_) {
 
-  protected[streaming] override val customScopeName: Option[String] = Some(s"MQTT stream [$id]")
-
   def getReceiver(): Receiver[String] = {
     new MQTTReceiver(brokerUrl, topic, storageLevel)
   }
@@ -37,7 +37,7 @@ object MQTTUtils {
       brokerUrl: String,
       topic: String,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[String] = ssc.withScope {
+    ): ReceiverInputDStream[String] = ssc.withNamedScope("MQTT stream") {
     new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
   }
 
@@ -45,8 +45,6 @@ class TwitterInputDStream(
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[Status](ssc_) {
 
-  protected[streaming] override val customScopeName: Option[String] = Some(s"twitter stream [$id]")
-
   private def createOAuthAuthorization(): Authorization = {
     new OAuthAuthorization(new ConfigurationBuilder().build())
   }
@@ -40,7 +40,7 @@ object TwitterUtils {
       twitterAuth: Option[Authorization],
       filters: Seq[String] = Nil,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[Status] = ssc.withScope {
+    ): ReceiverInputDStream[Status] = ssc.withNamedScope("twitter stream") {
     new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
   }
 
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.spark._
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.input.FixedLengthBinaryInputFormat
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContextState._
 import org.apache.spark.streaming.dstream._
@@ -245,28 +245,41 @@ class StreamingContext private[streaming] (
    * Execute a block of code in a scope such that all new DStreams created in this body will
    * be part of the same scope. For more detail, see the comments in `doCompute`.
    *
-   * Note: Return statements are NOT allowed in the given body. Also, this currently does
-   * not handle multiple StreamingContexts sharing the same SparkContext gracefully.
+   * Note: Return statements are NOT allowed in the given body.
    */
   private[streaming] def withScope[U](body: => U): U = sparkContext.withScope(body)
 
+  /**
+   * Execute a block of code in a scope such that all new DStreams created in this body will
+   * be part of the same scope. For more detail, see the comments in `doCompute`.
+   *
+   * Note: Return statements are NOT allowed in the given body.
+   */
+  private[streaming] def withNamedScope[U](name: String)(body: => U): U = {
+    RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
+  }
+
   /**
    * Create an input stream with any arbitrary user implemented receiver.
    * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
    * @param receiver Custom implementation of Receiver
    */
   @deprecated("Use receiverStream", "1.0.0")
-  def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = withScope {
-    receiverStream(receiver)
+  def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+    withNamedScope("network stream") {
+      receiverStream(receiver)
+    }
   }
 
   /**
    * Create an input stream with any arbitrary user implemented receiver.
    * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
    * @param receiver Custom implementation of Receiver
    */
-  def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = withScope {
-    new PluggableInputDStream[T](this, receiver)
+  def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+    withNamedScope("receiver stream") {
+      new PluggableInputDStream[T](this, receiver)
+    }
   }
 
   /**
@@ -286,7 +299,7 @@ class StreamingContext private[streaming] (
       name: String,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
       supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
-    ): ReceiverInputDStream[T] = withScope {
+    ): ReceiverInputDStream[T] = withNamedScope("actor stream") {
     receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
   }
 
@@ -303,7 +316,7 @@ class StreamingContext private[streaming] (
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[String] = withScope {
+    ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
     socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
   }
 
@@ -322,7 +335,7 @@ class StreamingContext private[streaming] (
       port: Int,
       converter: (InputStream) => Iterator[T],
       storageLevel: StorageLevel
-    ): ReceiverInputDStream[T] = withScope {
+    ): ReceiverInputDStream[T] = withNamedScope("socket stream") {
     new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
   }
 
@@ -341,7 +354,7 @@ class StreamingContext private[streaming] (
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[T] = withScope {
+    ): ReceiverInputDStream[T] = withNamedScope("raw socket stream") {
     new RawInputDStream[T](this, hostname, port, storageLevel)
   }
 
@@ -359,7 +372,7 @@ class StreamingContext private[streaming] (
     K: ClassTag,
     V: ClassTag,
     F <: NewInputFormat[K, V]: ClassTag
-  ] (directory: String): InputDStream[(K, V)] = withScope {
+  ] (directory: String): InputDStream[(K, V)] = withNamedScope("file stream") {
     new FileInputDStream[K, V, F](this, directory)
   }
 
@@ -380,7 +393,7 @@ class StreamingContext private[streaming] (
     V: ClassTag,
     F <: NewInputFormat[K, V]: ClassTag
   ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
-    withScope {
+    withNamedScope("file stream") {
       new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
     }
   }
@@ -405,7 +418,7 @@ class StreamingContext private[streaming] (
   ] (directory: String,
      filter: Path => Boolean,
      newFilesOnly: Boolean,
-     conf: Configuration): InputDStream[(K, V)] = withScope {
+     conf: Configuration): InputDStream[(K, V)] = withNamedScope("file stream") {
     new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf))
   }
 
@@ -417,7 +430,7 @@ class StreamingContext private[streaming] (
    * file system. File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
    */
-  def textFileStream(directory: String): DStream[String] = withScope {
+  def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
     fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
   }
 
@@ -439,7 +452,7 @@ class StreamingContext private[streaming] (
   @Experimental
   def binaryRecordsStream(
       directory: String,
-      recordLength: Int): DStream[Array[Byte]] = withScope {
+      recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
     val conf = sc_.hadoopConfiguration
     conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
     val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
@@ -462,7 +475,7 @@ class StreamingContext private[streaming] (
   def queueStream[T: ClassTag](
       queue: Queue[RDD[T]],
       oneAtATime: Boolean = true
-    ): InputDStream[T] = withScope {
+    ): InputDStream[T] = withNamedScope("queue stream") {
     queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1))
   }
 
@@ -479,7 +492,7 @@ class StreamingContext private[streaming] (
       queue: Queue[RDD[T]],
       oneAtATime: Boolean,
       defaultRDD: RDD[T]
-    ): InputDStream[T] = withScope {
+    ): InputDStream[T] = withNamedScope("queue stream") {
     new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
   }
 
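
A locally runnable sketch, again not part of the commit, of the queueStream path above: queueStream now runs its body inside withNamedScope("queue stream"), so the per-batch RDDs it yields are grouped under that scope. The master, batch interval, and queued data are arbitrary.

import scala.collection.mutable.Queue

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object QueueScopeSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("QueueScopeSketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    val rddQueue = new Queue[RDD[Int]]()
    // Created under the "queue stream" scope named by StreamingContext.queueStream.
    val stream = ssc.queueStream(rddQueue)
    stream.count().print()
    ssc.start()
    rddQueue += ssc.sparkContext.makeRDD(1 to 100, numSlices = 2)
    ssc.awaitTerminationOrTimeout(5000)
    ssc.stop()
  }
}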
@@ -27,8 +27,6 @@ import scala.reflect.ClassTag
 class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
   extends InputDStream[T](ssc_) {
 
-  protected[streaming] override val customScopeName: Option[String] = Some(s"constant stream [$id]")
-
   override def start() {}
 
   override def stop() {}
@@ -126,10 +126,12 @@ abstract class DStream[T: ClassTag] (
   }
 
   /**
-   * An optional custom name for all scopes generated by this DStream.
-   * If None, the name of the operation that created this DStream will be used.
+   * Make a scope name based on the given one.
+   *
+   * By default, this just returns the base name. Subclasses
+   * may optionally override this to provide custom scope names.
    */
-  protected[streaming] val customScopeName: Option[String] = None
+  protected[streaming] def makeScopeName(baseName: String): String = baseName
 
   /**
    * Make a scope that groups RDDs created in the same DStream operation in the same batch.
@@ -143,7 +145,7 @@
     val formattedBatchTime =
       UIUtils.formatBatchTime(time.milliseconds, ssc.graph.batchDuration.milliseconds)
     val bscope = RDDOperationScope.fromJson(bsJson)
-    val baseName = customScopeName.getOrElse(bscope.name) // e.g. countByWindow
+    val baseName = makeScopeName(bscope.name) // e.g. countByWindow, "kafka stream [0]"
     val scopeName =
       if (baseName.length > 10) {
         // If the operation name is too long, wrap the line
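
The new makeScopeName hook replaces the customScopeName overrides removed elsewhere in this commit. The sketch below is a hypothetical subclass override, shown only to illustrate the hook and not added by this commit; with a base name of "kafka stream" and stream id 0 it would produce the "kafka stream [0]" form mentioned in the comment above.

package org.apache.spark.streaming.dstream

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}

// Illustrative input stream that emits nothing; only the makeScopeName override matters here.
class EmptyInputDStream[T: ClassTag](ssc_ : StreamingContext) extends InputDStream[T](ssc_) {

  // Append the stream id to whatever base name the creating method passed in.
  protected[streaming] override def makeScopeName(baseName: String): String = s"$baseName [$id]"

  override def start(): Unit = { }

  override def stop(): Unit = { }

  override def compute(validTime: Time): Option[RDD[T]] = None
}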
@@ -126,8 +126,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
   @transient private var path_ : Path = null
   @transient private var fs_ : FileSystem = null
 
-  protected[streaming] override val customScopeName: Option[String] = Some(s"file stream [$id]")
-
   override def start() { }
 
   override def stop() { }