diff --git a/pom.xml b/pom.xml index c7b6a9356747..21aab571c83d 100644 --- a/pom.xml +++ b/pom.xml @@ -885,22 +885,6 @@ 9.6.3-3 - - org.apache.kafka - kafka_2.10 - 0.8.2.2 - - - log4j - log4j - - - org.slf4j - slf4j-log4j12 - - - - org.xerial.snappy snappy-java diff --git a/presto-kafka/pom.xml b/presto-kafka/pom.xml index 6e78414a6c6d..137e240b8e23 100644 --- a/presto-kafka/pom.xml +++ b/presto-kafka/pom.xml @@ -14,6 +14,8 @@ ${project.parent.basedir} + + 0.10.2.2 @@ -60,6 +62,17 @@ org.apache.kafka kafka_2.10 + ${dep.kafka.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaSplitManager.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaSplitManager.java index 56e716eba276..776c96d20555 100644 --- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaSplitManager.java +++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaSplitManager.java @@ -28,7 +28,7 @@ import io.prestosql.spi.connector.ConnectorTransactionHandle; import io.prestosql.spi.connector.FixedSplitSource; import kafka.api.PartitionOffsetRequestInfo; -import kafka.cluster.Broker; +import kafka.cluster.BrokerEndPoint; import kafka.common.TopicAndPartition; import kafka.javaapi.OffsetRequest; import kafka.javaapi.OffsetResponse; @@ -100,7 +100,7 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, Co for (PartitionMetadata part : metadata.partitionsMetadata()) { log.debug("Adding Partition %s/%s", metadata.topic(), part.partitionId()); - Broker leader = part.leader(); + BrokerEndPoint leader = part.leader(); if (leader == null) { throw new PrestoException(GENERIC_INTERNAL_ERROR, format("Leader election in progress for Kafka topic '%s' partition %s", metadata.topic(), part.partitionId())); } diff --git a/presto-kafka/src/test/java/io/prestosql/plugin/kafka/util/EmbeddedKafka.java b/presto-kafka/src/test/java/io/prestosql/plugin/kafka/util/EmbeddedKafka.java index a3b64f1e1c4a..f068904d6dae 100644 --- a/presto-kafka/src/test/java/io/prestosql/plugin/kafka/util/EmbeddedKafka.java +++ b/presto-kafka/src/test/java/io/prestosql/plugin/kafka/util/EmbeddedKafka.java @@ -17,12 +17,15 @@ import com.google.common.collect.Maps; import com.google.common.io.Files; import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; import kafka.javaapi.producer.Producer; import kafka.producer.ProducerConfig; import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; import java.io.Closeable; import java.io.File; @@ -86,7 +89,7 @@ public static EmbeddedKafka createEmbeddedKafka(Properties overrideProperties) .putAll(Maps.fromProperties(overrideProperties)) .build(); - KafkaConfig config = new KafkaConfig(toProperties(properties)); + KafkaConfig config = new KafkaConfig(properties); this.kafka = new KafkaServerStartable(config); } @@ -120,10 +123,17 @@ public void createTopics(int partitions, int replication, Properties topicProper { checkState(started.get() && !stopped.get(), "not started!"); - ZkClient zkClient = new ZkClient(getZookeeperConnectString(), 30_000, 30_000, ZKStringSerializer$.MODULE$); + ZkConnection zkConnection = new ZkConnection(getZookeeperConnectString(), 30_000); + ZkClient zkClient = new ZkClient(zkConnection, 30_000, ZKStringSerializer$.MODULE$); try { for (String topic : topics) { - AdminUtils.createTopic(zkClient, topic, partitions, replication, topicProperties); + AdminUtils.createTopic( + new ZkUtils(zkClient, zkConnection, false), + topic, + partitions, + replication, + topicProperties, + RackAwareMode.Disabled$.MODULE$); } } finally {