Skip to content

Commit

Permalink
Address some PR comments and fixed other issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Apr 18, 2014
1 parent 91bfa72 commit a75c7a6
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
public void testFlumeStream() {
// tests the API, does not actually test data receiving
JavaNetworkInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
JavaNetworkInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
JavaNetworkInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void testKafkaStream() {
// tests the API, does not actually test data receiving
JavaPairNetworkInputDStream<String, String> test1 =
KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
JavaPairNetworkInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
JavaPairNetworkInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK_SER_2());

HashMap<String, String> kafkaParams = Maps.newHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.streaming.receiver
import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._

import org.apache.spark.storage.StorageLevel

Expand All @@ -30,23 +31,25 @@ import org.apache.spark.storage.StorageLevel
* and onStop() should define the cleanup steps necessary to stop receiving data. A custom
* receiver would look something like this.
*
* class MyReceiver(storageLevel) extends NetworkReceiver[String](storageLevel) {
* def onStart() {
* // Setup stuff (start threads, open sockets, etc.) to start receiving data.
* // Must start new thread to receive data, as onStart() must be non-blocking.
* @example {{{
* class MyReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String](storageLevel) {
* def onStart() {
* // Setup stuff (start threads, open sockets, etc.) to start receiving data.
* // Must start new thread to receive data, as onStart() must be non-blocking.
*
* // Call store(...) in those threads to store received data into Spark's memory.
* // Call store(...) in those threads to store received data into Spark's memory.
*
* // Call stop(...), restart() or reportError(...) on any thread based on how
* // different errors should be handled.
* // Call stop(...), restart() or reportError(...) on any thread based on how
* // different errors should be handled.
*
* // See corresponding method documentation for more details.
* }
* // See corresponding method documentation for more details
* }
*
* def onStop() {
* // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
* }
* }
* def onStop() {
* // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
* }
* }
* }}}
*/
abstract class NetworkReceiver[T](val storageLevel: StorageLevel) extends Serializable {

Expand Down Expand Up @@ -80,43 +83,58 @@ abstract class NetworkReceiver[T](val storageLevel: StorageLevel) extends Serial
executor.pushSingle(dataItem)
}

/** Store a sequence of received data into Spark's memory. */
/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
def store(dataBuffer: ArrayBuffer[T]): Unit = {
  // The whole buffer is handed to the executor in a single call, with no metadata attached.
  executor.pushArrayBuffer(dataBuffer, None, None)
}

/**
* Store a sequence of received data into Spark's memory.
* Store an ArrayBuffer of received data as a data block into Spark's memory.
* The metadata will be associated with this block of data
* for being used in the corresponding InputDStream.
*/
def store(dataBuffer: ArrayBuffer[T], metadata: Any): Unit = {
  // Wrap the caller-supplied metadata so it is passed along with this buffer.
  val blockMetadata = Some(metadata)
  executor.pushArrayBuffer(dataBuffer, blockMetadata, None)
}
/** Store a sequence of received data into Spark's memory. */

/** Store an iterator of received data as a data block into Spark's memory. */
def store(dataIterator: Iterator[T]): Unit = {
  // The iterator is forwarded as-is; both optional arguments are left as None.
  executor.pushIterator(dataIterator, None, None)
}

/**
* Store a sequence of received data into Spark's memory.
* Store an iterator of received data as a data block into Spark's memory.
* The metadata will be associated with this block of data
* for being used in the corresponding InputDStream.
*/
def store(dataIterator: java.util.Iterator[T], metadata: Any): Unit = {
  // Convert the Java iterator explicitly instead of relying on the implicit
  // conversion pulled in by the wildcard JavaConversions import; the executor
  // is still given a Scala Iterator view over the same underlying Java iterator.
  val scalaIterator: Iterator[T] =
    scala.collection.JavaConverters.asScalaIteratorConverter(dataIterator).asScala
  // The metadata is wrapped in Some(...) and passed along with the data.
  executor.pushIterator(scalaIterator, Some(metadata), None)
}

