From 5ff8d2643b6e8efc8097125c407b008ed93e776e Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Tue, 26 Nov 2024 23:34:58 +0100 Subject: [PATCH] refactor(provider): optimize schema registry provider related-to: #498 --- pom.xml | 6 + .../api/AivenAsyncSchemaRegistryApi.java | 68 +++++------ .../connect/KafkaConnectExtensionConfig.java | 3 +- .../change/KafkaConnectorChangeComputer.java | 1 - .../change/KafkaConnectorChangeHandler.java | 13 +- .../reconciler/KafkaConnectorCollector.java | 7 +- .../reconciler/KafkaConnectorController.java | 13 +- providers/jikkou-provider-kafka/pom.xml | 2 - .../jikkou-provider-schema-registry/pom.xml | 4 + .../api/AsyncSchemaRegistryApiTest.java | 51 ++++---- .../SchemaRegistrySubjectCollectorTest.java | 6 +- .../registry/api/AsyncSchemaRegistryApi.java | 49 ++++---- .../api/DefaultAsyncSchemaRegistryApi.java | 48 ++++---- .../change/SchemaSubjectChangeComputer.java | 2 +- .../AbstractSchemaSubjectChangeHandler.java | 23 ++-- .../CreateSchemaSubjectChangeHandler.java | 8 +- .../DeleteSchemaSubjectChangeHandler.java | 10 +- .../UpdateSchemaSubjectChangeHandler.java | 16 ++- .../SchemaRegistrySubjectCollector.java | 115 ++++++------------ .../SchemaRegistrySubjectController.java | 9 +- .../SchemaCompatibilityValidation.java | 13 +- 21 files changed, 205 insertions(+), 262 deletions(-) diff --git a/pom.xml b/pom.xml index 9f82e3b9a..681075528 100644 --- a/pom.xml +++ b/pom.xml @@ -122,6 +122,7 @@ 1.4.3 24.1.0 3.1.0 + 3.6.6 2.0.9 1.5.6 @@ -236,6 +237,11 @@ aws-msk-iam-auth 2.1.0 + + io.projectreactor + reactor-core + ${reactor-core.version} + org.mockito diff --git a/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/api/AivenAsyncSchemaRegistryApi.java b/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/api/AivenAsyncSchemaRegistryApi.java index 06276061f..ba3b36826 100644 --- a/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/api/AivenAsyncSchemaRegistryApi.java +++ b/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/api/AivenAsyncSchemaRegistryApi.java @@ -8,18 +8,13 @@ import io.streamthoughts.jikkou.extension.aiven.api.data.CompatibilityCheckResponse; import io.streamthoughts.jikkou.schema.registry.api.AsyncSchemaRegistryApi; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityCheck; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityLevelObject; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityObject; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaId; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaRegistration; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion; +import io.streamthoughts.jikkou.schema.registry.api.data.*; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Mono; /** * AsyncSchemaRegistryApi implementation for Aiven. @@ -41,20 +36,17 @@ public AivenAsyncSchemaRegistryApi(final @NotNull AivenApiClient api) { * {@inheritDoc} **/ @Override - public CompletableFuture> listSubjects() { - return CompletableFuture.supplyAsync( - () -> api.listSchemaRegistrySubjects().subjects() - ); + public Mono> listSubjects() { + return Mono.fromCallable(() -> api.listSchemaRegistrySubjects().subjects()); } /** * {@inheritDoc} **/ @Override - public CompletableFuture> deleteSubjectVersions(@NotNull String subject, - boolean permanent) { - return CompletableFuture.supplyAsync( - () -> { + public Mono> deleteSubjectVersions(@NotNull String subject, + boolean permanent) { + return Mono.fromCallable(() -> { api.deleteSchemaRegistrySubject(subject); return Collections.emptyList(); } @@ -65,16 +57,16 @@ public CompletableFuture> deleteSubjectVersions(@NotNull String su * {@inheritDoc} **/ @Override - public CompletableFuture registerSubjectVersion(@NotNull String subject, - @NotNull SubjectSchemaRegistration schema, - boolean normalize) { + public Mono registerSubjectVersion(@NotNull String subject, + @NotNull SubjectSchemaRegistration schema, + boolean normalize) { // Drop references - not supported through the Aiven's API. SubjectSchemaRegistration registration = new SubjectSchemaRegistration( schema.schema(), schema.schemaType(), null ); - return CompletableFuture.supplyAsync( + return Mono.fromCallable( () -> new SubjectSchemaId(api.registerSchemaRegistrySubjectVersion(subject, registration).version()) ); } @@ -83,8 +75,8 @@ public CompletableFuture registerSubjectVersion(@NotNull String * {@inheritDoc} **/ @Override - public CompletableFuture getLatestSubjectSchema(@NotNull String subject) { - return CompletableFuture.supplyAsync( + public Mono getLatestSubjectSchema(@NotNull String subject) { + return Mono.fromCallable( () -> api.getSchemaRegistryLatestSubjectVersion(subject).version() ); } @@ -93,8 +85,8 @@ public CompletableFuture getLatestSubjectSchema(@NotNull S * {@inheritDoc} **/ @Override - public CompletableFuture getGlobalCompatibility() { - return CompletableFuture.supplyAsync( + public Mono getGlobalCompatibility() { + return Mono.fromCallable( () -> new CompatibilityLevelObject(api.getSchemaRegistryGlobalCompatibility().compatibilityLevel().name()) ); } @@ -103,9 +95,9 @@ public CompletableFuture getGlobalCompatibility() { * {@inheritDoc} **/ @Override - public CompletableFuture getSubjectCompatibilityLevel(@NotNull String subject, - boolean defaultToGlobal) { - return CompletableFuture.supplyAsync( + public Mono getSubjectCompatibilityLevel(@NotNull String subject, + boolean defaultToGlobal) { + return Mono.fromCallable( () -> new CompatibilityLevelObject(api.getSchemaRegistrySubjectCompatibility(subject).compatibilityLevel().name()) ); } @@ -114,9 +106,9 @@ public CompletableFuture getSubjectCompatibilityLevel( * {@inheritDoc} **/ @Override - public CompletableFuture updateSubjectCompatibilityLevel(@NotNull String subject, - @NotNull CompatibilityObject compatibility) { - return CompletableFuture.supplyAsync( + public Mono updateSubjectCompatibilityLevel(@NotNull String subject, + @NotNull CompatibilityObject compatibility) { + return Mono.fromCallable( () -> { api.updateSchemaRegistrySubjectCompatibility(subject, compatibility); return new CompatibilityObject(compatibility.compatibility()); @@ -128,7 +120,7 @@ public CompletableFuture updateSubjectCompatibilityLevel(@N * {@inheritDoc} **/ @Override - public CompletableFuture deleteSubjectCompatibilityLevel(@NotNull String subject) { + public Mono deleteSubjectCompatibilityLevel(@NotNull String subject) { throw new AivenApiClientException( "Deleting configuration for Schema Registry subject is not supported by " + "the Aiven API (for more information: https://api.aiven.io/doc/)." @@ -139,11 +131,11 @@ public CompletableFuture deleteSubjectCompatibilityLevel(@N * {@inheritDoc} **/ @Override - public CompletableFuture testCompatibility(@NotNull String subject, - String version, - boolean verbose, - @NotNull SubjectSchemaRegistration schema) { - return CompletableFuture.supplyAsync( + public Mono testCompatibility(@NotNull String subject, + String version, + boolean verbose, + @NotNull SubjectSchemaRegistration schema) { + return Mono.fromCallable( () -> { CompatibilityCheckResponse response = api.checkSchemaRegistryCompatibility(subject, version, schema); return new CompatibilityCheck( @@ -158,9 +150,9 @@ public CompletableFuture testCompatibility(@NotNull String s * {@inheritDoc} **/ @Override - public CompletableFuture testCompatibilityLatest(@NotNull String subject, - boolean verbose, - @NotNull SubjectSchemaRegistration schema) { + public Mono testCompatibilityLatest(@NotNull String subject, + boolean verbose, + @NotNull SubjectSchemaRegistration schema) { return testCompatibility(subject, "latest", verbose, schema); } diff --git a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/KafkaConnectExtensionConfig.java b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/KafkaConnectExtensionConfig.java index db1f83f24..9b402f288 100644 --- a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/KafkaConnectExtensionConfig.java +++ b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/KafkaConnectExtensionConfig.java @@ -15,8 +15,6 @@ import io.streamthoughts.jikkou.core.models.HasMetadata; import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectClientConfig; import io.streamthoughts.jikkou.kafka.connect.exception.KafkaConnectClusterNotFoundException; -import org.jetbrains.annotations.NotNull; - import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -27,6 +25,7 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import org.jetbrains.annotations.NotNull; public final class KafkaConnectExtensionConfig { diff --git a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeComputer.java b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeComputer.java index a865a1574..6bd6b98e5 100644 --- a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeComputer.java +++ b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeComputer.java @@ -16,7 +16,6 @@ import io.streamthoughts.jikkou.core.reconciler.change.ResourceChangeFactory; import io.streamthoughts.jikkou.kafka.connect.models.KafkaConnectorState; import io.streamthoughts.jikkou.kafka.connect.models.V1KafkaConnector; - import java.util.ArrayList; import java.util.Collections; import java.util.List; diff --git a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeHandler.java b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeHandler.java index fe071fd3f..a21703e91 100644 --- a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeHandler.java +++ b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeHandler.java @@ -6,6 +6,10 @@ */ package io.streamthoughts.jikkou.kafka.connect.change; +import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_CLASS_CONFIG; +import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_TASKS_MAX_CONFIG; +import static io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeComputer.DATA_CONNECTOR_CLASS; + import io.streamthoughts.jikkou.core.data.TypeConverter; import io.streamthoughts.jikkou.core.models.change.ResourceChange; import io.streamthoughts.jikkou.core.models.change.SpecificStateChange; @@ -22,9 +26,6 @@ import io.streamthoughts.jikkou.kafka.connect.api.data.ConnectorInfoResponse; import io.streamthoughts.jikkou.kafka.connect.api.data.ErrorResponse; import io.streamthoughts.jikkou.kafka.connect.models.KafkaConnectorState; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.VisibleForTesting; - import java.util.HashMap; import java.util.List; import java.util.Map; @@ -34,10 +35,8 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.Stream; - -import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_CLASS_CONFIG; -import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_TASKS_MAX_CONFIG; -import static io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeComputer.DATA_CONNECTOR_CLASS; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.VisibleForTesting; public final class KafkaConnectorChangeHandler extends BaseChangeHandler { diff --git a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/reconciler/KafkaConnectorCollector.java b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/reconciler/KafkaConnectorCollector.java index c8c46129f..eb47a5618 100644 --- a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/reconciler/KafkaConnectorCollector.java +++ b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/reconciler/KafkaConnectorCollector.java @@ -24,16 +24,15 @@ import io.streamthoughts.jikkou.kafka.connect.exception.KafkaConnectClusterNotFoundException; import io.streamthoughts.jikkou.kafka.connect.models.V1KafkaConnector; import io.streamthoughts.jikkou.kafka.connect.service.KafkaConnectClusterService; -import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A ResourceCollector to get {@link V1KafkaConnector} resources. diff --git a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/reconciler/KafkaConnectorController.java b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/reconciler/KafkaConnectorController.java index 9305ef105..a5e7ab4ec 100644 --- a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/reconciler/KafkaConnectorController.java +++ b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/reconciler/KafkaConnectorController.java @@ -6,6 +6,11 @@ */ package io.streamthoughts.jikkou.kafka.connect.reconciler; +import static io.streamthoughts.jikkou.core.ReconciliationMode.CREATE; +import static io.streamthoughts.jikkou.core.ReconciliationMode.DELETE; +import static io.streamthoughts.jikkou.core.ReconciliationMode.FULL; +import static io.streamthoughts.jikkou.core.ReconciliationMode.UPDATE; + import io.streamthoughts.jikkou.core.ReconciliationContext; import io.streamthoughts.jikkou.core.annotation.SupportedResource; import io.streamthoughts.jikkou.core.extension.ContextualExtension; @@ -28,19 +33,13 @@ import io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeDescription; import io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeHandler; import io.streamthoughts.jikkou.kafka.connect.models.V1KafkaConnector; -import org.jetbrains.annotations.NotNull; - import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.function.Predicate; import java.util.stream.Collectors; - -import static io.streamthoughts.jikkou.core.ReconciliationMode.CREATE; -import static io.streamthoughts.jikkou.core.ReconciliationMode.DELETE; -import static io.streamthoughts.jikkou.core.ReconciliationMode.FULL; -import static io.streamthoughts.jikkou.core.ReconciliationMode.UPDATE; +import org.jetbrains.annotations.NotNull; @SupportedResource(type = V1KafkaConnector.class) @SupportedResource(apiVersion = ApiVersions.KAFKA_V1BETA, kind = "KafkaConnectorChange") diff --git a/providers/jikkou-provider-kafka/pom.xml b/providers/jikkou-provider-kafka/pom.xml index 99077f34f..4a065c1a7 100644 --- a/providers/jikkou-provider-kafka/pom.xml +++ b/providers/jikkou-provider-kafka/pom.xml @@ -23,7 +23,6 @@ ${project.parent.basedir}/header - 3.6.6 @@ -81,7 +80,6 @@ io.projectreactor reactor-core - ${reactor-core.version} diff --git a/providers/jikkou-provider-schema-registry/pom.xml b/providers/jikkou-provider-schema-registry/pom.xml index b40010f1f..8701b24f1 100644 --- a/providers/jikkou-provider-schema-registry/pom.xml +++ b/providers/jikkou-provider-schema-registry/pom.xml @@ -57,6 +57,10 @@ org.jsonschema2pojo jsonschema2pojo-maven-plugin + + io.projectreactor + reactor-core + org.mockito diff --git a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApiTest.java b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApiTest.java index e8b1caf96..c80db6cf1 100644 --- a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApiTest.java +++ b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApiTest.java @@ -19,7 +19,6 @@ import io.streamthoughts.jikkou.schema.registry.model.CompatibilityLevels; import io.streamthoughts.jikkou.schema.registry.model.SchemaType; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -36,6 +35,7 @@ import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; +import reactor.core.publisher.Mono; @Testcontainers @Tag("integration") @@ -102,10 +102,10 @@ public void beforeEach() { @Test void shouldGetGlobalCompatibilityLevel() throws ExecutionException, InterruptedException { // When - CompletableFuture future = async.getGlobalCompatibility(); + Mono future = async.getGlobalCompatibility(); // Then - CompatibilityLevelObject result = future.get(); + CompatibilityLevelObject result = future.block(); Assertions.assertEquals("BACKWARD", result.compatibilityLevel()); } @@ -113,10 +113,10 @@ void shouldGetGlobalCompatibilityLevel() throws ExecutionException, InterruptedE @Test void shouldListSchemaForEmptySubject() throws ExecutionException, InterruptedException { // When - CompletableFuture> future = async.listSubjects(); + Mono> future = async.listSubjects(); // Then - List results = future.get(); + List results = future.block(); Assertions.assertTrue(results.isEmpty()); } @@ -124,14 +124,14 @@ void shouldListSchemaForEmptySubject() throws ExecutionException, InterruptedExc @Test void shouldRegisterSchemaVersionForNewSubject() throws ExecutionException, InterruptedException { // When - CompletableFuture future = async.registerSubjectVersion( + Mono future = async.registerSubjectVersion( TEST_SUBJECT, new SubjectSchemaRegistration(AVRO_SCHEMA, SchemaType.AVRO), true ); // Then - SubjectSchemaId result = future.get(); + SubjectSchemaId result = future.block(); Assertions.assertEquals(1, result.id()); } @@ -139,26 +139,26 @@ void shouldRegisterSchemaVersionForNewSubject() throws ExecutionException, Inter @Test void shouldListSchemaForExistingSubject() throws ExecutionException, InterruptedException { // When - CompletableFuture> future = async.listSubjects(); + Mono> future = async.listSubjects(); // Then - List results = future.get(); + List results = future.block(); Assertions.assertFalse(results.isEmpty()); Assertions.assertEquals(1, results.size()); - Assertions.assertEquals(TEST_SUBJECT, results.get(0)); + Assertions.assertEquals(TEST_SUBJECT, results.getFirst()); } @Order(5) @Test void shouldUpdateCompatibilityForExistingSubject() throws ExecutionException, InterruptedException { // When - CompletableFuture future = async.updateSubjectCompatibilityLevel( + Mono future = async.updateSubjectCompatibilityLevel( TEST_SUBJECT, new CompatibilityObject(CompatibilityLevels.FULL_TRANSITIVE.name()) ); // Then - CompatibilityObject result = future.get(); + CompatibilityObject result = future.block(); Assertions.assertEquals(CompatibilityLevels.FULL_TRANSITIVE.name(), result.compatibility()); } @@ -166,10 +166,10 @@ void shouldUpdateCompatibilityForExistingSubject() throws ExecutionException, In @Test void shouldGetCompatibilityForExistingSubject() throws ExecutionException, InterruptedException { // When - CompletableFuture future = async.getSubjectCompatibilityLevel(TEST_SUBJECT, false); + Mono future = async.getSubjectCompatibilityLevel(TEST_SUBJECT, false); // Then - CompatibilityLevelObject result = future.get(); + CompatibilityLevelObject result = future.block(); Assertions.assertEquals(CompatibilityLevels.FULL_TRANSITIVE.name(), result.compatibilityLevel()); } @@ -177,17 +177,10 @@ void shouldGetCompatibilityForExistingSubject() throws ExecutionException, Inter @Test void shouldGetErrorCompatibilityForNotExistingSubjectAndDefaultToGlobalFalse() { // When - CompletableFuture future = async.getSubjectCompatibilityLevel("unknown", false); + Mono future = async.getSubjectCompatibilityLevel("unknown", false); // Then - RestClientException exception = Assertions - .assertThrowsExactly(RestClientException.class, () -> { - try { - future.get(); - } catch (ExecutionException e) { - throw e.getCause(); - } - }); + RestClientException exception = Assertions.assertThrowsExactly(RestClientException.class, future::block); ErrorResponse response = exception.getResponseEntity(ErrorResponse.class); Assertions.assertEquals(40408, response.errorCode()); @@ -198,10 +191,10 @@ void shouldGetErrorCompatibilityForNotExistingSubjectAndDefaultToGlobalFalse() { @Test void shouldGetGlobalCompatibilityForNotExistingSubjectAndDefaultToGlobalTrue() throws ExecutionException, InterruptedException { // When - CompletableFuture future = async.getSubjectCompatibilityLevel("unknown", true); + Mono future = async.getSubjectCompatibilityLevel("unknown", true); // Then - CompatibilityLevelObject result = future.get(); + CompatibilityLevelObject result = future.block(); Assertions.assertEquals(CompatibilityLevels.BACKWARD.name(), result.compatibilityLevel()); } @@ -209,7 +202,7 @@ void shouldGetGlobalCompatibilityForNotExistingSubjectAndDefaultToGlobalTrue() t @Test void shouldGetTrueForTestingCompatibleSchema() throws ExecutionException, InterruptedException { // When - CompletableFuture future = async.testCompatibility( + Mono future = async.testCompatibility( TEST_SUBJECT, "-1", true, @@ -217,7 +210,7 @@ void shouldGetTrueForTestingCompatibleSchema() throws ExecutionException, Interr ); // Then - CompatibilityCheck result = future.get(); + CompatibilityCheck result = future.block(); Assertions.assertTrue(result.isCompatible()); Assertions.assertTrue(result.getMessages().isEmpty()); } @@ -226,7 +219,7 @@ void shouldGetTrueForTestingCompatibleSchema() throws ExecutionException, Interr @Test void shouldGetFalseForTestingCompatibleSchema() throws ExecutionException, InterruptedException { // When - CompletableFuture future = async.testCompatibility( + Mono future = async.testCompatibility( TEST_SUBJECT, "-1", true, @@ -234,7 +227,7 @@ void shouldGetFalseForTestingCompatibleSchema() throws ExecutionException, Inter ); // Then - CompatibilityCheck result = future.get(); + CompatibilityCheck result = future.block(); Assertions.assertFalse(result.isCompatible()); Assertions.assertFalse(result.getMessages().isEmpty()); } diff --git a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollectorTest.java b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollectorTest.java index adab98328..37e8b3b72 100644 --- a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollectorTest.java +++ b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollectorTest.java @@ -34,7 +34,7 @@ public void beforeEach() throws ExecutionException, InterruptedException { TEST_SUBJECT, new SubjectSchemaRegistration(AVRO_SCHEMA, SchemaType.AVRO), false - ).get(); + ).block(); } @Test @@ -50,7 +50,7 @@ public void shouldGetAllSchemasWithGlobalCompatibilityLevelTrue() { Assertions.assertNotNull(resources); Assertions.assertEquals(1, resources.size()); - V1SchemaRegistrySubject subject = resources.get(0); + V1SchemaRegistrySubject subject = resources.getFirst(); Assertions.assertEquals(TEST_SUBJECT, subject.getMetadata().getName()); Assertions.assertEquals(SchemaType.AVRO, subject.getSpec().getSchemaType()); Assertions.assertEquals(CompatibilityLevels.BACKWARD, subject.getSpec().getCompatibilityLevel()); @@ -69,7 +69,7 @@ public void shouldGetAllSchemasWithGlobalCompatibilityLevelFalse() { Assertions.assertNotNull(resources); Assertions.assertEquals(1, resources.size()); - V1SchemaRegistrySubject subject = resources.get(0); + V1SchemaRegistrySubject subject = resources.getFirst(); Assertions.assertEquals(TEST_SUBJECT, subject.getMetadata().getName()); Assertions.assertEquals(SchemaType.AVRO, subject.getSpec().getSchemaType()); Assertions.assertNull(subject.getSpec().getCompatibilityLevel()); diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApi.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApi.java index e203cbec4..5a0d958bc 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApi.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApi.java @@ -6,15 +6,10 @@ */ package io.streamthoughts.jikkou.schema.registry.api; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityCheck; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityLevelObject; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityObject; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaId; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaRegistration; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion; +import io.streamthoughts.jikkou.schema.registry.api.data.*; import java.util.List; -import java.util.concurrent.CompletableFuture; import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Mono; /** * Asynchronous Schema Registry Api. @@ -26,7 +21,7 @@ public interface AsyncSchemaRegistryApi extends AutoCloseable { * * @return a list of registered subjects. */ - CompletableFuture> listSubjects(); + Mono> listSubjects(); /** * Deletes the specified subject and its associated compatibility level if registered. @@ -36,8 +31,8 @@ public interface AsyncSchemaRegistryApi extends AutoCloseable { * all associated metadata including the schema ID. * @return a list of versions */ - CompletableFuture> deleteSubjectVersions(@NotNull String subject, - boolean permanent); + Mono> deleteSubjectVersions(@NotNull String subject, + boolean permanent); /** * Register a new schema under the specified subject. @@ -47,9 +42,9 @@ CompletableFuture> deleteSubjectVersions(@NotNull String subject, * @param normalize whether to normalize the given schema * @return the globally unique identifier of the schema. */ - CompletableFuture registerSubjectVersion(@NotNull String subject, - @NotNull SubjectSchemaRegistration schema, - boolean normalize); + Mono registerSubjectVersion(@NotNull String subject, + @NotNull SubjectSchemaRegistration schema, + boolean normalize); /** * Get the latest version of the schema registered under the specified subject. @@ -57,14 +52,14 @@ CompletableFuture registerSubjectVersion(@NotNull String subjec * @param subject name of the subject * @return a {@link SubjectSchemaVersion} object. */ - CompletableFuture getLatestSubjectSchema(@NotNull String subject); + Mono getLatestSubjectSchema(@NotNull String subject); /** * Gets the schema registry global compatibility level. * * @return the compatibility level. */ - CompletableFuture getGlobalCompatibility(); + Mono getGlobalCompatibility(); /** * Gets compatibility level for the specified subject. @@ -73,8 +68,8 @@ CompletableFuture registerSubjectVersion(@NotNull String subjec * @param defaultToGlobal flag to default to global compatibility. * @return the compatibility level. */ - CompletableFuture getSubjectCompatibilityLevel(@NotNull String subject, - boolean defaultToGlobal); + Mono getSubjectCompatibilityLevel(@NotNull String subject, + boolean defaultToGlobal); /** * Updates compatibility level for the specified subject. @@ -83,8 +78,8 @@ CompletableFuture getSubjectCompatibilityLevel(@NotNul * @param compatibility the new compatibility level for the subject. * @return the updated compatibility level. */ - CompletableFuture updateSubjectCompatibilityLevel(@NotNull String subject, - @NotNull CompatibilityObject compatibility); + Mono updateSubjectCompatibilityLevel(@NotNull String subject, + @NotNull CompatibilityObject compatibility); /** * Deletes the specified subject-level compatibility level config and reverts to the global default. @@ -92,16 +87,16 @@ CompletableFuture updateSubjectCompatibilityLevel(@NotNull * @param subject the name of the subject. * @return the compatibility level. */ - CompletableFuture deleteSubjectCompatibilityLevel(@NotNull String subject); + Mono deleteSubjectCompatibilityLevel(@NotNull String subject); - CompletableFuture testCompatibility(@NotNull String subject, - String version, - boolean verbose, - @NotNull SubjectSchemaRegistration schema); + Mono testCompatibility(@NotNull String subject, + String version, + boolean verbose, + @NotNull SubjectSchemaRegistration schema); - CompletableFuture testCompatibilityLatest(@NotNull String subject, - boolean verbose, - @NotNull SubjectSchemaRegistration schema); + Mono testCompatibilityLatest(@NotNull String subject, + boolean verbose, + @NotNull SubjectSchemaRegistration schema); @Override default void close() { diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/DefaultAsyncSchemaRegistryApi.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/DefaultAsyncSchemaRegistryApi.java index d99ea7d93..4b66c57af 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/DefaultAsyncSchemaRegistryApi.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/DefaultAsyncSchemaRegistryApi.java @@ -14,8 +14,8 @@ import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion; import java.util.List; import java.util.Objects; -import java.util.concurrent.CompletableFuture; import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Mono; /** * A wrapper around the REST API {@link SchemaRegistryApi} to provide asynchronous methods. @@ -37,27 +37,27 @@ public DefaultAsyncSchemaRegistryApi(final @NotNull SchemaRegistryApi api) { * @see SchemaRegistryApi#listSubjects() */ @Override - public CompletableFuture> listSubjects() { - return CompletableFuture.supplyAsync(api::listSubjects); + public Mono> listSubjects() { + return Mono.fromCallable(api::listSubjects); } /** * @see SchemaRegistryApi#deleteSubjectVersions(String, boolean) */ @Override - public CompletableFuture> deleteSubjectVersions(@NotNull final String subject, + public Mono> deleteSubjectVersions(@NotNull final String subject, boolean permanent) { - return CompletableFuture.supplyAsync(() -> api.deleteSubjectVersions(subject, permanent)); + return Mono.fromCallable(() -> api.deleteSubjectVersions(subject, permanent)); } /** * @see SchemaRegistryApi#registerSchema(String, SubjectSchemaRegistration, boolean) */ @Override - public CompletableFuture registerSubjectVersion(@NotNull final String subject, + public Mono registerSubjectVersion(@NotNull final String subject, @NotNull final SubjectSchemaRegistration schema, boolean normalize) { - return CompletableFuture.supplyAsync(() -> api.registerSchema(subject, schema, normalize)); + return Mono.fromCallable(() -> api.registerSchema(subject, schema, normalize)); } @@ -65,42 +65,42 @@ public CompletableFuture registerSubjectVersion(@NotNull final * @see SchemaRegistryApi#getLatestSubjectSchema(String) */ @Override - public CompletableFuture getLatestSubjectSchema(@NotNull final String subject) { - return CompletableFuture.supplyAsync(() -> api.getLatestSubjectSchema(subject)); + public Mono getLatestSubjectSchema(@NotNull final String subject) { + return Mono.fromCallable(() -> api.getLatestSubjectSchema(subject)); } /** * @see SchemaRegistryApi#getGlobalCompatibility() */ @Override - public CompletableFuture getGlobalCompatibility() { - return CompletableFuture.supplyAsync(api::getGlobalCompatibility); + public Mono getGlobalCompatibility() { + return Mono.fromCallable(api::getGlobalCompatibility); } /** * @see SchemaRegistryApi#getConfigCompatibility(String, boolean) */ @Override - public CompletableFuture getSubjectCompatibilityLevel(@NotNull final String subject, + public Mono getSubjectCompatibilityLevel(@NotNull final String subject, boolean defaultToGlobal) { - return CompletableFuture.supplyAsync(() -> api.getConfigCompatibility(subject, defaultToGlobal)); + return Mono.fromCallable(() -> api.getConfigCompatibility(subject, defaultToGlobal)); } /** * @see SchemaRegistryApi#updateConfigCompatibility(String, CompatibilityObject) */ @Override - public CompletableFuture updateSubjectCompatibilityLevel(@NotNull final String subject, + public Mono updateSubjectCompatibilityLevel(@NotNull final String subject, @NotNull final CompatibilityObject compatibility) { - return CompletableFuture.supplyAsync(() -> api.updateConfigCompatibility(subject, compatibility)); + return Mono.fromCallable(() -> api.updateConfigCompatibility(subject, compatibility)); } /** * @see SchemaRegistryApi#deleteConfigCompatibility(String) */ @Override - public CompletableFuture deleteSubjectCompatibilityLevel(@NotNull final String subject) { - return CompletableFuture.supplyAsync(() -> api.deleteConfigCompatibility(subject)); + public Mono deleteSubjectCompatibilityLevel(@NotNull final String subject) { + return Mono.fromCallable(() -> api.deleteConfigCompatibility(subject)); } @@ -108,11 +108,11 @@ public CompletableFuture deleteSubjectCompatibilityLevel(@N * @see SchemaRegistryApi#deleteConfigCompatibility(String) */ @Override - public CompletableFuture testCompatibility(@NotNull final String subject, + public Mono testCompatibility(@NotNull final String subject, String version, boolean verbose, @NotNull final SubjectSchemaRegistration schema) { - return CompletableFuture.supplyAsync( + return Mono.fromCallable( () -> api.testCompatibility(subject, Integer.parseInt(version), verbose, schema) ); } @@ -121,12 +121,10 @@ public CompletableFuture testCompatibility(@NotNull final St * @see SchemaRegistryApi#testCompatibilityLatest(String, boolean, SubjectSchemaRegistration) */ @Override - public CompletableFuture testCompatibilityLatest(@NotNull String subject, - boolean verbose, - @NotNull SubjectSchemaRegistration schema) { - return CompletableFuture.supplyAsync( - () -> api.testCompatibilityLatest(subject, verbose, schema) - ); + public Mono testCompatibilityLatest(@NotNull String subject, + boolean verbose, + @NotNull SubjectSchemaRegistration schema) { + return Mono.fromCallable(() -> api.testCompatibilityLatest(subject, verbose, schema)); } @Override diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java index 643fec877..8856c5d3e 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java @@ -32,7 +32,7 @@ public final class SchemaSubjectChangeComputer extends ResourceChangeComputer> TYPE_CONVERTER = ObjectTypeConverter.newForType(new TypeReference<>() { + private static final TypeConverter> TYPE_CONVERTER = ObjectTypeConverter.newForType(new TypeReference<>() { }); /** diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/AbstractSchemaSubjectChangeHandler.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/AbstractSchemaSubjectChangeHandler.java index eff300800..ab4da6777 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/AbstractSchemaSubjectChangeHandler.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/AbstractSchemaSubjectChangeHandler.java @@ -37,6 +37,7 @@ import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; public abstract class AbstractSchemaSubjectChangeHandler implements ChangeHandler { @@ -53,7 +54,7 @@ public AbstractSchemaSubjectChangeHandler(@NotNull final AsyncSchemaRegistryApi this.api = Objects.requireNonNull(api, "api must not be null"); } - protected CompletableFuture updateCompatibilityLevel(final ResourceChange change) { + protected Mono updateCompatibilityLevel(final ResourceChange change) { final CompatibilityLevels compatibilityLevels = StateChangeList .of(change.getSpec().getChanges()) .getLast(DATA_COMPATIBILITY_LEVEL, TypeConverter.of(CompatibilityLevels.class)) @@ -63,18 +64,18 @@ protected CompletableFuture updateCompatibilityLevel(final ResourceChange LOG.info("Updating compatibility-level for Schema Registry subject '{}'.", subjectName); return api .updateSubjectCompatibilityLevel(subjectName, new CompatibilityObject(compatibilityLevels.name())) - .thenApply(compatibilityObject -> { + .handle((compatibilityObject, sink) -> { if (LOG.isInfoEnabled()) { LOG.info( - "Updated compatibility-level for Schema Registry subject '{}' to '{}'.", - subjectName, - compatibilityObject.compatibility()); + "Updated compatibility-level for Schema Registry subject '{}' to '{}'.", + subjectName, + compatibilityObject.compatibility() + ); } - return null; }); } - protected CompletableFuture registerSubjectVersion(@NotNull final ResourceChange change) { + protected Mono registerSubjectVersion(@NotNull final ResourceChange change) { String schema = change.getSpec() .getChanges() .getLast(DATA_SCHEMA, TypeConverter.String()) @@ -108,7 +109,7 @@ protected CompletableFuture registerSubjectVersion(@NotNull final Resource new SubjectSchemaRegistration(schema, type, references), options.normalizeSchema() ) - .thenApply(subjectSchemaId -> { + .handle((subjectSchemaId, sink) -> { if (LOG.isInfoEnabled()) { LOG.info( "Registered Schema Registry subject version: subject '{}', id '{}'.", @@ -121,7 +122,6 @@ protected CompletableFuture registerSubjectVersion(@NotNull final Resource subjectSchemaId.id() ); } - return null; }); } @@ -131,7 +131,7 @@ protected SchemaSubjectChangeOptions getSchemaSubjectChangeOptions(@NotNull Reso .convertValue(change.getSpec().getData()); } - protected CompletableFuture deleteCompatibilityLevel(@NotNull ResourceChange change) { + protected Mono deleteCompatibilityLevel(@NotNull ResourceChange change) { final String subject = change.getMetadata().getName(); if (LOG.isInfoEnabled()) { LOG.info("Deleting compatibility-level for Schema Registry subject '{}'.", @@ -140,14 +140,13 @@ protected CompletableFuture deleteCompatibilityLevel(@NotNull ResourceChan } return api .deleteSubjectCompatibilityLevel(subject) - .thenApplyAsync(compatibilityObject -> { + .handle((compatibilityObject, sink) -> { if (LOG.isInfoEnabled()) { LOG.info( "Deleted compatibility-level for Schema Registry subject '{}' to '{}'.", change.getMetadata().getName(), compatibilityObject.compatibility()); } - return null; }); } diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/CreateSchemaSubjectChangeHandler.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/CreateSchemaSubjectChangeHandler.java index cf67bec80..be0d826e5 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/CreateSchemaSubjectChangeHandler.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/CreateSchemaSubjectChangeHandler.java @@ -19,8 +19,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; -import java.util.concurrent.CompletableFuture; import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Mono; public final class CreateSchemaSubjectChangeHandler extends AbstractSchemaSubjectChangeHandler @@ -51,17 +51,17 @@ public List> handleChanges(@NotNull List> results = new ArrayList<>(); for (ResourceChange change : changes) { - CompletableFuture future = registerSubjectVersion(change); + Mono mono = registerSubjectVersion(change); StateChange compatibilityLevels = StateChangeList .of(change.getSpec().getChanges()) .getLast(DATA_COMPATIBILITY_LEVEL); if (compatibilityLevels != null) { - future = future.thenComposeAsync(unused -> updateCompatibilityLevel(change)); + mono = mono.then(updateCompatibilityLevel(change)); } - results.add(toChangeResponse(change, future)); + results.add(toChangeResponse(change, mono.toFuture())); } return results; } diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/DeleteSchemaSubjectChangeHandler.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/DeleteSchemaSubjectChangeHandler.java index 58b31fe13..1fad5ac2e 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/DeleteSchemaSubjectChangeHandler.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/DeleteSchemaSubjectChangeHandler.java @@ -16,10 +16,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; -import java.util.concurrent.CompletableFuture; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; public class DeleteSchemaSubjectChangeHandler extends AbstractSchemaSubjectChangeHandler implements ChangeHandler { @@ -51,8 +51,9 @@ public List> handleChanges(@NotNull List future = api.deleteSubjectVersions(subject, options.permanentDelete()) - .thenApplyAsync(versions -> { + Mono mono = api + .deleteSubjectVersions(subject, options.permanentDelete()) + .handle((versions, sink) -> { if (LOG.isInfoEnabled()) { LOG.info( "Deleted all versions for Schema Registry subject '{}': {}", @@ -60,9 +61,8 @@ public List> handleChanges(@NotNull List> handleChanges(@NotNull List> results = new ArrayList<>(); for (ResourceChange change : changes) { - CompletableFuture future = CompletableFuture.completedFuture(null); + Mono mono = Mono.empty(); StateChange schema = StateChangeList .of(change.getSpec().getChanges()) .getLast(DATA_SCHEMA); if (UPDATE == schema.getOp()) { - future = future.thenComposeAsync(unused -> registerSubjectVersion(change)); + mono = mono.then(registerSubjectVersion(change)); } StateChange compatibilityLevels = StateChangeList @@ -70,13 +68,13 @@ public List> handleChanges(@NotNull List updateCompatibilityLevel(change)); + mono = mono.then(updateCompatibilityLevel(change)); } if (DELETE == compatibilityLevels.getOp()) { - future = future.thenComposeAsync(unused -> deleteCompatibilityLevel(change)); + mono = mono.then(deleteCompatibilityLevel(change)); } - results.add(toChangeResponse(change, future)); + results.add(toChangeResponse(change, mono.toFuture())); } return results; } diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollector.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollector.java index 1a698ccb9..0c9a0d895 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollector.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollector.java @@ -6,8 +6,6 @@ */ package io.streamthoughts.jikkou.schema.registry.reconciler; -import io.streamthoughts.jikkou.common.utils.AsyncUtils; -import io.streamthoughts.jikkou.common.utils.Pair; import io.streamthoughts.jikkou.core.annotation.SupportedResource; import io.streamthoughts.jikkou.core.config.Configuration; import io.streamthoughts.jikkou.core.exceptions.JikkouRuntimeException; @@ -27,10 +25,9 @@ import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubject; import jakarta.ws.rs.core.Response; import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; @SupportedResource(type = V1SchemaRegistrySubject.class) public class SchemaRegistrySubjectCollector extends ContextualExtension implements Collector { @@ -83,28 +80,48 @@ private void init(@NotNull SchemaRegistryClientConfig config) { public ResourceListObject listAll(@NotNull Configuration configuration, @NotNull Selector selector) { - AsyncSchemaRegistryApi api = new DefaultAsyncSchemaRegistryApi(SchemaRegistryApiFactory.create(config)); + try (AsyncSchemaRegistryApi api = new DefaultAsyncSchemaRegistryApi(SchemaRegistryApiFactory.create(config))) { + return listAll(api.listSubjects().flatMapMany(Flux::fromIterable), api); + } + } + + public ResourceListObject listAll(@NotNull List subjects) { + try (AsyncSchemaRegistryApi api = new DefaultAsyncSchemaRegistryApi(SchemaRegistryApiFactory.create(config))) { + return listAll(Flux.fromIterable(subjects), api); + } + } + + private ResourceListObject listAll(@NotNull Flux subjects, + @NotNull AsyncSchemaRegistryApi api) { + Flux flux = subjects + // Get Schema Registry Latest Subject Version + .flatMap(api::getLatestSubjectSchema) + .onErrorResume(t -> t instanceof RestClientException rce && isNotFound(rce) ? Mono.empty() : Mono.error(t)) + // Get Schema Registry Subject Compatibility + .flatMap(subjectSchemaVersion -> api + .getSubjectCompatibilityLevel(subjectSchemaVersion.subject(), defaultToGlobalCompatibilityLevel) + .map(compatibilityObject -> + CompatibilityLevels.valueOf(compatibilityObject.compatibilityLevel())) + .map(compatibilityLevels -> + schemaRegistrySubjectFactory.createSchemaRegistrySubject(subjectSchemaVersion, compatibilityLevels)) + .onErrorResume(t -> t instanceof RestClientException rce && isNotFound(rce) ? + Mono.just(schemaRegistrySubjectFactory.createSchemaRegistrySubject(subjectSchemaVersion, null)) : + Mono.error(t)) + ); try { - CompletableFuture> result = api - .listSubjects() - .thenComposeAsync(subjects -> AsyncUtils.waitForAll(getAllSchemaRegistrySubjectsAsync(subjects, api))); - Optional exception = AsyncUtils.getException(result); - if (exception.isPresent()) { - throw new JikkouRuntimeException( - "Failed to list all schema registry subject versions", - exception.get() - ); - } - List resources = result.join() - .stream() - .filter(selector::apply) - .collect(Collectors.toList()); - return new V1SchemaRegistrySubjectList(resources); - } finally { - api.close(); + return new V1SchemaRegistrySubjectList(flux.collectList().block()); + } catch (Exception e) { + throw new JikkouRuntimeException("Failed to list all schema registry subject versions", e); } } + private static boolean isNotFound(final RestClientException exception) { + return exception.response() + .map(Response::getStatus) + .filter(status -> status.equals(404)) + .isPresent(); + } + public SchemaRegistrySubjectCollector prettyPrintSchema(final boolean prettyPrintSchema) { this.prettyPrintSchema = prettyPrintSchema; return this; @@ -114,56 +131,4 @@ public SchemaRegistrySubjectCollector defaultToGlobalCompatibilityLevel(final bo this.defaultToGlobalCompatibilityLevel = defaultToGlobalCompatibilityLevel; return this; } - - @NotNull - private List> getAllSchemaRegistrySubjectsAsync(final List subjects, - final AsyncSchemaRegistryApi api) { - return subjects.stream() - .map(subject -> getSchemaRegistrySubjectAsync( - api, subject, defaultToGlobalCompatibilityLevel, schemaRegistrySubjectFactory)) - .toList(); - } - - @NotNull - private static CompletableFuture getSchemaRegistrySubjectAsync(@NotNull AsyncSchemaRegistryApi api, - @NotNull String subject, - boolean defaultToGlobalCompatibilityLevel, - @NotNull V1SchemaRegistrySubjectFactory factory) { - - return api - // Get Schema Registry Latest Subject Version - .getLatestSubjectSchema(subject) - // Get Schema Registry Subject Compatibility - .thenCompose(subjectSchemaVersion -> api - .getSubjectCompatibilityLevel(subject, defaultToGlobalCompatibilityLevel) - .thenApply(compatibilityObject -> CompatibilityLevels.valueOf(compatibilityObject.compatibilityLevel())) - .exceptionally(t -> { - if (t.getCause() != null) t = t.getCause(); - - if (t instanceof RestClientException exception) { - if (exception.response() - .map(Response::getStatus) - .filter(status -> status.equals(404)) - .isPresent()) { - return null; - } - } - if (t instanceof RuntimeException re) { - throw re; - } - throw new JikkouRuntimeException(t); - } - ) - .thenApply(compatibilityLevelObject -> - Pair.of(subjectSchemaVersion, compatibilityLevelObject) - ) - ) - // Create SchemaRegistrySubject object - .thenApply(tuple -> - factory.createSchemaRegistrySubject( - tuple._1(), - tuple._2() - ) - ); - } } diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectController.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectController.java index b05bc0fb9..3184231c9 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectController.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectController.java @@ -15,13 +15,13 @@ import io.streamthoughts.jikkou.core.annotation.SupportedResource; import io.streamthoughts.jikkou.core.extension.ContextualExtension; import io.streamthoughts.jikkou.core.extension.ExtensionContext; +import io.streamthoughts.jikkou.core.models.ObjectMeta; import io.streamthoughts.jikkou.core.models.change.ResourceChange; import io.streamthoughts.jikkou.core.reconciler.ChangeExecutor; import io.streamthoughts.jikkou.core.reconciler.ChangeHandler; import io.streamthoughts.jikkou.core.reconciler.ChangeResult; import io.streamthoughts.jikkou.core.reconciler.Controller; import io.streamthoughts.jikkou.core.reconciler.annotations.ControllerConfiguration; -import io.streamthoughts.jikkou.core.selector.Selectors; import io.streamthoughts.jikkou.schema.registry.ApiVersions; import io.streamthoughts.jikkou.schema.registry.api.AsyncSchemaRegistryApi; import io.streamthoughts.jikkou.schema.registry.api.DefaultAsyncSchemaRegistryApi; @@ -109,7 +109,12 @@ public List plan( .prettyPrintSchema(false) .defaultToGlobalCompatibilityLevel(false); - List actualSubjects = collector.listAll(context.configuration(), Selectors.NO_SELECTOR).stream() + List subjects = expectedSubjects.stream() + .map(V1SchemaRegistrySubject::getMetadata) + .map(ObjectMeta::getName) + .toList(); + + List actualSubjects = collector.listAll(subjects).stream() .filter(context.selector()::apply) .toList(); diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/validation/SchemaCompatibilityValidation.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/validation/SchemaCompatibilityValidation.java index 239f7ce2d..365fdf274 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/validation/SchemaCompatibilityValidation.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/validation/SchemaCompatibilityValidation.java @@ -25,9 +25,8 @@ import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubject; import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubjectSpec; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Mono; @SupportedResource(type = V1SchemaRegistrySubject.class) public class SchemaCompatibilityValidation implements Validation { @@ -65,8 +64,8 @@ public static ValidationResult validate(@NotNull V1SchemaRegistrySubject resourc spec.getReferences() ); try { - CompletableFuture future = api.testCompatibilityLatest(subjectName, true, registration); - CompatibilityCheck check = future.get(); + Mono future = api.testCompatibilityLatest(subjectName, true, registration); + CompatibilityCheck check = future.block(); if (!check.isCompatible()) { return ValidationResult.failure(new ValidationError( validation.getName(), @@ -78,7 +77,7 @@ public static ValidationResult validate(@NotNull V1SchemaRegistrySubject resourc ) )); } - } catch (ExecutionException e) { + } catch (Exception e) { Throwable cause = e.getCause(); if (cause instanceof RestClientException clientException) { ErrorResponse response = clientException.getResponseEntity(ErrorResponse.class); @@ -87,10 +86,6 @@ public static ValidationResult validate(@NotNull V1SchemaRegistrySubject resourc fail(response.message()); } } - - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - fail("Thread was interrupted"); } finally { api.close(); }