Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Connect cluster as resource to Ns4Kafka #214

Merged
merged 15 commits into from
Nov 2, 2022
29 changes: 27 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ spec:
```

Available options :
- **spec.resourceType**: TOPIC, GROUP, CONNECT
- **spec.resourceType**: TOPIC, GROUP, CONNECT, CONNECT_CLUSTER
- **spec.resourcePatternType**: PREFIXED, LITERAL
- **spec.permission**: READ, WRITE

Expand Down Expand Up @@ -261,6 +261,31 @@ user@local:/home/user$ kafkactl apply -f connector.yml
Success Connector/test.connect1 (created)
```

### Connect Cluster

This resource declares a Connect cluster that has been self-deployed so namespace are autonomous to deploy connectors on it
without any Ns4Kafka outage.

```yaml
---
apiVersion: v1
kind: ConnectCluster
metadata:
name: test.myConnectCluster
spec:
url: http://localhost:8083
username: myUsername
password: myPassword
```

```bash
user@local:/home/user$ kafkactl apply -f connect-cluster.yml
Success ConnectCluster/test.myConnectCluster (created)
```

**metadata.name** should not collide with the name of a Connect cluster declared in the Ns4Kafka configuration.
An error message will be thrown otherwise.

### Kafka Streams

This resource only grants the necessary Kafka ACLs for your Kafka Stream to work properly (if you have internal topics). It doesn’t do anything with your actual Kafka Stream code or Kafka Stream deployment.
Expand Down Expand Up @@ -779,7 +804,7 @@ Success Namespace/test (changed)

It is possible to define quotas on a namespace. Ideal for clusters with limited resources!

A namespace with quotas will not be able to exceed the limits enforced by the quotas.
A namespace with quotas will not be able to exceed the limits enforced by these quotas.

