From 77cb8b917786e5f3e271a946f36d240f6a219d2b Mon Sep 17 00:00:00 2001 From: Jeremy Custenborder Date: Tue, 19 May 2020 14:25:23 -0500 Subject: [PATCH] Extended the wikimedia test case to cover the entire schema. Added support to build a schema name for objects based on the location of the property. Fixes #10. (#11) --- pom.xml | 20 ------ .../connect/json/FromJsonSchemaConverter.java | 7 +++ .../json/FromJsonSchemaConverterFactory.java | 50 +++++++++++++-- .../json/FromJsonSchemaConverterTest.java | 61 +++++++++++++++++-- 4 files changed, 108 insertions(+), 30 deletions(-) diff --git a/pom.xml b/pom.xml index 8807af3..61ea736 100644 --- a/pom.xml +++ b/pom.xml @@ -74,26 +74,6 @@ - - com.github.jcustenborder.maven.plugins - kafka-connect-maven-plugin - 0.1.2 - - - generate-config - - generate-config-classes - - generate-sources - - - src/main/connect-config-classes/*.json - - - - - - io.confluent kafka-connect-maven-plugin diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverter.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverter.java index bfe734e..909ddb5 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverter.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverter.java @@ -107,12 +107,19 @@ protected FromJsonVisitor jsonVisitor(Schema connectSchema, return new FromJsonVisitor.StructVisitor(connectSchema, visitors); } + static final Set EXCLUDE_PROPERTIES = ImmutableSet.of("$schema"); + @Override protected void fromJSON(SchemaBuilder builder, ObjectSchema jsonSchema, Map visitors) { Set requiredProperties = ImmutableSet.copyOf(jsonSchema.getRequiredProperties()); jsonSchema.getPropertySchemas() .entrySet() .stream() + .filter(e -> { + boolean result = !EXCLUDE_PROPERTIES.contains(e.getKey()); + log.trace("fromJson() - filtering '{}' result = '{}'", e.getKey(), result); + return result; + }) .filter(e -> { String schemaLocation = e.getValue().getSchemaLocation(); boolean result = !this.config.excludeLocations.contains(schemaLocation); diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterFactory.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterFactory.java index bfb2d2f..c47002c 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterFactory.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterFactory.java @@ -21,12 +21,14 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.everit.json.schema.CombinedSchema; import org.everit.json.schema.NullSchema; +import org.everit.json.schema.ObjectSchema; import org.everit.json.schema.ReferenceSchema; import org.everit.json.schema.Schema; import org.everit.json.schema.StringSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -67,11 +69,15 @@ public FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema) { } public FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema, boolean isOptional) { + final String description; + if (jsonSchema instanceof ReferenceSchema) { ReferenceSchema referenceSchema = (ReferenceSchema) jsonSchema; jsonSchema = referenceSchema.getReferredSchema(); + description = jsonSchema.getDescription(); } else if (jsonSchema instanceof CombinedSchema) { CombinedSchema combinedSchema = (CombinedSchema) jsonSchema; + description = combinedSchema.getDescription(); List nonNullSubSchemas = combinedSchema .getSubschemas() .stream() @@ -87,6 +93,8 @@ public FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema, boolean ); } jsonSchema = nonNullSubSchemas.get(0); + } else { + description = jsonSchema.getDescription(); } FromJsonConversionKey key = FromJsonConversionKey.of(jsonSchema); @@ -104,11 +112,14 @@ public FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema, boolean } SchemaBuilder builder = converter.schemaBuilder(jsonSchema); - if (!Strings.isNullOrEmpty(jsonSchema.getTitle())) { - builder.name(cleanName(jsonSchema.getTitle())); + if (jsonSchema instanceof ObjectSchema) { + ObjectSchema objectSchema = (ObjectSchema) jsonSchema; + String schemaName = schemaName(objectSchema); + builder.name(schemaName); } - if (!Strings.isNullOrEmpty(jsonSchema.getDescription())) { - builder.doc(jsonSchema.getDescription()); + + if (!Strings.isNullOrEmpty(description)) { + builder.doc(description); } if (isOptional) { builder.optional(); @@ -120,9 +131,38 @@ public FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema, boolean return FromJsonState.of(jsonSchema, schema, visitor); } + private List clean(String text) { + List result; + + if (Strings.isNullOrEmpty(text)) { + result = Collections.EMPTY_LIST; + } else { + result = Stream.of(text.split("[#\\\\/\\.]+")) + .filter(p -> !Strings.isNullOrEmpty(p)) + .collect(Collectors.toList()); + } + + return result; + } + + private String schemaName(ObjectSchema objectSchema) { + final List parts; + + if (!Strings.isNullOrEmpty(objectSchema.getTitle())) { + parts = clean(objectSchema.getTitle()); + } else if (!Strings.isNullOrEmpty(objectSchema.getSchemaLocation())) { + parts = clean(objectSchema.getSchemaLocation()); + } else { + parts = Collections.EMPTY_LIST; + } + + return parts.isEmpty() ? null : Joiner.on('.').join(parts); + } + private String cleanName(String title) { - String result = title.replaceAll("[\\/]+", "."); + String result = title.replaceAll("[#\\\\/]+", "."); return result; } + } diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterTest.java index 8769f01..5d86d62 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterTest.java @@ -41,7 +41,6 @@ import java.util.stream.Stream; import static com.github.jcustenborder.kafka.connect.utils.AssertSchema.assertSchema; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.DynamicTest.dynamicTest; public class FromJsonSchemaConverterTest { @@ -82,8 +81,12 @@ org.everit.json.schema.Schema jsonSchema(String type, String key1, String value1 return TestUtils.jsonSchema(rawSchema); } + void assertJsonSchema(org.apache.kafka.connect.data.Schema expected, org.everit.json.schema.Schema input) { FromJsonState state = this.factory.fromJSON(input); + + + log.trace("schema:\n{}", state.schema); assertSchema(expected, state.schema); } @@ -160,18 +163,66 @@ public void productSchema() throws IOException { @Test public void wikiMediaRecentChangeSchema() throws IOException { org.everit.json.schema.Schema jsonSchema = loadSchema("SchemaConverterTest/wikimedia.recentchange.schema.json"); + Schema propertiesLength = SchemaBuilder.struct() + .name("properties.length") + .optional() + .doc("Length of old and new change") + .field("new", SchemaBuilder.int64().doc("(rc_new_len)").optional().build()) + .field("old", SchemaBuilder.int64().doc("(rc_old_len)").optional().build()) + .build(); + Schema propertiesMeta = SchemaBuilder.struct() + .name("properties.meta") + .field("domain", SchemaBuilder.string().optional().doc("Domain the event or entity pertains to").build()) + .field("dt", Timestamp.builder().doc("Event datetime, in ISO-8601 format").build()) + .field("id", SchemaBuilder.string().doc("Unique ID of this event").build()) + .field("request_id", SchemaBuilder.string().optional().doc("Unique ID of the request that caused the event").build()) + .field("stream", SchemaBuilder.string().doc("Name of the stream/queue/dataset that this event belongs in").build()) + .field("uri", SchemaBuilder.string().optional().doc("Unique URI identifying the event or entity").build()) + .build(); + Schema propertiesRevision = SchemaBuilder.struct() + .name("properties.revision") + .optional() + .doc("Old and new revision IDs") + .field("new", SchemaBuilder.int64().doc("(rc_last_oldid)").optional().build()) + .field("old", SchemaBuilder.int64().doc("(rc_this_oldid)").optional().build()) + .build(); + + Schema expected = SchemaBuilder.struct() .name("mediawiki.recentchange") .doc("Represents a MW RecentChange event. https://www.mediawiki.org/wiki/Manual:RCFeed\n") - .field("price", SchemaBuilder.float64().doc("The price of the product").build()) - .field("productId", SchemaBuilder.int64().doc("The unique identifier for a product").build()) - .field("productName", SchemaBuilder.string().doc("Name of the product").build()) + .field("bot", SchemaBuilder.bool().optional().doc("(rc_bot)").build()) + .field("comment", SchemaBuilder.string().optional().doc("(rc_comment)").build()) + .field("id", SchemaBuilder.int64().optional().doc("ID of the recentchange event (rcid).").build()) + .field("length", propertiesLength) + .field("log_action", SchemaBuilder.string().optional().doc("(rc_log_action)").build()) + .field("log_action_comment", SchemaBuilder.string().optional().build()) + .field("log_id", SchemaBuilder.int64().optional().doc("(rc_log_id)").build()) + .field("log_type", SchemaBuilder.string().optional().doc("(rc_log_type)").build()) + .field("meta", propertiesMeta) + .field("minor", SchemaBuilder.bool().optional().doc("(rc_minor).").build()) + .field("namespace", SchemaBuilder.int64().optional().doc("ID of relevant namespace of affected page (rc_namespace, page_namespace). This is -1 (\"Special\") for log events.\n").build()) + .field("parsedcomment", SchemaBuilder.string().optional().doc("The rc_comment parsed into simple HTML. Optional").build()) + .field("patrolled", SchemaBuilder.bool().optional().doc("(rc_patrolled). This property only exists if patrolling is supported for this event (based on $wgUseRCPatrol, $wgUseNPPatrol).\n").build()) + .field("revision", propertiesRevision) + .field("server_name", SchemaBuilder.string().optional().doc("$wgServerName").build()) + .field("server_script_path", SchemaBuilder.string().optional().doc("$wgScriptPath").build()) + .field("server_url", SchemaBuilder.string().optional().doc("$wgCanonicalServer").build()) + .field("timestamp", SchemaBuilder.int64().optional().doc("Unix timestamp (derived from rc_timestamp).").build()) + .field("title", SchemaBuilder.string().optional().doc("Full page name, from Title::getPrefixedText.").build()) + .field("type", SchemaBuilder.string().optional().doc("Type of recentchange event (rc_type). One of \"edit\", \"new\", \"log\", \"categorize\", or \"external\". (See Manual:Recentchanges table#rc_type)\n").build()) + .field("user", SchemaBuilder.string().optional().doc("(rc_user_text)").build()) + .field("wiki", SchemaBuilder.string().optional().doc("wfWikiID ($wgDBprefix, $wgDBname)").build()) + .build(); - assertNotNull(expected); + + + assertJsonSchema(expected, jsonSchema); } @Test public void nested() throws IOException { + org.everit.json.schema.Schema jsonSchema = loadSchema("SchemaConverterTest/nested.schema.json"); Schema addressSchema = SchemaBuilder.struct() .name("Address")