Skip to content

Commit

Permalink
Fix Kafka unit test hard coded Zookeeper port issue
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryshao committed Sep 22, 2014
1 parent fd0b32c commit 8555563
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void testKafkaStream() throws InterruptedException {
Predef.<Tuple2<String, Object>>conforms()));

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", testSuite.zkConnect());
kafkaParams.put("zookeeper.connect", testSuite.zkHost() + ":" + testSuite.zkPort());
kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000));
kafkaParams.put("auto.offset.reset", "smallest");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.util.{Properties, Random}
import scala.collection.mutable

import kafka.admin.CreateTopicCommand
import kafka.common.TopicAndPartition
import kafka.common.{KafkaException, TopicAndPartition}
import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
import kafka.utils.ZKStringSerializer
import kafka.serializer.{StringDecoder, StringEncoder}
Expand All @@ -42,14 +42,13 @@ import org.apache.spark.util.Utils
class KafkaStreamSuite extends TestSuiteBase {
import KafkaTestUtils._

val zkConnect = "localhost:2181"
val zkHost = "localhost"
var zkPort: Int = 0
val zkConnectionTimeout = 6000
val zkSessionTimeout = 6000

val brokerPort = 9092
val brokerProps = getBrokerConfig(brokerPort, zkConnect)
val brokerConf = new KafkaConfig(brokerProps)

protected var brokerPort = 9092
protected var brokerConf: KafkaConfig = _
protected var zookeeper: EmbeddedZookeeper = _
protected var zkClient: ZkClient = _
protected var server: KafkaServer = _
Expand All @@ -59,16 +58,35 @@ class KafkaStreamSuite extends TestSuiteBase {

override def beforeFunction() {
// Zookeeper server startup
zookeeper = new EmbeddedZookeeper(zkConnect)
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
// Get the actual zookeeper binding port
zkPort = zookeeper.actualPort
logInfo("==================== 0 ====================")
zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)

zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
ZKStringSerializer)
logInfo("==================== 1 ====================")

// Kafka broker startup
server = new KafkaServer(brokerConf)
logInfo("==================== 2 ====================")
server.startup()
logInfo("==================== 3 ====================")
var bindSuccess: Boolean = false
while(!bindSuccess) {
try {
val brokerProps = getBrokerConfig(brokerPort, s"$zkHost:$zkPort")
brokerConf = new KafkaConfig(brokerProps)
server = new KafkaServer(brokerConf)
logInfo("==================== 2 ====================")
server.startup()
logInfo("==================== 3 ====================")
bindSuccess = true
} catch {
case e: KafkaException =>
if (e.getMessage != null && e.getMessage.contains("Socket server failed to bind to")) {
brokerPort += 1
}
case e: Exception => throw new Exception("Kafka server create failed", e)
}
}

Thread.sleep(2000)
logInfo("==================== 4 ====================")
super.beforeFunction()
Expand All @@ -92,7 +110,7 @@ class KafkaStreamSuite extends TestSuiteBase {
createTopic(topic)
produceAndSendMessage(topic, sent)

val kafkaParams = Map("zookeeper.connect" -> zkConnect,
val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
"group.id" -> s"test-consumer-${random.nextInt(10000)}",
"auto.offset.reset" -> "smallest")

Expand Down Expand Up @@ -200,6 +218,8 @@ object KafkaTestUtils {
factory.configure(new InetSocketAddress(ip, port), 16)
factory.startup(zookeeper)

val actualPort = factory.getLocalPort

def shutdown() {
factory.shutdown()
Utils.deleteRecursively(snapshotDir)
Expand Down

0 comments on commit 8555563

Please sign in to comment.