Skip to content

Commit

Permalink
Extended the wikimedia test case to cover the entire schema. Added su…
Browse files Browse the repository at this point in the history
…pport to build a schema name for objects based on the location of the property. Fixes jcustenborder#10. (jcustenborder#11)
  • Loading branch information
jcustenborder authored May 19, 2020
1 parent 347c2f7 commit 77cb8b9
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 30 deletions.
20 changes: 0 additions & 20 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,6 @@
</repositories>
<build>
<plugins>
<plugin>
<groupId>com.github.jcustenborder.maven.plugins</groupId>
<artifactId>kafka-connect-maven-plugin</artifactId>
<version>0.1.2</version>
<executions>
<execution>
<id>generate-config</id>
<goals>
<goal>generate-config-classes</goal>
</goals>
<phase>generate-sources</phase>
<configuration>
<includeFiles>
<includeFile>src/main/connect-config-classes/*.json</includeFile>
</includeFiles>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-maven-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,19 @@ protected FromJsonVisitor<ObjectNode, Struct> jsonVisitor(Schema connectSchema,
return new FromJsonVisitor.StructVisitor(connectSchema, visitors);
}

// Property names that fromJSON drops from the object's property schemas before any further processing.
static final Set<String> EXCLUDE_PROPERTIES = ImmutableSet.of("$schema");

@Override
protected void fromJSON(SchemaBuilder builder, ObjectSchema jsonSchema, Map<String, FromJsonVisitor> visitors) {
Set<String> requiredProperties = ImmutableSet.copyOf(jsonSchema.getRequiredProperties());
jsonSchema.getPropertySchemas()
.entrySet()
.stream()
.filter(e -> {
boolean result = !EXCLUDE_PROPERTIES.contains(e.getKey());
log.trace("fromJson() - filtering '{}' result = '{}'", e.getKey(), result);
return result;
})
.filter(e -> {
String schemaLocation = e.getValue().getSchemaLocation();
boolean result = !this.config.excludeLocations.contains(schemaLocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import org.apache.kafka.connect.data.SchemaBuilder;
import org.everit.json.schema.CombinedSchema;
import org.everit.json.schema.NullSchema;
import org.everit.json.schema.ObjectSchema;
import org.everit.json.schema.ReferenceSchema;
import org.everit.json.schema.Schema;
import org.everit.json.schema.StringSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -67,11 +69,15 @@ public FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema) {
}

public FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema, boolean isOptional) {
final String description;

if (jsonSchema instanceof ReferenceSchema) {
ReferenceSchema referenceSchema = (ReferenceSchema) jsonSchema;
jsonSchema = referenceSchema.getReferredSchema();
description = jsonSchema.getDescription();
} else if (jsonSchema instanceof CombinedSchema) {
CombinedSchema combinedSchema = (CombinedSchema) jsonSchema;
description = combinedSchema.getDescription();
List<Schema> nonNullSubSchemas = combinedSchema
.getSubschemas()
.stream()
Expand All @@ -87,6 +93,8 @@ public FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema, boolean
);
}
jsonSchema = nonNullSubSchemas.get(0);
} else {
description = jsonSchema.getDescription();
}
FromJsonConversionKey key = FromJsonConversionKey.of(jsonSchema);

Expand All @@ -104,11 +112,14 @@ public FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema, boolean
}

SchemaBuilder builder = converter.schemaBuilder(jsonSchema);
if (!Strings.isNullOrEmpty(jsonSchema.getTitle())) {
builder.name(cleanName(jsonSchema.getTitle()));
if (jsonSchema instanceof ObjectSchema) {
ObjectSchema objectSchema = (ObjectSchema) jsonSchema;
String schemaName = schemaName(objectSchema);
builder.name(schemaName);
}
if (!Strings.isNullOrEmpty(jsonSchema.getDescription())) {
builder.doc(jsonSchema.getDescription());

if (!Strings.isNullOrEmpty(description)) {
builder.doc(description);
}
if (isOptional) {
builder.optional();
Expand All @@ -120,9 +131,38 @@ public FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema, boolean
return FromJsonState.of(jsonSchema, schema, visitor);
}

/**
 * Splits a schema title or schema location into its non-empty name segments.
 *
 * <p>Any run of {@code #}, {@code \}, {@code /} or {@code .} acts as a single separator, so
 * {@code "#/properties/meta"} yields {@code ["properties", "meta"]}.
 *
 * @param text raw title or location; may be {@code null} or empty
 * @return the segments in order of appearance; an empty list (never {@code null}) when
 *         {@code text} is {@code null}, empty, or consists only of separators
 */
