
Commit

Renamed NetworkInputDStream to ReceiverInputDStream, and ActorReceiver related stuff.
tdas committed Apr 19, 2014
1 parent 838dd39 commit 3a4777c
Showing 46 changed files with 322 additions and 282 deletions.
@@ -26,8 +26,8 @@ import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}
import org.apache.spark.{SparkConf, SecurityManager}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
-import org.apache.spark.streaming.receivers.Receiver
import org.apache.spark.util.AkkaUtils
+import org.apache.spark.streaming.receiver.ActorHelper

case class SubscribeReceiver(receiverActor: ActorRef)
case class UnsubscribeReceiver(receiverActor: ActorRef)
@@ -81,14 +81,14 @@ class FeederActor extends Actor {
* @see [[org.apache.spark.streaming.examples.FeederActor]]
*/
class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
-extends Actor with Receiver {
+extends Actor with ActorHelper {

lazy private val remotePublisher = context.actorSelection(urlOfPublisher)

override def preStart = remotePublisher ! SubscribeReceiver(context.self)

def receive = {
-case msg => pushBlock(msg.asInstanceOf[T])
+case msg => store(msg.asInstanceOf[T])
}

override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
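For orientation, a minimal sketch (not part of this commit's diff; the EchoActor name and the String payload are invented for illustration) of an actor-based receiver written against the renamed API, where records are handed to Spark with store() from the ActorHelper trait instead of the old pushBlock():

import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

// Hypothetical actor receiver: every String message it receives is pushed
// into the stream via ActorHelper.store().
class EchoActor extends Actor with ActorHelper {
  def receive = {
    case msg: String => store(msg)
  }
}

Such an actor is typically attached to a job with StreamingContext.actorStream (not shown in this diff); only the trait mixed into the actor and the store() call are affected by the rename.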
@@ -35,17 +35,17 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
import org.apache.spark.Logging
-import org.apache.spark.streaming.receiver.NetworkReceiver
+import org.apache.spark.streaming.receiver.Receiver

private[streaming]
class FlumeInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
host: String,
port: Int,
storageLevel: StorageLevel
-) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
+) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {

-override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
+override def getReceiver(): Receiver[SparkFlumeEvent] = {
new FlumeReceiver(host, port, storageLevel)
}
}
@@ -135,7 +135,7 @@ class FlumeReceiver(
host: String,
port: Int,
storageLevel: StorageLevel
-) extends NetworkReceiver[SparkFlumeEvent](storageLevel) with Logging {
+) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {

lazy val responder = new SpecificResponder(
classOf[AvroSourceProtocol], new FlumeEventServer(this))
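As a further reference point (again not from this diff; the CountingReceiver class and its timing are invented), a bare-bones custom receiver against the renamed base class org.apache.spark.streaming.receiver.Receiver, which keeps the onStart()/onStop() lifecycle and store() for handing records to Spark:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver that emits one record per second from its own thread.
class CountingReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  def onStart(): Unit = {
    // onStart() must return quickly; the actual receiving happens on a
    // separate thread that keeps calling store() until the receiver is stopped.
    new Thread("counting-receiver") {
      override def run(): Unit = {
        var i = 0
        while (!isStopped()) {
          store("record-" + i)
          i += 1
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to clean up; the loop above exits once isStopped() returns true.
  }
}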
@@ -19,8 +19,8 @@ package org.apache.spark.streaming.flume

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaNetworkInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.{NetworkInputDStream, DStream}
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}

object FlumeUtils {
/**
@@ -35,7 +35,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-): NetworkInputDStream[SparkFlumeEvent] = {
+): ReceiverInputDStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
inputStream
}
@@ -50,7 +50,7 @@ object FlumeUtils {
jssc: JavaStreamingContext,
hostname: String,
port: Int
-): JavaNetworkInputDStream[SparkFlumeEvent] = {
+): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port)
}

@@ -65,7 +65,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel
-): JavaNetworkInputDStream[SparkFlumeEvent] = {
+): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port, storageLevel)
}
}
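A short usage sketch under the renamed types (the FlumeExample object name, hostname and port are placeholders, not taken from the commit): the stream returned by FlumeUtils.createStream is now typed as ReceiverInputDStream[SparkFlumeEvent], but it is still an ordinary DStream, so downstream operations are unaffected.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

object FlumeExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("FlumeExample"), Seconds(2))

    // Only the static type changes with this commit: NetworkInputDStream -> ReceiverInputDStream.
    val events: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createStream(ssc, "localhost", 41414)

    events.count().map(cnt => "Received " + cnt + " flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}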
@@ -19,17 +19,16 @@

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;

-import org.apache.spark.streaming.api.java.JavaNetworkInputDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.junit.Test;

public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
@Test
public void testFlumeStream() {
// tests the API, does not actually test data receiving
-JavaNetworkInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
-JavaNetworkInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
+JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
+JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
@@ -31,7 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.streaming.api.java.JavaNetworkInputDStream
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream

class FlumeStreamSuite extends TestSuiteBase {

@@ -40,11 +40,11 @@ class FlumeStreamSuite extends TestSuiteBase {
test("flume input stream") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
-val flumeStream: JavaNetworkInputDStream[SparkFlumeEvent] =
+val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-val outputStream = new TestOutputStream(flumeStream.networkInputDStream, outputBuffer)
+val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
outputStream.register()
ssc.start()

@@ -33,7 +33,7 @@ import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.NetworkReceiver
+import org.apache.spark.streaming.receiver.Receiver

/**
* Input stream that pulls messages from a Kafka Broker.
@@ -54,11 +54,11 @@ class KafkaInputDStream[
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
-) extends NetworkInputDStream[(K, V)](ssc_) with Logging {
+) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {

-def getReceiver(): NetworkReceiver[(K, V)] = {
+def getReceiver(): Receiver[(K, V)] = {
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
-.asInstanceOf[NetworkReceiver[(K, V)]]
+.asInstanceOf[Receiver[(K, V)]]
}
}
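For comparison, this is the general shape the renamed pieces compose into, independent of Kafka (MyInputDStream and MyReceiver below are invented names, not classes from this commit): an input source subclasses ReceiverInputDStream and returns its Receiver from getReceiver().

import scala.reflect.ClassTag

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical input DStream that wires a custom Receiver into the streaming graph.
class MyInputDStream[T: ClassTag](
    @transient ssc_ : StreamingContext,
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = new MyReceiver[T](storageLevel)
}

// Hypothetical receiver; a real one would start a thread in onStart() that calls store().
class MyReceiver[T](storageLevel: StorageLevel) extends Receiver[T](storageLevel) {
  def onStart(): Unit = { }
  def onStop(): Unit = { }
}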

@@ -71,7 +71,7 @@ class KafkaReceiver[
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
-) extends NetworkReceiver[Any](storageLevel) with Logging {
+) extends Receiver[Any](storageLevel) with Logging {

// Connection to Kafka
var consumerConnector : ConsumerConnector = null
@@ -27,8 +27,8 @@ import kafka.serializer.{Decoder, StringDecoder}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaPairNetworkInputDStream, JavaStreamingContext, JavaPairDStream}
-import org.apache.spark.streaming.dstream.{NetworkInputDStream, DStream}
+import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext, JavaPairDStream}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}


object KafkaUtils {
@@ -48,7 +48,7 @@ object KafkaUtils {
groupId: String,
topics: Map[String, Int],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-): NetworkInputDStream[(String, String)] = {
+): ReceiverInputDStream[(String, String)] = {
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
"zookeeper.connection.timeout.ms" -> "10000")
@@ -70,7 +70,7 @@ object KafkaUtils {
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
-): NetworkInputDStream[(K, V)] = {
+): ReceiverInputDStream[(K, V)] = {
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
}

@@ -88,7 +88,7 @@ object KafkaUtils {
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt]
-): JavaPairNetworkInputDStream[String, String] = {
+): JavaPairReceiverInputDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
@@ -110,7 +110,7 @@ object KafkaUtils {
groupId: String,
topics: JMap[String, JInt],
storageLevel: StorageLevel
-): JavaPairNetworkInputDStream[String, String] = {
+): JavaPairReceiverInputDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
@@ -139,7 +139,7 @@ object KafkaUtils {
kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
storageLevel: StorageLevel
-): JavaPairNetworkInputDStream[K, V] = {
+): JavaPairReceiverInputDStream[K, V] = {
implicit val keyCmt: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val valueCmt: ClassTag[V] =
@@ -19,29 +19,28 @@

import java.util.HashMap;

-import org.apache.spark.streaming.api.java.JavaPairNetworkInputDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.junit.Test;
import com.google.common.collect.Maps;
import kafka.serializer.StringDecoder;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;

public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
@Test
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();

// tests the API, does not actually test data receiving
-JavaPairNetworkInputDStream<String, String> test1 =
+JavaPairReceiverInputDStream<String, String> test1 =
KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
-JavaPairNetworkInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
+JavaPairReceiverInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK_SER_2());

HashMap<String, String> kafkaParams = Maps.newHashMap();
kafkaParams.put("zookeeper.connect", "localhost:12345");
kafkaParams.put("group.id","consumer-group");
-JavaPairNetworkInputDStream<String, String> test3 = KafkaUtils.createStream(ssc,
+JavaPairReceiverInputDStream<String, String> test3 = KafkaUtils.createStream(ssc,
String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
}
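A consumption sketch on the Scala side under the new names (the KafkaWordCountSketch object, ZooKeeper quorum, group id and topic map are placeholders): KafkaUtils.createStream now returns a ReceiverInputDStream of (key, message) pairs, and everything downstream treats it as a normal DStream.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCountSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("KafkaWordCountSketch"), Seconds(2))

    // One consumer thread on "my-topic"; elements are (key, message) pairs.
    val messages: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream(ssc, "localhost:2181", "example-group", Map("my-topic" -> 1))

    messages.map(_._2)
      .flatMap(_.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}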
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.kafka
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.NetworkInputDStream
+import org.apache.spark.streaming.dstream.ReceiverInputDStream

class KafkaStreamSuite extends TestSuiteBase {

@@ -29,12 +29,12 @@ class KafkaStreamSuite extends TestSuiteBase {
val topics = Map("my-topic" -> 1)

// tests the API, does not actually test data receiving
-val test1: NetworkInputDStream[(String, String)] =
+val test1: ReceiverInputDStream[(String, String)] =
KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
-val test2: NetworkInputDStream[(String, String)] =
+val test2: ReceiverInputDStream[(String, String)] =
KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
-val test3: NetworkInputDStream[(String, String)] =
+val test3: ReceiverInputDStream[(String, String)] =
KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)

@@ -39,7 +39,7 @@ import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.NetworkReceiver
+import org.apache.spark.streaming.receiver.Receiver

/**
* Input stream that subscribe messages from a Mqtt Broker.
@@ -55,9 +55,9 @@ class MQTTInputDStream(
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
-) extends NetworkInputDStream[String](ssc_) with Logging {
+) extends ReceiverInputDStream[String](ssc_) with Logging {

-def getReceiver(): NetworkReceiver[String] = {
+def getReceiver(): Receiver[String] = {
new MQTTReceiver(brokerUrl, topic, storageLevel)
}
}
@@ -67,7 +67,7 @@ class MQTTReceiver(
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
-) extends NetworkReceiver[String](storageLevel) {
+) extends Receiver[String](storageLevel) {

def onStop() {

@@ -19,9 +19,9 @@ package org.apache.spark.streaming.mqtt

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaNetworkInputDStream, JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
import scala.reflect.ClassTag
-import org.apache.spark.streaming.dstream.{NetworkInputDStream, DStream}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}

object MQTTUtils {
/**
@@ -36,7 +36,7 @@ object MQTTUtils {
brokerUrl: String,
topic: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-): NetworkInputDStream[String] = {
+): ReceiverInputDStream[String] = {
new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
}

@@ -51,7 +51,7 @@ object MQTTUtils {
jssc: JavaStreamingContext,
brokerUrl: String,
topic: String
-): JavaNetworkInputDStream[String] = {
+): JavaReceiverInputDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, brokerUrl, topic)
}
@@ -68,7 +68,7 @@ object MQTTUtils {
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
-): JavaNetworkInputDStream[String] = {
+): JavaReceiverInputDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, brokerUrl, topic, storageLevel)
}
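And the corresponding sketch for MQTT (the MQTTPrinter object, broker URL and topic string are placeholders): the Scala overload of MQTTUtils.createStream now yields a ReceiverInputDStream[String] of raw message payloads.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.mqtt.MQTTUtils

object MQTTPrinter {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("MQTTPrinter"), Seconds(2))

    // Each element is one MQTT message payload, delivered as a String.
    val lines: ReceiverInputDStream[String] =
      MQTTUtils.createStream(ssc, "tcp://localhost:1883", "sensor/readings")

    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}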
@@ -18,9 +18,7 @@
package org.apache.spark.streaming.mqtt;

import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaNetworkInputDStream;
-import org.apache.spark.streaming.api.java.JavaPairNetworkInputDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.junit.Test;

import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -32,8 +30,8 @@ public void testMQTTStream() {
String topic = "def";

// tests the API, does not actually test data receiving
-JavaNetworkInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
-JavaNetworkInputDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
+JavaReceiverInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
+JavaReceiverInputDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.mqtt

import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.NetworkInputDStream
+import org.apache.spark.streaming.dstream.ReceiverInputDStream

class MQTTStreamSuite extends TestSuiteBase {

@@ -29,8 +29,8 @@ class MQTTStreamSuite extends TestSuiteBase {
val topic = "def"

// tests the API, does not actually test data receiving
-val test1: NetworkInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
-val test2: NetworkInputDStream[String] =
+val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
+val test2: ReceiverInputDStream[String] =
MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)

// TODO: Actually test receiving data
