diff --git a/README.md b/README.md index efb7eb11..44d17800 100644 --- a/README.md +++ b/README.md @@ -731,7 +731,7 @@ Success RoleBinding/test-role-group1 (created) From now on, members of the group ``group1/test-ops`` (either Gitlab, LDAP or OIDC groups) can use ns4kafka to manage topics starting with `test.` on the `local` Kafka cluster. - But wait ! That's not enough. Now you should only let them create Topics successfully if and only if their configuration is aligned with your strategy ! Let's add Validators ! +But wait ! That's not enough. Now you should only let them create Topics successfully if and only if their configuration is aligned with your strategy ! Let's add Validators ! ````yaml # namespace.yml @@ -775,3 +775,27 @@ user@local:/home/user$ kafkactl apply -f namespace.yml Success Namespace/test (changed) ```` +## Quota management + +It is possible to define quotas on a namespace. Ideal for clusters with limited resources! + +A namespace with quotas will not be able to exceed the limits enforced by the quotas. + +```yaml +apiVersion: v1 +kind: ResourceQuota +metadata: + namespace: test + name: test-rq1 +spec: + count/topics: 10 + count/partitions: 60 + count/connectors: 5 + disk/topics: 500MiB +``` + +- **count/topics**: maximum number of deployable topics +- **count/partitions**: maximum number of deployable partitions +- **count/connectors**: maximum number of deployable connectors +- **disk/topics**: maximum size of all topics. Computed from the sum of _retention.bytes_ * _number of partitions_ of all topics. +Unit of measure accepted is byte (B), kibibyte (KiB), mebibyte (MiB), gibibyte (GiB) diff --git a/api/src/main/java/com/michelin/ns4kafka/controllers/ResourceQuotaController.java b/api/src/main/java/com/michelin/ns4kafka/controllers/ResourceQuotaController.java index 9cc8d33d..418a5932 100644 --- a/api/src/main/java/com/michelin/ns4kafka/controllers/ResourceQuotaController.java +++ b/api/src/main/java/com/michelin/ns4kafka/controllers/ResourceQuotaController.java @@ -70,7 +70,7 @@ HttpResponse apply(String namespace, @Body @Valid ResourceQuota q quota.getMetadata().setCluster(ns.getMetadata().getCluster()); quota.getMetadata().setNamespace(namespace); - List validationErrors = resourceQuotaService.validateNewQuotaAgainstCurrentResource(ns, quota); + List validationErrors = resourceQuotaService.validateNewResourceQuota(ns, quota); if (!validationErrors.isEmpty()) { throw new ResourceValidationException(validationErrors, quota.getKind(), quota.getMetadata().getName()); } diff --git a/api/src/main/java/com/michelin/ns4kafka/controllers/TopicController.java b/api/src/main/java/com/michelin/ns4kafka/controllers/TopicController.java index e719d237..0392d216 100644 --- a/api/src/main/java/com/michelin/ns4kafka/controllers/TopicController.java +++ b/api/src/main/java/com/michelin/ns4kafka/controllers/TopicController.java @@ -107,16 +107,12 @@ public HttpResponse apply(String namespace, @Valid @Body Topic topic, @Qu return formatHttpResponse(existingTopic.get(), ApplyStatus.unchanged); } - ApplyStatus status = existingTopic.isPresent() ? ApplyStatus.changed : ApplyStatus.created; - - // Only check quota on topic creation - if (status.equals(ApplyStatus.created)) { - validationErrors.addAll(resourceQuotaService.validateTopicQuota(ns, topic)); - if (!validationErrors.isEmpty()) { - throw new ResourceValidationException(validationErrors, topic.getKind(), topic.getMetadata().getName()); - } + validationErrors.addAll(resourceQuotaService.validateTopicQuota(ns, existingTopic, topic)); + if (!validationErrors.isEmpty()) { + throw new ResourceValidationException(validationErrors, topic.getKind(), topic.getMetadata().getName()); } + ApplyStatus status = existingTopic.isPresent() ? ApplyStatus.changed : ApplyStatus.created; if (dryrun) { return formatHttpResponse(topic, status); } diff --git a/api/src/main/java/com/michelin/ns4kafka/models/Namespace.java b/api/src/main/java/com/michelin/ns4kafka/models/Namespace.java index 2c371f46..8de62769 100644 --- a/api/src/main/java/com/michelin/ns4kafka/models/Namespace.java +++ b/api/src/main/java/com/michelin/ns4kafka/models/Namespace.java @@ -19,9 +19,9 @@ @NoArgsConstructor @Data public class Namespace { - private final String apiVersion = "v1"; private final String kind = "Namespace"; + @Valid @NotNull private ObjectMeta metadata; @@ -40,8 +40,5 @@ public static class NamespaceSpec { private List connectClusters = List.of(); private TopicValidator topicValidator; private ConnectValidator connectValidator; - //private ResourceQuota quota; } - - } diff --git a/api/src/main/java/com/michelin/ns4kafka/models/quota/ResourceQuota.java b/api/src/main/java/com/michelin/ns4kafka/models/quota/ResourceQuota.java index d7112372..fe8f61c6 100644 --- a/api/src/main/java/com/michelin/ns4kafka/models/quota/ResourceQuota.java +++ b/api/src/main/java/com/michelin/ns4kafka/models/quota/ResourceQuota.java @@ -44,6 +44,7 @@ public class ResourceQuota { public enum ResourceQuotaSpecKey { COUNT_TOPICS("count/topics"), COUNT_PARTITIONS("count/partitions"), + DISK_TOPICS("disk/topics"), COUNT_CONNECTORS("count/connectors"); private final String key; diff --git a/api/src/main/java/com/michelin/ns4kafka/models/quota/ResourceQuotaResponse.java b/api/src/main/java/com/michelin/ns4kafka/models/quota/ResourceQuotaResponse.java index 0ec20b9f..0f44357f 100644 --- a/api/src/main/java/com/michelin/ns4kafka/models/quota/ResourceQuotaResponse.java +++ b/api/src/main/java/com/michelin/ns4kafka/models/quota/ResourceQuotaResponse.java @@ -54,6 +54,11 @@ public static class ResourceQuotaResponseSpec { */ private String countPartition; + /** + * The disk quota for topics + */ + private String diskTopic; + /** * The count quota for connectors */ diff --git a/api/src/main/java/com/michelin/ns4kafka/services/ResourceQuotaService.java b/api/src/main/java/com/michelin/ns4kafka/services/ResourceQuotaService.java index 44d33c87..7c4d6f5c 100644 --- a/api/src/main/java/com/michelin/ns4kafka/services/ResourceQuotaService.java +++ b/api/src/main/java/com/michelin/ns4kafka/services/ResourceQuotaService.java @@ -5,21 +5,28 @@ import com.michelin.ns4kafka.models.quota.ResourceQuota; import com.michelin.ns4kafka.models.quota.ResourceQuotaResponse; import com.michelin.ns4kafka.repositories.ResourceQuotaRepository; +import com.michelin.ns4kafka.utils.BytesUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import javax.inject.Inject; import javax.inject.Singleton; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Optional; import static com.michelin.ns4kafka.models.quota.ResourceQuota.ResourceQuotaSpecKey.*; +import static com.michelin.ns4kafka.utils.BytesUtils.*; +import static org.apache.kafka.common.config.TopicConfig.RETENTION_BYTES_CONFIG; @Slf4j @Singleton public class ResourceQuotaService { + /** + * Error message when the given quota is already exceeded + */ + private static final String QUOTA_ALREADY_EXCEEDED_ERROR = "Quota already exceeded for %s: %s/%s (used/limit)"; + /** * Quota response format */ @@ -30,11 +37,6 @@ public class ResourceQuotaService { */ private static final String NO_QUOTA_RESPONSE_FORMAT = "%s"; - /** - * Limit to display when there is no quota - */ - private static final String UNLIMITED_QUOTA = "INF"; - /** * Role binding repository */ @@ -96,54 +98,103 @@ public void delete(ResourceQuota resourceQuota) { * @param resourceQuota The new resource quota * @return A list of validation errors */ - public List validateNewQuotaAgainstCurrentResource(Namespace namespace, ResourceQuota resourceQuota) { + public List validateNewResourceQuota(Namespace namespace, ResourceQuota resourceQuota) { List errors = new ArrayList<>(); - Arrays.stream(values()).forEach(quotaKey -> { - if (resourceQuota.getSpec().get(quotaKey.getKey()) != null) { - int used = getCurrentUsedResource(namespace, quotaKey); - int limit = Integer.parseInt(resourceQuota.getSpec().get(quotaKey.getKey())); + if (StringUtils.isNotBlank(resourceQuota.getSpec().get(COUNT_TOPICS.getKey()))) { + long used = getCurrentCountTopics(namespace); + long limit = Long.parseLong(resourceQuota.getSpec().get(COUNT_TOPICS.getKey())); + if (used > limit) { + errors.add(String.format(QUOTA_ALREADY_EXCEEDED_ERROR, COUNT_TOPICS, used, limit)); + } + } + + if (StringUtils.isNotBlank(resourceQuota.getSpec().get(COUNT_PARTITIONS.getKey()))) { + long used = getCurrentCountPartitions(namespace); + long limit = Long.parseLong(resourceQuota.getSpec().get(COUNT_PARTITIONS.getKey())); + if (used > limit) { + errors.add(String.format(QUOTA_ALREADY_EXCEEDED_ERROR, COUNT_PARTITIONS, used, limit)); + } + } + + if (StringUtils.isNotBlank(resourceQuota.getSpec().get(DISK_TOPICS.getKey()))) { + String limitAsString = resourceQuota.getSpec().get(DISK_TOPICS.getKey()); + if (!limitAsString.endsWith(BYTE) && !limitAsString.endsWith(KIBIBYTE) && !limitAsString.endsWith(MEBIBYTE) && !limitAsString.endsWith(GIBIBYTE)) { + errors.add(String.format("Invalid value for %s: value must end with either %s, %s, %s or %s", + DISK_TOPICS, BYTE, KIBIBYTE, MEBIBYTE, GIBIBYTE)); + } else { + long used = getCurrentDiskTopics(namespace); + long limit = BytesUtils.humanReadableToBytes(limitAsString); if (used > limit) { - errors.add(String.format("Quota already exceeded for %s: %s/%s (used/limit)", quotaKey, used, limit)); + errors.add(String.format(QUOTA_ALREADY_EXCEEDED_ERROR, DISK_TOPICS, + BytesUtils.bytesToHumanReadable(used), limitAsString)); } } - }); + } + + if (StringUtils.isNotBlank(resourceQuota.getSpec().get(COUNT_CONNECTORS.getKey()))) { + long used = getCurrentCountConnectors(namespace); + long limit = Long.parseLong(resourceQuota.getSpec().get(COUNT_CONNECTORS.getKey())); + if (used > limit) { + errors.add(String.format(QUOTA_ALREADY_EXCEEDED_ERROR, COUNT_CONNECTORS, used, limit)); + } + } return errors; } /** - * For a given quota key, get the current value used by the namespace + * Get currently used number of topics * @param namespace The namespace - * @param key The quota key - * @return The current value + * @return The number of topics */ - public Integer getCurrentUsedResource(Namespace namespace, ResourceQuota.ResourceQuotaSpecKey key) { - if (COUNT_TOPICS.equals(key)) { - return topicService.findAllForNamespace(namespace).size(); - } + public long getCurrentCountTopics(Namespace namespace) { + return topicService.findAllForNamespace(namespace).size(); + } - if (COUNT_PARTITIONS.equals(key)) { - return topicService.findAllForNamespace(namespace) - .stream() - .map(topic -> topic.getSpec().getPartitions()) - .reduce(0, Integer::sum); - } + /** + * Get currently used number of partitions + * @param namespace The namespace + * @return The number of partitions + */ + public long getCurrentCountPartitions(Namespace namespace) { + return topicService.findAllForNamespace(namespace) + .stream() + .map(topic -> topic.getSpec().getPartitions()) + .reduce(0, Integer::sum) + .longValue(); + } - if (COUNT_CONNECTORS.equals(key)) { - return kafkaConnectService.findAllForNamespace(namespace).size(); - } + /** + * Get currently used topic disk in bytes + * @param namespace The namespace + * @return The number of topic disk + */ + public long getCurrentDiskTopics(Namespace namespace) { + return topicService.findAllForNamespace(namespace) + .stream() + .map(topic -> Long.parseLong(topic.getSpec().getConfigs().getOrDefault("retention.bytes", "0")) * + topic.getSpec().getPartitions()) + .reduce(0L, Long::sum); + } - return 0; + /** + * Get currently used number of topics + * @param namespace The namespace + * @return The number of topics + */ + public long getCurrentCountConnectors(Namespace namespace) { + return kafkaConnectService.findAllForNamespace(namespace).size(); } /** * Validate the topic quota * @param namespace The namespace - * @param topic The topic + * @param existingTopic The existing topic + * @param newTopic The new topic * @return A list of errors */ - public List validateTopicQuota(Namespace namespace, Topic topic) { + public List validateTopicQuota(Namespace namespace, Optional existingTopic, Topic newTopic) { Optional resourceQuotaOptional = findByNamespace(namespace.getMetadata().getName()); if (resourceQuotaOptional.isEmpty()) { return List.of(); @@ -152,20 +203,42 @@ public List validateTopicQuota(Namespace namespace, Topic topic) { List errors = new ArrayList<>(); ResourceQuota resourceQuota = resourceQuotaOptional.get(); - if (StringUtils.isNotBlank(resourceQuota.getSpec().get(COUNT_TOPICS.getKey()))) { - int used = getCurrentUsedResource(namespace, COUNT_TOPICS); - int limit = Integer.parseInt(resourceQuota.getSpec().get(COUNT_TOPICS.toString())); - if (used + 1 > limit) { - errors.add(String.format("Exceeding quota for %s: %s/%s (used/limit). Cannot add 1 topic.", COUNT_TOPICS, used, limit)); + // Check count topics and count partitions only at creation + if (existingTopic.isEmpty()) { + if (StringUtils.isNotBlank(resourceQuota.getSpec().get(COUNT_TOPICS.getKey()))) { + long used = getCurrentCountTopics(namespace); + long limit = Long.parseLong(resourceQuota.getSpec().get(COUNT_TOPICS.getKey())); + if (used + 1 > limit) { + errors.add(String.format("Exceeding quota for %s: %s/%s (used/limit). Cannot add 1 topic.", COUNT_TOPICS, used, limit)); + } + } + + if (StringUtils.isNotBlank(resourceQuota.getSpec().get(COUNT_PARTITIONS.getKey()))) { + long used = getCurrentCountPartitions(namespace); + long limit = Long.parseLong(resourceQuota.getSpec().get(COUNT_PARTITIONS.getKey())); + if (used + newTopic.getSpec().getPartitions() > limit) { + errors.add(String.format("Exceeding quota for %s: %s/%s (used/limit). Cannot add %s partition(s).", COUNT_PARTITIONS, used, limit, newTopic.getSpec().getPartitions())); + } } } - if (StringUtils.isNotBlank(resourceQuota.getSpec().get(COUNT_PARTITIONS.getKey()))) { - int used = getCurrentUsedResource(namespace, COUNT_PARTITIONS); - int limit = Integer.parseInt(resourceQuota.getSpec().get(COUNT_PARTITIONS.toString())); - if (used + topic.getSpec().getPartitions() > limit) { - errors.add(String.format("Exceeding quota for %s: %s/%s (used/limit). Cannot add %s partition(s).", COUNT_PARTITIONS, used, limit, topic.getSpec().getPartitions())); + if (StringUtils.isNotBlank(resourceQuota.getSpec().get(DISK_TOPICS.getKey())) && + StringUtils.isNotBlank(newTopic.getSpec().getConfigs().get(RETENTION_BYTES_CONFIG))) { + long used = getCurrentDiskTopics(namespace); + long limit = BytesUtils.humanReadableToBytes(resourceQuota.getSpec().get(DISK_TOPICS.getKey())); + + long newTopicSize = Long.parseLong(newTopic.getSpec().getConfigs().get(RETENTION_BYTES_CONFIG)) * newTopic.getSpec().getPartitions(); + long existingTopicSize = existingTopic + .map(value -> Long.parseLong(value.getSpec().getConfigs().getOrDefault(RETENTION_BYTES_CONFIG, "0")) + * value.getSpec().getPartitions()) + .orElse(0L); + + long bytesToAdd = newTopicSize - existingTopicSize; + if (bytesToAdd > 0 && used + bytesToAdd > limit) { + errors.add(String.format("Exceeding quota for %s: %s/%s (used/limit). Cannot add %s of data.", DISK_TOPICS, + BytesUtils.bytesToHumanReadable(used), BytesUtils.bytesToHumanReadable(limit), BytesUtils.bytesToHumanReadable(bytesToAdd))); } + } return errors; @@ -186,8 +259,8 @@ public List validateConnectorQuota(Namespace namespace) { ResourceQuota resourceQuota = resourceQuotaOptional.get(); if (StringUtils.isNotBlank(resourceQuota.getSpec().get(COUNT_CONNECTORS.getKey()))) { - int used = getCurrentUsedResource(namespace, COUNT_CONNECTORS); - int limit = Integer.parseInt(resourceQuota.getSpec().get(COUNT_CONNECTORS.toString())); + long used = getCurrentCountConnectors(namespace); + long limit = Long.parseLong(resourceQuota.getSpec().get(COUNT_CONNECTORS.getKey())); if (used + 1 > limit) { errors.add(String.format("Exceeding quota for %s: %s/%s (used/limit). Cannot add 1 connector.", COUNT_CONNECTORS, used, limit)); } @@ -203,23 +276,32 @@ public List validateConnectorQuota(Namespace namespace) { * @return A list of quotas as response format */ public ResourceQuotaResponse toResponse(Namespace namespace, Optional resourceQuota) { - String countTopic = resourceQuota.isPresent() ? String.format(QUOTA_RESPONSE_FORMAT, getCurrentUsedResource(namespace, COUNT_TOPICS), - resourceQuota.map(quota -> quota.getSpec().get(COUNT_TOPICS.toString())).orElse(UNLIMITED_QUOTA)) : - String.format(NO_QUOTA_RESPONSE_FORMAT, getCurrentUsedResource(namespace, COUNT_TOPICS)); - - String countPartition = resourceQuota.isPresent() ? String.format(QUOTA_RESPONSE_FORMAT, getCurrentUsedResource(namespace, COUNT_PARTITIONS), - resourceQuota.map(quota -> quota.getSpec().get(COUNT_PARTITIONS.toString())).orElse(UNLIMITED_QUOTA)) : - String.format(NO_QUOTA_RESPONSE_FORMAT, getCurrentUsedResource(namespace, COUNT_PARTITIONS)); - - String countConnector = resourceQuota.isPresent() ? String.format(QUOTA_RESPONSE_FORMAT, getCurrentUsedResource(namespace, COUNT_CONNECTORS), - resourceQuota.map(quota -> quota.getSpec().get(COUNT_CONNECTORS.toString())).orElse(UNLIMITED_QUOTA)) : - String.format(NO_QUOTA_RESPONSE_FORMAT, getCurrentUsedResource(namespace, COUNT_CONNECTORS)); + long currentCountTopic = getCurrentCountTopics(namespace); + String countTopic = resourceQuota.isPresent() && StringUtils.isNotBlank(resourceQuota.get().getSpec().get(COUNT_TOPICS.getKey())) ? + String.format(QUOTA_RESPONSE_FORMAT, currentCountTopic, resourceQuota.get().getSpec().get(COUNT_TOPICS.getKey())) : + String.format(NO_QUOTA_RESPONSE_FORMAT, currentCountTopic); + + long currentCountPartition = getCurrentCountPartitions(namespace); + String countPartition = resourceQuota.isPresent() && StringUtils.isNotBlank(resourceQuota.get().getSpec().get(COUNT_PARTITIONS.getKey())) ? + String.format(QUOTA_RESPONSE_FORMAT, currentCountPartition, resourceQuota.get().getSpec().get(COUNT_PARTITIONS.getKey())) : + String.format(NO_QUOTA_RESPONSE_FORMAT, currentCountPartition); + + long currentDiskTopic = getCurrentDiskTopics(namespace); + String diskTopic = resourceQuota.isPresent() && StringUtils.isNotBlank(resourceQuota.get().getSpec().get(DISK_TOPICS.getKey())) ? + String.format(QUOTA_RESPONSE_FORMAT, BytesUtils.bytesToHumanReadable(currentDiskTopic), resourceQuota.get().getSpec().get(DISK_TOPICS.getKey())) : + String.format(NO_QUOTA_RESPONSE_FORMAT, BytesUtils.bytesToHumanReadable(currentDiskTopic)); + + long currentCountConnector = getCurrentCountConnectors(namespace); + String countConnector = resourceQuota.isPresent() && StringUtils.isNotBlank(resourceQuota.get().getSpec().get(COUNT_CONNECTORS.getKey())) ? + String.format(QUOTA_RESPONSE_FORMAT, currentCountConnector, resourceQuota.get().getSpec().get(COUNT_CONNECTORS.getKey())) : + String.format(NO_QUOTA_RESPONSE_FORMAT, currentCountConnector); return ResourceQuotaResponse.builder() .metadata(resourceQuota.map(ResourceQuota::getMetadata).orElse(null)) .spec(ResourceQuotaResponse.ResourceQuotaResponseSpec.builder() .countTopic(countTopic) .countPartition(countPartition) + .diskTopic(diskTopic) .countConnector(countConnector) .build()) .build(); diff --git a/api/src/main/java/com/michelin/ns4kafka/utils/BytesUtils.java b/api/src/main/java/com/michelin/ns4kafka/utils/BytesUtils.java new file mode 100644 index 00000000..073307d3 --- /dev/null +++ b/api/src/main/java/com/michelin/ns4kafka/utils/BytesUtils.java @@ -0,0 +1,66 @@ +package com.michelin.ns4kafka.utils; + +import org.apache.commons.lang3.StringUtils; + +import java.math.BigDecimal; +import java.math.RoundingMode; + +public class BytesUtils { + public static final String BYTE = "B"; + public static final String KIBIBYTE = "KiB"; + public static final String MEBIBYTE = "MiB"; + public static final String GIBIBYTE = "GiB"; + + /** + * Converts given bytes to either kibibyte, mebibite or gibibyte + * @param bytes The bytes to convert + * @return The converted value as human-readable value + */ + public static String bytesToHumanReadable(long bytes) { + double kibibyte = 1024; + double mebibyte = kibibyte * 1024; + double gibibyte = mebibyte * 1024; + + if (bytes >= kibibyte && bytes < mebibyte) { + return BigDecimal.valueOf(bytes / kibibyte).setScale(3, RoundingMode.CEILING).doubleValue() + KIBIBYTE; + } + + if (bytes >= mebibyte && bytes < gibibyte) { + return BigDecimal.valueOf(bytes / mebibyte).setScale(3, RoundingMode.CEILING).doubleValue() + MEBIBYTE; + } + + if (bytes >= gibibyte) { + return BigDecimal.valueOf(bytes / gibibyte).setScale(3, RoundingMode.CEILING).doubleValue() + GIBIBYTE; + } + + return bytes + BYTE; + } + + public static long humanReadableToBytes(String quota) { + long kibibyte = 1024; + long mebibyte = kibibyte * 1024; + long gibibyte = mebibyte * 1024; + + if (quota.endsWith(KIBIBYTE)) { + return BigDecimal.valueOf(Double.parseDouble(quota.replace(KIBIBYTE, StringUtils.EMPTY)) * kibibyte) + .setScale(0, RoundingMode.CEILING) + .longValue(); + } + + if (quota.endsWith(MEBIBYTE)) { + return BigDecimal.valueOf(Double.parseDouble(quota.replace(MEBIBYTE, StringUtils.EMPTY)) * mebibyte) + .setScale(0, RoundingMode.CEILING) + .longValue(); + } + + if (quota.endsWith(GIBIBYTE)) { + return BigDecimal.valueOf(Double.parseDouble(quota.replace(GIBIBYTE, StringUtils.EMPTY)) * gibibyte) + .setScale(0, RoundingMode.CEILING) + .longValue(); + } + + return Long.parseLong(quota.replace(BYTE, StringUtils.EMPTY)); + } + + private BytesUtils() {} +} diff --git a/api/src/test/java/com/michelin/ns4kafka/controllers/ResourceQuotaControllerTest.java b/api/src/test/java/com/michelin/ns4kafka/controllers/ResourceQuotaControllerTest.java index e2679f29..e799d831 100644 --- a/api/src/test/java/com/michelin/ns4kafka/controllers/ResourceQuotaControllerTest.java +++ b/api/src/test/java/com/michelin/ns4kafka/controllers/ResourceQuotaControllerTest.java @@ -162,7 +162,7 @@ void applyValidationErrors() { .build(); when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); - when(resourceQuotaService.validateNewQuotaAgainstCurrentResource(ns, resourceQuota)).thenReturn(List.of("Quota already exceeded")); + when(resourceQuotaService.validateNewResourceQuota(ns, resourceQuota)).thenReturn(List.of("Quota already exceeded")); ResourceValidationException actual = Assertions.assertThrows(ResourceValidationException.class, () -> resourceQuotaController.apply("test", resourceQuota, false)); @@ -193,7 +193,7 @@ void applyUnchanged() { .build(); when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); - when(resourceQuotaService.validateNewQuotaAgainstCurrentResource(ns, resourceQuota)).thenReturn(List.of()); + when(resourceQuotaService.validateNewResourceQuota(ns, resourceQuota)).thenReturn(List.of()); when(resourceQuotaService.findByNamespace(ns.getMetadata().getName())).thenReturn(Optional.of(resourceQuota)); var response = resourceQuotaController.apply("test", resourceQuota, false); @@ -223,7 +223,7 @@ void applyDryRun() { .build(); when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); - when(resourceQuotaService.validateNewQuotaAgainstCurrentResource(ns, resourceQuota)).thenReturn(List.of()); + when(resourceQuotaService.validateNewResourceQuota(ns, resourceQuota)).thenReturn(List.of()); when(resourceQuotaService.findByNamespace(ns.getMetadata().getName())).thenReturn(Optional.empty()); var response = resourceQuotaController.apply("test", resourceQuota, true); @@ -252,7 +252,7 @@ void applyCreated() { .build(); when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); - when(resourceQuotaService.validateNewQuotaAgainstCurrentResource(ns, resourceQuota)).thenReturn(List.of()); + when(resourceQuotaService.validateNewResourceQuota(ns, resourceQuota)).thenReturn(List.of()); when(resourceQuotaService.findByNamespace(ns.getMetadata().getName())).thenReturn(Optional.empty()); when(securityService.username()).thenReturn(Optional.of("test-user")); when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); @@ -294,7 +294,7 @@ void applyUpdated() { .build(); when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); - when(resourceQuotaService.validateNewQuotaAgainstCurrentResource(ns, resourceQuota)).thenReturn(List.of()); + when(resourceQuotaService.validateNewResourceQuota(ns, resourceQuota)).thenReturn(List.of()); when(resourceQuotaService.findByNamespace(ns.getMetadata().getName())).thenReturn(Optional.of(resourceQuotaExisting)); when(securityService.username()).thenReturn(Optional.of("test-user")); when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); diff --git a/api/src/test/java/com/michelin/ns4kafka/controllers/TopicControllerTest.java b/api/src/test/java/com/michelin/ns4kafka/controllers/TopicControllerTest.java index 91448a73..5a09f53f 100644 --- a/api/src/test/java/com/michelin/ns4kafka/controllers/TopicControllerTest.java +++ b/api/src/test/java/com/michelin/ns4kafka/controllers/TopicControllerTest.java @@ -268,6 +268,7 @@ void createNewTopic() throws InterruptedException, ExecutionException, TimeoutEx .topicValidator(TopicValidator.makeDefault()) .build()) .build(); + Topic topic = Topic.builder() .metadata(ObjectMeta.builder() .name("test.topic") @@ -285,7 +286,7 @@ void createNewTopic() throws InterruptedException, ExecutionException, TimeoutEx .thenReturn(Optional.of(ns)); when(topicService.isNamespaceOwnerOfTopic(any(), any())).thenReturn(true); when(topicService.findByName(ns, "test.topic")).thenReturn(Optional.empty()); - when(resourceQuotaService.validateTopicQuota(ns, topic)).thenReturn(List.of()); + when(resourceQuotaService.validateTopicQuota(ns, Optional.empty(), topic)).thenReturn(List.of()); when(securityService.username()).thenReturn(Optional.of("test-user")); when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); doNothing().when(applicationEventPublisher).publishEvent(any()); @@ -500,7 +501,7 @@ void createNewTopicDryRun() throws InterruptedException, ExecutionException, Tim .thenReturn(Optional.of(ns)); when(topicService.isNamespaceOwnerOfTopic(any(), any())).thenReturn(true); when(topicService.findByName(ns, "test.topic")).thenReturn(Optional.empty()); - when(resourceQuotaService.validateTopicQuota(ns, topic)).thenReturn(List.of()); + when(resourceQuotaService.validateTopicQuota(ns, Optional.empty(), topic)).thenReturn(List.of()); var response = topicController.apply("test", topic, true); Assertions.assertEquals("created", response.header("X-Ns4kafka-Result")); @@ -575,7 +576,7 @@ void createNewTopicFailQuotaValidation() { .thenReturn(Optional.of(ns)); when(topicService.isNamespaceOwnerOfTopic(any(), any())).thenReturn(true); when(topicService.findByName(ns, "test.topic")).thenReturn(Optional.empty()); - when(resourceQuotaService.validateTopicQuota(ns, topic)).thenReturn(List.of("Quota error")); + when(resourceQuotaService.validateTopicQuota(ns, Optional.empty(), topic)).thenReturn(List.of("Quota error")); ResourceValidationException actual = Assertions.assertThrows(ResourceValidationException.class, () -> topicController.apply("test", topic, false)); diff --git a/api/src/test/java/com/michelin/ns4kafka/services/ResourceQuotaServiceTest.java b/api/src/test/java/com/michelin/ns4kafka/services/ResourceQuotaServiceTest.java index f269c6c5..87d6088f 100644 --- a/api/src/test/java/com/michelin/ns4kafka/services/ResourceQuotaServiceTest.java +++ b/api/src/test/java/com/michelin/ns4kafka/services/ResourceQuotaServiceTest.java @@ -19,6 +19,7 @@ import java.util.Optional; import static com.michelin.ns4kafka.models.quota.ResourceQuota.ResourceQuotaSpecKey.*; +import static org.apache.kafka.common.config.TopicConfig.RETENTION_BYTES_CONFIG; import static org.mockito.Mockito.*; @ExtendWith(MockitoExtension.class) @@ -270,7 +271,7 @@ void validateNewQuotaAgainstCurrentResourceSuccess() { when(topicService.findAllForNamespace(ns)) .thenReturn(List.of(topic1, topic2, topic3)); - List validationErrors = resourceQuotaService.validateNewQuotaAgainstCurrentResource(ns, resourceQuota); + List validationErrors = resourceQuotaService.validateNewResourceQuota(ns, resourceQuota); Assertions.assertEquals(0, validationErrors.size()); } @@ -321,7 +322,7 @@ void validateNewQuotaAgainstCurrentResourceForCountTopics() { when(topicService.findAllForNamespace(ns)) .thenReturn(List.of(topic1, topic2, topic3)); - List validationErrors = resourceQuotaService.validateNewQuotaAgainstCurrentResource(ns, resourceQuota); + List validationErrors = resourceQuotaService.validateNewResourceQuota(ns, resourceQuota); Assertions.assertEquals(1, validationErrors.size()); Assertions.assertEquals("Quota already exceeded for count/topics: 3/2 (used/limit)", validationErrors.get(0)); } @@ -382,11 +383,92 @@ void validateNewQuotaAgainstCurrentResourceForCountPartitions() { when(topicService.findAllForNamespace(ns)) .thenReturn(List.of(topic1, topic2, topic3)); - List validationErrors = resourceQuotaService.validateNewQuotaAgainstCurrentResource(ns, resourceQuota); + List validationErrors = resourceQuotaService.validateNewResourceQuota(ns, resourceQuota); Assertions.assertEquals(1, validationErrors.size()); Assertions.assertEquals("Quota already exceeded for count/partitions: 19/10 (used/limit)", validationErrors.get(0)); } + /** + * Test format when creating quota on disk/topics + */ + @Test + void validateNewQuotaDiskTopicsFormat() { + Namespace ns = Namespace.builder() + .metadata(ObjectMeta.builder() + .name("namespace") + .cluster("local") + .build()) + .spec(Namespace.NamespaceSpec.builder() + .connectClusters(List.of("local-name")) + .build()) + .build(); + + ResourceQuota resourceQuota = ResourceQuota.builder() + .metadata(ObjectMeta.builder() + .cluster("local") + .name("test") + .build()) + .spec(Map.of(DISK_TOPICS.toString(), "10")) + .build(); + + List validationErrors = resourceQuotaService.validateNewResourceQuota(ns, resourceQuota); + Assertions.assertEquals(1, validationErrors.size()); + Assertions.assertEquals("Invalid value for disk/topics: value must end with either B, KiB, MiB or GiB", validationErrors.get(0)); + } + + /** + * Test validation when creating quota on disk/topics + */ + @Test + void validateNewQuotaAgainstCurrentResourceForDiskTopics() { + Namespace ns = Namespace.builder() + .metadata(ObjectMeta.builder() + .name("namespace") + .cluster("local") + .build()) + .spec(Namespace.NamespaceSpec.builder() + .connectClusters(List.of("local-name")) + .build()) + .build(); + + ResourceQuota resourceQuota = ResourceQuota.builder() + .metadata(ObjectMeta.builder() + .cluster("local") + .name("test") + .build()) + .spec(Map.of(DISK_TOPICS.toString(), "5000B")) + .build(); + + Topic topic1 = Topic.builder() + .metadata(ObjectMeta.builder() + .name("topic") + .namespace("namespace") + .build()) + .spec(Topic.TopicSpec.builder() + .partitions(6) + .configs(Map.of("retention.bytes", "1000")) + .build()) + .build(); + + Topic topic2 = Topic.builder() + .metadata(ObjectMeta.builder() + .name("topic") + .namespace("namespace") + .build()) + .spec(Topic.TopicSpec.builder() + .partitions(3) + .configs(Map.of("retention.bytes", "1000")) + .build()) + .build(); + + when(topicService.findAllForNamespace(ns)) + .thenReturn(List.of(topic1, topic2)); + + List validationErrors = resourceQuotaService.validateNewResourceQuota(ns, resourceQuota); + Assertions.assertEquals(1, validationErrors.size()); + Assertions.assertEquals("Quota already exceeded for disk/topics: 8.79KiB/5000B (used/limit)", validationErrors.get(0)); + } + /** * Test validation when creating quota on count/connectors */ @@ -415,7 +497,7 @@ void validateNewQuotaAgainstCurrentResourceForCountConnectors() { Connector.builder().metadata(ObjectMeta.builder().name("connect1").build()).build(), Connector.builder().metadata(ObjectMeta.builder().name("connect2").build()).build())); - List validationErrors = resourceQuotaService.validateNewQuotaAgainstCurrentResource(ns, resourceQuota); + List validationErrors = resourceQuotaService.validateNewResourceQuota(ns, resourceQuota); Assertions.assertEquals(1, validationErrors.size()); Assertions.assertEquals("Quota already exceeded for count/connectors: 2/1 (used/limit)", validationErrors.get(0)); } @@ -459,8 +541,8 @@ void getCurrentUsedResourceForCountTopics() { when(topicService.findAllForNamespace(ns)) .thenReturn(List.of(topic1, topic2, topic3)); - Integer currentlyUsed = resourceQuotaService.getCurrentUsedResource(ns, COUNT_TOPICS); - Assertions.assertEquals(3, currentlyUsed); + long currentlyUsed = resourceQuotaService.getCurrentCountTopics(ns); + Assertions.assertEquals(3L, currentlyUsed); } /** @@ -511,8 +593,8 @@ void getCurrentUsedResourceForCountPartitions() { when(topicService.findAllForNamespace(ns)) .thenReturn(List.of(topic1, topic2, topic3)); - Integer currentlyUsed = resourceQuotaService.getCurrentUsedResource(ns, COUNT_PARTITIONS); - Assertions.assertEquals(19, currentlyUsed); + long currentlyUsed = resourceQuotaService.getCurrentCountPartitions(ns); + Assertions.assertEquals(19L, currentlyUsed); } /** @@ -535,15 +617,15 @@ void getCurrentUsedResourceForCountConnectors() { Connector.builder().metadata(ObjectMeta.builder().name("connect1").build()).build(), Connector.builder().metadata(ObjectMeta.builder().name("connect2").build()).build())); - Integer currentlyUsed = resourceQuotaService.getCurrentUsedResource(ns, COUNT_CONNECTORS); - Assertions.assertEquals(2, currentlyUsed); + long currentlyUsed = resourceQuotaService.getCurrentCountConnectors(ns); + Assertions.assertEquals(2L, currentlyUsed); } /** - * Test get current used resource for count topics + * Test get current used resource for disk topics */ @Test - void getCurrentUsedResourceForUnknownKey() { + void getCurrentUsedResourceForDiskTopics() { Namespace ns = Namespace.builder() .metadata(ObjectMeta.builder() .name("namespace") @@ -554,8 +636,44 @@ void getCurrentUsedResourceForUnknownKey() { .build()) .build(); - Integer currentlyUsed = resourceQuotaService.getCurrentUsedResource(ns, null); - Assertions.assertEquals(0, currentlyUsed); + Topic topic1 = Topic.builder() + .metadata(ObjectMeta.builder() + .name("topic") + .namespace("namespace") + .build()) + .spec(Topic.TopicSpec.builder() + .partitions(6) + .configs(Map.of("retention.bytes", "1000")) + .build()) + .build(); + + Topic topic2 = Topic.builder() + .metadata(ObjectMeta.builder() + .name("topic") + .namespace("namespace") + .build()) + .spec(Topic.TopicSpec.builder() + .partitions(3) + .configs(Map.of("retention.bytes", "50000")) + .build()) + .build(); + + Topic topic3 = Topic.builder() + .metadata(ObjectMeta.builder() + .name("topic") + .namespace("namespace") + .build()) + .spec(Topic.TopicSpec.builder() + .partitions(10) + .configs(Map.of("retention.bytes", "2500")) + .build()) + .build(); + + when(topicService.findAllForNamespace(ns)) + .thenReturn(List.of(topic1, topic2, topic3)); + + long currentlyUsed = resourceQuotaService.getCurrentDiskTopics(ns); + Assertions.assertEquals(181000L, currentlyUsed); } /** @@ -580,6 +698,7 @@ void validateTopicQuota() { .build()) .spec(Map.of(COUNT_TOPICS.toString(), "4")) .spec(Map.of(COUNT_PARTITIONS.toString(), "25")) + .spec(Map.of(DISK_TOPICS.toString(), "2GiB")) .build(); Topic newTopic = Topic.builder() @@ -589,6 +708,7 @@ void validateTopicQuota() { .build()) .spec(Topic.TopicSpec.builder() .partitions(6) + .configs(Map.of(RETENTION_BYTES_CONFIG, "1000")) .build()) .build(); @@ -600,6 +720,7 @@ void validateTopicQuota() { .build()) .spec(Topic.TopicSpec.builder() .partitions(6) + .configs(Map.of(RETENTION_BYTES_CONFIG, "1000")) .build()) .build(); @@ -610,6 +731,7 @@ void validateTopicQuota() { .build()) .spec(Topic.TopicSpec.builder() .partitions(3) + .configs(Map.of(RETENTION_BYTES_CONFIG, "1000")) .build()) .build(); @@ -620,6 +742,7 @@ void validateTopicQuota() { .build()) .spec(Topic.TopicSpec.builder() .partitions(10) + .configs(Map.of(RETENTION_BYTES_CONFIG, "1000")) .build()) .build(); @@ -628,7 +751,7 @@ void validateTopicQuota() { when(topicService.findAllForNamespace(ns)) .thenReturn(List.of(topic1, topic2, topic3)); - List validationErrors = resourceQuotaService.validateTopicQuota(ns, newTopic); + List validationErrors = resourceQuotaService.validateTopicQuota(ns, Optional.empty(), newTopic); Assertions.assertEquals(0, validationErrors.size()); } @@ -660,7 +783,7 @@ void validateTopicQuotaNoQuota() { when(resourceQuotaRepository.findForNamespace("namespace")) .thenReturn(Optional.empty()); - List validationErrors = resourceQuotaService.validateTopicQuota(ns, newTopic); + List validationErrors = resourceQuotaService.validateTopicQuota(ns, Optional.empty(), newTopic); Assertions.assertEquals(0, validationErrors.size()); } @@ -684,7 +807,7 @@ void validateTopicQuotaExceed() { .cluster("local") .name("test") .build()) - .spec(Map.of(COUNT_TOPICS.toString(), "3", COUNT_PARTITIONS.toString(), "20")) + .spec(Map.of(COUNT_TOPICS.toString(), "3", COUNT_PARTITIONS.toString(), "20", DISK_TOPICS.toString(), "20KiB")) .build(); Topic newTopic = Topic.builder() @@ -694,6 +817,7 @@ void validateTopicQuotaExceed() { .build()) .spec(Topic.TopicSpec.builder() .partitions(6) + .configs(Map.of(RETENTION_BYTES_CONFIG, "1000")) .build()) .build(); @@ -705,6 +829,7 @@ void validateTopicQuotaExceed() { .build()) .spec(Topic.TopicSpec.builder() .partitions(6) + .configs(Map.of(RETENTION_BYTES_CONFIG, "1000")) .build()) .build(); @@ -715,6 +840,7 @@ void validateTopicQuotaExceed() { .build()) .spec(Topic.TopicSpec.builder() .partitions(3) + .configs(Map.of(RETENTION_BYTES_CONFIG, "1000")) .build()) .build(); @@ -725,6 +851,7 @@ void validateTopicQuotaExceed() { .build()) .spec(Topic.TopicSpec.builder() .partitions(10) + .configs(Map.of(RETENTION_BYTES_CONFIG, "1000")) .build()) .build(); @@ -733,10 +860,88 @@ void validateTopicQuotaExceed() { when(topicService.findAllForNamespace(ns)) .thenReturn(List.of(topic1, topic2, topic3)); - List validationErrors = resourceQuotaService.validateTopicQuota(ns, newTopic); - Assertions.assertEquals(2, validationErrors.size()); + List validationErrors = resourceQuotaService.validateTopicQuota(ns, Optional.empty(), newTopic); + Assertions.assertEquals(3, validationErrors.size()); Assertions.assertEquals("Exceeding quota for count/topics: 3/3 (used/limit). Cannot add 1 topic.", validationErrors.get(0)); Assertions.assertEquals("Exceeding quota for count/partitions: 19/20 (used/limit). Cannot add 6 partition(s).", validationErrors.get(1)); + Assertions.assertEquals("Exceeding quota for disk/topics: 18.555KiB/20.0KiB (used/limit). Cannot add 5.86KiB of data.", validationErrors.get(2)); + } + + /** + * Test quota validation on topic update when quota is being exceeded + */ + @Test + void validateUpdateTopicQuotaExceed() { + Namespace ns = Namespace.builder() + .metadata(ObjectMeta.builder() + .name("namespace") + .cluster("local") + .build()) + .spec(Namespace.NamespaceSpec.builder() + .connectClusters(List.of("local-name")) + .build()) + .build(); + + ResourceQuota resourceQuota = ResourceQuota.builder() + .metadata(ObjectMeta.builder() + .cluster("local") + .name("test") + .build()) + .spec(Map.of(COUNT_TOPICS.toString(), "3", COUNT_PARTITIONS.toString(), "20", DISK_TOPICS.toString(), "20KiB")) + .build(); + + Topic newTopic = Topic.builder() + .metadata(ObjectMeta.builder() + .name("topic") + .namespace("namespace") + .build()) + .spec(Topic.TopicSpec.builder() + .partitions(6) + .configs(Map.of(RETENTION_BYTES_CONFIG, "1500")) + .build()) + .build(); + + Topic topic1 = Topic.builder() + .metadata(ObjectMeta.builder() + .name("topic") + .namespace("namespace") + .build()) + .spec(Topic.TopicSpec.builder() + .partitions(6) + .configs(Map.of(RETENTION_BYTES_CONFIG, "1000")) + .build()) + .build(); + + Topic topic2 = Topic.builder() + .metadata(ObjectMeta.builder() + .name("topic") + .namespace("namespace") + .build()) + .spec(Topic.TopicSpec.builder() + .partitions(3) + .configs(Map.of(RETENTION_BYTES_CONFIG, "1000")) + .build()) + .build(); + + Topic topic3 = Topic.builder() + .metadata(ObjectMeta.builder() + .name("topic") + .namespace("namespace") + .build()) + .spec(Topic.TopicSpec.builder() + .partitions(10) + .configs(Map.of(RETENTION_BYTES_CONFIG, "1000")) + .build()) + .build(); + + when(resourceQuotaRepository.findForNamespace("namespace")) + .thenReturn(Optional.of(resourceQuota)); + when(topicService.findAllForNamespace(ns)) + .thenReturn(List.of(topic1, topic2, topic3)); + + List validationErrors = resourceQuotaService.validateTopicQuota(ns, Optional.of(topic1), newTopic); + Assertions.assertEquals(1, validationErrors.size()); + Assertions.assertEquals("Exceeding quota for disk/topics: 18.555KiB/20.0KiB (used/limit). Cannot add 2.93KiB of data.", validationErrors.get(0)); } /** @@ -852,7 +1057,8 @@ void toResponse() { .build()) .spec(Map.of(COUNT_TOPICS.toString(), "3", COUNT_PARTITIONS.toString(), "20", - COUNT_CONNECTORS.toString(), "2")) + COUNT_CONNECTORS.toString(), "2", + DISK_TOPICS.toString(), "60KiB")) .build(); Topic topic1 = Topic.builder() @@ -862,6 +1068,7 @@ void toResponse() { .build()) .spec(Topic.TopicSpec.builder() .partitions(6) + .configs(Map.of("retention.bytes", "1000")) .build()) .build(); @@ -872,6 +1079,7 @@ void toResponse() { .build()) .spec(Topic.TopicSpec.builder() .partitions(3) + .configs(Map.of("retention.bytes", "2000")) .build()) .build(); @@ -882,6 +1090,7 @@ void toResponse() { .build()) .spec(Topic.TopicSpec.builder() .partitions(10) + .configs(Map.of("retention.bytes", "4000")) .build()) .build(); @@ -897,6 +1106,7 @@ void toResponse() { Assertions.assertEquals("3/3", response.getSpec().getCountTopic()); Assertions.assertEquals("19/20", response.getSpec().getCountPartition()); Assertions.assertEquals("2/2", response.getSpec().getCountConnector()); + Assertions.assertEquals("50.782KiB/60KiB", response.getSpec().getDiskTopic()); } /** @@ -921,6 +1131,7 @@ void toResponseNoQuota() { .build()) .spec(Topic.TopicSpec.builder() .partitions(6) + .configs(Map.of("retention.bytes", "1000")) .build()) .build(); @@ -931,6 +1142,7 @@ void toResponseNoQuota() { .build()) .spec(Topic.TopicSpec.builder() .partitions(3) + .configs(Map.of("retention.bytes", "2000")) .build()) .build(); @@ -941,6 +1153,7 @@ void toResponseNoQuota() { .build()) .spec(Topic.TopicSpec.builder() .partitions(10) + .configs(Map.of("retention.bytes", "4000")) .build()) .build(); @@ -956,5 +1169,6 @@ void toResponseNoQuota() { Assertions.assertEquals("3", response.getSpec().getCountTopic()); Assertions.assertEquals("19", response.getSpec().getCountPartition()); Assertions.assertEquals("2", response.getSpec().getCountConnector()); + Assertions.assertEquals("50.782KiB", response.getSpec().getDiskTopic()); } } diff --git a/api/src/test/java/com/michelin/ns4kafka/utils/BytesUtilsTest.java b/api/src/test/java/com/michelin/ns4kafka/utils/BytesUtilsTest.java new file mode 100644 index 00000000..e0d771d5 --- /dev/null +++ b/api/src/test/java/com/michelin/ns4kafka/utils/BytesUtilsTest.java @@ -0,0 +1,47 @@ +package com.michelin.ns4kafka.utils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class BytesUtilsTest { + + /** + * Validate bytes conversion to human-readable string + */ + @Test + void validateBytesToHumanReadable() { + Assertions.assertEquals("0B", BytesUtils.bytesToHumanReadable(0L)); + Assertions.assertEquals("27B", BytesUtils.bytesToHumanReadable(27L)); + Assertions.assertEquals("999B", BytesUtils.bytesToHumanReadable(999L)); + Assertions.assertEquals("1000B", BytesUtils.bytesToHumanReadable(1000L)); + + Assertions.assertEquals("1.0KiB", BytesUtils.bytesToHumanReadable(1024L)); + Assertions.assertEquals("1.688KiB", BytesUtils.bytesToHumanReadable(1728L)); + Assertions.assertEquals("108.0KiB", BytesUtils.bytesToHumanReadable(110592L)); + + Assertions.assertEquals("6.75MiB", BytesUtils.bytesToHumanReadable(7077888L)); + Assertions.assertEquals("432.0MiB", BytesUtils.bytesToHumanReadable(452984832L)); + + Assertions.assertEquals("27.0GiB", BytesUtils.bytesToHumanReadable(28991029248L)); + } + + /** + * Validate human-readable string to bytes conversion + */ + @Test + void validateHumanReadableToBytes() { + Assertions.assertEquals(0L, BytesUtils.humanReadableToBytes("0B")); + Assertions.assertEquals(27L, BytesUtils.humanReadableToBytes("27B")); + Assertions.assertEquals(999L, BytesUtils.humanReadableToBytes("999B")); + Assertions.assertEquals(1000L, BytesUtils.humanReadableToBytes("1000B")); + + Assertions.assertEquals(1024L, BytesUtils.humanReadableToBytes("1KiB")); + Assertions.assertEquals(1729L, BytesUtils.humanReadableToBytes("1.688KiB")); + Assertions.assertEquals(110592L, BytesUtils.humanReadableToBytes("108KiB")); + + Assertions.assertEquals(7077888L, BytesUtils.humanReadableToBytes("6.75MiB")); + Assertions.assertEquals(452984832L, BytesUtils.humanReadableToBytes("432.0MiB")); + + Assertions.assertEquals(28991029248L, BytesUtils.humanReadableToBytes("27GiB")); + } +} diff --git a/cli/src/main/java/com/michelin/ns4kafka/cli/services/FormatService.java b/cli/src/main/java/com/michelin/ns4kafka/cli/services/FormatService.java index 2bcc5953..04f3db83 100644 --- a/cli/src/main/java/com/michelin/ns4kafka/cli/services/FormatService.java +++ b/cli/src/main/java/com/michelin/ns4kafka/cli/services/FormatService.java @@ -62,7 +62,7 @@ public void displayError(HttpClientResponseException e, String kind, String name String causes = status.getDetails().getCauses().size() > 1 ? "\n - " + String.join("\n - ", status.getDetails().getCauses()) : status.getDetails().getCauses().get(0); - System.out.printf("Failed %s/%s %s for causes: %s", kind, name, status.getMessage(), causes); + System.out.printf("Failed %s/%s %s for causes: %s%n", kind, name, status.getMessage(), causes); } else { System.out.printf("Failed %s/%s %s%n", kind, name, e.getMessage()); } diff --git a/cli/src/main/resources/application.yml b/cli/src/main/resources/application.yml index fef6cee1..8d856478 100644 --- a/cli/src/main/resources/application.yml +++ b/cli/src/main/resources/application.yml @@ -49,6 +49,7 @@ kafkactl: - "QUOTA:/metadata/name" - "COUNT/TOPICS:/spec/countTopic" - "COUNT/PARTITIONS:/spec/countPartition" + - "DISK/TOPICS:/spec/diskTopic" - "COUNT/CONNECTORS:/spec/countConnector" ChangeConnectorState: - "CONNECTOR:/metadata/name" diff --git a/kafkactl.bat b/kafkactl.bat index 52692bd5..15634e40 100644 --- a/kafkactl.bat +++ b/kafkactl.bat @@ -1,2 +1,2 @@ @ECHO OFF -%JAVA_HOME%\bin\java -jar .\cli\build\libs\kafkactl-1.8.3-SNAPSHOT.jar %* +%JAVA_HOME%\bin\java -jar .\cli\build\libs\kafkactl-1.9.1-SNAPSHOT.jar %*