Skip to content

Commit

Permalink
Refactored the code that runs the NetworkReceiver into further classe…
Browse files Browse the repository at this point in the history
…s and traits to make them more testable.
  • Loading branch information
tdas committed Apr 2, 2014
1 parent a36cc48 commit 3223e95
Show file tree
Hide file tree
Showing 19 changed files with 871 additions and 434 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
import org.apache.spark.Logging
import org.apache.spark.streaming.receiver.NetworkReceiver

private[streaming]
class FlumeInputDStream[T: ClassTag](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.NetworkReceiver

/**
* Input stream that pulls messages from a Kafka Broker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.NetworkReceiver

/**
* Input stream that subscribe messages from a Mqtt Broker.
Expand Down Expand Up @@ -96,8 +97,8 @@ class MQTTReceiver(
}

override def connectionLost(arg0: Throwable) {
store("Connection lost " + arg0)
stopOnError(new Exception(arg0))
reportError("Connection lost ", arg0)
stop()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.Logging
import org.apache.spark.streaming.receiver.NetworkReceiver

/* A stream of Twitter statuses, potentially filtered by one or more keywords.
*
Expand Down Expand Up @@ -75,7 +76,10 @@ class TwitterReceiver(
def onTrackLimitationNotice(i: Int) {}
def onScrubGeo(l: Long, l1: Long) {}
def onStallWarning(stallWarning: StallWarning) {}
def onException(e: Exception) { stopOnError(e) }
def onException(e: Exception) {
reportError("Error receiving tweets", e)
stop()
}
})

val query: FilterQuery = new FilterQuery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receivers._
import org.apache.spark.streaming.scheduler._
import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.receiver.NetworkReceiver

/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
Expand Down
Loading

0 comments on commit 3223e95

Please sign in to comment.