From 8555563830eb240f2b5b44a8991d0e45c49bfdaa Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 22 Sep 2014 10:52:14 +0800 Subject: [PATCH] Fix Kafka unit test hard coded Zookeeper port issue --- .../streaming/kafka/JavaKafkaStreamSuite.java | 2 +- .../streaming/kafka/KafkaStreamSuite.scala | 46 +++++++++++++------ 2 files changed, 34 insertions(+), 14 deletions(-) diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 0571454c01dae..efb0099c7c850 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -81,7 +81,7 @@ public void testKafkaStream() throws InterruptedException { Predef.>conforms())); HashMap kafkaParams = new HashMap(); - kafkaParams.put("zookeeper.connect", testSuite.zkConnect()); + kafkaParams.put("zookeeper.connect", testSuite.zkHost() + ":" + testSuite.zkPort()); kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000)); kafkaParams.put("auto.offset.reset", "smallest"); diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index c0b55e9340253..6943326eb750e 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -24,7 +24,7 @@ import java.util.{Properties, Random} import scala.collection.mutable import kafka.admin.CreateTopicCommand -import kafka.common.TopicAndPartition +import kafka.common.{KafkaException, TopicAndPartition} import kafka.producer.{KeyedMessage, ProducerConfig, Producer} import kafka.utils.ZKStringSerializer import kafka.serializer.{StringDecoder, StringEncoder} @@ -42,14 +42,13 @@ import org.apache.spark.util.Utils class KafkaStreamSuite extends TestSuiteBase { import KafkaTestUtils._ - val zkConnect = "localhost:2181" + val zkHost = "localhost" + var zkPort: Int = 0 val zkConnectionTimeout = 6000 val zkSessionTimeout = 6000 - val brokerPort = 9092 - val brokerProps = getBrokerConfig(brokerPort, zkConnect) - val brokerConf = new KafkaConfig(brokerProps) - + protected var brokerPort = 9092 + protected var brokerConf: KafkaConfig = _ protected var zookeeper: EmbeddedZookeeper = _ protected var zkClient: ZkClient = _ protected var server: KafkaServer = _ @@ -59,16 +58,35 @@ class KafkaStreamSuite extends TestSuiteBase { override def beforeFunction() { // Zookeeper server startup - zookeeper = new EmbeddedZookeeper(zkConnect) + zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort") + // Get the actual zookeeper binding port + zkPort = zookeeper.actualPort logInfo("==================== 0 ====================") - zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) + + zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, + ZKStringSerializer) logInfo("==================== 1 ====================") // Kafka broker startup - server = new KafkaServer(brokerConf) - logInfo("==================== 2 ====================") - server.startup() - logInfo("==================== 3 ====================") + var bindSuccess: Boolean = false + while(!bindSuccess) { + try { + val brokerProps = getBrokerConfig(brokerPort, s"$zkHost:$zkPort") + brokerConf = new KafkaConfig(brokerProps) + server = new KafkaServer(brokerConf) + logInfo("==================== 2 ====================") + server.startup() + logInfo("==================== 3 ====================") + bindSuccess = true + } catch { + case e: KafkaException => + if (e.getMessage != null && e.getMessage.contains("Socket server failed to bind to")) { + brokerPort += 1 + } + case e: Exception => throw new Exception("Kafka server create failed", e) + } + } + Thread.sleep(2000) logInfo("==================== 4 ====================") super.beforeFunction() @@ -92,7 +110,7 @@ class KafkaStreamSuite extends TestSuiteBase { createTopic(topic) produceAndSendMessage(topic, sent) - val kafkaParams = Map("zookeeper.connect" -> zkConnect, + val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort", "group.id" -> s"test-consumer-${random.nextInt(10000)}", "auto.offset.reset" -> "smallest") @@ -200,6 +218,8 @@ object KafkaTestUtils { factory.configure(new InetSocketAddress(ip, port), 16) factory.startup(zookeeper) + val actualPort = factory.getLocalPort + def shutdown() { factory.shutdown() Utils.deleteRecursively(snapshotDir)