Add a validate-only flag to facilitate descriptor testing in feature branches and add ConfigurationKeyValidation and TopicNameRegexValidation validations #274

Merged
12 commits merged on May 1, 2021
23 changes: 14 additions & 9 deletions README.md
@@ -104,15 +104,20 @@ This is how you can run the tool directly as a docker image:
docker run purbon/kafka-topology-builder:latest julie-ops-cli.sh --help
Parsing failed cause of Missing required options: topology, brokers, clientConfig
usage: cli
--brokers <arg> The Apache Kafka server(s) to connect to.
--clientConfig <arg> The client configuration file.
--overridingClientConfig <arg> The overriding client configuration file.
--dryRun Print the execution plan without altering
anything.
--help Prints usage information.
--quiet Print minimum status update
--topology <arg> Topology config file.
--version Prints useful version information.
--brokers <arg> The Apache Kafka server(s) to connect
to.
--clientConfig <arg> The client configuration file.
--dryRun Print the execution plan without
altering anything.
--help Prints usage information.
--overridingClientConfig <arg> The overriding AdminClient
configuration file.
--plans <arg> File describing the predefined plans
--quiet Print minimum status update
--topology <arg> Topology config file.
--validate Only run configured validations in
your topology
--version Prints useful version information.
```

If you install the tool as an rpm, _julie-ops-cli.sh_ will be available in your $PATH.
53 changes: 53 additions & 0 deletions docs/futures/run-validations.rst
@@ -0,0 +1,53 @@
Validate your topologies
*******************************

A common practice in many *gitops* deployments is to run a set of automated validations before changes are allowed in.
JulieOps lets users run a configurable set of validations before it applies the changes to each of the managed components.

Validate a topology in a feature branch
---------------------------------------

As a user you can use the *--validate* CLI option to only validate the incoming topology. Note that this runs the validation completely offline,
without any knowledge of the current state of the cluster.
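
For example, a validation-only run could look like this (the descriptor and configuration file names below are illustrative):

.. code-block:: bash

   julie-ops-cli.sh \
       --topology descriptor.yaml \
       --brokers localhost:9092 \
       --clientConfig topology-builder.properties \
       --validate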

The validations you require for your topology are configured in the configuration file, like this:

.. code-block:: bash

   topology.validations.0=com.purbon.kafka.topology.validation.topic.ConfigurationKeyValidation
   topology.validations.1=com.purbon.kafka.topology.validation.topic.TopicNameRegexValidation
   topology.validations.topic.name.regexp="[a-z0-9]"

In the previous example we have configured two validations:

1. *ConfigurationKeyValidation* makes sure all config keys are valid for Kafka.
2. *TopicNameRegexValidation* validates, based on the configured regexp, that all topic names follow the expected pattern.
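
Note that the underlying check uses Java's ``String.matches``, which anchors the pattern against the whole topic name; with the example pattern ``[a-z0-9]`` only single-character names would pass, so you will usually want a quantified expression. A quick illustration (class name is hypothetical):

.. code-block:: java

   public class TopicNameRegexExample {
     public static void main(String[] args) {
       // String.matches() implicitly anchors the pattern to the whole string.
       System.out.println("payments".matches("[a-z0-9]"));              // false: pattern only covers one character
       System.out.println("payments".matches("[a-z0-9]+"));             // true
       System.out.println("context.company.env".matches("[a-z0-9.]+")); // true: dot added to the character class
     }
   }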

All detected errors are reported in a single output, like this:

.. code-block:: bash

   ...
   Exception in thread "main" com.purbon.kafka.topology.exceptions.ValidationException: Topology name does not follow the camelCase format: context
   Topic context.company.env.source.projectA.foo has an invalid number of partitions: 1
   Topic context.company.env.source.projectA.bar.avro has an invalid number of partitions: 1
   Topic context.company.env.source.projectB.bar.avro has an invalid number of partitions: 1
   Topic context.company.env.source.projectC.topicE has an invalid number of partitions: 1
   Topic context.company.env.source.projectC.topicF has an invalid number of partitions: 1
       at com.purbon.kafka.topology.JulieOps.build(JulieOps.java:125)
       at com.purbon.kafka.topology.JulieOps.build(JulieOps.java:75)
       at com.purbon.kafka.topology.CommandLineInterface.processTopology(CommandLineInterface.java:206)
       at com.purbon.kafka.topology.CommandLineInterface.run(CommandLineInterface.java:156)
       at com.purbon.kafka.topology.CommandLineInterface.main(CommandLineInterface.java:146)

Add your own validations
------------------------

JulieOps provides a set of built-in validations; however, you can also provide your own. To do so you will need to:

* Code your validation following the required interfaces as defined in the JulieOps project; see the core validations for the current pattern (a minimal sketch follows below).
* Build a jar with your validations.
* Run JulieOps with a CLASSPATH configured so that the JVM can find your validations jar and load the classes dynamically.
  Remember that when running JulieOps you can use the *JULIE_OPS_OPTIONS* env variable to pass custom system configurations such as the CLASSPATH or security-related settings.
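
As an illustration, a custom validation could look like the following minimal sketch. It assumes the *TopicValidation* interface used by the built-in validations in this pull request; the package and class names are placeholders, and the 249-character cap is Kafka's topic name length limit:

.. code-block:: java

   package com.example.julie.validation;  // hypothetical package for your own jar

   import com.purbon.kafka.topology.exceptions.ValidationException;
   import com.purbon.kafka.topology.model.Topic;
   import com.purbon.kafka.topology.validation.TopicValidation;

   /** Rejects topics whose name exceeds the Kafka topic name length limit. */
   public class TopicNameLengthValidation implements TopicValidation {

     private static final int MAX_NAME_LENGTH = 249;

     @Override
     public void valid(Topic topic) throws ValidationException {
       if (topic.getName().length() > MAX_NAME_LENGTH) {
         String msg =
             String.format(
                 "Topic name '%s' is longer than %d characters", topic.getName(), MAX_NAME_LENGTH);
         throw new ValidationException(msg);
       }
     }
   }

Once packaged in a jar on the CLASSPATH, such a validation would be enabled the same way as the built-in ones, e.g. *topology.validations.0=com.example.julie.validation.TopicNameLengthValidation*.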

**NOTE**: For now the UberJar is only available from the release page; in future releases we will provide a smaller plugin jar.
25 changes: 13 additions & 12 deletions docs/how-to-run-it.rst
@@ -17,18 +17,19 @@ If you are using the CLI tool, you can use the *--help* command to list the diffe

$> julie-ops-cli.sh --help
usage: cli
--brokers <arg> The Apache Kafka server(s) to connect
to.
--clientConfig <arg> The client configuration file.
--dryRun Print the execution plan without
altering anything.
--help Prints usage information.
--overridingClientConfig <arg> The overriding client configuration
file.
--plans <arg> File describing the predefined plans
--quiet Print minimum status update
--topology <arg> Topology config file.
--version Prints useful version information.
--brokers <arg> The Apache Kafka server(s) to connect to.
--clientConfig <arg> The client configuration file.
--dryRun Print the execution plan without
altering anything.
--help Prints usage information.
--overridingClientConfig <arg> The overriding AdminClient
configuration file.
--plans <arg> File describing the predefined plans
--quiet Print minimum status update
--topology <arg> Topology config file.
--validate Only run configured validations in
your topology
--version Prints useful version information.

The most important ones are:

2 changes: 2 additions & 0 deletions example/topology-builder-sasl-plain.properties
@@ -3,4 +3,6 @@ sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="kafka" \
password="kafka";
#topology.validations.0=com.purbon.kafka.topology.validation.topology.CamelCaseNameFormatValidation
#topology.validations.1=com.purbon.kafka.topology.validation.topic.PartitionNumberValidation
#confluent.schema.registry.url="http://localhost:8082"
src/main/java/com/purbon/kafka/topology/CommandLineInterface.java
@@ -36,6 +36,9 @@ public class CommandLineInterface {
public static final String QUIET_OPTION = "quiet";
public static final String QUIET_DESC = "Print minimum status update";

public static final String VALIDATE_OPTION = "validate";
public static final String VALIDATE_DESC = "Only run configured validations in your topology";

public static final String HELP_OPTION = "help";
public static final String HELP_DESC = "Prints usage information.";

@@ -102,6 +105,14 @@ private Options buildOptions() {
.required(false)
.build();

final Option validateOption =
Option.builder()
.longOpt(VALIDATE_OPTION)
.hasArg(false)
.desc(VALIDATE_DESC)
.required(false)
.build();

final Option versionOption =
Option.builder()
.longOpt(VERSION_OPTION)
@@ -123,6 +134,7 @@ private Options buildOptions() {
options.addOption(overridingAdminClientConfigFileOption);
options.addOption(dryRunOption);
options.addOption(quietOption);
options.addOption(validateOption);
options.addOption(versionOption);
options.addOption(helpOption);

@@ -143,7 +155,9 @@ public void run(String[] args) throws Exception {

processTopology(
cmd.getOptionValue(TOPOLOGY_OPTION), cmd.getOptionValue(PLANS_OPTION, "default"), config);
System.out.println("Kafka Topology updated");
if (!cmd.hasOption(DRY_RUN_OPTION) && !cmd.hasOption(VALIDATE_OPTION)) {
System.out.println("Kafka Topology updated");
}
}

private Map<String, String> parseConfig(CommandLine cmd) {
@@ -153,6 +167,7 @@ private Map<String, String> parseConfig(CommandLine cmd) {
}
config.put(DRY_RUN_OPTION, String.valueOf(cmd.hasOption(DRY_RUN_OPTION)));
config.put(QUIET_OPTION, String.valueOf(cmd.hasOption(QUIET_OPTION)));
config.put(VALIDATE_OPTION, String.valueOf(cmd.hasOption(VALIDATE_OPTION)));
config.put(
OVERRIDING_CLIENT_CONFIG_OPTION, cmd.getOptionValue(OVERRIDING_CLIENT_CONFIG_OPTION));
config.put(CLIENT_CONFIG_OPTION, cmd.getOptionValue(CLIENT_CONFIG_OPTION));
5 changes: 5 additions & 0 deletions src/main/java/com/purbon/kafka/topology/Configuration.java
@@ -280,6 +280,11 @@ public boolean isQuiet() {
return Boolean.parseBoolean(cliParams.getOrDefault(CommandLineInterface.QUIET_OPTION, "false"));
}

public boolean doValidate() {
return Boolean.parseBoolean(
cliParams.getOrDefault(CommandLineInterface.VALIDATE_OPTION, "false"));
}

public boolean isDryRun() {
return Boolean.parseBoolean(cliParams.getOrDefault(DRY_RUN_OPTION, "false"));
}
3 changes: 3 additions & 0 deletions src/main/java/com/purbon/kafka/topology/Constants.java
@@ -88,4 +88,7 @@ public class Constants {
public static final String JULIE_S3_BUCKET = "julie.s3.bucket";
public static final String JULIE_GCP_PROJECT_ID = "julie.gcp.project.id";
public static final String JULIE_GCP_BUCKET = "julie.gcp.bucket";

public static final String TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP =
"topology.validations.topic.name.regexp";
}
3 changes: 3 additions & 0 deletions src/main/java/com/purbon/kafka/topology/JulieOps.java
@@ -191,6 +191,9 @@ void run(ExecutionPlan plan) throws IOException {
}

public void run() throws IOException {
if (config.doValidate()) {
return;
}
BackendController cs = buildBackendController(config);
ExecutionPlan plan = ExecutionPlan.init(cs, outputStream);
run(plan);
@@ -26,7 +26,7 @@ public TopologyBuilderAdminClient build() throws IOException {
"Connecting AdminClient to %s",
props.getProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)));
TopologyBuilderAdminClient client = new TopologyBuilderAdminClient(AdminClient.create(props));
if (!config.isDryRun()) {
if (!config.isDryRun() && !config.doValidate()) {
client.healthCheck();
}
return client;
src/main/java/com/purbon/kafka/topology/validation/topic/ConfigurationKeyValidation.java
@@ -0,0 +1,50 @@
package com.purbon.kafka.topology.validation.topic;

import com.purbon.kafka.topology.exceptions.ValidationException;
import com.purbon.kafka.topology.model.Impl.TopicImpl;
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.validation.TopicValidation;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * This validation checks that all topic configs are valid according to Kafka's TopicConfig class.
 */
public class ConfigurationKeyValidation implements TopicValidation {

private static final Logger LOGGER = LogManager.getLogger(ConfigurationKeyValidation.class);

@Override
public void valid(Topic topic) throws ValidationException {
Field[] fields = TopicConfig.class.getDeclaredFields();
TopicConfig config = new TopicConfig();
Map<String, String> topicConfig = getTopicConfig(topic);
for (Map.Entry<String, String> entry : topicConfig.entrySet()) {
boolean match =
Arrays.stream(fields)
.anyMatch(
field -> {
try {
return ((String) field.get(config)).contains(entry.getKey());
} catch (IllegalAccessException e) {
LOGGER.error(e);
return false;
}
});
if (!match) {
String msg =
String.format("Topic %s has an invalid configuration key: %s", topic, entry.getKey());
throw new ValidationException(msg);
}
}
}

private Map<String, String> getTopicConfig(Topic topic) {
Topic clonedTopic = ((TopicImpl) topic).clone();
return clonedTopic.getRawConfig();
}
}
src/main/java/com/purbon/kafka/topology/validation/topic/TopicNameRegexValidation.java
@@ -0,0 +1,71 @@
package com.purbon.kafka.topology.validation.topic;

import static com.purbon.kafka.topology.Constants.TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP;

import com.purbon.kafka.topology.exceptions.ConfigurationException;
import com.purbon.kafka.topology.exceptions.ValidationException;
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.validation.TopicValidation;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class TopicNameRegexValidation implements TopicValidation {

private static final Logger LOGGER = LogManager.getLogger(TopicNameRegexValidation.class);

private String topicNamePattern;

public TopicNameRegexValidation() throws ConfigurationException {
this(getTopicNamePatternFromConfig());
}

public TopicNameRegexValidation(String pattern) throws ConfigurationException {
validateRegexpPattern(pattern);

this.topicNamePattern = pattern;
}

@Override
public void valid(Topic topic) throws ValidationException {
LOGGER.trace(String.format("Applying Topic Name Regex Validation [%s]", topicNamePattern));

if (!topic.getName().matches(topicNamePattern)) {
String msg =
String.format("Topic name '%s' does not follow regex: %s", topic, topicNamePattern);
throw new ValidationException(msg);
}
}

private static String getTopicNamePatternFromConfig() throws ConfigurationException {
Config config = ConfigFactory.load();
try {
return config.getString(TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP);
} catch (ConfigException e) {
String msg =
String.format(
"TopicNameRegexValidation requires you to define your regex in config '%s'",
TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP);
throw new ConfigurationException(msg);
}
}

private void validateRegexpPattern(String pattern) throws ConfigurationException {
if (StringUtils.isBlank(pattern)) {
throw new ConfigurationException(
"TopicNameRegexValidation is configured without specifying a topic name pattern. Use config 'topology.validations.topic.name.regexp'");
}

try {
Pattern.compile(pattern);
} catch (PatternSyntaxException exception) {
throw new ConfigurationException(
String.format("TopicNameRegexValidation configured with invalid regex '%s'", pattern));
}
}
}
2 changes: 2 additions & 0 deletions src/test/java/com/purbon/kafka/topology/CLITest.java
@@ -42,6 +42,7 @@ public void testParamPassing() throws Exception {
config.put(BROKERS_OPTION, "localhost:9092");
config.put(DRY_RUN_OPTION, "false");
config.put(QUIET_OPTION, "false");
config.put(VALIDATE_OPTION, "false");
config.put(CLIENT_CONFIG_OPTION, "topology-builder-sasl-plain.properties");
config.put(OVERRIDING_CLIENT_CONFIG_OPTION, null);
cli.run(args);
@@ -65,6 +66,7 @@ public void testDryRun() throws Exception {
config.put(BROKERS_OPTION, "localhost:9092");
config.put(DRY_RUN_OPTION, "true");
config.put(QUIET_OPTION, "false");
config.put(VALIDATE_OPTION, "false");
config.put(CLIENT_CONFIG_OPTION, "topology-builder-sasl-plain.properties");
config.put(OVERRIDING_CLIENT_CONFIG_OPTION, null);
cli.run(args);
src/test/java/com/purbon/kafka/topology/validation/topic/ConfigurationKeyValidationTest.java
@@ -0,0 +1,42 @@
package com.purbon.kafka.topology.validation.topic;

import com.purbon.kafka.topology.exceptions.ValidationException;
import com.purbon.kafka.topology.model.Impl.TopicImpl;
import com.purbon.kafka.topology.model.Topic;
import java.util.HashMap;
import org.apache.kafka.common.config.TopicConfig;
import org.junit.Test;

public class ConfigurationKeyValidationTest {

@Test(expected = ValidationException.class)
public void testKoConfigValues() throws ValidationException {
var config = new HashMap<String, String>();
config.put("foo", "2");
config.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip");
Topic topic = new TopicImpl("topic", config);

ConfigurationKeyValidation validation = new ConfigurationKeyValidation();
validation.valid(topic);
}

@Test
public void testOkConfigValues() throws ValidationException {
var config = new HashMap<String, String>();
config.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip");
Topic topic = new TopicImpl("topic", config);

ConfigurationKeyValidation validation = new ConfigurationKeyValidation();
validation.valid(topic);
}

@Test
public void testPartitionsAndReplicationConfigValues() throws ValidationException {
var config = new HashMap<String, String>();
config.put("replication.factor", "3");
Topic topic = new TopicImpl("topic", config);

ConfigurationKeyValidation validation = new ConfigurationKeyValidation();
validation.valid(topic);
}
}