Skip to content

Commit

Permalink
[SPARK-1332] Improve Spark Streaming's Network Receiver and InputDStr…
Browse files Browse the repository at this point in the history
…eam API [WIP]

The current Network Receiver API makes it slightly complicated to right a new receiver as one needs to create an instance of BlockGenerator as shown in SocketReceiver
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala#L51

Exposing the BlockGenerator interface has made it harder to improve the receiving process. The API of NetworkReceiver (which was not a very stable API anyways) needs to be change if we are to ensure future stability.

Additionally, the functions like streamingContext.socketStream that create input streams, return DStream objects. That makes it hard to expose functionality (say, rate limits) unique to input dstreams. They should return InputDStream or NetworkInputDStream. This is still not yet implemented.

This PR is blocked on the graceful shutdown PR apache#247

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#300 from tdas/network-receiver-api and squashes the following commits:

ea27b38 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api
3a4777c [Tathagata Das] Renamed NetworkInputDStream to ReceiverInputDStream, and ActorReceiver related stuff.
838dd39 [Tathagata Das] Added more events to the StreamingListener to report errors and stopped receivers.
a75c7a6 [Tathagata Das] Address some PR comments and fixed other issues.
91bfa72 [Tathagata Das] Fixed bugs.
8533094 [Tathagata Das] Scala style fixes.
028bde6 [Tathagata Das] Further refactored receiver to allow restarting of a receiver.
43f5290 [Tathagata Das] Made functions that create input streams return InputDStream and NetworkInputDStream, for both Scala and Java.
2c94579 [Tathagata Das] Fixed graceful shutdown by removing interrupts on receiving thread.
9e37a0b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api
3223e95 [Tathagata Das] Refactored the code that runs the NetworkReceiver into further classes and traits to make them more testable.
a36cc48 [Tathagata Das] Refactored the NetworkReceiver API for future stability.
  • Loading branch information
tdas authored and pdeyhim committed Jun 25, 2014
1 parent e0845f3 commit d9756e4
Show file tree
Hide file tree
Showing 55 changed files with 1,836 additions and 731 deletions.
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,11 @@ private[spark] object UIUtils extends Logging {
(records, "")
}
}
"%.1f%s".formatLocal(Locale.US, value, unit)
if (unit.isEmpty) {
"%d".formatLocal(Locale.US, value)
} else {
"%.1f%s".formatLocal(Locale.US, value, unit)
}
}

// Yarn has to go through a proxy so the base uri is provided and has to be on all links
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}
import org.apache.spark.{SparkConf, SecurityManager}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.receivers.Receiver
import org.apache.spark.util.AkkaUtils
import org.apache.spark.streaming.receiver.ActorHelper