private List<String> clean(String text) {
  if (Strings.isNullOrEmpty(text)) {
    // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
    return Collections.emptyList();
  }

  return Stream.of(text.split("[#\\\\/\\.]+"))
      // A leading separator makes split() emit an empty first element; drop it.
      .filter(part -> !part.isEmpty())
      .collect(Collectors.toList());
}

/**
 * Derives the Connect schema name for an object schema.
 *
 * <p>The title is used when it is non-empty; otherwise the schema location is used. The chosen
 * text is split into segments by {@code clean(String)} and the segments are joined with
 * {@code '.'}. A non-empty title that contains only separator characters does not fall back
 * to the schema location.
 *
 * @param objectSchema schema to name
 * @return the dotted name, or {@code null} when no segment could be derived
 */
private String schemaName(ObjectSchema objectSchema) {
  final List<String> parts;

  if (!Strings.isNullOrEmpty(objectSchema.getTitle())) {
    parts = clean(objectSchema.getTitle());
  } else if (!Strings.isNullOrEmpty(objectSchema.getSchemaLocation())) {
    parts = clean(objectSchema.getSchemaLocation());
  } else {
    // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
    parts = Collections.emptyList();
  }

  return parts.isEmpty() ? null : Joiner.on('.').join(parts);
}

/**
 * Replaces every run of {@code #}, {@code \} or {@code /} in a title with a single {@code '.'}.
 *
 * @param title title to normalise; must not be {@code null}
 * @return the title with each separator run collapsed to one dot
 */
