From 03350b4ce5b2886126f5082fddb56d1cd4545b65 Mon Sep 17 00:00:00 2001 From: Zach Pearce Date: Wed, 27 Mar 2024 14:45:25 -0600 Subject: [PATCH 1/2] feat(provider-schema-registry): add support for subject modes --- .gitignore | 2 + .../api/AivenAsyncSchemaRegistryApi.java | 22 +++-- .../AivenSchemaRegistrySubjectCollector.java | 2 +- .../SchemaRegistrySubjectCollectorTest.java | 11 ++- .../V1SchemaRegistrySubjectFactory.java | 8 +- .../registry/api/AsyncSchemaRegistryApi.java | 35 +++++-- .../api/DefaultAsyncSchemaRegistryApi.java | 22 +++-- .../registry/api/SchemaRegistryApi.java | 37 ++++++-- .../schema/registry/api/data/ModeObject.java | 29 ++++++ .../change/SchemaSubjectChangeComputer.java | 17 ++++ .../AbstractSchemaSubjectChangeHandler.java | 52 +++++++++-- .../CreateSchemaSubjectChangeHandler.java | 9 ++ .../UpdateSchemaSubjectChangeHandler.java | 15 ++- .../jikkou/schema/registry/model/Modes.java | 17 ++++ .../models/V1SchemaRegistrySubjectSpec.java | 30 +++++- .../SchemaRegistrySubjectCollector.java | 93 +++++++++++-------- .../SchemaRegistrySubjectController.java | 3 +- .../main/json/V1SchemaRegistrySubject.json | 7 +- .../SchemaSubjectChangeComputerTest.java | 71 +++++++++++++- .../datasets/resource-subject-test.yaml | 3 +- 20 files changed, 397 insertions(+), 88 deletions(-) create mode 100644 providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/data/ModeObject.java create mode 100644 providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/model/Modes.java diff --git a/.gitignore b/.gitignore index df19144a6..36e9abb20 100644 --- a/.gitignore +++ b/.gitignore @@ -131,3 +131,5 @@ docs/node_modules docs/resources docs/public docs/public + +.sdkmanrc diff --git a/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/api/AivenAsyncSchemaRegistryApi.java b/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/api/AivenAsyncSchemaRegistryApi.java index 06276061f..75e720a7a 100644 --- a/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/api/AivenAsyncSchemaRegistryApi.java +++ b/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/api/AivenAsyncSchemaRegistryApi.java @@ -8,12 +8,7 @@ import io.streamthoughts.jikkou.extension.aiven.api.data.CompatibilityCheckResponse; import io.streamthoughts.jikkou.schema.registry.api.AsyncSchemaRegistryApi; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityCheck; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityLevelObject; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityObject; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaId; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaRegistration; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion; +import io.streamthoughts.jikkou.schema.registry.api.data.*; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -171,4 +166,19 @@ public CompletableFuture testCompatibilityLatest(@NotNull St public void close() { api.close(); } + + @Override + public CompletableFuture getSubjectMode(@NotNull String subject, boolean defaultToGlobal) { + throw new UnsupportedOperationException("Aiven schema registry does not support subject mode"); + } + + @Override + public CompletableFuture updateSubjectMode(@NotNull String subject, @NotNull ModeObject mode) { + throw new UnsupportedOperationException("Aiven schema registry does not support subject mode"); + } + + @Override + public CompletableFuture deleteSubjectMode(@NotNull String subject) { + throw new UnsupportedOperationException("Aiven schema registry does not support subject mode"); + } } diff --git a/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/reconciler/AivenSchemaRegistrySubjectCollector.java b/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/reconciler/AivenSchemaRegistrySubjectCollector.java index 60c9a8b69..1360e801c 100644 --- a/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/reconciler/AivenSchemaRegistrySubjectCollector.java +++ b/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/reconciler/AivenSchemaRegistrySubjectCollector.java @@ -162,7 +162,7 @@ private static CompletableFuture getSchemaRegistrySubje () -> Pair.of(subjectSchemaVersion, api.getSchemaRegistrySubjectCompatibility(subject)))) .thenApply(pair -> // Create SchemaRegistrySubject object - factory.createSchemaRegistrySubject(pair._1().version(), pair._2().compatibilityLevel()) + factory.createSchemaRegistrySubject(pair._1().version(), pair._2().compatibilityLevel(), null) ); } } diff --git a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollectorTest.java b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollectorTest.java index adab98328..391db3e4a 100644 --- a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollectorTest.java +++ b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollectorTest.java @@ -12,6 +12,7 @@ import io.streamthoughts.jikkou.schema.registry.api.AsyncSchemaRegistryApi; import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaRegistration; import io.streamthoughts.jikkou.schema.registry.model.CompatibilityLevels; +import io.streamthoughts.jikkou.schema.registry.model.Modes; import io.streamthoughts.jikkou.schema.registry.model.SchemaType; import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubject; import java.util.List; @@ -38,9 +39,10 @@ public void beforeEach() throws ExecutionException, InterruptedException { } @Test - public void shouldGetAllSchemasWithGlobalCompatibilityLevelTrue() { + public void shouldGetAllSchemasWithGlobalCompatibilityLevelAndGlobalModeTrue() { // Given collector.defaultToGlobalCompatibilityLevel(true); + collector.defaultToGlobalMode(true); // When List resources = collector.listAll(Configuration.empty(), Selectors.NO_SELECTOR) @@ -54,12 +56,14 @@ public void shouldGetAllSchemasWithGlobalCompatibilityLevelTrue() { Assertions.assertEquals(TEST_SUBJECT, subject.getMetadata().getName()); Assertions.assertEquals(SchemaType.AVRO, subject.getSpec().getSchemaType()); Assertions.assertEquals(CompatibilityLevels.BACKWARD, subject.getSpec().getCompatibilityLevel()); + Assertions.assertEquals(Modes.READWRITE, subject.getSpec().getMode()); } @Test - public void shouldGetAllSchemasWithGlobalCompatibilityLevelFalse() { + public void shouldGetAllSchemasWithGlobalCompatibilityLevelAndGlobalModeFalse() { // Given collector.defaultToGlobalCompatibilityLevel(false); + collector.defaultToGlobalMode(false); // When List resources = collector.listAll(Configuration.empty(), Selectors.NO_SELECTOR) @@ -73,5 +77,6 @@ public void shouldGetAllSchemasWithGlobalCompatibilityLevelFalse() { Assertions.assertEquals(TEST_SUBJECT, subject.getMetadata().getName()); Assertions.assertEquals(SchemaType.AVRO, subject.getSpec().getSchemaType()); Assertions.assertNull(subject.getSpec().getCompatibilityLevel()); + Assertions.assertNull(subject.getSpec().getMode()); } -} \ No newline at end of file +} diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/V1SchemaRegistrySubjectFactory.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/V1SchemaRegistrySubjectFactory.java index e976c3bc9..b7feb2745 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/V1SchemaRegistrySubjectFactory.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/V1SchemaRegistrySubjectFactory.java @@ -9,6 +9,7 @@ import io.streamthoughts.jikkou.core.models.ObjectMeta; import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion; import io.streamthoughts.jikkou.schema.registry.model.CompatibilityLevels; +import io.streamthoughts.jikkou.schema.registry.model.Modes; import io.streamthoughts.jikkou.schema.registry.model.SchemaHandle; import io.streamthoughts.jikkou.schema.registry.model.SchemaType; import io.streamthoughts.jikkou.schema.registry.models.SchemaRegistry; @@ -36,7 +37,8 @@ public V1SchemaRegistrySubjectFactory(String schemaRegistryVendor, @NotNull public V1SchemaRegistrySubject createSchemaRegistrySubject(@NotNull SubjectSchemaVersion subjectSchema, - @Nullable CompatibilityLevels compatibilityLevels) { + @Nullable CompatibilityLevels compatibilityLevels, + @Nullable Modes modes) { SchemaType schemaType = Optional.ofNullable(subjectSchema.schemaType()) .map(SchemaType::getForNameIgnoreCase) .orElse(SchemaType.defaultType()); @@ -55,6 +57,10 @@ public V1SchemaRegistrySubject createSchemaRegistrySubject(@NotNull SubjectSchem specBuilder = specBuilder.withCompatibilityLevel(compatibilityLevels); } + if (modes != null) { + specBuilder = specBuilder.withMode(modes); + } + V1SchemaRegistrySubject res = V1SchemaRegistrySubject .builder() .withMetadata(ObjectMeta diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApi.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApi.java index e203cbec4..8975a1ffd 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApi.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApi.java @@ -6,12 +6,7 @@ */ package io.streamthoughts.jikkou.schema.registry.api; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityCheck; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityLevelObject; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityObject; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaId; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaRegistration; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion; +import io.streamthoughts.jikkou.schema.registry.api.data.*; import java.util.List; import java.util.concurrent.CompletableFuture; import org.jetbrains.annotations.NotNull; @@ -94,6 +89,34 @@ CompletableFuture updateSubjectCompatibilityLevel(@NotNull */ CompletableFuture deleteSubjectCompatibilityLevel(@NotNull String subject); + /** + * Gets mode level for the specified subject. + * + * @param subject the name of the subject. + * @param defaultToGlobal flag to default to global mode. + * @return the mode. + */ + CompletableFuture getSubjectMode(@NotNull String subject, + boolean defaultToGlobal); + + /** + * Updates mode for the specified subject. + * + * @param subject the name of the subject. + * @param mode the new mode for the subject. + * @return the updated mode. + */ + CompletableFuture updateSubjectMode(@NotNull String subject, + @NotNull ModeObject mode); + + /** + * Deletes the specified subject-level mode and reverts to the global default. + * + * @param subject the name of the subject. + * @return the mode. + */ + CompletableFuture deleteSubjectMode(@NotNull String subject); + CompletableFuture testCompatibility(@NotNull String subject, String version, boolean verbose, diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/DefaultAsyncSchemaRegistryApi.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/DefaultAsyncSchemaRegistryApi.java index d99ea7d93..d674fb3cb 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/DefaultAsyncSchemaRegistryApi.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/DefaultAsyncSchemaRegistryApi.java @@ -6,12 +6,7 @@ */ package io.streamthoughts.jikkou.schema.registry.api; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityCheck; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityLevelObject; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityObject; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaId; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaRegistration; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion; +import io.streamthoughts.jikkou.schema.registry.api.data.*; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -104,6 +99,21 @@ public CompletableFuture deleteSubjectCompatibilityLevel(@N } + @Override + public CompletableFuture getSubjectMode(@NotNull String subject, boolean defaultToGlobal) { + return CompletableFuture.supplyAsync(() -> api.getMode(subject, defaultToGlobal)); + } + + @Override + public CompletableFuture updateSubjectMode(@NotNull String subject, @NotNull ModeObject mode) { + return CompletableFuture.supplyAsync(() -> api.updateMode(subject, mode)); + } + + @Override + public CompletableFuture deleteSubjectMode(@NotNull final String subject) { + return CompletableFuture.supplyAsync(() -> api.deleteMode(subject)); + } + /** * @see SchemaRegistryApi#deleteConfigCompatibility(String) */ diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/SchemaRegistryApi.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/SchemaRegistryApi.java index 5c1cad525..51a164a91 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/SchemaRegistryApi.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/SchemaRegistryApi.java @@ -6,14 +6,7 @@ */ package io.streamthoughts.jikkou.schema.registry.api; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityCheck; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityLevelObject; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityObject; -import io.streamthoughts.jikkou.schema.registry.api.data.SchemaString; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaId; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaRegistration; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectVersion; +import io.streamthoughts.jikkou.schema.registry.api.data.*; import jakarta.ws.rs.Consumes; import jakarta.ws.rs.DELETE; import jakarta.ws.rs.DefaultValue; @@ -229,6 +222,34 @@ CompatibilityLevelObject getConfigCompatibility(@PathParam("subject") String sub @Produces("application/vnd.schemaregistry.v1+json") CompatibilityObject deleteConfigCompatibility(@PathParam("subject") String subject); + /* + * ---------------------------------------------------------------------------------------------------------------- + * MODE + * ---------------------------------------------------------------------------------------------------------------- + */ + + @GET + @Path("mode/{subject}") + @Produces("application/vnd.schemaregistry.v1+json") + ModeObject getMode(@PathParam("subject") String subject, + @QueryParam("defaultToGlobal") @DefaultValue("false") boolean defaultToGlobal); + + @PUT + @Path("mode/{subject}") + @Consumes({"application/vnd.schemaregistry.v1+json", "application/vnd.schemaregistry+json", "application/json"}) + ModeObject updateMode(@PathParam("subject") String subject, ModeObject mode); + + /** + * Deletes the specified subject-level mode and reverts to the global default. + * + * @param subject the name of the subject. + * @return the mode. + */ + @DELETE + @Path("mode/{subject}") + @Produces("application/vnd.schemaregistry.v1+json") + ModeObject deleteMode(@PathParam("subject") String subject); + /* * ---------------------------------------------------------------------------------------------------------------- * COMPATIBILITY diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/data/ModeObject.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/data/ModeObject.java new file mode 100644 index 000000000..fb708a045 --- /dev/null +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/data/ModeObject.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright (c) The original authors + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.streamthoughts.jikkou.schema.registry.api.data; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.streamthoughts.jikkou.core.annotation.Reflectable; +import java.beans.ConstructorProperties; +import org.jetbrains.annotations.NotNull; + +/** + * ModeObject. + * + * @param mode a mode string. + */ +@Reflectable +public record ModeObject(@NotNull String mode) { + @ConstructorProperties({ + "mode" + }) + public ModeObject {} + + @Override + @JsonProperty("mode") + public String mode() { return mode; } +} diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java index e04b25192..6812a768d 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java @@ -31,6 +31,7 @@ public final class SchemaSubjectChangeComputer extends ResourceChangeComputer { public static final String DATA_COMPATIBILITY_LEVEL = "compatibilityLevel"; + public static final String DATA_MODE = "mode"; public static final String DATA_SCHEMA = "schema"; public static final String DATA_SCHEMA_TYPE = "schemaType"; public static final String DATA_REFERENCES = "references"; @@ -78,6 +79,11 @@ public ResourceChange createChangeForCreate(String key, V1SchemaRegistrySubject StateChange.create(DATA_COMPATIBILITY_LEVEL, after.getSpec().getCompatibilityLevel())); } + if (after.getSpec().getMode() != null) { + specBuilder = specBuilder.withChange( + StateChange.create(DATA_MODE, after.getSpec().getMode())); + } + return GenericResourceChange .builder(V1SchemaRegistrySubject.class) .withMetadata(after.getMetadata()) @@ -89,6 +95,7 @@ public ResourceChange createChangeForCreate(String key, V1SchemaRegistrySubject public ResourceChange createChangeForUpdate(String key, V1SchemaRegistrySubject before, V1SchemaRegistrySubject after) { StateChangeList changes = StateChangeList.emptyList() .with(getChangeForCompatibility(before, after)) + .with(getChangeForMode(before, after)) .with(getChangeForSchema(before, after)) .with(getChangeForSchemaType(before, after)) .with(getChangeForReferences(before, after)); @@ -152,6 +159,16 @@ private StateChange getChangeForCompatibility(V1SchemaRegistrySubject before, ); } + @NotNull + private StateChange getChangeForMode(V1SchemaRegistrySubject before, + V1SchemaRegistrySubject after) { + return StateChange.with( + DATA_MODE, + Optional.ofNullable(before).map(o -> o.getSpec().getMode()).orElse(null), + Optional.ofNullable(after).map(o -> o.getSpec().getMode()).orElse(null) + ); + } + @NotNull private StateChange getChangeForSchema(V1SchemaRegistrySubject before, V1SchemaRegistrySubject after) { diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/AbstractSchemaSubjectChangeHandler.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/AbstractSchemaSubjectChangeHandler.java index b67b8a7cf..d8f7f1e6d 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/AbstractSchemaSubjectChangeHandler.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/AbstractSchemaSubjectChangeHandler.java @@ -6,10 +6,7 @@ */ package io.streamthoughts.jikkou.schema.registry.change.handler; -import static io.streamthoughts.jikkou.schema.registry.change.SchemaSubjectChangeComputer.DATA_COMPATIBILITY_LEVEL; -import static io.streamthoughts.jikkou.schema.registry.change.SchemaSubjectChangeComputer.DATA_REFERENCES; -import static io.streamthoughts.jikkou.schema.registry.change.SchemaSubjectChangeComputer.DATA_SCHEMA; -import static io.streamthoughts.jikkou.schema.registry.change.SchemaSubjectChangeComputer.DATA_SCHEMA_TYPE; +import static io.streamthoughts.jikkou.schema.registry.change.SchemaSubjectChangeComputer.*; import io.streamthoughts.jikkou.core.data.TypeConverter; import io.streamthoughts.jikkou.core.models.change.ResourceChange; @@ -23,13 +20,11 @@ import io.streamthoughts.jikkou.schema.registry.SchemaRegistryAnnotations; import io.streamthoughts.jikkou.schema.registry.api.AsyncSchemaRegistryApi; import io.streamthoughts.jikkou.schema.registry.api.SchemaRegistryApi; -import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityObject; -import io.streamthoughts.jikkou.schema.registry.api.data.ErrorResponse; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaReference; -import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaRegistration; +import io.streamthoughts.jikkou.schema.registry.api.data.*; import io.streamthoughts.jikkou.schema.registry.change.SchemaSubjectChangeDescription; import io.streamthoughts.jikkou.schema.registry.change.SchemaSubjectChangeOptions; import io.streamthoughts.jikkou.schema.registry.model.CompatibilityLevels; +import io.streamthoughts.jikkou.schema.registry.model.Modes; import io.streamthoughts.jikkou.schema.registry.model.SchemaType; import java.util.List; import java.util.Objects; @@ -74,6 +69,27 @@ protected CompletableFuture updateCompatibilityLevel(final ResourceChange }); } + protected CompletableFuture updateMode(final ResourceChange change) { + final Modes modes = StateChangeList + .of(change.getSpec().getChanges()) + .getLast(DATA_MODE, TypeConverter.of(Modes.class)) + .getAfter(); + + final String subjectName = change.getMetadata().getName(); + LOG.info("Updating mode for Schema Registry subject '{}'.", subjectName); + return api + .updateSubjectMode(subjectName, new ModeObject(modes.name())) + .thenApply(modeObject -> { + if (LOG.isInfoEnabled()) { + LOG.info( + "Updated mode for Schema Registry subject '{}' to '{}'.", + subjectName, + modeObject.mode()); + } + return null; + }); + } + protected CompletableFuture registerSubjectVersion(@NotNull final ResourceChange change) { String schema = change.getSpec() .getChanges() @@ -151,6 +167,26 @@ protected CompletableFuture deleteCompatibilityLevel(@NotNull ResourceChan }); } + protected CompletableFuture deleteMode(@NotNull ResourceChange change) { + final String subject = change.getMetadata().getName(); + if (LOG.isInfoEnabled()) { + LOG.info("Deleting mode for Schema Registry subject '{}'.", + subject + ); + } + return api + .deleteSubjectMode(subject) + .thenApplyAsync(modeObject -> { + if (LOG.isInfoEnabled()) { + LOG.info( + "Deleted mode for Schema Registry subject '{}' to '{}'.", + change.getMetadata().getName(), + modeObject.mode()); + } + return null; + }); + } + public ChangeResponse toChangeResponse(ResourceChange change, CompletableFuture future) { CompletableFuture handled = future.handle((unused, throwable) -> { diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/CreateSchemaSubjectChangeHandler.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/CreateSchemaSubjectChangeHandler.java index cf67bec80..52796d476 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/CreateSchemaSubjectChangeHandler.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/CreateSchemaSubjectChangeHandler.java @@ -7,6 +7,7 @@ package io.streamthoughts.jikkou.schema.registry.change.handler; import static io.streamthoughts.jikkou.schema.registry.change.SchemaSubjectChangeComputer.DATA_COMPATIBILITY_LEVEL; +import static io.streamthoughts.jikkou.schema.registry.change.SchemaSubjectChangeComputer.DATA_MODE; import io.streamthoughts.jikkou.core.models.change.ResourceChange; import io.streamthoughts.jikkou.core.models.change.StateChange; @@ -61,6 +62,14 @@ public List> handleChanges(@NotNull List updateCompatibilityLevel(change)); } + StateChange modes = StateChangeList + .of(change.getSpec().getChanges()) + .getLast(DATA_MODE); + + if (modes != null) { + future = future.thenComposeAsync(unused -> updateMode(change)); + } + results.add(toChangeResponse(change, future)); } return results; diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/UpdateSchemaSubjectChangeHandler.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/UpdateSchemaSubjectChangeHandler.java index 6d40196f1..b46644e43 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/UpdateSchemaSubjectChangeHandler.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/UpdateSchemaSubjectChangeHandler.java @@ -9,8 +9,7 @@ import static io.streamthoughts.jikkou.core.reconciler.Operation.CREATE; import static io.streamthoughts.jikkou.core.reconciler.Operation.DELETE; import static io.streamthoughts.jikkou.core.reconciler.Operation.UPDATE; -import static io.streamthoughts.jikkou.schema.registry.change.SchemaSubjectChangeComputer.DATA_COMPATIBILITY_LEVEL; -import static io.streamthoughts.jikkou.schema.registry.change.SchemaSubjectChangeComputer.DATA_SCHEMA; +import static io.streamthoughts.jikkou.schema.registry.change.SchemaSubjectChangeComputer.*; import io.streamthoughts.jikkou.core.models.change.ResourceChange; import io.streamthoughts.jikkou.core.models.change.StateChange; @@ -76,6 +75,18 @@ public List> handleChanges(@NotNull List deleteCompatibilityLevel(change)); } + + StateChange modes = StateChangeList + .of(change.getSpec().getChanges()) + .getLast(DATA_MODE); + + if (UPDATE == modes.getOp() || CREATE == modes.getOp()) { + future = future.thenComposeAsync(unused -> updateMode(change)); + } + + if (DELETE == modes.getOp()) { + future = future.thenComposeAsync(unused -> deleteMode(change)); + } results.add(toChangeResponse(change, future)); } return results; diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/model/Modes.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/model/Modes.java new file mode 100644 index 000000000..8c80af971 --- /dev/null +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/model/Modes.java @@ -0,0 +1,17 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright (c) The original authors + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.streamthoughts.jikkou.schema.registry.model; + +/** + * Schema modes + */ +public enum Modes { + + IMPORT, + READONLY, + READWRITE +} diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/models/V1SchemaRegistrySubjectSpec.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/models/V1SchemaRegistrySubjectSpec.java index b31e52674..790a77967 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/models/V1SchemaRegistrySubjectSpec.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/models/V1SchemaRegistrySubjectSpec.java @@ -13,6 +13,7 @@ import io.streamthoughts.jikkou.core.annotation.Reflectable; import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaReference; import io.streamthoughts.jikkou.schema.registry.model.CompatibilityLevels; +import io.streamthoughts.jikkou.schema.registry.model.Modes; import io.streamthoughts.jikkou.schema.registry.model.SchemaHandle; import io.streamthoughts.jikkou.schema.registry.model.SchemaType; import java.beans.ConstructorProperties; @@ -31,6 +32,7 @@ @Setter @JsonPropertyOrder({ "compatibilityLevel", + "mode", "schemaRegistry", "schemaType", "schema", @@ -48,6 +50,13 @@ public class V1SchemaRegistrySubjectSpec { @JsonProperty("compatibilityLevel") @JsonPropertyDescription("The schema compatibility level for this subject.") private CompatibilityLevels compatibilityLevel; + /** + * The mode for this subject: IMPORT, READONLY, READWRITE. + * + */ + @JsonProperty("mode") + @JsonPropertyDescription("The mode for this subject: IMPORT, READONLY, READWRITE.") + private Modes mode; @JsonProperty("schemaRegistry") private SchemaRegistry schemaRegistry; /** @@ -77,6 +86,7 @@ public V1SchemaRegistrySubjectSpec() { /** * + * @param mode * @param schema * @param schemaRegistry * @param references @@ -85,14 +95,16 @@ public V1SchemaRegistrySubjectSpec() { */ @ConstructorProperties({ "compatibilityLevel", + "mode", "schemaRegistry", "schemaType", "schema", "references" }) - public V1SchemaRegistrySubjectSpec(CompatibilityLevels compatibilityLevel, SchemaRegistry schemaRegistry, SchemaType schemaType, SchemaHandle schema, List references) { + public V1SchemaRegistrySubjectSpec(CompatibilityLevels compatibilityLevel, Modes mode, SchemaRegistry schemaRegistry, SchemaType schemaType, SchemaHandle schema, List references) { super(); this.compatibilityLevel = compatibilityLevel; + this.mode = mode; this.schemaRegistry = schemaRegistry; this.schemaType = schemaType; this.schema = schema; @@ -108,6 +120,15 @@ public CompatibilityLevels getCompatibilityLevel() { return compatibilityLevel; } + /** + * The mode for this subject: IMPORT, READONLY, READWRITE. + * + */ + @JsonProperty("mode") + public Modes getMode() { + return mode; + } + @JsonProperty("schemaRegistry") public SchemaRegistry getSchemaRegistry() { return schemaRegistry; @@ -144,6 +165,10 @@ public String toString() { sb.append('='); sb.append(((this.compatibilityLevel == null)?"":this.compatibilityLevel)); sb.append(','); + sb.append("mode"); + sb.append('='); + sb.append(((this.mode == null)?"":this.mode)); + sb.append(','); sb.append("schemaRegistry"); sb.append('='); sb.append(((this.schemaRegistry == null)?"":this.schemaRegistry)); @@ -171,6 +196,7 @@ public String toString() { @Override public int hashCode() { int result = 1; + result = ((result* 31)+((this.mode == null)? 0 :this.mode.hashCode())); result = ((result* 31)+((this.schemaType == null)? 0 :this.schemaType.hashCode())); result = ((result* 31)+((this.schema == null)? 0 :this.schema.hashCode())); result = ((result* 31)+((this.schemaRegistry == null)? 0 :this.schemaRegistry.hashCode())); @@ -188,7 +214,7 @@ public boolean equals(Object other) { return false; } V1SchemaRegistrySubjectSpec rhs = ((V1SchemaRegistrySubjectSpec) other); - return ((((((this.schemaType == rhs.schemaType)||((this.schemaType!= null)&&this.schemaType.equals(rhs.schemaType)))&&((this.schema == rhs.schema)||((this.schema!= null)&&this.schema.equals(rhs.schema))))&&((this.schemaRegistry == rhs.schemaRegistry)||((this.schemaRegistry!= null)&&this.schemaRegistry.equals(rhs.schemaRegistry))))&&((this.references == rhs.references)||((this.references!= null)&&this.references.equals(rhs.references))))&&((this.compatibilityLevel == rhs.compatibilityLevel)||((this.compatibilityLevel!= null)&&this.compatibilityLevel.equals(rhs.compatibilityLevel)))); + return (((((((this.mode == rhs.mode)||((this.mode!= null)&&this.mode.equals(rhs.mode)))&&((this.schemaType == rhs.schemaType)||((this.schemaType!= null)&&this.schemaType.equals(rhs.schemaType))))&&((this.schema == rhs.schema)||((this.schema!= null)&&this.schema.equals(rhs.schema))))&&((this.schemaRegistry == rhs.schemaRegistry)||((this.schemaRegistry!= null)&&this.schemaRegistry.equals(rhs.schemaRegistry))))&&((this.references == rhs.references)||((this.references!= null)&&this.references.equals(rhs.references))))&&((this.compatibilityLevel == rhs.compatibilityLevel)||((this.compatibilityLevel!= null)&&this.compatibilityLevel.equals(rhs.compatibilityLevel)))); } } diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollector.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollector.java index 1a698ccb9..ed0e14a31 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollector.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollector.java @@ -7,7 +7,6 @@ package io.streamthoughts.jikkou.schema.registry.reconciler; import io.streamthoughts.jikkou.common.utils.AsyncUtils; -import io.streamthoughts.jikkou.common.utils.Pair; import io.streamthoughts.jikkou.core.annotation.SupportedResource; import io.streamthoughts.jikkou.core.config.Configuration; import io.streamthoughts.jikkou.core.exceptions.JikkouRuntimeException; @@ -22,8 +21,10 @@ import io.streamthoughts.jikkou.schema.registry.api.DefaultAsyncSchemaRegistryApi; import io.streamthoughts.jikkou.schema.registry.api.SchemaRegistryApiFactory; import io.streamthoughts.jikkou.schema.registry.api.SchemaRegistryClientConfig; +import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion; import io.streamthoughts.jikkou.schema.registry.collections.V1SchemaRegistrySubjectList; import io.streamthoughts.jikkou.schema.registry.model.CompatibilityLevels; +import io.streamthoughts.jikkou.schema.registry.model.Modes; import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubject; import jakarta.ws.rs.core.Response; import java.util.List; @@ -41,6 +42,8 @@ public class SchemaRegistrySubjectCollector extends ContextualExtension implemen private boolean defaultToGlobalCompatibilityLevel = true; + private boolean defaultToGlobalMode = true; + private V1SchemaRegistrySubjectFactory schemaRegistrySubjectFactory; /** @@ -115,12 +118,17 @@ public SchemaRegistrySubjectCollector defaultToGlobalCompatibilityLevel(final bo return this; } + public SchemaRegistrySubjectCollector defaultToGlobalMode(final boolean defaultToGlobalMode) { + this.defaultToGlobalMode = defaultToGlobalMode; + return this; + } + @NotNull private List> getAllSchemaRegistrySubjectsAsync(final List subjects, final AsyncSchemaRegistryApi api) { return subjects.stream() .map(subject -> getSchemaRegistrySubjectAsync( - api, subject, defaultToGlobalCompatibilityLevel, schemaRegistrySubjectFactory)) + api, subject, defaultToGlobalCompatibilityLevel, defaultToGlobalMode, schemaRegistrySubjectFactory)) .toList(); } @@ -128,42 +136,53 @@ private List> getAllSchemaRegistrySub private static CompletableFuture getSchemaRegistrySubjectAsync(@NotNull AsyncSchemaRegistryApi api, @NotNull String subject, boolean defaultToGlobalCompatibilityLevel, + boolean defaultToGlobalMode, @NotNull V1SchemaRegistrySubjectFactory factory) { - return api - // Get Schema Registry Latest Subject Version - .getLatestSubjectSchema(subject) - // Get Schema Registry Subject Compatibility - .thenCompose(subjectSchemaVersion -> api - .getSubjectCompatibilityLevel(subject, defaultToGlobalCompatibilityLevel) - .thenApply(compatibilityObject -> CompatibilityLevels.valueOf(compatibilityObject.compatibilityLevel())) - .exceptionally(t -> { - if (t.getCause() != null) t = t.getCause(); - - if (t instanceof RestClientException exception) { - if (exception.response() - .map(Response::getStatus) - .filter(status -> status.equals(404)) - .isPresent()) { - return null; - } - } - if (t instanceof RuntimeException re) { - throw re; - } - throw new JikkouRuntimeException(t); - } - ) - .thenApply(compatibilityLevelObject -> - Pair.of(subjectSchemaVersion, compatibilityLevelObject) - ) - ) - // Create SchemaRegistrySubject object - .thenApply(tuple -> - factory.createSchemaRegistrySubject( - tuple._1(), - tuple._2() - ) - ); + CompletableFuture schemaSubjectFuture = api.getLatestSubjectSchema(subject); + CompletableFuture compatibilityLevelsFuture = getSubjectCompatibilityLevel(api, subject, defaultToGlobalCompatibilityLevel) + .exceptionally(SchemaRegistrySubjectCollector::handleNotFound); + CompletableFuture modesFuture = getSubjectMode(api, subject, defaultToGlobalMode) + .exceptionally(SchemaRegistrySubjectCollector::handleNotFound); + + return CompletableFuture.allOf(schemaSubjectFuture, compatibilityLevelsFuture, modesFuture) + .thenApplyAsync(it -> { + SubjectSchemaVersion subjectSchema = schemaSubjectFuture.join(); + CompatibilityLevels compatibilityLevels = compatibilityLevelsFuture.join(); + Modes modes = modesFuture.join(); + + return factory.createSchemaRegistrySubject(subjectSchema, compatibilityLevels, modes); + }); + } + + private static CompletableFuture getSubjectCompatibilityLevel(@NotNull AsyncSchemaRegistryApi api, + @NotNull String subject, + boolean defaultToGlobalCompatibilityLevel) { + return api.getSubjectCompatibilityLevel(subject, defaultToGlobalCompatibilityLevel) + .thenApply(compatibilityObject -> CompatibilityLevels.valueOf(compatibilityObject.compatibilityLevel())); + } + + private static CompletableFuture getSubjectMode(@NotNull AsyncSchemaRegistryApi api, + @NotNull String subject, + boolean defaultToGlobalMode) { + return api.getSubjectMode(subject, defaultToGlobalMode) + .thenApply(modeObject -> Modes.valueOf(modeObject.mode())); + } + + private static T handleNotFound(Throwable t) { + if (t.getCause() != null) t = t.getCause(); + + if (t instanceof RestClientException exception) { + if (exception.response() + .map(Response::getStatus) + .filter(status -> status.equals(404)) + .isPresent()) { + return null; + } + } + if (t instanceof RuntimeException re) { + throw re; + } + throw new JikkouRuntimeException(t); } } diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectController.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectController.java index b05bc0fb9..4b6e120d7 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectController.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectController.java @@ -107,7 +107,8 @@ public List plan( // Get existing resources from the environment. SchemaRegistrySubjectCollector collector = new SchemaRegistrySubjectCollector(configuration) .prettyPrintSchema(false) - .defaultToGlobalCompatibilityLevel(false); + .defaultToGlobalCompatibilityLevel(false) + .defaultToGlobalMode(false); List actualSubjects = collector.listAll(context.configuration(), Selectors.NO_SELECTOR).stream() .filter(context.selector()::apply) diff --git a/providers/jikkou-provider-schema-registry/src/main/json/V1SchemaRegistrySubject.json b/providers/jikkou-provider-schema-registry/src/main/json/V1SchemaRegistrySubject.json index 7bf34c490..fa277b723 100644 --- a/providers/jikkou-provider-schema-registry/src/main/json/V1SchemaRegistrySubject.json +++ b/providers/jikkou-provider-schema-registry/src/main/json/V1SchemaRegistrySubject.json @@ -63,6 +63,11 @@ "description": "The schema compatibility level for this subject.", "existingJavaType": "io.streamthoughts.jikkou.schema.registry.model.CompatibilityLevels" }, + "mode": { + "type": "string", + "description": "The mode for this subject: IMPORT, READONLY, READWRITE.", + "existingJavaType": "io.streamthoughts.jikkou.schema.registry.model.Modes" + }, "schemaRegistry": { "type": "object", "additionalProperties": { @@ -117,4 +122,4 @@ } } } -} \ No newline at end of file +} diff --git a/providers/jikkou-provider-schema-registry/src/test/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputerTest.java b/providers/jikkou-provider-schema-registry/src/test/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputerTest.java index c7311ff02..f0ffb2e84 100644 --- a/providers/jikkou-provider-schema-registry/src/test/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputerTest.java +++ b/providers/jikkou-provider-schema-registry/src/test/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputerTest.java @@ -6,10 +6,7 @@ */ package io.streamthoughts.jikkou.schema.registry.change; -import static io.streamthoughts.jikkou.schema.registry.change.SchemaSubjectChangeComputer.DATA_COMPATIBILITY_LEVEL; -import static io.streamthoughts.jikkou.schema.registry.change.SchemaSubjectChangeComputer.DATA_REFERENCES; -import static io.streamthoughts.jikkou.schema.registry.change.SchemaSubjectChangeComputer.DATA_SCHEMA; -import static io.streamthoughts.jikkou.schema.registry.change.SchemaSubjectChangeComputer.DATA_SCHEMA_TYPE; +import static io.streamthoughts.jikkou.schema.registry.change.SchemaSubjectChangeComputer.*; import io.streamthoughts.jikkou.core.data.json.Json; import io.streamthoughts.jikkou.core.models.ObjectMeta; @@ -19,6 +16,7 @@ import io.streamthoughts.jikkou.core.models.change.StateChange; import io.streamthoughts.jikkou.core.reconciler.Operation; import io.streamthoughts.jikkou.schema.registry.model.CompatibilityLevels; +import io.streamthoughts.jikkou.schema.registry.model.Modes; import io.streamthoughts.jikkou.schema.registry.model.SchemaHandle; import io.streamthoughts.jikkou.schema.registry.model.SchemaType; import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubject; @@ -130,6 +128,7 @@ void shouldGetNoneChangeForExistingSubjectGivenNoChange() { "normalizeSchema", false )) .withChange(StateChange.none(DATA_COMPATIBILITY_LEVEL, null)) + .withChange(StateChange.none(DATA_MODE, null)) .withChange(StateChange.none(DATA_SCHEMA, Json.normalize(SCHEMA_V1.toString()))) .withChange(StateChange.none(DATA_SCHEMA_TYPE, SchemaType.AVRO)) .withChange(StateChange.none(DATA_REFERENCES, Collections.emptyList())) @@ -189,6 +188,67 @@ void shouldGetUpdateChangeForExistingSubjectGivenUpdatedCompatibility() { "normalizeSchema", false )) .withChange(StateChange.create(DATA_COMPATIBILITY_LEVEL, CompatibilityLevels.BACKWARD)) + .withChange(StateChange.none(DATA_MODE, null)) + .withChange(StateChange.none(DATA_SCHEMA, Json.normalize(SCHEMA_V1.toString()))) + .withChange(StateChange.none(DATA_SCHEMA_TYPE, SchemaType.AVRO)) + .withChange(StateChange.none(DATA_REFERENCES, Collections.emptyList())) + .build() + ) + .build() + ); + Assertions.assertEquals(expected, changes); + } + + @Test + void shouldGetUpdateChangeForExistingSubjectGivenUpdatedMode() { + // Given + V1SchemaRegistrySubject before = V1SchemaRegistrySubject + .builder() + .withMetadata(ObjectMeta + .builder() + .withName(TEST_SUBJECT) + .build()) + .withSpec(V1SchemaRegistrySubjectSpec + .builder() + .withSchemaType(SchemaType.AVRO) + .withSchema(new SchemaHandle(SCHEMA_V1.toString())) + .build()) + .build(); + + V1SchemaRegistrySubject after = V1SchemaRegistrySubject + .builder() + .withMetadata(ObjectMeta + .builder() + .withName(TEST_SUBJECT) + .build()) + .withSpec(V1SchemaRegistrySubjectSpec + .builder() + .withSchemaType(SchemaType.AVRO) + .withMode(Modes.IMPORT) + .withSchema(new SchemaHandle(SCHEMA_V1.toString())) + .build()) + .build(); + // When + List changes = computer.computeChanges(List.of(before), List.of(after)); + + // Then + List expected = List.of( + GenericResourceChange + .builder(V1SchemaRegistrySubject.class) + .withMetadata(ObjectMeta + .builder() + .withName(TEST_SUBJECT) + .build() + ) + .withSpec(ResourceChangeSpec + .builder() + .withOperation(Operation.UPDATE) + .withData(Map.of( + "permanentDelete", false, + "normalizeSchema", false + )) + .withChange(StateChange.create(DATA_MODE, Modes.IMPORT)) + .withChange(StateChange.none(DATA_COMPATIBILITY_LEVEL, null)) .withChange(StateChange.none(DATA_SCHEMA, Json.normalize(SCHEMA_V1.toString()))) .withChange(StateChange.none(DATA_SCHEMA_TYPE, SchemaType.AVRO)) .withChange(StateChange.none(DATA_REFERENCES, Collections.emptyList())) @@ -246,6 +306,7 @@ void shouldGetUpdateChangeForExistingSubjectGivenUpdatedSchema() { "normalizeSchema", false )) .withChange(StateChange.none(DATA_COMPATIBILITY_LEVEL, null)) + .withChange(StateChange.none(DATA_MODE, null)) .withChange(StateChange.none(DATA_SCHEMA_TYPE, SchemaType.AVRO)) .withChange(StateChange.update( DATA_SCHEMA, @@ -259,4 +320,4 @@ void shouldGetUpdateChangeForExistingSubjectGivenUpdatedSchema() { ); Assertions.assertEquals(expected, changes); } -} \ No newline at end of file +} diff --git a/providers/jikkou-provider-schema-registry/src/test/resources/datasets/resource-subject-test.yaml b/providers/jikkou-provider-schema-registry/src/test/resources/datasets/resource-subject-test.yaml index 87e3dc55f..eadf997c3 100644 --- a/providers/jikkou-provider-schema-registry/src/test/resources/datasets/resource-subject-test.yaml +++ b/providers/jikkou-provider-schema-registry/src/test/resources/datasets/resource-subject-test.yaml @@ -8,6 +8,7 @@ metadata: schemaregistry.jikkou.io/normalize-schema: true spec: compatibilityLevel: "FULL_TRANSITIVE" + mode: "IMPORT" schemaType: "AVRO" schema: - $ref: classpath://datasets/avro-schema.avsc \ No newline at end of file + $ref: classpath://datasets/avro-schema.avsc From 32ac3f0e755faef722d496bf541c378c42fd3b60 Mon Sep 17 00:00:00 2001 From: Zach Pearce Date: Fri, 29 Mar 2024 10:30:43 -0600 Subject: [PATCH 2/2] feat(provider-schema-registry): specify schema id and version on create --- .../api/AivenAsyncSchemaRegistryApi.java | 4 +- .../api/AsyncSchemaRegistryApiTest.java | 8 ++-- .../SchemaRegistrySubjectCollectorTest.java | 2 +- .../SchemaRegistrySubjectControllerTest.java | 41 ++++++++++++++++++- .../registry/SchemaRegistryAnnotations.java | 17 ++++++++ .../api/data/SubjectSchemaRegistration.java | 23 +++++++++-- .../change/SchemaSubjectChangeComputer.java | 4 +- .../change/SchemaSubjectChangeOptions.java | 8 +++- .../AbstractSchemaSubjectChangeHandler.java | 18 ++++---- .../CreateSchemaSubjectChangeHandler.java | 20 +++++---- .../V1SchemaRegistrySubjectList.java | 5 +-- .../SchemaCompatibilityValidation.java | 2 + .../SchemaSubjectChangeComputerTest.java | 20 ++++++--- 13 files changed, 135 insertions(+), 37 deletions(-) diff --git a/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/api/AivenAsyncSchemaRegistryApi.java b/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/api/AivenAsyncSchemaRegistryApi.java index 75e720a7a..bdec2c1f9 100644 --- a/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/api/AivenAsyncSchemaRegistryApi.java +++ b/providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/api/AivenAsyncSchemaRegistryApi.java @@ -63,8 +63,10 @@ public CompletableFuture> deleteSubjectVersions(@NotNull String su public CompletableFuture registerSubjectVersion(@NotNull String subject, @NotNull SubjectSchemaRegistration schema, boolean normalize) { - // Drop references - not supported through the Aiven's API. + // Drop id, version, and references - not supported through the Aiven's API. SubjectSchemaRegistration registration = new SubjectSchemaRegistration( + null, + null, schema.schema(), schema.schemaType(), null diff --git a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApiTest.java b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApiTest.java index e8b1caf96..2b1a1fd6b 100644 --- a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApiTest.java +++ b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApiTest.java @@ -126,7 +126,7 @@ void shouldRegisterSchemaVersionForNewSubject() throws ExecutionException, Inter // When CompletableFuture future = async.registerSubjectVersion( TEST_SUBJECT, - new SubjectSchemaRegistration(AVRO_SCHEMA, SchemaType.AVRO), + new SubjectSchemaRegistration(null, null, AVRO_SCHEMA, SchemaType.AVRO), true ); @@ -213,7 +213,7 @@ void shouldGetTrueForTestingCompatibleSchema() throws ExecutionException, Interr TEST_SUBJECT, "-1", true, - new SubjectSchemaRegistration(AVRO_SCHEMA, SchemaType.AVRO) + new SubjectSchemaRegistration(null, null, AVRO_SCHEMA, SchemaType.AVRO) ); // Then @@ -230,7 +230,7 @@ void shouldGetFalseForTestingCompatibleSchema() throws ExecutionException, Inter TEST_SUBJECT, "-1", true, - new SubjectSchemaRegistration(AVRO_SCHEMA_NOT_COMPATIBLE, SchemaType.AVRO) + new SubjectSchemaRegistration(null, null, AVRO_SCHEMA_NOT_COMPATIBLE, SchemaType.AVRO) ); // Then @@ -238,4 +238,4 @@ void shouldGetFalseForTestingCompatibleSchema() throws ExecutionException, Inter Assertions.assertFalse(result.isCompatible()); Assertions.assertFalse(result.getMessages().isEmpty()); } -} \ No newline at end of file +} diff --git a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollectorTest.java b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollectorTest.java index 391db3e4a..ddfd552f3 100644 --- a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollectorTest.java +++ b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollectorTest.java @@ -33,7 +33,7 @@ public void beforeEach() throws ExecutionException, InterruptedException { AsyncSchemaRegistryApi api = getAsyncSchemaRegistryApi(); api.registerSubjectVersion( TEST_SUBJECT, - new SubjectSchemaRegistration(AVRO_SCHEMA, SchemaType.AVRO), + new SubjectSchemaRegistration(null, null, AVRO_SCHEMA, SchemaType.AVRO), false ).get(); } diff --git a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectControllerTest.java b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectControllerTest.java index 3d84413f4..d58ba722d 100644 --- a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectControllerTest.java +++ b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectControllerTest.java @@ -26,11 +26,13 @@ import io.streamthoughts.jikkou.schema.registry.AbstractIntegrationTest; import io.streamthoughts.jikkou.schema.registry.SchemaRegistryAnnotations; import io.streamthoughts.jikkou.schema.registry.api.SchemaRegistryClientConfig; +import io.streamthoughts.jikkou.schema.registry.model.Modes; import io.streamthoughts.jikkou.schema.registry.model.SchemaHandle; import io.streamthoughts.jikkou.schema.registry.model.SchemaType; import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubject; import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubjectSpec; import java.util.List; +import java.util.Map; import java.util.Optional; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -83,4 +85,41 @@ void shouldRegisterSchemaForNewResource() { Assertions.assertEquals(Operation.CREATE, data.getSpec().getOp()); Assertions.assertEquals(SchemaType.AVRO, data.getSpec().getChanges().getLast("schemaType", TypeConverter.of(SchemaType.class)).getAfter()); } -} \ No newline at end of file + + @Test + void shouldImportSchemaForNewResource() { + // Given + V1SchemaRegistrySubject resource = V1SchemaRegistrySubject.builder() + .withMetadata(ObjectMeta.builder() + .withName(TEST_SUBJECT) + .withAnnotations(Map.of( + SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID, 123, + SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_VERSION, 4 + )) + .build() + ) + .withSpec(V1SchemaRegistrySubjectSpec + .builder() + .withSchemaType(SchemaType.AVRO) + .withSchema(new SchemaHandle(AVRO_SCHEMA)) + .withMode(Modes.IMPORT) + .build()) + .build(); + // When + ApiChangeResultList result = api.reconcile( + ResourceListObject.of(List.of(resource)), + ReconciliationMode.CREATE, + ReconciliationContext.builder().dryRun(false).build() + ); + // Then + List results = result.results(); + Assertions.assertEquals(1, results.size()); + ChangeResult change = results.getFirst(); + ResourceChange data = change.change(); + Assertions.assertEquals(Optional.of(123), data.getMetadata().findAnnotationByKey(SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID)); + Assertions.assertEquals(Optional.of(4), data.getMetadata().findAnnotationByKey(SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_VERSION)); + Assertions.assertEquals(Operation.CREATE, data.getSpec().getOp()); + Assertions.assertEquals(SchemaType.AVRO, data.getSpec().getChanges().getLast("schemaType", TypeConverter.of(SchemaType.class)).getAfter()); + Assertions.assertEquals(Modes.IMPORT, data.getSpec().getChanges().getLast("mode", TypeConverter.of(Modes.class)).getAfter()); + } +} diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/SchemaRegistryAnnotations.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/SchemaRegistryAnnotations.java index 27c361eab..170ef9012 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/SchemaRegistryAnnotations.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/SchemaRegistryAnnotations.java @@ -23,7 +23,10 @@ * limitations under the License. */ +import io.streamthoughts.jikkou.core.data.TypeConverter; import io.streamthoughts.jikkou.core.models.CoreAnnotations; +import io.streamthoughts.jikkou.core.models.HasMetadata; +import io.streamthoughts.jikkou.core.models.NamedValue; import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubject; public final class SchemaRegistryAnnotations { @@ -42,4 +45,18 @@ public static boolean isAnnotatedWithNormalizeSchema(V1SchemaRegistrySubject sub public static boolean isAnnotatedWitPermananteDelete(V1SchemaRegistrySubject subject) { return CoreAnnotations.isAnnotatedWith(subject, JIKKOU_IO_SCHEMA_REGISTRY_PERMANANTE_DELETE); } + + public static String schemaId(V1SchemaRegistrySubject subject) { + return HasMetadata.getMetadataAnnotation(subject, JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID) + .map(NamedValue::getValue) + .map(o -> TypeConverter.String().convertValue(o)) + .orElse(""); + } + + public static String version(V1SchemaRegistrySubject subject) { + return HasMetadata.getMetadataAnnotation(subject, JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_VERSION) + .map(NamedValue::getValue) + .map(o -> TypeConverter.String().convertValue(o)) + .orElse(""); + } } diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/data/SubjectSchemaRegistration.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/data/SubjectSchemaRegistration.java index 97e3818a7..3e286c145 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/data/SubjectSchemaRegistration.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/data/SubjectSchemaRegistration.java @@ -7,6 +7,7 @@ package io.streamthoughts.jikkou.schema.registry.api.data; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import io.streamthoughts.jikkou.core.annotation.Reflectable; import io.streamthoughts.jikkou.schema.registry.model.SchemaType; @@ -16,6 +17,8 @@ @Reflectable public final class SubjectSchemaRegistration { + private final String id; + private final String version; private final String schema; private final SchemaType schemaType; private final List references; @@ -26,9 +29,11 @@ public final class SubjectSchemaRegistration { * @param schema subject under which the schema will be registered. * @param schemaType the schema format. */ - public SubjectSchemaRegistration(String schema, + public SubjectSchemaRegistration(String id, + String version, + String schema, SchemaType schemaType) { - this(schema, schemaType, Collections.emptyList()); + this(id, version, schema, schemaType, Collections.emptyList()); } /** @@ -39,14 +44,26 @@ public SubjectSchemaRegistration(String schema, * @param references specifies the names of referenced schemas. */ @JsonCreator - public SubjectSchemaRegistration(@JsonProperty("schema") String schema, + public SubjectSchemaRegistration(@JsonProperty("id") String id, + @JsonProperty("version") String version, + @JsonProperty("schema") String schema, @JsonProperty("schemaType") SchemaType schemaType, @JsonProperty("references") List references) { + this.id = id; + this.version = version; this.schema = schema; this.schemaType = schemaType; this.references = references; } + @JsonProperty("id") + @JsonInclude(JsonInclude.Include.NON_EMPTY) + public String id() { return id; } + + @JsonProperty("version") + @JsonInclude(JsonInclude.Include.NON_EMPTY) + public String version() { return version; } + @JsonProperty("schema") public String schema() { return schema; diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java index 6812a768d..a55d62cc5 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java @@ -117,7 +117,9 @@ public ResourceChange createChangeForUpdate(String key, V1SchemaRegistrySubject private SchemaSubjectChangeOptions getOptions(@NotNull V1SchemaRegistrySubject subject) { return new SchemaSubjectChangeOptions( SchemaRegistryAnnotations.isAnnotatedWitPermananteDelete(subject), - SchemaRegistryAnnotations.isAnnotatedWithNormalizeSchema(subject) + SchemaRegistryAnnotations.isAnnotatedWithNormalizeSchema(subject), + SchemaRegistryAnnotations.schemaId(subject), + SchemaRegistryAnnotations.version(subject) ); } diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeOptions.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeOptions.java index a7fd8540e..a89173593 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeOptions.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeOptions.java @@ -16,12 +16,16 @@ @Reflectable public record SchemaSubjectChangeOptions( @JsonProperty("permanentDelete") boolean permanentDelete, - @JsonProperty("normalizeSchema") boolean normalizeSchema + @JsonProperty("normalizeSchema") boolean normalizeSchema, + @JsonProperty("schemaId") String schemaId, + @JsonProperty("version") String version ) { @ConstructorProperties({ "permanentDelete", - "normalizeSchema" + "normalizeSchema", + "schemaId", + "version" }) public SchemaSubjectChangeOptions { diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/AbstractSchemaSubjectChangeHandler.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/AbstractSchemaSubjectChangeHandler.java index d8f7f1e6d..c7dcccec9 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/AbstractSchemaSubjectChangeHandler.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/AbstractSchemaSubjectChangeHandler.java @@ -105,8 +105,12 @@ protected CompletableFuture registerSubjectVersion(@NotNull final Resource SchemaSubjectChangeOptions options = getSchemaSubjectChangeOptions(change); final String subjectName = change.getMetadata().getName(); + final String id = options.schemaId(); + final String version = options.version(); if (LOG.isInfoEnabled()) { - LOG.info("Registering new Schema Registry subject version: subject '{}', optimization={}, schema={}.", + LOG.info("Registering new Schema Registry subject version: id '{}', version '{}' subject '{}', optimization={}, schema={}.", + id, + version, subjectName, options.normalizeSchema(), schema @@ -121,7 +125,7 @@ protected CompletableFuture registerSubjectVersion(@NotNull final Resource return api .registerSubjectVersion( subjectName, - new SubjectSchemaRegistration(schema, type, references), + new SubjectSchemaRegistration(id, version, schema, type, references), options.normalizeSchema() ) .thenApply(subjectSchemaId -> { @@ -131,12 +135,12 @@ protected CompletableFuture registerSubjectVersion(@NotNull final Resource subjectName, subjectSchemaId.id() ); - change.getMetadata() - .addAnnotationIfAbsent( - SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID, - subjectSchemaId.id() - ); } + change.getMetadata() + .addAnnotationIfAbsent( + SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID, + subjectSchemaId.id() + ); return null; }); } diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/CreateSchemaSubjectChangeHandler.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/CreateSchemaSubjectChangeHandler.java index 52796d476..dd33f0ebe 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/CreateSchemaSubjectChangeHandler.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/CreateSchemaSubjectChangeHandler.java @@ -52,22 +52,24 @@ public List> handleChanges(@NotNull List> results = new ArrayList<>(); for (ResourceChange change : changes) { - CompletableFuture future = registerSubjectVersion(change); + CompletableFuture future = CompletableFuture.completedFuture(null); - StateChange compatibilityLevels = StateChangeList + StateChange modes = StateChangeList .of(change.getSpec().getChanges()) - .getLast(DATA_COMPATIBILITY_LEVEL); + .getLast(DATA_MODE); - if (compatibilityLevels != null) { - future = future.thenComposeAsync(unused -> updateCompatibilityLevel(change)); + if (modes != null) { + future = future.thenCompose(unused -> updateMode(change)); } - StateChange modes = StateChangeList + future.thenCompose(unused -> registerSubjectVersion(change)); + + StateChange compatibilityLevels = StateChangeList .of(change.getSpec().getChanges()) - .getLast(DATA_MODE); + .getLast(DATA_COMPATIBILITY_LEVEL); - if (modes != null) { - future = future.thenComposeAsync(unused -> updateMode(change)); + if (compatibilityLevels != null) { + future = future.thenComposeAsync(unused -> updateCompatibilityLevel(change)); } results.add(toChangeResponse(change, future)); diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/collections/V1SchemaRegistrySubjectList.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/collections/V1SchemaRegistrySubjectList.java index 2bf17bc8f..a33432ece 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/collections/V1SchemaRegistrySubjectList.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/collections/V1SchemaRegistrySubjectList.java @@ -11,11 +11,10 @@ import io.streamthoughts.jikkou.core.models.DefaultResourceListObject; import io.streamthoughts.jikkou.core.models.ObjectMeta; import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubject; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - import java.beans.ConstructorProperties; import java.util.List; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; @ApiVersion("kafka.jikkou.io/v1beta2") @Kind("SchemaRegistrySubjectList") diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/validation/SchemaCompatibilityValidation.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/validation/SchemaCompatibilityValidation.java index 239f7ce2d..3cab0b400 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/validation/SchemaCompatibilityValidation.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/validation/SchemaCompatibilityValidation.java @@ -60,6 +60,8 @@ public static ValidationResult validate(@NotNull V1SchemaRegistrySubject resourc V1SchemaRegistrySubjectSpec spec = resource.getSpec(); SubjectSchemaRegistration registration = new SubjectSchemaRegistration( + null, + null, spec.getSchema().value(), spec.getSchemaType(), spec.getReferences() diff --git a/providers/jikkou-provider-schema-registry/src/test/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputerTest.java b/providers/jikkou-provider-schema-registry/src/test/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputerTest.java index f0ffb2e84..d3baad68d 100644 --- a/providers/jikkou-provider-schema-registry/src/test/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputerTest.java +++ b/providers/jikkou-provider-schema-registry/src/test/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputerTest.java @@ -81,7 +81,9 @@ void shouldGetAddChangeForNewSubject() { .withOperation(Operation.CREATE) .withData(Map.of( "permanentDelete", false, - "normalizeSchema", false + "normalizeSchema", false, + "schemaId", "", + "version", "" )) .withChange(StateChange.create(DATA_SCHEMA, SCHEMA_V1.toString())) .withChange(StateChange.create(DATA_SCHEMA_TYPE, SchemaType.AVRO)) @@ -125,7 +127,9 @@ void shouldGetNoneChangeForExistingSubjectGivenNoChange() { .withOperation(Operation.NONE) .withData(Map.of( "permanentDelete", false, - "normalizeSchema", false + "normalizeSchema", false, + "schemaId", "", + "version", "" )) .withChange(StateChange.none(DATA_COMPATIBILITY_LEVEL, null)) .withChange(StateChange.none(DATA_MODE, null)) @@ -185,7 +189,9 @@ void shouldGetUpdateChangeForExistingSubjectGivenUpdatedCompatibility() { .withOperation(Operation.UPDATE) .withData(Map.of( "permanentDelete", false, - "normalizeSchema", false + "normalizeSchema", false, + "schemaId", "", + "version", "" )) .withChange(StateChange.create(DATA_COMPATIBILITY_LEVEL, CompatibilityLevels.BACKWARD)) .withChange(StateChange.none(DATA_MODE, null)) @@ -245,7 +251,9 @@ void shouldGetUpdateChangeForExistingSubjectGivenUpdatedMode() { .withOperation(Operation.UPDATE) .withData(Map.of( "permanentDelete", false, - "normalizeSchema", false + "normalizeSchema", false, + "schemaId", "", + "version", "" )) .withChange(StateChange.create(DATA_MODE, Modes.IMPORT)) .withChange(StateChange.none(DATA_COMPATIBILITY_LEVEL, null)) @@ -303,7 +311,9 @@ void shouldGetUpdateChangeForExistingSubjectGivenUpdatedSchema() { .withOperation(Operation.UPDATE) .withData(Map.of( "permanentDelete", false, - "normalizeSchema", false + "normalizeSchema", false, + "schemaId", "", + "version", "" )) .withChange(StateChange.none(DATA_COMPATIBILITY_LEVEL, null)) .withChange(StateChange.none(DATA_MODE, null))