diff --git a/docs/src/main/sphinx/connector/kafka.rst b/docs/src/main/sphinx/connector/kafka.rst index 82a313c94b96..f6106953c63f 100644 --- a/docs/src/main/sphinx/connector/kafka.rst +++ b/docs/src/main/sphinx/connector/kafka.rst @@ -39,13 +39,19 @@ Configuration To configure the Kafka connector, create a catalog properties file ``etc/catalog/kafka.properties`` with the following contents, -replacing the properties as appropriate: +replacing the properties as appropriate. + +In some cases, such as when using specialized authentication methods, it is necessary to specify +additional Kafka client properties in order to access your Kafka cluster. To do so, +add the ``kafka.config.resources`` property to reference your Kafka config files. Note that configs +can be overwritten if defined explicitly in ``kafka.properties``: .. code-block:: text connector.name=kafka kafka.table-names=table1,table2 kafka.nodes=host1:port,host2:port + kafka.config.resources=/etc/kafka-configuration.properties Multiple Kafka clusters ^^^^^^^^^^^^^^^^^^^^^^^ @@ -61,9 +67,9 @@ Configuration properties The following configuration properties are available: -========================================================== ============================================================================== +========================================================== ====================================================================================================== Property Name Description -========================================================== ============================================================================== +========================================================== ====================================================================================================== ``kafka.default-schema`` Default schema name for tables ``kafka.nodes`` List of nodes in the Kafka cluster ``kafka.buffer-size`` Kafka read buffer size @@ -79,7 +85,10 @@ Property Name Description ``kafka.ssl.truststore.type`` File format of the truststore file, defaults to ``JKS`` ``kafka.ssl.key.password`` Password for the private key in the keystore file ``kafka.ssl.endpoint-identification-algorithm`` Endpoint identification algorithm used by clients to validate server host name, defaults to ``https`` -========================================================== ============================================================================== +``kafka.config.resources`` A comma-separated list of Kafka client configuration files. These files must exist on the + machines running Trino. Only specify this if absolutely necessary to access Kafka. + Example: ``/etc/kafka-configuration.properties`` +========================================================== ====================================================================================================== In addition, you need to configure :ref:`table schema and schema registry usage ` with the relevant properties. diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/PlainTextKafkaAdminFactory.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/DefaultKafkaAdminFactory.java similarity index 71% rename from plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/PlainTextKafkaAdminFactory.java rename to plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/DefaultKafkaAdminFactory.java index 0b710f31929e..76af81e3bba0 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/PlainTextKafkaAdminFactory.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/DefaultKafkaAdminFactory.java @@ -16,44 +16,41 @@ import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSession; -import org.apache.kafka.common.security.auth.SecurityProtocol; import javax.inject.Inject; +import java.util.Map; import java.util.Properties; import java.util.Set; +import static io.trino.plugin.kafka.utils.PropertiesUtils.readProperties; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.joining; -import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; -public class PlainTextKafkaAdminFactory +public class DefaultKafkaAdminFactory implements KafkaAdminFactory { private final Set nodes; - private final SecurityProtocol securityProtocol; + private final Map configurationProperties; @Inject - public PlainTextKafkaAdminFactory( - KafkaConfig kafkaConfig, - KafkaSecurityConfig securityConfig) + public DefaultKafkaAdminFactory(KafkaConfig kafkaConfig) + throws Exception { requireNonNull(kafkaConfig, "kafkaConfig is null"); - requireNonNull(securityConfig, "securityConfig is null"); - nodes = kafkaConfig.getNodes(); - securityProtocol = securityConfig.getSecurityProtocol(); + configurationProperties = readProperties(kafkaConfig.getResourceConfigFiles()); } @Override public Properties configure(ConnectorSession session) { Properties properties = new Properties(); + properties.putAll(configurationProperties); properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, nodes.stream() .map(HostAddress::toString) .collect(joining(","))); - properties.setProperty(SECURITY_PROTOCOL_CONFIG, securityProtocol.name()); return properties; } } diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/PlainTextKafkaConsumerFactory.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/DefaultKafkaConsumerFactory.java similarity index 80% rename from plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/PlainTextKafkaConsumerFactory.java rename to plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/DefaultKafkaConsumerFactory.java index 10580d4919d5..946104bf5bc2 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/PlainTextKafkaConsumerFactory.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/DefaultKafkaConsumerFactory.java @@ -16,55 +16,52 @@ import io.airlift.units.DataSize; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSession; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import javax.inject.Inject; +import java.util.Map; import java.util.Properties; import java.util.Set; +import static io.trino.plugin.kafka.utils.PropertiesUtils.readProperties; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.joining; -import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.RECEIVE_BUFFER_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; -public class PlainTextKafkaConsumerFactory +public class DefaultKafkaConsumerFactory implements KafkaConsumerFactory { private final Set nodes; private final DataSize kafkaBufferSize; - private final SecurityProtocol securityProtocol; + private final Map configurationProperties; @Inject - public PlainTextKafkaConsumerFactory( - KafkaConfig kafkaConfig, - KafkaSecurityConfig securityConfig) + public DefaultKafkaConsumerFactory(KafkaConfig kafkaConfig) + throws Exception { requireNonNull(kafkaConfig, "kafkaConfig is null"); - requireNonNull(securityConfig, "securityConfig is null"); - nodes = kafkaConfig.getNodes(); kafkaBufferSize = kafkaConfig.getKafkaBufferSize(); - securityProtocol = securityConfig.getSecurityProtocol(); + configurationProperties = readProperties(kafkaConfig.getResourceConfigFiles()); } @Override public Properties configure(ConnectorSession session) { Properties properties = new Properties(); + properties.putAll(configurationProperties); properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, nodes.stream() .map(HostAddress::toString) .collect(joining(","))); properties.setProperty(KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); properties.setProperty(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - properties.setProperty(RECEIVE_BUFFER_CONFIG, Long.toString(kafkaBufferSize.toBytes())); properties.setProperty(ENABLE_AUTO_COMMIT_CONFIG, Boolean.toString(false)); - properties.setProperty(SECURITY_PROTOCOL_CONFIG, securityProtocol.name()); + properties.setProperty(RECEIVE_BUFFER_CONFIG, Long.toString(kafkaBufferSize.toBytes())); return properties; } } diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/PlainTextKafkaProducerFactory.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/DefaultKafkaProducerFactory.java similarity index 79% rename from plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/PlainTextKafkaProducerFactory.java rename to plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/DefaultKafkaProducerFactory.java index 5fb358bbc5f0..9750a17e685d 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/PlainTextKafkaProducerFactory.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/DefaultKafkaProducerFactory.java @@ -15,43 +15,43 @@ import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSession; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.ByteArraySerializer; import javax.inject.Inject; +import java.util.Map; import java.util.Properties; import java.util.Set; +import static io.trino.plugin.kafka.utils.PropertiesUtils.readProperties; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.joining; -import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -public class PlainTextKafkaProducerFactory +public class DefaultKafkaProducerFactory implements KafkaProducerFactory { private final Set nodes; - private final SecurityProtocol securityProtocol; + private final Map configurationProperties; @Inject - public PlainTextKafkaProducerFactory(KafkaConfig kafkaConfig, KafkaSecurityConfig securityConfig) + public DefaultKafkaProducerFactory(KafkaConfig kafkaConfig) + throws Exception { requireNonNull(kafkaConfig, "kafkaConfig is null"); - requireNonNull(securityConfig, "securityConfig is null"); - nodes = kafkaConfig.getNodes(); - securityProtocol = securityConfig.getSecurityProtocol(); + configurationProperties = readProperties(kafkaConfig.getResourceConfigFiles()); } @Override public Properties configure(ConnectorSession session) { Properties properties = new Properties(); + properties.putAll(configurationProperties); properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, nodes.stream() .map(HostAddress::toString) .collect(joining(","))); @@ -59,7 +59,6 @@ public Properties configure(ConnectorSession session) properties.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); properties.setProperty(ACKS_CONFIG, "all"); properties.setProperty(LINGER_MS_CONFIG, Long.toString(5)); - properties.setProperty(SECURITY_PROTOCOL_CONFIG, securityProtocol.name()); return properties; } } diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaClientsModule.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaClientsModule.java index de2069b3bd3e..da108a10c23f 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaClientsModule.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaClientsModule.java @@ -30,7 +30,8 @@ public class KafkaClientsModule protected void setup(Binder binder) { configBinder(binder).bindConfig(KafkaSecurityConfig.class); - installClientModule(SecurityProtocol.PLAINTEXT, KafkaClientsModule::configurePlainText); + installClientModule(null, KafkaClientsModule::configureDefault); + installClientModule(SecurityProtocol.PLAINTEXT, KafkaClientsModule::configureDefault); installClientModule(SecurityProtocol.SSL, KafkaClientsModule::configureSsl); } @@ -38,22 +39,22 @@ private void installClientModule(SecurityProtocol securityProtocol, Module modul { install(conditionalModule( KafkaSecurityConfig.class, - config -> config.getSecurityProtocol().equals(securityProtocol), + config -> config.getSecurityProtocol().orElse(null) == securityProtocol, module)); } - private static void configurePlainText(Binder binder) + private static void configureDefault(Binder binder) { - binder.bind(KafkaConsumerFactory.class).to(PlainTextKafkaConsumerFactory.class).in(Scopes.SINGLETON); - binder.bind(KafkaProducerFactory.class).to(PlainTextKafkaProducerFactory.class).in(Scopes.SINGLETON); - binder.bind(KafkaAdminFactory.class).to(PlainTextKafkaAdminFactory.class).in(Scopes.SINGLETON); + binder.bind(KafkaConsumerFactory.class).to(DefaultKafkaConsumerFactory.class).in(Scopes.SINGLETON); + binder.bind(KafkaProducerFactory.class).to(DefaultKafkaProducerFactory.class).in(Scopes.SINGLETON); + binder.bind(KafkaAdminFactory.class).to(DefaultKafkaAdminFactory.class).in(Scopes.SINGLETON); } private static void configureSsl(Binder binder) { - binder.bind(KafkaConsumerFactory.class).annotatedWith(ForKafkaSsl.class).to(PlainTextKafkaConsumerFactory.class).in(Scopes.SINGLETON); - binder.bind(KafkaProducerFactory.class).annotatedWith(ForKafkaSsl.class).to(PlainTextKafkaProducerFactory.class).in(Scopes.SINGLETON); - binder.bind(KafkaAdminFactory.class).annotatedWith(ForKafkaSsl.class).to(PlainTextKafkaAdminFactory.class).in(Scopes.SINGLETON); + binder.bind(KafkaConsumerFactory.class).annotatedWith(ForKafkaSsl.class).to(DefaultKafkaConsumerFactory.class).in(Scopes.SINGLETON); + binder.bind(KafkaProducerFactory.class).annotatedWith(ForKafkaSsl.class).to(DefaultKafkaProducerFactory.class).in(Scopes.SINGLETON); + binder.bind(KafkaAdminFactory.class).annotatedWith(ForKafkaSsl.class).to(DefaultKafkaAdminFactory.class).in(Scopes.SINGLETON); binder.bind(KafkaConsumerFactory.class).to(SslKafkaConsumerFactory.class).in(Scopes.SINGLETON); binder.bind(KafkaProducerFactory.class).to(SslKafkaProducerFactory.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConfig.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConfig.java index 78e24db15428..9472067cc67f 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConfig.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConfig.java @@ -14,10 +14,12 @@ package io.trino.plugin.kafka; import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; import io.airlift.configuration.DefunctConfig; +import io.airlift.configuration.validation.FileExists; import io.airlift.units.DataSize; import io.airlift.units.DataSize.Unit; import io.trino.plugin.kafka.schema.file.FileTableDescriptionSupplier; @@ -27,9 +29,12 @@ import javax.validation.constraints.NotNull; import javax.validation.constraints.Size; +import java.io.File; +import java.util.List; import java.util.Set; import java.util.stream.StreamSupport; +import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; @DefunctConfig("kafka.connect-timeout") @@ -44,6 +49,7 @@ public class KafkaConfig private int messagesPerSplit = 100_000; private boolean timestampUpperBoundPushDownEnabled; private String tableDescriptionSupplier = FileTableDescriptionSupplier.NAME; + private List resourceConfigFiles = ImmutableList.of(); @Size(min = 1) public Set getNodes() @@ -152,4 +158,20 @@ public KafkaConfig setTimestampUpperBoundPushDownEnabled(boolean timestampUpperB this.timestampUpperBoundPushDownEnabled = timestampUpperBoundPushDownEnabled; return this; } + + @NotNull + public List<@FileExists File> getResourceConfigFiles() + { + return resourceConfigFiles; + } + + @Config("kafka.config.resources") + @ConfigDescription("Optional config files") + public KafkaConfig setResourceConfigFiles(List files) + { + this.resourceConfigFiles = files.stream() + .map(File::new) + .collect(toImmutableList()); + return this; + } } diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSecurityConfig.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSecurityConfig.java index e806cc97002b..f739f0f5f43c 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSecurityConfig.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSecurityConfig.java @@ -18,7 +18,8 @@ import org.apache.kafka.common.security.auth.SecurityProtocol; import javax.annotation.PostConstruct; -import javax.validation.constraints.NotNull; + +import java.util.Optional; import static com.google.common.base.Preconditions.checkState; import static java.lang.String.format; @@ -27,12 +28,11 @@ public class KafkaSecurityConfig { - private SecurityProtocol securityProtocol = PLAINTEXT; + private SecurityProtocol securityProtocol; - @NotNull - public SecurityProtocol getSecurityProtocol() + public Optional getSecurityProtocol() { - return securityProtocol; + return Optional.ofNullable(securityProtocol); } @Config("kafka.security-protocol") @@ -47,7 +47,7 @@ public KafkaSecurityConfig setSecurityProtocol(SecurityProtocol securityProtocol public void validate() { checkState( - securityProtocol.equals(PLAINTEXT) || securityProtocol.equals(SSL), - format("Only %s and %s security protocols are supported", PLAINTEXT, SSL)); + securityProtocol == null || securityProtocol.equals(PLAINTEXT) || securityProtocol.equals(SSL), + format("Only %s and %s security protocols are supported. See 'kafka.config.resources' if other security protocols are needed", PLAINTEXT, SSL)); } } diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/security/KafkaSecurityModule.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/security/KafkaSecurityModule.java index 4ea6a49318f6..0a708c2c5521 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/security/KafkaSecurityModule.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/security/KafkaSecurityModule.java @@ -36,7 +36,7 @@ private void bindSecurityModule(SecurityProtocol securityProtocol, Module module { install(conditionalModule( KafkaSecurityConfig.class, - config -> config.getSecurityProtocol().equals(securityProtocol), + config -> config.getSecurityProtocol().isPresent() && config.getSecurityProtocol().get().equals(securityProtocol), module)); } } diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/security/KafkaSslConfig.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/security/KafkaSslConfig.java index c196582a6189..e9dc56554980 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/security/KafkaSslConfig.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/security/KafkaSslConfig.java @@ -29,6 +29,7 @@ import static io.trino.plugin.kafka.security.KafkaEndpointIdentificationAlgorithm.HTTPS; import static io.trino.plugin.kafka.security.KafkaKeystoreTruststoreType.JKS; +import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG; import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG; import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG; @@ -37,6 +38,7 @@ import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG; import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG; import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG; +import static org.apache.kafka.common.security.auth.SecurityProtocol.SSL; /** * {@KafkaSslConfig} manages Kafka SSL authentication and encryption between clients and brokers. @@ -170,6 +172,7 @@ public Map getKafkaClientProperties() getTruststoreType().ifPresent(v -> properties.put(SSL_TRUSTSTORE_TYPE_CONFIG, v.name())); getKeyPassword().ifPresent(v -> properties.put(SSL_KEY_PASSWORD_CONFIG, v)); getEndpointIdentificationAlgorithm().ifPresent(v -> properties.put(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, v.getValue())); + properties.put(SECURITY_PROTOCOL_CONFIG, SSL.name()); return properties.buildOrThrow(); } diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/utils/PropertiesUtils.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/utils/PropertiesUtils.java new file mode 100644 index 000000000000..6704cb2f5aeb --- /dev/null +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/utils/PropertiesUtils.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.kafka.utils; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; + +public final class PropertiesUtils +{ + private PropertiesUtils() {} + + public static Map readProperties(List resourcePaths) + throws IOException + { + Properties connectionProperties = new Properties(); + for (File resourcePath : resourcePaths) { + checkArgument(resourcePath.exists(), "File does not exist: %s", resourcePath); + + try (InputStream in = new FileInputStream(resourcePath)) { + connectionProperties.load(in); + } + } + return ImmutableMap.copyOf(Maps.fromProperties(connectionProperties)); + } +} diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConfig.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConfig.java index 5f1a61806c99..5b04622c5db0 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConfig.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConfig.java @@ -13,10 +13,15 @@ */ package io.trino.plugin.kafka; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.trino.plugin.kafka.schema.file.FileTableDescriptionSupplier; import org.testng.annotations.Test; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; import java.util.Map; import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; @@ -35,12 +40,17 @@ public void testDefaults() .setTableDescriptionSupplier(FileTableDescriptionSupplier.NAME) .setHideInternalColumns(true) .setMessagesPerSplit(100_000) - .setTimestampUpperBoundPushDownEnabled(false)); + .setTimestampUpperBoundPushDownEnabled(false) + .setResourceConfigFiles(List.of())); } @Test public void testExplicitPropertyMappings() + throws IOException { + Path resource1 = Files.createTempFile(null, null); + Path resource2 = Files.createTempFile(null, null); + Map properties = ImmutableMap.builder() .put("kafka.default-schema", "kafka") .put("kafka.table-description-supplier", "test") @@ -49,6 +59,7 @@ public void testExplicitPropertyMappings() .put("kafka.hide-internal-columns", "false") .put("kafka.messages-per-split", "1") .put("kafka.timestamp-upper-bound-force-push-down-enabled", "true") + .put("kafka.config.resources", resource1.toString() + "," + resource2.toString()) .buildOrThrow(); KafkaConfig expected = new KafkaConfig() @@ -58,7 +69,8 @@ public void testExplicitPropertyMappings() .setKafkaBufferSize("1MB") .setHideInternalColumns(false) .setMessagesPerSplit(1) - .setTimestampUpperBoundPushDownEnabled(true); + .setTimestampUpperBoundPushDownEnabled(true) + .setResourceConfigFiles(ImmutableList.of(resource1.toString(), resource2.toString())); assertFullMapping(properties, expected); } diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaPlugin.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaPlugin.java index 2f1f862cf929..7f669ae3bfa9 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaPlugin.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaPlugin.java @@ -27,6 +27,7 @@ import static com.google.common.collect.Iterables.getOnlyElement; import static io.airlift.testing.Assertions.assertInstanceOf; +import static org.apache.kafka.common.security.auth.SecurityProtocol.SSL; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertNotNull; @@ -34,17 +35,20 @@ public class TestKafkaPlugin { @Test public void testSpinup() + throws IOException { KafkaPlugin plugin = new KafkaPlugin(); ConnectorFactory factory = getOnlyElement(plugin.getConnectorFactories()); assertInstanceOf(factory, KafkaConnectorFactory.class); + Path resource = Files.createTempFile("kafka", ".properties"); Connector connector = factory.create( "test-connector", ImmutableMap.builder() .put("kafka.table-names", "test") .put("kafka.nodes", "localhost:9092") + .put("kafka.config.resources", resource.toString()) .buildOrThrow(), new TestingConnectorContext()); assertNotNull(connector); @@ -147,6 +151,51 @@ public void testSslTruststoreMissingFileSpindown() .hasMessageContaining("Error: Invalid configuration property kafka.ssl.truststore.location: file does not exist: /not/a/real/path"); } + @Test + public void testResourceConfigMissingFileSpindown() + { + KafkaPlugin plugin = new KafkaPlugin(); + + ConnectorFactory factory = getOnlyElement(plugin.getConnectorFactories()); + assertInstanceOf(factory, KafkaConnectorFactory.class); + + assertThatThrownBy(() -> factory.create( + "test-connector", + ImmutableMap.builder() + .put("kafka.table-names", "test") + .put("kafka.nodes", "localhost:9092") + .put("kafka.security-protocol", "PLAINTEXT") + .put("kafka.config.resources", "/not/a/real/path/1,/not/a/real/path/2") + .buildOrThrow(), + new TestingConnectorContext())) + .hasMessageContainingAll("Error: Invalid configuration property", ": file does not exist: /not/a/real/path/1", ": file does not exist: /not/a/real/path/2"); + } + + @Test + public void testConfigResourceSpinup() + throws IOException + { + KafkaPlugin plugin = new KafkaPlugin(); + + ConnectorFactory factory = getOnlyElement(plugin.getConnectorFactories()); + assertInstanceOf(factory, KafkaConnectorFactory.class); + + String nativeContent = "security.protocol=" + SSL; + Path nativeKafkaResourcePath = Files.createTempFile("native_kafka", ".properties"); + writeToFile(nativeKafkaResourcePath, nativeContent); + + Connector connector = factory.create( + "test-connector", + ImmutableMap.builder() + .put("kafka.table-names", "test") + .put("kafka.nodes", "localhost:9092") + .put("kafka.config.resources", nativeKafkaResourcePath.toString()) + .buildOrThrow(), + new TestingConnectorContext()); + assertNotNull(connector); + connector.shutdown(); + } + private void writeToFile(Path filepath, String content) throws IOException { diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaSecurityConfig.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaSecurityConfig.java index ad88c8801850..8fe23c6c040e 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaSecurityConfig.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaSecurityConfig.java @@ -35,7 +35,7 @@ public class TestKafkaSecurityConfig public void testDefaults() { assertRecordedDefaults(recordDefaults(KafkaSecurityConfig.class) - .setSecurityProtocol(PLAINTEXT)); + .setSecurityProtocol(null)); } @Test @@ -72,7 +72,7 @@ public void testInvalidSecurityProtocol(SecurityProtocol securityProtocol) .setSecurityProtocol(securityProtocol) .validate()) .isInstanceOf(IllegalStateException.class) - .hasMessage("Only PLAINTEXT and SSL security protocols are supported"); + .hasMessage("Only PLAINTEXT and SSL security protocols are supported. See 'kafka.config.resources' if other security protocols are needed"); } @DataProvider(name = "invalidSecurityProtocols") diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaSslConfig.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaSslConfig.java index cc89fa0858b8..6abb0a18faa8 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaSslConfig.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaSslConfig.java @@ -33,6 +33,7 @@ import static io.trino.plugin.kafka.security.KafkaKeystoreTruststoreType.JKS; import static io.trino.plugin.kafka.security.KafkaKeystoreTruststoreType.PKCS12; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG; import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG; import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG; @@ -41,6 +42,7 @@ import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG; import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG; import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG; +import static org.apache.kafka.common.security.auth.SecurityProtocol.SSL; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -119,6 +121,7 @@ public void testAllConfigPropertiesAreContained() SSL_TRUSTSTORE_PASSWORD_CONFIG, "superSavePasswordForTruststore", SSL_TRUSTSTORE_TYPE_CONFIG, JKS.name(), SSL_KEY_PASSWORD_CONFIG, "aSslKeyPassword", + SECURITY_PROTOCOL_CONFIG, SSL.name(), SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, HTTPS.getValue()))); } diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/utils/TestPropertiesUtils.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/utils/TestPropertiesUtils.java new file mode 100644 index 000000000000..ed25914f0736 --- /dev/null +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/utils/TestPropertiesUtils.java @@ -0,0 +1,72 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.kafka.utils; + +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Map; +import java.util.Properties; + +import static io.trino.plugin.kafka.utils.PropertiesUtils.readProperties; +import static org.testng.Assert.assertEquals; + +public class TestPropertiesUtils +{ + @Test + public void testReadPropertiesOverwritten() + throws IOException + { + Map expected = ImmutableMap.builder() + .put("security.protocol", "OVERWRITTEN") + .put("group.id", "consumer") + .put("client.id", "producer") + .buildOrThrow(); + + Properties firstProperties = new Properties(); + firstProperties.putAll(ImmutableMap.builder() + .put("security.protocol", "SASL_PLAINTEXT") + .put("group.id", "consumer") + .buildOrThrow()); + File firstFile = this.writePropertiesToFile(firstProperties); + + Properties secondProperties = new Properties(); + secondProperties.putAll(ImmutableMap.builder() + .put("security.protocol", "OVERWRITTEN") + .put("client.id", "producer") + .buildOrThrow()); + File secondFile = this.writePropertiesToFile(secondProperties); + + Map result = readProperties(Arrays.asList(firstFile, secondFile)); + + assertEquals(result, expected); + } + + private File writePropertiesToFile(Properties properties) + throws IOException + { + Path path = Files.createTempFile(null, null); + try (OutputStream outputStream = new FileOutputStream(path.toString())) { + properties.store(outputStream, null); + } + return path.toFile(); + } +} diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/EnvironmentModule.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/EnvironmentModule.java index 989911c8740c..cab277309f2d 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/EnvironmentModule.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/EnvironmentModule.java @@ -23,6 +23,7 @@ import io.trino.tests.product.launcher.env.common.HadoopKerberosKms; import io.trino.tests.product.launcher.env.common.HydraIdentityProvider; import io.trino.tests.product.launcher.env.common.Kafka; +import io.trino.tests.product.launcher.env.common.KafkaSaslPlaintext; import io.trino.tests.product.launcher.env.common.KafkaSsl; import io.trino.tests.product.launcher.env.common.Kerberos; import io.trino.tests.product.launcher.env.common.Phoenix; @@ -69,6 +70,7 @@ public void configure(Binder binder) binder.bind(HydraIdentityProvider.class).in(SINGLETON); binder.bind(Kafka.class).in(SINGLETON); binder.bind(KafkaSsl.class).in(SINGLETON); + binder.bind(KafkaSaslPlaintext.class).in(SINGLETON); binder.bind(Standard.class).in(SINGLETON); binder.bind(StandardMultinode.class).in(SINGLETON); binder.bind(Phoenix.class).in(SINGLETON); diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/common/KafkaSaslPlaintext.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/common/KafkaSaslPlaintext.java new file mode 100644 index 000000000000..b667f3032bcb --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/common/KafkaSaslPlaintext.java @@ -0,0 +1,75 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.trino.tests.product.launcher.env.common; + +import com.google.common.collect.ImmutableList; +import io.trino.tests.product.launcher.env.Environment; +import org.testcontainers.containers.BindMode; + +import javax.inject.Inject; + +import java.time.Duration; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +public class KafkaSaslPlaintext + implements EnvironmentExtender +{ + private final Kafka kafka; + + @Inject + public KafkaSaslPlaintext(Kafka kafka) + { + this.kafka = requireNonNull(kafka, "kafka is null"); + } + + @Override + public void extendEnvironment(Environment.Builder builder) + { + builder.configureContainer(Kafka.KAFKA, container -> container + .withStartupAttempts(3) + .withStartupTimeout(Duration.ofMinutes(5)) + .withEnv("KAFKA_LISTENERS", "SASL_PLAINTEXT://kafka:9092") + .withEnv("KAFKA_ADVERTISED_LISTENERS", "SASL_PLAINTEXT://kafka:9092") + .withEnv("KAFKA_SECURITY_INTER_BROKER_PROTOCOL", "SASL_PLAINTEXT") + .withEnv("KAFKA_SECURITY_PROTOCOL", "SASL_PLAINTEXT") + .withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN") + .withEnv("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN") + .withEnv("KAFKA_OPTS", "-Djava.security.auth.login.config=/tmp/kafka_server_jaas.conf") + .withEnv("KAFKA_LISTENER_NAME_SASL_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", "org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username=\"admin\" " + + "password=\"admin-secret\" " + + "user_admin=\"admin-secret\";") + .withEnv("ZOOKEEPER_SASL_ENABLED", "false") + .withClasspathResourceMapping("docker/presto-product-tests/conf/environment/multinode-kafka-sasl-plaintext/kafka_server_jaas.conf", "/tmp/kafka_server_jaas.conf", BindMode.READ_ONLY)); + builder.configureContainer(Kafka.SCHEMA_REGISTRY, container -> container + .withStartupAttempts(3) + .withStartupTimeout(Duration.ofMinutes(5)) + .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "SASL_PLAINTEXT://kafka:9092") + .withEnv("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "SASL_PLAINTEXT") + .withEnv("SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM", "PLAIN") + .withEnv("SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG", "org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username=\"admin\" " + + "password=\"admin-secret\";") + .withClasspathResourceMapping("docker/presto-product-tests/conf/environment/multinode-kafka-sasl-plaintext/kafka_server_jaas.conf", "/tmp/kafka_server_jaas.conf", BindMode.READ_ONLY)); + } + + @Override + public List getDependencies() + { + return ImmutableList.of(kafka); + } +} diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/common/KafkaSsl.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/common/KafkaSsl.java index caf149198cba..86251cfd77f6 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/common/KafkaSsl.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/common/KafkaSsl.java @@ -53,6 +53,7 @@ public void extendEnvironment(Environment.Builder builder) .withEnv("KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM", "https") .withEnv("KAFKA_SSL_CLIENT_AUTH", "required") .withEnv("KAFKA_SECURITY_INTER_BROKER_PROTOCOL", "SSL") + .withEnv("KAFKA_SECURITY_PROTOCOL", "SSL") .withClasspathResourceMapping("docker/presto-product-tests/conf/environment/multinode-kafka-ssl/secrets", "/etc/kafka/secrets", BindMode.READ_ONLY)); builder.configureContainer(Kafka.SCHEMA_REGISTRY, container -> container .withStartupAttempts(3) diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeKafkaSaslPlaintext.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeKafkaSaslPlaintext.java new file mode 100644 index 000000000000..c13dfce0a90d --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeKafkaSaslPlaintext.java @@ -0,0 +1,76 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.trino.tests.product.launcher.env.environment; + +import io.trino.tests.product.launcher.docker.DockerFiles; +import io.trino.tests.product.launcher.docker.DockerFiles.ResourceProvider; +import io.trino.tests.product.launcher.env.DockerContainer; +import io.trino.tests.product.launcher.env.Environment; +import io.trino.tests.product.launcher.env.EnvironmentProvider; +import io.trino.tests.product.launcher.env.common.KafkaSaslPlaintext; +import io.trino.tests.product.launcher.env.common.StandardMultinode; +import io.trino.tests.product.launcher.env.common.TestsEnvironment; + +import javax.inject.Inject; + +import static io.trino.tests.product.launcher.env.EnvironmentContainers.COORDINATOR; +import static io.trino.tests.product.launcher.env.EnvironmentContainers.TESTS; +import static io.trino.tests.product.launcher.env.EnvironmentContainers.WORKER; +import static io.trino.tests.product.launcher.env.EnvironmentContainers.configureTempto; +import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_PRESTO_ETC; +import static java.util.Objects.requireNonNull; +import static org.testcontainers.utility.MountableFile.forHostPath; + +@TestsEnvironment +public final class EnvMultinodeKafkaSaslPlaintext + extends EnvironmentProvider +{ + private final ResourceProvider configDir; + + @Inject + public EnvMultinodeKafkaSaslPlaintext(KafkaSaslPlaintext kafka, StandardMultinode standardMultinode, DockerFiles dockerFiles) + { + super(standardMultinode, kafka); + requireNonNull(dockerFiles, "dockerFiles is null"); + configDir = dockerFiles.getDockerFilesHostDirectory("conf/environment/multinode-kafka-sasl-plaintext/"); + } + + @Override + public void extendEnvironment(Environment.Builder builder) + { + builder.configureContainer(COORDINATOR, this::addCatalogs); + builder.configureContainer(WORKER, this::addCatalogs); + + configureTempto(builder, configDir); + builder.configureContainer(TESTS, container -> container + .withCopyFileToContainer( + forHostPath(configDir.getPath("kafka-configuration.properties")), + CONTAINER_PRESTO_ETC + "/kafka-configuration.properties")); + } + + private void addCatalogs(DockerContainer container) + { + container + .withCopyFileToContainer( + forHostPath(configDir.getPath("kafka_schema_registry.properties")), + CONTAINER_PRESTO_ETC + "/catalog/kafka_schema_registry.properties") + .withCopyFileToContainer( + forHostPath(configDir.getPath("kafka.properties")), + CONTAINER_PRESTO_ETC + "/catalog/kafka.properties") + .withCopyFileToContainer( + forHostPath(configDir.getPath("kafka-configuration.properties")), + CONTAINER_PRESTO_ETC + "/kafka-configuration.properties"); + } +} diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite6NonGeneric.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite6NonGeneric.java index 49060cc7caf2..c4b84a126b6a 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite6NonGeneric.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite6NonGeneric.java @@ -17,6 +17,7 @@ import io.trino.tests.product.launcher.env.EnvironmentConfig; import io.trino.tests.product.launcher.env.EnvironmentDefaults; import io.trino.tests.product.launcher.env.environment.EnvMultinodeKafka; +import io.trino.tests.product.launcher.env.environment.EnvMultinodeKafkaSaslPlaintext; import io.trino.tests.product.launcher.env.environment.EnvMultinodeKafkaSsl; import io.trino.tests.product.launcher.env.environment.EnvMultinodePhoenix4; import io.trino.tests.product.launcher.env.environment.EnvMultinodePhoenix5; @@ -45,6 +46,7 @@ public List getTestRuns(EnvironmentConfig config) testOnEnvironment(EnvSinglenodeCassandra.class).withGroups("cassandra").build(), testOnEnvironment(EnvMultinodeKafka.class).withGroups("kafka").build(), testOnEnvironment(EnvMultinodeKafkaSsl.class).withGroups("kafka").build(), + testOnEnvironment(EnvMultinodeKafkaSaslPlaintext.class).withGroups("kafka").build(), testOnEnvironment(EnvMultinodePhoenix4.class).withGroups("phoenix").build(), testOnEnvironment(EnvMultinodePhoenix5.class).withGroups("phoenix").build()); } diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-sasl-plaintext/kafka-configuration.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-sasl-plaintext/kafka-configuration.properties new file mode 100644 index 000000000000..7f2cdb9e3532 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-sasl-plaintext/kafka-configuration.properties @@ -0,0 +1,5 @@ +security.protocol=SASL_PLAINTEXT +sasl.mechanism=PLAIN +sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ + username="admin" \ + password="admin-secret"; diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-sasl-plaintext/kafka.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-sasl-plaintext/kafka.properties new file mode 100644 index 000000000000..b791b6839447 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-sasl-plaintext/kafka.properties @@ -0,0 +1,20 @@ +connector.name=kafka +kafka.table-names=product_tests.read_simple_key_and_value,\ + product_tests.read_all_datatypes_raw,\ + product_tests.read_all_datatypes_csv,\ + product_tests.read_all_datatypes_json,\ + product_tests.read_all_datatypes_avro,\ + product_tests.read_all_null_avro,\ + product_tests.read_structural_datatype_avro,\ + product_tests.write_simple_key_and_value,\ + product_tests.write_all_datatypes_raw,\ + product_tests.write_all_datatypes_csv,\ + product_tests.write_all_datatypes_json,\ + product_tests.write_all_datatypes_avro,\ + product_tests.write_structural_datatype_avro,\ + product_tests.pushdown_partition,\ + product_tests.pushdown_offset,\ + product_tests.pushdown_create_time +kafka.nodes=kafka:9092 +kafka.table-description-dir=/docker/presto-product-tests/conf/presto/etc/catalog/kafka +kafka.config.resources=/docker/presto-product-tests/conf/presto/etc/kafka-configuration.properties diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-sasl-plaintext/kafka_schema_registry.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-sasl-plaintext/kafka_schema_registry.properties new file mode 100644 index 000000000000..26377bf0ae7d --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-sasl-plaintext/kafka_schema_registry.properties @@ -0,0 +1,6 @@ +connector.name=kafka +kafka.nodes=kafka:9092 +kafka.table-description-supplier=confluent +kafka.confluent-schema-registry-url=http://schema-registry:8081 +kafka.default-schema=product_tests +kafka.config.resources=/docker/presto-product-tests/conf/presto/etc/kafka-configuration.properties diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-sasl-plaintext/kafka_server_jaas.conf b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-sasl-plaintext/kafka_server_jaas.conf new file mode 100644 index 000000000000..706c3758ec42 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-sasl-plaintext/kafka_server_jaas.conf @@ -0,0 +1,6 @@ +KafkaServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + username="admin" + password="admin-secret" + user_admin="admin-secret"; +}; diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-sasl-plaintext/tempto-configuration.yaml b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-sasl-plaintext/tempto-configuration.yaml new file mode 100644 index 000000000000..f109c071838c --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-sasl-plaintext/tempto-configuration.yaml @@ -0,0 +1,11 @@ +schema-registry: + url: http://schema-registry:8081 + +databases: + kafka: + broker: + host: kafka + port: 9092 + security.protocol: "SASL_PLAINTEXT" + sasl.mechanism: "PLAIN" + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"