diff --git a/pom.xml b/pom.xml
index 9f82e3b9a..681075528 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,6 +122,7 @@
1.4.3
24.1.0
3.1.0
+ 3.6.6
2.0.9
1.5.6
@@ -236,6 +237,11 @@
aws-msk-iam-auth
2.1.0
+
+ io.projectreactor
+ reactor-core
+ ${reactor-core.version}
+
org.mockito
diff --git a/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/api/AivenAsyncSchemaRegistryApi.java b/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/api/AivenAsyncSchemaRegistryApi.java
index 06276061f..ba3b36826 100644
--- a/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/api/AivenAsyncSchemaRegistryApi.java
+++ b/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/api/AivenAsyncSchemaRegistryApi.java
@@ -8,18 +8,13 @@
import io.streamthoughts.jikkou.extension.aiven.api.data.CompatibilityCheckResponse;
import io.streamthoughts.jikkou.schema.registry.api.AsyncSchemaRegistryApi;
-import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityCheck;
-import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityLevelObject;
-import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityObject;
-import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaId;
-import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaRegistration;
-import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion;
+import io.streamthoughts.jikkou.schema.registry.api.data.*;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import org.jetbrains.annotations.NotNull;
+import reactor.core.publisher.Mono;
/**
* AsyncSchemaRegistryApi implementation for Aiven.
@@ -41,20 +36,17 @@ public AivenAsyncSchemaRegistryApi(final @NotNull AivenApiClient api) {
* {@inheritDoc}
**/
@Override
- public CompletableFuture> listSubjects() {
- return CompletableFuture.supplyAsync(
- () -> api.listSchemaRegistrySubjects().subjects()
- );
+ public Mono> listSubjects() {
+ return Mono.fromCallable(() -> api.listSchemaRegistrySubjects().subjects());
}
/**
* {@inheritDoc}
**/
@Override
- public CompletableFuture> deleteSubjectVersions(@NotNull String subject,
- boolean permanent) {
- return CompletableFuture.supplyAsync(
- () -> {
+ public Mono> deleteSubjectVersions(@NotNull String subject,
+ boolean permanent) {
+ return Mono.fromCallable(() -> {
api.deleteSchemaRegistrySubject(subject);
return Collections.emptyList();
}
@@ -65,16 +57,16 @@ public CompletableFuture> deleteSubjectVersions(@NotNull String su
* {@inheritDoc}
**/
@Override
- public CompletableFuture registerSubjectVersion(@NotNull String subject,
- @NotNull SubjectSchemaRegistration schema,
- boolean normalize) {
+ public Mono registerSubjectVersion(@NotNull String subject,
+ @NotNull SubjectSchemaRegistration schema,
+ boolean normalize) {
// Drop references - not supported through the Aiven's API.
SubjectSchemaRegistration registration = new SubjectSchemaRegistration(
schema.schema(),
schema.schemaType(),
null
);
- return CompletableFuture.supplyAsync(
+ return Mono.fromCallable(
() -> new SubjectSchemaId(api.registerSchemaRegistrySubjectVersion(subject, registration).version())
);
}
@@ -83,8 +75,8 @@ public CompletableFuture registerSubjectVersion(@NotNull String
* {@inheritDoc}
**/
@Override
- public CompletableFuture getLatestSubjectSchema(@NotNull String subject) {
- return CompletableFuture.supplyAsync(
+ public Mono getLatestSubjectSchema(@NotNull String subject) {
+ return Mono.fromCallable(
() -> api.getSchemaRegistryLatestSubjectVersion(subject).version()
);
}
@@ -93,8 +85,8 @@ public CompletableFuture getLatestSubjectSchema(@NotNull S
* {@inheritDoc}
**/
@Override
- public CompletableFuture getGlobalCompatibility() {
- return CompletableFuture.supplyAsync(
+ public Mono getGlobalCompatibility() {
+ return Mono.fromCallable(
() -> new CompatibilityLevelObject(api.getSchemaRegistryGlobalCompatibility().compatibilityLevel().name())
);
}
@@ -103,9 +95,9 @@ public CompletableFuture getGlobalCompatibility() {
* {@inheritDoc}
**/
@Override
- public CompletableFuture getSubjectCompatibilityLevel(@NotNull String subject,
- boolean defaultToGlobal) {
- return CompletableFuture.supplyAsync(
+ public Mono getSubjectCompatibilityLevel(@NotNull String subject,
+ boolean defaultToGlobal) {
+ return Mono.fromCallable(
() -> new CompatibilityLevelObject(api.getSchemaRegistrySubjectCompatibility(subject).compatibilityLevel().name())
);
}
@@ -114,9 +106,9 @@ public CompletableFuture getSubjectCompatibilityLevel(
* {@inheritDoc}
**/
@Override
- public CompletableFuture updateSubjectCompatibilityLevel(@NotNull String subject,
- @NotNull CompatibilityObject compatibility) {
- return CompletableFuture.supplyAsync(
+ public Mono updateSubjectCompatibilityLevel(@NotNull String subject,
+ @NotNull CompatibilityObject compatibility) {
+ return Mono.fromCallable(
() -> {
api.updateSchemaRegistrySubjectCompatibility(subject, compatibility);
return new CompatibilityObject(compatibility.compatibility());
@@ -128,7 +120,7 @@ public CompletableFuture updateSubjectCompatibilityLevel(@N
* {@inheritDoc}
**/
@Override
- public CompletableFuture deleteSubjectCompatibilityLevel(@NotNull String subject) {
+ public Mono deleteSubjectCompatibilityLevel(@NotNull String subject) {
throw new AivenApiClientException(
"Deleting configuration for Schema Registry subject is not supported by " +
"the Aiven API (for more information: https://api.aiven.io/doc/)."
@@ -139,11 +131,11 @@ public CompletableFuture deleteSubjectCompatibilityLevel(@N
* {@inheritDoc}
**/
@Override
- public CompletableFuture testCompatibility(@NotNull String subject,
- String version,
- boolean verbose,
- @NotNull SubjectSchemaRegistration schema) {
- return CompletableFuture.supplyAsync(
+ public Mono testCompatibility(@NotNull String subject,
+ String version,
+ boolean verbose,
+ @NotNull SubjectSchemaRegistration schema) {
+ return Mono.fromCallable(
() -> {
CompatibilityCheckResponse response = api.checkSchemaRegistryCompatibility(subject, version, schema);
return new CompatibilityCheck(
@@ -158,9 +150,9 @@ public CompletableFuture testCompatibility(@NotNull String s
* {@inheritDoc}
**/
@Override
- public CompletableFuture testCompatibilityLatest(@NotNull String subject,
- boolean verbose,
- @NotNull SubjectSchemaRegistration schema) {
+ public Mono testCompatibilityLatest(@NotNull String subject,
+ boolean verbose,
+ @NotNull SubjectSchemaRegistration schema) {
return testCompatibility(subject, "latest", verbose, schema);
}
diff --git a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/KafkaConnectExtensionConfig.java b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/KafkaConnectExtensionConfig.java
index db1f83f24..9b402f288 100644
--- a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/KafkaConnectExtensionConfig.java
+++ b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/KafkaConnectExtensionConfig.java
@@ -15,8 +15,6 @@
import io.streamthoughts.jikkou.core.models.HasMetadata;
import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectClientConfig;
import io.streamthoughts.jikkou.kafka.connect.exception.KafkaConnectClusterNotFoundException;
-import org.jetbrains.annotations.NotNull;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -27,6 +25,7 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.jetbrains.annotations.NotNull;
public final class KafkaConnectExtensionConfig {
diff --git a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeComputer.java b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeComputer.java
index a865a1574..6bd6b98e5 100644
--- a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeComputer.java
+++ b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeComputer.java
@@ -16,7 +16,6 @@
import io.streamthoughts.jikkou.core.reconciler.change.ResourceChangeFactory;
import io.streamthoughts.jikkou.kafka.connect.models.KafkaConnectorState;
import io.streamthoughts.jikkou.kafka.connect.models.V1KafkaConnector;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
diff --git a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeHandler.java b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeHandler.java
index fe071fd3f..a21703e91 100644
--- a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeHandler.java
+++ b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeHandler.java
@@ -6,6 +6,10 @@
*/
package io.streamthoughts.jikkou.kafka.connect.change;
+import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_CLASS_CONFIG;
+import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_TASKS_MAX_CONFIG;
+import static io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeComputer.DATA_CONNECTOR_CLASS;
+
import io.streamthoughts.jikkou.core.data.TypeConverter;
import io.streamthoughts.jikkou.core.models.change.ResourceChange;
import io.streamthoughts.jikkou.core.models.change.SpecificStateChange;
@@ -22,9 +26,6 @@
import io.streamthoughts.jikkou.kafka.connect.api.data.ConnectorInfoResponse;
import io.streamthoughts.jikkou.kafka.connect.api.data.ErrorResponse;
import io.streamthoughts.jikkou.kafka.connect.models.KafkaConnectorState;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.VisibleForTesting;
-
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -34,10 +35,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-
-import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_CLASS_CONFIG;
-import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_TASKS_MAX_CONFIG;
-import static io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeComputer.DATA_CONNECTOR_CLASS;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.VisibleForTesting;
public final class KafkaConnectorChangeHandler extends BaseChangeHandler {
diff --git a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/reconciler/KafkaConnectorCollector.java b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/reconciler/KafkaConnectorCollector.java
index c8c46129f..eb47a5618 100644
--- a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/reconciler/KafkaConnectorCollector.java
+++ b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/reconciler/KafkaConnectorCollector.java
@@ -24,16 +24,15 @@
import io.streamthoughts.jikkou.kafka.connect.exception.KafkaConnectClusterNotFoundException;
import io.streamthoughts.jikkou.kafka.connect.models.V1KafkaConnector;
import io.streamthoughts.jikkou.kafka.connect.service.KafkaConnectClusterService;
-import org.jetbrains.annotations.NotNull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A ResourceCollector to get {@link V1KafkaConnector} resources.
diff --git a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/reconciler/KafkaConnectorController.java b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/reconciler/KafkaConnectorController.java
index 9305ef105..a5e7ab4ec 100644
--- a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/reconciler/KafkaConnectorController.java
+++ b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/reconciler/KafkaConnectorController.java
@@ -6,6 +6,11 @@
*/
package io.streamthoughts.jikkou.kafka.connect.reconciler;
+import static io.streamthoughts.jikkou.core.ReconciliationMode.CREATE;
+import static io.streamthoughts.jikkou.core.ReconciliationMode.DELETE;
+import static io.streamthoughts.jikkou.core.ReconciliationMode.FULL;
+import static io.streamthoughts.jikkou.core.ReconciliationMode.UPDATE;
+
import io.streamthoughts.jikkou.core.ReconciliationContext;
import io.streamthoughts.jikkou.core.annotation.SupportedResource;
import io.streamthoughts.jikkou.core.extension.ContextualExtension;
@@ -28,19 +33,13 @@
import io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeDescription;
import io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeHandler;
import io.streamthoughts.jikkou.kafka.connect.models.V1KafkaConnector;
-import org.jetbrains.annotations.NotNull;
-
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
-
-import static io.streamthoughts.jikkou.core.ReconciliationMode.CREATE;
-import static io.streamthoughts.jikkou.core.ReconciliationMode.DELETE;
-import static io.streamthoughts.jikkou.core.ReconciliationMode.FULL;
-import static io.streamthoughts.jikkou.core.ReconciliationMode.UPDATE;
+import org.jetbrains.annotations.NotNull;
@SupportedResource(type = V1KafkaConnector.class)
@SupportedResource(apiVersion = ApiVersions.KAFKA_V1BETA, kind = "KafkaConnectorChange")
diff --git a/providers/jikkou-provider-kafka/pom.xml b/providers/jikkou-provider-kafka/pom.xml
index 99077f34f..4a065c1a7 100644
--- a/providers/jikkou-provider-kafka/pom.xml
+++ b/providers/jikkou-provider-kafka/pom.xml
@@ -23,7 +23,6 @@
${project.parent.basedir}/header
- 3.6.6
@@ -81,7 +80,6 @@
io.projectreactor
reactor-core
- ${reactor-core.version}
diff --git a/providers/jikkou-provider-schema-registry/pom.xml b/providers/jikkou-provider-schema-registry/pom.xml
index b40010f1f..8701b24f1 100644
--- a/providers/jikkou-provider-schema-registry/pom.xml
+++ b/providers/jikkou-provider-schema-registry/pom.xml
@@ -57,6 +57,10 @@
org.jsonschema2pojo
jsonschema2pojo-maven-plugin
+
+ io.projectreactor
+ reactor-core
+
org.mockito
diff --git a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApiTest.java b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApiTest.java
index e8b1caf96..c80db6cf1 100644
--- a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApiTest.java
+++ b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApiTest.java
@@ -19,7 +19,6 @@
import io.streamthoughts.jikkou.schema.registry.model.CompatibilityLevels;
import io.streamthoughts.jikkou.schema.registry.model.SchemaType;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -36,6 +35,7 @@
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
+import reactor.core.publisher.Mono;
@Testcontainers
@Tag("integration")
@@ -102,10 +102,10 @@ public void beforeEach() {
@Test
void shouldGetGlobalCompatibilityLevel() throws ExecutionException, InterruptedException {
// When
- CompletableFuture future = async.getGlobalCompatibility();
+ Mono future = async.getGlobalCompatibility();
// Then
- CompatibilityLevelObject result = future.get();
+ CompatibilityLevelObject result = future.block();
Assertions.assertEquals("BACKWARD", result.compatibilityLevel());
}
@@ -113,10 +113,10 @@ void shouldGetGlobalCompatibilityLevel() throws ExecutionException, InterruptedE
@Test
void shouldListSchemaForEmptySubject() throws ExecutionException, InterruptedException {
// When
- CompletableFuture> future = async.listSubjects();
+ Mono> future = async.listSubjects();
// Then
- List results = future.get();
+ List results = future.block();
Assertions.assertTrue(results.isEmpty());
}
@@ -124,14 +124,14 @@ void shouldListSchemaForEmptySubject() throws ExecutionException, InterruptedExc
@Test
void shouldRegisterSchemaVersionForNewSubject() throws ExecutionException, InterruptedException {
// When
- CompletableFuture future = async.registerSubjectVersion(
+ Mono future = async.registerSubjectVersion(
TEST_SUBJECT,
new SubjectSchemaRegistration(AVRO_SCHEMA, SchemaType.AVRO),
true
);
// Then
- SubjectSchemaId result = future.get();
+ SubjectSchemaId result = future.block();
Assertions.assertEquals(1, result.id());
}
@@ -139,26 +139,26 @@ void shouldRegisterSchemaVersionForNewSubject() throws ExecutionException, Inter
@Test
void shouldListSchemaForExistingSubject() throws ExecutionException, InterruptedException {
// When
- CompletableFuture> future = async.listSubjects();
+ Mono> future = async.listSubjects();
// Then
- List results = future.get();
+ List results = future.block();
Assertions.assertFalse(results.isEmpty());
Assertions.assertEquals(1, results.size());
- Assertions.assertEquals(TEST_SUBJECT, results.get(0));
+ Assertions.assertEquals(TEST_SUBJECT, results.getFirst());
}
@Order(5)
@Test
void shouldUpdateCompatibilityForExistingSubject() throws ExecutionException, InterruptedException {
// When
- CompletableFuture future = async.updateSubjectCompatibilityLevel(
+ Mono future = async.updateSubjectCompatibilityLevel(
TEST_SUBJECT,
new CompatibilityObject(CompatibilityLevels.FULL_TRANSITIVE.name())
);
// Then
- CompatibilityObject result = future.get();
+ CompatibilityObject result = future.block();
Assertions.assertEquals(CompatibilityLevels.FULL_TRANSITIVE.name(), result.compatibility());
}
@@ -166,10 +166,10 @@ void shouldUpdateCompatibilityForExistingSubject() throws ExecutionException, In
@Test
void shouldGetCompatibilityForExistingSubject() throws ExecutionException, InterruptedException {
// When
- CompletableFuture future = async.getSubjectCompatibilityLevel(TEST_SUBJECT, false);
+ Mono future = async.getSubjectCompatibilityLevel(TEST_SUBJECT, false);
// Then
- CompatibilityLevelObject result = future.get();
+ CompatibilityLevelObject result = future.block();
Assertions.assertEquals(CompatibilityLevels.FULL_TRANSITIVE.name(), result.compatibilityLevel());
}
@@ -177,17 +177,10 @@ void shouldGetCompatibilityForExistingSubject() throws ExecutionException, Inter
@Test
void shouldGetErrorCompatibilityForNotExistingSubjectAndDefaultToGlobalFalse() {
// When
- CompletableFuture future = async.getSubjectCompatibilityLevel("unknown", false);
+ Mono future = async.getSubjectCompatibilityLevel("unknown", false);
// Then
- RestClientException exception = Assertions
- .assertThrowsExactly(RestClientException.class, () -> {
- try {
- future.get();
- } catch (ExecutionException e) {
- throw e.getCause();
- }
- });
+ RestClientException exception = Assertions.assertThrowsExactly(RestClientException.class, future::block);
ErrorResponse response = exception.getResponseEntity(ErrorResponse.class);
Assertions.assertEquals(40408, response.errorCode());
@@ -198,10 +191,10 @@ void shouldGetErrorCompatibilityForNotExistingSubjectAndDefaultToGlobalFalse() {
@Test
void shouldGetGlobalCompatibilityForNotExistingSubjectAndDefaultToGlobalTrue() throws ExecutionException, InterruptedException {
// When
- CompletableFuture future = async.getSubjectCompatibilityLevel("unknown", true);
+ Mono future = async.getSubjectCompatibilityLevel("unknown", true);
// Then
- CompatibilityLevelObject result = future.get();
+ CompatibilityLevelObject result = future.block();
Assertions.assertEquals(CompatibilityLevels.BACKWARD.name(), result.compatibilityLevel());
}
@@ -209,7 +202,7 @@ void shouldGetGlobalCompatibilityForNotExistingSubjectAndDefaultToGlobalTrue() t
@Test
void shouldGetTrueForTestingCompatibleSchema() throws ExecutionException, InterruptedException {
// When
- CompletableFuture future = async.testCompatibility(
+ Mono future = async.testCompatibility(
TEST_SUBJECT,
"-1",
true,
@@ -217,7 +210,7 @@ void shouldGetTrueForTestingCompatibleSchema() throws ExecutionException, Interr
);
// Then
- CompatibilityCheck result = future.get();
+ CompatibilityCheck result = future.block();
Assertions.assertTrue(result.isCompatible());
Assertions.assertTrue(result.getMessages().isEmpty());
}
@@ -226,7 +219,7 @@ void shouldGetTrueForTestingCompatibleSchema() throws ExecutionException, Interr
@Test
void shouldGetFalseForTestingCompatibleSchema() throws ExecutionException, InterruptedException {
// When
- CompletableFuture future = async.testCompatibility(
+ Mono future = async.testCompatibility(
TEST_SUBJECT,
"-1",
true,
@@ -234,7 +227,7 @@ void shouldGetFalseForTestingCompatibleSchema() throws ExecutionException, Inter
);
// Then
- CompatibilityCheck result = future.get();
+ CompatibilityCheck result = future.block();
Assertions.assertFalse(result.isCompatible());
Assertions.assertFalse(result.getMessages().isEmpty());
}
diff --git a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollectorTest.java b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollectorTest.java
index adab98328..37e8b3b72 100644
--- a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollectorTest.java
+++ b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollectorTest.java
@@ -34,7 +34,7 @@ public void beforeEach() throws ExecutionException, InterruptedException {
TEST_SUBJECT,
new SubjectSchemaRegistration(AVRO_SCHEMA, SchemaType.AVRO),
false
- ).get();
+ ).block();
}
@Test
@@ -50,7 +50,7 @@ public void shouldGetAllSchemasWithGlobalCompatibilityLevelTrue() {
Assertions.assertNotNull(resources);
Assertions.assertEquals(1, resources.size());
- V1SchemaRegistrySubject subject = resources.get(0);
+ V1SchemaRegistrySubject subject = resources.getFirst();
Assertions.assertEquals(TEST_SUBJECT, subject.getMetadata().getName());
Assertions.assertEquals(SchemaType.AVRO, subject.getSpec().getSchemaType());
Assertions.assertEquals(CompatibilityLevels.BACKWARD, subject.getSpec().getCompatibilityLevel());
@@ -69,7 +69,7 @@ public void shouldGetAllSchemasWithGlobalCompatibilityLevelFalse() {
Assertions.assertNotNull(resources);
Assertions.assertEquals(1, resources.size());
- V1SchemaRegistrySubject subject = resources.get(0);
+ V1SchemaRegistrySubject subject = resources.getFirst();
Assertions.assertEquals(TEST_SUBJECT, subject.getMetadata().getName());
Assertions.assertEquals(SchemaType.AVRO, subject.getSpec().getSchemaType());
Assertions.assertNull(subject.getSpec().getCompatibilityLevel());
diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApi.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApi.java
index e203cbec4..5a0d958bc 100644
--- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApi.java
+++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApi.java
@@ -6,15 +6,10 @@
*/
package io.streamthoughts.jikkou.schema.registry.api;
-import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityCheck;
-import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityLevelObject;
-import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityObject;
-import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaId;
-import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaRegistration;
-import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion;
+import io.streamthoughts.jikkou.schema.registry.api.data.*;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import org.jetbrains.annotations.NotNull;
+import reactor.core.publisher.Mono;
/**
* Asynchronous Schema Registry Api.
@@ -26,7 +21,7 @@ public interface AsyncSchemaRegistryApi extends AutoCloseable {
*
* @return a list of registered subjects.
*/
- CompletableFuture> listSubjects();
+ Mono> listSubjects();
/**
* Deletes the specified subject and its associated compatibility level if registered.
@@ -36,8 +31,8 @@ public interface AsyncSchemaRegistryApi extends AutoCloseable {
* all associated metadata including the schema ID.
* @return a list of versions
*/
- CompletableFuture> deleteSubjectVersions(@NotNull String subject,
- boolean permanent);
+ Mono> deleteSubjectVersions(@NotNull String subject,
+ boolean permanent);
/**
* Register a new schema under the specified subject.
@@ -47,9 +42,9 @@ CompletableFuture> deleteSubjectVersions(@NotNull String subject,
* @param normalize whether to normalize the given schema
* @return the globally unique identifier of the schema.
*/
- CompletableFuture registerSubjectVersion(@NotNull String subject,
- @NotNull SubjectSchemaRegistration schema,
- boolean normalize);
+ Mono registerSubjectVersion(@NotNull String subject,
+ @NotNull SubjectSchemaRegistration schema,
+ boolean normalize);
/**
* Get the latest version of the schema registered under the specified subject.
@@ -57,14 +52,14 @@ CompletableFuture registerSubjectVersion(@NotNull String subjec
* @param subject name of the subject
* @return a {@link SubjectSchemaVersion} object.
*/
- CompletableFuture getLatestSubjectSchema(@NotNull String subject);
+ Mono getLatestSubjectSchema(@NotNull String subject);
/**
* Gets the schema registry global compatibility level.
*
* @return the compatibility level.
*/
- CompletableFuture getGlobalCompatibility();
+ Mono getGlobalCompatibility();
/**
* Gets compatibility level for the specified subject.
@@ -73,8 +68,8 @@ CompletableFuture registerSubjectVersion(@NotNull String subjec
* @param defaultToGlobal flag to default to global compatibility.
* @return the compatibility level.
*/
- CompletableFuture getSubjectCompatibilityLevel(@NotNull String subject,
- boolean defaultToGlobal);
+ Mono getSubjectCompatibilityLevel(@NotNull String subject,
+ boolean defaultToGlobal);
/**
* Updates compatibility level for the specified subject.
@@ -83,8 +78,8 @@ CompletableFuture getSubjectCompatibilityLevel(@NotNul
* @param compatibility the new compatibility level for the subject.
* @return the updated compatibility level.
*/
- CompletableFuture updateSubjectCompatibilityLevel(@NotNull String subject,
- @NotNull CompatibilityObject compatibility);
+ Mono updateSubjectCompatibilityLevel(@NotNull String subject,
+ @NotNull CompatibilityObject compatibility);
/**
* Deletes the specified subject-level compatibility level config and reverts to the global default.
@@ -92,16 +87,16 @@ CompletableFuture updateSubjectCompatibilityLevel(@NotNull
* @param subject the name of the subject.
* @return the compatibility level.
*/
- CompletableFuture deleteSubjectCompatibilityLevel(@NotNull String subject);
+ Mono deleteSubjectCompatibilityLevel(@NotNull String subject);
- CompletableFuture testCompatibility(@NotNull String subject,
- String version,
- boolean verbose,
- @NotNull SubjectSchemaRegistration schema);
+ Mono testCompatibility(@NotNull String subject,
+ String version,
+ boolean verbose,
+ @NotNull SubjectSchemaRegistration schema);
- CompletableFuture testCompatibilityLatest(@NotNull String subject,
- boolean verbose,
- @NotNull SubjectSchemaRegistration schema);
+ Mono testCompatibilityLatest(@NotNull String subject,
+ boolean verbose,
+ @NotNull SubjectSchemaRegistration schema);
@Override
default void close() {
diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/DefaultAsyncSchemaRegistryApi.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/DefaultAsyncSchemaRegistryApi.java
index d99ea7d93..4b66c57af 100644
--- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/DefaultAsyncSchemaRegistryApi.java
+++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/DefaultAsyncSchemaRegistryApi.java
@@ -14,8 +14,8 @@
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
import org.jetbrains.annotations.NotNull;
+import reactor.core.publisher.Mono;
/**
* A wrapper around the REST API {@link SchemaRegistryApi} to provide asynchronous methods.
@@ -37,27 +37,27 @@ public DefaultAsyncSchemaRegistryApi(final @NotNull SchemaRegistryApi api) {
* @see SchemaRegistryApi#listSubjects()
*/
@Override
- public CompletableFuture> listSubjects() {
- return CompletableFuture.supplyAsync(api::listSubjects);
+ public Mono> listSubjects() {
+ return Mono.fromCallable(api::listSubjects);
}
/**
* @see SchemaRegistryApi#deleteSubjectVersions(String, boolean)
*/
@Override
- public CompletableFuture> deleteSubjectVersions(@NotNull final String subject,
+ public Mono> deleteSubjectVersions(@NotNull final String subject,
boolean permanent) {
- return CompletableFuture.supplyAsync(() -> api.deleteSubjectVersions(subject, permanent));
+ return Mono.fromCallable(() -> api.deleteSubjectVersions(subject, permanent));
}
/**
* @see SchemaRegistryApi#registerSchema(String, SubjectSchemaRegistration, boolean)
*/
@Override
- public CompletableFuture registerSubjectVersion(@NotNull final String subject,
+ public Mono registerSubjectVersion(@NotNull final String subject,
@NotNull final SubjectSchemaRegistration schema,
boolean normalize) {
- return CompletableFuture.supplyAsync(() -> api.registerSchema(subject, schema, normalize));
+ return Mono.fromCallable(() -> api.registerSchema(subject, schema, normalize));
}
@@ -65,42 +65,42 @@ public CompletableFuture registerSubjectVersion(@NotNull final
* @see SchemaRegistryApi#getLatestSubjectSchema(String)
*/
@Override
- public CompletableFuture getLatestSubjectSchema(@NotNull final String subject) {
- return CompletableFuture.supplyAsync(() -> api.getLatestSubjectSchema(subject));
+ public Mono getLatestSubjectSchema(@NotNull final String subject) {
+ return Mono.fromCallable(() -> api.getLatestSubjectSchema(subject));
}
/**
* @see SchemaRegistryApi#getGlobalCompatibility()
*/
@Override
- public CompletableFuture getGlobalCompatibility() {
- return CompletableFuture.supplyAsync(api::getGlobalCompatibility);
+ public Mono getGlobalCompatibility() {
+ return Mono.fromCallable(api::getGlobalCompatibility);
}
/**
* @see SchemaRegistryApi#getConfigCompatibility(String, boolean)
*/
@Override
- public CompletableFuture getSubjectCompatibilityLevel(@NotNull final String subject,
+ public Mono getSubjectCompatibilityLevel(@NotNull final String subject,
boolean defaultToGlobal) {
- return CompletableFuture.supplyAsync(() -> api.getConfigCompatibility(subject, defaultToGlobal));
+ return Mono.fromCallable(() -> api.getConfigCompatibility(subject, defaultToGlobal));
}
/**
* @see SchemaRegistryApi#updateConfigCompatibility(String, CompatibilityObject)
*/
@Override
- public CompletableFuture updateSubjectCompatibilityLevel(@NotNull final String subject,
+ public Mono updateSubjectCompatibilityLevel(@NotNull final String subject,
@NotNull final CompatibilityObject compatibility) {
- return CompletableFuture.supplyAsync(() -> api.updateConfigCompatibility(subject, compatibility));
+ return Mono.fromCallable(() -> api.updateConfigCompatibility(subject, compatibility));
}
/**
* @see SchemaRegistryApi#deleteConfigCompatibility(String)
*/
@Override
- public CompletableFuture deleteSubjectCompatibilityLevel(@NotNull final String subject) {
- return CompletableFuture.supplyAsync(() -> api.deleteConfigCompatibility(subject));
+ public Mono deleteSubjectCompatibilityLevel(@NotNull final String subject) {
+ return Mono.fromCallable(() -> api.deleteConfigCompatibility(subject));
}
@@ -108,11 +108,11 @@ public CompletableFuture deleteSubjectCompatibilityLevel(@N
* @see SchemaRegistryApi#deleteConfigCompatibility(String)
*/
@Override
- public CompletableFuture testCompatibility(@NotNull final String subject,
+ public Mono testCompatibility(@NotNull final String subject,
String version,
boolean verbose,
@NotNull final SubjectSchemaRegistration schema) {
- return CompletableFuture.supplyAsync(
+ return Mono.fromCallable(
() -> api.testCompatibility(subject, Integer.parseInt(version), verbose, schema)
);
}
@@ -121,12 +121,10 @@ public CompletableFuture testCompatibility(@NotNull final St
* @see SchemaRegistryApi#testCompatibilityLatest(String, boolean, SubjectSchemaRegistration)
*/
@Override
- public CompletableFuture testCompatibilityLatest(@NotNull String subject,
- boolean verbose,
- @NotNull SubjectSchemaRegistration schema) {
- return CompletableFuture.supplyAsync(
- () -> api.testCompatibilityLatest(subject, verbose, schema)
- );
+ public Mono testCompatibilityLatest(@NotNull String subject,
+ boolean verbose,
+ @NotNull SubjectSchemaRegistration schema) {
+ return Mono.fromCallable(() -> api.testCompatibilityLatest(subject, verbose, schema));
}
@Override
diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java
index 643fec877..8856c5d3e 100644
--- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java
+++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java
@@ -32,7 +32,7 @@ public final class SchemaSubjectChangeComputer extends ResourceChangeComputer> TYPE_CONVERTER = ObjectTypeConverter.newForType(new TypeReference<>() {
+ private static final TypeConverter