Skip to content

Commit

Permalink
Added more events to the StreamingListener to report errors and stopped receivers.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Apr 19, 2014
1 parent a75c7a6 commit 838dd39
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 120 deletions.
29 changes: 15 additions & 14 deletions project/MimaBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ object MimaBuild {
IO.read(excludeFile).split("\n")
}

// Exclude a single class and its corresponding object
def excludeClass(className: String) = {
Seq(
excludePackage(className),
Expand All @@ -48,7 +49,16 @@ object MimaBuild {
ProblemFilters.exclude[MissingTypesProblem](className + "$")
)
}
def excludeSparkClass(className: String) = excludeClass("org.apache.spark." + className)

// Excludes a class under the org.apache.spark package, given its
// package-relative name (e.g. "rdd.ClassTags").
def excludeSparkClass(className: String) =
  excludeClass(s"org.apache.spark.$className")

// Excludes an entire package under org.apache.spark, given its
// package-relative name (e.g. "streaming").
def excludeSparkPackage(packageName: String) =
  excludePackage(s"org.apache.spark.$packageName")

val packagePrivateExcludes = packagePrivateList.flatMap(excludeClass)

Expand All @@ -58,25 +68,17 @@ object MimaBuild {
SparkBuild.SPARK_VERSION match {
case v if v.startsWith("1.0") =>
Seq(
excludePackage("org.apache.spark.api.java"),
excludePackage("org.apache.spark.streaming.api.java"),
excludePackage("org.apache.spark.streaming.scheduler"),
excludePackage("org.apache.spark.mllib")
excludeSparkPackage("api.java"),
excludeSparkPackage("mllib"),
excludeSparkPackage("streaming")
) ++
excludeSparkClass("rdd.ClassTags") ++
excludeSparkClass("util.XORShiftRandom") ++
excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
excludeSparkClass("mllib.optimization.SquaredGradient") ++
excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
excludeSparkClass("mllib.regression.LassoWithSGD") ++
excludeSparkClass("mllib.regression.LinearRegressionWithSGD") ++
excludeSparkClass("streaming.dstream.NetworkReceiver") ++
excludeSparkClass("streaming.dstream.NetworkReceiver#NetworkReceiverActor") ++
excludeSparkClass("streaming.dstream.NetworkReceiver#BlockGenerator") ++
excludeSparkClass("streaming.dstream.NetworkReceiver#BlockGenerator#Block") ++
excludeSparkClass("streaming.dstream.ReportError") ++
excludeSparkClass("streaming.dstream.ReportBlock") ++
excludeSparkClass("streaming.dstream.DStream")
excludeSparkClass("mllib.regression.LinearRegressionWithSGD")
case _ => Seq()
}

Expand All @@ -87,5 +89,4 @@ object MimaBuild {
previousArtifact := None,
binaryIssueFilters ++= ignoredABIProblems(sparkHome)
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
/** Deregister a receiver */
def deregisterReceiver(streamId: Int, message: String, error: String) {
receiverInfo -= streamId
ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(streamId))
ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(streamId, message, error))
val messageWithError = if (error != null && !error.isEmpty) {
s"$message - $error"
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.streaming.scheduler

import scala.collection.mutable.Queue

import org.apache.spark.util.Distribution

/** Base trait for events related to StreamingListener */
Expand All @@ -26,12 +27,13 @@ sealed trait StreamingListenerEvent
case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent

case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
extends StreamingListenerEvent
case class StreamingListenerReceiverStopped(streamId: Int)
extends StreamingListenerEvent
case class StreamingListenerReceiverError(streamId: Int, message: String, error: String)
extends StreamingListenerEvent
case class StreamingListenerReceiverStopped(streamId: Int, message: String, error: String)
extends StreamingListenerEvent

/** Internal sentinel event used to signal the listener's daemon thread to shut down. */
private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
Expand All @@ -45,20 +47,20 @@ trait StreamingListener {
/** Called when a receiver has been started */
def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }

/** Called when a receiver has been stopped */
def onReceiverStopped(receiverStopped: StreamingListenerReceiverStarted) { }

/** Called when a receiver has reported an error */
def onReceiverError(receiverError: StreamingListenerReceiverError) { }

/** Called when a receiver has been stopped */
def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }

/** Called when a batch of jobs has been submitted for processing. */
def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }

/** Called when processing of a batch of jobs has completed. */
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }

/** Called when processing of a batch of jobs has started. */
def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }

/** Called when processing of a batch of jobs has completed. */
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ private[spark] class StreamingListenerBus() extends Logging {
event match {
case receiverStarted: StreamingListenerReceiverStarted =>
listeners.foreach(_.onReceiverStarted(receiverStarted))
case receiverError: StreamingListenerReceiverError =>
listeners.foreach(_.onReceiverError(receiverError))
case receiverStopped: StreamingListenerReceiverStopped =>
listeners.foreach(_.onReceiverStopped(receiverStopped))
case batchSubmitted: StreamingListenerBatchSubmitted =>
listeners.foreach(_.onBatchSubmitted(batchSubmitted))
case batchStarted: StreamingListenerBatchStarted =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,110 +139,111 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
assert(blockGeneratorListener.arrayBuffers.size > 0)
assert(recordedData.toSet === generatedData.toSet)
}
}

