diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala index 15ba9c8212aa1..4d26b640e8d74 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -128,7 +128,9 @@ class KafkaReceiver[ def run() { logInfo("Starting MessageHandler.") try { - for (msgAndMetadata <- stream) { + val streamIterator = stream.iterator() + while (streamIterator.hasNext()) { + val msgAndMetadata = streamIterator.next() store((msgAndMetadata.key, msgAndMetadata.message)) } } catch { diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala index 4a2e414365715..37a966fa302af 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala @@ -206,7 +206,9 @@ class ReliableKafkaReceiver[ override def run(): Unit = { logInfo(s"Starting message process thread ${Thread.currentThread().getId}.") try { - for (msgAndMetadata <- stream) { + val streamIterator = stream.iterator() + while (streamIterator.hasNext()) { + val msgAndMetadata = streamIterator.next() val topicAndPartition = TopicAndPartition( msgAndMetadata.topic, msgAndMetadata.partition) blockGenerator.synchronized {