Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-3615][Streaming]Fix Kafka unit test hard coded Zookeeper port issue #2483

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void testKafkaStream() throws InterruptedException {
Predef.<Tuple2<String, Object>>conforms()));

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", testSuite.zkConnect());
kafkaParams.put("zookeeper.connect", testSuite.zkHost() + ":" + testSuite.zkPort());
kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000));
kafkaParams.put("auto.offset.reset", "smallest");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.util.{Properties, Random}
import scala.collection.mutable

import kafka.admin.CreateTopicCommand
import kafka.common.TopicAndPartition
import kafka.common.{KafkaException, TopicAndPartition}
import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
import kafka.utils.ZKStringSerializer
import kafka.serializer.{StringDecoder, StringEncoder}
Expand All @@ -42,14 +42,13 @@ import org.apache.spark.util.Utils
class KafkaStreamSuite extends TestSuiteBase {
import KafkaTestUtils._

val zkConnect = "localhost:2181"
val zkHost = "localhost"
var zkPort: Int = 0
val zkConnectionTimeout = 6000
val zkSessionTimeout = 6000

val brokerPort = 9092
val brokerProps = getBrokerConfig(brokerPort, zkConnect)
val brokerConf = new KafkaConfig(brokerProps)

protected var brokerPort = 9092
protected var brokerConf: KafkaConfig = _
protected var zookeeper: EmbeddedZookeeper = _
protected var zkClient: ZkClient = _
protected var server: KafkaServer = _
Expand All @@ -59,16 +58,35 @@ class KafkaStreamSuite extends TestSuiteBase {

override def beforeFunction() {
// Zookeeper server startup
zookeeper = new EmbeddedZookeeper(zkConnect)
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
// Get the actual zookeeper binding port
zkPort = zookeeper.actualPort
logInfo("==================== 0 ====================")
zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)

zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
ZKStringSerializer)
logInfo("==================== 1 ====================")

// Kafka broker startup
server = new KafkaServer(brokerConf)
logInfo("==================== 2 ====================")
server.startup()
logInfo("==================== 3 ====================")
var bindSuccess: Boolean = false
while(!bindSuccess) {
try {
val brokerProps = getBrokerConfig(brokerPort, s"$zkHost:$zkPort")
brokerConf = new KafkaConfig(brokerProps)
server = new KafkaServer(brokerConf)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there no way to specify the KafkaServer port as 0, and get back the actual port it bound to?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi TD, thanks for reviewing. Actually there's no way for KafkaServer to read back the port it bound to, so I wrote like this to try different ports.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alright. just one more round of testing, and will merge it if it passes.

logInfo("==================== 2 ====================")
server.startup()
logInfo("==================== 3 ====================")
bindSuccess = true
} catch {
case e: KafkaException =>
if (e.getMessage != null && e.getMessage.contains("Socket server failed to bind to")) {
brokerPort += 1
}
case e: Exception => throw new Exception("Kafka server create failed", e)
}
}

Thread.sleep(2000)
logInfo("==================== 4 ====================")
super.beforeFunction()
Expand All @@ -92,7 +110,7 @@ class KafkaStreamSuite extends TestSuiteBase {
createTopic(topic)
produceAndSendMessage(topic, sent)

val kafkaParams = Map("zookeeper.connect" -> zkConnect,
val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
"group.id" -> s"test-consumer-${random.nextInt(10000)}",
"auto.offset.reset" -> "smallest")

Expand Down Expand Up @@ -200,6 +218,8 @@ object KafkaTestUtils {
factory.configure(new InetSocketAddress(ip, port), 16)
factory.startup(zookeeper)

val actualPort = factory.getLocalPort

def shutdown() {
factory.shutdown()
Utils.deleteRecursively(snapshotDir)
Expand Down