Add support for optional Kafka properties from external file
klDen committed Mar 31, 2022
1 parent 736c160 commit be4b572
Showing 25 changed files with 471 additions and 57 deletions.
docs/src/main/sphinx/connector/kafka.rst (13 additions & 4 deletions)

@@ -39,13 +39,19 @@ Configuration

 To configure the Kafka connector, create a catalog properties file
 ``etc/catalog/kafka.properties`` with the following contents,
-replacing the properties as appropriate:
+replacing the properties as appropriate.
+
+In some cases, such as when using specialized authentication methods, it is necessary to specify
+additional Kafka client properties in order to access your Kafka cluster. To do so,
+add the ``kafka.config.resources`` property to reference your Kafka config files. Note that configs
+can be overwritten if defined explicitly in ``kafka.properties``:
 
 .. code-block:: text
 
     connector.name=kafka
     kafka.table-names=table1,table2
     kafka.nodes=host1:port,host2:port
+    kafka.config.resources=/etc/kafka-configuration.properties
 
 Multiple Kafka clusters
 ^^^^^^^^^^^^^^^^^^^^^^^
@@ -61,9 +67,9 @@ Configuration properties
 
 The following configuration properties are available:
 
-========================================================== ==============================================================================
+========================================================== ======================================================================================================
 Property Name                                              Description
-========================================================== ==============================================================================
+========================================================== ======================================================================================================
 ``kafka.default-schema``                                   Default schema name for tables
 ``kafka.nodes``                                            List of nodes in the Kafka cluster
 ``kafka.buffer-size``                                      Kafka read buffer size
@@ -79,7 +85,10 @@
 ``kafka.ssl.truststore.type``                              File format of the truststore file, defaults to ``JKS``
 ``kafka.ssl.key.password``                                 Password for the private key in the keystore file
 ``kafka.ssl.endpoint-identification-algorithm``            Endpoint identification algorithm used by clients to validate server host name, defaults to ``https``
-========================================================== ==============================================================================
+``kafka.config.resources``                                 A comma-separated list of Kafka client configuration files. These files must exist on the
+                                                           machines running Trino. Only specify this if absolutely necessary to access Kafka.
+                                                           Example: ``/etc/kafka-configuration.properties``
+========================================================== ======================================================================================================
 
 In addition, you need to configure :ref:`table schema and schema registry usage
 <kafka-table-schema-registry>` with the relevant properties.
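The file referenced by ``kafka.config.resources`` is an ordinary Kafka client properties file. As an illustration only (the keys below are standard Kafka client settings; the values and the SASL scenario are made-up examples, not part of this commit), a ``/etc/kafka-configuration.properties`` enabling SASL authentication could look like:

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="alice" password="alice-secret";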
PlainTextKafkaAdminFactory.java → DefaultKafkaAdminFactory.java

@@ -16,44 +16,41 @@

 import io.trino.spi.HostAddress;
 import io.trino.spi.connector.ConnectorSession;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
 
 import javax.inject.Inject;
 
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import static io.trino.plugin.kafka.utils.PropertiesUtils.readProperties;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.joining;
-import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 
-public class PlainTextKafkaAdminFactory
+public class DefaultKafkaAdminFactory
         implements KafkaAdminFactory
 {
     private final Set<HostAddress> nodes;
-    private final SecurityProtocol securityProtocol;
+    private final Map<String, String> configurationProperties;
 
     @Inject
-    public PlainTextKafkaAdminFactory(
-            KafkaConfig kafkaConfig,
-            KafkaSecurityConfig securityConfig)
+    public DefaultKafkaAdminFactory(KafkaConfig kafkaConfig)
+            throws Exception
     {
         requireNonNull(kafkaConfig, "kafkaConfig is null");
-        requireNonNull(securityConfig, "securityConfig is null");
-
         nodes = kafkaConfig.getNodes();
-        securityProtocol = securityConfig.getSecurityProtocol();
+        configurationProperties = readProperties(kafkaConfig.getResourceConfigFiles());
     }
 
     @Override
     public Properties configure(ConnectorSession session)
     {
         Properties properties = new Properties();
+        properties.putAll(configurationProperties);
         properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, nodes.stream()
                 .map(HostAddress::toString)
                 .collect(joining(",")));
-        properties.setProperty(SECURITY_PROTOCOL_CONFIG, securityProtocol.name());
         return properties;
     }
 }
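One detail worth noting in ``configure()``: ``putAll`` runs before the explicit ``setProperty`` calls, so settings the connector derives itself override anything loaded from ``kafka.config.resources``, which is the override behavior the documentation change above describes. A self-contained sketch of that ordering using plain JDK ``Properties`` (illustrative values, not Trino code):

    import java.util.Map;
    import java.util.Properties;

    public class OverrideOrderDemo
    {
        public static void main(String[] args)
        {
            Properties properties = new Properties();
            // entries loaded from an external config file go in first...
            properties.putAll(Map.of(
                    "bootstrap.servers", "from-file:9092",
                    "linger.ms", "100"));
            // ...then connector-derived settings, which win on any conflict
            properties.setProperty("bootstrap.servers", "host1:9092,host2:9092");

            System.out.println(properties.getProperty("bootstrap.servers")); // host1:9092,host2:9092
            System.out.println(properties.getProperty("linger.ms")); // 100 (kept from the file)
        }
    }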
PlainTextKafkaConsumerFactory.java → DefaultKafkaConsumerFactory.java

@@ -16,55 +16,52 @@
 import io.airlift.units.DataSize;
 import io.trino.spi.HostAddress;
 import io.trino.spi.connector.ConnectorSession;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 
 import javax.inject.Inject;
 
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import static io.trino.plugin.kafka.utils.PropertiesUtils.readProperties;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.joining;
-import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.RECEIVE_BUFFER_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 
-public class PlainTextKafkaConsumerFactory
+public class DefaultKafkaConsumerFactory
         implements KafkaConsumerFactory
 {
     private final Set<HostAddress> nodes;
     private final DataSize kafkaBufferSize;
-    private final SecurityProtocol securityProtocol;
+    private final Map<String, String> configurationProperties;
 
     @Inject
-    public PlainTextKafkaConsumerFactory(
-            KafkaConfig kafkaConfig,
-            KafkaSecurityConfig securityConfig)
+    public DefaultKafkaConsumerFactory(KafkaConfig kafkaConfig)
+            throws Exception
    {
         requireNonNull(kafkaConfig, "kafkaConfig is null");
-        requireNonNull(securityConfig, "securityConfig is null");
-
         nodes = kafkaConfig.getNodes();
         kafkaBufferSize = kafkaConfig.getKafkaBufferSize();
-        securityProtocol = securityConfig.getSecurityProtocol();
+        configurationProperties = readProperties(kafkaConfig.getResourceConfigFiles());
     }
 
     @Override
     public Properties configure(ConnectorSession session)
     {
         Properties properties = new Properties();
+        properties.putAll(configurationProperties);
         properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, nodes.stream()
                 .map(HostAddress::toString)
                 .collect(joining(",")));
         properties.setProperty(KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         properties.setProperty(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        properties.setProperty(RECEIVE_BUFFER_CONFIG, Long.toString(kafkaBufferSize.toBytes()));
         properties.setProperty(ENABLE_AUTO_COMMIT_CONFIG, Boolean.toString(false));
-        properties.setProperty(SECURITY_PROTOCOL_CONFIG, securityProtocol.name());
+        properties.setProperty(RECEIVE_BUFFER_CONFIG, Long.toString(kafkaBufferSize.toBytes()));
         return properties;
     }
 }
PlainTextKafkaProducerFactory.java → DefaultKafkaProducerFactory.java

@@ -15,51 +15,50 @@

 import io.trino.spi.HostAddress;
 import io.trino.spi.connector.ConnectorSession;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 
 import javax.inject.Inject;
 
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import static io.trino.plugin.kafka.utils.PropertiesUtils.readProperties;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.joining;
-import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
 
-public class PlainTextKafkaProducerFactory
+public class DefaultKafkaProducerFactory
         implements KafkaProducerFactory
 {
     private final Set<HostAddress> nodes;
-    private final SecurityProtocol securityProtocol;
+    private final Map<String, String> configurationProperties;
 
     @Inject
-    public PlainTextKafkaProducerFactory(KafkaConfig kafkaConfig, KafkaSecurityConfig securityConfig)
+    public DefaultKafkaProducerFactory(KafkaConfig kafkaConfig)
+            throws Exception
     {
         requireNonNull(kafkaConfig, "kafkaConfig is null");
-        requireNonNull(securityConfig, "securityConfig is null");
-
         nodes = kafkaConfig.getNodes();
-        securityProtocol = securityConfig.getSecurityProtocol();
+        configurationProperties = readProperties(kafkaConfig.getResourceConfigFiles());
     }
 
     @Override
     public Properties configure(ConnectorSession session)
     {
         Properties properties = new Properties();
+        properties.putAll(configurationProperties);
         properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, nodes.stream()
                 .map(HostAddress::toString)
                 .collect(joining(",")));
         properties.setProperty(KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         properties.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         properties.setProperty(ACKS_CONFIG, "all");
         properties.setProperty(LINGER_MS_CONFIG, Long.toString(5));
-        properties.setProperty(SECURITY_PROTOCOL_CONFIG, securityProtocol.name());
         return properties;
     }
 }
KafkaClientsModule.java

@@ -30,30 +30,31 @@ public class KafkaClientsModule
     protected void setup(Binder binder)
     {
         configBinder(binder).bindConfig(KafkaSecurityConfig.class);
-        installClientModule(SecurityProtocol.PLAINTEXT, KafkaClientsModule::configurePlainText);
+        installClientModule(null, KafkaClientsModule::configureDefault);
+        installClientModule(SecurityProtocol.PLAINTEXT, KafkaClientsModule::configureDefault);
         installClientModule(SecurityProtocol.SSL, KafkaClientsModule::configureSsl);
     }
 
     private void installClientModule(SecurityProtocol securityProtocol, Module module)
     {
         install(conditionalModule(
                 KafkaSecurityConfig.class,
-                config -> config.getSecurityProtocol().equals(securityProtocol),
+                config -> config.getSecurityProtocol().orElse(null) == securityProtocol,
                 module));
     }
 
-    private static void configurePlainText(Binder binder)
+    private static void configureDefault(Binder binder)
     {
-        binder.bind(KafkaConsumerFactory.class).to(PlainTextKafkaConsumerFactory.class).in(Scopes.SINGLETON);
-        binder.bind(KafkaProducerFactory.class).to(PlainTextKafkaProducerFactory.class).in(Scopes.SINGLETON);
-        binder.bind(KafkaAdminFactory.class).to(PlainTextKafkaAdminFactory.class).in(Scopes.SINGLETON);
+        binder.bind(KafkaConsumerFactory.class).to(DefaultKafkaConsumerFactory.class).in(Scopes.SINGLETON);
+        binder.bind(KafkaProducerFactory.class).to(DefaultKafkaProducerFactory.class).in(Scopes.SINGLETON);
+        binder.bind(KafkaAdminFactory.class).to(DefaultKafkaAdminFactory.class).in(Scopes.SINGLETON);
     }
 
     private static void configureSsl(Binder binder)
     {
-        binder.bind(KafkaConsumerFactory.class).annotatedWith(ForKafkaSsl.class).to(PlainTextKafkaConsumerFactory.class).in(Scopes.SINGLETON);
-        binder.bind(KafkaProducerFactory.class).annotatedWith(ForKafkaSsl.class).to(PlainTextKafkaProducerFactory.class).in(Scopes.SINGLETON);
-        binder.bind(KafkaAdminFactory.class).annotatedWith(ForKafkaSsl.class).to(PlainTextKafkaAdminFactory.class).in(Scopes.SINGLETON);
+        binder.bind(KafkaConsumerFactory.class).annotatedWith(ForKafkaSsl.class).to(DefaultKafkaConsumerFactory.class).in(Scopes.SINGLETON);
+        binder.bind(KafkaProducerFactory.class).annotatedWith(ForKafkaSsl.class).to(DefaultKafkaProducerFactory.class).in(Scopes.SINGLETON);
+        binder.bind(KafkaAdminFactory.class).annotatedWith(ForKafkaSsl.class).to(DefaultKafkaAdminFactory.class).in(Scopes.SINGLETON);
 
         binder.bind(KafkaConsumerFactory.class).to(SslKafkaConsumerFactory.class).in(Scopes.SINGLETON);
         binder.bind(KafkaProducerFactory.class).to(SslKafkaProducerFactory.class).in(Scopes.SINGLETON);
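In ``installClientModule`` the predicate now compares via ``orElse(null) ==``, so a single check covers both the unset case (``null``, which routes to the new default factories) and each concrete protocol; identity comparison is safe here because ``SecurityProtocol`` is an enum. A standalone sketch of the predicate's behavior (illustrative, not Trino code; assumes ``kafka-clients`` on the classpath for the ``SecurityProtocol`` enum):

    import org.apache.kafka.common.security.auth.SecurityProtocol;

    import java.util.Optional;

    public class ProtocolPredicateDemo
    {
        // mirrors: config -> config.getSecurityProtocol().orElse(null) == securityProtocol
        static boolean matches(Optional<SecurityProtocol> configured, SecurityProtocol bound)
        {
            return configured.orElse(null) == bound;
        }

        public static void main(String[] args)
        {
            System.out.println(matches(Optional.empty(), null)); // true: nothing configured -> default module
            System.out.println(matches(Optional.of(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)); // true
            System.out.println(matches(Optional.empty(), SecurityProtocol.SSL)); // false
        }
    }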
KafkaConfig.java

@@ -14,10 +14,12 @@
 package io.trino.plugin.kafka;
 
 import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import io.airlift.configuration.Config;
 import io.airlift.configuration.ConfigDescription;
 import io.airlift.configuration.DefunctConfig;
+import io.airlift.configuration.validation.FileExists;
 import io.airlift.units.DataSize;
 import io.airlift.units.DataSize.Unit;
 import io.trino.plugin.kafka.schema.file.FileTableDescriptionSupplier;
@@ -27,9 +29,12 @@
 import javax.validation.constraints.NotNull;
 import javax.validation.constraints.Size;
 
+import java.io.File;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.StreamSupport;
 
+import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 
 @DefunctConfig("kafka.connect-timeout")
@@ -44,6 +49,7 @@ public class KafkaConfig
     private int messagesPerSplit = 100_000;
     private boolean timestampUpperBoundPushDownEnabled;
     private String tableDescriptionSupplier = FileTableDescriptionSupplier.NAME;
+    private List<File> resourceConfigFiles = ImmutableList.of();
 
     @Size(min = 1)
     public Set<HostAddress> getNodes()
@@ -152,4 +158,20 @@ public KafkaConfig setTimestampUpperBoundPushDownEnabled(boolean timestampUpperBoundPushDownEnabled)
         this.timestampUpperBoundPushDownEnabled = timestampUpperBoundPushDownEnabled;
         return this;
     }
+
+    @NotNull
+    public List<@FileExists File> getResourceConfigFiles()
+    {
+        return resourceConfigFiles;
+    }
+
+    @Config("kafka.config.resources")
+    @ConfigDescription("Optional config files")
+    public KafkaConfig setResourceConfigFiles(List<String> files)
+    {
+        this.resourceConfigFiles = files.stream()
+                .map(File::new)
+                .collect(toImmutableList());
+        return this;
+    }
 }
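``readProperties`` comes from ``io.trino.plugin.kafka.utils.PropertiesUtils``, one of the changed files not rendered on this page. Purely as a hypothetical sketch of what such a helper could look like, assuming it loads each file in list order into one map with later files overriding earlier ones (the real implementation may differ):

    // Hypothetical sketch; the actual io.trino.plugin.kafka.utils.PropertiesUtils
    // is not shown in this excerpt.
    import com.google.common.collect.ImmutableMap;

    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;
    import java.io.Reader;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    public final class PropertiesUtilsSketch
    {
        private PropertiesUtilsSketch() {}

        public static Map<String, String> readProperties(List<File> resourceConfigFiles)
                throws IOException
        {
            Map<String, String> result = new HashMap<>();
            for (File file : resourceConfigFiles) {
                Properties properties = new Properties();
                try (Reader reader = new FileReader(file)) {
                    properties.load(reader);
                }
                // later files in the list override earlier ones
                properties.stringPropertyNames()
                        .forEach(name -> result.put(name, properties.getProperty(name)));
            }
            return ImmutableMap.copyOf(result);
        }
    }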
KafkaSecurityConfig.java

@@ -18,7 +18,8 @@
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
 import javax.annotation.PostConstruct;
-import javax.validation.constraints.NotNull;
+
+import java.util.Optional;
 
 import static com.google.common.base.Preconditions.checkState;
 import static java.lang.String.format;
@@ -27,12 +28,11 @@
 
 public class KafkaSecurityConfig
 {
-    private SecurityProtocol securityProtocol = PLAINTEXT;
+    private SecurityProtocol securityProtocol;
 
-    @NotNull
-    public SecurityProtocol getSecurityProtocol()
+    public Optional<SecurityProtocol> getSecurityProtocol()
     {
-        return securityProtocol;
+        return Optional.ofNullable(securityProtocol);
     }
 
     @Config("kafka.security-protocol")
@@ -47,7 +47,7 @@ public KafkaSecurityConfig setSecurityProtocol(SecurityProtocol securityProtocol)
     public void validate()
     {
         checkState(
-                securityProtocol.equals(PLAINTEXT) || securityProtocol.equals(SSL),
-                format("Only %s and %s security protocols are supported", PLAINTEXT, SSL));
+                securityProtocol == null || securityProtocol.equals(PLAINTEXT) || securityProtocol.equals(SSL),
+                format("Only %s and %s security protocols are supported. See 'kafka.config.resources' if other security protocols are needed", PLAINTEXT, SSL));
     }
 }
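The relaxed ``validate()`` still limits ``kafka.security-protocol`` to ``PLAINTEXT`` and ``SSL``; per the new error message, any other mechanism is expected to arrive through ``kafka.config.resources``. An illustrative catalog fragment (hypothetical path, not from this commit):

    # etc/catalog/kafka.properties
    # kafka.security-protocol accepts only PLAINTEXT or SSL. For anything else,
    # leave it unset and point to an external Kafka client config instead:
    kafka.config.resources=/etc/kafka-configuration.properties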
KafkaSecurityModule.java (file name inferred from the method shown; the rendered diff omits the header)

@@ -36,7 +36,7 @@ private void bindSecurityModule(SecurityProtocol securityProtocol, Module module)
     {
         install(conditionalModule(
                 KafkaSecurityConfig.class,
-                config -> config.getSecurityProtocol().equals(securityProtocol),
+                config -> config.getSecurityProtocol().isPresent() && config.getSecurityProtocol().get().equals(securityProtocol),
                 module));
     }
 }
KafkaSslConfig.java

@@ -29,6 +29,7 @@

 import static io.trino.plugin.kafka.security.KafkaEndpointIdentificationAlgorithm.HTTPS;
 import static io.trino.plugin.kafka.security.KafkaKeystoreTruststoreType.JKS;
+import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
 import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG;
 import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
 import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
@@ -37,6 +38,7 @@
 import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
 import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
 import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG;
+import static org.apache.kafka.common.security.auth.SecurityProtocol.SSL;
 
 /**
  * {@KafkaSslConfig} manages Kafka SSL authentication and encryption between clients and brokers.
@@ -170,6 +172,7 @@ public Map<String, Object> getKafkaClientProperties()
         getTruststoreType().ifPresent(v -> properties.put(SSL_TRUSTSTORE_TYPE_CONFIG, v.name()));
         getKeyPassword().ifPresent(v -> properties.put(SSL_KEY_PASSWORD_CONFIG, v));
         getEndpointIdentificationAlgorithm().ifPresent(v -> properties.put(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, v.getValue()));
+        properties.put(SECURITY_PROTOCOL_CONFIG, SSL.name());
 
         return properties.buildOrThrow();
     }
[diff for the remaining 16 changed files not shown]
