Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure basic auth for Kafka Connect REST API operations #339

Merged
merged 4 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docker/sasl/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ services:
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components
CONNECT_REST_EXTENSION_CLASSES: org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension
KAFKA_OPTS: -Djava.security.auth.login.config=/tmp/connect.jaas
volumes:
- $PWD/scripts:/scripts
- $PWD/connect-plugins:/usr/share/confluent-hub-components
Expand Down
3 changes: 3 additions & 0 deletions docker/sasl/kafka-connect/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@ ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"

USER root

COPY connect.jaas /tmp/connect.jaas
COPY connect.password /tmp/connect.password

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.5.0 \
&& confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.2.1
4 changes: 4 additions & 0 deletions docker/sasl/kafka-connect/connect.jaas
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
KafkaConnect {
org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required
file="/tmp/connect.password";
};
1 change: 1 addition & 0 deletions docker/sasl/kafka-connect/connect.password
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
user: pass
2 changes: 2 additions & 0 deletions example/topology-builder-sasl-plain.properties
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule require
#topology.validations.1=com.purbon.kafka.topology.validation.topic.PartitionNumberValidation
#confluent.schema.registry.url="http://localhost:8082"
platform.servers.connect.0=connect:http://localhost:18083
platform.servers.basic.auth.0=connect@user:pass

topology.state.cluster.enabled=false
22 changes: 19 additions & 3 deletions src/main/java/com/purbon/kafka/topology/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -414,14 +414,30 @@ public Map<String, String> getKafkaConnectServers() {
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}

/**
 * Parses the {@code platform.servers.basic.auth} entries into a map of server label to
 * credentials. Each entry has the form {@code label@user:pass}; the returned value is the
 * raw credentials string (e.g. {@code "user:pass"}).
 *
 * @return map from server label to its basic-auth credentials; empty if none configured
 */
public Map<String, String> getServersBasicAuthMap() {
  List<String> basicAuths = config.getStringList(PLATFORM_SERVERS_BASIC_AUTH);
  return basicAuths.stream()
      // Split only on the FIRST '@' so credentials containing '@' survive intact.
      // (The previous unlimited split rejoined the tail with ':', silently replacing
      // any '@' inside the password with ':'.)
      .map(s -> s.split("@", 2))
      .map(
          strings -> {
            String key = strings[0].strip(); // server label
            String value = strings.length > 1 ? strings[1] : ""; // "user:pass" credentials
            return new Pair<>(key, value);
          })
      .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}

