diff --git a/docker/sasl/docker-compose.yml b/docker/sasl/docker-compose.yml index dbc728c41..9abf17e7b 100644 --- a/docker/sasl/docker-compose.yml +++ b/docker/sasl/docker-compose.yml @@ -74,6 +74,8 @@ services: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1" CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect" CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components + CONNECT_REST_EXTENSION_CLASSES: org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension + KAFKA_OPTS: -Djava.security.auth.login.config=/tmp/connect.jaas volumes: - $PWD/scripts:/scripts - $PWD/connect-plugins:/usr/share/confluent-hub-components diff --git a/docker/sasl/kafka-connect/Dockerfile b/docker/sasl/kafka-connect/Dockerfile index b21f70f62..67ea870a8 100644 --- a/docker/sasl/kafka-connect/Dockerfile +++ b/docker/sasl/kafka-connect/Dockerfile @@ -4,5 +4,8 @@ ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" USER root +COPY connect.jaas /tmp/connect.jaas +COPY connect.password /tmp/connect.password + RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.5.0 \ && confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.2.1 diff --git a/docker/sasl/kafka-connect/connect.jaas b/docker/sasl/kafka-connect/connect.jaas new file mode 100644 index 000000000..568c9e4ff --- /dev/null +++ b/docker/sasl/kafka-connect/connect.jaas @@ -0,0 +1,4 @@ +KafkaConnect { + org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required + file="/tmp/connect.password"; +}; diff --git a/docker/sasl/kafka-connect/connect.password b/docker/sasl/kafka-connect/connect.password new file mode 100644 index 000000000..bc28d4d91 --- /dev/null +++ b/docker/sasl/kafka-connect/connect.password @@ -0,0 +1 @@ +user: pass \ No newline at end of file diff --git a/example/topology-builder-sasl-plain.properties b/example/topology-builder-sasl-plain.properties index 92f2e5c4a..78b4a6ef7 100644 --- 
a/example/topology-builder-sasl-plain.properties +++ b/example/topology-builder-sasl-plain.properties @@ -7,4 +7,6 @@ sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule require #topology.validations.1=com.purbon.kafka.topology.validation.topic.PartitionNumberValidation #confluent.schema.registry.url="http://localhost:8082" platform.servers.connect.0=connect:http://localhost:18083 +platform.servers.basic.auth.0=connect@user:pass + topology.state.cluster.enabled=false \ No newline at end of file diff --git a/src/main/java/com/purbon/kafka/topology/Configuration.java b/src/main/java/com/purbon/kafka/topology/Configuration.java index 4271d7709..10505de0c 100644 --- a/src/main/java/com/purbon/kafka/topology/Configuration.java +++ b/src/main/java/com/purbon/kafka/topology/Configuration.java @@ -414,14 +414,30 @@ public Map getKafkaConnectServers() { .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); } + public Map getServersBasicAuthMap() { + List basicAuths = config.getStringList(PLATFORM_SERVERS_BASIC_AUTH); + return basicAuths.stream() + .map(s -> s.split("@", 2)) + .map( + strings -> { + String key = strings[0].strip(); // label + String value = String.join(":", Arrays.copyOfRange(strings, 1, strings.length)); + return new Pair<>(key, value); + }) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + } + public KsqlClientConfig getKSQLClientConfig() { KsqlClientConfig.Builder ksqlConf = new KsqlClientConfig.Builder() .setServer(getProperty(PLATFORM_SERVER_KSQL_URL)) - .setTrustStore(getPropertyOrNull(PLATFORM_SERVER_KSQL_TRUSTSTORE, SSL_TRUSTSTORE_LOCATION)) - .setTrustStorePassword(getPropertyOrNull(PLATFORM_SERVER_KSQL_TRUSTSTORE_PW, SSL_TRUSTSTORE_PASSWORD)) + .setTrustStore( + getPropertyOrNull(PLATFORM_SERVER_KSQL_TRUSTSTORE, SSL_TRUSTSTORE_LOCATION)) + .setTrustStorePassword( + getPropertyOrNull(PLATFORM_SERVER_KSQL_TRUSTSTORE_PW, SSL_TRUSTSTORE_PASSWORD)) + .setKeyStore(getPropertyOrNull(PLATFORM_SERVER_KSQL_KEYSTORE, 
SSL_KEYSTORE_LOCATION)) - .setKeyStorePassword(getPropertyOrNull(PLATFORM_SERVER_KSQL_KEYSTORE_PW, SSL_KEYSTORE_PASSWORD)); + .setKeyStorePassword( + getPropertyOrNull(PLATFORM_SERVER_KSQL_KEYSTORE_PW, SSL_KEYSTORE_PASSWORD)); if (hasProperty(PLATFORM_SERVER_KSQL_ALPN)) { ksqlConf.setUseAlpn(config.getBoolean(PLATFORM_SERVER_KSQL_ALPN)); } diff --git a/src/main/java/com/purbon/kafka/topology/Constants.java b/src/main/java/com/purbon/kafka/topology/Constants.java index a27666751..5a01c1825 100644 --- a/src/main/java/com/purbon/kafka/topology/Constants.java +++ b/src/main/java/com/purbon/kafka/topology/Constants.java @@ -83,6 +83,8 @@ public class Constants { public static final String GROUP_MANAGED_PREFIXES = "topology.group.managed.prefixes"; public static final String PLATFORM_SERVERS_CONNECT = "platform.servers.connect"; + public static final String PLATFORM_SERVERS_BASIC_AUTH = "platform.servers.basic.auth"; + public static final String PLATFORM_SERVER_KSQL_URL = "platform.server.ksql.url"; // XXX: consider re-using properties as they are used in ksql cli with --config-file public static final String PLATFORM_SERVER_KSQL_ALPN = "platform.server.ksql.useAlpn"; @@ -114,5 +116,4 @@ public class Constants { public static final String SSL_KEYSTORE_LOCATION = "ssl.keystore.location"; public static final String SSL_KEYSTORE_PASSWORD = "ssl.keystore.password"; public static final String SSL_KEY_PASSWORD = "ssl.key.password"; - } diff --git a/src/main/java/com/purbon/kafka/topology/JulieOps.java b/src/main/java/com/purbon/kafka/topology/JulieOps.java index 4c6591623..6266f2143 100644 --- a/src/main/java/com/purbon/kafka/topology/JulieOps.java +++ b/src/main/java/com/purbon/kafka/topology/JulieOps.java @@ -173,7 +173,11 @@ private static KafkaConnectArtefactManager configureKConnectArtefactManager( Configuration config, String topologyFileOrDir) { Map clients = config.getKafkaConnectServers().entrySet().stream() - .map(entry -> new Pair<>(entry.getKey(), new 
KConnectApiClient(entry.getValue(), config))) + .map( + entry -> + new Pair<>( + entry.getKey(), + new KConnectApiClient(entry.getValue(), entry.getKey(), config))) .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); if (clients.isEmpty()) { diff --git a/src/main/java/com/purbon/kafka/topology/api/connect/KConnectApiClient.java b/src/main/java/com/purbon/kafka/topology/api/connect/KConnectApiClient.java index 50259ca62..a59d792fd 100644 --- a/src/main/java/com/purbon/kafka/topology/api/connect/KConnectApiClient.java +++ b/src/main/java/com/purbon/kafka/topology/api/connect/KConnectApiClient.java @@ -6,6 +6,7 @@ import com.purbon.kafka.topology.clients.JulieHttpClient; import com.purbon.kafka.topology.model.Artefact; import com.purbon.kafka.topology.model.artefact.KafkaConnectArtefact; +import com.purbon.kafka.topology.utils.BasicAuth; import com.purbon.kafka.topology.utils.JSON; import java.io.IOException; import java.util.Collection; @@ -16,12 +17,18 @@ public class KConnectApiClient extends JulieHttpClient implements ArtefactClient { - public KConnectApiClient(String server) { - super(server); + public KConnectApiClient(String server, Configuration config) { + this(server, "", config); } - public KConnectApiClient(String server, Configuration config) { + public KConnectApiClient(String server, String label, Configuration config) { super(server, Optional.of(config)); + // configure basic authentication if available + var basicAuths = config.getServersBasicAuthMap(); + if (basicAuths.containsKey(label)) { + String[] values = basicAuths.get(label).split(":", 2); + setBasicAuth(new BasicAuth(values[0], values[1])); + } } @Override diff --git a/src/main/java/com/purbon/kafka/topology/api/mds/MDSApiClient.java b/src/main/java/com/purbon/kafka/topology/api/mds/MDSApiClient.java index 0af30ee6c..cca2e6b55 100644 --- a/src/main/java/com/purbon/kafka/topology/api/mds/MDSApiClient.java +++ b/src/main/java/com/purbon/kafka/topology/api/mds/MDSApiClient.java @@ -16,7 +16,6 
@@ import java.util.List; import java.util.Map; import java.util.Optional; - import org.apache.kafka.common.resource.ResourceType; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/src/main/java/com/purbon/kafka/topology/api/mds/MDSApiClientBuilder.java b/src/main/java/com/purbon/kafka/topology/api/mds/MDSApiClientBuilder.java index a89eb7e5f..e4c74b2eb 100644 --- a/src/main/java/com/purbon/kafka/topology/api/mds/MDSApiClientBuilder.java +++ b/src/main/java/com/purbon/kafka/topology/api/mds/MDSApiClientBuilder.java @@ -1,11 +1,10 @@ package com.purbon.kafka.topology.api.mds; import com.purbon.kafka.topology.Configuration; +import java.util.Optional; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Optional; - public class MDSApiClientBuilder { private static final Logger LOGGER = LogManager.getLogger(MDSApiClientBuilder.class); diff --git a/src/main/java/com/purbon/kafka/topology/backend/RedisBackend.java b/src/main/java/com/purbon/kafka/topology/backend/RedisBackend.java index 8031f59df..99189a527 100644 --- a/src/main/java/com/purbon/kafka/topology/backend/RedisBackend.java +++ b/src/main/java/com/purbon/kafka/topology/backend/RedisBackend.java @@ -2,12 +2,11 @@ import com.purbon.kafka.topology.BackendController.Mode; import com.purbon.kafka.topology.utils.JSON; +import java.io.IOException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import redis.clients.jedis.Jedis; -import java.io.IOException; - public class RedisBackend implements Backend { private static final Logger LOGGER = LogManager.getLogger(RedisBackend.class); @@ -54,7 +53,6 @@ public BackendState load() throws IOException { return (BackendState) JSON.toObject(content, BackendState.class); } - private void connectIfNeed() { if (!jedis.isConnected()) { createOrOpen(); diff --git a/src/main/java/com/purbon/kafka/topology/clients/JulieHttpClient.java 
b/src/main/java/com/purbon/kafka/topology/clients/JulieHttpClient.java index 738fe3a68..3338a6785 100644 --- a/src/main/java/com/purbon/kafka/topology/clients/JulieHttpClient.java +++ b/src/main/java/com/purbon/kafka/topology/clients/JulieHttpClient.java @@ -3,7 +3,6 @@ import static java.net.http.HttpRequest.BodyPublishers.noBody; import static java.net.http.HttpRequest.BodyPublishers.ofString; -import com.google.cloud.storage.Storage; import com.purbon.kafka.topology.Configuration; import com.purbon.kafka.topology.api.mds.Response; import com.purbon.kafka.topology.utils.BasicAuth; @@ -24,13 +23,11 @@ import java.security.cert.CertificateException; import java.time.Duration; import java.util.Optional; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public abstract class JulieHttpClient { @@ -45,6 +42,7 @@ public abstract class JulieHttpClient { public JulieHttpClient(String server) { this(server, Optional.empty()); } + public JulieHttpClient(String server, Optional configOptional) { this.server = server; this.token = ""; @@ -81,13 +79,14 @@ private HttpClient configureHttpOrHttpsClient(Optional configOpti if (ks != null) { try { kmf.init(ks, config.getSslKeyStorePassword().get().toCharArray()); - } catch ( KeyStoreException | NoSuchAlgorithmException | UnrecoverableKeyException ex) { + } catch (KeyStoreException | NoSuchAlgorithmException | UnrecoverableKeyException ex) { LOGGER.error(ex); kmf = null; } } - KeyStore ts = loadKeyStore(config.getSslTrustStoreLocation(), config.getSslTrustStorePassword()); + KeyStore ts = + loadKeyStore(config.getSslTrustStoreLocation(), config.getSslTrustStorePassword()); if (ts != null) { try { tmf.init(ts); @@ -109,12 +108,11 @@ private HttpClient configureHttpOrHttpsClient(Optional 
configOpti LOGGER.error(e); } - return HttpClient.newBuilder() - .sslContext(sslContext) - .build(); + return HttpClient.newBuilder().sslContext(sslContext).build(); } - private KeyStore loadKeyStore(Optional sslStoreLocation, Optional sslStorePassword) { + private KeyStore loadKeyStore( + Optional sslStoreLocation, Optional sslStorePassword) { if (sslStoreLocation.isPresent() && sslStorePassword.isPresent()) { try { KeyStore ks = KeyStore.getInstance("PKCS12"); @@ -122,7 +120,10 @@ private KeyStore loadKeyStore(Optional sslStoreLocation, Optional { @@ -14,16 +13,15 @@ public class ConnectContainer extends GenericContainer { private static int CONNECT_PORT = 8083; private static int CONNECT_SSL_PORT = 8084; - public ConnectContainer(AlternativeKafkaContainer kafka, - String truststore, - String keystore) { + public ConnectContainer(AlternativeKafkaContainer kafka, String truststore, String keystore) { this(DEFAULT_IMAGE, kafka, truststore, keystore); } - public ConnectContainer(final DockerImageName dockerImageName, - AlternativeKafkaContainer kafka, - String truststore, - String keystore) { + public ConnectContainer( + final DockerImageName dockerImageName, + AlternativeKafkaContainer kafka, + String truststore, + String keystore) { super(dockerImageName); String kafkaHost = kafka.getNetworkAliases().get(1); @@ -42,23 +40,29 @@ public ConnectContainer(final DockerImageName dockerImageName, withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1"); withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1"); withEnv("CONNECT_PLUGIN_PATH", "/usr/share/java"); - withEnv("CONNECT_LISTENERS", "http://0.0.0.0:" + CONNECT_PORT + ", https://0.0.0.0:" + CONNECT_SSL_PORT); + withEnv( + "CONNECT_LISTENERS", + "http://0.0.0.0:" + CONNECT_PORT + ", https://0.0.0.0:" + CONNECT_SSL_PORT); withEnv("CONNECT_SASL_JAAS_CONFIG", saslConfig()); withEnv("CONNECT_SASL_MECHANISM", "PLAIN"); withEnv("CONNECT_SECURITY_PROTOCOL", "SASL_PLAINTEXT"); 
withEnv("CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM", "HTTPS"); - withEnv("CONNECT_LISTENERS_HTTPS_SSL_TRUSTSTORE_LOCATION", "/etc/kafka-connect/secrets/server.truststore"); + withEnv( + "CONNECT_LISTENERS_HTTPS_SSL_TRUSTSTORE_LOCATION", + "/etc/kafka-connect/secrets/server.truststore"); withEnv("CONNECT_LISTENERS_HTTPS_SSL_TRUSTSTORE_PASSWORD", "ksqldb"); - withEnv("CONNECT_LISTENERS_HTTPS_SSL_KEYSTORE_LOCATION", "/etc/kafka-connect/secrets/server.keystore"); + withEnv( + "CONNECT_LISTENERS_HTTPS_SSL_KEYSTORE_LOCATION", + "/etc/kafka-connect/secrets/server.keystore"); withEnv("CONNECT_LISTENERS_HTTPS_SSL_KEYSTORE_PASSWORD", "ksqldb"); withEnv("CONNECT_LISTENERS_HTTPS_SSL_KEY_PASSWORD", "ksqldb"); this.withClasspathResourceMapping( truststore, "/etc/kafka-connect/secrets/server.truststore", BindMode.READ_ONLY) - .withClasspathResourceMapping( - keystore, "/etc/kafka-connect/secrets/server.keystore", BindMode.READ_ONLY); + .withClasspathResourceMapping( + keystore, "/etc/kafka-connect/secrets/server.keystore", BindMode.READ_ONLY); withNetworkAliases("connect"); withNetwork(kafka.getNetwork());