Skip to content

Commit

Permalink
feat(schema-registry): add SSL support for SchemaRegistry (#425)
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Apr 27, 2024
1 parent acd7e69 commit 1857335
Show file tree
Hide file tree
Showing 10 changed files with 503 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,27 @@
*/
public final class Enums {

/**
* Gets the enum for specified string name.
*
* @param value The enum raw value.
* @param enumType The enum class type.
* @param <T> The enum type.
* @return The Enum.
* @throws IllegalArgumentException if no enum exists for the specified value.
*/
public static <T extends Enum<T>> T getForNameIgnoreCase(final @Nullable String value,
final @NotNull Class<T> enumType,
final T defaultValue) {
if (value == null) throw new IllegalArgumentException("Unsupported value 'null'");

T[] values = enumType.getEnumConstants();
return Arrays.stream(values)
.filter(e -> e.name().equals(value.toUpperCase(Locale.ROOT)))
.findFirst()
.orElse(defaultValue);
}

/**
* Gets the enum for specified string name.
*
Expand Down
114 changes: 114 additions & 0 deletions docker-compose-ssl.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) The original authors
#
# Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
#
version: '3.8'
services:
kafka:
image: "confluentinc/cp-kafka:7.5.0"
ports:
- "9092:9092"
container_name: kafka
environment:
KAFKA_NODE_ID: 101
# random cluster ID used for formatting LOG_DIR for KRaft
CLUSTER_ID: 'xtzWWN4bTjitpL3kfd9s5g'
KAFKA_CONTROLLER_QUORUM_VOTERS: '101@kafka:29093'
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://kafka:29093'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'false'
KAFKA_AUTHORIZER_CLASS_NAME: org.apache.kafka.metadata.authorizer.StandardAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
KAFKA_SUPER_USERS: 'User:anonymous'
networks:
- kafka-platform

schema-registry:
image: confluentinc/cp-schema-registry:7.5.0
hostname: schema-registry
container_name: schema-registry
ports:
- "8081:8081"
- "8082:8082"
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:29092
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_LISTENERS: https://0.0.0.0:8081,http://0.0.0.0:8082
SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION: "/app/certs/kafka.keystore.jks"
SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD: "password"
SCHEMA_REGISTRY_SSL_KEYSTORE_TYPE: "JKS"
SCHEMA_REGISTRY_SSL_KEY_PASSWORD: "password"
SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION: "/app/certs/kafka.truststore.jks"
SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD: "password"
SCHEMA_REGISTRY_SSL_TRUSTSTORE_TYPE: "JKS"
SCHEMA_REGISTRY_SSL_CLIENT_AUTH: "true"
volumes:
- ./certs/:/app/certs
depends_on:
- kafka
networks:
- kafka-platform
akhq:
image: tchiotludo/akhq
hostname: akhq
container_name: akhq
depends_on:
- kafka
ports:
- "8087:8080"
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
docker-kafka-server:
properties:
bootstrap.servers: "kafka:29092"
schema-registry:
url: "http://schema-registry:8081"
connect:
- name: "connect"
url: "http://connect:8083"
networks:
- kafka-platform

connect:
image: confluentinc/cp-kafka-connect:7.5.0
container_name: connect
depends_on:
- kafka
ports:
- "8083:8083"
- "8000:8000"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'kafka:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: _connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_PLUGIN_PATH: "/usr/local/share/kafka/plugins,/usr/share/filestream-connectors"
networks:
- kafka-platform

networks:
kafka-platform:
driver: bridge
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,27 @@
package io.streamthoughts.jikkou.http.client;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.streamthoughts.jikkou.core.exceptions.JikkouRuntimeException;
import io.streamthoughts.jikkou.core.io.Jackson;
import io.streamthoughts.jikkou.http.client.internal.ProxyInvocationHandler;
import io.streamthoughts.jikkou.http.client.ssl.SSLConfig;
import io.streamthoughts.jikkou.http.client.ssl.SSLContextFactory;
import io.streamthoughts.jikkou.http.client.ssl.SSLUtils;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.core.MultivaluedHashMap;
import jakarta.ws.rs.ext.ContextResolver;
import jakarta.ws.rs.ext.Provider;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -27,17 +37,28 @@
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.jackson.internal.jackson.jaxrs.json.JacksonJsonProvider;
import org.glassfish.jersey.logging.LoggingFeature;
import org.jetbrains.annotations.NotNull;
import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;

/**
* This class is used to abstract the way a REST API is build based on a given interface.
*/
public class RestClientBuilder {

private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(RestClientBuilder.class);

public static final AllowAllHostNameVerifier NO_HOST_NAME_VERIFIER = new AllowAllHostNameVerifier();

private URI baseUri;

private boolean followRedirects;
Expand All @@ -48,6 +69,8 @@ public class RestClientBuilder {

private final ClientBuilder clientBuilder;

private SSLContext sslContext;

private ObjectMapper objectMapper = Jackson.JSON_OBJECT_MAPPER;

/**
Expand Down Expand Up @@ -100,6 +123,31 @@ public RestClientBuilder baseUrl(URL url) {
}
}

/**
* Sets the truststore.
*
* @return {@code this}.
*/
public RestClientBuilder truststore(KeyStore keyStore) {
clientBuilder.trustStore(keyStore);
return this;
}

/**
* Sets the keystore.
*
* @return {@code this}.
*/
public RestClientBuilder keystore(KeyStore keyStore, String password) {
clientBuilder.keyStore(keyStore, password);
return this;
}

public RestClientBuilder sslIgnoreHostnameVerification() {
clientBuilder.hostnameVerifier(NO_HOST_NAME_VERIFIER);
return this;
}

/**
* Sets the connect timeout.
*
Expand Down Expand Up @@ -166,6 +214,44 @@ public RestClientBuilder objectMapper(final ObjectMapper objectMapper) {
return this;
}

public RestClientBuilder sslConfig(final SSLConfig sslConfig) {
TrustManager[] trustManagers;
try {
trustManagers = SSLUtils.createTrustManagers(
sslConfig.trustStoreLocation(),
sslConfig.trustStorePassword().toCharArray(),
sslConfig.trustStoreType(),
KeyManagerFactory.getDefaultAlgorithm()
);
} catch (CertificateException |
NoSuchAlgorithmException |
KeyStoreException |
IOException e) {
LOG.error("Could not create trust managers for Client Certificate authentication.", e);
throw new JikkouRuntimeException(e);
}
KeyManager[] keyManagers;
try {
keyManagers = SSLUtils.createKeyManagers(
sslConfig.keyStoreLocation(),
sslConfig.keyStorePassword().toCharArray(),
sslConfig.keyStoreType(),
KeyManagerFactory.getDefaultAlgorithm()
);
} catch (CertificateException |
NoSuchAlgorithmException |
UnrecoverableKeyException |
KeyStoreException |
IOException e) {
LOG.error("Could not create key managers for Client Certificate authentication.", e);
throw new JikkouRuntimeException(e);
}
SSLContextFactory sslContextFactory = new SSLContextFactory();
clientBuilder.sslContext(sslContextFactory.getSSLContext(keyManagers, trustManagers));

return sslConfig.ignoreHostnameVerification() ? sslIgnoreHostnameVerification() : this;
}

/**
* Builds a new client for the given resource interface.
*
Expand Down Expand Up @@ -225,4 +311,19 @@ public ObjectMapper getContext(Class<?> type) {
return mapper;
}
}

/**
* A {@link HostnameVerifier} that accept all certificates.
*/
public static class AllowAllHostNameVerifier implements HostnameVerifier {

/**
* {@inheritDoc}
*/
@Override
public boolean verify(final String hostname, final SSLSession sslSession) {
return true;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright (c) The original authors
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.streamthoughts.jikkou.http.client;

import io.streamthoughts.jikkou.core.config.ConfigProperty;
import io.streamthoughts.jikkou.core.config.Configuration;
import io.streamthoughts.jikkou.http.client.ssl.SSLConfig;

public interface SslConfigSupport {

static ConfigProperty<String> sslKeyStoreLocation(final String configNamespace) {
return ConfigProperty
.ofString(configNamespace + ".sslKeyStoreLocation")
.description("The location of the key store file.");
}

static ConfigProperty<String> sslKeyStoreType(final String configNamespace) {
return ConfigProperty
.ofString(configNamespace + ".sslKeyStoreType")
.description("The file format of the key store file.")
.orElse("PKCS12");
}

static ConfigProperty<String> sslKeyStorePassword(final String configNamespace) {
return ConfigProperty
.ofString(configNamespace + ".sslKeyStorePassword")
.description("The password for the key store file.");
}

static ConfigProperty<String> sslKeyPassword(final String configNamespace) {
return ConfigProperty
.ofString(configNamespace + ".sslKeyPassword")
.description("The password of the private key in the key store file.");
}

static ConfigProperty<String> sslTrustStoreLocation(final String configNamespace) {
return ConfigProperty
.ofString(configNamespace + ".sslTrustStoreLocation")
.description("The location of the trust store file.");
}

static ConfigProperty<String> sslTrustStoreType(final String configNamespace) {
return ConfigProperty
.ofString(configNamespace + ".sslTrustStoreType")
.description("The file format of the trust store file.")
.orElse("PKCS12");
}

static ConfigProperty<String> sslTrustStorePassword(final String configNamespace) {
return ConfigProperty
.ofString(configNamespace + ".sslTrustStorePassword")
.description("The password for the trust store file.");
}

static ConfigProperty<Boolean> sslIgnoreHostnameVerification(final String configNamespace) {
return ConfigProperty
.ofBoolean(configNamespace + ".sslIgnoreHostnameVerification")
.description("Specifies whether to ignore the hostname verification.")
.orElse(false);
}

static SSLConfig getSslConfig(final String configNamespace,
final Configuration configuration) {
return new SSLConfig(
SslConfigSupport.sslKeyStoreLocation(configNamespace).getOptional(configuration).orElse(null),
SslConfigSupport.sslKeyStorePassword(configNamespace).getOptional(configuration).orElse(null),
SslConfigSupport.sslKeyStoreType(configNamespace).get(configuration),
SslConfigSupport.sslKeyPassword(configNamespace).getOptional(configuration).orElse(null),
SslConfigSupport.sslTrustStoreLocation(configNamespace).getOptional(configuration).orElse(null),
SslConfigSupport.sslTrustStorePassword(configNamespace).getOptional(configuration).orElse(null),
SslConfigSupport.sslTrustStoreType(configNamespace).get(configuration),
SslConfigSupport.sslIgnoreHostnameVerification(configNamespace).get(configuration)
);
}
}
Loading

0 comments on commit 1857335

Please sign in to comment.