From 7960bbca6e1be7d001e484e4d4a8d1c826b9cb5e Mon Sep 17 00:00:00 2001 From: Jeremy Custenborder Date: Mon, 18 May 2020 15:17:31 -0500 Subject: [PATCH] Added support to exclude properties. Added fall through case for Strings to just be passed as a string. Fixes #8. (#9) --- .../kafka/connect/json/FromJson.java | 12 +- .../kafka/connect/json/FromJsonConfig.java | 74 +------ .../connect/json/FromJsonSchemaConverter.java | 126 ++++++----- .../json/FromJsonSchemaConverterFactory.java | 128 ++++++++++++ .../kafka/connect/json/JsonConfig.java | 110 ++++++++++ .../connect/json/JsonSchemaConverter.java | 12 +- .../json/JsonSchemaConverterConfig.java | 29 +-- .../json/FromJsonSchemaConverterTest.java | 67 +++++- .../kafka/connect/json/FromJsonTest.java | 23 +- .../wikimedia.recentchange.schema.json | 196 ++++++++++++++++++ .../json/wikimedia.recentchange.data.json | 36 ++++ .../json/wikimedia.recentchange.schema.json | 196 ++++++++++++++++++ 12 files changed, 836 insertions(+), 173 deletions(-) create mode 100644 src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterFactory.java create mode 100644 src/main/java/com/github/jcustenborder/kafka/connect/json/JsonConfig.java create mode 100644 src/test/resources/com/github/jcustenborder/kafka/connect/json/SchemaConverterTest/wikimedia.recentchange.schema.json create mode 100644 src/test/resources/com/github/jcustenborder/kafka/connect/json/wikimedia.recentchange.data.json create mode 100644 src/test/resources/com/github/jcustenborder/kafka/connect/json/wikimedia.recentchange.schema.json diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJson.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJson.java index db1f32a..7816e54 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJson.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJson.java @@ -125,34 +125,36 @@ protected SchemaAndValue processString(R record, Schema inputSchema, String inpu } FromJsonState fromJsonState; + FromJsonSchemaConverterFactory fromJsonSchemaConverterFactory; ObjectMapper objectMapper; @Override public void configure(Map map) { this.config = new FromJsonConfig(map); + this.fromJsonSchemaConverterFactory = new FromJsonSchemaConverterFactory(config); org.everit.json.schema.Schema schema; - if (FromJsonConfig.SchemaLocation.Url == this.config.schemaLocation) { + if (JsonConfig.SchemaLocation.Url == this.config.schemaLocation) { try { try (InputStream inputStream = this.config.schemaUrl.openStream()) { schema = Utils.loadSchema(inputStream); } } catch (IOException e) { - ConfigException exception = new ConfigException(FromJsonConfig.SCHEMA_URL_CONF, this.config.schemaUrl, "exception while loading schema"); + ConfigException exception = new ConfigException(JsonConfig.SCHEMA_URL_CONF, this.config.schemaUrl, "exception while loading schema"); exception.initCause(e); throw exception; } - } else if (FromJsonConfig.SchemaLocation.Inline == this.config.schemaLocation) { + } else if (JsonConfig.SchemaLocation.Inline == this.config.schemaLocation) { schema = Utils.loadSchema(this.config.schemaText); } else { throw new ConfigException( - FromJsonConfig.SCHEMA_LOCATION_CONF, + JsonConfig.SCHEMA_LOCATION_CONF, this.config.schemaLocation.toString(), "Location is not supported" ); } - this.fromJsonState = FromJsonSchemaConverter.fromJSON(schema); + this.fromJsonState = this.fromJsonSchemaConverterFactory.fromJSON(schema); this.objectMapper = JacksonFactory.create(); } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonConfig.java index 8133616..710182c 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonConfig.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonConfig.java @@ -15,88 +15,18 @@ */ package com.github.jcustenborder.kafka.connect.json; -import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder; -import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils; -import com.github.jcustenborder.kafka.connect.utils.config.Description; -import com.github.jcustenborder.kafka.connect.utils.config.recommenders.Recommenders; -import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; -import java.net.URL; import java.util.Map; -class FromJsonConfig extends AbstractConfig { - public static final String SCHEMA_URL_CONF = "json.schema.url"; - static final String SCHEMA_URL_DOC = "Url to retrieve the schema from. Urls can be anything that is " + - "supported by URL.openStream(). For example the local filesystem file:///schemas/something.json. " + - "A web address https://www.schemas.com/something.json"; - public static final String SCHEMA_INLINE_CONF = "json.schema.inline"; - static final String SCHEMA_INLINE_DOC = "The JSON schema to use as an escaped string."; - public static final String SCHEMA_LOCATION_CONF = "json.schema.location"; - static final String SCHEMA_LOCATION_DOC = "Location to retrieve the schema from. " + - ConfigUtils.enumDescription(SchemaLocation.class); - - public static final String VALIDATE_JSON_ENABLED_CONF = "json.schema.validation.enabled"; - static final String VALIDATE_JSON_ENABLED_DOC = "Flag to determine if the JSON should be validated " + - "against the schema."; - - public enum SchemaLocation { - @Description("Loads the schema from the url specified in `" + SCHEMA_URL_CONF + "`.") - Url, - @Description("Loads the schema from `" + SCHEMA_INLINE_CONF + "` as an inline string.") - Inline - } - - public final URL schemaUrl; - public final SchemaLocation schemaLocation; - public final String schemaText; - public final boolean validateJson; +class FromJsonConfig extends JsonConfig { public FromJsonConfig(Map originals) { super(config(), originals); - this.schemaUrl = ConfigUtils.url(this, SCHEMA_URL_CONF); - this.schemaLocation = ConfigUtils.getEnum(SchemaLocation.class, this, SCHEMA_LOCATION_CONF); - this.schemaText = getString(SCHEMA_INLINE_CONF); - this.validateJson = getBoolean(VALIDATE_JSON_ENABLED_CONF); - } - - public static void addConfigItems(ConfigDef configDef) { - configDef.define( - ConfigKeyBuilder.of(SCHEMA_URL_CONF, ConfigDef.Type.STRING) - .documentation(SCHEMA_URL_DOC) - .validator(Validators.validUrl()) - .importance(ConfigDef.Importance.HIGH) - .recommender(Recommenders.visibleIf(SCHEMA_LOCATION_CONF, SchemaLocation.Url.toString())) - .defaultValue("File:///doesNotExist") - .build() - ).define( - ConfigKeyBuilder.of(SCHEMA_LOCATION_CONF, ConfigDef.Type.STRING) - .documentation(SCHEMA_LOCATION_DOC) - .validator(Validators.validEnum(SchemaLocation.class)) - .recommender(Recommenders.enumValues(SchemaLocation.class)) - .importance(ConfigDef.Importance.HIGH) - .defaultValue(SchemaLocation.Url.toString()) - .build() - ).define( - ConfigKeyBuilder.of(VALIDATE_JSON_ENABLED_CONF, ConfigDef.Type.BOOLEAN) - .documentation(VALIDATE_JSON_ENABLED_DOC) - .importance(ConfigDef.Importance.MEDIUM) - .defaultValue(false) - .build() - ).define( - ConfigKeyBuilder.of(SCHEMA_INLINE_CONF, ConfigDef.Type.STRING) - .documentation(SCHEMA_INLINE_DOC) - .recommender(Recommenders.visibleIf(SCHEMA_LOCATION_CONF, SchemaLocation.Inline.toString())) - .importance(ConfigDef.Importance.HIGH) - .defaultValue("") - .build()); } public static ConfigDef config() { - ConfigDef configDef = new ConfigDef(); - addConfigItems(configDef); - return configDef; + return JsonConfig.config(); } } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverter.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverter.java index 274c4da..bfe734e 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverter.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverter.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; @@ -35,76 +34,22 @@ import org.everit.json.schema.BooleanSchema; import org.everit.json.schema.NumberSchema; import org.everit.json.schema.ObjectSchema; -import org.everit.json.schema.ReferenceSchema; import org.everit.json.schema.StringSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; public abstract class FromJsonSchemaConverter { - static final Map< - FromJsonConversionKey, - FromJsonSchemaConverter - > LOOKUP; private static final Logger log = LoggerFactory.getLogger(FromJsonSchemaConverter.class); + protected final FromJsonSchemaConverterFactory factory; + protected final JsonConfig config; - static { - LOOKUP = Stream.of( - new ObjectSchemaConverter(), - new IntegerSchemaConverter(), - new StringSchemaConverter(), - new BooleanSchemaConverter(), - new TimeSchemaConverter(), - new DateSchemaConverter(), - new DateTimeSchemaConverter(), - new FloatSchemaConverter(), - new ArraySchemaConverter(), - new BytesSchemaConverter(), - new DecimalSchemaConverter(), - new CustomTimestampConverter() - ).collect(Collectors.toMap(FromJsonSchemaConverter::key, c -> c)); - } - - public static FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema) { - return fromJSON(jsonSchema, false); - } - - public static FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema, boolean isOptional) { - if (jsonSchema instanceof ReferenceSchema) { - ReferenceSchema referenceSchema = (ReferenceSchema) jsonSchema; - jsonSchema = referenceSchema.getReferredSchema(); - } - FromJsonConversionKey key = FromJsonConversionKey.of(jsonSchema); - - FromJsonSchemaConverter converter = LOOKUP.get(key); - - if (null == converter) { - throw new UnsupportedOperationException( - String.format("Schema type is not supported. %s:%s", jsonSchema.getClass().getName(), jsonSchema) - ); - } - - SchemaBuilder builder = converter.schemaBuilder(jsonSchema); - if (!Strings.isNullOrEmpty(jsonSchema.getTitle())) { - builder.name(jsonSchema.getTitle()); - } - if (!Strings.isNullOrEmpty(jsonSchema.getDescription())) { - builder.doc(jsonSchema.getDescription()); - } - if (isOptional) { - builder.optional(); - } - Map visitors = new LinkedHashMap<>(); - converter.fromJSON(builder, jsonSchema, visitors); - Schema schema = builder.build(); - FromJsonVisitor visitor = converter.jsonVisitor(schema, visitors); - return FromJsonState.of(jsonSchema, schema, visitor); + protected FromJsonSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) { + this.factory = factory; + this.config = config; } protected abstract SchemaBuilder schemaBuilder(T schema); @@ -116,6 +61,10 @@ public static FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema, b protected abstract void fromJSON(SchemaBuilder builder, T jsonSchema, Map visitors); static class BooleanSchemaConverter extends FromJsonSchemaConverter { + BooleanSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) { + super(factory, config); + } + @Override protected SchemaBuilder schemaBuilder(BooleanSchema schema) { return SchemaBuilder.bool(); @@ -139,6 +88,10 @@ protected void fromJSON(SchemaBuilder builder, BooleanSchema jsonSchema, Map { + ObjectSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) { + super(factory, config); + } + @Override protected SchemaBuilder schemaBuilder(ObjectSchema schema) { return SchemaBuilder.struct(); @@ -154,20 +107,25 @@ protected FromJsonVisitor jsonVisitor(Schema connectSchema, return new FromJsonVisitor.StructVisitor(connectSchema, visitors); } - @Override protected void fromJSON(SchemaBuilder builder, ObjectSchema jsonSchema, Map visitors) { Set requiredProperties = ImmutableSet.copyOf(jsonSchema.getRequiredProperties()); jsonSchema.getPropertySchemas() .entrySet() .stream() + .filter(e -> { + String schemaLocation = e.getValue().getSchemaLocation(); + boolean result = !this.config.excludeLocations.contains(schemaLocation); + log.trace("fromJson() - filtering '{}' location='{}' result = '{}'", e.getKey(), e.getValue().getSchemaLocation(), result); + return result; + }) .sorted(Map.Entry.comparingByKey()) .forEach(e -> { final String propertyName = e.getKey(); final org.everit.json.schema.Schema propertyJsonSchema = e.getValue(); final boolean isOptional = !requiredProperties.contains(propertyName); log.trace("fromJson() - Processing property '{}' '{}'", propertyName, propertyJsonSchema); - FromJsonState state = FromJsonSchemaConverter.fromJSON(propertyJsonSchema, isOptional); + FromJsonState state = this.factory.fromJSON(propertyJsonSchema, isOptional); builder.field(propertyName, state.schema); visitors.put(propertyName, state.visitor); }); @@ -176,6 +134,10 @@ protected void fromJSON(SchemaBuilder builder, ObjectSchema jsonSchema, Map { + IntegerSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) { + super(factory, config); + } + @Override protected SchemaBuilder schemaBuilder(NumberSchema schema) { return SchemaBuilder.int64(); @@ -201,6 +163,10 @@ protected void fromJSON(SchemaBuilder builder, NumberSchema jsonSchema, Map { + FloatSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) { + super(factory, config); + } + @Override protected FromJsonVisitor jsonVisitor(Schema connectSchema, Map visitors) { return new FromJsonVisitor.FloatVisitor(connectSchema); @@ -226,6 +192,10 @@ protected void fromJSON(SchemaBuilder builder, NumberSchema jsonSchema, Map { + StringSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) { + super(factory, config); + } + @Override protected SchemaBuilder schemaBuilder(StringSchema schema) { return SchemaBuilder.string(); @@ -249,6 +219,10 @@ protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map { + DateSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) { + super(factory, config); + } + @Override protected FromJsonVisitor jsonVisitor(Schema connectSchema, Map visitors) { return new FromJsonVisitor.DateVisitor(connectSchema); @@ -274,6 +248,10 @@ protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map { + TimeSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) { + super(factory, config); + } + @Override protected SchemaBuilder schemaBuilder(StringSchema schema) { return Time.builder(); @@ -299,6 +277,10 @@ protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map { + DateTimeSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) { + super(factory, config); + } + @Override protected FromJsonVisitor jsonVisitor(Schema connectSchema, Map visitors) { return new FromJsonVisitor.DateTimeVisitor(connectSchema); @@ -324,6 +306,10 @@ protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map { + BytesSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) { + super(factory, config); + } + @Override protected SchemaBuilder schemaBuilder(StringSchema schema) { return SchemaBuilder.bytes(); @@ -348,8 +334,8 @@ protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map { - public DecimalSchemaConverter() { - + public DecimalSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) { + super(factory, config); } @Override @@ -379,9 +365,13 @@ protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map { + ArraySchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) { + super(factory, config); + } + @Override protected SchemaBuilder schemaBuilder(ArraySchema schema) { - FromJsonState state = FromJsonSchemaConverter.fromJSON(schema.getAllItemSchema()); + FromJsonState state = this.factory.fromJSON(schema.getAllItemSchema()); return SchemaBuilder.array(state.schema); } @@ -398,13 +388,17 @@ protected FromJsonVisitor jsonVisitor(Schema connectSchema, Map @Override protected void fromJSON(SchemaBuilder builder, ArraySchema jsonSchema, Map visitors) { - FromJsonState state = FromJsonSchemaConverter.fromJSON(jsonSchema.getAllItemSchema()); + FromJsonState state = this.factory.fromJSON(jsonSchema.getAllItemSchema()); visitors.put("item", state.visitor); } } static class CustomTimestampConverter extends FromJsonSchemaConverter { + CustomTimestampConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) { + super(factory, config); + } + @Override protected FromJsonVisitor jsonVisitor(Schema connectSchema, Map visitors) { return new FromJsonVisitor.CustomDateVisitor(connectSchema); diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterFactory.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterFactory.java new file mode 100644 index 0000000..bfb2d2f --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterFactory.java @@ -0,0 +1,128 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.json; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Joiner; +import com.google.common.base.Strings; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.everit.json.schema.CombinedSchema; +import org.everit.json.schema.NullSchema; +import org.everit.json.schema.ReferenceSchema; +import org.everit.json.schema.Schema; +import org.everit.json.schema.StringSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class FromJsonSchemaConverterFactory { + private static final Logger log = LoggerFactory.getLogger(FromJsonSchemaConverterFactory.class); + private final Map< + FromJsonConversionKey, + FromJsonSchemaConverter + > lookup; + private final JsonConfig config; + private final FromJsonConversionKey genericStringKey; + + public FromJsonSchemaConverterFactory(JsonConfig config) { + this.config = config; + FromJsonSchemaConverter.StringSchemaConverter stringSchemaConverter = new FromJsonSchemaConverter.StringSchemaConverter(this, config); + genericStringKey = stringSchemaConverter.key(); + lookup = Stream.of( + stringSchemaConverter, + new FromJsonSchemaConverter.ObjectSchemaConverter(this, config), + new FromJsonSchemaConverter.IntegerSchemaConverter(this, config), + new FromJsonSchemaConverter.BooleanSchemaConverter(this, config), + new FromJsonSchemaConverter.TimeSchemaConverter(this, config), + new FromJsonSchemaConverter.DateSchemaConverter(this, config), + new FromJsonSchemaConverter.DateTimeSchemaConverter(this, config), + new FromJsonSchemaConverter.FloatSchemaConverter(this, config), + new FromJsonSchemaConverter.ArraySchemaConverter(this, config), + new FromJsonSchemaConverter.BytesSchemaConverter(this, config), + new FromJsonSchemaConverter.DecimalSchemaConverter(this, config), + new FromJsonSchemaConverter.CustomTimestampConverter(this, config) + ).collect(Collectors.toMap(FromJsonSchemaConverter::key, c -> c)); + } + + public FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema) { + return fromJSON(jsonSchema, false); + } + + public FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema, boolean isOptional) { + if (jsonSchema instanceof ReferenceSchema) { + ReferenceSchema referenceSchema = (ReferenceSchema) jsonSchema; + jsonSchema = referenceSchema.getReferredSchema(); + } else if (jsonSchema instanceof CombinedSchema) { + CombinedSchema combinedSchema = (CombinedSchema) jsonSchema; + List nonNullSubSchemas = combinedSchema + .getSubschemas() + .stream() + .filter(s -> !(s instanceof NullSchema)) + .collect(Collectors.toList()); + if (1 != nonNullSubSchemas.size()) { + throw new UnsupportedOperationException( + String.format( + "More than one choice for non null schemas. Schema location %s: %s", + jsonSchema.getSchemaLocation(), + Joiner.on(", ").join(nonNullSubSchemas) + ) + ); + } + jsonSchema = nonNullSubSchemas.get(0); + } + FromJsonConversionKey key = FromJsonConversionKey.of(jsonSchema); + + FromJsonSchemaConverter converter = lookup.get(key); + + if (null == converter && jsonSchema instanceof StringSchema) { + log.trace("fromJSON() - falling back to string passthrough for {}", jsonSchema); + converter = lookup.get(genericStringKey); + } + + if (null == converter) { + throw new UnsupportedOperationException( + String.format("Schema type is not supported. %s:%s", jsonSchema.getClass().getName(), jsonSchema) + ); + } + + SchemaBuilder builder = converter.schemaBuilder(jsonSchema); + if (!Strings.isNullOrEmpty(jsonSchema.getTitle())) { + builder.name(cleanName(jsonSchema.getTitle())); + } + if (!Strings.isNullOrEmpty(jsonSchema.getDescription())) { + builder.doc(jsonSchema.getDescription()); + } + if (isOptional) { + builder.optional(); + } + Map visitors = new LinkedHashMap<>(); + converter.fromJSON(builder, jsonSchema, visitors); + org.apache.kafka.connect.data.Schema schema = builder.build(); + FromJsonVisitor visitor = converter.jsonVisitor(schema, visitors); + return FromJsonState.of(jsonSchema, schema, visitor); + } + + private String cleanName(String title) { + String result = title.replaceAll("[\\/]+", "."); + return result; + } + +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonConfig.java new file mode 100644 index 0000000..4ed43e3 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonConfig.java @@ -0,0 +1,110 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.json; + +import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder; +import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils; +import com.github.jcustenborder.kafka.connect.utils.config.Description; +import com.github.jcustenborder.kafka.connect.utils.config.recommenders.Recommenders; +import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; + +import java.net.URL; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +class JsonConfig extends AbstractConfig { + public static final String SCHEMA_URL_CONF = "json.schema.url"; + public static final String SCHEMA_INLINE_CONF = "json.schema.inline"; + public static final String SCHEMA_LOCATION_CONF = "json.schema.location"; + public static final String VALIDATE_JSON_ENABLED_CONF = "json.schema.validation.enabled"; + public static final String EXCLUDE_LOCATIONS_CONF = "json.exclude.locations"; + static final String SCHEMA_URL_DOC = "Url to retrieve the schema from. Urls can be anything that is " + + "supported by URL.openStream(). For example the local filesystem file:///schemas/something.json. " + + "A web address https://www.schemas.com/something.json"; + static final String SCHEMA_INLINE_DOC = "The JSON schema to use as an escaped string."; + static final String SCHEMA_LOCATION_DOC = "Location to retrieve the schema from. " + + ConfigUtils.enumDescription(SchemaLocation.class); + static final String VALIDATE_JSON_ENABLED_DOC = "Flag to determine if the JSON should be validated " + + "against the schema."; + static final String EXCLUDE_LOCATIONS_DOC = "Location(s) in the schema to exclude. This is primarily " + + "because connect cannot support those locations. For example types that would require a union type."; + + public JsonConfig(ConfigDef definition, Map originals) { + super(definition, originals); + this.schemaUrl = ConfigUtils.url(this, SCHEMA_URL_CONF); + this.schemaLocation = ConfigUtils.getEnum(SchemaLocation.class, this, SCHEMA_LOCATION_CONF); + this.schemaText = getString(SCHEMA_INLINE_CONF); + this.validateJson = getBoolean(VALIDATE_JSON_ENABLED_CONF); + this.excludeLocations = ConfigUtils.getSet(this, EXCLUDE_LOCATIONS_CONF); + + } + + + public enum SchemaLocation { + @Description("Loads the schema from the url specified in `" + SCHEMA_URL_CONF + "`.") + Url, + @Description("Loads the schema from `" + SCHEMA_INLINE_CONF + "` as an inline string.") + Inline + } + + public final URL schemaUrl; + public final SchemaLocation schemaLocation; + public final String schemaText; + public final boolean validateJson; + public final Set excludeLocations; + + public static ConfigDef config() { + return new ConfigDef().define( + ConfigKeyBuilder.of(SCHEMA_URL_CONF, ConfigDef.Type.STRING) + .documentation(SCHEMA_URL_DOC) + .validator(Validators.validUrl()) + .importance(ConfigDef.Importance.HIGH) + .recommender(Recommenders.visibleIf(SCHEMA_LOCATION_CONF, SchemaLocation.Url.toString())) + .defaultValue("File:///doesNotExist") + .build() + ).define( + ConfigKeyBuilder.of(SCHEMA_LOCATION_CONF, ConfigDef.Type.STRING) + .documentation(SCHEMA_LOCATION_DOC) + .validator(Validators.validEnum(SchemaLocation.class)) + .recommender(Recommenders.enumValues(SchemaLocation.class)) + .importance(ConfigDef.Importance.HIGH) + .defaultValue(SchemaLocation.Url.toString()) + .build() + ).define( + ConfigKeyBuilder.of(VALIDATE_JSON_ENABLED_CONF, ConfigDef.Type.BOOLEAN) + .documentation(VALIDATE_JSON_ENABLED_DOC) + .importance(ConfigDef.Importance.MEDIUM) + .defaultValue(false) + .build() + ).define( + ConfigKeyBuilder.of(SCHEMA_INLINE_CONF, ConfigDef.Type.STRING) + .documentation(SCHEMA_INLINE_DOC) + .recommender(Recommenders.visibleIf(SCHEMA_LOCATION_CONF, SchemaLocation.Inline.toString())) + .importance(ConfigDef.Importance.HIGH) + .defaultValue("") + .build() + ).define( + ConfigKeyBuilder.of(EXCLUDE_LOCATIONS_CONF, ConfigDef.Type.LIST) + .documentation(EXCLUDE_LOCATIONS_DOC) + .defaultValue(Collections.EMPTY_LIST) + .importance(ConfigDef.Importance.LOW) + .build() + ); + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverter.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverter.java index 7afadda..827d9ef 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverter.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverter.java @@ -45,6 +45,7 @@ public class JsonSchemaConverter implements Converter { String jsonSchemaHeader; Charset encodingCharset; ObjectMapper objectMapper; + FromJsonSchemaConverterFactory fromJsonSchemaConverterFactory; Map fromConnectStateLookup = new HashMap<>(); Map toConnectStateLookup = new HashMap<>(); Header fallbackHeader; @@ -55,24 +56,25 @@ public void configure(Map settings, boolean isKey) { this.jsonSchemaHeader = isKey ? KEY_HEADER : VALUE_HEADER; this.encodingCharset = Charsets.UTF_8; this.objectMapper = JacksonFactory.create(); + this.fromJsonSchemaConverterFactory = new FromJsonSchemaConverterFactory(config); if (this.config.insertSchema) { byte[] headerValue; - if (FromJsonConfig.SchemaLocation.Url == this.config.schemaLocation) { + if (JsonConfig.SchemaLocation.Url == this.config.schemaLocation) { try { try (InputStream inputStream = this.config.schemaUrl.openStream()) { headerValue = ByteStreams.toByteArray(inputStream); } } catch (IOException e) { - ConfigException exception = new ConfigException(FromJsonConfig.SCHEMA_URL_CONF, this.config.schemaUrl, "exception while loading schema"); + ConfigException exception = new ConfigException(JsonConfig.SCHEMA_URL_CONF, this.config.schemaUrl, "exception while loading schema"); exception.initCause(e); throw exception; } - } else if (FromJsonConfig.SchemaLocation.Inline == this.config.schemaLocation) { + } else if (JsonConfig.SchemaLocation.Inline == this.config.schemaLocation) { headerValue = this.jsonSchemaHeader.getBytes(Charsets.UTF_8); } else { throw new ConfigException( - FromJsonConfig.SCHEMA_LOCATION_CONF, + JsonConfig.SCHEMA_LOCATION_CONF, this.config.schemaLocation.toString(), "Location is not supported" ); @@ -147,7 +149,7 @@ public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) .toString(); FromJsonState state = this.toConnectStateLookup.computeIfAbsent(hash, h -> { org.everit.json.schema.Schema schema = Utils.loadSchema(schemaHeader); - return FromJsonSchemaConverter.fromJSON(schema); + return this.fromJsonSchemaConverterFactory.fromJSON(schema); }); JsonNode jsonNode; diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverterConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverterConfig.java index 8bdb967..536b14a 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverterConfig.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverterConfig.java @@ -16,17 +16,11 @@ package com.github.jcustenborder.kafka.connect.json; import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder; -import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; -import java.net.URL; import java.util.Map; -class JsonSchemaConverterConfig extends AbstractConfig { - public final URL schemaUrl; - public final FromJsonConfig.SchemaLocation schemaLocation; - public final String schemaText; +class JsonSchemaConverterConfig extends JsonConfig { public final boolean insertSchema; public final static String INSERT_SCHEMA_ENABLED_CONF = "json.insert.schema.enabled"; @@ -36,22 +30,17 @@ class JsonSchemaConverterConfig extends AbstractConfig { public JsonSchemaConverterConfig(Map originals) { super(config(), originals); - this.schemaUrl = ConfigUtils.url(this, FromJsonConfig.SCHEMA_URL_CONF); - this.schemaLocation = ConfigUtils.getEnum(FromJsonConfig.SchemaLocation.class, this, FromJsonConfig.SCHEMA_LOCATION_CONF); - this.schemaText = getString(FromJsonConfig.SCHEMA_INLINE_CONF); this.insertSchema = getBoolean(INSERT_SCHEMA_ENABLED_CONF); } public static ConfigDef config() { - ConfigDef configDef = new ConfigDef(); - configDef.define( - ConfigKeyBuilder.of(INSERT_SCHEMA_ENABLED_CONF, ConfigDef.Type.BOOLEAN) - .documentation(INSERT_SCHEMA_ENABLED_DOC) - .importance(ConfigDef.Importance.HIGH) - .defaultValue(false) - .build() - ); - FromJsonConfig.addConfigItems(configDef); - return configDef; + return JsonConfig.config() + .define( + ConfigKeyBuilder.of(INSERT_SCHEMA_ENABLED_CONF, ConfigDef.Type.BOOLEAN) + .documentation(INSERT_SCHEMA_ENABLED_DOC) + .importance(ConfigDef.Importance.HIGH) + .defaultValue(false) + .build() + ); } } diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterTest.java index ff75b1c..8769f01 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterTest.java @@ -15,6 +15,7 @@ */ package com.github.jcustenborder.kafka.connect.json; +import com.google.common.collect.ImmutableMap; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -26,14 +27,39 @@ import org.everit.json.schema.loader.SchemaLoader; import org.json.JSONObject; import org.json.JSONTokener; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DynamicTest; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Stream; import static com.github.jcustenborder.kafka.connect.utils.AssertSchema.assertSchema; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.DynamicTest.dynamicTest; public class FromJsonSchemaConverterTest { + private static final Logger log = LoggerFactory.getLogger(FromJsonSchemaConverterTest.class); + + JsonConfig config; + FromJsonSchemaConverterFactory factory; + + @BeforeEach + public void before() { + config = new FromJsonConfig(ImmutableMap.of( + FromJsonConfig.SCHEMA_INLINE_CONF, "\"string\"", + FromJsonConfig.SCHEMA_LOCATION_CONF, JsonConfig.SchemaLocation.Inline.toString(), + JsonConfig.EXCLUDE_LOCATIONS_CONF, "#/properties/log_params" + )); + this.factory = new FromJsonSchemaConverterFactory(config); + } + org.everit.json.schema.Schema jsonSchema(String type) { JSONObject rawSchema = new JSONObject(); @@ -57,7 +83,7 @@ org.everit.json.schema.Schema jsonSchema(String type, String key1, String value1 } void assertJsonSchema(org.apache.kafka.connect.data.Schema expected, org.everit.json.schema.Schema input) { - FromJsonState state = FromJsonSchemaConverter.fromJSON(input); + FromJsonState state = this.factory.fromJSON(input); assertSchema(expected, state.schema); } @@ -131,6 +157,19 @@ public void productSchema() throws IOException { assertJsonSchema(expected, jsonSchema); } + @Test + public void wikiMediaRecentChangeSchema() throws IOException { + org.everit.json.schema.Schema jsonSchema = loadSchema("SchemaConverterTest/wikimedia.recentchange.schema.json"); + Schema expected = SchemaBuilder.struct() + .name("mediawiki.recentchange") + .doc("Represents a MW RecentChange event. https://www.mediawiki.org/wiki/Manual:RCFeed\n") + .field("price", SchemaBuilder.float64().doc("The price of the product").build()) + .field("productId", SchemaBuilder.int64().doc("The unique identifier for a product").build()) + .field("productName", SchemaBuilder.string().doc("Name of the product").build()) + .build(); + assertNotNull(expected); + } + @Test public void nested() throws IOException { org.everit.json.schema.Schema jsonSchema = loadSchema("SchemaConverterTest/nested.schema.json"); @@ -158,5 +197,31 @@ public void array() { assertJsonSchema(SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build(), jsonSchema); } + @TestFactory + public Stream stringFormats() { + Map formats = new LinkedHashMap<>(); + formats.put("email", "test@example.com"); + formats.put("idn-email", "test@example.com"); + formats.put("hostname", "example.com"); + formats.put("idn-hostname", "example.com"); + formats.put("ipv4", "127.0.0.1"); + formats.put("ipv6", "::1"); + formats.put("uri", "http://example.com"); + formats.put("uri-reference", "http://example.com"); + formats.put("iri", "http://example.com"); + formats.put("iri-reference", "http://example.com"); + formats.put("uri-template", "http://example.com/~{username}/"); + + return formats.entrySet() + .stream() + .map(e -> dynamicTest(e.getKey(), () -> { + JSONObject rawSchema = new JSONObject() + .put("type", "string") + .put("format", e.getKey()); + log.trace("schema = '{}'", rawSchema); + org.everit.json.schema.Schema jsonSchema = TestUtils.jsonSchema(rawSchema); + assertJsonSchema(Schema.STRING_SCHEMA, jsonSchema); + })); + } } diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonTest.java index ee020b7..80f6f52 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonTest.java @@ -41,7 +41,7 @@ public void basic() throws IOException { )); File schemaFile = new File("src/test/resources/com/github/jcustenborder/kafka/connect/json/basic.schema.json"); Map settings = ImmutableMap.of( - FromJsonConfig.SCHEMA_URL_CONF, schemaFile.toURI().toString() + JsonConfig.SCHEMA_URL_CONF, schemaFile.toURI().toString() ); this.transform.configure(settings); SinkRecord inputRecord = SinkRecordHelper.write("foo", new SchemaAndValue(Schema.STRING_SCHEMA, "foo"), new SchemaAndValue(Schema.BYTES_SCHEMA, input)); @@ -60,7 +60,7 @@ public void customdate() throws IOException { )); File schemaFile = new File("src/test/resources/com/github/jcustenborder/kafka/connect/json/customdate.schema.json"); Map settings = ImmutableMap.of( - FromJsonConfig.SCHEMA_URL_CONF, schemaFile.toURI().toString() + JsonConfig.SCHEMA_URL_CONF, schemaFile.toURI().toString() ); this.transform.configure(settings); SinkRecord inputRecord = SinkRecordHelper.write("foo", new SchemaAndValue(Schema.STRING_SCHEMA, "foo"), new SchemaAndValue(Schema.BYTES_SCHEMA, input)); @@ -79,8 +79,8 @@ public void validate() throws IOException { )); File schemaFile = new File("src/test/resources/com/github/jcustenborder/kafka/connect/json/geo.schema.json"); Map settings = ImmutableMap.of( - FromJsonConfig.SCHEMA_URL_CONF, schemaFile.toURI().toString(), - FromJsonConfig.VALIDATE_JSON_ENABLED_CONF, "true" + JsonConfig.SCHEMA_URL_CONF, schemaFile.toURI().toString(), + JsonConfig.VALIDATE_JSON_ENABLED_CONF, "true" ); this.transform.configure(settings); SinkRecord inputRecord = SinkRecordHelper.write("foo", new SchemaAndValue(Schema.STRING_SCHEMA, "foo"), new SchemaAndValue(Schema.BYTES_SCHEMA, input)); @@ -91,6 +91,21 @@ public void validate() throws IOException { assertTrue(exception.getMessage().contains("required key [latitude] not found")); assertTrue(exception.getMessage().contains("required key [longitude] not found")); } + @Test + public void wikiMediaRecentChange() throws IOException { + byte[] input = ByteStreams.toByteArray(this.getClass().getResourceAsStream( + "wikimedia.recentchange.data.json" + )); + File schemaFile = new File("src/test/resources/com/github/jcustenborder/kafka/connect/json/wikimedia.recentchange.schema.json"); + Map settings = ImmutableMap.of( + JsonConfig.SCHEMA_URL_CONF, schemaFile.toURI().toString(), + JsonConfig.VALIDATE_JSON_ENABLED_CONF, "true", + JsonConfig.EXCLUDE_LOCATIONS_CONF, "#/properties/log_params" + ); + this.transform.configure(settings); + SinkRecord inputRecord = SinkRecordHelper.write("foo", new SchemaAndValue(Schema.STRING_SCHEMA, "foo"), new SchemaAndValue(Schema.BYTES_SCHEMA, input)); + assertNotNull(inputRecord); + } @Test public void foo() { diff --git a/src/test/resources/com/github/jcustenborder/kafka/connect/json/SchemaConverterTest/wikimedia.recentchange.schema.json b/src/test/resources/com/github/jcustenborder/kafka/connect/json/SchemaConverterTest/wikimedia.recentchange.schema.json new file mode 100644 index 0000000..28de517 --- /dev/null +++ b/src/test/resources/com/github/jcustenborder/kafka/connect/json/SchemaConverterTest/wikimedia.recentchange.schema.json @@ -0,0 +1,196 @@ +{ + "title": "mediawiki/recentchange", + "description": "Represents a MW RecentChange event. https://www.mediawiki.org/wiki/Manual:RCFeed\n", + "$id": "/mediawiki/recentchange/1.0.0", + "$schema": "https://json-schema.org/draft-07/schema#", + "type": "object", + "additionalProperties": true, + "required": [ + "$schema", + "meta" + ], + "properties": { + "$schema": { + "type": "string", + "description": "A URI identifying the JSONSchema for this event. This should match an schema's $id in a schema repository. E.g. /schema_name/1.0.0\n" + }, + "meta": { + "type": "object", + "required": [ + "id", + "dt", + "stream" + ], + "properties": { + "uri": { + "type": "string", + "format": "uri-reference", + "maxLength": 8192, + "description": "Unique URI identifying the event or entity" + }, + "request_id": { + "type": "string", + "description": "Unique ID of the request that caused the event" + }, + "id": { + "type": "string", + "pattern": "^[a-fA-F0-9]{8}(-[a-fA-F0-9]{4}){3}-[a-fA-F0-9]{12}$", + "maxLength": 36, + "description": "Unique ID of this event" + }, + "dt": { + "type": "string", + "format": "date-time", + "maxLength": 128, + "description": "Event datetime, in ISO-8601 format" + }, + "domain": { + "type": "string", + "description": "Domain the event or entity pertains to", + "minLength": 1 + }, + "stream": { + "type": "string", + "description": "Name of the stream/queue/dataset that this event belongs in", + "minLength": 1 + } + } + }, + "id": { + "description": "ID of the recentchange event (rcid).", + "type": [ + "integer", + "null" + ] + }, + "type": { + "description": "Type of recentchange event (rc_type). One of \"edit\", \"new\", \"log\", \"categorize\", or \"external\". (See Manual:Recentchanges table#rc_type)\n", + "type": "string" + }, + "title": { + "description": "Full page name, from Title::getPrefixedText.", + "type": "string" + }, + "namespace": { + "description": "ID of relevant namespace of affected page (rc_namespace, page_namespace). This is -1 (\"Special\") for log events.\n", + "type": "integer" + }, + "comment": { + "description": "(rc_comment)", + "type": "string" + }, + "parsedcomment": { + "description": "The rc_comment parsed into simple HTML. Optional", + "type": "string" + }, + "timestamp": { + "description": "Unix timestamp (derived from rc_timestamp).", + "type": "integer" + }, + "user": { + "description": "(rc_user_text)", + "type": "string" + }, + "bot": { + "description": "(rc_bot)", + "type": "boolean" + }, + "server_url": { + "description": "$wgCanonicalServer", + "type": "string" + }, + "server_name": { + "description": "$wgServerName", + "type": "string" + }, + "server_script_path": { + "description": "$wgScriptPath", + "type": "string" + }, + "wiki": { + "description": "wfWikiID ($wgDBprefix, $wgDBname)", + "type": "string" + }, + "minor": { + "description": "(rc_minor).", + "type": "boolean" + }, + "patrolled": { + "description": "(rc_patrolled). This property only exists if patrolling is supported for this event (based on $wgUseRCPatrol, $wgUseNPPatrol).\n", + "type": "boolean" + }, + "length": { + "description": "Length of old and new change", + "type": "object", + "properties": { + "old": { + "description": "(rc_old_len)", + "type": [ + "integer", + "null" + ] + }, + "new": { + "description": "(rc_new_len)", + "type": [ + "integer", + "null" + ] + } + } + }, + "revision": { + "description": "Old and new revision IDs", + "type": "object", + "properties": { + "new": { + "description": "(rc_last_oldid)", + "type": [ + "integer", + "null" + ] + }, + "old": { + "description": "(rc_this_oldid)", + "type": [ + "integer", + "null" + ] + } + } + }, + "log_id": { + "description": "(rc_log_id)", + "type": [ + "integer", + "null" + ] + }, + "log_type": { + "description": "(rc_log_type)", + "type": [ + "string", + "null" + ] + }, + "log_action": { + "description": "(rc_log_action)", + "type": "string" + }, + "log_params": { + "description": "Property only exists if event has rc_params.", + "type": [ + "array", + "object", + "string" + ], + "additionalProperties": true + }, + "log_action_comment": { + "type": [ + "string", + "null" + ] + } + } +} \ No newline at end of file diff --git a/src/test/resources/com/github/jcustenborder/kafka/connect/json/wikimedia.recentchange.data.json b/src/test/resources/com/github/jcustenborder/kafka/connect/json/wikimedia.recentchange.data.json new file mode 100644 index 0000000..873e2e2 --- /dev/null +++ b/src/test/resources/com/github/jcustenborder/kafka/connect/json/wikimedia.recentchange.data.json @@ -0,0 +1,36 @@ +{ + "$schema": "/mediawiki/recentchange/1.0.0", + "meta": { + "uri": "https://sl.wikipedia.org/wiki/Iztrebek", + "request_id": "61a2da40-3fea-450f-8404-8ed5e98e784a", + "id": "5fe0b22e-1ed1-4acd-9e06-07a39ef800e7", + "dt": "2020-05-14T16:24:27Z", + "domain": "sl.wikipedia.org", + "stream": "mediawiki.recentchange", + "topic": "eqiad.mediawiki.recentchange", + "partition": 0, + "offset": 2403979241 + }, + "id": 21345744, + "type": "edit", + "namespace": 0, + "title": "Iztrebek", + "comment": "vrnitev sprememb uporabnika [[Special:Contributions/176.76.241.31|176.76.241.31]] ([[User talk:176.76.241.31|pogovor]]) na zadnje urejanje uporabnika [[User:Samuele2002|Samuele2002]]", + "timestamp": 1589473467, + "user": "Upwinxp", + "bot": false, + "minor": true, + "length": { + "old": 561, + "new": 1061 + }, + "revision": { + "old": 5320868, + "new": 5320869 + }, + "server_url": "https://sl.wikipedia.org", + "server_name": "sl.wikipedia.org", + "server_script_path": "/w", + "wiki": "slwiki", + "parsedcomment": "vrnitev sprememb uporabnika 176.76.241.31 (pogovor) na zadnje urejanje uporabnika Samuele2002" +} \ No newline at end of file diff --git a/src/test/resources/com/github/jcustenborder/kafka/connect/json/wikimedia.recentchange.schema.json b/src/test/resources/com/github/jcustenborder/kafka/connect/json/wikimedia.recentchange.schema.json new file mode 100644 index 0000000..28de517 --- /dev/null +++ b/src/test/resources/com/github/jcustenborder/kafka/connect/json/wikimedia.recentchange.schema.json @@ -0,0 +1,196 @@ +{ + "title": "mediawiki/recentchange", + "description": "Represents a MW RecentChange event. https://www.mediawiki.org/wiki/Manual:RCFeed\n", + "$id": "/mediawiki/recentchange/1.0.0", + "$schema": "https://json-schema.org/draft-07/schema#", + "type": "object", + "additionalProperties": true, + "required": [ + "$schema", + "meta" + ], + "properties": { + "$schema": { + "type": "string", + "description": "A URI identifying the JSONSchema for this event. This should match an schema's $id in a schema repository. E.g. /schema_name/1.0.0\n" + }, + "meta": { + "type": "object", + "required": [ + "id", + "dt", + "stream" + ], + "properties": { + "uri": { + "type": "string", + "format": "uri-reference", + "maxLength": 8192, + "description": "Unique URI identifying the event or entity" + }, + "request_id": { + "type": "string", + "description": "Unique ID of the request that caused the event" + }, + "id": { + "type": "string", + "pattern": "^[a-fA-F0-9]{8}(-[a-fA-F0-9]{4}){3}-[a-fA-F0-9]{12}$", + "maxLength": 36, + "description": "Unique ID of this event" + }, + "dt": { + "type": "string", + "format": "date-time", + "maxLength": 128, + "description": "Event datetime, in ISO-8601 format" + }, + "domain": { + "type": "string", + "description": "Domain the event or entity pertains to", + "minLength": 1 + }, + "stream": { + "type": "string", + "description": "Name of the stream/queue/dataset that this event belongs in", + "minLength": 1 + } + } + }, + "id": { + "description": "ID of the recentchange event (rcid).", + "type": [ + "integer", + "null" + ] + }, + "type": { + "description": "Type of recentchange event (rc_type). One of \"edit\", \"new\", \"log\", \"categorize\", or \"external\". (See Manual:Recentchanges table#rc_type)\n", + "type": "string" + }, + "title": { + "description": "Full page name, from Title::getPrefixedText.", + "type": "string" + }, + "namespace": { + "description": "ID of relevant namespace of affected page (rc_namespace, page_namespace). This is -1 (\"Special\") for log events.\n", + "type": "integer" + }, + "comment": { + "description": "(rc_comment)", + "type": "string" + }, + "parsedcomment": { + "description": "The rc_comment parsed into simple HTML. Optional", + "type": "string" + }, + "timestamp": { + "description": "Unix timestamp (derived from rc_timestamp).", + "type": "integer" + }, + "user": { + "description": "(rc_user_text)", + "type": "string" + }, + "bot": { + "description": "(rc_bot)", + "type": "boolean" + }, + "server_url": { + "description": "$wgCanonicalServer", + "type": "string" + }, + "server_name": { + "description": "$wgServerName", + "type": "string" + }, + "server_script_path": { + "description": "$wgScriptPath", + "type": "string" + }, + "wiki": { + "description": "wfWikiID ($wgDBprefix, $wgDBname)", + "type": "string" + }, + "minor": { + "description": "(rc_minor).", + "type": "boolean" + }, + "patrolled": { + "description": "(rc_patrolled). This property only exists if patrolling is supported for this event (based on $wgUseRCPatrol, $wgUseNPPatrol).\n", + "type": "boolean" + }, + "length": { + "description": "Length of old and new change", + "type": "object", + "properties": { + "old": { + "description": "(rc_old_len)", + "type": [ + "integer", + "null" + ] + }, + "new": { + "description": "(rc_new_len)", + "type": [ + "integer", + "null" + ] + } + } + }, + "revision": { + "description": "Old and new revision IDs", + "type": "object", + "properties": { + "new": { + "description": "(rc_last_oldid)", + "type": [ + "integer", + "null" + ] + }, + "old": { + "description": "(rc_this_oldid)", + "type": [ + "integer", + "null" + ] + } + } + }, + "log_id": { + "description": "(rc_log_id)", + "type": [ + "integer", + "null" + ] + }, + "log_type": { + "description": "(rc_log_type)", + "type": [ + "string", + "null" + ] + }, + "log_action": { + "description": "(rc_log_action)", + "type": "string" + }, + "log_params": { + "description": "Property only exists if event has rc_params.", + "type": [ + "array", + "object", + "string" + ], + "additionalProperties": true + }, + "log_action_comment": { + "type": [ + "string", + "null" + ] + } + } +} \ No newline at end of file