/**
* An implementation of NetworkReceiver that is used for testing a receiver's life cycle.
*/
class FakeReceiver extends NetworkReceiver[Int](StorageLevel.MEMORY_ONLY) {
var otherThread: Thread = null
var receiving = false
var onStartCalled = false
var onStopCalled = false

def onStart() {
otherThread = new Thread() {
override def run() {
receiving = true
while(!isStopped()) {
Thread.sleep(10)
/**
* An implementation of NetworkReceiver that is used for testing a receiver's life cycle.
*/
class FakeReceiver extends NetworkReceiver[Int](StorageLevel.MEMORY_ONLY) {
var otherThread: Thread = null
var receiving = false
var onStartCalled = false
var onStopCalled = false

def onStart() {
otherThread = new Thread() {
override def run() {
receiving = true
while(!isStopped()) {
Thread.sleep(10)
}
}
}
onStartCalled = true
otherThread.start()

}
onStartCalled = true
otherThread.start()

}
def onStop() {
onStopCalled = true
otherThread.join()
}

def onStop() {
onStopCalled = true
otherThread.join()
def reset() {
receiving = false
onStartCalled = false
onStopCalled = false
}
}

def reset() {
receiving = false
onStartCalled = false
onStopCalled = false
}
}
/**
* An implementation of NetworkReceiverExecutor used for testing a NetworkReceiver.
* Instead of storing the data in the BlockManager, it stores all the data in a local buffer
that can be used for verifying that the data has been forwarded correctly.
*/
class FakeReceiverExecutor(receiver: FakeReceiver)
extends NetworkReceiverExecutor(receiver, new SparkConf()) {
val singles = new ArrayBuffer[Any]
val byteBuffers = new ArrayBuffer[ByteBuffer]
val iterators = new ArrayBuffer[Iterator[_]]
val arrayBuffers = new ArrayBuffer[ArrayBuffer[_]]
val errors = new ArrayBuffer[Throwable]

/** Check if all data structures are clean */
def isAllEmpty = {
singles.isEmpty && byteBuffers.isEmpty && iterators.isEmpty &&
arrayBuffers.isEmpty && errors.isEmpty
}

/**
* An implementation of NetworkReceiverExecutor used for testing a NetworkReceiver.
* Instead of storing the data in the BlockManager, it stores all the data in a local buffer
that can be used for verifying that the data has been forwarded correctly.
*/
class FakeReceiverExecutor(receiver: FakeReceiver)
extends NetworkReceiverExecutor(receiver, new SparkConf()) {
val singles = new ArrayBuffer[Any]
val byteBuffers = new ArrayBuffer[ByteBuffer]
val iterators = new ArrayBuffer[Iterator[_]]
val arrayBuffers = new ArrayBuffer[ArrayBuffer[_]]
val errors = new ArrayBuffer[Throwable]

/** Check if all data structures are clean */
def isAllEmpty = {
singles.isEmpty && byteBuffers.isEmpty && iterators.isEmpty &&
arrayBuffers.isEmpty && errors.isEmpty
}
def pushSingle(data: Any) {
singles += data
}

def pushSingle(data: Any) {
singles += data
}
def pushBytes(
bytes: ByteBuffer,
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
byteBuffers += bytes
}

def pushBytes(
bytes: ByteBuffer,
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
byteBuffers += bytes
}
def pushIterator(
iterator: Iterator[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
iterators += iterator
}

def pushIterator(
iterator: Iterator[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
iterators += iterator
}
def pushArrayBuffer(
arrayBuffer: ArrayBuffer[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
arrayBuffers += arrayBuffer
}

def pushArrayBuffer(
arrayBuffer: ArrayBuffer[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
arrayBuffers += arrayBuffer
def reportError(message: String, throwable: Throwable) {
errors += throwable
}
}

def reportError(message: String, throwable: Throwable) {
errors += throwable
}
}
/**
* An implementation of BlockGeneratorListener that is used to test the BlockGenerator.
*/
class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener {
// buffer of data received as ArrayBuffers
val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
val errors = new ArrayBuffer[Throwable]

def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
arrayBuffers += bufferOfInts
Thread.sleep(0)
}

/**
* An implementation of BlockGeneratorListener that is used to test the BlockGenerator.
*/
class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener {
// buffer of data received as ArrayBuffers
val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
val errors = new ArrayBuffer[Throwable]

def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
arrayBuffers += bufferOfInts
Thread.sleep(0)
def onError(message: String, throwable: Throwable) {
errors += throwable
}
}
}

def onError(message: String, throwable: Throwable) {
errors += throwable
}
}
Loading

0 comments on commit 838dd39

Please sign in to comment.