Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support subject modes #397

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,5 @@ docs/node_modules
docs/resources
docs/public
docs/public

.sdkmanrc
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,7 @@

import io.streamthoughts.jikkou.extension.aiven.api.data.CompatibilityCheckResponse;
import io.streamthoughts.jikkou.schema.registry.api.AsyncSchemaRegistryApi;
import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityCheck;
import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityLevelObject;
import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityObject;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaId;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaRegistration;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion;
import io.streamthoughts.jikkou.schema.registry.api.data.*;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please no wildcard import :)

import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -68,8 +63,10 @@ public CompletableFuture<List<Integer>> deleteSubjectVersions(@NotNull String su
public CompletableFuture<SubjectSchemaId> registerSubjectVersion(@NotNull String subject,
@NotNull SubjectSchemaRegistration schema,
boolean normalize) {
// Drop references - not supported through the Aiven's API.
// Drop id, version, and references - not supported through the Aiven's API.
SubjectSchemaRegistration registration = new SubjectSchemaRegistration(
null,
null,
schema.schema(),
schema.schemaType(),
null
Expand Down Expand Up @@ -171,4 +168,19 @@ public CompletableFuture<CompatibilityCheck> testCompatibilityLatest(@NotNull St
public void close() {
api.close();
}

@Override
public CompletableFuture<ModeObject> getSubjectMode(@NotNull String subject, boolean defaultToGlobal) {
throw new UnsupportedOperationException("Aiven schema registry does not support subject mode");
}

@Override
public CompletableFuture<ModeObject> updateSubjectMode(@NotNull String subject, @NotNull ModeObject mode) {
throw new UnsupportedOperationException("Aiven schema registry does not support subject mode");
}

@Override
public CompletableFuture<ModeObject> deleteSubjectMode(@NotNull String subject) {
throw new UnsupportedOperationException("Aiven schema registry does not support subject mode");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private static CompletableFuture<V1SchemaRegistrySubject> getSchemaRegistrySubje
() -> Pair.of(subjectSchemaVersion, api.getSchemaRegistrySubjectCompatibility(subject))))
.thenApply(pair ->
// Create SchemaRegistrySubject object
factory.createSchemaRegistrySubject(pair._1().version(), pair._2().compatibilityLevel())
factory.createSchemaRegistrySubject(pair._1().version(), pair._2().compatibilityLevel(), null)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void shouldRegisterSchemaVersionForNewSubject() throws ExecutionException, Inter
// When
CompletableFuture<SubjectSchemaId> future = async.registerSubjectVersion(
TEST_SUBJECT,
new SubjectSchemaRegistration(AVRO_SCHEMA, SchemaType.AVRO),
new SubjectSchemaRegistration(null, null, AVRO_SCHEMA, SchemaType.AVRO),
true
);

Expand Down Expand Up @@ -213,7 +213,7 @@ void shouldGetTrueForTestingCompatibleSchema() throws ExecutionException, Interr
TEST_SUBJECT,
"-1",
true,
new SubjectSchemaRegistration(AVRO_SCHEMA, SchemaType.AVRO)
new SubjectSchemaRegistration(null, null, AVRO_SCHEMA, SchemaType.AVRO)
);

// Then
Expand All @@ -230,12 +230,12 @@ void shouldGetFalseForTestingCompatibleSchema() throws ExecutionException, Inter
TEST_SUBJECT,
"-1",
true,
new SubjectSchemaRegistration(AVRO_SCHEMA_NOT_COMPATIBLE, SchemaType.AVRO)
new SubjectSchemaRegistration(null, null, AVRO_SCHEMA_NOT_COMPATIBLE, SchemaType.AVRO)
);

// Then
CompatibilityCheck result = future.get();
Assertions.assertFalse(result.isCompatible());
Assertions.assertFalse(result.getMessages().isEmpty());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.streamthoughts.jikkou.schema.registry.api.AsyncSchemaRegistryApi;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaRegistration;
import io.streamthoughts.jikkou.schema.registry.model.CompatibilityLevels;
import io.streamthoughts.jikkou.schema.registry.model.Modes;
import io.streamthoughts.jikkou.schema.registry.model.SchemaType;
import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubject;
import java.util.List;
Expand All @@ -32,15 +33,16 @@ public void beforeEach() throws ExecutionException, InterruptedException {
AsyncSchemaRegistryApi api = getAsyncSchemaRegistryApi();
api.registerSubjectVersion(
TEST_SUBJECT,
new SubjectSchemaRegistration(AVRO_SCHEMA, SchemaType.AVRO),
new SubjectSchemaRegistration(null, null, AVRO_SCHEMA, SchemaType.AVRO),
false
).get();
}

@Test
public void shouldGetAllSchemasWithGlobalCompatibilityLevelTrue() {
public void shouldGetAllSchemasWithGlobalCompatibilityLevelAndGlobalModeTrue() {
// Given
collector.defaultToGlobalCompatibilityLevel(true);
collector.defaultToGlobalMode(true);

// When
List<V1SchemaRegistrySubject> resources = collector.listAll(Configuration.empty(), Selectors.NO_SELECTOR)
Expand All @@ -54,12 +56,14 @@ public void shouldGetAllSchemasWithGlobalCompatibilityLevelTrue() {
Assertions.assertEquals(TEST_SUBJECT, subject.getMetadata().getName());
Assertions.assertEquals(SchemaType.AVRO, subject.getSpec().getSchemaType());
Assertions.assertEquals(CompatibilityLevels.BACKWARD, subject.getSpec().getCompatibilityLevel());
Assertions.assertEquals(Modes.READWRITE, subject.getSpec().getMode());
}

@Test
public void shouldGetAllSchemasWithGlobalCompatibilityLevelFalse() {
public void shouldGetAllSchemasWithGlobalCompatibilityLevelAndGlobalModeFalse() {
// Given
collector.defaultToGlobalCompatibilityLevel(false);
collector.defaultToGlobalMode(false);

// When
List<V1SchemaRegistrySubject> resources = collector.listAll(Configuration.empty(), Selectors.NO_SELECTOR)
Expand All @@ -73,5 +77,6 @@ public void shouldGetAllSchemasWithGlobalCompatibilityLevelFalse() {
Assertions.assertEquals(TEST_SUBJECT, subject.getMetadata().getName());
Assertions.assertEquals(SchemaType.AVRO, subject.getSpec().getSchemaType());
Assertions.assertNull(subject.getSpec().getCompatibilityLevel());
Assertions.assertNull(subject.getSpec().getMode());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import io.streamthoughts.jikkou.schema.registry.AbstractIntegrationTest;
import io.streamthoughts.jikkou.schema.registry.SchemaRegistryAnnotations;
import io.streamthoughts.jikkou.schema.registry.api.SchemaRegistryClientConfig;
import io.streamthoughts.jikkou.schema.registry.model.Modes;
import io.streamthoughts.jikkou.schema.registry.model.SchemaHandle;
import io.streamthoughts.jikkou.schema.registry.model.SchemaType;
import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubject;
import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubjectSpec;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -83,4 +85,41 @@ void shouldRegisterSchemaForNewResource() {
Assertions.assertEquals(Operation.CREATE, data.getSpec().getOp());
Assertions.assertEquals(SchemaType.AVRO, data.getSpec().getChanges().getLast("schemaType", TypeConverter.of(SchemaType.class)).getAfter());
}
}

@Test
void shouldImportSchemaForNewResource() {
// Given
V1SchemaRegistrySubject resource = V1SchemaRegistrySubject.builder()
.withMetadata(ObjectMeta.builder()
.withName(TEST_SUBJECT)
.withAnnotations(Map.of(
SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID, 123,
SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_VERSION, 4
))
.build()
)
.withSpec(V1SchemaRegistrySubjectSpec
.builder()
.withSchemaType(SchemaType.AVRO)
.withSchema(new SchemaHandle(AVRO_SCHEMA))
.withMode(Modes.IMPORT)
.build())
.build();
// When
ApiChangeResultList result = api.reconcile(
ResourceListObject.of(List.of(resource)),
ReconciliationMode.CREATE,
ReconciliationContext.builder().dryRun(false).build()
);
// Then
List<ChangeResult> results = result.results();
Assertions.assertEquals(1, results.size());
ChangeResult change = results.getFirst();
ResourceChange data = change.change();
Assertions.assertEquals(Optional.of(123), data.getMetadata().findAnnotationByKey(SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID));
Assertions.assertEquals(Optional.of(4), data.getMetadata().findAnnotationByKey(SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_VERSION));
Assertions.assertEquals(Operation.CREATE, data.getSpec().getOp());
Assertions.assertEquals(SchemaType.AVRO, data.getSpec().getChanges().getLast("schemaType", TypeConverter.of(SchemaType.class)).getAfter());
Assertions.assertEquals(Modes.IMPORT, data.getSpec().getChanges().getLast("mode", TypeConverter.of(Modes.class)).getAfter());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
* limitations under the License.
*/

import io.streamthoughts.jikkou.core.data.TypeConverter;
import io.streamthoughts.jikkou.core.models.CoreAnnotations;
import io.streamthoughts.jikkou.core.models.HasMetadata;
import io.streamthoughts.jikkou.core.models.NamedValue;
import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubject;

public final class SchemaRegistryAnnotations {
Expand All @@ -42,4 +45,18 @@ public static boolean isAnnotatedWithNormalizeSchema(V1SchemaRegistrySubject sub
public static boolean isAnnotatedWitPermananteDelete(V1SchemaRegistrySubject subject) {
return CoreAnnotations.isAnnotatedWith(subject, JIKKOU_IO_SCHEMA_REGISTRY_PERMANANTE_DELETE);
}

public static String schemaId(V1SchemaRegistrySubject subject) {
return HasMetadata.getMetadataAnnotation(subject, JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID)
.map(NamedValue::getValue)
.map(o -> TypeConverter.String().convertValue(o))
.orElse("");
}

public static String version(V1SchemaRegistrySubject subject) {
return HasMetadata.getMetadataAnnotation(subject, JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_VERSION)
.map(NamedValue::getValue)
.map(o -> TypeConverter.String().convertValue(o))
.orElse("");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.streamthoughts.jikkou.core.models.ObjectMeta;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion;
import io.streamthoughts.jikkou.schema.registry.model.CompatibilityLevels;
import io.streamthoughts.jikkou.schema.registry.model.Modes;
import io.streamthoughts.jikkou.schema.registry.model.SchemaHandle;
import io.streamthoughts.jikkou.schema.registry.model.SchemaType;
import io.streamthoughts.jikkou.schema.registry.models.SchemaRegistry;
Expand Down Expand Up @@ -36,7 +37,8 @@ public V1SchemaRegistrySubjectFactory(String schemaRegistryVendor,

@NotNull
public V1SchemaRegistrySubject createSchemaRegistrySubject(@NotNull SubjectSchemaVersion subjectSchema,
@Nullable CompatibilityLevels compatibilityLevels) {
@Nullable CompatibilityLevels compatibilityLevels,
@Nullable Modes modes) {
SchemaType schemaType = Optional.ofNullable(subjectSchema.schemaType())
.map(SchemaType::getForNameIgnoreCase)
.orElse(SchemaType.defaultType());
Expand All @@ -55,6 +57,10 @@ public V1SchemaRegistrySubject createSchemaRegistrySubject(@NotNull SubjectSchem
specBuilder = specBuilder.withCompatibilityLevel(compatibilityLevels);
}

if (modes != null) {
specBuilder = specBuilder.withMode(modes);
}

V1SchemaRegistrySubject res = V1SchemaRegistrySubject
.builder()
.withMetadata(ObjectMeta
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,7 @@
*/
package io.streamthoughts.jikkou.schema.registry.api;

import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityCheck;
import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityLevelObject;
import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityObject;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaId;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaRegistration;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion;
import io.streamthoughts.jikkou.schema.registry.api.data.*;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please no wildcard import :)

import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -94,6 +89,34 @@ CompletableFuture<CompatibilityObject> updateSubjectCompatibilityLevel(@NotNull
*/
CompletableFuture<CompatibilityObject> deleteSubjectCompatibilityLevel(@NotNull String subject);

/**
* Gets mode level for the specified subject.
*
* @param subject the name of the subject.
* @param defaultToGlobal flag to default to global mode.
* @return the mode.
*/
CompletableFuture<ModeObject> getSubjectMode(@NotNull String subject,
boolean defaultToGlobal);

/**
* Updates mode for the specified subject.
*
* @param subject the name of the subject.
* @param mode the new mode for the subject.
* @return the updated mode.
*/
CompletableFuture<ModeObject> updateSubjectMode(@NotNull String subject,
@NotNull ModeObject mode);

/**
* Deletes the specified subject-level mode and reverts to the global default.
*
* @param subject the name of the subject.
* @return the mode.
*/
CompletableFuture<ModeObject> deleteSubjectMode(@NotNull String subject);

CompletableFuture<CompatibilityCheck> testCompatibility(@NotNull String subject,
String version,
boolean verbose,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,7 @@
*/
package io.streamthoughts.jikkou.schema.registry.api;

import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityCheck;
import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityLevelObject;
import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityObject;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaId;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaRegistration;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion;
import io.streamthoughts.jikkou.schema.registry.api.data.*;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please no wildcard import :)

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -104,6 +99,21 @@ public CompletableFuture<CompatibilityObject> deleteSubjectCompatibilityLevel(@N

}

@Override
public CompletableFuture<ModeObject> getSubjectMode(@NotNull String subject, boolean defaultToGlobal) {
return CompletableFuture.supplyAsync(() -> api.getMode(subject, defaultToGlobal));
}

@Override
public CompletableFuture<ModeObject> updateSubjectMode(@NotNull String subject, @NotNull ModeObject mode) {
return CompletableFuture.supplyAsync(() -> api.updateMode(subject, mode));
}

@Override
public CompletableFuture<ModeObject> deleteSubjectMode(@NotNull final String subject) {
return CompletableFuture.supplyAsync(() -> api.deleteMode(subject));
}

/**
* @see SchemaRegistryApi#deleteConfigCompatibility(String)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,7 @@
*/
package io.streamthoughts.jikkou.schema.registry.api;

import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityCheck;
import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityLevelObject;
import io.streamthoughts.jikkou.schema.registry.api.data.CompatibilityObject;
import io.streamthoughts.jikkou.schema.registry.api.data.SchemaString;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaId;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaRegistration;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectVersion;
import io.streamthoughts.jikkou.schema.registry.api.data.*;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.DefaultValue;
Expand Down Expand Up @@ -229,6 +222,34 @@ CompatibilityLevelObject getConfigCompatibility(@PathParam("subject") String sub
@Produces("application/vnd.schemaregistry.v1+json")
CompatibilityObject deleteConfigCompatibility(@PathParam("subject") String subject);

/*
* ----------------------------------------------------------------------------------------------------------------
* MODE
* ----------------------------------------------------------------------------------------------------------------
*/

@GET
@Path("mode/{subject}")
@Produces("application/vnd.schemaregistry.v1+json")
ModeObject getMode(@PathParam("subject") String subject,
@QueryParam("defaultToGlobal") @DefaultValue("false") boolean defaultToGlobal);

@PUT
@Path("mode/{subject}")
@Consumes({"application/vnd.schemaregistry.v1+json", "application/vnd.schemaregistry+json", "application/json"})
ModeObject updateMode(@PathParam("subject") String subject, ModeObject mode);

/**
* Deletes the specified subject-level mode and reverts to the global default.
*
* @param subject the name of the subject.
* @return the mode.
*/
@DELETE
@Path("mode/{subject}")
@Produces("application/vnd.schemaregistry.v1+json")
ModeObject deleteMode(@PathParam("subject") String subject);

/*
* ----------------------------------------------------------------------------------------------------------------
* COMPATIBILITY
Expand Down
Loading
Loading