case class SubscribeReceiver(receiverActor: ActorRef)
case class UnsubscribeReceiver(receiverActor: ActorRef)
Expand Down Expand Up @@ -81,14 +81,14 @@ class FeederActor extends Actor {
* @see [[org.apache.spark.streaming.examples.FeederActor]]
*/
class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
extends Actor with Receiver {
extends Actor with ActorHelper {

lazy private val remotePublisher = context.actorSelection(urlOfPublisher)

override def preStart = remotePublisher ! SubscribeReceiver(context.self)

def receive = {
case msg => pushBlock(msg.asInstanceOf[T])
case msg => store(msg.asInstanceOf[T])
}

override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@ import org.apache.spark.util.Utils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
import org.apache.spark.Logging
import org.apache.spark.streaming.receiver.Receiver

private[streaming]
class FlumeInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
host: String,
port: Int,
storageLevel: StorageLevel
) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {

override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
override def getReceiver(): Receiver[SparkFlumeEvent] = {
new FlumeReceiver(host, port, storageLevel)
}
}
Expand Down Expand Up @@ -115,13 +117,13 @@ private[streaming] object SparkFlumeEvent {
private[streaming]
class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
override def append(event : AvroFlumeEvent) : Status = {
receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
Status.OK
}

override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
events.foreach (event =>
receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
Status.OK
}
}
Expand All @@ -133,23 +135,21 @@ class FlumeReceiver(
host: String,
port: Int,
storageLevel: StorageLevel
) extends NetworkReceiver[SparkFlumeEvent] {
) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {

lazy val blockGenerator = new BlockGenerator(storageLevel)
lazy val responder = new SpecificResponder(
classOf[AvroSourceProtocol], new FlumeEventServer(this))
lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))

protected override def onStart() {
val responder = new SpecificResponder(
classOf[AvroSourceProtocol], new FlumeEventServer(this))
val server = new NettyServer(responder, new InetSocketAddress(host, port))
blockGenerator.start()
def onStart() {
server.start()
logInfo("Flume receiver started")
}

protected override def onStop() {
blockGenerator.stop()
def onStop() {
server.close()
logInfo("Flume receiver stopped")
}

override def getLocationPreference = Some(host)
override def preferredLocation = Some(host)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package org.apache.spark.streaming.flume

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}

object FlumeUtils {
/**
Expand All @@ -35,7 +35,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[SparkFlumeEvent] = {
): ReceiverInputDStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
inputStream
}
Expand All @@ -50,7 +50,7 @@ object FlumeUtils {
jssc: JavaStreamingContext,
hostname: String,
port: Int
): JavaDStream[SparkFlumeEvent] = {
): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port)
}

Expand All @@ -65,7 +65,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel
): JavaDStream[SparkFlumeEvent] = {
): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port, storageLevel)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;

import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.junit.Test;

public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
@Test
public void testFlumeStream() {
// tests the API, does not actually test data receiving
JavaDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
JavaDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream

class FlumeStreamSuite extends TestSuiteBase {

Expand All @@ -39,10 +40,11 @@ class FlumeStreamSuite extends TestSuiteBase {
test("flume input stream") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
outputStream.register()
ssc.start()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.Receiver

/**
* Input stream that pulls messages from a Kafka Broker.
Expand All @@ -53,11 +54,11 @@ class KafkaInputDStream[
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
) extends NetworkInputDStream[(K, V)](ssc_) with Logging {
) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {

def getReceiver(): NetworkReceiver[(K, V)] = {
def getReceiver(): Receiver[(K, V)] = {
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
.asInstanceOf[NetworkReceiver[(K, V)]]
.asInstanceOf[Receiver[(K, V)]]
}
}

Expand All @@ -70,21 +71,15 @@ class KafkaReceiver[
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
) extends NetworkReceiver[Any] {
) extends Receiver[Any](storageLevel) with Logging {

// Handles pushing data into the BlockManager
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
// Connection to Kafka
var consumerConnector : ConsumerConnector = null

def onStop() {
blockGenerator.stop()
}
def onStop() { }

def onStart() {

blockGenerator.start()

// In case we are using multiple Threads to handle Kafka Messages
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))

Expand Down Expand Up @@ -130,7 +125,7 @@ class KafkaReceiver[
def run() {
logInfo("Starting MessageHandler.")
for (msgAndMetadata <- stream) {
blockGenerator += (msgAndMetadata.key, msgAndMetadata.message)
store((msgAndMetadata.key, msgAndMetadata.message))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import kafka.serializer.{Decoder, StringDecoder}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext, JavaPairDStream}
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}


object KafkaUtils {
Expand All @@ -48,7 +48,7 @@ object KafkaUtils {
groupId: String,
topics: Map[String, Int],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[(String, String)] = {
): ReceiverInputDStream[(String, String)] = {
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
"zookeeper.connection.timeout.ms" -> "10000")
Expand All @@ -70,7 +70,7 @@ object KafkaUtils {
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
): DStream[(K, V)] = {
): ReceiverInputDStream[(K, V)] = {
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
}

Expand All @@ -88,7 +88,7 @@ object KafkaUtils {
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt]
): JavaPairDStream[String, String] = {
): JavaPairReceiverInputDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
Expand All @@ -110,7 +110,7 @@ object KafkaUtils {
groupId: String,
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairDStream[String, String] = {
): JavaPairReceiverInputDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
Expand Down Expand Up @@ -139,7 +139,7 @@ object KafkaUtils {
kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairDStream[K, V] = {
): JavaPairReceiverInputDStream[K, V] = {
implicit val keyCmt: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val valueCmt: ClassTag[V] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,29 @@
package org.apache.spark.streaming.kafka;

import java.util.HashMap;

import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.junit.Test;
import com.google.common.collect.Maps;
import kafka.serializer.StringDecoder;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;

public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
@Test
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();

// tests the API, does not actually test data receiving
JavaPairDStream<String, String> test1 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
JavaPairDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
JavaPairReceiverInputDStream<String, String> test1 =
KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
JavaPairReceiverInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK_SER_2());

HashMap<String, String> kafkaParams = Maps.newHashMap();
kafkaParams.put("zookeeper.connect", "localhost:12345");
kafkaParams.put("group.id","consumer-group");
JavaPairDStream<String, String> test3 = KafkaUtils.createStream(ssc,
JavaPairReceiverInputDStream<String, String> test3 = KafkaUtils.createStream(ssc,
String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream

class KafkaStreamSuite extends TestSuiteBase {

Expand All @@ -28,10 +29,13 @@ class KafkaStreamSuite extends TestSuiteBase {
val topics = Map("my-topic" -> 1)

// tests the API, does not actually test data receiving
val test1 = KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
val test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
val test1: ReceiverInputDStream[(String, String)] =
KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
val test2: ReceiverInputDStream[(String, String)] =
KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
val test3 = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
val test3: ReceiverInputDStream[(String, String)] =
KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)

// TODO: Actually test receiving data
Expand Down
Loading

0 comments on commit d9756e4

Please sign in to comment.