Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return list of deleted resources in resources deletion API #470

Merged
merged 3 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public Optional<Connector> get(String namespace, String connector) {
*
* @param namespace The namespace
* @param connector The connector to create
* @param dryrun Does the creation is a dry run
* @param dryrun Is dry run mode or not?
* @return The created connector
*/
@Post("{?dryrun}")
Expand Down Expand Up @@ -162,7 +162,7 @@ public Mono<HttpResponse<Connector>> apply(String namespace, @Valid @Body Connec
*
* @param namespace The current namespace
* @param connector The current connector name to delete
* @param dryrun Run in dry mode or not
* @param dryrun Is dry run mode or not?
* @return A HTTP response
* @deprecated use {@link #bulkDelete(String, String, boolean)} instead.
*/
Expand Down Expand Up @@ -206,14 +206,14 @@ public Mono<HttpResponse<Void>> delete(String namespace, String connector,
* Delete connectors.
*
* @param namespace The current namespace
* @param name The name parameter
* @param dryrun Run in dry mode or not
* @param name The name parameter
* @param dryrun Run in dry mode or not?
* @return A HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Status(HttpStatus.OK)
@Delete
public Mono<HttpResponse<Void>> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
public Mono<HttpResponse<List<Connector>>> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

List<Connector> connectors = connectorService.findByWildcardName(ns, name);
Expand All @@ -233,15 +233,15 @@ public Mono<HttpResponse<Void>> bulkDelete(String namespace, @QueryValue(default
}

if (dryrun) {
return Mono.just(HttpResponse.noContent());
return Mono.just(HttpResponse.ok(connectors));
}

return Flux.fromIterable(connectors)
.flatMap(connector -> {
sendEventLog(connector, ApplyStatus.deleted, connector.getSpec(), null, EMPTY_STRING);
return connectorService.delete(ns, connector);
})
.then(Mono.just(HttpResponse.noContent()));
.then(Mono.just(HttpResponse.ok(connectors)));
}

/**
Expand Down Expand Up @@ -305,7 +305,7 @@ public Mono<MutableHttpResponse<ChangeConnectorState>> changeState(String namesp
* Import unsynchronized connectors.
*
* @param namespace The namespace
* @param dryrun Is dry run mode or not ?
* @param dryrun Is dry run mode or not?
* @return The list of imported connectors
*/
@Post("/_/import{?dryrun}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class ConsumerGroupController extends NamespacedResourceController {
* @param namespace The namespace
* @param consumerGroup The consumer group
* @param consumerGroupResetOffsets The information about how to reset
* @param dryrun Is dry run mode or not ?
* @param dryrun Is dry run mode or not?
* @return The reset offsets response
*/
@Post("/{consumerGroup}/reset{?dryrun}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public Optional<Namespace> get(String namespace) {
* Create a namespace.
*
* @param namespace The namespace
* @param dryrun Does the creation is a dry run
* @param dryrun Is dry run mode or not?
* @return The created namespace
*/
@Post("{?dryrun}")
Expand Down Expand Up @@ -118,19 +118,21 @@ public HttpResponse<Namespace> apply(@Valid @Body Namespace namespace,
* Delete a namespace.
*
* @param namespace The namespace
* @param dryrun Is dry run mode or not ?
* @param dryrun Is dry run mode or not?
* @return An HTTP response
* @deprecated use bulkDelete instead.
*/
@Delete("/{namespace}{?dryrun}")
@Deprecated(since = "1.13.0")
public HttpResponse<Void> delete(String namespace, @QueryValue(defaultValue = "false") boolean dryrun) {
Optional<Namespace> optionalNamespace = namespaceService.findByName(namespace);

if (optionalNamespace.isEmpty()) {
return HttpResponse.notFound();
}

List<String> namespaceResources = namespaceService.findAllResourcesByNamespace(optionalNamespace.get());

if (!namespaceResources.isEmpty()) {
List<String> validationErrors = namespaceResources
.stream()
Expand All @@ -143,21 +145,31 @@ public HttpResponse<Void> delete(String namespace, @QueryValue(defaultValue = "f
return HttpResponse.noContent();
}

performDeletion(optionalNamespace.get());
sendEventLog(
optionalNamespace.get(),
ApplyStatus.deleted,
optionalNamespace.get().getSpec(),
null,
EMPTY_STRING
);

namespaceService.delete(optionalNamespace.get());

return HttpResponse.noContent();
}

/**
* Delete namespaces.
*
* @param dryrun Is dry run mode or not ?
* @param name The name parameter
* @param dryrun Is dry run mode or not?
* @param name The name parameter
* @return An HTTP response
*/
@Delete
public HttpResponse<Void> bulkDelete(@QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
public HttpResponse<List<Namespace>> bulkDelete(@QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
List<Namespace> namespaces = namespaceService.findByWildcardName(name);

if (namespaces.isEmpty()) {
return HttpResponse.notFound();
}
Expand All @@ -182,26 +194,21 @@ public HttpResponse<Void> bulkDelete(@QueryValue(defaultValue = "*") String name
}

if (dryrun) {
return HttpResponse.noContent();
return HttpResponse.ok(namespaces);
}

namespaces.forEach(this::performDeletion);
return HttpResponse.noContent();
}
namespaces.forEach(namespace -> {
sendEventLog(
namespace,
ApplyStatus.deleted,
namespace.getSpec(),
null,
EMPTY_STRING
);

/**
* Perform the deletion of the namespace and send an event log.
*
* @param namespace The namespace to delete
*/
private void performDeletion(Namespace namespace) {
sendEventLog(
namespace,
ApplyStatus.deleted,
namespace.getSpec(),
null,
EMPTY_STRING
);
namespaceService.delete(namespace);
namespaceService.delete(namespace);
});

return HttpResponse.ok(namespaces);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public Optional<RoleBinding> get(String namespace, String name) {
*
* @param namespace The namespace
* @param roleBinding The role binding
* @param dryrun Does the creation is a dry run
* @param dryrun Is dry run mode or not?
* @return The created role binding
*/
@Post("{?dryrun}")
Expand Down Expand Up @@ -147,17 +147,18 @@ public HttpResponse<Void> delete(String namespace, String name,
* @param dryrun Is dry run mode or not?
* @return An HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Status(HttpStatus.OK)
@Delete
public HttpResponse<Void> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
public HttpResponse<List<RoleBinding>> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
List<RoleBinding> roleBindings = roleBindingService.findByWildcardName(namespace, name);

if (roleBindings.isEmpty()) {
return HttpResponse.notFound();
}

if (dryrun) {
return HttpResponse.noContent();
return HttpResponse.ok(roleBindings);
}

roleBindings.forEach(roleBinding -> {
Expand All @@ -171,6 +172,6 @@ public HttpResponse<Void> bulkDelete(String namespace, @QueryValue(defaultValue
roleBindingService.delete(roleBinding);
});

return HttpResponse.noContent();
return HttpResponse.ok(roleBindings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.time.Instant;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -88,7 +89,7 @@ public Mono<Schema> get(String namespace, String subject) {
*
* @param namespace The namespace
* @param schema The schema to create
* @param dryrun Does the creation is a dry run
* @param dryrun Is dry run mode or not?
* @return The created schema
*/
@Post
Expand Down Expand Up @@ -161,12 +162,12 @@ public Mono<HttpResponse<Schema>> apply(String namespace, @Valid @Body Schema sc
* @param dryrun Run in dry mode or not?
* @return A HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Status(HttpStatus.OK)
@Delete
public Mono<HttpResponse<Void>> bulkDelete(String namespace,
@QueryValue(defaultValue = "*") String name,
@QueryValue("version") Optional<String> versionOptional,
@QueryValue(defaultValue = "false") boolean dryrun) {
public Mono<HttpResponse<List<Schema>>> bulkDelete(String namespace,
@QueryValue(defaultValue = "*") String name,
@QueryValue("version") Optional<String> versionOptional,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

return schemaService.findByWildcardName(ns, name)
Expand All @@ -176,17 +177,22 @@ public Mono<HttpResponse<Void>> bulkDelete(String namespace,
.map(Optional::of)
.defaultIfEmpty(Optional.empty()))
.collectList()
.flatMap(schemas -> {
if (schemas.isEmpty() || schemas.stream().anyMatch(Optional::isEmpty)) {
.flatMap(optionalSchemas -> {
if (optionalSchemas.isEmpty() || optionalSchemas.stream().anyMatch(Optional::isEmpty)) {
return Mono.just(HttpResponse.notFound());
}

List<Schema> schemas = optionalSchemas
.stream()
.filter(Optional::isPresent)
.map(Optional::get)
.toList();

if (dryrun) {
return Mono.just(HttpResponse.noContent());
return Mono.just(HttpResponse.ok(schemas));
}

return Flux.fromIterable(schemas)
.map(Optional::get)
.flatMap(schema -> (versionOptional.isEmpty()
? schemaService.deleteAllVersions(ns, schema.getMetadata().getName()) :
schemaService.deleteVersion(ns, schema.getMetadata().getName(), versionOptional.get()))
Expand All @@ -201,7 +207,7 @@ public Mono<HttpResponse<Void>> bulkDelete(String namespace,
);
return Mono.just(HttpResponse.noContent());
}))
.then(Mono.just(HttpResponse.noContent()));
.then(Mono.just(HttpResponse.ok(schemas)));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,19 +155,18 @@ HttpResponse<Void> delete(String namespace, String stream, @QueryValue(defaultVa
* @param dryrun Is dry run mode or not?
* @return An HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Status(HttpStatus.OK)
@Delete
HttpResponse<Void> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
HttpResponse<List<KafkaStream>> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

List<KafkaStream> kafkaStreams = streamService.findByWildcardName(ns, name);

List<String> validationErrors = kafkaStreams.stream()
.filter(kafkaStream ->
!streamService.isNamespaceOwnerOfKafkaStream(ns, kafkaStream.getMetadata().getName()))
.map(kafkaStream -> invalidOwner(kafkaStream.getMetadata().getName()))
.toList();
.filter(kafkaStream ->
!streamService.isNamespaceOwnerOfKafkaStream(ns, kafkaStream.getMetadata().getName()))
.map(kafkaStream -> invalidOwner(kafkaStream.getMetadata().getName()))
.toList();

if (!validationErrors.isEmpty()) {
throw new ResourceValidationException(KAFKA_STREAM, name, validationErrors);
Expand All @@ -178,8 +177,9 @@ HttpResponse<Void> bulkDelete(String namespace, @QueryValue(defaultValue = "*")
}

if (dryrun) {
return HttpResponse.noContent();
return HttpResponse.ok(kafkaStreams);
}

kafkaStreams.forEach(kafkaStream -> {
sendEventLog(
kafkaStream,
Expand All @@ -188,9 +188,10 @@ HttpResponse<Void> bulkDelete(String namespace, @QueryValue(defaultValue = "*")
null,
EMPTY_STRING
);

streamService.delete(ns, kafkaStream);
});

return HttpResponse.noContent();
return HttpResponse.ok(kafkaStreams);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ public HttpResponse<AccessControlEntry> apply(Authentication authentication, Str
* @return An HTTP response
*/
@Delete
@Status(HttpStatus.NO_CONTENT)
public HttpResponse<Void> bulkDelete(Authentication authentication, String namespace,
@QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
@Status(HttpStatus.OK)
public HttpResponse<List<AccessControlEntry>> bulkDelete(Authentication authentication, String namespace,
@QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {

Namespace ns = getNamespace(namespace);
List<AccessControlEntry> acls = aclService.findAllGrantedByNamespaceByWildcardName(ns, name);
Expand All @@ -192,7 +192,7 @@ public HttpResponse<Void> bulkDelete(Authentication authentication, String names
}

if (dryrun) {
return HttpResponse.noContent();
return HttpResponse.ok(acls);
}

acls.forEach(acl -> {
Expand All @@ -205,7 +205,7 @@ public HttpResponse<Void> bulkDelete(Authentication authentication, String names
aclService.delete(acl);
});

return HttpResponse.noContent();
return HttpResponse.ok(acls);
}

/**
Expand Down
Loading