Skip to content

Commit

Permalink
Ksql: Add SSL support and bugfixes (#308)
Browse files Browse the repository at this point in the history
* extend ksql support

* add documentation for new ksql properties and introduce KsqlClientConfig class to bundle ksql related options
    format code
* add wait strategy in to ksql container to get rid of Thread.sleep call in integration tests
* add basic support for TLS and fix handling ksql urls #288 #287
* add missing toString method  #289

* refactor and add tests

* ensure verifyHost defaults to true
* testcontainers: wait for log message instead of polling ksql /info endpoint
* refactor to use BasicAuth class in JulieHttpClient
* add integration test for ksqldb with TLS

* remove editorconfig

* ammend token variable initialization within the julie http base client

Co-authored-by: Rocco Schulz <rocco@is-gr8.com>
Co-authored-by: Pere Urbón <purbon@users.noreply.github.com>
Co-authored-by: Pere Urbon <pere@confluent.io>
  • Loading branch information
4 people authored Sep 11, 2021
1 parent 19bc7fe commit 6bf0280
Show file tree
Hide file tree
Showing 35 changed files with 670 additions and 116 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ Example:
## Contribution Steps

1. Test your changes! [Run](https://github.com/purbon/kafka-topology-builder#testing) the test suite
2. Run `mvn fmt:format` to format the code
3. Send a pull request! Push your changes to your fork of the repository and
[submit a pull
request](https://help.github.com/articles/using-pull-requests). In the pull
Expand Down
2 changes: 1 addition & 1 deletion docs/config-values.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Important configuration values
*******************************

This page describe the most common configuration values for Julie Ops, this values can be set within the topology-builder properties file.
This page describe the most common configuration values for Julie Ops, these values can be set within the topology-builder properties file.

Access control configuration
-----------
Expand Down
31 changes: 29 additions & 2 deletions docs/futures/what-ksql-management.rst
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,35 @@ It is currently supported to manage:
Configuring KSQL servers
-----------

In this configuration, you can define the KSQL server to be used during the artefacts creation, this could be done like this:
To manage KSQL artefacts, you must define the ksqlDB server URL to be used during the artefacts creation. This could be done like this:

.. code-block:: bash
platform.server.ksql = "http://ksql:8088"
platform.server.ksql.url = "http://ksql:8088"
*NOTE* The URL must contain the protocol and the port.


The following additional configuration keys are optional and can be used
to configure authentication and SSL related properties:

.. code-block:: bash
platform.server.ksql.useAlpn = false
platform.server.ksql.truststore = "/path/to/truststore.jks"
platform.server.ksql.truststorePw = "truststoresecret"
platform.server.ksql.verifyHost = true
platform.server.ksql.keystore = "/path/to/keystore.jks"
platform.server.ksql.keystorePw = "keystoresecret"
platform.server.ksql.user = "basic auth user"
platform.server.ksql.password = "basic auth password"
E.g. to connect to a Confluent Cloud ksqlDB cluster, you'd use the following properties:

.. code-block:: bash
platform.server.ksql.url = "https://<host>:443"
platform.server.ksql.useAlpn = true
platform.server.ksql.user = "<ksqlDB-API-key>"
platform.server.ksql.password = "<ksqlDB-API-secret>"
10 changes: 9 additions & 1 deletion example/descriptor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ projects:
write:
- "topicC"
- "topicD"
ksql:
artefacts:
streams:
- path: "ksql/riderlocations.sql"
name: "riderLocations"
tables:
- path: "ksql/users.sql"
name: "users"
connectors:
- principal: "User:Connect1"
group: "group"
Expand Down Expand Up @@ -81,4 +89,4 @@ platform:
control_center:
instances:
- principal: "User:ControlCenter"
appId: "controlcenter"
appId: "controlcenter"
1 change: 1 addition & 0 deletions example/ksql/riderlocations.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE) WITH (kafka_topic='locations', value_format='json', partitions=1);
6 changes: 6 additions & 0 deletions example/ksql/users.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE users ( id BIGINT PRIMARY KEY, usertimestamp BIGINT, gender VARCHAR, region_id VARCHAR)
WITH (
KAFKA_TOPIC = 'my-users-topic',
KEY_FORMAT='KAFKA', PARTITIONS=2, REPLICAS=1,
VALUE_FORMAT = 'JSON'
);
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ public BindingsBuilderProvider builder() throws IOException {

private MDSApiClient apiClientLogIn() {
MDSApiClient apiClient = mdsApiClientBuilder.build();
String mdsUser = config.getProperty(MDS_USER_CONFIG);
String mdsPassword = config.getProperty(MDS_PASSWORD_CONFIG);
apiClient.login(mdsUser, mdsPassword);
config.getMdsBasicAuth().ifPresent(apiClient::setBasicAuth);
return apiClient;
}
}
57 changes: 47 additions & 10 deletions src/main/java/com/purbon/kafka/topology/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
import static com.purbon.kafka.topology.CommandLineInterface.*;
import static com.purbon.kafka.topology.Constants.*;

import com.purbon.kafka.topology.api.ksql.KsqlClientConfig;
import com.purbon.kafka.topology.exceptions.ConfigurationException;
import com.purbon.kafka.topology.model.Project;
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.model.Topology;
import com.purbon.kafka.topology.serdes.TopologySerdes.FileType;
import com.purbon.kafka.topology.utils.BasicAuth;
import com.purbon.kafka.topology.utils.Pair;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import java.io.File;
import java.util.*;
Expand Down Expand Up @@ -363,22 +366,56 @@ public Map<String, String> getKafkaConnectServers() {
return servers.stream()
.map(server -> server.split(":"))
.map(
new Function<String[], Pair<String, String>>() {
@Override
public Pair<String, String> apply(String[] strings) {
String key = strings[0].strip();
String value = String.join(":", Arrays.copyOfRange(strings, 1, strings.length));
return new Pair<>(key, value);
}
strings -> {
String key = strings[0].strip();
String value = String.join(":", Arrays.copyOfRange(strings, 1, strings.length));
return new Pair<>(key, value);
})
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}

public String getKSQLServer() {
return config.getString(PLATFORM_SERVER_KSQL);
public KsqlClientConfig getKSQLClientConfig() {
KsqlClientConfig.Builder ksqlConf =
new KsqlClientConfig.Builder()
.setServer(getProperty(PLATFORM_SERVER_KSQL_URL))
.setTrustStore(getPropertyOrNull(PLATFORM_SERVER_KSQL_TRUSTSTORE))
.setTrustStorePassword(getPropertyOrNull(PLATFORM_SERVER_KSQL_TRUSTSTORE_PW))
.setKeyStore(getPropertyOrNull(PLATFORM_SERVER_KSQL_KEYSTORE))
.setKeyStorePassword(getPropertyOrNull(PLATFORM_SERVER_KSQL_KEYSTORE_PW));
if (hasProperty(PLATFORM_SERVER_KSQL_ALPN)) {
ksqlConf.setUseAlpn(config.getBoolean(PLATFORM_SERVER_KSQL_ALPN));
}
if (hasProperty(PLATFORM_SERVER_KSQL_BASIC_AUTH_PASSWORD)
&& hasProperty(PLATFORM_SERVER_KSQL_BASIC_AUTH_USER)) {
ksqlConf.setBasicAuth(
new BasicAuth(
getProperty(PLATFORM_SERVER_KSQL_BASIC_AUTH_USER),
getProperty(PLATFORM_SERVER_KSQL_BASIC_AUTH_PASSWORD)));
}

if (hasProperty(PLATFORM_SERVER_KSQL_VERIFY_HOST)) {
ksqlConf.setVerifyHost(config.getBoolean(PLATFORM_SERVER_KSQL_VERIFY_HOST));
}
return ksqlConf.build();
}

public boolean hasKSQLServer() {
return config.hasPath(PLATFORM_SERVER_KSQL);
return config.hasPath(PLATFORM_SERVER_KSQL_URL);
}

private String getPropertyOrNull(String key) {
try {
return config.getString(key);
} catch (ConfigException.Missing e) {
return null;
}
}

public Optional<BasicAuth> getMdsBasicAuth() {
BasicAuth auth = null;
if (hasProperty(MDS_USER_CONFIG) && hasProperty(MDS_PASSWORD_CONFIG)) {
auth = new BasicAuth(getProperty(MDS_USER_CONFIG), getProperty(MDS_PASSWORD_CONFIG));
}
return Optional.ofNullable(auth);
}
}
14 changes: 13 additions & 1 deletion src/main/java/com/purbon/kafka/topology/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

public class Constants {

public static final String HTTPS = "https";
public static final String KAFKA_INTERNAL_TOPIC_PREFIXES = "kafka.internal.topic.prefixes";
public static final String ACCESS_CONTROL_IMPLEMENTATION_CLASS =
"topology.builder.access.control.class";
Expand Down Expand Up @@ -82,7 +83,18 @@ public class Constants {
public static final String GROUP_MANAGED_PREFIXES = "topology.group.managed.prefixes";

public static final String PLATFORM_SERVERS_CONNECT = "platform.servers.connect";
public static final String PLATFORM_SERVER_KSQL = "platform.server.ksql";
public static final String PLATFORM_SERVER_KSQL_URL = "platform.server.ksql.url";
// XXX: consider re-using properties as they are used in ksql cli with --config-file
public static final String PLATFORM_SERVER_KSQL_ALPN = "platform.server.ksql.useAlpn";
public static final String PLATFORM_SERVER_KSQL_TRUSTSTORE = "platform.server.ksql.truststore";
public static final String PLATFORM_SERVER_KSQL_TRUSTSTORE_PW =
"platform.server.ksql.truststorePw";
public static final String PLATFORM_SERVER_KSQL_VERIFY_HOST = "platform.server.ksql.verifyHost";
public static final String PLATFORM_SERVER_KSQL_KEYSTORE = "platform.server.ksql.keystore";
public static final String PLATFORM_SERVER_KSQL_KEYSTORE_PW = "platform.server.ksql.keystorePw";
public static final String PLATFORM_SERVER_KSQL_BASIC_AUTH_USER = "platform.server.ksql.user";
public static final String PLATFORM_SERVER_KSQL_BASIC_AUTH_PASSWORD =
"platform.server.ksql.password";

public static final String TOPOLOGY_BUILDER_INTERNAL_PRINCIPAL =
"topology.builder.internal.principal";
Expand Down
26 changes: 10 additions & 16 deletions src/main/java/com/purbon/kafka/topology/JulieOps.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,14 @@ public static JulieOps build(String topologyFile, String plansFile, Map<String,

PrincipalProviderFactory principalProviderFactory = new PrincipalProviderFactory(builderConfig);

JulieOps builder =
build(
topologyFile,
plansFile,
builderConfig,
adminClient,
factory.get(),
factory.builder(),
principalProviderFactory.get());

return builder;
return build(
topologyFile,
plansFile,
builderConfig,
adminClient,
factory.get(),
factory.builder(),
principalProviderFactory.get());
}

public static JulieOps build(
Expand Down Expand Up @@ -192,10 +189,7 @@ private static KSqlArtefactManager configureKSqlArtefactManager(

Map<String, KsqlApiClient> clients = new HashMap<>();
if (config.hasKSQLServer()) {
String ksqlAddress = config.getKSQLServer();
String server = ksqlAddress.substring(0, ksqlAddress.lastIndexOf(":"));
Integer port = Integer.parseInt(ksqlAddress.substring(ksqlAddress.lastIndexOf(":") + 1));
KsqlApiClient client = new KsqlApiClient(server, port);
KsqlApiClient client = new KsqlApiClient(config.getKSQLClientConfig());
clients.put("default", client);
}

Expand Down Expand Up @@ -280,7 +274,7 @@ public static String getVersion() {
private static BackendController buildBackendController(Configuration config) throws IOException {

String backendClass = config.getStateProcessorImplementationClassName();
Backend backend = null;
Backend backend;
try {
if (backendClass.equalsIgnoreCase(STATE_PROCESSOR_DEFAULT_CLASS)) {
backend = new FileBackend();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ public class CreateArtefactAction extends BaseAction {

private static final Logger LOGGER = LogManager.getLogger(CreateArtefactAction.class);

private ArtefactClient client;
private Artefact artefact;
private String rootPath;
private Collection<? extends Artefact> artefacts;
private final ArtefactClient client;
private final Artefact artefact;
private final String rootPath;
private final Collection<? extends Artefact> artefacts;

public CreateArtefactAction(
ArtefactClient client,
Expand Down Expand Up @@ -48,8 +48,7 @@ public Artefact getArtefact() {

private String content() throws IOException {
LOGGER.debug(
String.format(
"Reading artefact content from %s with rootPath %s", artefact.getPath(), rootPath));
"Reading artefact content from " + artefact.getPath() + " with rootPath " + rootPath);
return Utils.readFullFile(filePath(artefact.getPath(), rootPath));
}

Expand Down
68 changes: 37 additions & 31 deletions src/main/java/com/purbon/kafka/topology/api/ksql/KsqlApiClient.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.purbon.kafka.topology.api.ksql;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.purbon.kafka.topology.Constants;
import com.purbon.kafka.topology.clients.ArtefactClient;
import com.purbon.kafka.topology.model.Artefact;
import com.purbon.kafka.topology.model.artefact.KsqlArtefact;
Expand All @@ -12,37 +13,44 @@
import io.confluent.ksql.api.client.StreamInfo;
import io.confluent.ksql.api.client.TableInfo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.net.URL;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class KsqlApiClient implements ArtefactClient {

private String server;
private Integer port;
private Client client;
private final URL server;
private final Client client;

public static String QUERY_TYPE = "query";
public static String STREAM_TYPE = "stream";
public static String TABLE_TYPE = "table";
public static final String QUERY_TYPE = "query";
public static final String STREAM_TYPE = "stream";
public static final String TABLE_TYPE = "table";

public KsqlApiClient(String server, Integer port) {
this.server = server;
this.port = port;
public KsqlApiClient(KsqlClientConfig ksqlClientConfig) {
this.server = ksqlClientConfig.getServer();
ClientOptions options =
ClientOptions.create().setHost(server.split(":")[0].strip()).setPort(port);
client = Client.create(options);
ClientOptions.create().setHost(server.getHost()).setPort(server.getPort());
if (server.getProtocol() != null && server.getProtocol().equals(Constants.HTTPS)) {
options.setUseTls(true);
}
options.setUseAlpn(ksqlClientConfig.useAlpn());
options.setKeyStore(ksqlClientConfig.getKeyStore());
options.setKeyStorePassword(ksqlClientConfig.getKeyStorePassword());
options.setTrustStore(ksqlClientConfig.getTrustStore());
options.setTrustStorePassword(ksqlClientConfig.getTrustStorePassword());
options.setVerifyHost(ksqlClientConfig.isVerifyHost());
if (ksqlClientConfig.useBasicAuth()) {
options.setBasicAuthCredentials(
ksqlClientConfig.getBasicAuth().getUser(), ksqlClientConfig.getBasicAuth().getPassword());
}
this.client = Client.create(options);
}

@Override
public String getServer() {
return server + ":" + port;
return server.toString();
}

@Override
Expand Down Expand Up @@ -93,21 +101,19 @@ public List<String> listTables() throws IOException {
}

return infos.stream()
.map(tableInfo -> new KsqlTableArtefact("", server, tableInfo.getName()))
.map(artefactToString())
.map(tableInfo -> new KsqlTableArtefact("", server.getHost(), tableInfo.getName()))
.map(this::artefactToString)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
}

private Function<KsqlArtefact, String> artefactToString() {
return ksqlArtefact -> {
try {
return JSON.asString(ksqlArtefact);
} catch (JsonProcessingException e) {
e.printStackTrace();
return "";
}
};
private String artefactToString(KsqlArtefact ksqlArtefact) {
try {
return JSON.asString(ksqlArtefact);
} catch (JsonProcessingException e) {
e.printStackTrace();
return "";
}
}

public List<String> listStreams() throws IOException {
Expand All @@ -121,8 +127,8 @@ public List<String> listStreams() throws IOException {

return infos.stream()
.filter(e -> !"KSQL_PROCESSING_LOG".equalsIgnoreCase(e.getName()))
.map(queryInfo -> new KsqlStreamArtefact("", server, queryInfo.getName()))
.map(artefactToString())
.map(queryInfo -> new KsqlStreamArtefact("", server.getHost(), queryInfo.getName()))
.map(this::artefactToString)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
}
Expand Down
Loading

0 comments on commit 6bf0280

Please sign in to comment.