diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
index 50bcd44ea5605..d832a82aacf09 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -98,7 +98,7 @@ class ReliableKafkaReceiver[
 
   /** Manage the BlockGenerator in receiver itself for better managing block store and offset
     * commit */
-  private lazy val blockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
+  private var blockGenerator: BlockGenerator = null
 
   override def onStop(): Unit = {
     if (consumerConnector != null) {
@@ -117,6 +117,8 @@ class ReliableKafkaReceiver[
   override def onStart(): Unit = {
     logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
 
+    blockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
+
     if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
       logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
         "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index 0aa49987ea23a..83a88597e57c7 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -139,7 +139,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       .setAppName(framework)
       .set("spark.streaming.receiver.writeAheadLog.enable", "true")
     var ssc = new StreamingContext(
-      sparkConf.clone.set("spark.streaming.blockInterval", "4000"),
+      sparkConf.clone.set("spark.streaming.blockInterval", "10000"),
       batchDuration)
     val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
     val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
@@ -155,10 +155,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       "auto.offset.reset" -> "smallest")
 
     KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_ONLY).foreachRDD(_ => throw new Exception)
+      ssc,
+      kafkaParams,
+      topics,
+      StorageLevel.MEMORY_ONLY)
+      .foreachRDD(_ => throw new Exception)
     try {
       ssc.start()
       ssc.awaitTermination(1000)
@@ -175,10 +176,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
     // Restart to see if data is consumed from last checkpoint.
     ssc = new StreamingContext(sparkConf, batchDuration)
     KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_ONLY).foreachRDD(_ => Unit)
+      ssc,
+      kafkaParams,
+      topics,
+      StorageLevel.MEMORY_ONLY)
+      .foreachRDD(_ => Unit)
     ssc.start()
     ssc.awaitTermination(3000)
     ssc.stop()
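
The first hunk replaces a `lazy val` BlockGenerator with a `var` assigned in `onStart()`. Below is a minimal, self-contained sketch (not Spark code) of why per-start construction can matter for a restartable receiver, assuming the motivation is that a `lazy val` is initialized only once per instance, so a previously stopped generator would be reused on restart; all names in the sketch (`FakeGenerator`, `SketchReceiver`) are hypothetical and only illustrate the pattern.

```scala
// Sketch of the var-created-in-onStart pattern used in the patch above.
object RestartSketch {

  // Stand-in for BlockGenerator: usable only between start() and stop().
  final class FakeGenerator {
    @volatile private var stopped = false
    def start(): Unit = println("generator started")
    def stop(): Unit = { stopped = true; println("generator stopped") }
    def addData(d: String): Unit =
      require(!stopped, s"cannot add '$d': generator already stopped")
  }

  // Receiver-like class following the patched pattern: the generator is a var
  // created in onStart(), so every (re)start builds a fresh instance.
  final class SketchReceiver {
    private var generator: FakeGenerator = null

    def onStart(): Unit = {
      generator = new FakeGenerator // rebuilt on every start, as in the patch
      generator.start()
    }

    def onStop(): Unit = if (generator != null) generator.stop()

    def store(d: String): Unit = generator.addData(d)
  }

  def main(args: Array[String]): Unit = {
    val r = new SketchReceiver
    r.onStart(); r.store("a"); r.onStop()
    // With a `lazy val`, a second start would keep the stopped generator and
    // the store below would fail; with the var-in-onStart pattern it succeeds.
    r.onStart(); r.store("b"); r.onStop()
  }
}
```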