Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added resource quota on count #203

Merged
merged 16 commits into from
Jul 18, 2022
1 change: 0 additions & 1 deletion api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ dependencies {
implementation("io.micronaut.security:micronaut-security-ldap")
implementation("io.micronaut.kafka:micronaut-kafka")


runtimeOnly("ch.qos.logback:logback-classic")

testAnnotationProcessor 'org.projectlombok:lombok:1.18.20'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,48 +22,86 @@
@RolesAllowed(SecurityRule.IS_ANONYMOUS)
@Controller("/api-resources")
public class ApiResourcesController {
/**
* ACL resource definition
*/
public static final ResourceDefinition ACL = ResourceDefinition.builder()
.kind("AccessControlEntry")
.namespaced(true)
.synchronizable(false)
.path("acls")
.names(List.of("acls", "acl", "ac"))
.build();

/**
* Connector resource definition
*/
public static final ResourceDefinition CONNECTOR = ResourceDefinition.builder()
.kind("Connector")
.namespaced(true)
.synchronizable(true)
.path("connects")
.names(List.of("connects", "connect", "co"))
.build();

/**
* Kafka Streams resource definition
*/
public static final ResourceDefinition KSTREAM = ResourceDefinition.builder()
.kind("KafkaStream")
.namespaced(true)
.synchronizable(false)
.path("streams")
.names(List.of("streams", "stream", "st"))
.build();

/**
* Role binding resource definition
*/
public static final ResourceDefinition ROLE_BINDING = ResourceDefinition.builder()
.kind("RoleBinding")
.namespaced(true)
.synchronizable(false)
.path("role-bindings")
.names(List.of("rolebindings", "rolebinding", "rb"))
.build();

/**
* Topic resource definition
*/
public static final ResourceDefinition TOPIC = ResourceDefinition.builder()
.kind("Topic")
.namespaced(true)
.synchronizable(true)
.path("topics")
.names(List.of("topics", "topic", "to"))
.build();

/**
* Schema resource definition
*/
public static final ResourceDefinition SCHEMA = ResourceDefinition.builder()
.kind("Schema")
.namespaced(true)
.synchronizable(false)
.path("schemas")
.names(List.of("schemas", "schema", "sc"))
.build();

/**
* Resource quota resource definition
*/
public static final ResourceDefinition RESOURCE_QUOTA = ResourceDefinition.builder()
.kind("ResourceQuota")
.namespaced(true)
.synchronizable(false)
.path("resource-quotas")
.names(List.of("resource-quotas", "resource-quota", "quotas", "quota", "qu"))
.build();

/**
* Namespace resource definition
*/
public static final ResourceDefinition NAMESPACE = ResourceDefinition.builder()
.kind("Namespace")
.namespaced(false)
Expand All @@ -72,27 +110,38 @@ public class ApiResourcesController {
.names(List.of("namespaces", "namespace", "ns"))
.build();

/**
* Role binding repository
*/
@Inject
RoleBindingRepository roleBindingRepository;

/**
* List all the API resources
* @param authentication The authentication
* @return The list of API resources
*/
@Get
public List<ResourceDefinition> list(@Nullable Authentication authentication) {
List<ResourceDefinition> all = List.of(
ACL,
CONNECTOR,
KSTREAM,
ROLE_BINDING,
RESOURCE_QUOTA,
TOPIC,
NAMESPACE,
SCHEMA
);
if(authentication==null){

if (authentication == null) {
return all; // Backward compatibility for cli <= 1.3.0
}

List<String> roles = (List<String>)authentication.getAttributes().getOrDefault("roles", List.of());
List<String> groups = (List<String>) authentication.getAttributes().getOrDefault("groups",List.of());

if(roles.contains(ResourceBasedSecurityRule.IS_ADMIN)) {
if (roles.contains(ResourceBasedSecurityRule.IS_ADMIN)) {
return all;
}

Expand All @@ -101,6 +150,7 @@ public List<ResourceDefinition> list(@Nullable Authentication authentication) {
.flatMap(roleBinding -> roleBinding.getSpec().getRole().getResourceTypes().stream())
.distinct()
.collect(Collectors.toList());

return all.stream()
.filter(resourceDefinition -> authorizedResources.contains(resourceDefinition.getPath()))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.michelin.ns4kafka.models.connector.Connector;
import com.michelin.ns4kafka.models.Namespace;
import com.michelin.ns4kafka.services.KafkaConnectService;
import com.michelin.ns4kafka.services.ResourceQuotaService;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MutableHttpResponse;
Expand All @@ -28,14 +29,20 @@ public class ConnectController extends NamespacedResourceController {
/**
* Message threw when namespace is not owner of the given connector
*/
private final String NAMESPACE_NOT_OWNER = "Namespace not owner of this connector %s.";
private static final String NAMESPACE_NOT_OWNER = "Namespace not owner of this connector %s.";

/**
* Connector service
*/
@Inject
KafkaConnectService kafkaConnectService;

/**
* The resource quota service
*/
@Inject
ResourceQuotaService resourceQuotaService;

/**
* Get all the connectors by namespace
* @param namespace The namespace
Expand Down Expand Up @@ -155,11 +162,16 @@ public Single<HttpResponse<Connector>> apply(String namespace, @Valid @Body Conn
return Single.just(formatHttpResponse(connector, status));
}

sendEventLog(connector.getKind(),
connector.getMetadata(),
status,
existingConnector.<Object>map(Connector::getSpec).orElse(null),
connector.getSpec());
// Only check quota on connector creation
if (status.equals(ApplyStatus.created)) {
List<String> quotaErrors = resourceQuotaService.validateConnectorQuota(ns);
if (!quotaErrors.isEmpty()) {
return Single.error(new ResourceValidationException(quotaErrors, connector.getKind(), connector.getMetadata().getName()));
}
}

sendEventLog(connector.getKind(), connector.getMetadata(), status,
existingConnector.<Object>map(Connector::getSpec).orElse(null), connector.getSpec());

return Single.just(formatHttpResponse(kafkaConnectService.createOrUpdate(connector), status));
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.michelin.ns4kafka.controllers;

import com.michelin.ns4kafka.models.Namespace;
import com.michelin.ns4kafka.models.quota.ResourceQuota;
import com.michelin.ns4kafka.models.quota.ResourceQuotaResponse;
import com.michelin.ns4kafka.services.ResourceQuotaService;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.annotation.*;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.swagger.v3.oas.annotations.tags.Tag;

import javax.inject.Inject;
import javax.validation.Valid;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Optional;

@Tag(name = "Resource Quota")
@Controller(value = "/api/namespaces/{namespace}/resource-quotas")
@ExecuteOn(TaskExecutors.IO)
public class ResourceQuotaController extends NamespacedResourceController {
/**
* The resource quota service
*/
@Inject
ResourceQuotaService resourceQuotaService;

/**
* Get all the quotas by namespace
* @param namespace The namespace
* @return Listed quotas
*/
@Get
public List<ResourceQuotaResponse> list(String namespace) {
Namespace ns = getNamespace(namespace);
return List.of(resourceQuotaService.toResponse(ns, resourceQuotaService.findByNamespace(namespace)));
}

/**
* Get a quota by namespace and name
* @param namespace The name
* @param quota The quota name
* @return Listed quotas
*/
@Get("/{quota}")
public Optional<ResourceQuotaResponse> get(String namespace, String quota) {
Namespace ns = getNamespace(namespace);
Optional<ResourceQuota> resourceQuota = resourceQuotaService.findByName(namespace, quota);
if (resourceQuota.isEmpty()) {
return Optional.empty();
}
return Optional.of(resourceQuotaService.toResponse(ns, resourceQuota));
}

/**
* Publish a resource quota
* @param namespace The namespace
* @param quota The resource quota
* @param dryrun Does the creation is a dry run
* @return The created role binding
*/
@Post("/{?dryrun}")
HttpResponse<ResourceQuota> apply(String namespace, @Body @Valid ResourceQuota quota, @QueryValue(defaultValue = "false") boolean dryrun){
Namespace ns = getNamespace(namespace);

quota.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
quota.getMetadata().setCluster(ns.getMetadata().getCluster());
quota.getMetadata().setNamespace(namespace);

List<String> validationErrors = resourceQuotaService.validateNewQuotaAgainstCurrentResource(ns, quota);
if (!validationErrors.isEmpty()) {
throw new ResourceValidationException(validationErrors, quota.getKind(), quota.getMetadata().getName());
}

Optional<ResourceQuota> resourceQuotaOptional = resourceQuotaService.findByNamespace(namespace);
if (resourceQuotaOptional.isPresent() && resourceQuotaOptional.get().equals(quota)) {
return formatHttpResponse(quota, ApplyStatus.unchanged);
}

ApplyStatus status = resourceQuotaOptional.isPresent() ? ApplyStatus.changed : ApplyStatus.created;
if (dryrun) {
return formatHttpResponse(quota, status);
}

sendEventLog(quota.getKind(), quota.getMetadata(), status,
resourceQuotaOptional.<Object>map(ResourceQuota::getSpec).orElse(null), quota.getSpec());

return formatHttpResponse(resourceQuotaService.create(quota), status);
}

/**
* Delete a resource quota
* @param namespace The namespace
* @param name The resource quota
* @param dryrun Is dry run mode or not ?
* @return An HTTP response
*/
@Delete("/{name}{?dryrun}")
@Status(HttpStatus.NO_CONTENT)
public HttpResponse<Void> delete(String namespace, String name, @QueryValue(defaultValue = "false") boolean dryrun) {
Optional<ResourceQuota> resourceQuota = resourceQuotaService.findByName(namespace, name);
if (resourceQuota.isEmpty()) {
return HttpResponse.notFound();
}

if (dryrun) {
return HttpResponse.noContent();
}

ResourceQuota resourceQuotaToDelete = resourceQuota.get();
sendEventLog(resourceQuotaToDelete .getKind(), resourceQuotaToDelete.getMetadata(), ApplyStatus.deleted,
resourceQuotaToDelete.getSpec(), null);
resourceQuotaService.delete(resourceQuotaToDelete);
return HttpResponse.noContent();
}
}
Loading