From 30a0f8ee55304301c65acb72daae3d7d678930e2 Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Sun, 2 Jun 2024 01:07:12 +0200 Subject: [PATCH] feat(provider/schema-registry): add schemaregistry.jikkou.io/use-canonical-fingerprint (#431) --- ...ivenSchemaRegistrySubjectControllerIT.java | 12 ++- .../SchemaRegistrySubjectControllerTest.java | 2 +- .../registry/SchemaRegistryAnnotations.java | 31 ++++-- .../V1SchemaRegistrySubjectFactory.java | 6 +- .../schema/registry/avro/AvroSchema.java | 5 + .../change/SchemaSubjectChangeComputer.java | 101 +++++++++--------- .../AbstractSchemaSubjectChangeHandler.java | 2 +- .../schema/registry/model/SchemaAndType.java | 60 ++++++----- .../schema/registry/model/SchemaType.java | 32 +++++- 9 files changed, 155 insertions(+), 96 deletions(-) diff --git a/providers/jikkou-provider-aiven/src/integration-test/java/io/streamthoughts/jikkou/extension/aiven/reconciler/AivenSchemaRegistrySubjectControllerIT.java b/providers/jikkou-provider-aiven/src/integration-test/java/io/streamthoughts/jikkou/extension/aiven/reconciler/AivenSchemaRegistrySubjectControllerIT.java index 75ce7e28a..80a4d4974 100644 --- a/providers/jikkou-provider-aiven/src/integration-test/java/io/streamthoughts/jikkou/extension/aiven/reconciler/AivenSchemaRegistrySubjectControllerIT.java +++ b/providers/jikkou-provider-aiven/src/integration-test/java/io/streamthoughts/jikkou/extension/aiven/reconciler/AivenSchemaRegistrySubjectControllerIT.java @@ -16,7 +16,6 @@ import io.streamthoughts.jikkou.core.ReconciliationContext; import io.streamthoughts.jikkou.core.ReconciliationMode; import io.streamthoughts.jikkou.core.config.Configuration; -import io.streamthoughts.jikkou.core.data.json.Json; import io.streamthoughts.jikkou.core.extension.ClassExtensionAliasesGenerator; import io.streamthoughts.jikkou.core.extension.DefaultExtensionDescriptorFactory; import io.streamthoughts.jikkou.core.extension.DefaultExtensionFactory; @@ -34,6 +33,7 @@ import io.streamthoughts.jikkou.extension.aiven.ApiVersions; import io.streamthoughts.jikkou.schema.registry.SchemaRegistryAnnotations; import io.streamthoughts.jikkou.schema.registry.model.CompatibilityLevels; +import io.streamthoughts.jikkou.schema.registry.model.SchemaAndType; import io.streamthoughts.jikkou.schema.registry.model.SchemaHandle; import io.streamthoughts.jikkou.schema.registry.model.SchemaType; import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubject; @@ -152,7 +152,7 @@ void shouldCreateSchemaRegistrySubject() { .withMetadata(ObjectMeta .builder() .withName(TEST_SUBJECT) - .withAnnotation(SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID, 1) + .withAnnotation(SchemaRegistryAnnotations.SCHEMA_REGISTRY_SCHEMA_ID, 1) .build() ) .withSpec(ResourceChangeSpec @@ -163,7 +163,7 @@ void shouldCreateSchemaRegistrySubject() { "normalizeSchema", false )) .withChange(StateChange.create(DATA_COMPATIBILITY_LEVEL, CompatibilityLevels.BACKWARD)) - .withChange(StateChange.create(DATA_SCHEMA, AVRO_SCHEMA_V1)) + .withChange(StateChange.create(DATA_SCHEMA, new SchemaAndType(AVRO_SCHEMA_V1, SchemaType.AVRO))) .withChange(StateChange.create(DATA_SCHEMA_TYPE, SchemaType.AVRO)) .withChange(StateChange.create(DATA_REFERENCES, Collections.emptyList())) .build() @@ -247,7 +247,7 @@ void shouldUpdateSchemaRegistrySubject() { .withMetadata(ObjectMeta .builder() .withName(TEST_SUBJECT) - .withAnnotation(SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID, 1) + .withAnnotation(SchemaRegistryAnnotations.SCHEMA_REGISTRY_SCHEMA_ID, 1) .build() ) .withSpec(ResourceChangeSpec @@ -258,12 +258,14 @@ void shouldUpdateSchemaRegistrySubject() { "normalizeSchema", false )) .withChange(StateChange.none(DATA_COMPATIBILITY_LEVEL, CompatibilityLevels.BACKWARD)) - .withChange(StateChange.update(DATA_SCHEMA, Json.normalize(AVRO_SCHEMA_V1), Json.normalize(AVRO_SCHEMA_V2))) + .withChange(StateChange.update(DATA_SCHEMA, new SchemaAndType(AVRO_SCHEMA_V1, SchemaType.AVRO), new SchemaAndType(AVRO_SCHEMA_V2, SchemaType.AVRO))) .withChange(StateChange.none(DATA_SCHEMA_TYPE, SchemaType.AVRO)) .withChange(StateChange.none(DATA_REFERENCES, Collections.emptyList())) .build() ) .build(); Assertions.assertEquals(expected, actual); + + } } diff --git a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectControllerTest.java b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectControllerTest.java index 3d84413f4..32cd808fa 100644 --- a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectControllerTest.java +++ b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectControllerTest.java @@ -79,7 +79,7 @@ void shouldRegisterSchemaForNewResource() { Assertions.assertEquals(1, results.size()); ChangeResult change = results.getFirst(); ResourceChange data = change.change(); - Assertions.assertEquals(Optional.of(1), data.getMetadata().findAnnotationByKey(SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID)); + Assertions.assertEquals(Optional.of(1), data.getMetadata().findAnnotationByKey(SchemaRegistryAnnotations.SCHEMA_REGISTRY_SCHEMA_ID)); Assertions.assertEquals(Operation.CREATE, data.getSpec().getOp()); Assertions.assertEquals(SchemaType.AVRO, data.getSpec().getChanges().getLast("schemaType", TypeConverter.of(SchemaType.class)).getAfter()); } diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/SchemaRegistryAnnotations.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/SchemaRegistryAnnotations.java index 27c361eab..8a0f3acc0 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/SchemaRegistryAnnotations.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/SchemaRegistryAnnotations.java @@ -29,17 +29,28 @@ public final class SchemaRegistryAnnotations { private static final String SCHEMAREGISTRY_JIKKOU_IO = "schemaregistry.jikkou.io/"; - public static final String JIKKOU_IO_SCHEMA_REGISTRY_NORMALIZE_SCHEMA = SCHEMAREGISTRY_JIKKOU_IO + "normalize-schema"; - public static final String JIKKOU_IO_SCHEMA_REGISTRY_PERMANANTE_DELETE = SCHEMAREGISTRY_JIKKOU_IO + "permanent-delete"; - public static final String JIKKOU_IO_SCHEMA_REGISTRY_URL = SCHEMAREGISTRY_JIKKOU_IO + "url"; - public static final String JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_VERSION = SCHEMAREGISTRY_JIKKOU_IO + "schema-version"; - public static final String JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID = SCHEMAREGISTRY_JIKKOU_IO + "schema-id"; - - public static boolean isAnnotatedWithNormalizeSchema(V1SchemaRegistrySubject subject) { - return CoreAnnotations.isAnnotatedWith(subject, JIKKOU_IO_SCHEMA_REGISTRY_NORMALIZE_SCHEMA); + public static final String SCHEMA_REGISTRY_NORMALIZE_SCHEMA = SCHEMAREGISTRY_JIKKOU_IO + "normalize-schema"; + public static final String SCHEMA_REGISTRY_PERMANANTE_DELETE = SCHEMAREGISTRY_JIKKOU_IO + "permanent-delete"; + public static final String SCHEMA_REGISTRY_URL = SCHEMAREGISTRY_JIKKOU_IO + "url"; + public static final String SCHEMA_REGISTRY_SCHEMA_VERSION = SCHEMAREGISTRY_JIKKOU_IO + "schema-version"; + public static final String SCHEMA_REGISTRY_SCHEMA_ID = SCHEMAREGISTRY_JIKKOU_IO + "schema-id"; + public static final String SCHEMA_REGISTRY_USE_CANONICAL_FINGERPRINT = SCHEMAREGISTRY_JIKKOU_IO + "use-canonical-fingerprint"; + + private final V1SchemaRegistrySubject resource; + + public SchemaRegistryAnnotations(final V1SchemaRegistrySubject resource) { + this.resource = resource; + } + + public boolean normalizeSchema() { + return CoreAnnotations.isAnnotatedWith(resource, SCHEMA_REGISTRY_NORMALIZE_SCHEMA); + } + + public boolean permananteDelete() { + return CoreAnnotations.isAnnotatedWith(resource, SCHEMA_REGISTRY_PERMANANTE_DELETE); } - public static boolean isAnnotatedWitPermananteDelete(V1SchemaRegistrySubject subject) { - return CoreAnnotations.isAnnotatedWith(subject, JIKKOU_IO_SCHEMA_REGISTRY_PERMANANTE_DELETE); + public boolean useCanonicalFingerPrint() { + return CoreAnnotations.isAnnotatedWith(resource, SCHEMA_REGISTRY_USE_CANONICAL_FINGERPRINT); } } diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/V1SchemaRegistrySubjectFactory.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/V1SchemaRegistrySubjectFactory.java index e976c3bc9..28465d6fa 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/V1SchemaRegistrySubjectFactory.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/V1SchemaRegistrySubjectFactory.java @@ -60,11 +60,11 @@ public V1SchemaRegistrySubject createSchemaRegistrySubject(@NotNull SubjectSchem .withMetadata(ObjectMeta .builder() .withName(subjectSchema.subject()) - .withAnnotation(SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_URL, + .withAnnotation(SchemaRegistryAnnotations.SCHEMA_REGISTRY_URL, schemaRegistryUrl) - .withAnnotation(SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_VERSION, + .withAnnotation(SchemaRegistryAnnotations.SCHEMA_REGISTRY_SCHEMA_VERSION, subjectSchema.version()) - .withAnnotation(SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID, + .withAnnotation(SchemaRegistryAnnotations.SCHEMA_REGISTRY_SCHEMA_ID, subjectSchema.id()) .build() ) diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/avro/AvroSchema.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/avro/AvroSchema.java index f6e129b87..c3d265dbf 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/avro/AvroSchema.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/avro/AvroSchema.java @@ -7,6 +7,7 @@ package io.streamthoughts.jikkou.schema.registry.avro; import org.apache.avro.Schema; +import org.apache.avro.SchemaNormalization; import org.jetbrains.annotations.NotNull; /** @@ -30,6 +31,10 @@ public Schema schema() { return schema; } + public long fingerprint64() { + return SchemaNormalization.parsingFingerprint64(schema()); + } + /** {@inheritDoc} **/ public String toString() { return schema.toString(false); diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java index 52dda1c79..643fec877 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/SchemaSubjectChangeComputer.java @@ -50,15 +50,15 @@ public static class SchemaSubjectChangeFactory extends ResourceChangeFactory changes = StateChangeList.emptyList() - .with(getChangeForCompatibility(before, after)) - .with(getChangeForSchema(before, after)) - .with(getChangeForSchemaType(before, after)) - .with(getChangeForReferences(before, after)); + .with(getChangeForCompatibility(before, after)) + .with(getChangeForSchema(before, after)) + .with(getChangeForSchemaType(before, after)) + .with(getChangeForReferences(before, after)); return GenericResourceChange - .builder(V1SchemaRegistrySubject.class) - .withMetadata(after.getMetadata()) - .withSpec(ResourceChangeSpec - .builder() - .withData(TYPE_CONVERTER.convertValue(getOptions(after))) - .withOperation(Change.computeOperation(changes.all())) - .withChanges(changes) - .build() - ) - .build(); + .builder(V1SchemaRegistrySubject.class) + .withMetadata(after.getMetadata()) + .withSpec(ResourceChangeSpec + .builder() + .withData(TYPE_CONVERTER.convertValue(getOptions(after))) + .withOperation(Change.computeOperation(changes.all())) + .withChanges(changes) + .build() + ) + .build(); } @NotNull private SchemaSubjectChangeOptions getOptions(@NotNull V1SchemaRegistrySubject subject) { + SchemaRegistryAnnotations annotations = new SchemaRegistryAnnotations(subject); return new SchemaSubjectChangeOptions( - SchemaRegistryAnnotations.isAnnotatedWitPermananteDelete(subject), - SchemaRegistryAnnotations.isAnnotatedWithNormalizeSchema(subject) + annotations.permananteDelete(), + annotations.normalizeSchema() ); } @NotNull private StateChange getChangeForReferences( - @NotNull V1SchemaRegistrySubject before, @NotNull V1SchemaRegistrySubject after) { + @NotNull V1SchemaRegistrySubject before, @NotNull V1SchemaRegistrySubject after) { return StateChange.with( - DATA_REFERENCES, - before.getSpec().getReferences() - .stream() - .map(TYPE_CONVERTER::convertValue) - .toList(), - after.getSpec().getReferences() - .stream() - .map(TYPE_CONVERTER::convertValue) - .toList() + DATA_REFERENCES, + before.getSpec().getReferences() + .stream() + .map(TYPE_CONVERTER::convertValue) + .toList(), + after.getSpec().getReferences() + .stream() + .map(TYPE_CONVERTER::convertValue) + .toList() ); } @@ -133,9 +134,9 @@ private StateChange getChangeForReferences( private StateChange getChangeForSchemaType(V1SchemaRegistrySubject before, V1SchemaRegistrySubject after) { return StateChange.with( - DATA_SCHEMA_TYPE, - Optional.ofNullable(before).map(o -> o.getSpec().getSchemaType()).orElse(null), - Optional.ofNullable(after).map(o -> o.getSpec().getSchemaType()).orElse(null) + DATA_SCHEMA_TYPE, + Optional.ofNullable(before).map(o -> o.getSpec().getSchemaType()).orElse(null), + Optional.ofNullable(after).map(o -> o.getSpec().getSchemaType()).orElse(null) ); } @@ -144,9 +145,9 @@ private StateChange getChangeForSchemaType(V1SchemaRegistrySubject before, private StateChange getChangeForCompatibility(V1SchemaRegistrySubject before, V1SchemaRegistrySubject after) { return StateChange.with( - DATA_COMPATIBILITY_LEVEL, - Optional.ofNullable(before).map(o -> o.getSpec().getCompatibilityLevel()).orElse(null), - Optional.ofNullable(after).map(o -> o.getSpec().getCompatibilityLevel()).orElse(null) + DATA_COMPATIBILITY_LEVEL, + Optional.ofNullable(before).map(o -> o.getSpec().getCompatibilityLevel()).orElse(null), + Optional.ofNullable(after).map(o -> o.getSpec().getCompatibilityLevel()).orElse(null) ); } @@ -162,9 +163,13 @@ private StateChange getChangeForSchema(V1SchemaRegistrySubject before, private SchemaAndType getSchemaAndType(V1SchemaRegistrySubject subject) { return Optional.ofNullable(subject) - .map(V1SchemaRegistrySubject::getSpec) - .map(spec -> new SchemaAndType(spec.getSchema().value(), spec.getSchemaType())) - .orElse(SchemaAndType.empty()); + .map(V1SchemaRegistrySubject::getSpec) + .map(spec -> new SchemaAndType( + spec.getSchema().value(), + spec.getSchemaType(), + new SchemaRegistryAnnotations(subject).useCanonicalFingerPrint() + )) + .orElse(SchemaAndType.empty()); } } } diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/AbstractSchemaSubjectChangeHandler.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/AbstractSchemaSubjectChangeHandler.java index b67b8a7cf..eff300800 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/AbstractSchemaSubjectChangeHandler.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/AbstractSchemaSubjectChangeHandler.java @@ -117,7 +117,7 @@ protected CompletableFuture registerSubjectVersion(@NotNull final Resource ); change.getMetadata() .addAnnotationIfAbsent( - SchemaRegistryAnnotations.JIKKOU_IO_SCHEMA_REGISTRY_SCHEMA_ID, + SchemaRegistryAnnotations.SCHEMA_REGISTRY_SCHEMA_ID, subjectSchemaId.id() ); } diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/model/SchemaAndType.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/model/SchemaAndType.java index d83dd15a7..3724054dd 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/model/SchemaAndType.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/model/SchemaAndType.java @@ -7,13 +7,9 @@ package io.streamthoughts.jikkou.schema.registry.model; import com.fasterxml.jackson.annotation.JsonValue; -import io.streamthoughts.jikkou.core.data.json.Json; -import io.streamthoughts.jikkou.schema.registry.avro.AvroSchema; import java.util.Objects; import lombok.Builder; import lombok.extern.jackson.Jacksonized; -import org.apache.avro.Schema; -import org.apache.avro.SchemaNormalization; import org.jetbrains.annotations.NotNull; @Builder @@ -29,6 +25,7 @@ public static SchemaAndType empty() { @JsonValue private final String schema; private final SchemaType type; + private final boolean useCanonicalFingerPrint; /** * Creates a new {@link SchemaAndType} instance. @@ -36,17 +33,34 @@ public static SchemaAndType empty() { private SchemaAndType() { this.schema = null; this.type = null; + this.useCanonicalFingerPrint = false; } /** * Creates a new {@link SchemaAndType} instance. - * @param schema the schema string. - * @param type the schema type. + * + * @param schema the schema string. + * @param type the schema type. */ public SchemaAndType(@NotNull String schema, @NotNull SchemaType type) { this.schema = Objects.requireNonNull(schema, "schema must not be null"); this.type = Objects.requireNonNull(type, "type must not be null"); + this.useCanonicalFingerPrint = false; + } + + /** + * Creates a new {@link SchemaAndType} instance. + * + * @param schema the schema string. + * @param type the schema type. + */ + public SchemaAndType(@NotNull String schema, + @NotNull SchemaType type, + boolean useCanonicalFingerPrint) { + this.schema = Objects.requireNonNull(schema, "schema must not be null"); + this.type = Objects.requireNonNull(type, "type must not be null"); + this.useCanonicalFingerPrint = useCanonicalFingerPrint; } @JsonValue @@ -58,39 +72,31 @@ public SchemaType type() { return type; } - /** {@inheritDoc} **/ + /** + * {@inheritDoc} + **/ @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; SchemaAndType that = (SchemaAndType) o; - if (type == SchemaType.AVRO && type == that.type) { - return avroEquals(that); - } else if (type == SchemaType.JSON && type == that.type) { - return jsonEquals(that); - } else { - return Objects.equals(schema, that.schema) && type == that.type; - } + return Objects.equals( + this.type != null ? this.type.comparableSchemaForm(this.schema, this.useCanonicalFingerPrint) : null, + that.type != null ? that.type.comparableSchemaForm(that.schema, that.useCanonicalFingerPrint) : null + ); } - /** {@inheritDoc} **/ + /** + * {@inheritDoc} + **/ @Override public int hashCode() { return Objects.hash(schema, type); } - private boolean avroEquals(SchemaAndType that) { - Schema thisSchema = new AvroSchema(schema).schema(); - Schema thatSchema = new AvroSchema(that.schema).schema(); - - return SchemaNormalization.parsingFingerprint64(thisSchema) == - SchemaNormalization.parsingFingerprint64(thatSchema); - } - - private boolean jsonEquals(SchemaAndType that) { - return Objects.equals(Json.normalize(schema), Json.normalize(that.schema)); - } - + /** + * {@inheritDoc} + **/ public String toString() { return schema; } diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/model/SchemaType.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/model/SchemaType.java index b11ec62b3..8cc8fa7ab 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/model/SchemaType.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/model/SchemaType.java @@ -8,11 +8,30 @@ import com.fasterxml.jackson.annotation.JsonCreator; import io.streamthoughts.jikkou.common.utils.Enums; +import io.streamthoughts.jikkou.core.data.json.Json; +import io.streamthoughts.jikkou.schema.registry.avro.AvroSchema; import org.jetbrains.annotations.Nullable; public enum SchemaType { - AVRO, PROTOBUF, JSON; + AVRO { + @Override + public Object comparableSchemaForm(final String schema, boolean useCanonicalFingerPrint) { + if (schema == null) return null; + + return useCanonicalFingerPrint ? new AvroSchema(schema).fingerprint64() : Json.normalize(schema); + } + }, PROTOBUF { + @Override + public Object comparableSchemaForm(final String schema, boolean useCanonicalFingerPrint) { + return schema; + } + }, JSON { + @Override + public Object comparableSchemaForm(final String schema, boolean useCanonicalFingerPrint) { + return Json.normalize(schema); + } + }; @JsonCreator public static SchemaType getForNameIgnoreCase(final @Nullable String str) { @@ -24,4 +43,15 @@ public static SchemaType defaultType() { return SchemaType.AVRO; } + + /** + * Transforms the given schema to an object that will be used to check schema equality. + * + * @param schema The schema. + * @param useCanonicalFingerPrint flag whether to use a canonical-print. + * @return the object used to check schema equality. + */ + public abstract Object comparableSchemaForm(final String schema, + final boolean useCanonicalFingerPrint); + }