Commit 43f5290
Made functions that create input streams return InputDStream and NetworkInputDStream, for both Scala and Java.
tdas committed Apr 16, 2014
1 parent 2c94579 commit 43f5290
Showing 24 changed files with 289 additions and 92 deletions.
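As a rough usage sketch (not part of the diff), this is the caller-visible effect of the change: the create* helpers now return the concrete input-stream types rather than plain DStream / JavaDStream. The context setup, host, and port below are placeholder values.

  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.dstream.{DStream, NetworkInputDStream}
  import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

  val ssc = new StreamingContext("local[2]", "sketch", Seconds(1))

  // After this commit the concrete, receiver-backed type is visible to the caller...
  val flumeStream: NetworkInputDStream[SparkFlumeEvent] =
    FlumeUtils.createStream(ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2)

  // ...while code that expected a DStream still compiles, since NetworkInputDStream
  // is a subclass of DStream.
  val asDStream: DStream[SparkFlumeEvent] = flumeStream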
@@ -19,8 +19,8 @@ package org.apache.spark.streaming.flume

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.api.java.{JavaNetworkInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
import org.apache.spark.streaming.dstream.{NetworkInputDStream, DStream}

object FlumeUtils {
/**
@@ -35,7 +35,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[SparkFlumeEvent] = {
): NetworkInputDStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
inputStream
}
@@ -50,7 +50,7 @@ object FlumeUtils {
jssc: JavaStreamingContext,
hostname: String,
port: Int
): JavaDStream[SparkFlumeEvent] = {
): JavaNetworkInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port)
}

@@ -65,7 +65,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel
): JavaDStream[SparkFlumeEvent] = {
): JavaNetworkInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port, storageLevel)
}
}
@@ -21,14 +21,15 @@
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;

import org.apache.spark.streaming.api.java.JavaNetworkInputDStream;
import org.junit.Test;

public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
@Test
public void testFlumeStream() {
// tests the API, does not actually test data receiving
JavaDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
JavaDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
JavaNetworkInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
JavaNetworkInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
@@ -31,6 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.streaming.api.java.JavaNetworkInputDStream

class FlumeStreamSuite extends TestSuiteBase {

@@ -39,10 +40,11 @@ class FlumeStreamSuite extends TestSuiteBase {
test("flume input stream") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val flumeStream: JavaNetworkInputDStream[SparkFlumeEvent] =
FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
val outputStream = new TestOutputStream(flumeStream.networkInputDStream, outputBuffer)
outputStream.register()
ssc.start()

@@ -27,8 +27,8 @@ import kafka.serializer.{Decoder, StringDecoder}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.api.java.{JavaPairNetworkInputDStream, JavaStreamingContext, JavaPairDStream}
import org.apache.spark.streaming.dstream.{NetworkInputDStream, DStream}


object KafkaUtils {
@@ -48,7 +48,7 @@ object KafkaUtils {
groupId: String,
topics: Map[String, Int],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[(String, String)] = {
): NetworkInputDStream[(String, String)] = {
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
"zookeeper.connection.timeout.ms" -> "10000")
@@ -70,7 +70,7 @@ object KafkaUtils {
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
): DStream[(K, V)] = {
): NetworkInputDStream[(K, V)] = {
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
}

@@ -88,7 +88,7 @@ object KafkaUtils {
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt]
): JavaPairDStream[String, String] = {
): JavaPairNetworkInputDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
@@ -110,7 +110,7 @@ object KafkaUtils {
groupId: String,
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairDStream[String, String] = {
): JavaPairNetworkInputDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
@@ -139,7 +139,7 @@ object KafkaUtils {
kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairDStream[K, V] = {
): JavaPairNetworkInputDStream[K, V] = {
implicit val keyCmt: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val valueCmt: ClassTag[V] =
@@ -18,6 +18,8 @@
package org.apache.spark.streaming.kafka;

import java.util.HashMap;

import org.apache.spark.streaming.api.java.JavaPairNetworkInputDStream;
import org.junit.Test;
import com.google.common.collect.Maps;
import kafka.serializer.StringDecoder;
@@ -31,14 +33,15 @@ public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();

// tests the API, does not actually test data receiving
JavaPairDStream<String, String> test1 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
JavaPairDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
JavaPairNetworkInputDStream<String, String> test1 =
KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
JavaPairNetworkInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK_SER_2());

HashMap<String, String> kafkaParams = Maps.newHashMap();
kafkaParams.put("zookeeper.connect", "localhost:12345");
kafkaParams.put("group.id","consumer-group");
JavaPairDStream<String, String> test3 = KafkaUtils.createStream(ssc,
JavaPairNetworkInputDStream<String, String> test3 = KafkaUtils.createStream(ssc,
String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
}
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.NetworkInputDStream

class KafkaStreamSuite extends TestSuiteBase {

@@ -28,11 +29,15 @@ class KafkaStreamSuite extends TestSuiteBase {
val topics = Map("my-topic" -> 1)

// tests the API, does not actually test data receiving
val test1 = KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
val test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
val test1: NetworkInputDStream[(String, String)] =
KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
val test2: NetworkInputDStream[(String, String)] =
KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
val test3 = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
val test3: NetworkInputDStream[(String, String)] =
KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
assert(test1.isInstanceOf[NetworkInputDStream[_]])

// TODO: Actually test receiving data
ssc.stop()
@@ -19,9 +19,9 @@ package org.apache.spark.streaming.mqtt

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
import org.apache.spark.streaming.api.java.{JavaNetworkInputDStream, JavaStreamingContext, JavaDStream}
import scala.reflect.ClassTag
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.dstream.{NetworkInputDStream, DStream}

object MQTTUtils {
/**
@@ -36,7 +36,7 @@ object MQTTUtils {
brokerUrl: String,
topic: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[String] = {
): NetworkInputDStream[String] = {
new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
}

@@ -51,7 +51,7 @@ object MQTTUtils {
jssc: JavaStreamingContext,
brokerUrl: String,
topic: String
): JavaDStream[String] = {
): JavaNetworkInputDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, brokerUrl, topic)
}
@@ -68,7 +68,7 @@ object MQTTUtils {
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
): JavaDStream[String] = {
): JavaNetworkInputDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, brokerUrl, topic, storageLevel)
}
@@ -19,6 +19,8 @@

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaNetworkInputDStream;
import org.apache.spark.streaming.api.java.JavaPairNetworkInputDStream;
import org.junit.Test;

import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -30,8 +32,8 @@ public void testMQTTStream() {
String topic = "def";

// tests the API, does not actually test data receiving
JavaDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
JavaDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
JavaNetworkInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
JavaNetworkInputDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.mqtt

import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.NetworkInputDStream

class MQTTStreamSuite extends TestSuiteBase {

@@ -28,8 +29,9 @@ class MQTTStreamSuite extends TestSuiteBase {
val topic = "def"

// tests the API, does not actually test data receiving
val test1 = MQTTUtils.createStream(ssc, brokerUrl, topic)
val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
val test1: NetworkInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
val test2: NetworkInputDStream[String] =
MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)

// TODO: Actually test receiving data
ssc.stop()
@@ -21,8 +21,8 @@ import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.api.java.{JavaNetworkInputDStream, JavaDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.{NetworkInputDStream, DStream}

object TwitterUtils {
/**
@@ -40,7 +40,7 @@ object TwitterUtils {
twitterAuth: Option[Authorization],
filters: Seq[String] = Nil,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[Status] = {
): NetworkInputDStream[Status] = {
new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
}

@@ -52,7 +52,7 @@ object TwitterUtils {
* Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param jssc JavaStreamingContext object
*/
def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = {
def createStream(jssc: JavaStreamingContext): JavaNetworkInputDStream[Status] = {
createStream(jssc.ssc, None)
}

@@ -65,7 +65,8 @@ object TwitterUtils {
* @param jssc JavaStreamingContext object
* @param filters Set of filter strings to get only those tweets that match them
*/
def createStream(jssc: JavaStreamingContext, filters: Array[String]): JavaDStream[Status] = {
def createStream(jssc: JavaStreamingContext, filters: Array[String]
): JavaNetworkInputDStream[Status] = {
createStream(jssc.ssc, None, filters)
}

@@ -82,7 +83,7 @@ object TwitterUtils {
jssc: JavaStreamingContext,
filters: Array[String],
storageLevel: StorageLevel
): JavaDStream[Status] = {
): JavaNetworkInputDStream[Status] = {
createStream(jssc.ssc, None, filters, storageLevel)
}

@@ -92,7 +93,8 @@ object TwitterUtils {
* @param jssc JavaStreamingContext object
* @param twitterAuth Twitter4J Authorization
*/
def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization): JavaDStream[Status] = {
def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization
): JavaNetworkInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth))
}

@@ -107,7 +109,7 @@ object TwitterUtils {
jssc: JavaStreamingContext,
twitterAuth: Authorization,
filters: Array[String]
): JavaDStream[Status] = {
): JavaNetworkInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth), filters)
}

@@ -123,7 +125,7 @@ object TwitterUtils {
twitterAuth: Authorization,
filters: Array[String],
storageLevel: StorageLevel
): JavaDStream[Status] = {
): JavaNetworkInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
}
}
@@ -20,6 +20,8 @@ package org.apache.spark.streaming.twitter
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
import twitter4j.auth.{NullAuthorization, Authorization}
import org.apache.spark.streaming.dstream.NetworkInputDStream
import twitter4j.Status

class TwitterStreamSuite extends TestSuiteBase {

@@ -29,13 +31,17 @@ class TwitterStreamSuite extends TestSuiteBase {
val authorization: Authorization = NullAuthorization.getInstance()

// tests the API, does not actually test data receiving
val test1 = TwitterUtils.createStream(ssc, None)
val test2 = TwitterUtils.createStream(ssc, None, filters)
val test3 = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
val test4 = TwitterUtils.createStream(ssc, Some(authorization))
val test5 = TwitterUtils.createStream(ssc, Some(authorization), filters)
val test6 = TwitterUtils.createStream(ssc, Some(authorization), filters,
StorageLevel.MEMORY_AND_DISK_SER_2)
val test1: NetworkInputDStream[Status] = TwitterUtils.createStream(ssc, None)
val test2: NetworkInputDStream[Status] =
TwitterUtils.createStream(ssc, None, filters)
val test3: NetworkInputDStream[Status] =
TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
val test4: NetworkInputDStream[Status] =
TwitterUtils.createStream(ssc, Some(authorization))
val test5: NetworkInputDStream[Status] =
TwitterUtils.createStream(ssc, Some(authorization), filters)
val test6: NetworkInputDStream[Status] = TwitterUtils.createStream(
ssc, Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)

// Note that actually testing the data receiving is hard as authentication keys are
// necessary for accessing Twitter live stream
(Diffs for the remaining changed files are not shown here.)