Skip to content

Commit

Permalink
feat(schema-connect): add SSL support for KafkaConnect
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Apr 27, 2024
1 parent 1857335 commit d759877
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,56 +9,72 @@
import io.streamthoughts.jikkou.core.config.ConfigProperty;
import io.streamthoughts.jikkou.core.config.Configuration;
import io.streamthoughts.jikkou.http.client.ssl.SSLConfig;
import java.util.Optional;
import org.jetbrains.annotations.NotNull;

public interface SslConfigSupport {

String SSL_KEY_STORE_LOCATION = "sslKeyStoreLocation";
String SSL_KEY_STORE_TYPE = "sslKeyStoreType";
String SSL_KEY_STORE_PASSWORD = "sslKeyStorePassword";
String SSL_KEY_PASSWORD = "sslKeyPassword";
String SSL_TRUST_STORE_LOCATION = "sslTrustStoreLocation";
String SSL_TRUST_STORE_PASSWORD = "sslTrustStorePassword";
String SSL_TRUST_STORE_TYPE = "sslTrustStoreType";
String SSL_IGNORE_HOSTNAME_VERIFICATION = "sslIgnoreHostnameVerification";

static ConfigProperty<String> sslKeyStoreLocation(final String configNamespace) {
return ConfigProperty
.ofString(configNamespace + ".sslKeyStoreLocation")
.ofString(prefixWithNamespace(configNamespace, SSL_KEY_STORE_LOCATION))
.description("The location of the key store file.");
}

private static @NotNull String prefixWithNamespace(final String configNamespace,
final String sslKeyStoreLocation) {
return Optional.ofNullable(configNamespace).map(s -> s + ".").orElse("") + sslKeyStoreLocation;
}

static ConfigProperty<String> sslKeyStoreType(final String configNamespace) {
return ConfigProperty
.ofString(configNamespace + ".sslKeyStoreType")
.ofString(prefixWithNamespace(configNamespace, SSL_KEY_STORE_TYPE))
.description("The file format of the key store file.")
.orElse("PKCS12");
}

static ConfigProperty<String> sslKeyStorePassword(final String configNamespace) {
return ConfigProperty
.ofString(configNamespace + ".sslKeyStorePassword")
.ofString(prefixWithNamespace(configNamespace, SSL_KEY_STORE_PASSWORD))
.description("The password for the key store file.");
}

static ConfigProperty<String> sslKeyPassword(final String configNamespace) {
return ConfigProperty
.ofString(configNamespace + ".sslKeyPassword")
.ofString(prefixWithNamespace(configNamespace, SSL_KEY_PASSWORD))
.description("The password of the private key in the key store file.");
}

static ConfigProperty<String> sslTrustStoreLocation(final String configNamespace) {
return ConfigProperty
.ofString(configNamespace + ".sslTrustStoreLocation")
.ofString(prefixWithNamespace(configNamespace, SSL_TRUST_STORE_LOCATION))
.description("The location of the trust store file.");
}

static ConfigProperty<String> sslTrustStoreType(final String configNamespace) {
return ConfigProperty
.ofString(configNamespace + ".sslTrustStoreType")
.ofString(prefixWithNamespace(configNamespace, SSL_TRUST_STORE_TYPE))
.description("The file format of the trust store file.")
.orElse("PKCS12");
}

static ConfigProperty<String> sslTrustStorePassword(final String configNamespace) {
return ConfigProperty
.ofString(configNamespace + ".sslTrustStorePassword")
.ofString(prefixWithNamespace(configNamespace, SSL_TRUST_STORE_PASSWORD))
.description("The password for the trust store file.");
}

static ConfigProperty<Boolean> sslIgnoreHostnameVerification(final String configNamespace) {
return ConfigProperty
.ofBoolean(configNamespace + ".sslIgnoreHostnameVerification")
.ofBoolean(prefixWithNamespace(configNamespace, SSL_IGNORE_HOSTNAME_VERIFICATION))
.description("Specifies whether to ignore the hostname verification.")
.orElse(false);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright (c) The original authors
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.streamthoughts.jikkou.http.client;

import io.streamthoughts.jikkou.core.config.Configuration;
import org.junit.jupiter.api.Test;

class SslConfigSupportTest {

@Test
void shouldCreateSslConfigWithoutNamespace() {
SslConfigSupport.getSslConfig(null, Configuration
.builder()
.with(SslConfigSupport.SSL_KEY_STORE_PASSWORD, "password")
.with(SslConfigSupport.SSL_KEY_STORE_LOCATION, "/tmp/keystore.jks")
.with(SslConfigSupport.SSL_KEY_STORE_TYPE, "jks")
.with(SslConfigSupport.SSL_TRUST_STORE_LOCATION, "/tmp/truststore.jks")
.with(SslConfigSupport.SSL_TRUST_STORE_PASSWORD, "password")
.with(SslConfigSupport.SSL_TRUST_STORE_TYPE, "jks")
.build()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,9 @@
*/
package io.streamthoughts.jikkou.kafka.connect.api;

import java.util.Arrays;
import java.util.Locale;
import org.jetbrains.annotations.NotNull;

public enum AuthMethod {
INVALID,
BASICAUTH,
NONE;

public static AuthMethod getForNameIgnoreCase(final @NotNull String str) {
return Arrays.stream(AuthMethod.values())
.filter(e -> e.name().equals(str.toUpperCase(Locale.ROOT)))
.findFirst()
.orElse(AuthMethod.INVALID);
}
NONE,
SSL
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ public static KafkaConnectApi create(@NotNull KafkaConnectClientConfig config,
builder.header("Authorization", buildAuthorizationHeader);
yield builder;
}
case SSL -> {
builder.sslConfig(config.getSslConfig());
yield builder;
}
case NONE -> builder;

case INVALID -> throw new IllegalStateException("Unexpected value: " + config.getAuthMethod());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,40 @@
*/
package io.streamthoughts.jikkou.kafka.connect.api;

import io.streamthoughts.jikkou.common.utils.Enums;
import io.streamthoughts.jikkou.core.config.ConfigProperty;
import io.streamthoughts.jikkou.core.config.Configuration;
import io.streamthoughts.jikkou.http.client.SslConfigSupport;
import io.streamthoughts.jikkou.http.client.ssl.SSLConfig;
import java.util.Objects;

public class KafkaConnectClientConfig {

public static final ConfigProperty<String> KAFKA_CONNECT_NAME = ConfigProperty
.ofString("name")
.description("Name of the kafka connect cluster.");
.ofString("name")
.description("Name of the kafka connect cluster.");

public static final ConfigProperty<String> KAFKA_CONNECT_URL = ConfigProperty
.ofString("url")
.description("URL to establish connection to kafka connect cluster.");
.ofString("url")
.description("URL to establish connection to kafka connect cluster.");

public static final ConfigProperty<String> KAFKA_CONNECT_AUTH_METHOD = ConfigProperty
.ofString("authMethod")
.orElse(AuthMethod.NONE.name())
.description("Method to use for authenticating on Kafka Connect cluster. Available values are: [none, basicauth]");
.ofString("authMethod")
.orElse(AuthMethod.NONE.name())
.description("Method to use for authenticating on Kafka Connect cluster. Available values are: [none, basicauth, ssl]");

public static final ConfigProperty<String> KAFKA_CONNECT_BASIC_AUTH_USERNAME = ConfigProperty
.ofString("basicAuthUser")
.description("Use when 'kafkaConnect.authMethod' is 'basicauth' to specify the username for Authorization Basic header");
.ofString("basicAuthUser")
.description("Use when 'kafkaConnect.authMethod' is 'basicauth' to specify the username for Authorization Basic header");

public static final ConfigProperty<String> KAFKA_CONNECT_BASIC_AUTH_PASSWORD = ConfigProperty
.ofString("basicAuthPassword")
.description("Use when 'kafkaConnect.authMethod' is 'basicauth' to specify the password for Authorization Basic header");
.ofString("basicAuthPassword")
.description("Use when 'kafkaConnect.authMethod' is 'basicauth' to specify the password for Authorization Basic header");

public static final ConfigProperty<Boolean> KAFKA_CONNECT_DEBUG_LOGGING_ENABLED = ConfigProperty
.ofBoolean("debugLoggingEnabled")
.description("Enable debug logging.")
.orElse(false);
.ofBoolean("debugLoggingEnabled")
.description("Enable debug logging.")
.orElse(false);

private final Configuration configuration;

Expand All @@ -58,7 +61,7 @@ public String getConnectUrl() {
}

public AuthMethod getAuthMethod() {
return AuthMethod.getForNameIgnoreCase(KAFKA_CONNECT_AUTH_METHOD.get(configuration));
return Enums.getForNameIgnoreCase(KAFKA_CONNECT_AUTH_METHOD.get(configuration), AuthMethod.class, AuthMethod.INVALID);
}

public String getBasicAuthUsername() {
Expand All @@ -77,7 +80,13 @@ public boolean getDebugLoggingEnabled() {
return KAFKA_CONNECT_DEBUG_LOGGING_ENABLED.get(configuration);
}

/** {@inheritDoc} **/
public SSLConfig getSslConfig() {
return SslConfigSupport.getSslConfig(null, configuration);
}

/**
* {@inheritDoc}
**/
@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -86,7 +95,9 @@ public boolean equals(Object o) {
return Objects.equals(configuration, that.configuration);
}

/** {@inheritDoc} **/
/**
* {@inheritDoc}
**/
@Override
public int hashCode() {
return Objects.hash(configuration);
Expand Down
2 changes: 2 additions & 0 deletions up
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ set -e
BASEDIR=$(dirname "$(readlink -f $0)")
DOCKERDIR="$BASEDIR"

if [ -z "$DOCKER_STACK" ]; then
DOCKER_STACK="$DOCKERDIR/docker-compose.yml"
fi

line() {
echo -e "\n----------------------------------------------------------------------------------------------------------------------------------------\n"
Expand Down

0 comments on commit d759877

Please sign in to comment.