Skip to content

Commit

Permalink
feat(provider/schema-registry): add schemaregistry.jikkou.io/use-cano…
Browse files Browse the repository at this point in the history
…nical-fingerprint (#431)
  • Loading branch information
fhussonnois committed Jun 1, 2024
1 parent 3012f32 commit 30a0f8e
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.streamthoughts.jikkou.core.ReconciliationContext;
import io.streamthoughts.jikkou.core.ReconciliationMode;
import io.streamthoughts.jikkou.core.config.Configuration;
import io.streamthoughts.jikkou.core.data.json.Json;
import io.streamthoughts.jikkou.core.extension.ClassExtensionAliasesGenerator;
import io.streamthoughts.jikkou.core.extension.DefaultExtensionDescriptorFactory;
import io.streamthoughts.jikkou.core.extension.DefaultExtensionFactory;
Expand All @@ -34,6 +33,7 @@
import io.streamthoughts.jikkou.extension.aiven.ApiVersions;
import io.streamthoughts.jikkou.schema.registry.SchemaRegistryAnnotations;
import io.streamthoughts.jikkou.schema.registry.model.CompatibilityLevels;
import io.streamthoughts.jikkou.schema.registry.model.SchemaAndType;
import io.streamthoughts.jikkou.schema.registry.model.SchemaHandle;
import io.streamthoughts.jikkou.schema.registry.model.SchemaType;
import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubject;
Expand Down Expand Up @@ -152,7 +152,7 @@ void shouldCreateSchemaRegistrySubject() {
.withMetadata(ObjectMeta
.builder()
.withName(TEST_SUBJECT)
.withAnnotation(SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID, 1)
.withAnnotation(SchemaRegistryAnnotations.SCHEMA_REGISTRY_SCHEMA_ID, 1)
.build()
)
.withSpec(ResourceChangeSpec
Expand All @@ -163,7 +163,7 @@ void shouldCreateSchemaRegistrySubject() {
"normalizeSchema", false
))
.withChange(StateChange.create(DATA_COMPATIBILITY_LEVEL, CompatibilityLevels.BACKWARD))
.withChange(StateChange.create(DATA_SCHEMA, AVRO_SCHEMA_V1))
.withChange(StateChange.create(DATA_SCHEMA, new SchemaAndType(AVRO_SCHEMA_V1, SchemaType.AVRO)))
.withChange(StateChange.create(DATA_SCHEMA_TYPE, SchemaType.AVRO))
.withChange(StateChange.create(DATA_REFERENCES, Collections.emptyList()))
.build()
Expand Down Expand Up @@ -247,7 +247,7 @@ void shouldUpdateSchemaRegistrySubject() {
.withMetadata(ObjectMeta
.builder()
.withName(TEST_SUBJECT)
.withAnnotation(SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID, 1)
.withAnnotation(SchemaRegistryAnnotations.SCHEMA_REGISTRY_SCHEMA_ID, 1)
.build()
)
.withSpec(ResourceChangeSpec
Expand All @@ -258,12 +258,14 @@ void shouldUpdateSchemaRegistrySubject() {
"normalizeSchema", false
))
.withChange(StateChange.none(DATA_COMPATIBILITY_LEVEL, CompatibilityLevels.BACKWARD))
.withChange(StateChange.update(DATA_SCHEMA, Json.normalize(AVRO_SCHEMA_V1), Json.normalize(AVRO_SCHEMA_V2)))
.withChange(StateChange.update(DATA_SCHEMA, new SchemaAndType(AVRO_SCHEMA_V1, SchemaType.AVRO), new SchemaAndType(AVRO_SCHEMA_V2, SchemaType.AVRO)))
.withChange(StateChange.none(DATA_SCHEMA_TYPE, SchemaType.AVRO))
.withChange(StateChange.none(DATA_REFERENCES, Collections.emptyList()))
.build()
)
.build();
Assertions.assertEquals(expected, actual);


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void shouldRegisterSchemaForNewResource() {
Assertions.assertEquals(1, results.size());
ChangeResult change = results.getFirst();
ResourceChange data = change.change();
Assertions.assertEquals(Optional.of(1), data.getMetadata().findAnnotationByKey(SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID));
Assertions.assertEquals(Optional.of(1), data.getMetadata().findAnnotationByKey(SchemaRegistryAnnotations.SCHEMA_REGISTRY_SCHEMA_ID));
Assertions.assertEquals(Operation.CREATE, data.getSpec().getOp());
Assertions.assertEquals(SchemaType.AVRO, data.getSpec().getChanges().getLast("schemaType", TypeConverter.of(SchemaType.class)).getAfter());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,28 @@
public final class SchemaRegistryAnnotations {

private static final String SCHEMAREGISTRY_JIKKOU_IO = "schemaregistry.jikkou.io/";
public static final String JIKKOU_IO_SCHEMA_REGISTRY_NORMALIZE_SCHEMA = SCHEMAREGISTRY_JIKKOU_IO + "normalize-schema";
public static final String JIKKOU_IO_SCHEMA_REGISTRY_PERMANANTE_DELETE = SCHEMAREGISTRY_JIKKOU_IO + "permanent-delete";
public static final String JIKKOU_IO_SCHEMA_REGISTRY_URL = SCHEMAREGISTRY_JIKKOU_IO + "url";
public static final String JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_VERSION = SCHEMAREGISTRY_JIKKOU_IO + "schema-version";
public static final String JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID = SCHEMAREGISTRY_JIKKOU_IO + "schema-id";

public static boolean isAnnotatedWithNormalizeSchema(V1SchemaRegistrySubject subject) {
return CoreAnnotations.isAnnotatedWith(subject, JIKKOU_IO_SCHEMA_REGISTRY_NORMALIZE_SCHEMA);
public static final String SCHEMA_REGISTRY_NORMALIZE_SCHEMA = SCHEMAREGISTRY_JIKKOU_IO + "normalize-schema";
public static final String SCHEMA_REGISTRY_PERMANANTE_DELETE = SCHEMAREGISTRY_JIKKOU_IO + "permanent-delete";
public static final String SCHEMA_REGISTRY_URL = SCHEMAREGISTRY_JIKKOU_IO + "url";
public static final String SCHEMA_REGISTRY_SCHEMA_VERSION = SCHEMAREGISTRY_JIKKOU_IO + "schema-version";
public static final String SCHEMA_REGISTRY_SCHEMA_ID = SCHEMAREGISTRY_JIKKOU_IO + "schema-id";
public static final String SCHEMA_REGISTRY_USE_CANONICAL_FINGERPRINT = SCHEMAREGISTRY_JIKKOU_IO + "use-canonical-fingerprint";

private final V1SchemaRegistrySubject resource;

public SchemaRegistryAnnotations(final V1SchemaRegistrySubject resource) {
this.resource = resource;
}

public boolean normalizeSchema() {
return CoreAnnotations.isAnnotatedWith(resource, SCHEMA_REGISTRY_NORMALIZE_SCHEMA);
}

public boolean permananteDelete() {
return CoreAnnotations.isAnnotatedWith(resource, SCHEMA_REGISTRY_PERMANANTE_DELETE);
}

public static boolean isAnnotatedWitPermananteDelete(V1SchemaRegistrySubject subject) {
return CoreAnnotations.isAnnotatedWith(subject, JIKKOU_IO_SCHEMA_REGISTRY_PERMANANTE_DELETE);
public boolean useCanonicalFingerPrint() {
return CoreAnnotations.isAnnotatedWith(resource, SCHEMA_REGISTRY_USE_CANONICAL_FINGERPRINT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ public V1SchemaRegistrySubject createSchemaRegistrySubject(@NotNull SubjectSchem
.withMetadata(ObjectMeta
.builder()
.withName(subjectSchema.subject())
.withAnnotation(SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_URL,
.withAnnotation(SchemaRegistryAnnotations.SCHEMA_REGISTRY_URL,
schemaRegistryUrl)
.withAnnotation(SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_VERSION,
.withAnnotation(SchemaRegistryAnnotations.SCHEMA_REGISTRY_SCHEMA_VERSION,
subjectSchema.version())
.withAnnotation(SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID,
.withAnnotation(SchemaRegistryAnnotations.SCHEMA_REGISTRY_SCHEMA_ID,
subjectSchema.id())
.build()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package io.streamthoughts.jikkou.schema.registry.avro;

import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
import org.jetbrains.annotations.NotNull;

/**
Expand All @@ -30,6 +31,10 @@ public Schema schema() {
return schema;
}

public long fingerprint64() {
return SchemaNormalization.parsingFingerprint64(schema());
}

/** {@inheritDoc} **/
public String toString() {
return schema.toString(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ public static class SchemaSubjectChangeFactory extends ResourceChangeFactory<Str
@Override
public ResourceChange createChangeForDelete(String key, V1SchemaRegistrySubject before) {
return GenericResourceChange
.builder(V1SchemaRegistrySubject.class)
.withMetadata(before.getMetadata())
.withSpec(ResourceChangeSpec
.builder()
.withData(TYPE_CONVERTER.convertValue(getOptions(before)))
.withOperation(Operation.DELETE)
.build()
)
.build();
.builder(V1SchemaRegistrySubject.class)
.withMetadata(before.getMetadata())
.withSpec(ResourceChangeSpec
.builder()
.withData(TYPE_CONVERTER.convertValue(getOptions(before)))
.withOperation(Operation.DELETE)
.build()
)
.build();
}

@Override
Expand All @@ -77,65 +77,66 @@ public ResourceChange createChangeForCreate(String key, V1SchemaRegistrySubject
}

return GenericResourceChange
.builder(V1SchemaRegistrySubject.class)
.withMetadata(after.getMetadata())
.withSpec(specBuilder.build())
.build();
.builder(V1SchemaRegistrySubject.class)
.withMetadata(after.getMetadata())
.withSpec(specBuilder.build())
.build();
}

@Override
public ResourceChange createChangeForUpdate(String key, V1SchemaRegistrySubject before, V1SchemaRegistrySubject after) {
StateChangeList<StateChange> changes = StateChangeList.emptyList()
.with(getChangeForCompatibility(before, after))
.with(getChangeForSchema(before, after))
.with(getChangeForSchemaType(before, after))
.with(getChangeForReferences(before, after));
.with(getChangeForCompatibility(before, after))
.with(getChangeForSchema(before, after))
.with(getChangeForSchemaType(before, after))
.with(getChangeForReferences(before, after));

return GenericResourceChange
.builder(V1SchemaRegistrySubject.class)
.withMetadata(after.getMetadata())
.withSpec(ResourceChangeSpec
.builder()
.withData(TYPE_CONVERTER.convertValue(getOptions(after)))
.withOperation(Change.computeOperation(changes.all()))
.withChanges(changes)
.build()
)
.build();
.builder(V1SchemaRegistrySubject.class)
.withMetadata(after.getMetadata())
.withSpec(ResourceChangeSpec
.builder()
.withData(TYPE_CONVERTER.convertValue(getOptions(after)))
.withOperation(Change.computeOperation(changes.all()))
.withChanges(changes)
.build()
)
.build();
}

@NotNull
private SchemaSubjectChangeOptions getOptions(@NotNull V1SchemaRegistrySubject subject) {
SchemaRegistryAnnotations annotations = new SchemaRegistryAnnotations(subject);
return new SchemaSubjectChangeOptions(
SchemaRegistryAnnotations.isAnnotatedWitPermananteDelete(subject),
SchemaRegistryAnnotations.isAnnotatedWithNormalizeSchema(subject)
annotations.permananteDelete(),
annotations.normalizeSchema()
);
}

@NotNull
private StateChange getChangeForReferences(
@NotNull V1SchemaRegistrySubject before, @NotNull V1SchemaRegistrySubject after) {
@NotNull V1SchemaRegistrySubject before, @NotNull V1SchemaRegistrySubject after) {

return StateChange.with(
DATA_REFERENCES,
before.getSpec().getReferences()
.stream()
.map(TYPE_CONVERTER::convertValue)
.toList(),
after.getSpec().getReferences()
.stream()
.map(TYPE_CONVERTER::convertValue)
.toList()
DATA_REFERENCES,
before.getSpec().getReferences()
.stream()
.map(TYPE_CONVERTER::convertValue)
.toList(),
after.getSpec().getReferences()
.stream()
.map(TYPE_CONVERTER::convertValue)
.toList()
);
}

@NotNull
private StateChange getChangeForSchemaType(V1SchemaRegistrySubject before,
V1SchemaRegistrySubject after) {
return StateChange.with(
DATA_SCHEMA_TYPE,
Optional.ofNullable(before).map(o -> o.getSpec().getSchemaType()).orElse(null),
Optional.ofNullable(after).map(o -> o.getSpec().getSchemaType()).orElse(null)
DATA_SCHEMA_TYPE,
Optional.ofNullable(before).map(o -> o.getSpec().getSchemaType()).orElse(null),
Optional.ofNullable(after).map(o -> o.getSpec().getSchemaType()).orElse(null)
);
}

Expand All @@ -144,9 +145,9 @@ private StateChange getChangeForSchemaType(V1SchemaRegistrySubject before,
private StateChange getChangeForCompatibility(V1SchemaRegistrySubject before,
V1SchemaRegistrySubject after) {
return StateChange.with(
DATA_COMPATIBILITY_LEVEL,
Optional.ofNullable(before).map(o -> o.getSpec().getCompatibilityLevel()).orElse(null),
Optional.ofNullable(after).map(o -> o.getSpec().getCompatibilityLevel()).orElse(null)
DATA_COMPATIBILITY_LEVEL,
Optional.ofNullable(before).map(o -> o.getSpec().getCompatibilityLevel()).orElse(null),
Optional.ofNullable(after).map(o -> o.getSpec().getCompatibilityLevel()).orElse(null)
);
}

Expand All @@ -162,9 +163,13 @@ private StateChange getChangeForSchema(V1SchemaRegistrySubject before,

private SchemaAndType getSchemaAndType(V1SchemaRegistrySubject subject) {
return Optional.ofNullable(subject)
.map(V1SchemaRegistrySubject::getSpec)
.map(spec -> new SchemaAndType(spec.getSchema().value(), spec.getSchemaType()))
.orElse(SchemaAndType.empty());
.map(V1SchemaRegistrySubject::getSpec)
.map(spec -> new SchemaAndType(
spec.getSchema().value(),
spec.getSchemaType(),
new SchemaRegistryAnnotations(subject).useCanonicalFingerPrint()
))
.orElse(SchemaAndType.empty());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ protected CompletableFuture<Void> registerSubjectVersion(@NotNull final Resource
);
change.getMetadata()
.addAnnotationIfAbsent(
SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID,
SchemaRegistryAnnotations.SCHEMA_REGISTRY_SCHEMA_ID,
subjectSchemaId.id()
);
}
Expand Down
Loading

0 comments on commit 30a0f8e

Please sign in to comment.