Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added quota on disk topics #211

Merged
merged 11 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ Success RoleBinding/test-role-group1 (created)

From now on, members of the group ``group1/test-ops`` (either Gitlab, LDAP or OIDC groups) can use ns4kafka to manage topics starting with `test.` on the `local` Kafka cluster.

But wait ! That's not enough. Now you should only let them create Topics successfully if and only if their configuration is aligned with your strategy ! Let's add Validators !
But wait ! That's not enough. Now you should only let them create Topics successfully if and only if their configuration is aligned with your strategy ! Let's add Validators !

````yaml
# namespace.yml
Expand Down Expand Up @@ -775,3 +775,27 @@ user@local:/home/user$ kafkactl apply -f namespace.yml
Success Namespace/test (changed)
````

## Quota management

It is possible to define quotas on a namespace. Ideal for clusters with limited resources!

A namespace with quotas will not be able to exceed the limits enforced by the quotas.

```yaml
apiVersion: v1
kind: ResourceQuota
metadata:
namespace: test
name: test-rq1
spec:
count/topics: 10
count/partitions: 60
count/connectors: 5
disk/topics: 500MiB
```

- **count/topics**: maximum number of deployable topics
- **count/partitions**: maximum number of deployable partitions
- **count/connectors**: maximum number of deployable connectors
- **disk/topics**: maximum size of all topics. Computed from the sum of _retention.bytes_ * _number of partitions_ of all topics.
Unit of measure accepted is byte (B), kibibyte (KiB), mebibyte (MiB), gibibyte (GiB)
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ HttpResponse<ResourceQuota> apply(String namespace, @Body @Valid ResourceQuota q
quota.getMetadata().setCluster(ns.getMetadata().getCluster());
quota.getMetadata().setNamespace(namespace);

List<String> validationErrors = resourceQuotaService.validateNewQuotaAgainstCurrentResource(ns, quota);
List<String> validationErrors = resourceQuotaService.validateNewResourceQuota(ns, quota);
if (!validationErrors.isEmpty()) {
throw new ResourceValidationException(validationErrors, quota.getKind(), quota.getMetadata().getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,12 @@ public HttpResponse<Topic> apply(String namespace, @Valid @Body Topic topic, @Qu
return formatHttpResponse(existingTopic.get(), ApplyStatus.unchanged);
}

ApplyStatus status = existingTopic.isPresent() ? ApplyStatus.changed : ApplyStatus.created;

// Only check quota on topic creation
if (status.equals(ApplyStatus.created)) {
validationErrors.addAll(resourceQuotaService.validateTopicQuota(ns, topic));
if (!validationErrors.isEmpty()) {
throw new ResourceValidationException(validationErrors, topic.getKind(), topic.getMetadata().getName());
}
validationErrors.addAll(resourceQuotaService.validateTopicQuota(ns, existingTopic, topic));
if (!validationErrors.isEmpty()) {
throw new ResourceValidationException(validationErrors, topic.getKind(), topic.getMetadata().getName());
}

ApplyStatus status = existingTopic.isPresent() ? ApplyStatus.changed : ApplyStatus.created;
if (dryrun) {
return formatHttpResponse(topic, status);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
@NoArgsConstructor
@Data
public class Namespace {

private final String apiVersion = "v1";
private final String kind = "Namespace";

@Valid
@NotNull
private ObjectMeta metadata;
Expand All @@ -40,8 +40,5 @@ public static class NamespaceSpec {
private List<String> connectClusters = List.of();
private TopicValidator topicValidator;
private ConnectValidator connectValidator;
//private ResourceQuota quota;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class ResourceQuota {
public enum ResourceQuotaSpecKey {
COUNT_TOPICS("count/topics"),
COUNT_PARTITIONS("count/partitions"),
DISK_TOPICS("disk/topics"),
COUNT_CONNECTORS("count/connectors");

private final String key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public static class ResourceQuotaResponseSpec {
*/
private String countPartition;

/**
* The disk quota for topics
*/
private String diskTopic;

/**
* The count quota for connectors
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,28 @@
import com.michelin.ns4kafka.models.quota.ResourceQuota;
import com.michelin.ns4kafka.models.quota.ResourceQuotaResponse;
import com.michelin.ns4kafka.repositories.ResourceQuotaRepository;
import com.michelin.ns4kafka.utils.BytesUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import static com.michelin.ns4kafka.models.quota.ResourceQuota.ResourceQuotaSpecKey.*;
import static com.michelin.ns4kafka.utils.BytesUtils.*;
import static org.apache.kafka.common.config.TopicConfig.RETENTION_BYTES_CONFIG;

@Slf4j
@Singleton
public class ResourceQuotaService {
/**
* Error message when the given quota is already exceeded
*/
private static final String QUOTA_ALREADY_EXCEEDED_ERROR = "Quota already exceeded for %s: %s/%s (used/limit)";

/**
* Quota response format
*/
Expand All @@ -30,11 +37,6 @@ public class ResourceQuotaService {
*/
private static final String NO_QUOTA_RESPONSE_FORMAT = "%s";

/**
* Limit to display when there is no quota
*/
private static final String UNLIMITED_QUOTA = "INF";

/**
* Role binding repository
*/
Expand Down Expand Up @@ -96,54 +98,103 @@ public void delete(ResourceQuota resourceQuota) {
* @param resourceQuota The new resource quota
* @return A list of validation errors
*/
public List<String> validateNewQuotaAgainstCurrentResource(Namespace namespace, ResourceQuota resourceQuota) {
public List<String> validateNewResourceQuota(Namespace namespace, ResourceQuota resourceQuota) {
List<String> errors = new ArrayList<>();

Arrays.stream(values()).forEach(quotaKey -> {
if (resourceQuota.getSpec().get(quotaKey.getKey()) != null) {
int used = getCurrentUsedResource(namespace, quotaKey);
int limit = Integer.parseInt(resourceQuota.getSpec().get(quotaKey.getKey()));
if (StringUtils.isNotBlank(resourceQuota.getSpec().get(COUNT_TOPICS.getKey()))) {
long used = getCurrentCountTopics(namespace);
long limit = Long.parseLong(resourceQuota.getSpec().get(COUNT_TOPICS.getKey()));
if (used > limit) {
errors.add(String.format(QUOTA_ALREADY_EXCEEDED_ERROR, COUNT_TOPICS, used, limit));
}
}

if (StringUtils.isNotBlank(resourceQuota.getSpec().get(COUNT_PARTITIONS.getKey()))) {
long used = getCurrentCountPartitions(namespace);
long limit = Long.parseLong(resourceQuota.getSpec().get(COUNT_PARTITIONS.getKey()));
if (used > limit) {
errors.add(String.format(QUOTA_ALREADY_EXCEEDED_ERROR, COUNT_PARTITIONS, used, limit));
}
}

if (StringUtils.isNotBlank(resourceQuota.getSpec().get(DISK_TOPICS.getKey()))) {
String limitAsString = resourceQuota.getSpec().get(DISK_TOPICS.getKey());
if (!limitAsString.endsWith(BYTE) && !limitAsString.endsWith(KIBIBYTE) && !limitAsString.endsWith(MEBIBYTE) && !limitAsString.endsWith(GIBIBYTE)) {
errors.add(String.format("Invalid value for %s: value must end with either %s, %s, %s or %s",
DISK_TOPICS, BYTE, KIBIBYTE, MEBIBYTE, GIBIBYTE));
} else {
long used = getCurrentDiskTopics(namespace);
long limit = BytesUtils.humanReadableToBytes(limitAsString);
if (used > limit) {
errors.add(String.format("Quota already exceeded for %s: %s/%s (used/limit)", quotaKey, used, limit));
errors.add(String.format(QUOTA_ALREADY_EXCEEDED_ERROR, DISK_TOPICS,
BytesUtils.bytesToHumanReadable(used), limitAsString));
}
}
});
}

if (StringUtils.isNotBlank(resourceQuota.getSpec().get(COUNT_CONNECTORS.getKey()))) {
long used = getCurrentCountConnectors(namespace);
long limit = Long.parseLong(resourceQuota.getSpec().get(COUNT_CONNECTORS.getKey()));
if (used > limit) {
errors.add(String.format(QUOTA_ALREADY_EXCEEDED_ERROR, COUNT_CONNECTORS, used, limit));
}
}

return errors;
}

/**
* For a given quota key, get the current value used by the namespace
* Get currently used number of topics
* @param namespace The namespace
* @param key The quota key
* @return The current value
* @return The number of topics
*/
public Integer getCurrentUsedResource(Namespace namespace, ResourceQuota.ResourceQuotaSpecKey key) {
if (COUNT_TOPICS.equals(key)) {
return topicService.findAllForNamespace(namespace).size();
}
public long getCurrentCountTopics(Namespace namespace) {
return topicService.findAllForNamespace(namespace).size();
}

if (COUNT_PARTITIONS.equals(key)) {
return topicService.findAllForNamespace(namespace)
.stream()
.map(topic -> topic.getSpec().getPartitions())
.reduce(0, Integer::sum);
}
/**
* Get currently used number of partitions
* @param namespace The namespace
* @return The number of partitions
*/
public long getCurrentCountPartitions(Namespace namespace) {
return topicService.findAllForNamespace(namespace)
.stream()
.map(topic -> topic.getSpec().getPartitions())
.reduce(0, Integer::sum)
.longValue();
}

if (COUNT_CONNECTORS.equals(key)) {
return kafkaConnectService.findAllForNamespace(namespace).size();
}
/**
* Get currently used topic disk in bytes
* @param namespace The namespace
* @return The number of topic disk
*/
public long getCurrentDiskTopics(Namespace namespace) {
return topicService.findAllForNamespace(namespace)
.stream()
.map(topic -> Long.parseLong(topic.getSpec().getConfigs().getOrDefault("retention.bytes", "0")) *
topic.getSpec().getPartitions())
.reduce(0L, Long::sum);
}

return 0;
/**
* Get currently used number of topics
* @param namespace The namespace
* @return The number of topics
*/
public long getCurrentCountConnectors(Namespace namespace) {
return kafkaConnectService.findAllForNamespace(namespace).size();
}

/**
* Validate the topic quota
* @param namespace The namespace
* @param topic The topic
* @param existingTopic The existing topic
* @param newTopic The new topic
* @return A list of errors
*/
public List<String> validateTopicQuota(Namespace namespace, Topic topic) {
public List<String> validateTopicQuota(Namespace namespace, Optional<Topic> existingTopic, Topic newTopic) {
Optional<ResourceQuota> resourceQuotaOptional = findByNamespace(namespace.getMetadata().getName());
if (resourceQuotaOptional.isEmpty()) {
return List.of();
Expand All @@ -152,20 +203,42 @@ public List<String> validateTopicQuota(Namespace namespace, Topic topic) {
List<String> errors = new ArrayList<>();
ResourceQuota resourceQuota = resourceQuotaOptional.get();

if (StringUtils.isNotBlank(resourceQuota.getSpec().get(COUNT_TOPICS.getKey()))) {
int used = getCurrentUsedResource(namespace, COUNT_TOPICS);
int limit = Integer.parseInt(resourceQuota.getSpec().get(COUNT_TOPICS.toString()));
if (used + 1 > limit) {
errors.add(String.format("Exceeding quota for %s: %s/%s (used/limit). Cannot add 1 topic.", COUNT_TOPICS, used, limit));
// Check count topics and count partitions only at creation
if (existingTopic.isEmpty()) {
if (StringUtils.isNotBlank(resourceQuota.getSpec().get(COUNT_TOPICS.getKey()))) {
long used = getCurrentCountTopics(namespace);
long limit = Long.parseLong(resourceQuota.getSpec().get(COUNT_TOPICS.getKey()));
if (used + 1 > limit) {
errors.add(String.format("Exceeding quota for %s: %s/%s (used/limit). Cannot add 1 topic.", COUNT_TOPICS, used, limit));
}
}

if (StringUtils.isNotBlank(resourceQuota.getSpec().get(COUNT_PARTITIONS.getKey()))) {
long used = getCurrentCountPartitions(namespace);
long limit = Long.parseLong(resourceQuota.getSpec().get(COUNT_PARTITIONS.getKey()));
if (used + newTopic.getSpec().getPartitions() > limit) {
errors.add(String.format("Exceeding quota for %s: %s/%s (used/limit). Cannot add %s partition(s).", COUNT_PARTITIONS, used, limit, newTopic.getSpec().getPartitions()));
}
}
}

if (StringUtils.isNotBlank(resourceQuota.getSpec().get(COUNT_PARTITIONS.getKey()))) {
int used = getCurrentUsedResource(namespace, COUNT_PARTITIONS);
int limit = Integer.parseInt(resourceQuota.getSpec().get(COUNT_PARTITIONS.toString()));
if (used + topic.getSpec().getPartitions() > limit) {
errors.add(String.format("Exceeding quota for %s: %s/%s (used/limit). Cannot add %s partition(s).", COUNT_PARTITIONS, used, limit, topic.getSpec().getPartitions()));
if (StringUtils.isNotBlank(resourceQuota.getSpec().get(DISK_TOPICS.getKey())) &&
StringUtils.isNotBlank(newTopic.getSpec().getConfigs().get(RETENTION_BYTES_CONFIG))) {
long used = getCurrentDiskTopics(namespace);
long limit = BytesUtils.humanReadableToBytes(resourceQuota.getSpec().get(DISK_TOPICS.getKey()));

long newTopicSize = Long.parseLong(newTopic.getSpec().getConfigs().get(RETENTION_BYTES_CONFIG)) * newTopic.getSpec().getPartitions();
long existingTopicSize = existingTopic
.map(value -> Long.parseLong(value.getSpec().getConfigs().getOrDefault(RETENTION_BYTES_CONFIG, "0"))
* value.getSpec().getPartitions())
.orElse(0L);

long bytesToAdd = newTopicSize - existingTopicSize;
if (bytesToAdd > 0 && used + bytesToAdd > limit) {
errors.add(String.format("Exceeding quota for %s: %s/%s (used/limit). Cannot add %s of data.", DISK_TOPICS,
BytesUtils.bytesToHumanReadable(used), BytesUtils.bytesToHumanReadable(limit), BytesUtils.bytesToHumanReadable(bytesToAdd)));
}

}

return errors;
Expand All @@ -186,8 +259,8 @@ public List<String> validateConnectorQuota(Namespace namespace) {
ResourceQuota resourceQuota = resourceQuotaOptional.get();

if (StringUtils.isNotBlank(resourceQuota.getSpec().get(COUNT_CONNECTORS.getKey()))) {
int used = getCurrentUsedResource(namespace, COUNT_CONNECTORS);
int limit = Integer.parseInt(resourceQuota.getSpec().get(COUNT_CONNECTORS.toString()));
long used = getCurrentCountConnectors(namespace);
long limit = Long.parseLong(resourceQuota.getSpec().get(COUNT_CONNECTORS.getKey()));
if (used + 1 > limit) {
errors.add(String.format("Exceeding quota for %s: %s/%s (used/limit). Cannot add 1 connector.", COUNT_CONNECTORS, used, limit));
}
Expand All @@ -203,23 +276,32 @@ public List<String> validateConnectorQuota(Namespace namespace) {
* @return A list of quotas as response format
*/
public ResourceQuotaResponse toResponse(Namespace namespace, Optional<ResourceQuota> resourceQuota) {
String countTopic = resourceQuota.isPresent() ? String.format(QUOTA_RESPONSE_FORMAT, getCurrentUsedResource(namespace, COUNT_TOPICS),
resourceQuota.map(quota -> quota.getSpec().get(COUNT_TOPICS.toString())).orElse(UNLIMITED_QUOTA)) :
String.format(NO_QUOTA_RESPONSE_FORMAT, getCurrentUsedResource(namespace, COUNT_TOPICS));

String countPartition = resourceQuota.isPresent() ? String.format(QUOTA_RESPONSE_FORMAT, getCurrentUsedResource(namespace, COUNT_PARTITIONS),
resourceQuota.map(quota -> quota.getSpec().get(COUNT_PARTITIONS.toString())).orElse(UNLIMITED_QUOTA)) :
String.format(NO_QUOTA_RESPONSE_FORMAT, getCurrentUsedResource(namespace, COUNT_PARTITIONS));

String countConnector = resourceQuota.isPresent() ? String.format(QUOTA_RESPONSE_FORMAT, getCurrentUsedResource(namespace, COUNT_CONNECTORS),
resourceQuota.map(quota -> quota.getSpec().get(COUNT_CONNECTORS.toString())).orElse(UNLIMITED_QUOTA)) :
String.format(NO_QUOTA_RESPONSE_FORMAT, getCurrentUsedResource(namespace, COUNT_CONNECTORS));
long currentCountTopic = getCurrentCountTopics(namespace);
String countTopic = resourceQuota.isPresent() && StringUtils.isNotBlank(resourceQuota.get().getSpec().get(COUNT_TOPICS.getKey())) ?
String.format(QUOTA_RESPONSE_FORMAT, currentCountTopic, resourceQuota.get().getSpec().get(COUNT_TOPICS.getKey())) :
String.format(NO_QUOTA_RESPONSE_FORMAT, currentCountTopic);

long currentCountPartition = getCurrentCountPartitions(namespace);
String countPartition = resourceQuota.isPresent() && StringUtils.isNotBlank(resourceQuota.get().getSpec().get(COUNT_PARTITIONS.getKey())) ?
String.format(QUOTA_RESPONSE_FORMAT, currentCountPartition, resourceQuota.get().getSpec().get(COUNT_PARTITIONS.getKey())) :
String.format(NO_QUOTA_RESPONSE_FORMAT, currentCountPartition);

long currentDiskTopic = getCurrentDiskTopics(namespace);
String diskTopic = resourceQuota.isPresent() && StringUtils.isNotBlank(resourceQuota.get().getSpec().get(DISK_TOPICS.getKey())) ?
String.format(QUOTA_RESPONSE_FORMAT, BytesUtils.bytesToHumanReadable(currentDiskTopic), resourceQuota.get().getSpec().get(DISK_TOPICS.getKey())) :
String.format(NO_QUOTA_RESPONSE_FORMAT, BytesUtils.bytesToHumanReadable(currentDiskTopic));

long currentCountConnector = getCurrentCountConnectors(namespace);
String countConnector = resourceQuota.isPresent() && StringUtils.isNotBlank(resourceQuota.get().getSpec().get(COUNT_CONNECTORS.getKey())) ?
String.format(QUOTA_RESPONSE_FORMAT, currentCountConnector, resourceQuota.get().getSpec().get(COUNT_CONNECTORS.getKey())) :
String.format(NO_QUOTA_RESPONSE_FORMAT, currentCountConnector);

return ResourceQuotaResponse.builder()
.metadata(resourceQuota.map(ResourceQuota::getMetadata).orElse(null))
.spec(ResourceQuotaResponse.ResourceQuotaResponseSpec.builder()
.countTopic(countTopic)
.countPartition(countPartition)
.diskTopic(diskTopic)
.countConnector(countConnector)
.build())
.build();
Expand Down
Loading