Skip to content

Commit

Permalink
Fix config passing for topology validator for regular expressions (#443)
Browse files Browse the repository at this point in the history
* fix config passing for topology validator for regular expressions

* ammend docs
  • Loading branch information
purbon committed Feb 5, 2022
1 parent d8bd325 commit 927ffd8
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 13 deletions.
2 changes: 1 addition & 1 deletion docs/futures/run-validations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ To configure which validations you require for your topology the reader would ne
topology.validations.0=com.purbon.kafka.topology.validation.topic.ConfigurationKeyValidation
topology.validations.1=com.purbon.kafka.topology.validation.topic.TopicNameRegexValidation
topology.validations.topic.name.regexp="[a-z0-9]"
validations.topic.name.regexp="[a-z0-9]"
In the previous example we have configured two validations.

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/purbon/kafka/topology/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public class Constants {
"julie.multiple.context.per.dir.enabled";

public static final String TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP =
"topology.validations.topic.name.regexp";
"validations.topic.name.regexp";

public static final String SSL_TRUSTSTORE_LOCATION = "ssl.truststore.location";
public static final String SSL_TRUSTSTORE_PASSWORD = "ssl.truststore.password";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ private List<Validation> validations() {
validationClass));
}

Constructor<?> constructor = clazz.getConstructor();
Object instance = constructor.newInstance();
Constructor<?> constructor = clazz.getConstructor(Configuration.class);
Object instance = constructor.newInstance(config);
if (instance instanceof TopologyValidation) {
return (TopologyValidation) instance;
} else if (instance instanceof TopicValidation) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,43 @@
package com.purbon.kafka.topology.validation.topic;

import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.exceptions.ValidationException;
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.validation.TopicValidation;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Map;

import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* This validation checks that all topic configs are valid ones according to the TopicConfig class.
*/
@NoArgsConstructor
@RequiredArgsConstructor
public class ConfigurationKeyValidation implements TopicValidation {

private static final Logger LOGGER = LogManager.getLogger(ConfigurationKeyValidation.class);

@NonNull private Configuration config;

@Override
public void valid(Topic topic) throws ValidationException {
Field[] fields = TopicConfig.class.getDeclaredFields();
TopicConfig config = new TopicConfig();
Map<String, String> topicConfig = getTopicConfig(topic);
for (Map.Entry<String, String> entry : topicConfig.entrySet()) {
TopicConfig topicConfig = new TopicConfig();
Map<String, String> topicConfigMap = getTopicConfig(topic);
for (Map.Entry<String, String> entry : topicConfigMap.entrySet()) {
boolean match =
Arrays.stream(fields)
.anyMatch(
field -> {
try {
return ((String) field.get(config)).contains(entry.getKey());
return ((String) field.get(topicConfig)).contains(entry.getKey());
} catch (IllegalAccessException e) {
LOGGER.error(e);
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
package com.purbon.kafka.topology.validation.topic;

import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.exceptions.ValidationException;
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.validation.TopicValidation;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

@NoArgsConstructor
@RequiredArgsConstructor
public class PartitionNumberValidation implements TopicValidation {

@NonNull
private Configuration config;

@Override
public void valid(Topic topic) throws ValidationException {
if (topic.getPartitionCount().isPresent() && topic.partitionsCount() < 3) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
package com.purbon.kafka.topology.validation.topic;

import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.exceptions.ValidationException;
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.validation.TopicValidation;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

@NoArgsConstructor
@RequiredArgsConstructor
public class ReplicationFactorValidation implements TopicValidation {

@NonNull
private Configuration config;

@Override
public void valid(Topic topic) throws ValidationException {
if (topic.replicationFactor().isPresent() && topic.replicationFactor().get() != 3) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.purbon.kafka.topology.Constants.TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP;

import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.exceptions.ConfigurationException;
import com.purbon.kafka.topology.exceptions.ValidationException;
import com.purbon.kafka.topology.model.Topic;
Expand All @@ -11,6 +12,9 @@
import com.typesafe.config.ConfigFactory;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -20,9 +24,11 @@ public class TopicNameRegexValidation implements TopicValidation {
private static final Logger LOGGER = LogManager.getLogger(TopicNameRegexValidation.class);

private String topicNamePattern;
private Configuration config;

public TopicNameRegexValidation() throws ConfigurationException {
this(getTopicNamePatternFromConfig());
public TopicNameRegexValidation(Configuration config) throws ConfigurationException {
this(getTopicNamePatternFromConfig(config));
this.config = config;
}

public TopicNameRegexValidation(String pattern) throws ConfigurationException {
Expand All @@ -42,10 +48,9 @@ public void valid(Topic topic) throws ValidationException {
}
}

private static String getTopicNamePatternFromConfig() throws ConfigurationException {
Config config = ConfigFactory.load();
private static String getTopicNamePatternFromConfig(Configuration config) throws ConfigurationException {
try {
return config.getString(TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP);
return config.getProperty(TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP);
} catch (ConfigException e) {
String msg =
String.format(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
package com.purbon.kafka.topology.validation.topology;

import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.exceptions.ValidationException;
import com.purbon.kafka.topology.model.Project;
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.model.Topology;
import com.purbon.kafka.topology.validation.TopologyValidation;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

@NoArgsConstructor
@RequiredArgsConstructor
public class CamelCaseNameFormatValidation implements TopologyValidation {

private String camelCasePattern = "([a-z]+[A-Z]+\\w+)+";
@NonNull private Configuration config;

@Override
public void valid(Topology topology) throws ValidationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ public void testInvalidExecutionWithFailedValidation() {
.isEqualTo("Topic contextOrg.source.baz.topicF has an invalid number of partitions: 1");
}

@Test
public void regexpValidationShouldFindPatterns() {
Topology topology = parser.deserialise(TestUtils.getResourceFile("/descriptor.yaml"));

Configuration config =
createTopologyBuilderConfig(
"com.purbon.kafka.topology.validation.topology.CamelCaseNameFormatValidation",
"com.purbon.kafka.topology.validation.topic.TopicNameRegexValidation");

TopologyValidator validator = new TopologyValidator(config);
List<String> results = validator.validate(topology);
assertThat(results).hasSize(1);
}

@Test(expected = IllegalStateException.class)
public void testUsingDeprecatedValidationsConfig() {

Expand Down Expand Up @@ -106,6 +120,7 @@ private Configuration createTopologyBuilderConfig(String... validations) {

Properties props = new Properties();
props.put(TOPOLOGY_VALIDATIONS_CONFIG, Arrays.asList(validations));
props.put(TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP, "[a-zA-Z0-9.-]*");
return new Configuration(cliOps, props);
}
}

0 comments on commit 927ffd8

Please sign in to comment.