diff --git a/README.md b/README.md index 31242030d..4e95d1a8a 100644 --- a/README.md +++ b/README.md @@ -114,6 +114,8 @@ usage: cli --help Prints usage information. --quiet Print minimum status update --topology Topology config file. + --validate Only run configured validations in + your topology --version Prints useful version information. ``` diff --git a/docs/futures/run-validations.rst b/docs/futures/run-validations.rst new file mode 100644 index 000000000..e73eac380 --- /dev/null +++ b/docs/futures/run-validations.rst @@ -0,0 +1,53 @@ +Validate your topologies +******************************* + +A normal practise in many *gitops* deployments is to run a set of automated validations before allowing the changes in. +JulieOps allows the users to run a variable set of validations before the project will apply the changes into each of the managed components. + +Validate a topology in a feature branch +----------- + +As a user you can use the *--validate* CLI option to only validate the incoming topology. Note this would run a validation completely offline, +without any knowledge of the current state in the cluster. + +To configure which validations you require for your topology the reader would need to do it in the configuration file, this can be done like this: + +.. code-block:: bash + + topology.validations.0=com.purbon.kafka.topology.validation.topic.ConfigurationKeyValidation + topology.validations.1=com.purbon.kafka.topology.validation.topic.TopicNameRegexValidation + topology.validations.topic.name.regexp="[a-z0-9]" + +In the previous example we have configured two validations. + +1.- ConfigurationKeyValidation will make sure all config keys are valid for Kafka. +2.- Will validate, based on the configured regexp that all topic names follow the right pattern. + +All detected errors will be reported in a single outcome like this: + +.. code-block:: bash + + ... + Exception in thread "main" com.purbon.kafka.topology.exceptions.ValidationException: Topology name does not follow the camelCase format: context + Topic context.company.env.source.projectA.foo has an invalid number of partitions: 1 + Topic context.company.env.source.projectA.bar.avro has an invalid number of partitions: 1 + Topic context.company.env.source.projectB.bar.avro has an invalid number of partitions: 1 + Topic context.company.env.source.projectC.topicE has an invalid number of partitions: 1 + Topic context.company.env.source.projectC.topicF has an invalid number of partitions: 1 + at com.purbon.kafka.topology.JulieOps.build(JulieOps.java:125) + at com.purbon.kafka.topology.JulieOps.build(JulieOps.java:75) + at com.purbon.kafka.topology.CommandLineInterface.processTopology(CommandLineInterface.java:206) + at com.purbon.kafka.topology.CommandLineInterface.run(CommandLineInterface.java:156) + at com.purbon.kafka.topology.CommandLineInterface.main(CommandLineInterface.java:146) + +Add your own validations +----------- + +JulieOps provides you with a set of integrated validations, however you as user can provide your own. To do so you will need to: + +* Code your validation following the required interfaces as defined in the JulieOps project. See core validations to see the current pattern. +* Build a jar with your validations. +* Run JulieOps with a configured CLASSPATH where the JVM can find access to your validations jar in order to dynamically load them. +Remember when running JulieOps you can use the _JULIE_OPS_OPTIONS_ env variable to pass custom system configurations such as CLASSPATH or related to security. + +**NOTE**: The UberJar is for now only available from the release page, in future releases we will facilitate a smaller plugin jar. \ No newline at end of file diff --git a/docs/how-to-run-it.rst b/docs/how-to-run-it.rst index d8801785a..d06f88d8d 100644 --- a/docs/how-to-run-it.rst +++ b/docs/how-to-run-it.rst @@ -17,6 +17,7 @@ If you are using the CLI tool, you an use the *--help* command to list the diffe $> julie-ops-cli.sh --help usage: cli +<<<<<<< HEAD --allowDelete Permits delete operations for topics and configs. (deprecated, to be removed) @@ -31,6 +32,8 @@ If you are using the CLI tool, you an use the *--help* command to list the diffe --plans File describing the predefined plans --quiet Print minimum status update --topology Topology config file. + --validate Only run configured validations in + your topology --version Prints useful version information. The most important ones are: diff --git a/example/topology-builder-sasl-plain.properties b/example/topology-builder-sasl-plain.properties index 694a58074..96a7ac9e4 100644 --- a/example/topology-builder-sasl-plain.properties +++ b/example/topology-builder-sasl-plain.properties @@ -3,4 +3,6 @@ sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="kafka" \ password="kafka"; +#topology.validations.0=com.purbon.kafka.topology.validation.topology.CamelCaseNameFormatValidation +#topology.validations.1=com.purbon.kafka.topology.validation.topic.PartitionNumberValidation #confluent.schema.registry.url="http://localhost:8082" \ No newline at end of file diff --git a/src/main/java/com/purbon/kafka/topology/CommandLineInterface.java b/src/main/java/com/purbon/kafka/topology/CommandLineInterface.java index fa34e1976..9c3996b79 100644 --- a/src/main/java/com/purbon/kafka/topology/CommandLineInterface.java +++ b/src/main/java/com/purbon/kafka/topology/CommandLineInterface.java @@ -40,6 +40,9 @@ public class CommandLineInterface { public static final String QUIET_OPTION = "quiet"; public static final String QUIET_DESC = "Print minimum status update"; + public static final String VALIDATE_OPTION = "validate"; + public static final String VALIDATE_DESC = "Only run configured validations in your topology"; + public static final String HELP_OPTION = "help"; public static final String HELP_DESC = "Prints usage information."; @@ -114,6 +117,14 @@ private Options buildOptions() { .required(false) .build(); + final Option validateOption = + Option.builder() + .longOpt(VALIDATE_OPTION) + .hasArg(false) + .desc(VALIDATE_DESC) + .required(false) + .build(); + final Option versionOption = Option.builder() .longOpt(VERSION_OPTION) @@ -136,6 +147,7 @@ private Options buildOptions() { options.addOption(allowDeleteOption); options.addOption(dryRunOption); options.addOption(quietOption); + options.addOption(validateOption); options.addOption(versionOption); options.addOption(helpOption); @@ -156,7 +168,9 @@ public void run(String[] args) throws Exception { processTopology( cmd.getOptionValue(TOPOLOGY_OPTION), cmd.getOptionValue(PLANS_OPTION, "default"), config); - System.out.println("Kafka Topology updated"); + if (!cmd.hasOption(DRY_RUN_OPTION) && !cmd.hasOption(VALIDATE_OPTION)) { + System.out.println("Kafka Topology updated"); + } } private Map parseConfig(CommandLine cmd) { @@ -174,6 +188,7 @@ private Map parseConfig(CommandLine cmd) { } config.put(DRY_RUN_OPTION, String.valueOf(cmd.hasOption(DRY_RUN_OPTION))); config.put(QUIET_OPTION, String.valueOf(cmd.hasOption(QUIET_OPTION))); + config.put(VALIDATE_OPTION, String.valueOf(cmd.hasOption(VALIDATE_OPTION))); config.put( OVERRIDING_CLIENT_CONFIG_OPTION, cmd.getOptionValue(OVERRIDING_CLIENT_CONFIG_OPTION)); config.put(CLIENT_CONFIG_OPTION, cmd.getOptionValue(CLIENT_CONFIG_OPTION)); diff --git a/src/main/java/com/purbon/kafka/topology/Configuration.java b/src/main/java/com/purbon/kafka/topology/Configuration.java index 8c2d0d3b9..7048f8e33 100644 --- a/src/main/java/com/purbon/kafka/topology/Configuration.java +++ b/src/main/java/com/purbon/kafka/topology/Configuration.java @@ -285,6 +285,11 @@ public boolean isQuiet() { return Boolean.parseBoolean(cliParams.getOrDefault(CommandLineInterface.QUIET_OPTION, "false")); } + public boolean doValidate() { + return Boolean.parseBoolean( + cliParams.getOrDefault(CommandLineInterface.VALIDATE_OPTION, "false")); + } + public boolean isDryRun() { return Boolean.parseBoolean(cliParams.getOrDefault(DRY_RUN_OPTION, "false")); } diff --git a/src/main/java/com/purbon/kafka/topology/Constants.java b/src/main/java/com/purbon/kafka/topology/Constants.java index 0c77c1b3e..07e0f3d7e 100644 --- a/src/main/java/com/purbon/kafka/topology/Constants.java +++ b/src/main/java/com/purbon/kafka/topology/Constants.java @@ -88,4 +88,7 @@ public class Constants { public static final String JULIE_S3_BUCKET = "julie.s3.bucket"; public static final String JULIE_GCP_PROJECT_ID = "julie.gcp.project.id"; public static final String JULIE_GCP_BUCKET = "julie.gcp.bucket"; + + public static final String TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP = + "topology.validations.topic.name.regexp"; } diff --git a/src/main/java/com/purbon/kafka/topology/JulieOps.java b/src/main/java/com/purbon/kafka/topology/JulieOps.java index 9d2b0834e..03269cc83 100644 --- a/src/main/java/com/purbon/kafka/topology/JulieOps.java +++ b/src/main/java/com/purbon/kafka/topology/JulieOps.java @@ -191,6 +191,9 @@ void run(ExecutionPlan plan) throws IOException { } public void run() throws IOException { + if (config.doValidate()) { + return; + } BackendController cs = buildBackendController(config); ExecutionPlan plan = ExecutionPlan.init(cs, outputStream); run(plan); diff --git a/src/main/java/com/purbon/kafka/topology/api/adminclient/TopologyBuilderAdminClientBuilder.java b/src/main/java/com/purbon/kafka/topology/api/adminclient/TopologyBuilderAdminClientBuilder.java index 5965a63e1..a4e0b25a4 100644 --- a/src/main/java/com/purbon/kafka/topology/api/adminclient/TopologyBuilderAdminClientBuilder.java +++ b/src/main/java/com/purbon/kafka/topology/api/adminclient/TopologyBuilderAdminClientBuilder.java @@ -26,7 +26,7 @@ public TopologyBuilderAdminClient build() throws IOException { "Connecting AdminClient to %s", props.getProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG))); TopologyBuilderAdminClient client = new TopologyBuilderAdminClient(AdminClient.create(props)); - if (!config.isDryRun()) { + if (!config.isDryRun() && !config.doValidate()) { client.healthCheck(); } return client; diff --git a/src/main/java/com/purbon/kafka/topology/validation/topic/ConfigurationKeyValidation.java b/src/main/java/com/purbon/kafka/topology/validation/topic/ConfigurationKeyValidation.java new file mode 100644 index 000000000..c2fc5e1b2 --- /dev/null +++ b/src/main/java/com/purbon/kafka/topology/validation/topic/ConfigurationKeyValidation.java @@ -0,0 +1,50 @@ +package com.purbon.kafka.topology.validation.topic; + +import com.purbon.kafka.topology.exceptions.ValidationException; +import com.purbon.kafka.topology.model.Impl.TopicImpl; +import com.purbon.kafka.topology.model.Topic; +import com.purbon.kafka.topology.validation.TopicValidation; +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Map; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * This validation checks that all topic configs are valid ones according to the TopicConfig class. + */ +public class ConfigurationKeyValidation implements TopicValidation { + + private static final Logger LOGGER = LogManager.getLogger(ConfigurationKeyValidation.class); + + @Override + public void valid(Topic topic) throws ValidationException { + Field[] fields = TopicConfig.class.getDeclaredFields(); + TopicConfig config = new TopicConfig(); + Map topicConfig = getTopicConfig(topic); + for (Map.Entry entry : topicConfig.entrySet()) { + boolean match = + Arrays.stream(fields) + .anyMatch( + field -> { + try { + return ((String) field.get(config)).contains(entry.getKey()); + } catch (IllegalAccessException e) { + LOGGER.error(e); + return false; + } + }); + if (!match) { + String msg = + String.format("Topic %s has an invalid configuration value: %s", topic, entry.getKey()); + throw new ValidationException(msg); + } + } + } + + private Map getTopicConfig(Topic topic) { + Topic clonedTopic = ((TopicImpl) topic).clone(); + return clonedTopic.getRawConfig(); + } +} diff --git a/src/main/java/com/purbon/kafka/topology/validation/topic/TopicNameRegexValidation.java b/src/main/java/com/purbon/kafka/topology/validation/topic/TopicNameRegexValidation.java new file mode 100644 index 000000000..75c3ce505 --- /dev/null +++ b/src/main/java/com/purbon/kafka/topology/validation/topic/TopicNameRegexValidation.java @@ -0,0 +1,71 @@ +package com.purbon.kafka.topology.validation.topic; + +import static com.purbon.kafka.topology.Constants.TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP; + +import com.purbon.kafka.topology.exceptions.ConfigurationException; +import com.purbon.kafka.topology.exceptions.ValidationException; +import com.purbon.kafka.topology.model.Topic; +import com.purbon.kafka.topology.validation.TopicValidation; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigException; +import com.typesafe.config.ConfigFactory; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class TopicNameRegexValidation implements TopicValidation { + + private static final Logger LOGGER = LogManager.getLogger(TopicNameRegexValidation.class); + + private String topicNamePattern; + + public TopicNameRegexValidation() throws ConfigurationException { + this(getTopicNamePatternFromConfig()); + } + + public TopicNameRegexValidation(String pattern) throws ConfigurationException { + validateRegexpPattern(pattern); + + this.topicNamePattern = pattern; + } + + @Override + public void valid(Topic topic) throws ValidationException { + LOGGER.trace(String.format("Applying Topic Name Regex Validation [%s]", topicNamePattern)); + + if (!topic.getName().matches(topicNamePattern)) { + String msg = + String.format("Topic name '%s' does not follow regex: %s", topic, topicNamePattern); + throw new ValidationException(msg); + } + } + + private static String getTopicNamePatternFromConfig() throws ConfigurationException { + Config config = ConfigFactory.load(); + try { + return config.getString(TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP); + } catch (ConfigException e) { + String msg = + String.format( + "TopicNameRegexValidation requires you to define your regex in config '%s'", + TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP); + throw new ConfigurationException(msg); + } + } + + private void validateRegexpPattern(String pattern) throws ConfigurationException { + if (StringUtils.isBlank(pattern)) { + throw new ConfigurationException( + "TopicNameRegexValidation is configured without specifying a topic name pattern. Use config 'topology.validations.regexp'"); + } + + try { + Pattern.compile(pattern); + } catch (PatternSyntaxException exception) { + throw new ConfigurationException( + String.format("TopicNameRegexValidation configured with unvalid regex '%s'", pattern)); + } + } +} diff --git a/src/test/java/com/purbon/kafka/topology/CLITest.java b/src/test/java/com/purbon/kafka/topology/CLITest.java index 071bb3bb3..7aea92cd4 100644 --- a/src/test/java/com/purbon/kafka/topology/CLITest.java +++ b/src/test/java/com/purbon/kafka/topology/CLITest.java @@ -43,6 +43,7 @@ public void testParamPassing() throws Exception { config.put(ALLOW_DELETE_OPTION, "false"); config.put(DRY_RUN_OPTION, "false"); config.put(QUIET_OPTION, "false"); + config.put(VALIDATE_OPTION, "false"); config.put(CLIENT_CONFIG_OPTION, "topology-builder-sasl-plain.properties"); config.put(OVERRIDING_CLIENT_CONFIG_OPTION, null); cli.run(args); @@ -67,6 +68,7 @@ public void testDryRun() throws Exception { config.put(ALLOW_DELETE_OPTION, "false"); config.put(DRY_RUN_OPTION, "true"); config.put(QUIET_OPTION, "false"); + config.put(VALIDATE_OPTION, "false"); config.put(CLIENT_CONFIG_OPTION, "topology-builder-sasl-plain.properties"); config.put(OVERRIDING_CLIENT_CONFIG_OPTION, null); cli.run(args); diff --git a/src/test/java/com/purbon/kafka/topology/validation/topic/ConfigurationKeyValidationTest.java b/src/test/java/com/purbon/kafka/topology/validation/topic/ConfigurationKeyValidationTest.java new file mode 100644 index 000000000..2d4e249a7 --- /dev/null +++ b/src/test/java/com/purbon/kafka/topology/validation/topic/ConfigurationKeyValidationTest.java @@ -0,0 +1,42 @@ +package com.purbon.kafka.topology.validation.topic; + +import com.purbon.kafka.topology.exceptions.ValidationException; +import com.purbon.kafka.topology.model.Impl.TopicImpl; +import com.purbon.kafka.topology.model.Topic; +import java.util.HashMap; +import org.apache.kafka.common.config.TopicConfig; +import org.junit.Test; + +public class ConfigurationKeyValidationTest { + + @Test(expected = ValidationException.class) + public void testKoConfigValues() throws ValidationException { + HashMap config = new HashMap(); + config.put("foo", "2"); + config.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip"); + Topic topic = new TopicImpl("topic", config); + + ConfigurationKeyValidation validation = new ConfigurationKeyValidation(); + validation.valid(topic); + } + + @Test + public void testOkConfigValues() throws ValidationException { + HashMap config = new HashMap(); + config.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip"); + Topic topic = new TopicImpl("topic", config); + + ConfigurationKeyValidation validation = new ConfigurationKeyValidation(); + validation.valid(topic); + } + + @Test + public void testPartitionsAndReplicationConfigValues() throws ValidationException { + HashMap config = new HashMap(); + config.put("replication.factor", "3"); + Topic topic = new TopicImpl("topic", config); + + ConfigurationKeyValidation validation = new ConfigurationKeyValidation(); + validation.valid(topic); + } +} diff --git a/src/test/java/com/purbon/kafka/topology/validation/topic/TopicNameRegexValidationTest.java b/src/test/java/com/purbon/kafka/topology/validation/topic/TopicNameRegexValidationTest.java new file mode 100644 index 000000000..bd90d73b1 --- /dev/null +++ b/src/test/java/com/purbon/kafka/topology/validation/topic/TopicNameRegexValidationTest.java @@ -0,0 +1,31 @@ +package com.purbon.kafka.topology.validation.topic; + +import com.purbon.kafka.topology.exceptions.ConfigurationException; +import com.purbon.kafka.topology.exceptions.ValidationException; +import com.purbon.kafka.topology.model.Impl.TopicImpl; +import com.purbon.kafka.topology.model.Topic; +import org.junit.Test; + +public class TopicNameRegexValidationTest { + + @Test(expected = ValidationException.class) + public void testKoConfigValues() throws ValidationException, ConfigurationException { + Topic topic = new TopicImpl("topic"); + TopicNameRegexValidation validation = new TopicNameRegexValidation("[1-9]"); + validation.valid(topic); + } + + @Test + public void testOkConfigValues() throws ValidationException, ConfigurationException { + Topic topic = new TopicImpl("topic"); + TopicNameRegexValidation validation = new TopicNameRegexValidation("[a-z]*"); + validation.valid(topic); + } + + @Test(expected = ConfigurationException.class) + public void testEmptyParam() throws ValidationException, ConfigurationException { + Topic topic = new TopicImpl("topic"); + TopicNameRegexValidation validation = new TopicNameRegexValidation(""); + validation.valid(topic); + } +}