private String cleanName(String title) {
  // Only one declaration of the result may exist; the superseded "[\\/]+" variant is dropped
  // in favour of the pattern that also covers '#'.
  return title.replaceAll("[#\\\\/]+", ".");
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.util.stream.Stream;

import static com.github.jcustenborder.kafka.connect.utils.AssertSchema.assertSchema;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.DynamicTest.dynamicTest;

public class FromJsonSchemaConverterTest {
Expand Down Expand Up @@ -82,8 +81,12 @@ org.everit.json.schema.Schema jsonSchema(String type, String key1, String value1
return TestUtils.jsonSchema(rawSchema);
}


/**
 * Runs {@code input} through the factory and asserts that the resulting Connect schema
 * matches {@code expected}. The converted schema is logged at trace level first.
 */
void assertJsonSchema(org.apache.kafka.connect.data.Schema expected, org.everit.json.schema.Schema input) {
  final FromJsonState converted = this.factory.fromJSON(input);
  log.trace("schema:\n{}", converted.schema);
  assertSchema(expected, converted.schema);
}

Expand Down Expand Up @@ -160,18 +163,66 @@ public void productSchema() throws IOException {
/**
 * Converts the wikimedia recentchange JSON schema and checks the complete resulting Connect
 * schema, including the nested {@code length}, {@code meta} and {@code revision} structs.
 */
@Test
public void wikiMediaRecentChangeSchema() throws IOException {
  org.everit.json.schema.Schema jsonSchema = loadSchema("SchemaConverterTest/wikimedia.recentchange.schema.json");

  // Nested object schemas; the expected names used here are "properties.<field>".
  Schema propertiesLength = SchemaBuilder.struct()
      .name("properties.length")
      .optional()
      .doc("Length of old and new change")
      .field("new", SchemaBuilder.int64().doc("(rc_new_len)").optional().build())
      .field("old", SchemaBuilder.int64().doc("(rc_old_len)").optional().build())
      .build();
  Schema propertiesMeta = SchemaBuilder.struct()
      .name("properties.meta")
      .field("domain", SchemaBuilder.string().optional().doc("Domain the event or entity pertains to").build())
      .field("dt", Timestamp.builder().doc("Event datetime, in ISO-8601 format").build())
      .field("id", SchemaBuilder.string().doc("Unique ID of this event").build())
      .field("request_id", SchemaBuilder.string().optional().doc("Unique ID of the request that caused the event").build())
      .field("stream", SchemaBuilder.string().doc("Name of the stream/queue/dataset that this event belongs in").build())
      .field("uri", SchemaBuilder.string().optional().doc("Unique URI identifying the event or entity").build())
      .build();
  Schema propertiesRevision = SchemaBuilder.struct()
      .name("properties.revision")
      .optional()
      .doc("Old and new revision IDs")
      .field("new", SchemaBuilder.int64().doc("(rc_last_oldid)").optional().build())
      .field("old", SchemaBuilder.int64().doc("(rc_this_oldid)").optional().build())
      .build();

  // The product-schema fields (price, productId, productName) that used to sit here were
  // left-overs from another test and are not part of the recentchange expectation.
  Schema expected = SchemaBuilder.struct()
      .name("mediawiki.recentchange")
      .doc("Represents a MW RecentChange event. https://www.mediawiki.org/wiki/Manual:RCFeed\n")
      .field("bot", SchemaBuilder.bool().optional().doc("(rc_bot)").build())
      .field("comment", SchemaBuilder.string().optional().doc("(rc_comment)").build())
      .field("id", SchemaBuilder.int64().optional().doc("ID of the recentchange event (rcid).").build())
      .field("length", propertiesLength)
      .field("log_action", SchemaBuilder.string().optional().doc("(rc_log_action)").build())
      .field("log_action_comment", SchemaBuilder.string().optional().build())
      .field("log_id", SchemaBuilder.int64().optional().doc("(rc_log_id)").build())
      .field("log_type", SchemaBuilder.string().optional().doc("(rc_log_type)").build())
      .field("meta", propertiesMeta)
      .field("minor", SchemaBuilder.bool().optional().doc("(rc_minor).").build())
      .field("namespace", SchemaBuilder.int64().optional().doc("ID of relevant namespace of affected page (rc_namespace, page_namespace). This is -1 (\"Special\") for log events.\n").build())
      .field("parsedcomment", SchemaBuilder.string().optional().doc("The rc_comment parsed into simple HTML. Optional").build())
      .field("patrolled", SchemaBuilder.bool().optional().doc("(rc_patrolled). This property only exists if patrolling is supported for this event (based on $wgUseRCPatrol, $wgUseNPPatrol).\n").build())
      .field("revision", propertiesRevision)
      .field("server_name", SchemaBuilder.string().optional().doc("$wgServerName").build())
      .field("server_script_path", SchemaBuilder.string().optional().doc("$wgScriptPath").build())
      .field("server_url", SchemaBuilder.string().optional().doc("$wgCanonicalServer").build())
      .field("timestamp", SchemaBuilder.int64().optional().doc("Unix timestamp (derived from rc_timestamp).").build())
      .field("title", SchemaBuilder.string().optional().doc("Full page name, from Title::getPrefixedText.").build())
      .field("type", SchemaBuilder.string().optional().doc("Type of recentchange event (rc_type). One of \"edit\", \"new\", \"log\", \"categorize\", or \"external\". (See Manual:Recentchanges table#rc_type)\n").build())
      .field("user", SchemaBuilder.string().optional().doc("(rc_user_text)").build())
      .field("wiki", SchemaBuilder.string().optional().doc("wfWikiID ($wgDBprefix, $wgDBname)").build())
      .build();

  // The full structural comparison replaces the former assertNotNull(expected) smoke check.
  assertJsonSchema(expected, jsonSchema);
}

@Test
public void nested() throws IOException {

org.everit.json.schema.Schema jsonSchema = loadSchema("SchemaConverterTest/nested.schema.json");
Schema addressSchema = SchemaBuilder.struct()
.name("Address")
Expand Down

0 comments on commit 77cb8b9

Please sign in to comment.