From ec2e95eb72d09bb36d83e28402769f636b04c745 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 13 Nov 2014 13:33:14 -0800 Subject: [PATCH 1/6] Removed the receiver's locks and essentially reverted to Saisai's original design. --- .../kafka/ReliableKafkaReceiver.scala | 28 +++-- .../streaming/kafka/JavaKafkaStreamSuite.java | 16 ++- .../streaming/kafka/KafkaStreamSuite.scala | 41 +++---- .../kafka/ReliableKafkaStreamSuite.scala | 106 +++++++++--------- .../streaming/receiver/BlockGenerator.scala | 52 +++++++-- .../receiver/ReceiverSupervisorImpl.scala | 6 +- .../spark/streaming/ReceiverSuite.scala | 6 +- 7 files changed, 157 insertions(+), 98 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala index c207e95d5d337..f85296efe8d60 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala @@ -178,17 +178,23 @@ class ReliableKafkaReceiver[ /** Store a Kafka message and the associated metadata as a tuple. */ private def storeMessageAndMetadata( - msgAndMetadata: MessageAndMetadata[K, V]): Unit = synchronized { + msgAndMetadata: MessageAndMetadata[K, V]): Unit = { val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition) - blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message)) - topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset) + val data = (msgAndMetadata.key, msgAndMetadata.message) + val metadata = (topicAndPartition, msgAndMetadata.offset) + blockGenerator.addDataWithCallback(data, metadata) + } + + /** Update stored offset */ + private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = { + topicPartitionOffsetMap.put(topicAndPartition, offset) } /** * Remember the current offsets for each topic and partition. This is called when a block is * generated. */ - private def rememberBlockOffsets(blockId: StreamBlockId): Unit = synchronized { + private def rememberBlockOffsets(blockId: StreamBlockId): Unit = { // Get a snapshot of current offset map and store with related block id. val offsetSnapshot = topicPartitionOffsetMap.toMap blockOffsetMap.put(blockId, offsetSnapshot) @@ -250,17 +256,25 @@ class ReliableKafkaReceiver[ /** Class to handle blocks generated by the block generator. */ private final class GeneratedBlockHandler extends BlockGeneratorListener { - override def onGenerateBlock(blockId: StreamBlockId): Unit = { + def onAddData(data: Any, metadata: Any): Unit = { + // Update the offset of the data that was added to the generator + if (metadata != null) { + val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)] + updateOffset(topicAndPartition, offset) + } + } + + def onGenerateBlock(blockId: StreamBlockId): Unit = { // Remember the offsets of topics/partitions when a block has been generated rememberBlockOffsets(blockId) } - override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { + def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { // Store block and commit the blocks offset storeBlockAndCommitOffset(blockId, arrayBuffer) } - override def onError(message: String, throwable: Throwable): Unit = { + def onError(message: String, throwable: Throwable): Unit = { reportError(message, throwable) } } diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 6386602ef8a43..60e6f8deb0d13 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Random; +import org.apache.spark.SparkConf; +import org.apache.spark.streaming.Duration; import scala.Predef; import scala.Tuple2; import scala.collection.JavaConverters; @@ -43,15 +45,17 @@ public class JavaKafkaStreamSuite implements Serializable { private transient JavaStreamingContext ssc = null; - private Random random = new Random(); + private transient Random random = new Random(); private transient KafkaStreamSuiteBase suiteBase = null; @Before public void setUp() { suiteBase = new KafkaStreamSuiteBase() { }; - suiteBase.beforeFunction(); + suiteBase.setupKafka(); System.clearProperty("spark.driver.port"); - ssc = new JavaStreamingContext(suiteBase.sparkConf(), suiteBase.batchDuration()); + SparkConf sparkConf = new SparkConf() + .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); + ssc = new JavaStreamingContext(sparkConf, new Duration(500)); } @After @@ -59,7 +63,7 @@ public void tearDown() { ssc.stop(); ssc = null; System.clearProperty("spark.driver.port"); - suiteBase.afterFunction(); + suiteBase.tearDownKafka(); } @Test @@ -76,8 +80,8 @@ public void testKafkaStream() throws InterruptedException { suiteBase.createTopic(topic); HashMap tmp = new HashMap(sent); suiteBase.produceAndSendMessage(topic, - JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap( - Predef.>conforms())); + JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap( + Predef.>conforms())); HashMap kafkaParams = new HashMap(); kafkaParams.put("zookeeper.connect", suiteBase.zkAddress()); diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index 1bb8e0175b97c..6e24b6f7ffb3b 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -42,15 +42,13 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.util.Utils +/** + * This is an abstract base class for Kafka testsuites. This has the functionality to set up + * and tear down local Kafka servers, and to push data using Kafka producers. + */ abstract class KafkaStreamSuiteBase extends FunSuite with Logging { import KafkaTestUtils._ - val sparkConf = new SparkConf() - .setMaster("local[4]") - .setAppName(this.getClass.getSimpleName) - val batchDuration = Milliseconds(500) - var ssc: StreamingContext = _ - var zkAddress: String = _ var zkClient: ZkClient = _ @@ -64,7 +62,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging { private var server: KafkaServer = _ private var producer: Producer[String, String] = _ - def beforeFunction() { + def setupKafka() { // Zookeeper server startup zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort") // Get the actual zookeeper binding port @@ -100,12 +98,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging { logInfo("==================== 4 ====================") } - def afterFunction() { - if (ssc != null) { - ssc.stop() - ssc = null - } - + def tearDownKafka() { if (producer != null) { producer.close() producer = null @@ -146,21 +139,31 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging { def produceAndSendMessage(topic: String, sent: Map[String, Int]) { val brokerAddr = brokerConf.hostName + ":" + brokerConf.port - if (producer == null) { - producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr))) - } + producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr))) producer.send(createTestMessage(topic, sent): _*) + producer.close() logInfo("==================== 6 ====================") } } class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually { + var ssc: StreamingContext = _ + + before { + setupKafka() + } - before { beforeFunction() } - after { afterFunction() } + after { + if (ssc != null) { + ssc.stop() + ssc = null + } + tearDownKafka() + } test("Kafka input stream") { - ssc = new StreamingContext(sparkConf, batchDuration) + val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) + ssc = new StreamingContext(sparkConf, Milliseconds(500)) val topic = "topic1" val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) createTopic(topic) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala index b546d22ca6c38..8489d64762a2b 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.streaming.kafka + import java.io.File import scala.collection.mutable @@ -24,50 +25,67 @@ import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.Random +import com.google.common.io.Files import kafka.serializer.StringDecoder import kafka.utils.{ZKGroupTopicDirs, ZkUtils} +import org.apache.commons.io.FileUtils import org.scalatest.BeforeAndAfter import org.scalatest.concurrent.Eventually +import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.util.Utils +import org.apache.spark.streaming.{Milliseconds, StreamingContext} class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually { - val topic = "topic" + + val sparkConf = new SparkConf() + .setMaster("local[4]") + .setAppName(this.getClass.getSimpleName) + .set("spark.streaming.receiver.writeAheadLog.enable", "true") val data = Map("a" -> 10, "b" -> 10, "c" -> 10) + + var topic: String = _ var groupId: String = _ var kafkaParams: Map[String, String] = _ + var ssc: StreamingContext = _ + var tempDirectory: File = null before { - beforeFunction() // call this first to start ZK and Kafka + setupKafka() + topic = s"test-topic-${Random.nextInt(10000)}" groupId = s"test-consumer-${Random.nextInt(10000)}" kafkaParams = Map( "zookeeper.connect" -> zkAddress, "group.id" -> groupId, "auto.offset.reset" -> "smallest" ) + + ssc = new StreamingContext(sparkConf, Milliseconds(500)) + tempDirectory = Files.createTempDir() + ssc.checkpoint(tempDirectory.getAbsolutePath) } after { - afterFunction() + if (ssc != null) { + ssc.stop() + } + if (tempDirectory != null && tempDirectory.exists()) { + FileUtils.deleteDirectory(tempDirectory) + tempDirectory = null + } + tearDownKafka() } - test("Reliable Kafka input stream") { - sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true") - ssc = new StreamingContext(sparkConf, batchDuration) - val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" + - s"test-checkpoint${Random.nextInt(10000)}" - Utils.registerShutdownDeleteDir(new File(checkpointDir)) - ssc.checkpoint(checkpointDir) + + test("Reliable Kafka input stream with single topic") { createTopic(topic) produceAndSendMessage(topic, data) + // Verify whether the offset of this group/topic/partition is 0 before starting. + assert(getCommitOffset(groupId, topic, 0) === None) + val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( - ssc, - kafkaParams, - Map(topic -> 1), - StorageLevel.MEMORY_ONLY) + ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY) val result = new mutable.HashMap[String, Long]() stream.map { case (k, v) => v }.foreachRDD { r => val ret = r.collect() @@ -77,53 +95,36 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter } } ssc.start() - eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { + + eventually(timeout(20000 milliseconds), interval(200 milliseconds)) { // A basic process verification for ReliableKafkaReceiver. // Verify whether received message number is equal to the sent message number. assert(data.size === result.size) // Verify whether each message is the same as the data to be verified. data.keys.foreach { k => assert(data(k) === result(k).toInt) } + // Verify the offset number whether it is equal to the total message number. + assert(getCommitOffset(groupId, topic, 0) === Some(29L)) + } ssc.stop() } +/* test("Verify the offset commit") { // Verify the correctness of offset commit mechanism. - sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true") - ssc = new StreamingContext(sparkConf, batchDuration) - val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" + - s"test-checkpoint${Random.nextInt(10000)}" - Utils.registerShutdownDeleteDir(new File(checkpointDir)) - ssc.checkpoint(checkpointDir) - createTopic(topic) produceAndSendMessage(topic, data) - // Verify whether the offset of this group/topic/partition is 0 before starting. - assert(getCommitOffset(groupId, topic, 0) === 0L) - // Do this to consume all the message of this group/topic. val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( - ssc, - kafkaParams, - Map(topic -> 1), - StorageLevel.MEMORY_ONLY) + ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY) stream.foreachRDD(_ => Unit) ssc.start() - eventually(timeout(3000 milliseconds), interval(100 milliseconds)) { - // Verify the offset number whether it is equal to the total message number. - assert(getCommitOffset(groupId, topic, 0) === 29L) + eventually(timeout(20000 milliseconds), interval(200 milliseconds)) { } ssc.stop() } - - test("Verify multiple topics offset commit") { - sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true") - ssc = new StreamingContext(sparkConf, batchDuration) - val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" + - s"test-checkpoint${Random.nextInt(10000)}" - Utils.registerShutdownDeleteDir(new File(checkpointDir)) - ssc.checkpoint(checkpointDir) - +*/ + test("Reliable Kafka input stream with multiple topics") { val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1) topics.foreach { case (t, _) => createTopic(t) @@ -131,30 +132,27 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter } // Before started, verify all the group/topic/partition offsets are 0. - topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) } + topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === None) } // Consuming all the data sent to the broker which will potential commit the offsets internally. val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( - ssc, - kafkaParams, - topics, - StorageLevel.MEMORY_ONLY) + ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY) stream.foreachRDD(_ => Unit) ssc.start() - eventually(timeout(3000 milliseconds), interval(100 milliseconds)) { + eventually(timeout(20000 milliseconds), interval(100 milliseconds)) { // Verify the offset for each group/topic to see whether they are equal to the expected one. - topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) } + topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === Some(29L)) } } ssc.stop() } + /** Getting partition offset from Zookeeper. */ - private def getCommitOffset(groupId: String, topic: String, partition: Int): Long = { + private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = { assert(zkClient != null, "Zookeeper client is not initialized") - val topicDirs = new ZKGroupTopicDirs(groupId, topic) val zkPath = s"${topicDirs.consumerOffsetDir}/$partition" - - ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong).getOrElse(0L) + val offset = ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong) + offset } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala index b1e9cb7673f2c..d286fadd50096 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -27,13 +27,38 @@ import org.apache.spark.streaming.util.{RecurringTimer, SystemClock} /** Listener object for BlockGenerator events */ private[streaming] trait BlockGeneratorListener { - /** Called when a new block is generated */ + /** + * Called after a data item is added into the BlockGenerator. The data addition and this + * callback are synchronized with the block generation and its associated callback, + * so block generation waits for the active data addition+callback to complete. This is useful + * for updating metadata on successful buffering of a data item, specifically that metadata + * that will be useful when a block is generated. Any long blocking operation in this callback + * will hurt the throughput. + */ + def onAddData(data: Any, metadata: Any) + + /** + * Called when a new block of data is generated by the block generator. The block generation + * and this callback are synchronized with the data addition and its associated callback, so + * the data addition waits for the block generation+callback to complete. This is useful + * for updating metadata when a block has been generated, specifically metadata that will + * be useful when the block has been successfully stored. Any long blocking operation in this + * callback will hurt the throughput. + */ def onGenerateBlock(blockId: StreamBlockId) - /** Called when a new block needs to be pushed */ + /** + * Called when a new block is ready to be pushed. Callers are supposed to store the block into + * Spark in this method. Internally this is called from a single + * thread, that is not synchronized with any other callbacks. Hence it is okay to do long + * blocking operation in this callback. + */ def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) - /** Called when an error has occurred in BlockGenerator */ + /** + * Called when an error has occurred in the BlockGenerator. Can be called form many places + * so better to not do any long block operation in this callback. + */ def onError(message: String, throwable: Throwable) } @@ -84,16 +109,27 @@ private[streaming] class BlockGenerator( * Push a single data item into the buffer. All received data items * will be periodically pushed into BlockManager. */ - def += (data: Any): Unit = synchronized { + def addData (data: Any): Unit = synchronized { waitToPush() currentBuffer += data } + /** + * Push a single data item into the buffer. After buffering the data, the + * `BlockGeneratorListnere.onAddData` callback will be called. All received data items + * will be periodically pushed into BlockManager. + */ + def addDataWithCallback(data: Any, metadata: Any) = synchronized { + waitToPush() + currentBuffer += data + listener.onAddData(data, metadata) + } + /** Change the buffer to which single records are added to. */ - private def updateCurrentBuffer(time: Long): Unit = { + private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer - synchronized { currentBuffer = new ArrayBuffer[Any] } + currentBuffer = new ArrayBuffer[Any] if (newBlockBuffer.size > 0) { val blockId = StreamBlockId(receiverId, time - blockInterval) val newBlock = new Block(blockId, newBlockBuffer) @@ -131,8 +167,8 @@ private[streaming] class BlockGenerator( } catch { case ie: InterruptedException => logInfo("Block pushing thread was interrupted") - case t: Throwable => - reportError("Error in block pushing thread", t) + case e: Exception => + reportError("Error in block updating thread", e) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index a13689a9254a6..704019a55aff3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -27,10 +27,10 @@ import akka.actor.{Actor, Props} import akka.pattern.ask import com.google.common.base.Throwables import org.apache.hadoop.conf.Configuration + import org.apache.spark.{Logging, SparkEnv, SparkException} import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.scheduler._ -import org.apache.spark.streaming.util.WriteAheadLogFileSegment import org.apache.spark.util.{AkkaUtils, Utils} /** @@ -99,6 +99,8 @@ private[streaming] class ReceiverSupervisorImpl( /** Divides received data records into data blocks for pushing in BlockManager. */ private val blockGenerator = new BlockGenerator(new BlockGeneratorListener { + def onAddData(data: Any, metadata: Any): Unit = { } + def onGenerateBlock(blockId: StreamBlockId): Unit = { } def onError(message: String, throwable: Throwable) { @@ -112,7 +114,7 @@ private[streaming] class ReceiverSupervisorImpl( /** Push a single record of received data into block generator. */ def pushSingle(data: Any) { - blockGenerator += (data) + blockGenerator addData (data) } /** Store an ArrayBuffer of received data as a data block into Spark's memory. */ diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index c189f1fa9ef47..9bad105f5a83b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -138,7 +138,7 @@ class ReceiverSuite extends FunSuite with Timeouts { blockGenerator.start() var count = 0 while(System.currentTimeMillis - startTime < waitTime) { - blockGenerator += count + blockGenerator addData count generatedData += count count += 1 Thread.sleep(10) @@ -168,7 +168,7 @@ class ReceiverSuite extends FunSuite with Timeouts { blockGenerator.start() var count = 0 while(System.currentTimeMillis - startTime < waitTime) { - blockGenerator += count + blockGenerator addData count generatedData += count count += 1 Thread.sleep(1) @@ -299,6 +299,8 @@ class ReceiverSuite extends FunSuite with Timeouts { val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]] val errors = new ArrayBuffer[Throwable] + def onAddData(data: Any, metadata: Any) { } + def onGenerateBlock(blockId: StreamBlockId) { } def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { From d9a452cd3ffefc390622c147816d18a57d9428e4 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 13 Nov 2014 13:39:30 -0800 Subject: [PATCH 2/6] Minor updates. --- .../streaming/kafka/JavaKafkaStreamSuite.java | 4 ++-- .../kafka/ReliableKafkaStreamSuite.scala | 24 +++---------------- 2 files changed, 5 insertions(+), 23 deletions(-) diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 60e6f8deb0d13..ce48dfbc0fcf1 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -80,8 +80,8 @@ public void testKafkaStream() throws InterruptedException { suiteBase.createTopic(topic); HashMap tmp = new HashMap(sent); suiteBase.produceAndSendMessage(topic, - JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap( - Predef.>conforms())); + JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap( + Predef.>conforms())); HashMap kafkaParams = new HashMap(); kafkaParams.put("zookeeper.connect", suiteBase.zkAddress()); diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala index 8489d64762a2b..64ccc92c81fa9 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala @@ -44,7 +44,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter .set("spark.streaming.receiver.writeAheadLog.enable", "true") val data = Map("a" -> 10, "b" -> 10, "c" -> 10) - var topic: String = _ + var groupId: String = _ var kafkaParams: Map[String, String] = _ var ssc: StreamingContext = _ @@ -52,7 +52,6 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter before { setupKafka() - topic = s"test-topic-${Random.nextInt(10000)}" groupId = s"test-consumer-${Random.nextInt(10000)}" kafkaParams = Map( "zookeeper.connect" -> zkAddress, @@ -78,6 +77,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter test("Reliable Kafka input stream with single topic") { + var topic = "test-topic" createTopic(topic) produceAndSendMessage(topic, data) @@ -95,7 +95,6 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter } } ssc.start() - eventually(timeout(20000 milliseconds), interval(200 milliseconds)) { // A basic process verification for ReliableKafkaReceiver. // Verify whether received message number is equal to the sent message number. @@ -104,26 +103,10 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter data.keys.foreach { k => assert(data(k) === result(k).toInt) } // Verify the offset number whether it is equal to the total message number. assert(getCommitOffset(groupId, topic, 0) === Some(29L)) - } ssc.stop() } -/* - test("Verify the offset commit") { - // Verify the correctness of offset commit mechanism. - createTopic(topic) - produceAndSendMessage(topic, data) - // Do this to consume all the message of this group/topic. - val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY) - stream.foreachRDD(_ => Unit) - ssc.start() - eventually(timeout(20000 milliseconds), interval(200 milliseconds)) { - } - ssc.stop() - } -*/ test("Reliable Kafka input stream with multiple topics") { val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1) topics.foreach { case (t, _) => @@ -152,7 +135,6 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter assert(zkClient != null, "Zookeeper client is not initialized") val topicDirs = new ZKGroupTopicDirs(groupId, topic) val zkPath = s"${topicDirs.consumerOffsetDir}/$partition" - val offset = ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong) - offset + ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong) } } From 14630aaf06892494ade308c93ab1d18b3d8d5207 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 13 Nov 2014 13:51:42 -0800 Subject: [PATCH 3/6] Minor updates. --- .../spark/streaming/kafka/ReliableKafkaReceiver.scala | 2 +- .../apache/spark/streaming/receiver/BlockGenerator.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala index f85296efe8d60..09c3d1c4549e7 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala @@ -227,7 +227,7 @@ class ReliableKafkaReceiver[ ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString) } catch { - case t: Throwable => logWarning(s"Exception during commit offset $offset for topic" + + case e: Exception => logWarning(s"Exception during commit offset $offset for topic" + s"${topicAndPart.topic}, partition ${topicAndPart.partition}", t) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala index d286fadd50096..55765dc90698b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -140,8 +140,8 @@ private[streaming] class BlockGenerator( } catch { case ie: InterruptedException => logInfo("Block updating timer thread was interrupted") - case t: Throwable => - reportError("Error in block updating thread", t) + case e: Exception => + reportError("Error in block updating thread", e) } } @@ -168,7 +168,7 @@ private[streaming] class BlockGenerator( case ie: InterruptedException => logInfo("Block pushing thread was interrupted") case e: Exception => - reportError("Error in block updating thread", e) + reportError("Error in block pushing thread", e) } } From 149948b57de14ddacabc81c16a278ceff4cf927a Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 13 Nov 2014 13:55:09 -0800 Subject: [PATCH 4/6] Fixed mistake --- .../apache/spark/streaming/kafka/ReliableKafkaReceiver.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala index 09c3d1c4549e7..be734b80272d1 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala @@ -227,8 +227,9 @@ class ReliableKafkaReceiver[ ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString) } catch { - case e: Exception => logWarning(s"Exception during commit offset $offset for topic" + - s"${topicAndPart.topic}, partition ${topicAndPart.partition}", t) + case e: Exception => + logWarning(s"Exception during commit offset $offset for topic" + + s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e) } logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " + From fab14c79d3b1e7ced5b881359343366915a622b0 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 13 Nov 2014 14:21:19 -0800 Subject: [PATCH 5/6] minor update. --- .../spark/streaming/receiver/ReceiverSupervisorImpl.scala | 2 +- .../test/scala/org/apache/spark/streaming/ReceiverSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 704019a55aff3..3b1233e86c210 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -114,7 +114,7 @@ private[streaming] class ReceiverSupervisorImpl( /** Push a single record of received data into block generator. */ def pushSingle(data: Any) { - blockGenerator addData (data) + blockGenerator.addData(data) } /** Store an ArrayBuffer of received data as a data block into Spark's memory. */ diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index 9bad105f5a83b..e26c0c6859e57 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -138,7 +138,7 @@ class ReceiverSuite extends FunSuite with Timeouts { blockGenerator.start() var count = 0 while(System.currentTimeMillis - startTime < waitTime) { - blockGenerator addData count + blockGenerator.addData(count) generatedData += count count += 1 Thread.sleep(10) @@ -168,7 +168,7 @@ class ReceiverSuite extends FunSuite with Timeouts { blockGenerator.start() var count = 0 while(System.currentTimeMillis - startTime < waitTime) { - blockGenerator addData count + blockGenerator.addData(count) generatedData += count count += 1 Thread.sleep(1) From eae4ad606e60f940a1537feea01e0c2cc4fb6ae8 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 13 Nov 2014 14:54:14 -0800 Subject: [PATCH 6/6] Refectored KafkaStreamSuiteBased to eliminate KafkaTestUtils and made Java more robust. --- .../streaming/kafka/JavaKafkaStreamSuite.java | 9 +- .../streaming/kafka/KafkaStreamSuite.scala | 151 ++++++++---------- 2 files changed, 73 insertions(+), 87 deletions(-) diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index ce48dfbc0fcf1..6e1abf3f385ee 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -127,11 +127,16 @@ public Void call(JavaPairRDD rdd) throws Exception { ); ssc.start(); - ssc.awaitTermination(3000); - + long startTime = System.currentTimeMillis(); + boolean sizeMatches = false; + while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) { + sizeMatches = sent.size() == result.size(); + Thread.sleep(200); + } Assert.assertEquals(sent.size(), result.size()); for (String k : sent.keySet()) { Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue()); } + ssc.stop(); } } diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index 6e24b6f7ffb3b..b19c053ebfc44 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -46,8 +46,7 @@ import org.apache.spark.util.Utils * This is an abstract base class for Kafka testsuites. This has the functionality to set up * and tear down local Kafka servers, and to push data using Kafka producers. */ -abstract class KafkaStreamSuiteBase extends FunSuite with Logging { - import KafkaTestUtils._ +abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging { var zkAddress: String = _ var zkClient: ZkClient = _ @@ -78,7 +77,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging { var bindSuccess: Boolean = false while(!bindSuccess) { try { - val brokerProps = getBrokerConfig(brokerPort, zkAddress) + val brokerProps = getBrokerConfig() brokerConf = new KafkaConfig(brokerProps) server = new KafkaServer(brokerConf) logInfo("==================== 2 ====================") @@ -134,111 +133,43 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging { CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0") logInfo("==================== 5 ====================") // wait until metadata is propagated - waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000) + waitUntilMetadataIsPropagated(topic, 0) } def produceAndSendMessage(topic: String, sent: Map[String, Int]) { - val brokerAddr = brokerConf.hostName + ":" + brokerConf.port - producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr))) + producer = new Producer[String, String](new ProducerConfig(getProducerConfig())) producer.send(createTestMessage(topic, sent): _*) producer.close() logInfo("==================== 6 ====================") } -} - -class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually { - var ssc: StreamingContext = _ - - before { - setupKafka() - } - - after { - if (ssc != null) { - ssc.stop() - ssc = null - } - tearDownKafka() - } - - test("Kafka input stream") { - val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) - ssc = new StreamingContext(sparkConf, Milliseconds(500)) - val topic = "topic1" - val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) - createTopic(topic) - produceAndSendMessage(topic, sent) - - val kafkaParams = Map("zookeeper.connect" -> zkAddress, - "group.id" -> s"test-consumer-${Random.nextInt(10000)}", - "auto.offset.reset" -> "smallest") - - val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( - ssc, - kafkaParams, - Map(topic -> 1), - StorageLevel.MEMORY_ONLY) - val result = new mutable.HashMap[String, Long]() - stream.map { case (k, v) => v } - .countByValue() - .foreachRDD { r => - val ret = r.collect() - ret.toMap.foreach { kv => - val count = result.getOrElseUpdate(kv._1, 0) + kv._2 - result.put(kv._1, count) - } - } - ssc.start() - eventually(timeout(3000 milliseconds), interval(100 milliseconds)) { - assert(sent.size === result.size) - sent.keys.foreach { k => assert(sent(k) === result(k).toInt) } - } - - ssc.stop() - } -} - -object KafkaTestUtils { - - def getBrokerConfig(port: Int, zkConnect: String): Properties = { + private def getBrokerConfig(): Properties = { val props = new Properties() props.put("broker.id", "0") props.put("host.name", "localhost") - props.put("port", port.toString) + props.put("port", brokerPort.toString) props.put("log.dir", Utils.createTempDir().getAbsolutePath) - props.put("zookeeper.connect", zkConnect) + props.put("zookeeper.connect", zkAddress) props.put("log.flush.interval.messages", "1") props.put("replica.socket.timeout.ms", "1500") props } - def getProducerConfig(brokerList: String): Properties = { + private def getProducerConfig(): Properties = { + val brokerAddr = brokerConf.hostName + ":" + brokerConf.port val props = new Properties() - props.put("metadata.broker.list", brokerList) + props.put("metadata.broker.list", brokerAddr) props.put("serializer.class", classOf[StringEncoder].getName) props } - def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = { - val startTime = System.currentTimeMillis() - while (true) { - if (condition()) - return true - if (System.currentTimeMillis() > startTime + waitTime) - return false - Thread.sleep(waitTime.min(100L)) + private def waitUntilMetadataIsPropagated(topic: String, partition: Int) { + eventually(timeout(1000 milliseconds), interval(100 milliseconds)) { + assert( + server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition)), + s"Partition [$topic, $partition] metadata not propagated after timeout" + ) } - // Should never go to here - throw new RuntimeException("unexpected error") - } - - def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, - timeout: Long) { - assert(waitUntilTrue(() => - servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains( - TopicAndPartition(topic, partition))), timeout), - s"Partition [$topic, $partition] metadata not propagated after timeout") } class EmbeddedZookeeper(val zkConnect: String) { @@ -264,3 +195,53 @@ object KafkaTestUtils { } } } + + +class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter { + var ssc: StreamingContext = _ + + before { + setupKafka() + } + + after { + if (ssc != null) { + ssc.stop() + ssc = null + } + tearDownKafka() + } + + test("Kafka input stream") { + val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) + ssc = new StreamingContext(sparkConf, Milliseconds(500)) + val topic = "topic1" + val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) + createTopic(topic) + produceAndSendMessage(topic, sent) + + val kafkaParams = Map("zookeeper.connect" -> zkAddress, + "group.id" -> s"test-consumer-${Random.nextInt(10000)}", + "auto.offset.reset" -> "smallest") + + val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY) + val result = new mutable.HashMap[String, Long]() + stream.map(_._2).countByValue().foreachRDD { r => + val ret = r.collect() + ret.toMap.foreach { kv => + val count = result.getOrElseUpdate(kv._1, 0) + kv._2 + result.put(kv._1, count) + } + } + ssc.start() + eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { + assert(sent.size === result.size) + sent.keys.foreach { k => + assert(sent(k) === result(k).toInt) + } + } + ssc.stop() + } +} +