```yaml
apiVersion: v1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.michelin.ns4kafka.controllers;
package com.michelin.ns4kafka.config;

import io.micronaut.context.annotation.ConfigurationProperties;
import lombok.Getter;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.michelin.ns4kafka.services.executors;
package com.michelin.ns4kafka.config;

import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.EachProperty;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.michelin.ns4kafka.repositories.kafka;
package com.michelin.ns4kafka.config;

import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.Property;
import io.micronaut.core.convert.format.MapFormat;

import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.michelin.ns4kafka.security;
package com.michelin.ns4kafka.config;

import com.michelin.ns4kafka.security.local.LocalUser;
import io.micronaut.context.annotation.ConfigurationProperties;
Expand All @@ -13,4 +13,5 @@
public class SecurityConfig {
private List<LocalUser> localUsers;
private String adminGroup;
private String aes256EncryptionKey;
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package com.michelin.ns4kafka.controllers;

import com.michelin.ns4kafka.controllers.generic.NamespacedResourceController;
import com.michelin.ns4kafka.models.AccessControlEntry;
import com.michelin.ns4kafka.models.Namespace;
import com.michelin.ns4kafka.security.ResourceBasedSecurityRule;
import com.michelin.ns4kafka.services.AccessControlEntryService;
import com.michelin.ns4kafka.services.NamespaceService;
import com.michelin.ns4kafka.utils.enums.ApplyStatus;
import com.michelin.ns4kafka.utils.exceptions.ResourceValidationException;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.annotation.*;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.michelin.ns4kafka.controllers;

import com.michelin.ns4kafka.config.AkhqClaimProviderControllerConfig;
import com.michelin.ns4kafka.models.AccessControlEntry;
import com.michelin.ns4kafka.services.AccessControlEntryService;
import com.michelin.ns4kafka.services.NamespaceService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class ApiResourcesController {
.kind("Connector")
.namespaced(true)
.synchronizable(true)
.path("connects")
.path("connectors")
.names(List.of("connects", "connect", "co"))
.build();

Expand Down Expand Up @@ -99,6 +99,17 @@ public class ApiResourcesController {
.names(List.of("resource-quotas", "resource-quota", "quotas", "quota", "qu"))
.build();

/**
* Connect worker resource definition
*/
public static final ResourceDefinition CONNECT_CLUSTER = ResourceDefinition.builder()
.kind("ConnectCluster")
.namespaced(true)
.synchronizable(false)
.path("connect-clusters")
.names(List.of("connect-clusters", "connect-cluster", "cc"))
.build();

/**
* Namespace resource definition
*/
Expand Down Expand Up @@ -129,6 +140,7 @@ public List<ResourceDefinition> list(@Nullable Authentication authentication) {
KSTREAM,
ROLE_BINDING,
RESOURCE_QUOTA,
CONNECT_CLUSTER,
TOPIC,
NAMESPACE,
SCHEMA
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package com.michelin.ns4kafka.controllers;

import com.michelin.ns4kafka.controllers.generic.NamespacedResourceController;
import com.michelin.ns4kafka.models.ConnectCluster;
import com.michelin.ns4kafka.models.Namespace;
import com.michelin.ns4kafka.models.connector.Connector;
import com.michelin.ns4kafka.services.ConnectClusterService;
import com.michelin.ns4kafka.services.ConnectorService;
import com.michelin.ns4kafka.utils.enums.ApplyStatus;
import com.michelin.ns4kafka.utils.exceptions.ResourceValidationException;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.annotation.*;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.swagger.v3.oas.annotations.tags.Tag;

import javax.inject.Inject;
import javax.validation.Valid;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

@Tag(name = "Connect Clusters")
@Controller(value = "/api/namespaces/{namespace}/connect-clusters")
@ExecuteOn(TaskExecutors.IO)
public class ConnectClusterController extends NamespacedResourceController {
@Inject
ConnectClusterService connectClusterService;

@Inject
ConnectorService connectorService;

/**
* Get all the Connect workers by namespace
* @param namespace The namespace
* @return A list of connectors
*/
@Get
public List<ConnectCluster> list(String namespace) {
return connectClusterService.findAllByNamespaceOwner(getNamespace(namespace));
}

/**
* Get the last version of a connector by namespace and name
* @param namespace The namespace
* @param connectCluster The name
* @return A Connect worker
*/
@Get("/{connectCluster}")
public Optional<ConnectCluster> getConnectCluster(String namespace, String connectCluster) {
return connectClusterService.findByNamespaceAndNameOwner(getNamespace(namespace), connectCluster);
}

/**
* Publish a Connect worker
* @param namespace The namespace
* @param connectCluster The connect worker
* @param dryrun Does the creation is a dry run
* @return The created role binding
*/
@Post("/{?dryrun}")
HttpResponse<ConnectCluster> apply(String namespace, @Body @Valid ConnectCluster connectCluster, @QueryValue(defaultValue = "false") boolean dryrun) throws Exception {
Namespace ns = getNamespace(namespace);

List<String> validationErrors = new ArrayList<>();
if (!connectClusterService.isNamespaceOwnerOfConnectCluster(ns, connectCluster.getMetadata().getName())) {
validationErrors.add(String.format("Namespace not owner of this Connect cluster %s.", connectCluster.getMetadata().getName()));
}

validationErrors.addAll(connectClusterService.validateConnectClusterCreation(connectCluster));

if (!validationErrors.isEmpty()) {
throw new ResourceValidationException(validationErrors, connectCluster.getKind(), connectCluster.getMetadata().getName());
}

connectCluster.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
connectCluster.getMetadata().setCluster(ns.getMetadata().getCluster());
connectCluster.getMetadata().setNamespace(ns.getMetadata().getName());

Optional<ConnectCluster> existingConnectCluster = connectClusterService.findByNamespaceAndNameOwner(ns, connectCluster.getMetadata().getName());
if (existingConnectCluster.isPresent() && existingConnectCluster.get().equals(connectCluster)) {
return formatHttpResponse(existingConnectCluster.get(), ApplyStatus.unchanged);
}

ApplyStatus status = existingConnectCluster.isPresent() ? ApplyStatus.changed : ApplyStatus.created;
if (dryrun) {
return formatHttpResponse(connectCluster, status);
}

sendEventLog(connectCluster.getKind(), connectCluster.getMetadata(), status, existingConnectCluster.<Object>map(ConnectCluster::getSpec).orElse(null),
connectCluster.getSpec());

return formatHttpResponse(connectClusterService.create(connectCluster), status);
}

/**
* Delete Connect cluster by the given name
* @param namespace The current namespace
* @param connectCluster The current connect cluster name to delete
* @param dryrun Run in dry mode or not
* @return A HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Delete("/{connectCluster}{?dryrun}")
public HttpResponse<Void> delete(String namespace, String connectCluster, @QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

List<String> validationErrors = new ArrayList<>();
if (!connectClusterService.isNamespaceOwnerOfConnectCluster(ns, connectCluster)) {
validationErrors.add(String.format("Namespace not owner of this Connect cluster %s.", connectCluster));
}

List<Connector> connectors = connectorService.findAllByConnectCluster(ns, connectCluster);
if (!connectors.isEmpty()) {
validationErrors.add(String.format("The Connect cluster %s has %s deployed connector(s): %s. Please remove the associated connector(s) before deleting it.", connectCluster, connectors.size(),
connectors.stream().map(connector -> connector.getMetadata().getName()).collect(Collectors.joining(", "))));
}

if (!validationErrors.isEmpty()) {
throw new ResourceValidationException(validationErrors, "ConnectCluster", connectCluster);
}

Optional<ConnectCluster> optionalConnectCluster = connectClusterService.findByNamespaceAndNameOwner(ns, connectCluster);
if (optionalConnectCluster.isEmpty()) {
return HttpResponse.notFound();
}

if (dryrun) {
return HttpResponse.noContent();
}

ConnectCluster connectClusterToDelete = optionalConnectCluster.get();
sendEventLog(connectClusterToDelete.getKind(), connectClusterToDelete.getMetadata(), ApplyStatus.deleted, connectClusterToDelete.getSpec(), null);

connectClusterService.delete(connectClusterToDelete);
return HttpResponse.noContent();
}
}
Loading