Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support connecting to Kerberos secured Kafka #7990, updated to Kafka 0.10.2.1 #8394

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.2</version>
<version>0.10.2.1</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
Expand All @@ -782,10 +782,16 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>

<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.1.7</version>
<version>1.1.2.6</version>
</dependency>

<dependency>
Expand All @@ -811,7 +817,7 @@
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.8</version>
<version>0.10</version>
<exclusions>
<exclusion>
<artifactId>log4j</artifactId>
Expand Down Expand Up @@ -921,6 +927,12 @@
<artifactId>cassandra-driver</artifactId>
<version>3.1.4-1</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
8 changes: 4 additions & 4 deletions presto-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ public class KafkaConnectorConfig
*/
private boolean hideInternalColumns = true;

/**
* Security protocol to connect to the broker, default is plain text
*/
private String securityProtocol = "PLAINTEXT";

private boolean autoCommit = true;

@NotNull
public File getTableDescriptionDir()
{
Expand Down Expand Up @@ -168,4 +175,28 @@ private static HostAddress toHostAddress(String value)
{
return HostAddress.fromString(value).withDefaultPort(KAFKA_DEFAULT_PORT);
}

@Config("kafka.security-protocol")
public KafkaConnectorConfig setSecurityProtocol(String securityProtocol)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be an enum, so that the possible values can be validated

{
this.securityProtocol = securityProtocol;
return this;
}

/**
 * Returns the security protocol passed to the Kafka consumer
 * ({@code security.protocol}); defaults to {@code "PLAINTEXT"}.
 */
public String getSecurityProtocol()
{
    return securityProtocol;
}

/**
 * Controls whether the Kafka consumer commits offsets automatically
 * ({@code enable.auto.commit}); defaults to {@code true}.
 *
 * @return this, for config-binder chaining
 */
@Config("kafka.auto-commit")
public KafkaConnectorConfig setAutoCommit(boolean autoCommit)
{
    this.autoCommit = autoCommit;
    return this;
}

/**
 * Returns whether consumer offsets are auto-committed; defaults to {@code true}.
 */
public boolean isAutoCommit()
{
    return autoCommit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void configure(Binder binder)
binder.bind(KafkaSplitManager.class).in(Scopes.SINGLETON);
binder.bind(KafkaRecordSetProvider.class).in(Scopes.SINGLETON);

binder.bind(KafkaSimpleConsumerManager.class).in(Scopes.SINGLETON);
binder.bind(KafkaConsumerManager.class).in(Scopes.SINGLETON);

configBinder(binder).bindConfig(KafkaConnectorConfig.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.kafka;

import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeManager;
import com.google.common.base.Throwables;
import io.airlift.log.Logger;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import javax.inject.Inject;

import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

/**
 * Manages connections to the Kafka nodes. A worker may connect to multiple Kafka
 * brokers depending on the segments and partitions it needs to process. Builds
 * new-style {@link org.apache.kafka.clients.consumer.KafkaConsumer} instances
 * (note: KafkaConsumer itself is NOT thread-safe; each caller gets a fresh one).
 */
public class KafkaConsumerManager
{
    private static final Logger log = Logger.get(KafkaConsumerManager.class);
    // Raw bytes in and out; deserialization is handled by the Presto decoders.
    private static final String MESSAGE_DESERIALIZER = "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    private final String connectorId;
    private final NodeManager nodeManager;
    // Retained from the connector config for future consumer tuning; not yet
    // applied to the new-style consumer properties.
    private final int connectTimeoutMillis;
    private final int bufferSizeBytes;
    private final String securityProtocol;
    private final boolean autoCommit;
    private final String bootStrapServers;

    /**
     * @param connectorId identifies this catalog; used in group and client ids
     * @param kafkaConnectorConfig connector configuration (nodes, security, commit mode)
     * @param nodeManager Presto node manager (injected; reserved for node-aware logic)
     */
    @Inject
    public KafkaConsumerManager(
            KafkaConnectorId connectorId,
            KafkaConnectorConfig kafkaConnectorConfig,
            NodeManager nodeManager)
    {
        this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
        this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
        requireNonNull(kafkaConnectorConfig, "kafkaConfig is null");
        this.connectTimeoutMillis = toIntExact(kafkaConnectorConfig.getKafkaConnectTimeout().toMillis());
        this.bufferSizeBytes = toIntExact(kafkaConnectorConfig.getKafkaBufferSize().toBytes());
        this.securityProtocol = kafkaConnectorConfig.getSecurityProtocol();
        this.autoCommit = kafkaConnectorConfig.isAutoCommit();
        this.bootStrapServers = bootstrapServers(kafkaConnectorConfig.getNodes());
    }

    /**
     * Returns a new consumer connected to the configured default brokers.
     */
    public KafkaConsumer<byte[], byte[]> getConsumer()
    {
        return getConsumer(bootStrapServers);
    }

    /**
     * Returns a new consumer connected to the given broker nodes.
     */
    public KafkaConsumer<byte[], byte[]> getConsumer(Set<HostAddress> nodes)
    {
        return getConsumer(bootstrapServers(nodes));
    }

    private KafkaConsumer<byte[], byte[]> getConsumer(String bootstrapServers)
    {
        // KafkaConsumer resolves (de)serializer classes via the thread context
        // classloader; clear it so resolution falls back to Kafka's own loader
        // inside Presto's plugin classloader isolation. Restore it afterwards —
        // the original code leaked the null TCCL to the calling thread.
        ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(null);
            Properties prop = new Properties();
            // BUG FIX: previously used the field bootStrapServers, silently
            // ignoring the servers passed by getConsumer(Set<HostAddress>).
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            prop.put(ConsumerConfig.GROUP_ID_CONFIG, connectorId + "-presto");
            prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(autoCommit));
            prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, MESSAGE_DESERIALIZER);
            prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MESSAGE_DESERIALIZER);
            prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_" + connectorId + "_" + ThreadLocalRandom.current().nextInt() + "_" + System.currentTimeMillis());
            prop.put("security.protocol", securityProtocol);
            return new KafkaConsumer<>(prop);
        }
        catch (Exception e) {
            // BUG FIX: propagate the exception itself, not e.getCause() —
            // getCause() may be null and always discarded the top frame.
            throw Throwables.propagate(e);
        }
        finally {
            Thread.currentThread().setContextClassLoader(previousClassLoader);
        }
    }

    /**
     * Renders the node set as a comma-separated {@code host:port} list.
     */
    private String bootstrapServers(Set<HostAddress> nodes)
    {
        return nodes.stream().map(ha -> new StringJoiner(":").add(ha.getHostText()).add(String.valueOf(ha.getPort())).toString())
                .collect(Collectors.joining(","));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.BooleanType;
import com.facebook.presto.spi.type.SmallintType;
import com.facebook.presto.spi.type.TimestampType;
import com.facebook.presto.spi.type.Type;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
Expand Down Expand Up @@ -95,11 +97,22 @@ public class KafkaInternalFieldDescription
*/
public static final KafkaInternalFieldDescription KEY_LENGTH_FIELD = new KafkaInternalFieldDescription("_key_length", BigintType.BIGINT, "Total number of key bytes");

/**
* <tt>_message_timestamp</tt> - Message Timestamp.
*/
public static final KafkaInternalFieldDescription MESSAGE_TIMESTAMP_FIELD = new KafkaInternalFieldDescription("_message_timestamp", TimestampType.TIMESTAMP, "Message Timestamp");

/**
* <tt>_message_timestamp_type</tt> - Message Timestamp Type.
*/
public static final KafkaInternalFieldDescription MESSAGE_TIMESTAMP_TYPE_FIELD = new KafkaInternalFieldDescription("_message_timestamp_type", SmallintType.SMALLINT, "Message Timestamp Type");

/**
 * Returns every internal column description exposed by the Kafka connector.
 */
public static Set<KafkaInternalFieldDescription> getInternalFields()
{
    return ImmutableSet.of(
            PARTITION_ID_FIELD,
            PARTITION_OFFSET_FIELD,
            SEGMENT_START_FIELD,
            SEGMENT_END_FIELD,
            SEGMENT_COUNT_FIELD,
            KEY_FIELD,
            KEY_CORRUPT_FIELD,
            KEY_LENGTH_FIELD,
            MESSAGE_TIMESTAMP_FIELD,
            MESSAGE_TIMESTAMP_TYPE_FIELD,
            MESSAGE_FIELD,
            MESSAGE_CORRUPT_FIELD,
            MESSAGE_LENGTH_FIELD);
}

Expand Down
Loading