/** Store an iterator of received data as a data block into Spark's memory. */
def store(dataIterator: java.util.Iterator[T]): Unit = {
  // Convert the Java iterator explicitly instead of relying on the implicit
  // conversion pulled in by the wildcard JavaConversions import; the executor
  // is still given a Scala Iterator view over the same underlying Java iterator.
  val scalaIterator: Iterator[T] =
    scala.collection.JavaConverters.asScalaIteratorConverter(dataIterator).asScala
  // No metadata is attached to this data.
  executor.pushIterator(scalaIterator, None, None)
}

/**
* Store an iterator of received data as a data block into Spark's memory.
* The metadata will be associated with this block of data
* for being used in the corresponding InputDStream.
*/
def store(dataIterator: Iterator[T], metadata: Any): Unit = {
  // Wrap the caller-supplied metadata so it is passed along with this iterator's data.
  val blockMetadata = Some(metadata)
  executor.pushIterator(dataIterator, blockMetadata, None)
}

/** Store the bytes of received data into Spark's memory. */
/** Store the bytes of received data as a data block into Spark's memory. */
def store(bytes: ByteBuffer): Unit = {
  // The byte buffer is forwarded unchanged; both optional arguments are left as None.
  executor.pushBytes(bytes, None, None)
}

/** Store the bytes of received data into Spark's memory.
/** Store the bytes of received data as a data block into Spark's memory.
* The metadata will be associated with this block of data
* for being used in the corresponding InputDStream.
*/
def store(bytes: ByteBuffer, metadata: Any = null) {
def store(bytes: ByteBuffer, metadata: Any) {
executor.pushBytes(bytes, Some(metadata), None)
}

Expand All @@ -143,26 +161,26 @@ abstract class NetworkReceiver[T](val storageLevel: StorageLevel) extends Serial
* The delay is defined by the Spark configuration
* `spark.streaming.receiverRestartDelay`.
*/
def restart(message: String, exception: Throwable) {
executor.restartReceiver(message, exception)
def restart(message: String, error: Throwable) {
executor.restartReceiver(message, Some(error))
}

/**
* Restart the receiver. This will call `onStop()` immediately and return.
* Asynchronously, after the given delay, `onStart()` will be called.
*/
def restart(message: String, throwable: Throwable, millisecond: Int) {
executor.restartReceiver(message, throwable, millisecond)
def restart(message: String, error: Throwable, millisecond: Int) {
executor.restartReceiver(message, Some(error), millisecond)
}

/** Stop the receiver completely. */
def stop(message: String) {
executor.stop(message)
executor.stop(message, None)
}

/** Stop the receiver completely due to an exception */
def stop(message: String, exception: Throwable) {
executor.stop(message, exception)
def stop(message: String, error: Throwable) {
executor.stop(message, Some(error))
}

def isStarted(): Boolean = {
Expand All @@ -175,7 +193,7 @@ abstract class NetworkReceiver[T](val storageLevel: StorageLevel) extends Serial
}

/** Get unique identifier of this receiver. */
def receiverId = id
def streamId = id

/*
* =================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ private[streaming] abstract class NetworkReceiverExecutor(
conf: SparkConf
) extends Logging {


/** Enumeration to identify current state of the receiver */
object NetworkReceiverState extends Enumeration {
type CheckpointState = Value
Expand All @@ -48,41 +47,38 @@ private[streaming] abstract class NetworkReceiverExecutor(
receiver.attachExecutor(this)

/** Receiver id */
protected val receiverId = receiver.receiverId

/** Message associated with the stopping of the receiver */
protected var stopMessage = ""

/** Exception associated with the stopping of the receiver */
protected var stopException: Throwable = null
protected val streamId = receiver.streamId

/** Latch counted down by stop() so that threads waiting for the executor to stop can proceed. */
private val stopLatch = new CountDownLatch(1)

/** Time between a receiver is stopped */
private val restartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000)
/** Default delay, in milliseconds, between a receiver being stopped and started again */
private val defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000)

/** Exception associated with the stopping of the receiver */
@volatile protected var stoppingError: Throwable = null

/** State of the receiver */
private[streaming] var receiverState = Initialized
@volatile private[streaming] var receiverState = Initialized

/** Push a single data item to backend data store. */
def pushSingle(data: Any): Unit

/** Push a byte buffer to backend data store. */
/** Store the bytes of received data as a data block into Spark's memory. */
def pushBytes(
    bytes: ByteBuffer,                       // raw bytes to store
    optionalMetadata: Option[Any],           // metadata for the block, if any
    optionalBlockId: Option[StreamBlockId]   // block id to use, if any
  ): Unit

/** Push an iterator of objects as a block to backend data store. */
/** Store an iterator of received data as a data block into Spark's memory. */
def pushIterator(
    iterator: Iterator[_],                   // data items to store
    optionalMetadata: Option[Any],           // metadata for the block, if any
    optionalBlockId: Option[StreamBlockId]   // block id to use, if any
  ): Unit

/** Push an ArrayBuffer of object as a block to back data store. */
/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
def pushArrayBuffer(
arrayBuffer: ArrayBuffer[_],
optionalMetadata: Option[Any],
Expand All @@ -97,57 +93,46 @@ private[streaming] abstract class NetworkReceiverExecutor(
startReceiver()
}

/**
* Mark the executor and the receiver for stopping
*/
def stop(message: String, exception: Throwable = null) {
stopMessage = message
stopException = exception
stopReceiver()
/** Mark the executor and the receiver for stopping */
def stop(message: String, error: Option[Throwable]) {
stoppingError = error.orNull
stopReceiver(message, error)
stopLatch.countDown()
if (exception != null) {
logError("Stopped executor: " + message, exception)
} else {
logWarning("Stopped executor: " + message)
}
}

/** Start receiver */
def startReceiver(): Unit = synchronized {
try {
logInfo("Starting receiver")
stopMessage = ""
stopException = null
onReceiverStart()
receiverState = Started
} catch {
case t: Throwable =>
stop("Error starting receiver " + receiverId, t)
stop("Error starting receiver " + streamId, Some(t))
}
}

/** Stop receiver */
def stopReceiver(): Unit = synchronized {
def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized {
try {
receiverState = Stopped
onReceiverStop()
onReceiverStop(message, error)
} catch {
case t: Throwable =>
stop("Error stopping receiver " + receiverId, t)
stop("Error stopping receiver " + streamId, Some(t))
}
}

/** Restart receiver with delay */
def restartReceiver(message: String, throwable: Throwable = null) {
val defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000)
restartReceiver(message, throwable, defaultRestartDelay)
def restartReceiver(message: String, error: Option[Throwable] = None) {
restartReceiver(message, error, defaultRestartDelay)
}

/** Restart receiver with delay */
def restartReceiver(message: String, exception: Throwable, delay: Int) {
logWarning("Restarting receiver with delay " + delay + " ms: " + message, exception)
reportError(message, exception)
stopReceiver()
def restartReceiver(message: String, error: Option[Throwable], delay: Int) {
logWarning("Restarting receiver with delay " + delay + " ms: " + message,
error.getOrElse(null))
stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
future {
logDebug("Sleeping for " + delay)
Thread.sleep(delay)
Expand All @@ -166,25 +151,30 @@ private[streaming] abstract class NetworkReceiverExecutor(
}

/** Called when the receiver needs to be stopped */
protected def onReceiverStop(): Unit = synchronized {
protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = synchronized {
// Call user-defined onStop()
logInfo("Calling receiver onStop")
receiver.onStop()
logInfo("Called receiver onStop")
}

/** Check if the receiver is currently in the Started state */
def isReceiverStarted() = synchronized {
def isReceiverStarted() = {
logDebug("state = " + receiverState)
receiverState == Started
}

/** Block the calling thread until the executor is stopped */
def awaitStop() {
def awaitTermination() {
stopLatch.await()
logInfo("Waiting for executor stop is over")
if (stopException != null) {
throw new Exception(stopMessage, stopException)
if (stoppingError != null) {
logError("Stopped executor with error: " + stoppingError)
} else {
logWarning("Stopped executor without error")
}
if (stoppingError != null) {
throw stoppingError
}
}
}
Loading

0 comments on commit a75c7a6

Please sign in to comment.