diff --git a/src/main/java/com/purbon/kafka/topology/Constants.java b/src/main/java/com/purbon/kafka/topology/Constants.java index 1ba101dce..ebb61e00b 100644 --- a/src/main/java/com/purbon/kafka/topology/Constants.java +++ b/src/main/java/com/purbon/kafka/topology/Constants.java @@ -152,6 +152,18 @@ public class Constants { public static final String TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP = "validations.topic.name.regexp"; + public static final String TOPOLOGY_VALIDATIONS_REPLICATION_FACTOR_VALUE = + "validations.replication.factor.value"; + + public static final String TOPOLOGY_VALIDATIONS_REPLICATION_FACTOR_OP = + "validations.replication.factor.op"; + + public static final String TOPOLOGY_VALIDATIONS_PARTITION_NUMBER_VALUE = + "validations.partition.number.value"; + + public static final String TOPOLOGY_VALIDATIONS_PARTITION_NUMBER_OP = + "validations.partition.number.op"; + public static final String SSL_TRUSTSTORE_LOCATION = "ssl.truststore.location"; public static final String SSL_TRUSTSTORE_PASSWORD = "ssl.truststore.password"; public static final String SSL_KEYSTORE_LOCATION = "ssl.keystore.location"; diff --git a/src/main/java/com/purbon/kafka/topology/validation/topic/MinInSyncReplicasValidation.java b/src/main/java/com/purbon/kafka/topology/validation/topic/MinInSyncReplicasValidation.java new file mode 100644 index 000000000..dddcf6d6a --- /dev/null +++ b/src/main/java/com/purbon/kafka/topology/validation/topic/MinInSyncReplicasValidation.java @@ -0,0 +1,29 @@ +package com.purbon.kafka.topology.validation.topic; + +import com.purbon.kafka.topology.exceptions.ValidationException; +import com.purbon.kafka.topology.model.Topic; +import com.purbon.kafka.topology.validation.TopicValidation; + +public class MinInSyncReplicasValidation implements TopicValidation { + + public static final String MIN_INSYNC_REPLICAS = "min.insync.replicas"; + + @Override + public void valid(Topic topic) throws ValidationException { + if (topic.replicationFactor().isPresent() + && !validateMinInsyncReplicas(topic)) { + String msg = + String.format( + "Topic %s has an unexpected min.insync.replicas config vs it's replication factor: %s value", + topic, topic.replicationFactor().get()); + throw new ValidationException(msg); + } + } + + private boolean validateMinInsyncReplicas(Topic topic) { + short replicationFactor = topic.replicationFactor().orElse((short)-1); + String minInSyncReplicas = topic.getConfig().get(MIN_INSYNC_REPLICAS); + return minInSyncReplicas == null || ((Integer.parseInt(minInSyncReplicas) <= replicationFactor - 1) && (Integer.parseInt(minInSyncReplicas) > 1)); + } + +} diff --git a/src/main/java/com/purbon/kafka/topology/validation/topic/PartitionNumberValidation.java b/src/main/java/com/purbon/kafka/topology/validation/topic/PartitionNumberValidation.java index 9bb7a90ed..49f29beef 100644 --- a/src/main/java/com/purbon/kafka/topology/validation/topic/PartitionNumberValidation.java +++ b/src/main/java/com/purbon/kafka/topology/validation/topic/PartitionNumberValidation.java @@ -1,18 +1,100 @@ package com.purbon.kafka.topology.validation.topic; +import static com.purbon.kafka.topology.Constants.TOPOLOGY_VALIDATIONS_PARTITION_NUMBER_OP; +import static com.purbon.kafka.topology.Constants.TOPOLOGY_VALIDATIONS_PARTITION_NUMBER_VALUE; + +import com.purbon.kafka.topology.Configuration; +import com.purbon.kafka.topology.exceptions.ConfigurationException; import com.purbon.kafka.topology.exceptions.ValidationException; import com.purbon.kafka.topology.model.Topic; import com.purbon.kafka.topology.validation.TopicValidation; +import com.typesafe.config.ConfigException; +import java.util.Arrays; +import java.util.List; +import org.apache.logging.log4j.util.Strings; public class PartitionNumberValidation implements TopicValidation { + private final int partitionNumberValue; + private final String partitionNumberOp; + private Configuration config; + + public PartitionNumberValidation(Configuration config) throws ConfigurationException { + this(getPartitionNumber(config), getPartitionNumberOp(config)); + this.config = config; + } + + public PartitionNumberValidation(int partitionNumberValue, String partitionNumberOp) { + this.partitionNumberValue = partitionNumberValue; + this.partitionNumberOp = partitionNumberOp; + } + @Override public void valid(Topic topic) throws ValidationException { - if (topic.getPartitionCount().isPresent() && topic.partitionsCount() < 3) { + if (topic.getPartitionCount().isPresent() && !validatePartitionCount(topic.partitionsCount())) { String msg = String.format( "Topic %s has an invalid number of partitions: %s", topic, topic.partitionsCount()); throw new ValidationException(msg); } } + + private boolean validatePartitionCount(int topicValue) throws ValidationException { + boolean result; + switch (partitionNumberOp) { + case "gt": + result = topicValue > partitionNumberValue; + break; + case "lt": + result = topicValue < partitionNumberValue; + break; + case "eq": + result = topicValue == partitionNumberValue; + break; + case "gte": + result = topicValue >= partitionNumberValue; + break; + case "lte": + result = topicValue <= partitionNumberValue; + break; + case "ne": + result = topicValue != partitionNumberValue; + break; + default: + throw new ValidationException("Invalid Operation code in use " + partitionNumberOp); + } + return result; + } + + private static String getPartitionNumberOp(Configuration config) throws ConfigurationException { + List validOpCodes = Arrays.asList("gt", "lt", "eq", "gte", "lte", "ne"); + try { + String opCode = + config.getProperty(TOPOLOGY_VALIDATIONS_PARTITION_NUMBER_OP).toLowerCase().strip(); + if (!validOpCodes.contains(opCode)) { + throw new ConfigException.BadValue( + TOPOLOGY_VALIDATIONS_PARTITION_NUMBER_OP, "Not supported operation value"); + } + return opCode; + } catch (ConfigException e) { + String msg = + String.format( + "PartitionNumberValidation requires you to define a partition number " + + "comparison op in config '%s' with a supported operations code - %s.", + TOPOLOGY_VALIDATIONS_PARTITION_NUMBER_OP, Strings.join(validOpCodes, ',')); + throw new ConfigurationException(msg); + } + } + + private static int getPartitionNumber(Configuration config) throws ConfigurationException { + try { + return Integer.parseInt(config.getProperty(TOPOLOGY_VALIDATIONS_PARTITION_NUMBER_VALUE)); + } catch (ConfigException e) { + String msg = + String.format( + "PartitionNumberValidation requires you to define a partition number value in config '%s'", + TOPOLOGY_VALIDATIONS_PARTITION_NUMBER_VALUE); + throw new ConfigurationException(msg); + } + } } diff --git a/src/main/java/com/purbon/kafka/topology/validation/topic/ReplicationFactorValidation.java b/src/main/java/com/purbon/kafka/topology/validation/topic/ReplicationFactorValidation.java index 55ab051c1..f4091185c 100644 --- a/src/main/java/com/purbon/kafka/topology/validation/topic/ReplicationFactorValidation.java +++ b/src/main/java/com/purbon/kafka/topology/validation/topic/ReplicationFactorValidation.java @@ -1,14 +1,38 @@ package com.purbon.kafka.topology.validation.topic; +import static com.purbon.kafka.topology.Constants.TOPOLOGY_VALIDATIONS_REPLICATION_FACTOR_OP; +import static com.purbon.kafka.topology.Constants.TOPOLOGY_VALIDATIONS_REPLICATION_FACTOR_VALUE; + +import com.purbon.kafka.topology.Configuration; +import com.purbon.kafka.topology.exceptions.ConfigurationException; import com.purbon.kafka.topology.exceptions.ValidationException; import com.purbon.kafka.topology.model.Topic; import com.purbon.kafka.topology.validation.TopicValidation; +import com.typesafe.config.ConfigException; +import java.util.Arrays; +import java.util.List; +import org.apache.logging.log4j.util.Strings; public class ReplicationFactorValidation implements TopicValidation { + private final short replicationFactorValue; + private final String replicationFactorOp; + private Configuration config; + + public ReplicationFactorValidation(Configuration config) throws ConfigurationException { + this(getReplicationFactor(config), getReplicationFactorOp(config)); + this.config = config; + } + + public ReplicationFactorValidation(short replicationFactorValue, String replicationFactorOp) { + this.replicationFactorValue = replicationFactorValue; + this.replicationFactorOp = replicationFactorOp; + } + @Override public void valid(Topic topic) throws ValidationException { - if (topic.replicationFactor().isPresent() && topic.replicationFactor().get() != 3) { + if (topic.replicationFactor().isPresent() + && !validateReplicationFactor(topic.replicationFactor().get())) { String msg = String.format( "Topic %s has an unexpected replication factor: %s", @@ -16,4 +40,63 @@ public void valid(Topic topic) throws ValidationException { throw new ValidationException(msg); } } + + private boolean validateReplicationFactor(short topicValue) throws ValidationException { + boolean result; + switch (replicationFactorOp) { + case "gt": + result = topicValue > replicationFactorValue; + break; + case "lt": + result = topicValue < replicationFactorValue; + break; + case "eq": + result = topicValue == replicationFactorValue; + break; + case "gte": + result = topicValue >= replicationFactorValue; + break; + case "lte": + result = topicValue <= replicationFactorValue; + break; + case "ne": + result = topicValue != replicationFactorValue; + break; + default: + throw new ValidationException("Invalid Operation code in use " + replicationFactorOp); + } + return result; + } + + private static String getReplicationFactorOp(Configuration config) throws ConfigurationException { + List validOpCodes = Arrays.asList("gt", "lt", "eq", "gte", "lte", "ne"); + try { + String opCode = + config.getProperty(TOPOLOGY_VALIDATIONS_REPLICATION_FACTOR_OP).toLowerCase().strip(); + if (!validOpCodes.contains(opCode)) { + throw new ConfigException.BadValue( + TOPOLOGY_VALIDATIONS_REPLICATION_FACTOR_OP, "Not supported operation value"); + } + return opCode; + } catch (ConfigException e) { + String msg = + String.format( + "ReplicationFactorValidation requires you to define a replication factor " + + "comparison op in config '%s' with a supported operations code - %s.", + TOPOLOGY_VALIDATIONS_REPLICATION_FACTOR_OP, Strings.join(validOpCodes, ',')); + throw new ConfigurationException(msg); + } + } + + private static short getReplicationFactor(Configuration config) throws ConfigurationException { + try { + return Short.parseShort(config.getProperty(TOPOLOGY_VALIDATIONS_REPLICATION_FACTOR_VALUE)); + } catch (ConfigException e) { + String msg = + String.format( + "ReplicationFactorValidation requires you to define a replication factor value in config '%s'", + TOPOLOGY_VALIDATIONS_REPLICATION_FACTOR_VALUE); + throw new ConfigurationException(msg); + } + } } diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 0509a52ce..1f1601b3e 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -162,4 +162,14 @@ allow { ksql = ${?ALLOW_DELETE_ARTEFACTS_KSQL} } } +} +validations { + replication.factor { + op = "ne" + } + + partition.number { + op = "gte" + value = 3 + } } \ No newline at end of file diff --git a/src/test/java/com/purbon/kafka/topology/validation/topic/MinInSyncReplicasValidationTest.java b/src/test/java/com/purbon/kafka/topology/validation/topic/MinInSyncReplicasValidationTest.java new file mode 100644 index 000000000..5da082957 --- /dev/null +++ b/src/test/java/com/purbon/kafka/topology/validation/topic/MinInSyncReplicasValidationTest.java @@ -0,0 +1,59 @@ +package com.purbon.kafka.topology.validation.topic; + +import com.purbon.kafka.topology.exceptions.ConfigurationException; +import com.purbon.kafka.topology.exceptions.ValidationException; +import com.purbon.kafka.topology.model.Topic; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class MinInSyncReplicasValidationTest { + + @Test(expected = ValidationException.class) + public void shouldCheckKoValuesSuccessfully() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "3"); + config.put("min.insync.replicas", "3"); + + Topic topic = new Topic("topic", config); + MinInSyncReplicasValidation validation = new MinInSyncReplicasValidation(); + validation.valid(topic); + } + + @Test(expected = ValidationException.class) + public void shouldCheckMinimalValuesSuccessfully() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "3"); + config.put("min.insync.replicas", "1"); + + Topic topic = new Topic("topic", config); + MinInSyncReplicasValidation validation = new MinInSyncReplicasValidation(); + validation.valid(topic); + } + + @Test + public void shouldCheckOkValuesSuccessfully() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "3"); + config.put("min.insync.replicas", "2"); + + Topic topic = new Topic("topic", config); + MinInSyncReplicasValidation validation = new MinInSyncReplicasValidation(); + validation.valid(topic); + } + + @Test + public void shouldCheckMissingMinInSyncValuesSuccessfully() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "3"); + + Topic topic = new Topic("topic", config); + MinInSyncReplicasValidation validation = new MinInSyncReplicasValidation(); + validation.valid(topic); + } +} diff --git a/src/test/java/com/purbon/kafka/topology/validation/topic/PartitionNumberValidationTest.java b/src/test/java/com/purbon/kafka/topology/validation/topic/PartitionNumberValidationTest.java new file mode 100644 index 000000000..a23033ae6 --- /dev/null +++ b/src/test/java/com/purbon/kafka/topology/validation/topic/PartitionNumberValidationTest.java @@ -0,0 +1,131 @@ +package com.purbon.kafka.topology.validation.topic; + +import com.purbon.kafka.topology.exceptions.ConfigurationException; +import com.purbon.kafka.topology.exceptions.ValidationException; +import com.purbon.kafka.topology.model.Topic; +import java.util.HashMap; +import java.util.Map; +import org.junit.Test; + +public class PartitionNumberValidationTest { + + @Test(expected = ValidationException.class) + public void shouldVerifyDifferentValuesWhenUsingEq() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + PartitionNumberValidation validation = new PartitionNumberValidation((short) 35, "eq"); + validation.valid(topic); + } + + @Test(expected = ValidationException.class) + public void shouldVerifyDifferentValuesWhenUsingGte() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + PartitionNumberValidation validation = new PartitionNumberValidation((short) 124, "gte"); + validation.valid(topic); + } + + @Test(expected = ValidationException.class) + public void shouldVerifyDifferentValuesWhenUsingLte() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + PartitionNumberValidation validation = new PartitionNumberValidation((short) 15, "lte"); + validation.valid(topic); + } + + @Test(expected = ValidationException.class) + public void shouldVerifyDifferentValuesWhenUsingGt() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + PartitionNumberValidation validation = new PartitionNumberValidation((short) 125, "gt"); + validation.valid(topic); + } + + @Test(expected = ValidationException.class) + public void shouldVerifyDifferentValuesWhenUsingLt() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + PartitionNumberValidation validation = new PartitionNumberValidation((short) 33, "lt"); + validation.valid(topic); + } + + @Test + public void shouldVerifyDifferentValuesWhenUsingEqSuccessfully() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + PartitionNumberValidation validation = new PartitionNumberValidation((short) 123, "eq"); + validation.valid(topic); + } + + @Test + public void shouldVerifyDifferentValuesWhenUsingGteSuccessfully() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + PartitionNumberValidation validation = new PartitionNumberValidation((short) 34, "gte"); + validation.valid(topic); + } + + @Test + public void shouldVerifyDifferentValuesWhenUsingLteSuccessfully() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + PartitionNumberValidation validation = new PartitionNumberValidation((short) 123, "lte"); + validation.valid(topic); + } + + @Test + public void shouldVerifyDifferentValuesWhenUsingGtSuccessfully() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + PartitionNumberValidation validation = new PartitionNumberValidation((short) 122, "gt"); + validation.valid(topic); + } + + @Test + public void shouldVerifyDifferentValuesWhenUsingLtSuccessfully() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + PartitionNumberValidation validation = new PartitionNumberValidation((short) 124, "lt"); + validation.valid(topic); + } +} diff --git a/src/test/java/com/purbon/kafka/topology/validation/topic/ReplicationFactorValidationTest.java b/src/test/java/com/purbon/kafka/topology/validation/topic/ReplicationFactorValidationTest.java new file mode 100644 index 000000000..c9faf12cd --- /dev/null +++ b/src/test/java/com/purbon/kafka/topology/validation/topic/ReplicationFactorValidationTest.java @@ -0,0 +1,131 @@ +package com.purbon.kafka.topology.validation.topic; + +import com.purbon.kafka.topology.exceptions.ConfigurationException; +import com.purbon.kafka.topology.exceptions.ValidationException; +import com.purbon.kafka.topology.model.Topic; +import java.util.HashMap; +import java.util.Map; +import org.junit.Test; + +public class ReplicationFactorValidationTest { + + @Test(expected = ValidationException.class) + public void shouldVerifyDifferentValuesWhenUsingEq() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + ReplicationFactorValidation validation = new ReplicationFactorValidation((short) 35, "eq"); + validation.valid(topic); + } + + @Test(expected = ValidationException.class) + public void shouldVerifyDifferentValuesWhenUsingGte() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + ReplicationFactorValidation validation = new ReplicationFactorValidation((short) 35, "gte"); + validation.valid(topic); + } + + @Test(expected = ValidationException.class) + public void shouldVerifyDifferentValuesWhenUsingLte() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + ReplicationFactorValidation validation = new ReplicationFactorValidation((short) 15, "lte"); + validation.valid(topic); + } + + @Test(expected = ValidationException.class) + public void shouldVerifyDifferentValuesWhenUsingGt() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + ReplicationFactorValidation validation = new ReplicationFactorValidation((short) 35, "gt"); + validation.valid(topic); + } + + @Test(expected = ValidationException.class) + public void shouldVerifyDifferentValuesWhenUsingLt() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + ReplicationFactorValidation validation = new ReplicationFactorValidation((short) 33, "lt"); + validation.valid(topic); + } + + @Test + public void shouldVerifyDifferentValuesWhenUsingEqSuccessfully() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + ReplicationFactorValidation validation = new ReplicationFactorValidation((short) 34, "eq"); + validation.valid(topic); + } + + @Test + public void shouldVerifyDifferentValuesWhenUsingGteSuccessfully() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + ReplicationFactorValidation validation = new ReplicationFactorValidation((short) 34, "gte"); + validation.valid(topic); + } + + @Test + public void shouldVerifyDifferentValuesWhenUsingLteSuccessfully() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + ReplicationFactorValidation validation = new ReplicationFactorValidation((short) 34, "lte"); + validation.valid(topic); + } + + @Test + public void shouldVerifyDifferentValuesWhenUsingGtSuccessfully() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + ReplicationFactorValidation validation = new ReplicationFactorValidation((short) 33, "gt"); + validation.valid(topic); + } + + @Test + public void shouldVerifyDifferentValuesWhenUsingLtSuccessfully() + throws ValidationException, ConfigurationException { + Map config = new HashMap<>(); + config.put("replication.factor", "34"); + config.put("num.partitions", "123"); + + Topic topic = new Topic("topic", config); + ReplicationFactorValidation validation = new ReplicationFactorValidation((short) 35, "lt"); + validation.valid(topic); + } +}