
Support connecting to Kerberos secured Kafka #7990, updated to Kafka 0.10.1.2 #8394

Closed · wants to merge 12 commits · Changes from 4 commits
13 changes: 10 additions & 3 deletions pom.xml
@@ -56,6 +56,7 @@
<dep.testng.version>6.10</dep.testng.version>
<dep.nifty.version>0.15.1</dep.nifty.version>
<dep.swift.version>0.15.2</dep.swift.version>
<dep.kafka-clients.version>0.10.2.1</dep.kafka-clients.version>
Contributor: Don't add a property for a version that is only used in one place. We use properties for related artifacts that need to be versioned together.

Author: Thanks, modified as per suggestion.

<!-- use a fractional hour timezone offset for tests -->
<air.test.timezone>Asia/Katmandu</air.test.timezone>
@@ -769,7 +770,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.2</version>
<version>0.10.2.1</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
@@ -785,7 +786,7 @@
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.1.7</version>
<version>1.1.2.6</version>
</dependency>

<dependency>
@@ -811,7 +812,7 @@
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.8</version>
<version>0.10</version>
<exclusions>
<exclusion>
<artifactId>log4j</artifactId>
@@ -921,6 +922,12 @@
<artifactId>cassandra-driver</artifactId>
<version>3.1.4-1</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
</dependencyManagement>

10 changes: 8 additions & 2 deletions presto-kafka/pom.xml
@@ -67,15 +67,21 @@
<artifactId>kafka_2.10</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${dep.kafka-clients.version}</version>
Contributor: Add this to the dependencyManagement section in the root POM, then remove the version here. In almost all cases, versions should only be in the root POM, ensuring versions are consistent.

(Technically, this is not necessary here, because this dependency is only used in this one module, and different plugins can actually have different versions of the same library due to class loader isolation. But we follow the rule as much as possible so it's easy for everyone to understand and not wonder why there are exceptions.)

Author: Thanks, changes made per suggestion.
</dependency>
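The root-POM convention the reviewer describes would look roughly like this (a sketch: the coordinates are taken from this diff, the structure is standard Maven dependency management):

```xml
<!-- root pom.xml: declare the version once under dependencyManagement -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.2.1</version>
    </dependency>
  </dependencies>
</dependencyManagement>

<!-- presto-kafka/pom.xml: inherits the managed version, so no <version> element here -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
</dependency>
```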

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>

<!--
Contributor: Please delete rather than commenting out. The old version is available in the Git history. Commenting out code quickly clutters the code base.

Author: Done. Thanks.
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>
</dependency>-->

<dependency>
<groupId>javax.inject</groupId>
@@ -69,6 +69,13 @@ public class KafkaConnectorConfig
*/
private boolean hideInternalColumns = true;

/**
* Security protocol used to connect to the broker; defaults to PLAINTEXT
*/
private String securityProtocol = "PLAINTEXT";

private boolean autoCommit = true;

@NotNull
public File getTableDescriptionDir()
{
@@ -168,4 +175,28 @@ private static HostAddress toHostAddress(String value)
{
return HostAddress.fromString(value).withDefaultPort(KAFKA_DEFAULT_PORT);
}

@Config("kafka.security-protocol")
public KafkaConnectorConfig setSecurityProtocol(String securityProtocol)
Member: This should be an enum, so that the possible values can be validated.
{
this.securityProtocol = securityProtocol;
return this;
}

public String getSecurityProtocol()
{
return securityProtocol;
}

@Config("kafka.auto-commit")
public KafkaConnectorConfig setAutoCommit(boolean autoCommit)
{
this.autoCommit = autoCommit;
return this;
}

public boolean isAutoCommit()
{
return autoCommit;
}
}
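The reviewer's enum suggestion might be sketched as follows. The enum values mirror Kafka's supported security protocols; the `@Config` annotation from the real class is omitted here so the example stays self-contained, and the class name is hypothetical:

```java
// Sketch: an enum-typed config property, so invalid values fail fast at
// configuration time instead of surfacing later inside the Kafka client.
class KafkaSecurityConfig
{
    // Values mirror Kafka's supported security protocols.
    enum SecurityProtocol
    {
        PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
    }

    private SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;

    // With an enum parameter, a typo like "PLAINTXT" is rejected when the
    // config is bound, with a clear list of allowed values.
    public KafkaSecurityConfig setSecurityProtocol(SecurityProtocol securityProtocol)
    {
        this.securityProtocol = securityProtocol;
        return this;
    }

    public SecurityProtocol getSecurityProtocol()
    {
        return securityProtocol;
    }
}
```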
@@ -47,7 +47,7 @@ public void configure(Binder binder)
binder.bind(KafkaSplitManager.class).in(Scopes.SINGLETON);
binder.bind(KafkaRecordSetProvider.class).in(Scopes.SINGLETON);

binder.bind(KafkaSimpleConsumerManager.class).in(Scopes.SINGLETON);
binder.bind(KafkaConsumerManager.class).in(Scopes.SINGLETON);

configBinder(binder).bindConfig(KafkaConnectorConfig.class);

@@ -0,0 +1,102 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.kafka;

import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeManager;
import com.google.common.base.Throwables;
import io.airlift.log.Logger;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import javax.inject.Inject;

import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

/**
 * Manages connections to the Kafka nodes. A worker may connect to multiple Kafka nodes depending on the segments and partitions
 * it needs to process. Unlike the old {@link kafka.javaapi.consumer.SimpleConsumer}, the new
 * {@link org.apache.kafka.clients.consumer.KafkaConsumer} is not thread-safe, so a consumer instance must not be shared across threads.
 */
public class KafkaConsumerManager
{
private static final Logger log = Logger.get(KafkaConsumerManager.class);
private static final String messageDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";

private final String connectorId;
private final NodeManager nodeManager;
private final int connectTimeoutMillis;
private final int bufferSizeBytes;
private final String securityProtocol;
private final boolean autoCommit;
private final String bootStrapServers;

@Inject
public KafkaConsumerManager(
KafkaConnectorId connectorId,
KafkaConnectorConfig kafkaConnectorConfig,
NodeManager nodeManager)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
requireNonNull(kafkaConnectorConfig, "kafkaConfig is null");
this.connectTimeoutMillis = toIntExact(kafkaConnectorConfig.getKafkaConnectTimeout().toMillis());
this.bufferSizeBytes = toIntExact(kafkaConnectorConfig.getKafkaBufferSize().toBytes());
this.securityProtocol = kafkaConnectorConfig.getSecurityProtocol();
this.autoCommit = kafkaConnectorConfig.isAutoCommit();
this.bootStrapServers = bootstrapServers(kafkaConnectorConfig.getNodes());
}

public KafkaConsumer<byte[], byte[]> getConsumer()
{
return getConsumer(bootStrapServers);
}

public KafkaConsumer<byte[], byte[]> getConsumer(Set<HostAddress> nodes)
{
return getConsumer(bootstrapServers(nodes));
}

private KafkaConsumer<byte[], byte[]> getConsumer(String bootstrapServers)
{
try {
// Kafka resolves deserializer classes through the thread context class loader;
// clearing it makes the client fall back to its own class loader, avoiding
// conflicts with Presto's plugin class loader isolation.
Thread.currentThread().setContextClassLoader(null);
Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
prop.put(ConsumerConfig.GROUP_ID_CONFIG, connectorId + "-presto");
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(autoCommit));
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, messageDeserializer);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, messageDeserializer);
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_" + connectorId + "_" + ThreadLocalRandom.current().nextInt() + "_" + System.currentTimeMillis());
prop.put("security.protocol", securityProtocol);
return new KafkaConsumer<byte[], byte[]>(prop);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}

private String bootstrapServers(Set<HostAddress> nodes)
{
return nodes.stream().map(ha -> new StringJoiner(":").add(ha.getHostText()).add(String.valueOf(ha.getPort())).toString())
.collect(Collectors.joining(","));
}
}
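Taken together, the new configuration properties suggest a deployment sketch like the following. The Presto-side property names come from this PR; the JAAS layout follows standard Kafka client Kerberos conventions, and the host, keytab path, and principal are hypothetical:

```
# etc/catalog/kafka.properties (Presto catalog configuration)
connector.name=kafka
kafka.nodes=kafka-broker.example.com:9092
kafka.security-protocol=SASL_PLAINTEXT
kafka.auto-commit=false

# JAAS file for the Kafka client, passed to the Presto JVM via
# -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/presto.keytab"
    principal="presto@EXAMPLE.COM";
};
```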