From 1cf911cbb2712e93b0ee5fd0f1b31e5dbe96cdd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pere=20Urb=C3=B3n?= Date: Sat, 1 May 2021 17:05:06 +0200 Subject: [PATCH] Add a validate only flag process to facilitate descriptor testing in feature branches and add ConfigurationKeyValidation and TopicNameRegexValidation validations (#274) * add a validate option for the CLI, this would allow only to run validations in the topology. This would be without access to the cluster, if validations are required with the connectivity they would need to use the dry run parameter * add validation for config keys in topics * Add a TopicNameRegexValidation to validate topic names * simplify the code by making the validation a topic one and add test for this module * make the config value a constant for the topic name regexp * docs update Co-authored-by: Leonardo Bonacci remove usage of var --- README.md | 2 + docs/futures/run-validations.rst | 53 ++++++++++++++ docs/how-to-run-it.rst | 3 + .../topology-builder-sasl-plain.properties | 2 + .../kafka/topology/CommandLineInterface.java | 17 ++++- .../purbon/kafka/topology/Configuration.java | 5 ++ .../com/purbon/kafka/topology/Constants.java | 3 + .../com/purbon/kafka/topology/JulieOps.java | 3 + .../TopologyBuilderAdminClientBuilder.java | 2 +- .../topic/ConfigurationKeyValidation.java | 50 +++++++++++++ .../topic/TopicNameRegexValidation.java | 71 +++++++++++++++++++ .../com/purbon/kafka/topology/CLITest.java | 2 + .../topic/ConfigurationKeyValidationTest.java | 42 +++++++++++ .../topic/TopicNameRegexValidationTest.java | 31 ++++++++ 14 files changed, 284 insertions(+), 2 deletions(-) create mode 100644 docs/futures/run-validations.rst create mode 100644 src/main/java/com/purbon/kafka/topology/validation/topic/ConfigurationKeyValidation.java create mode 100644 src/main/java/com/purbon/kafka/topology/validation/topic/TopicNameRegexValidation.java create mode 100644 src/test/java/com/purbon/kafka/topology/validation/topic/ConfigurationKeyValidationTest.java create mode 100644 src/test/java/com/purbon/kafka/topology/validation/topic/TopicNameRegexValidationTest.java diff --git a/README.md b/README.md index 31242030d..4e95d1a8a 100644 --- a/README.md +++ b/README.md @@ -114,6 +114,8 @@ usage: cli --help Prints usage information. --quiet Print minimum status update --topology Topology config file. + --validate Only run configured validations in + your topology --version Prints useful version information. ``` diff --git a/docs/futures/run-validations.rst b/docs/futures/run-validations.rst new file mode 100644 index 000000000..e73eac380 --- /dev/null +++ b/docs/futures/run-validations.rst @@ -0,0 +1,53 @@ +Validate your topologies +******************************* + +A normal practise in many *gitops* deployments is to run a set of automated validations before allowing the changes in. +JulieOps allows the users to run a variable set of validations before the project will apply the changes into each of the managed components. + +Validate a topology in a feature branch +----------- + +As a user you can use the *--validate* CLI option to only validate the incoming topology. Note this would run a validation completely offline, +without any knowledge of the current state in the cluster. + +To configure which validations you require for your topology the reader would need to do it in the configuration file, this can be done like this: + +.. code-block:: bash + + topology.validations.0=com.purbon.kafka.topology.validation.topic.ConfigurationKeyValidation + topology.validations.1=com.purbon.kafka.topology.validation.topic.TopicNameRegexValidation + topology.validations.topic.name.regexp="[a-z0-9]" + +In the previous example we have configured two validations. + +1.- ConfigurationKeyValidation will make sure all config keys are valid for Kafka. +2.- Will validate, based on the configured regexp that all topic names follow the right pattern. + +All detected errors will be reported in a single outcome like this: + +.. code-block:: bash + + ... + Exception in thread "main" com.purbon.kafka.topology.exceptions.ValidationException: Topology name does not follow the camelCase format: context + Topic context.company.env.source.projectA.foo has an invalid number of partitions: 1 + Topic context.company.env.source.projectA.bar.avro has an invalid number of partitions: 1 + Topic context.company.env.source.projectB.bar.avro has an invalid number of partitions: 1 + Topic context.company.env.source.projectC.topicE has an invalid number of partitions: 1 + Topic context.company.env.source.projectC.topicF has an invalid number of partitions: 1 + at com.purbon.kafka.topology.JulieOps.build(JulieOps.java:125) + at com.purbon.kafka.topology.JulieOps.build(JulieOps.java:75) + at com.purbon.kafka.topology.CommandLineInterface.processTopology(CommandLineInterface.java:206) + at com.purbon.kafka.topology.CommandLineInterface.run(CommandLineInterface.java:156) + at com.purbon.kafka.topology.CommandLineInterface.main(CommandLineInterface.java:146) + +Add your own validations +----------- + +JulieOps provides you with a set of integrated validations, however you as user can provide your own. To do so you will need to: + +* Code your validation following the required interfaces as defined in the JulieOps project. See core validations to see the current pattern. +* Build a jar with your validations. +* Run JulieOps with a configured CLASSPATH where the JVM can find access to your validations jar in order to dynamically load them. +Remember when running JulieOps you can use the _JULIE_OPS_OPTIONS_ env variable to pass custom system configurations such as CLASSPATH or related to security. + +**NOTE**: The UberJar is for now only available from the release page, in future releases we will facilitate a smaller plugin jar. \ No newline at end of file diff --git a/docs/how-to-run-it.rst b/docs/how-to-run-it.rst index d8801785a..d06f88d8d 100644 --- a/docs/how-to-run-it.rst +++ b/docs/how-to-run-it.rst @@ -17,6 +17,7 @@ If you are using the CLI tool, you an use the *--help* command to list the diffe $> julie-ops-cli.sh --help usage: cli +<<<<<<< HEAD --allowDelete Permits delete operations for topics and configs. (deprecated, to be removed) @@ -31,6 +32,8 @@ If you are using the CLI tool, you an use the *--help* command to list the diffe --plans File describing the predefined plans --quiet Print minimum status update --topology Topology config file. + --validate Only run configured validations in + your topology --version Prints useful version information. The most important ones are: diff --git a/example/topology-builder-sasl-plain.properties b/example/topology-builder-sasl-plain.properties index 694a58074..96a7ac9e4 100644 --- a/example/topology-builder-sasl-plain.properties +++ b/example/topology-builder-sasl-plain.properties @@ -3,4 +3,6 @@ sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="kafka" \ password="kafka"; +#topology.validations.0=com.purbon.kafka.topology.validation.topology.CamelCaseNameFormatValidation +#topology.validations.1=com.purbon.kafka.topology.validation.topic.PartitionNumberValidation #confluent.schema.registry.url="http://localhost:8082" \ No newline at end of file diff --git a/src/main/java/com/purbon/kafka/topology/CommandLineInterface.java b/src/main/java/com/purbon/kafka/topology/CommandLineInterface.java index fa34e1976..9c3996b79 100644 --- a/src/main/java/com/purbon/kafka/topology/CommandLineInterface.java +++ b/src/main/java/com/purbon/kafka/topology/CommandLineInterface.java @@ -40,6 +40,9 @@ public class CommandLineInterface { public static final String QUIET_OPTION = "quiet"; public static final String QUIET_DESC = "Print minimum status update"; + public static final String VALIDATE_OPTION = "validate"; + public static final String VALIDATE_DESC = "Only run configured validations in your topology"; + public static final String HELP_OPTION = "help"; public static final String HELP_DESC = "Prints usage information."; @@ -114,6 +117,14 @@ private Options buildOptions() { .required(false) .build(); + final Option validateOption = + Option.builder() + .longOpt(VALIDATE_OPTION) + .hasArg(false) + .desc(VALIDATE_DESC) + .required(false) + .build(); + final Option versionOption = Option.builder() .longOpt(VERSION_OPTION) @@ -136,6 +147,7 @@ private Options buildOptions() { options.addOption(allowDeleteOption); options.addOption(dryRunOption); options.addOption(quietOption); + options.addOption(validateOption); options.addOption(versionOption); options.addOption(helpOption); @@ -156,7 +168,9 @@ public void run(String[] args) throws Exception { processTopology( cmd.getOptionValue(TOPOLOGY_OPTION), cmd.getOptionValue(PLANS_OPTION, "default"), config); - System.out.println("Kafka Topology updated"); + if (!cmd.hasOption(DRY_RUN_OPTION) && !cmd.hasOption(VALIDATE_OPTION)) { + System.out.println("Kafka Topology updated"); + } } private Map parseConfig(CommandLine cmd) { @@ -174,6 +188,7 @@ private Map parseConfig(CommandLine cmd) { } config.put(DRY_RUN_OPTION, String.valueOf(cmd.hasOption(DRY_RUN_OPTION))); config.put(QUIET_OPTION, String.valueOf(cmd.hasOption(QUIET_OPTION))); + config.put(VALIDATE_OPTION, String.valueOf(cmd.hasOption(VALIDATE_OPTION))); config.put( OVERRIDING_CLIENT_CONFIG_OPTION, cmd.getOptionValue(OVERRIDING_CLIENT_CONFIG_OPTION)); config.put(CLIENT_CONFIG_OPTION, cmd.getOptionValue(CLIENT_CONFIG_OPTION)); diff --git a/src/main/java/com/purbon/kafka/topology/Configuration.java b/src/main/java/com/purbon/kafka/topology/Configuration.java index 8c2d0d3b9..7048f8e33 100644 --- a/src/main/java/com/purbon/kafka/topology/Configuration.java +++ b/src/main/java/com/purbon/kafka/topology/Configuration.java @@ -285,6 +285,11 @@ public boolean isQuiet() { return Boolean.parseBoolean(cliParams.getOrDefault(CommandLineInterface.QUIET_OPTION, "false")); } + public boolean doValidate() { + return Boolean.parseBoolean( + cliParams.getOrDefault(CommandLineInterface.VALIDATE_OPTION, "false")); + } + public boolean isDryRun() { return Boolean.parseBoolean(cliParams.getOrDefault(DRY_RUN_OPTION, "false")); } diff --git a/src/main/java/com/purbon/kafka/topology/Constants.java b/src/main/java/com/purbon/kafka/topology/Constants.java index 0c77c1b3e..07e0f3d7e 100644 --- a/src/main/java/com/purbon/kafka/topology/Constants.java +++ b/src/main/java/com/purbon/kafka/topology/Constants.java @@ -88,4 +88,7 @@ public class Constants { public static final String JULIE_S3_BUCKET = "julie.s3.bucket"; public static final String JULIE_GCP_PROJECT_ID = "julie.gcp.project.id"; public static final String JULIE_GCP_BUCKET = "julie.gcp.bucket"; + + public static final String TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP = + "topology.validations.topic.name.regexp"; } diff --git a/src/main/java/com/purbon/kafka/topology/JulieOps.java b/src/main/java/com/purbon/kafka/topology/JulieOps.java index 9d2b0834e..03269cc83 100644 --- a/src/main/java/com/purbon/kafka/topology/JulieOps.java +++ b/src/main/java/com/purbon/kafka/topology/JulieOps.java @@ -191,6 +191,9 @@ void run(ExecutionPlan plan) throws IOException { } public void run() throws IOException { + if (config.doValidate()) { + return; + } BackendController cs = buildBackendController(config); ExecutionPlan plan = ExecutionPlan.init(cs, outputStream); run(plan); diff --git a/src/main/java/com/purbon/kafka/topology/api/adminclient/TopologyBuilderAdminClientBuilder.java b/src/main/java/com/purbon/kafka/topology/api/adminclient/TopologyBuilderAdminClientBuilder.java index 5965a63e1..a4e0b25a4 100644 --- a/src/main/java/com/purbon/kafka/topology/api/adminclient/TopologyBuilderAdminClientBuilder.java +++ b/src/main/java/com/purbon/kafka/topology/api/adminclient/TopologyBuilderAdminClientBuilder.java @@ -26,7 +26,7 @@ public TopologyBuilderAdminClient build() throws IOException { "Connecting AdminClient to %s", props.getProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG))); TopologyBuilderAdminClient client = new TopologyBuilderAdminClient(AdminClient.create(props)); - if (!config.isDryRun()) { + if (!config.isDryRun() && !config.doValidate()) { client.healthCheck(); } return client; diff --git a/src/main/java/com/purbon/kafka/topology/validation/topic/ConfigurationKeyValidation.java b/src/main/java/com/purbon/kafka/topology/validation/topic/ConfigurationKeyValidation.java new file mode 100644 index 000000000..c2fc5e1b2 --- /dev/null +++ b/src/main/java/com/purbon/kafka/topology/validation/topic/ConfigurationKeyValidation.java @@ -0,0 +1,50 @@ +package com.purbon.kafka.topology.validation.topic; + +import com.purbon.kafka.topology.exceptions.ValidationException; +import com.purbon.kafka.topology.model.Impl.TopicImpl; +import com.purbon.kafka.topology.model.Topic; +import com.purbon.kafka.topology.validation.TopicValidation; +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Map; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * This validation checks that all topic configs are valid ones according to the TopicConfig class. + */ +public class ConfigurationKeyValidation implements TopicValidation { + + private static final Logger LOGGER = LogManager.getLogger(ConfigurationKeyValidation.class); + + @Override + public void valid(Topic topic) throws ValidationException { + Field[] fields = TopicConfig.class.getDeclaredFields(); + TopicConfig config = new TopicConfig(); + Map topicConfig = getTopicConfig(topic); + for (Map.Entry entry : topicConfig.entrySet()) { + boolean match = + Arrays.stream(fields) + .anyMatch( + field -> { + try { + return ((String) field.get(config)).contains(entry.getKey()); + } catch (IllegalAccessException e) { + LOGGER.error(e); + return false; + } + }); + if (!match) { + String msg = + String.format("Topic %s has an invalid configuration value: %s", topic, entry.getKey()); + throw new ValidationException(msg); + } + } + } + + private Map getTopicConfig(Topic topic) { + Topic clonedTopic = ((TopicImpl) topic).clone(); + return clonedTopic.getRawConfig(); + } +} diff --git a/src/main/java/com/purbon/kafka/topology/validation/topic/TopicNameRegexValidation.java b/src/main/java/com/purbon/kafka/topology/validation/topic/TopicNameRegexValidation.java new file mode 100644 index 000000000..75c3ce505 --- /dev/null +++ b/src/main/java/com/purbon/kafka/topology/validation/topic/TopicNameRegexValidation.java @@ -0,0 +1,71 @@ +package com.purbon.kafka.topology.validation.topic; + +import static com.purbon.kafka.topology.Constants.TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP; + +import com.purbon.kafka.topology.exceptions.ConfigurationException; +import com.purbon.kafka.topology.exceptions.ValidationException; +import com.purbon.kafka.topology.model.Topic; +import com.purbon.kafka.topology.validation.TopicValidation; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigException; +import com.typesafe.config.ConfigFactory; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class TopicNameRegexValidation implements TopicValidation { + + private static final Logger LOGGER = LogManager.getLogger(TopicNameRegexValidation.class); + + private String topicNamePattern; + + public TopicNameRegexValidation() throws ConfigurationException { + this(getTopicNamePatternFromConfig()); + } + + public TopicNameRegexValidation(String pattern) throws ConfigurationException { + validateRegexpPattern(pattern); + + this.topicNamePattern = pattern; + } + + @Override + public void valid(Topic topic) throws ValidationException { + LOGGER.trace(String.format("Applying Topic Name Regex Validation [%s]", topicNamePattern)); + + if (!topic.getName().matches(topicNamePattern)) { + String msg = + String.format("Topic name '%s' does not follow regex: %s", topic, topicNamePattern); + throw new ValidationException(msg); + } + } + + private static String getTopicNamePatternFromConfig() throws ConfigurationException { + Config config = ConfigFactory.load(); + try { + return config.getString(TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP); + } catch (ConfigException e) { + String msg = + String.format( + "TopicNameRegexValidation requires you to define your regex in config '%s'", + TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP); + throw new ConfigurationException(msg); + } + } + + private void validateRegexpPattern(String pattern) throws ConfigurationException { + if (StringUtils.isBlank(pattern)) { + throw new ConfigurationException( + "TopicNameRegexValidation is configured without specifying a topic name pattern. Use config 'topology.validations.regexp'"); + } + + try { + Pattern.compile(pattern); + } catch (PatternSyntaxException exception) { + throw new ConfigurationException( + String.format("TopicNameRegexValidation configured with unvalid regex '%s'", pattern)); + } + } +} diff --git a/src/test/java/com/purbon/kafka/topology/CLITest.java b/src/test/java/com/purbon/kafka/topology/CLITest.java index 071bb3bb3..7aea92cd4 100644 --- a/src/test/java/com/purbon/kafka/topology/CLITest.java +++ b/src/test/java/com/purbon/kafka/topology/CLITest.java @@ -43,6 +43,7 @@ public void testParamPassing() throws Exception { config.put(ALLOW_DELETE_OPTION, "false"); config.put(DRY_RUN_OPTION, "false"); config.put(QUIET_OPTION, "false"); + config.put(VALIDATE_OPTION, "false"); config.put(CLIENT_CONFIG_OPTION, "topology-builder-sasl-plain.properties"); config.put(OVERRIDING_CLIENT_CONFIG_OPTION, null); cli.run(args); @@ -67,6 +68,7 @@ public void testDryRun() throws Exception { config.put(ALLOW_DELETE_OPTION, "false"); config.put(DRY_RUN_OPTION, "true"); config.put(QUIET_OPTION, "false"); + config.put(VALIDATE_OPTION, "false"); config.put(CLIENT_CONFIG_OPTION, "topology-builder-sasl-plain.properties"); config.put(OVERRIDING_CLIENT_CONFIG_OPTION, null); cli.run(args); diff --git a/src/test/java/com/purbon/kafka/topology/validation/topic/ConfigurationKeyValidationTest.java b/src/test/java/com/purbon/kafka/topology/validation/topic/ConfigurationKeyValidationTest.java new file mode 100644 index 000000000..2d4e249a7 --- /dev/null +++ b/src/test/java/com/purbon/kafka/topology/validation/topic/ConfigurationKeyValidationTest.java @@ -0,0 +1,42 @@ +package com.purbon.kafka.topology.validation.topic; + +import com.purbon.kafka.topology.exceptions.ValidationException; +import com.purbon.kafka.topology.model.Impl.TopicImpl; +import com.purbon.kafka.topology.model.Topic; +import java.util.HashMap; +import org.apache.kafka.common.config.TopicConfig; +import org.junit.Test; + +public class ConfigurationKeyValidationTest { + + @Test(expected = ValidationException.class) + public void testKoConfigValues() throws ValidationException { + HashMap config = new HashMap(); + config.put("foo", "2"); + config.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip"); + Topic topic = new TopicImpl("topic", config); + + ConfigurationKeyValidation validation = new ConfigurationKeyValidation(); + validation.valid(topic); + } + + @Test + public void testOkConfigValues() throws ValidationException { + HashMap config = new HashMap(); + config.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip"); + Topic topic = new TopicImpl("topic", config); + + ConfigurationKeyValidation validation = new ConfigurationKeyValidation(); + validation.valid(topic); + } + + @Test + public void testPartitionsAndReplicationConfigValues() throws ValidationException { + HashMap config = new HashMap(); + config.put("replication.factor", "3"); + Topic topic = new TopicImpl("topic", config); + + ConfigurationKeyValidation validation = new ConfigurationKeyValidation(); + validation.valid(topic); + } +} diff --git a/src/test/java/com/purbon/kafka/topology/validation/topic/TopicNameRegexValidationTest.java b/src/test/java/com/purbon/kafka/topology/validation/topic/TopicNameRegexValidationTest.java new file mode 100644 index 000000000..bd90d73b1 --- /dev/null +++ b/src/test/java/com/purbon/kafka/topology/validation/topic/TopicNameRegexValidationTest.java @@ -0,0 +1,31 @@ +package com.purbon.kafka.topology.validation.topic; + +import com.purbon.kafka.topology.exceptions.ConfigurationException; +import com.purbon.kafka.topology.exceptions.ValidationException; +import com.purbon.kafka.topology.model.Impl.TopicImpl; +import com.purbon.kafka.topology.model.Topic; +import org.junit.Test; + +public class TopicNameRegexValidationTest { + + @Test(expected = ValidationException.class) + public void testKoConfigValues() throws ValidationException, ConfigurationException { + Topic topic = new TopicImpl("topic"); + TopicNameRegexValidation validation = new TopicNameRegexValidation("[1-9]"); + validation.valid(topic); + } + + @Test + public void testOkConfigValues() throws ValidationException, ConfigurationException { + Topic topic = new TopicImpl("topic"); + TopicNameRegexValidation validation = new TopicNameRegexValidation("[a-z]*"); + validation.valid(topic); + } + + @Test(expected = ConfigurationException.class) + public void testEmptyParam() throws ValidationException, ConfigurationException { + Topic topic = new TopicImpl("topic"); + TopicNameRegexValidation validation = new TopicNameRegexValidation(""); + validation.valid(topic); + } +}