Skip to content

Commit

Permalink
refactor(provider): optimize schema registry provider
Browse files Browse the repository at this point in the history
related-to: #498
  • Loading branch information
fhussonnois committed Nov 29, 2024
1 parent ddf6088 commit 5ff8d26
Show file tree
Hide file tree
Showing 21 changed files with 205 additions and 262 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
<config.version>1.4.3</config.version>
<annotations.version>24.1.0</annotations.version>
<jakarta.validation-api.version>3.1.0</jakarta.validation-api.version>
<reactor-core.version>3.6.6</reactor-core.version>
<!-- Logging -->
<slf4j-api.version>2.0.9</slf4j-api.version>
<logback.version>1.5.6</logback.version>
Expand Down Expand Up @@ -236,6 +237,11 @@
<artifactId>aws-msk-iam-auth</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${reactor-core.version}</version>
</dependency>
<!-- START dependencies for Tests -->
<dependency>
<groupId>org.mockito</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,13 @@

import io.streamthoughts.jikkou.extension.aiven.api.data.CompatibilityCheckResponse;
import io.streamthoughts.jikkou.schema.registry.api.AsyncSchemaRegistryApi;
import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityCheck;
import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityLevelObject;
import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityObject;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaId;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaRegistration;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion;
import io.streamthoughts.jikkou.schema.registry.api.data.*;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;