public KsqlClientConfig getKSQLClientConfig() {
KsqlClientConfig.Builder ksqlConf =
new KsqlClientConfig.Builder()
.setServer(getProperty(PLATFORM_SERVER_KSQL_URL))
.setTrustStore(getPropertyOrNull(PLATFORM_SERVER_KSQL_TRUSTSTORE, SSL_TRUSTSTORE_LOCATION))
.setTrustStorePassword(getPropertyOrNull(PLATFORM_SERVER_KSQL_TRUSTSTORE_PW, SSL_TRUSTSTORE_PASSWORD))
.setTrustStore(
getPropertyOrNull(PLATFORM_SERVER_KSQL_TRUSTSTORE, SSL_TRUSTSTORE_LOCATION))
.setTrustStorePassword(
getPropertyOrNull(PLATFORM_SERVER_KSQL_TRUSTSTORE_PW, SSL_TRUSTSTORE_PASSWORD))
.setKeyStore(getPropertyOrNull(PLATFORM_SERVER_KSQL_KEYSTORE, SSL_KEYSTORE_LOCATION))
.setKeyStorePassword(getPropertyOrNull(PLATFORM_SERVER_KSQL_KEYSTORE_PW, SSL_KEYSTORE_PASSWORD));
.setKeyStorePassword(
getPropertyOrNull(PLATFORM_SERVER_KSQL_KEYSTORE_PW, SSL_KEYSTORE_PASSWORD));
if (hasProperty(PLATFORM_SERVER_KSQL_ALPN)) {
ksqlConf.setUseAlpn(config.getBoolean(PLATFORM_SERVER_KSQL_ALPN));
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/purbon/kafka/topology/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public class Constants {
public static final String GROUP_MANAGED_PREFIXES = "topology.group.managed.prefixes";

public static final String PLATFORM_SERVERS_CONNECT = "platform.servers.connect";
public static final String PLATFORM_SERVERS_BASIC_AUTH = "platform.servers.basic.auth";

public static final String PLATFORM_SERVER_KSQL_URL = "platform.server.ksql.url";
// XXX: consider re-using properties as they are used in ksql cli with --config-file
public static final String PLATFORM_SERVER_KSQL_ALPN = "platform.server.ksql.useAlpn";
Expand Down Expand Up @@ -114,5 +116,4 @@ public class Constants {
public static final String SSL_KEYSTORE_LOCATION = "ssl.keystore.location";
public static final String SSL_KEYSTORE_PASSWORD = "ssl.keystore.password";
public static final String SSL_KEY_PASSWORD = "ssl.key.password";

}
6 changes: 5 additions & 1 deletion src/main/java/com/purbon/kafka/topology/JulieOps.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,11 @@ private static KafkaConnectArtefactManager configureKConnectArtefactManager(
Configuration config, String topologyFileOrDir) {
Map<String, KConnectApiClient> clients =
config.getKafkaConnectServers().entrySet().stream()
.map(entry -> new Pair<>(entry.getKey(), new KConnectApiClient(entry.getValue(), config)))
.map(
entry ->
new Pair<>(
entry.getKey(),
new KConnectApiClient(entry.getValue(), entry.getKey(), config)))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));

if (clients.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.purbon.kafka.topology.clients.JulieHttpClient;
import com.purbon.kafka.topology.model.Artefact;
import com.purbon.kafka.topology.model.artefact.KafkaConnectArtefact;
import com.purbon.kafka.topology.utils.BasicAuth;
import com.purbon.kafka.topology.utils.JSON;
import java.io.IOException;
import java.util.Collection;
Expand All @@ -16,12 +17,18 @@

public class KConnectApiClient extends JulieHttpClient implements ArtefactClient {

public KConnectApiClient(String server) {
super(server);
/**
 * Builds a Connect client without a server label; delegates to the labeled constructor
 * with an empty label, so basic-auth credentials are only applied if an entry was
 * registered under the empty label.
 */
public KConnectApiClient(String server, Configuration config) {
this(server, "", config);
}

public KConnectApiClient(String server, Configuration config) {
/**
 * Builds a Connect client for {@code server}, enabling HTTP basic authentication when
 * credentials are registered for {@code label} in the configuration
 * (see {@code platform.servers.basic.auth}).
 *
 * @param server Connect REST endpoint URL
 * @param label server label used to look up basic-auth credentials
 * @param config global configuration (also used for SSL setup by the superclass)
 * @throws IllegalArgumentException if the registered credentials are not in "user:pass" form
 */
public KConnectApiClient(String server, String label, Configuration config) {
  super(server, Optional.of(config));
  // configure basic authentication if available
  var basicAuths = config.getServersBasicAuthMap();
  if (basicAuths.containsKey(label)) {
    // Split only on the first ':' so passwords containing ':' are preserved.
    String[] values = basicAuths.get(label).split(":", 2);
    if (values.length != 2) {
      // Fail with a clear message instead of an ArrayIndexOutOfBoundsException.
      throw new IllegalArgumentException(
          "Basic auth entry for label '" + label + "' must be in the form user:pass");
    }
    setBasicAuth(new BasicAuth(values[0], values[1]));
  }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.common.resource.ResourceType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package com.purbon.kafka.topology.api.mds;

import com.purbon.kafka.topology.Configuration;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Optional;

public class MDSApiClientBuilder {

private static final Logger LOGGER = LogManager.getLogger(MDSApiClientBuilder.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

import com.purbon.kafka.topology.BackendController.Mode;
import com.purbon.kafka.topology.utils.JSON;
import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import redis.clients.jedis.Jedis;

import java.io.IOException;

public class RedisBackend implements Backend {

private static final Logger LOGGER = LogManager.getLogger(RedisBackend.class);
Expand Down Expand Up @@ -54,7 +53,6 @@ public BackendState load() throws IOException {
return (BackendState) JSON.toObject(content, BackendState.class);
}


private void connectIfNeed() {
if (!jedis.isConnected()) {
createOrOpen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static java.net.http.HttpRequest.BodyPublishers.noBody;
import static java.net.http.HttpRequest.BodyPublishers.ofString;

import com.google.cloud.storage.Storage;
import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.api.mds.Response;
import com.purbon.kafka.topology.utils.BasicAuth;
Expand All @@ -24,13 +23,11 @@
import java.security.cert.CertificateException;
import java.time.Duration;
import java.util.Optional;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public abstract class JulieHttpClient {

Expand All @@ -45,6 +42,7 @@ public abstract class JulieHttpClient {
/**
 * Builds a plain HTTP client for {@code server} with no extra configuration
 * (delegates with an empty {@link Optional}, so no SSL key/trust stores are loaded).
 */
public JulieHttpClient(String server) {
this(server, Optional.empty());
}

public JulieHttpClient(String server, Optional<Configuration> configOptional) {
this.server = server;
this.token = "";
Expand Down Expand Up @@ -81,13 +79,14 @@ private HttpClient configureHttpOrHttpsClient(Optional<Configuration> configOpti
if (ks != null) {
try {
kmf.init(ks, config.getSslKeyStorePassword().get().toCharArray());
} catch ( KeyStoreException | NoSuchAlgorithmException | UnrecoverableKeyException ex) {
} catch (KeyStoreException | NoSuchAlgorithmException | UnrecoverableKeyException ex) {
LOGGER.error(ex);
kmf = null;
}
}

KeyStore ts = loadKeyStore(config.getSslTrustStoreLocation(), config.getSslTrustStorePassword());
KeyStore ts =
loadKeyStore(config.getSslTrustStoreLocation(), config.getSslTrustStorePassword());
if (ts != null) {
try {
tmf.init(ts);
Expand All @@ -109,20 +108,22 @@ private HttpClient configureHttpOrHttpsClient(Optional<Configuration> configOpti
LOGGER.error(e);
}

return HttpClient.newBuilder()
.sslContext(sslContext)
.build();
return HttpClient.newBuilder().sslContext(sslContext).build();
}

private KeyStore loadKeyStore(Optional<String> sslStoreLocation, Optional<String> sslStorePassword) {
private KeyStore loadKeyStore(
Optional<String> sslStoreLocation, Optional<String> sslStorePassword) {
if (sslStoreLocation.isPresent() && sslStorePassword.isPresent()) {
try {
KeyStore ks = KeyStore.getInstance("PKCS12");
char[] password = sslStorePassword.get().toCharArray();
InputStream is = Files.newInputStream(Path.of(sslStoreLocation.get()));
ks.load(is, password);
return ks;
} catch (KeyStoreException | IOException | NoSuchAlgorithmException | CertificateException ex) {
} catch (KeyStoreException
| IOException
| NoSuchAlgorithmException
| CertificateException ex) {
LOGGER.error(ex);
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class RBACPredefinedRoles {

public static boolean isClusterScopedRole(String role) {
return !DEVELOPER_READ.equals(role)
&& !DEVELOPER_WRITE.equals(role)
&& !RESOURCE_OWNER.equals(role);
&& !DEVELOPER_WRITE.equals(role)
&& !RESOURCE_OWNER.equals(role);
}
}
1 change: 1 addition & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ kafka {
platform {
servers {
connect = []
basic.auth = []
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Properties;

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.purbon.kafka.topology.integration.backend;

import static org.assertj.core.api.Assertions.assertThat;

import com.purbon.kafka.topology.backend.BackendState;
import com.purbon.kafka.topology.backend.RedisBackend;
import com.purbon.kafka.topology.model.artefact.KafkaConnectArtefact;
Expand All @@ -8,16 +10,13 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.common.resource.ResourceType;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

import static org.assertj.core.api.Assertions.assertThat;

public class RedisBackendIT {

@Rule
Expand Down Expand Up @@ -46,7 +45,6 @@ public void testStoreAndFetch() throws IOException {

rsp.save(state);


BackendState recoveredState = rsp.load();

Assert.assertEquals(2, recoveredState.getTopics().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

public class ConnectContainer extends GenericContainer<ConnectContainer> {
Expand All @@ -14,16 +13,15 @@ public class ConnectContainer extends GenericContainer<ConnectContainer> {
private static int CONNECT_PORT = 8083;
private static int CONNECT_SSL_PORT = 8084;

public ConnectContainer(AlternativeKafkaContainer kafka,
String truststore,
String keystore) {
/**
 * Starts a Connect test container against {@code kafka} using the default image
 * ({@code DEFAULT_IMAGE}); delegates to the image-parameterized constructor.
 *
 * @param kafka the Kafka container to attach to
 * @param truststore classpath resource for the HTTPS listener truststore
 * @param keystore classpath resource for the HTTPS listener keystore
 */
public ConnectContainer(AlternativeKafkaContainer kafka, String truststore, String keystore) {
this(DEFAULT_IMAGE, kafka, truststore, keystore);
}

public ConnectContainer(final DockerImageName dockerImageName,
AlternativeKafkaContainer kafka,
String truststore,
String keystore) {
public ConnectContainer(
final DockerImageName dockerImageName,
AlternativeKafkaContainer kafka,
String truststore,
String keystore) {
super(dockerImageName);

String kafkaHost = kafka.getNetworkAliases().get(1);
Expand All @@ -42,23 +40,29 @@ public ConnectContainer(final DockerImageName dockerImageName,
withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1");
withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1");
withEnv("CONNECT_PLUGIN_PATH", "/usr/share/java");
withEnv("CONNECT_LISTENERS", "http://0.0.0.0:" + CONNECT_PORT + ", https://0.0.0.0:" + CONNECT_SSL_PORT);
withEnv(
"CONNECT_LISTENERS",
"http://0.0.0.0:" + CONNECT_PORT + ", https://0.0.0.0:" + CONNECT_SSL_PORT);

withEnv("CONNECT_SASL_JAAS_CONFIG", saslConfig());
withEnv("CONNECT_SASL_MECHANISM", "PLAIN");
withEnv("CONNECT_SECURITY_PROTOCOL", "SASL_PLAINTEXT");

withEnv("CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM", "HTTPS");
withEnv("CONNECT_LISTENERS_HTTPS_SSL_TRUSTSTORE_LOCATION", "/etc/kafka-connect/secrets/server.truststore");
withEnv(
"CONNECT_LISTENERS_HTTPS_SSL_TRUSTSTORE_LOCATION",
"/etc/kafka-connect/secrets/server.truststore");
withEnv("CONNECT_LISTENERS_HTTPS_SSL_TRUSTSTORE_PASSWORD", "ksqldb");
withEnv("CONNECT_LISTENERS_HTTPS_SSL_KEYSTORE_LOCATION", "/etc/kafka-connect/secrets/server.keystore");
withEnv(
"CONNECT_LISTENERS_HTTPS_SSL_KEYSTORE_LOCATION",
"/etc/kafka-connect/secrets/server.keystore");
withEnv("CONNECT_LISTENERS_HTTPS_SSL_KEYSTORE_PASSWORD", "ksqldb");
withEnv("CONNECT_LISTENERS_HTTPS_SSL_KEY_PASSWORD", "ksqldb");

this.withClasspathResourceMapping(
truststore, "/etc/kafka-connect/secrets/server.truststore", BindMode.READ_ONLY)
.withClasspathResourceMapping(
keystore, "/etc/kafka-connect/secrets/server.keystore", BindMode.READ_ONLY);
.withClasspathResourceMapping(
keystore, "/etc/kafka-connect/secrets/server.keystore", BindMode.READ_ONLY);

withNetworkAliases("connect");
withNetwork(kafka.getNetwork());
Expand Down