Commit

Fix flaky test
jerryshao committed Nov 11, 2014
1 parent a949741 commit 8135d31
Showing 2 changed files with 14 additions and 10 deletions.
ReliableKafkaReceiver.scala
@@ -98,7 +98,7 @@ class ReliableKafkaReceiver[
 
   /** Manage the BlockGenerator in receiver itself for better managing block store and offset
    *  commit */
-  private lazy val blockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
+  private var blockGenerator: BlockGenerator = null
 
   override def onStop(): Unit = {
     if (consumerConnector != null) {
@@ -117,6 +117,8 @@ class ReliableKafkaReceiver[
   override def onStart(): Unit = {
     logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
 
+    blockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
+
     if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
       logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
         "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
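The receiver-side change above swaps the lazily constructed BlockGenerator for a plain var that onStart() (re)creates. A lazy val is built once, on first access, and that same instance would be reused even after the receiver has been stopped and started again; constructing the generator inside onStart() gives every (re)start a fresh instance, matching the restart-safe pattern the receiver already follows elsewhere (for example the consumerConnector null check in onStop()). A minimal sketch of that pattern, using hypothetical Generator/MyReceiver stand-ins rather than Spark's internal classes:

// Sketch only: hypothetical Generator and Receiver stand-ins, not Spark's classes.
class Generator {
  @volatile private var running = false
  def start(): Unit = { running = true }
  def stop(): Unit = { running = false } // a stopped generator is not meant to be restarted
}

abstract class Receiver {
  def onStart(): Unit
  def onStop(): Unit
}

class MyReceiver extends Receiver {
  // A lazy val here would pin the first Generator for the receiver's whole lifetime,
  // even across onStop()/onStart() cycles.
  private var generator: Generator = null

  override def onStart(): Unit = {
    generator = new Generator() // fresh instance on every (re)start
    generator.start()
  }

  override def onStop(): Unit = {
    if (generator != null) {
      generator.stop()
      generator = null
    }
  }
}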
ReliableKafkaStreamSuite.scala
@@ -139,7 +139,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite
       .setAppName(framework)
       .set("spark.streaming.receiver.writeAheadLog.enable", "true")
     var ssc = new StreamingContext(
-      sparkConf.clone.set("spark.streaming.blockInterval", "4000"),
+      sparkConf.clone.set("spark.streaming.blockInterval", "10000"),
       batchDuration)
     val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
     val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
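In this Spark version spark.streaming.blockInterval is read as a number of milliseconds and controls how often the receiver's BlockGenerator cuts buffered records into a block for storage; raising it from 4000 to 10000 ms makes it unlikely that a block boundary lands inside the short awaitTermination windows the test uses, which is presumably the flakiness being fixed. A small sketch of the configuration pattern the test relies on, with illustrative master and app-name values (SparkConf.clone returns an independent copy, so the override stays local to the test):

import org.apache.spark.SparkConf

// Base configuration shared by the suite (master and app name are illustrative).
val base = new SparkConf()
  .setMaster("local[2]")
  .setAppName("ReliableKafkaStreamSuite-sketch")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

// clone copies the conf, so the larger block interval applies only to this test.
val testConf = base.clone.set("spark.streaming.blockInterval", "10000")

assert(!base.contains("spark.streaming.blockInterval"))
assert(testConf.get("spark.streaming.blockInterval") == "10000")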
@@ -155,10 +155,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite
       "auto.offset.reset" -> "smallest")
 
     KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_ONLY).foreachRDD(_ => throw new Exception)
+      ssc,
+      kafkaParams,
+      topics,
+      StorageLevel.MEMORY_ONLY)
+      .foreachRDD(_ => throw new Exception)
     try {
       ssc.start()
       ssc.awaitTermination(1000)
@@ -175,10 +175,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite
     // Restart to see if data is consumed from last checkpoint.
     ssc = new StreamingContext(sparkConf, batchDuration)
     KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_ONLY).foreachRDD(_ => Unit)
+      ssc,
+      kafkaParams,
+      topics,
+      StorageLevel.MEMORY_ONLY)
+      .foreachRDD(_ => Unit)
     ssc.start()
     ssc.awaitTermination(3000)
     ssc.stop()
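The first run deliberately throws from foreachRDD, so every batch fails after the receiver has already stored blocks and committed offsets; the restarted context above then re-subscribes with the same consumer group and, per the test comment, should pick up from the last committed offset rather than replaying everything. One small Scala aside on the no-op action used after the restart: the body of _ => Unit evaluates to the Unit companion object, which is then discarded, while _ => () yields the unit value directly; both typecheck as a no-op, the latter being the more idiomatic spelling. A tiny self-contained illustration:

// Both compile where a function returning Unit is expected, because a non-unit
// result is discarded; "_ => ()" is the idiomatic no-op.
val noopIdiomatic: Any => Unit = _ => ()
val noopCompanion: Any => Unit = _ => Unit // returns the Unit companion object, then discards it

Seq(1, 2, 3).foreach(noopIdiomatic)
Seq(1, 2, 3).foreach(noopCompanion)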