/**
* AsyncSchemaRegistryApi implementation for Aiven.
Expand All @@ -41,20 +36,17 @@ public AivenAsyncSchemaRegistryApi(final @NotNull AivenApiClient api) {
* {@inheritDoc}
**/
@Override
public CompletableFuture<List<String>> listSubjects() {
return CompletableFuture.supplyAsync(
() -> api.listSchemaRegistrySubjects().subjects()
);
public Mono<List<String>> listSubjects() {
return Mono.fromCallable(() -> api.listSchemaRegistrySubjects().subjects());
}

/**
* {@inheritDoc}
**/
@Override
public CompletableFuture<List<Integer>> deleteSubjectVersions(@NotNull String subject,
boolean permanent) {
return CompletableFuture.supplyAsync(
() -> {
public Mono<List<Integer>> deleteSubjectVersions(@NotNull String subject,
boolean permanent) {
return Mono.fromCallable(() -> {
api.deleteSchemaRegistrySubject(subject);
return Collections.emptyList();
}
Expand All @@ -65,16 +57,16 @@ public CompletableFuture<List<Integer>> deleteSubjectVersions(@NotNull String su
* {@inheritDoc}
**/
@Override
public CompletableFuture<SubjectSchemaId> registerSubjectVersion(@NotNull String subject,
@NotNull SubjectSchemaRegistration schema,
boolean normalize) {
public Mono<SubjectSchemaId> registerSubjectVersion(@NotNull String subject,
@NotNull SubjectSchemaRegistration schema,
boolean normalize) {
// Drop references - not supported through the Aiven's API.
SubjectSchemaRegistration registration = new SubjectSchemaRegistration(
schema.schema(),
schema.schemaType(),
null
);
return CompletableFuture.supplyAsync(
return Mono.fromCallable(
() -> new SubjectSchemaId(api.registerSchemaRegistrySubjectVersion(subject, registration).version())
);
}
Expand All @@ -83,8 +75,8 @@ public CompletableFuture<SubjectSchemaId> registerSubjectVersion(@NotNull String
* {@inheritDoc}
**/
@Override
public CompletableFuture<SubjectSchemaVersion> getLatestSubjectSchema(@NotNull String subject) {
return CompletableFuture.supplyAsync(
public Mono<SubjectSchemaVersion> getLatestSubjectSchema(@NotNull String subject) {
return Mono.fromCallable(
() -> api.getSchemaRegistryLatestSubjectVersion(subject).version()
);
}
Expand All @@ -93,8 +85,8 @@ public CompletableFuture<SubjectSchemaVersion> getLatestSubjectSchema(@NotNull S
* {@inheritDoc}
**/
@Override
public CompletableFuture<CompatibilityLevelObject> getGlobalCompatibility() {
return CompletableFuture.supplyAsync(
public Mono<CompatibilityLevelObject> getGlobalCompatibility() {
return Mono.fromCallable(
() -> new CompatibilityLevelObject(api.getSchemaRegistryGlobalCompatibility().compatibilityLevel().name())
);
}
Expand All @@ -103,9 +95,9 @@ public CompletableFuture<CompatibilityLevelObject> getGlobalCompatibility() {
* {@inheritDoc}
**/
@Override
public CompletableFuture<CompatibilityLevelObject> getSubjectCompatibilityLevel(@NotNull String subject,
boolean defaultToGlobal) {
return CompletableFuture.supplyAsync(
public Mono<CompatibilityLevelObject> getSubjectCompatibilityLevel(@NotNull String subject,
boolean defaultToGlobal) {
return Mono.fromCallable(
() -> new CompatibilityLevelObject(api.getSchemaRegistrySubjectCompatibility(subject).compatibilityLevel().name())
);
}
Expand All @@ -114,9 +106,9 @@ public CompletableFuture<CompatibilityLevelObject> getSubjectCompatibilityLevel(
* {@inheritDoc}
**/
@Override
public CompletableFuture<CompatibilityObject> updateSubjectCompatibilityLevel(@NotNull String subject,
@NotNull CompatibilityObject compatibility) {
return CompletableFuture.supplyAsync(
public Mono<CompatibilityObject> updateSubjectCompatibilityLevel(@NotNull String subject,
@NotNull CompatibilityObject compatibility) {
return Mono.fromCallable(
() -> {
api.updateSchemaRegistrySubjectCompatibility(subject, compatibility);
return new CompatibilityObject(compatibility.compatibility());
Expand All @@ -128,7 +120,7 @@ public CompletableFuture<CompatibilityObject> updateSubjectCompatibilityLevel(@N
* {@inheritDoc}
**/
@Override
public CompletableFuture<CompatibilityObject> deleteSubjectCompatibilityLevel(@NotNull String subject) {
public Mono<CompatibilityObject> deleteSubjectCompatibilityLevel(@NotNull String subject) {
throw new AivenApiClientException(
"Deleting configuration for Schema Registry subject is not supported by " +
"the Aiven API (for more information: https://api.aiven.io/doc/)."
Expand All @@ -139,11 +131,11 @@ public CompletableFuture<CompatibilityObject> deleteSubjectCompatibilityLevel(@N
* {@inheritDoc}
**/
@Override
public CompletableFuture<CompatibilityCheck> testCompatibility(@NotNull String subject,
String version,
boolean verbose,
@NotNull SubjectSchemaRegistration schema) {
return CompletableFuture.supplyAsync(
public Mono<CompatibilityCheck> testCompatibility(@NotNull String subject,
String version,
boolean verbose,
@NotNull SubjectSchemaRegistration schema) {
return Mono.fromCallable(
() -> {
CompatibilityCheckResponse response = api.checkSchemaRegistryCompatibility(subject, version, schema);
return new CompatibilityCheck(
Expand All @@ -158,9 +150,9 @@ public CompletableFuture<CompatibilityCheck> testCompatibility(@NotNull String s
* {@inheritDoc}
**/
@Override
public CompletableFuture<CompatibilityCheck> testCompatibilityLatest(@NotNull String subject,
boolean verbose,
@NotNull SubjectSchemaRegistration schema) {
public Mono<CompatibilityCheck> testCompatibilityLatest(@NotNull String subject,
boolean verbose,
@NotNull SubjectSchemaRegistration schema) {
return testCompatibility(subject, "latest", verbose, schema);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import io.streamthoughts.jikkou.core.models.HasMetadata;
import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectClientConfig;
import io.streamthoughts.jikkou.kafka.connect.exception.KafkaConnectClusterNotFoundException;
import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -27,6 +25,7 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;

public final class KafkaConnectExtensionConfig {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.streamthoughts.jikkou.core.reconciler.change.ResourceChangeFactory;
import io.streamthoughts.jikkou.kafka.connect.models.KafkaConnectorState;
import io.streamthoughts.jikkou.kafka.connect.models.V1KafkaConnector;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
*/
package io.streamthoughts.jikkou.kafka.connect.change;

import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_CLASS_CONFIG;
import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_TASKS_MAX_CONFIG;
import static io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeComputer.DATA_CONNECTOR_CLASS;

import io.streamthoughts.jikkou.core.data.TypeConverter;
import io.streamthoughts.jikkou.core.models.change.ResourceChange;
import io.streamthoughts.jikkou.core.models.change.SpecificStateChange;
Expand All @@ -22,9 +26,6 @@
import io.streamthoughts.jikkou.kafka.connect.api.data.ConnectorInfoResponse;
import io.streamthoughts.jikkou.kafka.connect.api.data.ErrorResponse;
import io.streamthoughts.jikkou.kafka.connect.models.KafkaConnectorState;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.VisibleForTesting;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -34,10 +35,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_CLASS_CONFIG;
import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_TASKS_MAX_CONFIG;
import static io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeComputer.DATA_CONNECTOR_CLASS;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.VisibleForTesting;

public final class KafkaConnectorChangeHandler extends BaseChangeHandler<ResourceChange> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,15 @@
import io.streamthoughts.jikkou.kafka.connect.exception.KafkaConnectClusterNotFoundException;
import io.streamthoughts.jikkou.kafka.connect.models.V1KafkaConnector;
import io.streamthoughts.jikkou.kafka.connect.service.KafkaConnectClusterService;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A ResourceCollector to get {@link V1KafkaConnector} resources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
*/
package io.streamthoughts.jikkou.kafka.connect.reconciler;

import static io.streamthoughts.jikkou.core.ReconciliationMode.CREATE;
import static io.streamthoughts.jikkou.core.ReconciliationMode.DELETE;
import static io.streamthoughts.jikkou.core.ReconciliationMode.FULL;
import static io.streamthoughts.jikkou.core.ReconciliationMode.UPDATE;

import io.streamthoughts.jikkou.core.ReconciliationContext;
import io.streamthoughts.jikkou.core.annotation.SupportedResource;
import io.streamthoughts.jikkou.core.extension.ContextualExtension;
Expand All @@ -28,19 +33,13 @@
import io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeDescription;
import io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeHandler;
import io.streamthoughts.jikkou.kafka.connect.models.V1KafkaConnector;
import org.jetbrains.annotations.NotNull;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static io.streamthoughts.jikkou.core.ReconciliationMode.CREATE;
import static io.streamthoughts.jikkou.core.ReconciliationMode.DELETE;
import static io.streamthoughts.jikkou.core.ReconciliationMode.FULL;
import static io.streamthoughts.jikkou.core.ReconciliationMode.UPDATE;
import org.jetbrains.annotations.NotNull;

@SupportedResource(type = V1KafkaConnector.class)
@SupportedResource(apiVersion = ApiVersions.KAFKA_V1BETA, kind = "KafkaConnectorChange")
Expand Down
2 changes: 0 additions & 2 deletions providers/jikkou-provider-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

<properties>
<license.header.file>${project.parent.basedir}/header</license.header.file>
<reactor-core.version>3.6.6</reactor-core.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -81,7 +80,6 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${reactor-core.version}</version>
</dependency>
<!-- START dependencies for test -->
<dependency>
Expand Down
4 changes: 4 additions & 0 deletions providers/jikkou-provider-schema-registry/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@
<groupId>org.jsonschema2pojo</groupId>
<artifactId>jsonschema2pojo-maven-plugin</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<!-- START dependencies for test -->
<dependency>
<groupId>org.mockito</groupId>
Expand Down
Loading

0 comments on commit 5ff8d26

Please sign in to comment.