Skip to content

Commit

Permalink
Add extra validations for topics (#508)
Browse files Browse the repository at this point in the history
* make replicationFactor validation parametizased using configs values for operation and expected value

* make nums partitions validation parametizased using configs values for operation and expected value

* add a min in sync values standard validation
  • Loading branch information
purbon committed Jul 28, 2022
1 parent bea4496 commit 1a650e5
Show file tree
Hide file tree
Showing 8 changed files with 539 additions and 2 deletions.
12 changes: 12 additions & 0 deletions src/main/java/com/purbon/kafka/topology/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,18 @@ public class Constants {
public static final String TOPOLOGY_VALIDATIONS_TOPIC_NAME_REGEXP =
"validations.topic.name.regexp";

public static final String TOPOLOGY_VALIDATIONS_REPLICATION_FACTOR_VALUE =
"validations.replication.factor.value";

public static final String TOPOLOGY_VALIDATIONS_REPLICATION_FACTOR_OP =
"validations.replication.factor.op";

public static final String TOPOLOGY_VALIDATIONS_PARTITION_NUMBER_VALUE =
"validations.partition.number.value";

public static final String TOPOLOGY_VALIDATIONS_PARTITION_NUMBER_OP =
"validations.partition.number.op";

public static final String SSL_TRUSTSTORE_LOCATION = "ssl.truststore.location";
public static final String SSL_TRUSTSTORE_PASSWORD = "ssl.truststore.password";
public static final String SSL_KEYSTORE_LOCATION = "ssl.keystore.location";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.purbon.kafka.topology.validation.topic;

import com.purbon.kafka.topology.exceptions.ValidationException;
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.validation.TopicValidation;

public class MinInSyncReplicasValidation implements TopicValidation {

public static final String MIN_INSYNC_REPLICAS = "min.insync.replicas";

@Override
public void valid(Topic topic) throws ValidationException {
if (topic.replicationFactor().isPresent()
&& !validateMinInsyncReplicas(topic)) {
String msg =
String.format(
"Topic %s has an unexpected min.insync.replicas config vs it's replication factor: %s value",
topic, topic.replicationFactor().get());
throw new ValidationException(msg);
}
}

private boolean validateMinInsyncReplicas(Topic topic) {
short replicationFactor = topic.replicationFactor().orElse((short)-1);
String minInSyncReplicas = topic.getConfig().get(MIN_INSYNC_REPLICAS);
return minInSyncReplicas == null || ((Integer.parseInt(minInSyncReplicas) <= replicationFactor - 1) && (Integer.parseInt(minInSyncReplicas) > 1));
}

}
Original file line number Diff line number Diff line change
@@ -1,18 +1,100 @@
package com.purbon.kafka.topology.validation.topic;

import static com.purbon.kafka.topology.Constants.TOPOLOGY_VALIDATIONS_PARTITION_NUMBER_OP;
import static com.purbon.kafka.topology.Constants.TOPOLOGY_VALIDATIONS_PARTITION_NUMBER_VALUE;

import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.exceptions.ConfigurationException;
import com.purbon.kafka.topology.exceptions.ValidationException;
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.validation.TopicValidation;
import com.typesafe.config.ConfigException;
import java.util.Arrays;
import java.util.List;
import org.apache.logging.log4j.util.Strings;

public class PartitionNumberValidation implements TopicValidation {

private final int partitionNumberValue;
private final String partitionNumberOp;
private Configuration config;

public PartitionNumberValidation(Configuration config) throws ConfigurationException {
this(getPartitionNumber(config), getPartitionNumberOp(config));
this.config = config;
}

public PartitionNumberValidation(int partitionNumberValue, String partitionNumberOp) {
this.partitionNumberValue = partitionNumberValue;
this.partitionNumberOp = partitionNumberOp;
}

@Override
public void valid(Topic topic) throws ValidationException {
if (topic.getPartitionCount().isPresent() && topic.partitionsCount() < 3) {
if (topic.getPartitionCount().isPresent() && !validatePartitionCount(topic.partitionsCount())) {
String msg =
String.format(
"Topic %s has an invalid number of partitions: %s", topic, topic.partitionsCount());
throw new ValidationException(msg);
}
}

private boolean validatePartitionCount(int topicValue) throws ValidationException {
boolean result;
switch (partitionNumberOp) {
case "gt":
result = topicValue > partitionNumberValue;
break;
case "lt":
result = topicValue < partitionNumberValue;
break;
case "eq":
result = topicValue == partitionNumberValue;
break;
case "gte":
result = topicValue >= partitionNumberValue;
break;
case "lte":
result = topicValue <= partitionNumberValue;
break;
case "ne":
result = topicValue != partitionNumberValue;
break;
default:
throw new ValidationException("Invalid Operation code in use " + partitionNumberOp);
}
return result;
}

private static String getPartitionNumberOp(Configuration config) throws ConfigurationException {
List<String> validOpCodes = Arrays.asList("gt", "lt", "eq", "gte", "lte", "ne");
try {
String opCode =
config.getProperty(TOPOLOGY_VALIDATIONS_PARTITION_NUMBER_OP).toLowerCase().strip();
if (!validOpCodes.contains(opCode)) {
throw new ConfigException.BadValue(
TOPOLOGY_VALIDATIONS_PARTITION_NUMBER_OP, "Not supported operation value");
}
return opCode;
} catch (ConfigException e) {
String msg =
String.format(
"PartitionNumberValidation requires you to define a partition number "
+ "comparison op in config '%s' with a supported operations code - %s.",
TOPOLOGY_VALIDATIONS_PARTITION_NUMBER_OP, Strings.join(validOpCodes, ','));
throw new ConfigurationException(msg);
}
}

private static int getPartitionNumber(Configuration config) throws ConfigurationException {
try {
return Integer.parseInt(config.getProperty(TOPOLOGY_VALIDATIONS_PARTITION_NUMBER_VALUE));
} catch (ConfigException e) {
String msg =
String.format(
"PartitionNumberValidation requires you to define a partition number value in config '%s'",
TOPOLOGY_VALIDATIONS_PARTITION_NUMBER_VALUE);
throw new ConfigurationException(msg);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,102 @@
package com.purbon.kafka.topology.validation.topic;

import static com.purbon.kafka.topology.Constants.TOPOLOGY_VALIDATIONS_REPLICATION_FACTOR_OP;
import static com.purbon.kafka.topology.Constants.TOPOLOGY_VALIDATIONS_REPLICATION_FACTOR_VALUE;

import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.exceptions.ConfigurationException;
import com.purbon.kafka.topology.exceptions.ValidationException;
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.validation.TopicValidation;
import com.typesafe.config.ConfigException;
import java.util.Arrays;
import java.util.List;
import org.apache.logging.log4j.util.Strings;

public class ReplicationFactorValidation implements TopicValidation {

private final short replicationFactorValue;
private final String replicationFactorOp;
private Configuration config;

public ReplicationFactorValidation(Configuration config) throws ConfigurationException {
this(getReplicationFactor(config), getReplicationFactorOp(config));
this.config = config;
}

public ReplicationFactorValidation(short replicationFactorValue, String replicationFactorOp) {
this.replicationFactorValue = replicationFactorValue;
this.replicationFactorOp = replicationFactorOp;
}

@Override
public void valid(Topic topic) throws ValidationException {
if (topic.replicationFactor().isPresent() && topic.replicationFactor().get() != 3) {
if (topic.replicationFactor().isPresent()
&& !validateReplicationFactor(topic.replicationFactor().get())) {
String msg =
String.format(
"Topic %s has an unexpected replication factor: %s",
topic, topic.replicationFactor().get());
throw new ValidationException(msg);
}
}

private boolean validateReplicationFactor(short topicValue) throws ValidationException {
boolean result;
switch (replicationFactorOp) {
case "gt":
result = topicValue > replicationFactorValue;
break;
case "lt":
result = topicValue < replicationFactorValue;
break;
case "eq":
result = topicValue == replicationFactorValue;
break;
case "gte":
result = topicValue >= replicationFactorValue;
break;
case "lte":
result = topicValue <= replicationFactorValue;
break;
case "ne":
result = topicValue != replicationFactorValue;
break;
default:
throw new ValidationException("Invalid Operation code in use " + replicationFactorOp);
}
return result;
}

private static String getReplicationFactorOp(Configuration config) throws ConfigurationException {
List<String> validOpCodes = Arrays.asList("gt", "lt", "eq", "gte", "lte", "ne");
try {
String opCode =
config.getProperty(TOPOLOGY_VALIDATIONS_REPLICATION_FACTOR_OP).toLowerCase().strip();
if (!validOpCodes.contains(opCode)) {
throw new ConfigException.BadValue(
TOPOLOGY_VALIDATIONS_REPLICATION_FACTOR_OP, "Not supported operation value");
}
return opCode;
} catch (ConfigException e) {
String msg =
String.format(
"ReplicationFactorValidation requires you to define a replication factor "
+ "comparison op in config '%s' with a supported operations code - %s.",
TOPOLOGY_VALIDATIONS_REPLICATION_FACTOR_OP, Strings.join(validOpCodes, ','));
throw new ConfigurationException(msg);
}
}

private static short getReplicationFactor(Configuration config) throws ConfigurationException {
try {
return Short.parseShort(config.getProperty(TOPOLOGY_VALIDATIONS_REPLICATION_FACTOR_VALUE));
} catch (ConfigException e) {
String msg =
String.format(
"ReplicationFactorValidation requires you to define a replication factor value in config '%s'",
TOPOLOGY_VALIDATIONS_REPLICATION_FACTOR_VALUE);
throw new ConfigurationException(msg);
}
}
}
10 changes: 10 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,14 @@ allow {
ksql = ${?ALLOW_DELETE_ARTEFACTS_KSQL}
}
}
}
validations {
replication.factor {
op = "ne"
}

partition.number {
op = "gte"
value = 3
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.purbon.kafka.topology.validation.topic;

import com.purbon.kafka.topology.exceptions.ConfigurationException;
import com.purbon.kafka.topology.exceptions.ValidationException;
import com.purbon.kafka.topology.model.Topic;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

public class MinInSyncReplicasValidationTest {

@Test(expected = ValidationException.class)
public void shouldCheckKoValuesSuccessfully()
throws ValidationException, ConfigurationException {
Map<String, String> config = new HashMap<>();
config.put("replication.factor", "3");
config.put("min.insync.replicas", "3");

Topic topic = new Topic("topic", config);
MinInSyncReplicasValidation validation = new MinInSyncReplicasValidation();
validation.valid(topic);
}

@Test(expected = ValidationException.class)
public void shouldCheckMinimalValuesSuccessfully()
throws ValidationException, ConfigurationException {
Map<String, String> config = new HashMap<>();
config.put("replication.factor", "3");
config.put("min.insync.replicas", "1");

Topic topic = new Topic("topic", config);
MinInSyncReplicasValidation validation = new MinInSyncReplicasValidation();
validation.valid(topic);
}

@Test
public void shouldCheckOkValuesSuccessfully()
throws ValidationException, ConfigurationException {
Map<String, String> config = new HashMap<>();
config.put("replication.factor", "3");
config.put("min.insync.replicas", "2");

Topic topic = new Topic("topic", config);
MinInSyncReplicasValidation validation = new MinInSyncReplicasValidation();
validation.valid(topic);
}

@Test
public void shouldCheckMissingMinInSyncValuesSuccessfully()
throws ValidationException, ConfigurationException {
Map<String, String> config = new HashMap<>();
config.put("replication.factor", "3");

Topic topic = new Topic("topic", config);
MinInSyncReplicasValidation validation = new MinInSyncReplicasValidation();
validation.valid(topic);
}
}
Loading

0 comments on commit 1a650e5

Please sign in to comment.