
Refactor 3 = Refactor 2 + refactored KafkaStreamSuite further to eliminate KafkaTestUtils, and made the Java test suite more robust #8

Merged · 6 commits · Nov 14, 2014
ReliableKafkaReceiver.scala
@@ -178,17 +178,23 @@ class ReliableKafkaReceiver[

   /** Store a Kafka message and the associated metadata as a tuple. */
   private def storeMessageAndMetadata(
-      msgAndMetadata: MessageAndMetadata[K, V]): Unit = synchronized {
+      msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
     val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
-    blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
-    topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
+    val data = (msgAndMetadata.key, msgAndMetadata.message)
+    val metadata = (topicAndPartition, msgAndMetadata.offset)
+    blockGenerator.addDataWithCallback(data, metadata)
   }

+  /** Update stored offset */
+  private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
+    topicPartitionOffsetMap.put(topicAndPartition, offset)
+  }

   /**
    * Remember the current offsets for each topic and partition. This is called when a block is
    * generated.
    */
-  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = synchronized {
+  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
     // Get a snapshot of current offset map and store with related block id.
     val offsetSnapshot = topicPartitionOffsetMap.toMap
     blockOffsetMap.put(blockId, offsetSnapshot)
@@ -221,8 +227,9 @@ class ReliableKafkaReceiver[

       ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
     } catch {
-      case t: Throwable => logWarning(s"Exception during commit offset $offset for topic" +
-        s"${topicAndPart.topic}, partition ${topicAndPart.partition}", t)
+      case e: Exception =>
+        logWarning(s"Exception during commit offset $offset for topic" +
+          s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
     }
 
     logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
@@ -250,17 +257,25 @@ class ReliableKafkaReceiver[
   /** Class to handle blocks generated by the block generator. */
   private final class GeneratedBlockHandler extends BlockGeneratorListener {
 
-    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
+    def onAddData(data: Any, metadata: Any): Unit = {
+      // Update the offset of the data that was added to the generator
+      if (metadata != null) {
+        val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
+        updateOffset(topicAndPartition, offset)
+      }
+    }
+
+    def onGenerateBlock(blockId: StreamBlockId): Unit = {
       // Remember the offsets of topics/partitions when a block has been generated
       rememberBlockOffsets(blockId)
     }
 
-    override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
       // Store block and commit the blocks offset
       storeBlockAndCommitOffset(blockId, arrayBuffer)
     }
 
-    override def onError(message: String, throwable: Throwable): Unit = {
+    def onError(message: String, throwable: Throwable): Unit = {
       reportError(message, throwable)
     }
   }
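Taken together, the three listener hooks implement an offset-tracking lifecycle: onAddData records the latest offset per partition as each message enters the block generator, onGenerateBlock snapshots those offsets against the new block id, and onPushBlock commits them once the block is safely stored. The following is a minimal, self-contained sketch of that lifecycle under simplified assumptions; OffsetTracker and its methods are illustrative stand-ins, not Spark's actual classes.

import scala.collection.mutable

// Illustrative stand-in for kafka.common.TopicAndPartition.
case class TopicAndPartition(topic: String, partition: Int)

// Sketch of the offset-tracking lifecycle driven by the listener above
// (hypothetical class, simplified from ReliableKafkaReceiver's internals).
class OffsetTracker {
  private val currentOffsets = new mutable.HashMap[TopicAndPartition, Long]()
  private val blockOffsets = new mutable.HashMap[String, Map[TopicAndPartition, Long]]()

  // onAddData: remember the latest offset seen for each partition.
  def updateOffset(tp: TopicAndPartition, offset: Long): Unit = synchronized {
    currentOffsets.put(tp, offset)
  }

  // onGenerateBlock: snapshot the current offsets against the block id.
  def rememberBlockOffsets(blockId: String): Unit = synchronized {
    blockOffsets.put(blockId, currentOffsets.toMap)
    currentOffsets.clear()
  }

  // onPushBlock: once the block is reliably stored, these offsets are safe to commit.
  def offsetsToCommit(blockId: String): Option[Map[TopicAndPartition, Long]] =
    synchronized { blockOffsets.remove(blockId) }
}

object OffsetTrackerDemo extends App {
  val tracker = new OffsetTracker
  tracker.updateOffset(TopicAndPartition("topic1", 0), 41L)
  tracker.updateOffset(TopicAndPartition("topic1", 0), 42L)  // overwrites 41
  tracker.rememberBlockOffsets("block-1")
  // Prints: Some(Map(TopicAndPartition(topic1,0) -> 42))
  println(tracker.offsetsToCommit("block-1"))
}

Because addDataWithCallback invokes onAddData under the block generator's own synchronization, each record and its offset are recorded atomically, which is evidently why the explicit synchronized wrappers could be dropped from storeMessageAndMetadata and rememberBlockOffsets in the diff above.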
JavaKafkaStreamSuite.java
@@ -22,6 +22,8 @@
 import java.util.List;
 import java.util.Random;
 
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.Duration;
 import scala.Predef;
 import scala.Tuple2;
 import scala.collection.JavaConverters;
@@ -43,23 +45,25 @@

 public class JavaKafkaStreamSuite implements Serializable {
   private transient JavaStreamingContext ssc = null;
-  private Random random = new Random();
+  private transient Random random = new Random();
   private transient KafkaStreamSuiteBase suiteBase = null;
 
   @Before
   public void setUp() {
     suiteBase = new KafkaStreamSuiteBase() { };
-    suiteBase.beforeFunction();
+    suiteBase.setupKafka();
     System.clearProperty("spark.driver.port");
-    ssc = new JavaStreamingContext(suiteBase.sparkConf(), suiteBase.batchDuration());
+    SparkConf sparkConf = new SparkConf()
+      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+    ssc = new JavaStreamingContext(sparkConf, new Duration(500));
   }
 
   @After
   public void tearDown() {
     ssc.stop();
     ssc = null;
     System.clearProperty("spark.driver.port");
-    suiteBase.afterFunction();
+    suiteBase.tearDownKafka();
   }

   @Test
@@ -76,8 +80,8 @@ public void testKafkaStream() throws InterruptedException {
     suiteBase.createTopic(topic);
     HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
     suiteBase.produceAndSendMessage(topic,
-        JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
-        Predef.<Tuple2<String, Object>>conforms()));
+      JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
+        Predef.<Tuple2<String, Object>>conforms()));
 
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
     kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());
@@ -123,11 +127,16 @@ public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
     );
 
     ssc.start();
-    ssc.awaitTermination(3000);
+
+    long startTime = System.currentTimeMillis();
+    boolean sizeMatches = false;
+    while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) {
+      sizeMatches = sent.size() == result.size();
+      Thread.sleep(200);
+    }
     Assert.assertEquals(sent.size(), result.size());
     for (String k : sent.keySet()) {
       Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue());
     }
     ssc.stop();
   }
 }
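
The robustness fix above replaces a fixed ssc.awaitTermination(3000) with a bounded poll: the test now waits up to 20 seconds, re-checking every 200 ms, instead of assuming results arrive within 3 seconds. A generic sketch of that wait-until pattern in Scala; PollUtil and waitUntil are hypothetical helpers, not part of the PR:

// Hypothetical helper capturing the bounded-poll pattern used in the Java test.
object PollUtil {
  /** Re-evaluate `condition` every `intervalMs` until it holds or `timeoutMs` elapses. */
  def waitUntil(timeoutMs: Long, intervalMs: Long = 200L)(condition: => Boolean): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    var ok = condition
    while (!ok && System.currentTimeMillis() < deadline) {
      Thread.sleep(intervalMs)
      ok = condition
    }
    ok
  }
}

object PollUtilDemo extends App {
  val start = System.currentTimeMillis()
  // The condition becomes true roughly one second in, well within the timeout.
  val done = PollUtil.waitUntil(timeoutMs = 20000L) {
    System.currentTimeMillis() - start > 1000
  }
  assert(done, "condition not met within timeout")
}

Note that the Java test still runs its final assertions after the poll, so a timeout produces a meaningful assertion failure rather than a silent pass.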
KafkaStreamSuite.scala
@@ -42,15 +42,12 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.util.Utils
 
-abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
-  import KafkaTestUtils._
+/**
+ * This is an abstract base class for Kafka testsuites. This has the functionality to set up
+ * and tear down local Kafka servers, and to push data using Kafka producers.
+ */
+abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {
 
-  val sparkConf = new SparkConf()
-    .setMaster("local[4]")
-    .setAppName(this.getClass.getSimpleName)
-  val batchDuration = Milliseconds(500)
-  var ssc: StreamingContext = _
-
   var zkAddress: String = _
   var zkClient: ZkClient = _

@@ -64,7 +61,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
   private var server: KafkaServer = _
   private var producer: Producer[String, String] = _
 
-  def beforeFunction() {
+  def setupKafka() {
     // Zookeeper server startup
     zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
     // Get the actual zookeeper binding port
@@ -80,7 +77,7 @@
     var bindSuccess: Boolean = false
     while(!bindSuccess) {
       try {
-        val brokerProps = getBrokerConfig(brokerPort, zkAddress)
+        val brokerProps = getBrokerConfig()
         brokerConf = new KafkaConfig(brokerProps)
         server = new KafkaServer(brokerConf)
         logInfo("==================== 2 ====================")
@@ -100,12 +97,7 @@
logInfo("==================== 4 ====================")
}

def afterFunction() {
if (ssc != null) {
ssc.stop()
ssc = null
}

def tearDownKafka() {
if (producer != null) {
producer.close()
producer = null
@@ -141,101 +133,43 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
     CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
     logInfo("==================== 5 ====================")
     // wait until metadata is propagated
-    waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000)
+    waitUntilMetadataIsPropagated(topic, 0)
   }
 
   def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
-    val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
-    if (producer == null) {
-      producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
-    }
+    producer = new Producer[String, String](new ProducerConfig(getProducerConfig()))
     producer.send(createTestMessage(topic, sent): _*)
+    producer.close()
     logInfo("==================== 6 ====================")
   }
-}

-class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
-
-  before { beforeFunction() }
-  after { afterFunction() }
-
-  test("Kafka input stream") {
-    ssc = new StreamingContext(sparkConf, batchDuration)
-    val topic = "topic1"
-    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
-    createTopic(topic)
-    produceAndSendMessage(topic, sent)
-
-    val kafkaParams = Map("zookeeper.connect" -> zkAddress,
-      "group.id" -> s"test-consumer-${Random.nextInt(10000)}",
-      "auto.offset.reset" -> "smallest")
-
-    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      Map(topic -> 1),
-      StorageLevel.MEMORY_ONLY)
-    val result = new mutable.HashMap[String, Long]()
-    stream.map { case (k, v) => v }
-      .countByValue()
-      .foreachRDD { r =>
-        val ret = r.collect()
-        ret.toMap.foreach { kv =>
-          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
-          result.put(kv._1, count)
-        }
-      }
-    ssc.start()
-    eventually(timeout(3000 milliseconds), interval(100 milliseconds)) {
-      assert(sent.size === result.size)
-      sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
-    }
-
-    ssc.stop()
-  }
-}
-
-
-object KafkaTestUtils {
-
-  def getBrokerConfig(port: Int, zkConnect: String): Properties = {
+  private def getBrokerConfig(): Properties = {
     val props = new Properties()
     props.put("broker.id", "0")
     props.put("host.name", "localhost")
-    props.put("port", port.toString)
+    props.put("port", brokerPort.toString)
     props.put("log.dir", Utils.createTempDir().getAbsolutePath)
-    props.put("zookeeper.connect", zkConnect)
+    props.put("zookeeper.connect", zkAddress)
     props.put("log.flush.interval.messages", "1")
     props.put("replica.socket.timeout.ms", "1500")
     props
   }

-  def getProducerConfig(brokerList: String): Properties = {
+  private def getProducerConfig(): Properties = {
+    val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
     val props = new Properties()
-    props.put("metadata.broker.list", brokerList)
+    props.put("metadata.broker.list", brokerAddr)
     props.put("serializer.class", classOf[StringEncoder].getName)
     props
   }

-  def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
-    val startTime = System.currentTimeMillis()
-    while (true) {
-      if (condition())
-        return true
-      if (System.currentTimeMillis() > startTime + waitTime)
-        return false
-      Thread.sleep(waitTime.min(100L))
+  private def waitUntilMetadataIsPropagated(topic: String, partition: Int) {
+    eventually(timeout(1000 milliseconds), interval(100 milliseconds)) {
+      assert(
+        server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition)),
+        s"Partition [$topic, $partition] metadata not propagated after timeout"
+      )
     }
-    // Should never go to here
-    throw new RuntimeException("unexpected error")
   }
 
-  def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int,
-      timeout: Long) {
-    assert(waitUntilTrue(() =>
-      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(
-        TopicAndPartition(topic, partition))), timeout),
-      s"Partition [$topic, $partition] metadata not propagated after timeout")
-  }
-
   class EmbeddedZookeeper(val zkConnect: String) {
@@ -261,3 +195,53 @@
     }
   }
 }
+
+
+class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+  var ssc: StreamingContext = _
+
+  before {
+    setupKafka()
+  }
+
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    tearDownKafka()
+  }
+
+  test("Kafka input stream") {
+    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+    val topic = "topic1"
+    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+    createTopic(topic)
+    produceAndSendMessage(topic, sent)
+
+    val kafkaParams = Map("zookeeper.connect" -> zkAddress,
+      "group.id" -> s"test-consumer-${Random.nextInt(10000)}",
+      "auto.offset.reset" -> "smallest")
+
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+      ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
+    val result = new mutable.HashMap[String, Long]()
+    stream.map(_._2).countByValue().foreachRDD { r =>
+      val ret = r.collect()
+      ret.toMap.foreach { kv =>
+        val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+        result.put(kv._1, count)
+      }
+    }
+    ssc.start()
+    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      assert(sent.size === result.size)
+      sent.keys.foreach { k =>
+        assert(sent(k) === result(k).toInt)
+      }
+    }
+    ssc.stop()
+  }
+}
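
Both wait helpers in this file now lean on ScalaTest's Eventually trait instead of hand-rolled loops: the body of eventually() is retried at the given interval until it stops throwing or the timeout expires. A minimal, self-contained sketch of the pattern, mirroring the ScalaTest API used in the diff; EventuallyExampleSuite is an illustrative test, not part of the PR:

import org.scalatest.FunSuite
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

// Sketch of the retry-until-timeout pattern that replaced the deleted
// waitUntilTrue helper.
class EventuallyExampleSuite extends FunSuite with Eventually {
  test("a slow condition is retried instead of polled by hand") {
    val start = System.currentTimeMillis()
    eventually(timeout(5000 milliseconds), interval(100 milliseconds)) {
      // This assertion fails (and is retried) until roughly one second has elapsed.
      assert(System.currentTimeMillis() - start > 1000)
    }
  }
}

The same pattern now backs both waitUntilMetadataIsPropagated (1-second timeout) and the streaming assertion in the test above (10-second timeout).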
