From d759877b7c19886a4d1fa413ca5a18072ddf0955 Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Sat, 27 Apr 2024 22:07:11 +0200 Subject: [PATCH] feat(schema-connect): add SSL support for KafkaConnect --- .../jikkou/http/client/SslConfigSupport.java | 32 +++++++++---- .../http/client/SslConfigSupportTest.java | 27 +++++++++++ .../jikkou/kafka/connect/api/AuthMethod.java | 14 +----- .../connect/api/KafkaConnectApiFactory.java | 4 ++ .../connect/api/KafkaConnectClientConfig.java | 45 ++++++++++++------- up | 2 + 6 files changed, 87 insertions(+), 37 deletions(-) create mode 100644 extension-rest-client/src/test/java/io/streamthoughts/jikkou/http/client/SslConfigSupportTest.java diff --git a/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/SslConfigSupport.java b/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/SslConfigSupport.java index a2b60f1a9..ea4c17001 100644 --- a/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/SslConfigSupport.java +++ b/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/SslConfigSupport.java @@ -9,56 +9,72 @@ import io.streamthoughts.jikkou.core.config.ConfigProperty; import io.streamthoughts.jikkou.core.config.Configuration; import io.streamthoughts.jikkou.http.client.ssl.SSLConfig; +import java.util.Optional; +import org.jetbrains.annotations.NotNull; public interface SslConfigSupport { + String SSL_KEY_STORE_LOCATION = "sslKeyStoreLocation"; + String SSL_KEY_STORE_TYPE = "sslKeyStoreType"; + String SSL_KEY_STORE_PASSWORD = "sslKeyStorePassword"; + String SSL_KEY_PASSWORD = "sslKeyPassword"; + String SSL_TRUST_STORE_LOCATION = "sslTrustStoreLocation"; + String SSL_TRUST_STORE_PASSWORD = "sslTrustStorePassword"; + String SSL_TRUST_STORE_TYPE = "sslTrustStoreType"; + String SSL_IGNORE_HOSTNAME_VERIFICATION = "sslIgnoreHostnameVerification"; + static ConfigProperty sslKeyStoreLocation(final String configNamespace) { return ConfigProperty - .ofString(configNamespace + ".sslKeyStoreLocation") + .ofString(prefixWithNamespace(configNamespace, SSL_KEY_STORE_LOCATION)) .description("The location of the key store file."); } + private static @NotNull String prefixWithNamespace(final String configNamespace, + final String sslKeyStoreLocation) { + return Optional.ofNullable(configNamespace).map(s -> s + ".").orElse("") + sslKeyStoreLocation; + } + static ConfigProperty sslKeyStoreType(final String configNamespace) { return ConfigProperty - .ofString(configNamespace + ".sslKeyStoreType") + .ofString(prefixWithNamespace(configNamespace, SSL_KEY_STORE_TYPE)) .description("The file format of the key store file.") .orElse("PKCS12"); } static ConfigProperty sslKeyStorePassword(final String configNamespace) { return ConfigProperty - .ofString(configNamespace + ".sslKeyStorePassword") + .ofString(prefixWithNamespace(configNamespace, SSL_KEY_STORE_PASSWORD)) .description("The password for the key store file."); } static ConfigProperty sslKeyPassword(final String configNamespace) { return ConfigProperty - .ofString(configNamespace + ".sslKeyPassword") + .ofString(prefixWithNamespace(configNamespace, SSL_KEY_PASSWORD)) .description("The password of the private key in the key store file."); } static ConfigProperty sslTrustStoreLocation(final String configNamespace) { return ConfigProperty - .ofString(configNamespace + ".sslTrustStoreLocation") + .ofString(prefixWithNamespace(configNamespace, SSL_TRUST_STORE_LOCATION)) .description("The location of the trust store file."); } static ConfigProperty sslTrustStoreType(final String configNamespace) { return ConfigProperty - .ofString(configNamespace + ".sslTrustStoreType") + .ofString(prefixWithNamespace(configNamespace, SSL_TRUST_STORE_TYPE)) .description("The file format of the trust store file.") .orElse("PKCS12"); } static ConfigProperty sslTrustStorePassword(final String configNamespace) { return ConfigProperty - .ofString(configNamespace + ".sslTrustStorePassword") + .ofString(prefixWithNamespace(configNamespace, SSL_TRUST_STORE_PASSWORD)) .description("The password for the trust store file."); } static ConfigProperty sslIgnoreHostnameVerification(final String configNamespace) { return ConfigProperty - .ofBoolean(configNamespace + ".sslIgnoreHostnameVerification") + .ofBoolean(prefixWithNamespace(configNamespace, SSL_IGNORE_HOSTNAME_VERIFICATION)) .description("Specifies whether to ignore the hostname verification.") .orElse(false); } diff --git a/extension-rest-client/src/test/java/io/streamthoughts/jikkou/http/client/SslConfigSupportTest.java b/extension-rest-client/src/test/java/io/streamthoughts/jikkou/http/client/SslConfigSupportTest.java new file mode 100644 index 000000000..94347adc9 --- /dev/null +++ b/extension-rest-client/src/test/java/io/streamthoughts/jikkou/http/client/SslConfigSupportTest.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright (c) The original authors + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.streamthoughts.jikkou.http.client; + +import io.streamthoughts.jikkou.core.config.Configuration; +import org.junit.jupiter.api.Test; + +class SslConfigSupportTest { + + @Test + void shouldCreateSslConfigWithoutNamespace() { + SslConfigSupport.getSslConfig(null, Configuration + .builder() + .with(SslConfigSupport.SSL_KEY_STORE_PASSWORD, "password") + .with(SslConfigSupport.SSL_KEY_STORE_LOCATION, "/tmp/keystore.jks") + .with(SslConfigSupport.SSL_KEY_STORE_TYPE, "jks") + .with(SslConfigSupport.SSL_TRUST_STORE_LOCATION, "/tmp/truststore.jks") + .with(SslConfigSupport.SSL_TRUST_STORE_PASSWORD, "password") + .with(SslConfigSupport.SSL_TRUST_STORE_TYPE, "jks") + .build() + ); + } +} \ No newline at end of file diff --git a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/api/AuthMethod.java b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/api/AuthMethod.java index a09cb38e1..deb39f5fa 100644 --- a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/api/AuthMethod.java +++ b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/api/AuthMethod.java @@ -6,19 +6,9 @@ */ package io.streamthoughts.jikkou.kafka.connect.api; -import java.util.Arrays; -import java.util.Locale; -import org.jetbrains.annotations.NotNull; - public enum AuthMethod { INVALID, BASICAUTH, - NONE; - - public static AuthMethod getForNameIgnoreCase(final @NotNull String str) { - return Arrays.stream(AuthMethod.values()) - .filter(e -> e.name().equals(str.toUpperCase(Locale.ROOT))) - .findFirst() - .orElse(AuthMethod.INVALID); - } + NONE, + SSL } diff --git a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/api/KafkaConnectApiFactory.java b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/api/KafkaConnectApiFactory.java index fb884edb0..60ad62776 100644 --- a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/api/KafkaConnectApiFactory.java +++ b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/api/KafkaConnectApiFactory.java @@ -60,6 +60,10 @@ public static KafkaConnectApi create(@NotNull KafkaConnectClientConfig config, builder.header("Authorization", buildAuthorizationHeader); yield builder; } + case SSL -> { + builder.sslConfig(config.getSslConfig()); + yield builder; + } case NONE -> builder; case INVALID -> throw new IllegalStateException("Unexpected value: " + config.getAuthMethod()); diff --git a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/api/KafkaConnectClientConfig.java b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/api/KafkaConnectClientConfig.java index 37a859210..30c57e20f 100644 --- a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/api/KafkaConnectClientConfig.java +++ b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/api/KafkaConnectClientConfig.java @@ -6,37 +6,40 @@ */ package io.streamthoughts.jikkou.kafka.connect.api; +import io.streamthoughts.jikkou.common.utils.Enums; import io.streamthoughts.jikkou.core.config.ConfigProperty; import io.streamthoughts.jikkou.core.config.Configuration; +import io.streamthoughts.jikkou.http.client.SslConfigSupport; +import io.streamthoughts.jikkou.http.client.ssl.SSLConfig; import java.util.Objects; public class KafkaConnectClientConfig { public static final ConfigProperty KAFKA_CONNECT_NAME = ConfigProperty - .ofString("name") - .description("Name of the kafka connect cluster."); + .ofString("name") + .description("Name of the kafka connect cluster."); public static final ConfigProperty KAFKA_CONNECT_URL = ConfigProperty - .ofString("url") - .description("URL to establish connection to kafka connect cluster."); + .ofString("url") + .description("URL to establish connection to kafka connect cluster."); public static final ConfigProperty KAFKA_CONNECT_AUTH_METHOD = ConfigProperty - .ofString("authMethod") - .orElse(AuthMethod.NONE.name()) - .description("Method to use for authenticating on Kafka Connect cluster. Available values are: [none, basicauth]"); + .ofString("authMethod") + .orElse(AuthMethod.NONE.name()) + .description("Method to use for authenticating on Kafka Connect cluster. Available values are: [none, basicauth, ssl]"); public static final ConfigProperty KAFKA_CONNECT_BASIC_AUTH_USERNAME = ConfigProperty - .ofString("basicAuthUser") - .description("Use when 'kafkaConnect.authMethod' is 'basicauth' to specify the username for Authorization Basic header"); + .ofString("basicAuthUser") + .description("Use when 'kafkaConnect.authMethod' is 'basicauth' to specify the username for Authorization Basic header"); public static final ConfigProperty KAFKA_CONNECT_BASIC_AUTH_PASSWORD = ConfigProperty - .ofString("basicAuthPassword") - .description("Use when 'kafkaConnect.authMethod' is 'basicauth' to specify the password for Authorization Basic header"); + .ofString("basicAuthPassword") + .description("Use when 'kafkaConnect.authMethod' is 'basicauth' to specify the password for Authorization Basic header"); public static final ConfigProperty KAFKA_CONNECT_DEBUG_LOGGING_ENABLED = ConfigProperty - .ofBoolean("debugLoggingEnabled") - .description("Enable debug logging.") - .orElse(false); + .ofBoolean("debugLoggingEnabled") + .description("Enable debug logging.") + .orElse(false); private final Configuration configuration; @@ -58,7 +61,7 @@ public String getConnectUrl() { } public AuthMethod getAuthMethod() { - return AuthMethod.getForNameIgnoreCase(KAFKA_CONNECT_AUTH_METHOD.get(configuration)); + return Enums.getForNameIgnoreCase(KAFKA_CONNECT_AUTH_METHOD.get(configuration), AuthMethod.class, AuthMethod.INVALID); } public String getBasicAuthUsername() { @@ -77,7 +80,13 @@ public boolean getDebugLoggingEnabled() { return KAFKA_CONNECT_DEBUG_LOGGING_ENABLED.get(configuration); } - /** {@inheritDoc} **/ + public SSLConfig getSslConfig() { + return SslConfigSupport.getSslConfig(null, configuration); + } + + /** + * {@inheritDoc} + **/ @Override public boolean equals(Object o) { if (this == o) return true; @@ -86,7 +95,9 @@ public boolean equals(Object o) { return Objects.equals(configuration, that.configuration); } - /** {@inheritDoc} **/ + /** + * {@inheritDoc} + **/ @Override public int hashCode() { return Objects.hash(configuration); diff --git a/up b/up index e8d0f4915..80a81989f 100755 --- a/up +++ b/up @@ -5,7 +5,9 @@ set -e BASEDIR=$(dirname "$(readlink -f $0)") DOCKERDIR="$BASEDIR" +if [ -z "$DOCKER_STACK" ]; then DOCKER_STACK="$DOCKERDIR/docker-compose.yml" +fi line() { echo -e "\n----------------------------------------------------------------------------------------------------------------------------------------\n"