Skip to content

Commit

Permalink
Added support to exclude properties. Added fall through case for Stri…
Browse files Browse the repository at this point in the history
…ngs to just be passed as a string. Fixes #8. (#9)
  • Loading branch information
jcustenborder authored May 18, 2020
1 parent 239c64f commit 7960bbc
Show file tree
Hide file tree
Showing 12 changed files with 836 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,34 +125,36 @@ protected SchemaAndValue processString(R record, Schema inputSchema, String inpu
}

FromJsonState fromJsonState;
FromJsonSchemaConverterFactory fromJsonSchemaConverterFactory;
ObjectMapper objectMapper;

@Override
public void configure(Map<String, ?> map) {
this.config = new FromJsonConfig(map);
this.fromJsonSchemaConverterFactory = new FromJsonSchemaConverterFactory(config);

org.everit.json.schema.Schema schema;
if (FromJsonConfig.SchemaLocation.Url == this.config.schemaLocation) {
if (JsonConfig.SchemaLocation.Url == this.config.schemaLocation) {
try {
try (InputStream inputStream = this.config.schemaUrl.openStream()) {
schema = Utils.loadSchema(inputStream);
}
} catch (IOException e) {
ConfigException exception = new ConfigException(FromJsonConfig.SCHEMA_URL_CONF, this.config.schemaUrl, "exception while loading schema");
ConfigException exception = new ConfigException(JsonConfig.SCHEMA_URL_CONF, this.config.schemaUrl, "exception while loading schema");
exception.initCause(e);
throw exception;
}
} else if (FromJsonConfig.SchemaLocation.Inline == this.config.schemaLocation) {
} else if (JsonConfig.SchemaLocation.Inline == this.config.schemaLocation) {
schema = Utils.loadSchema(this.config.schemaText);
} else {
throw new ConfigException(
FromJsonConfig.SCHEMA_LOCATION_CONF,
JsonConfig.SCHEMA_LOCATION_CONF,
this.config.schemaLocation.toString(),
"Location is not supported"
);
}

this.fromJsonState = FromJsonSchemaConverter.fromJSON(schema);
this.fromJsonState = this.fromJsonSchemaConverterFactory.fromJSON(schema);
this.objectMapper = JacksonFactory.create();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,88 +15,18 @@
*/
package com.github.jcustenborder.kafka.connect.json;

import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder;
import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils;
import com.github.jcustenborder.kafka.connect.utils.config.Description;
import com.github.jcustenborder.kafka.connect.utils.config.recommenders.Recommenders;
import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.net.URL;
import java.util.Map;

class FromJsonConfig extends AbstractConfig {
public static final String SCHEMA_URL_CONF = "json.schema.url";
static final String SCHEMA_URL_DOC = "Url to retrieve the schema from. Urls can be anything that is " +
"supported by URL.openStream(). For example the local filesystem file:///schemas/something.json. " +
"A web address https://www.schemas.com/something.json";
public static final String SCHEMA_INLINE_CONF = "json.schema.inline";
static final String SCHEMA_INLINE_DOC = "The JSON schema to use as an escaped string.";
public static final String SCHEMA_LOCATION_CONF = "json.schema.location";
static final String SCHEMA_LOCATION_DOC = "Location to retrieve the schema from. " +
ConfigUtils.enumDescription(SchemaLocation.class);

public static final String VALIDATE_JSON_ENABLED_CONF = "json.schema.validation.enabled";
static final String VALIDATE_JSON_ENABLED_DOC = "Flag to determine if the JSON should be validated " +
"against the schema.";

public enum SchemaLocation {
@Description("Loads the schema from the url specified in `" + SCHEMA_URL_CONF + "`.")
Url,
@Description("Loads the schema from `" + SCHEMA_INLINE_CONF + "` as an inline string.")
Inline
}

public final URL schemaUrl;
public final SchemaLocation schemaLocation;
public final String schemaText;
public final boolean validateJson;
class FromJsonConfig extends JsonConfig {


public FromJsonConfig(Map<?, ?> originals) {
super(config(), originals);
this.schemaUrl = ConfigUtils.url(this, SCHEMA_URL_CONF);
this.schemaLocation = ConfigUtils.getEnum(SchemaLocation.class, this, SCHEMA_LOCATION_CONF);
this.schemaText = getString(SCHEMA_INLINE_CONF);
this.validateJson = getBoolean(VALIDATE_JSON_ENABLED_CONF);
}

public static void addConfigItems(ConfigDef configDef) {
configDef.define(
ConfigKeyBuilder.of(SCHEMA_URL_CONF, ConfigDef.Type.STRING)
.documentation(SCHEMA_URL_DOC)
.validator(Validators.validUrl())
.importance(ConfigDef.Importance.HIGH)
.recommender(Recommenders.visibleIf(SCHEMA_LOCATION_CONF, SchemaLocation.Url.toString()))
.defaultValue("File:///doesNotExist")
.build()
).define(
ConfigKeyBuilder.of(SCHEMA_LOCATION_CONF, ConfigDef.Type.STRING)
.documentation(SCHEMA_LOCATION_DOC)
.validator(Validators.validEnum(SchemaLocation.class))
.recommender(Recommenders.enumValues(SchemaLocation.class))
.importance(ConfigDef.Importance.HIGH)
.defaultValue(SchemaLocation.Url.toString())
.build()
).define(
ConfigKeyBuilder.of(VALIDATE_JSON_ENABLED_CONF, ConfigDef.Type.BOOLEAN)
.documentation(VALIDATE_JSON_ENABLED_DOC)
.importance(ConfigDef.Importance.MEDIUM)
.defaultValue(false)
.build()
).define(
ConfigKeyBuilder.of(SCHEMA_INLINE_CONF, ConfigDef.Type.STRING)
.documentation(SCHEMA_INLINE_DOC)
.recommender(Recommenders.visibleIf(SCHEMA_LOCATION_CONF, SchemaLocation.Inline.toString()))
.importance(ConfigDef.Importance.HIGH)
.defaultValue("")
.build());
}

public static ConfigDef config() {
ConfigDef configDef = new ConfigDef();
addConfigItems(configDef);
return configDef;
return JsonConfig.config();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
Expand All @@ -35,76 +34,22 @@
import org.everit.json.schema.BooleanSchema;
import org.everit.json.schema.NumberSchema;
import org.everit.json.schema.ObjectSchema;
import org.everit.json.schema.ReferenceSchema;
import org.everit.json.schema.StringSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public abstract class FromJsonSchemaConverter<T extends org.everit.json.schema.Schema, J extends JsonNode, V> {
static final Map<
FromJsonConversionKey,
FromJsonSchemaConverter<? extends org.everit.json.schema.Schema, ? extends JsonNode, ?>
> LOOKUP;
private static final Logger log = LoggerFactory.getLogger(FromJsonSchemaConverter.class);
protected final FromJsonSchemaConverterFactory factory;
protected final JsonConfig config;

static {
LOOKUP = Stream.of(
new ObjectSchemaConverter(),
new IntegerSchemaConverter(),
new StringSchemaConverter(),
new BooleanSchemaConverter(),
new TimeSchemaConverter(),
new DateSchemaConverter(),
new DateTimeSchemaConverter(),
new FloatSchemaConverter(),
new ArraySchemaConverter(),
new BytesSchemaConverter(),
new DecimalSchemaConverter(),
new CustomTimestampConverter()
).collect(Collectors.toMap(FromJsonSchemaConverter::key, c -> c));
}

public static FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema) {
return fromJSON(jsonSchema, false);
}

public static FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema, boolean isOptional) {
if (jsonSchema instanceof ReferenceSchema) {
ReferenceSchema referenceSchema = (ReferenceSchema) jsonSchema;
jsonSchema = referenceSchema.getReferredSchema();
}
FromJsonConversionKey key = FromJsonConversionKey.of(jsonSchema);

FromJsonSchemaConverter converter = LOOKUP.get(key);

if (null == converter) {
throw new UnsupportedOperationException(
String.format("Schema type is not supported. %s:%s", jsonSchema.getClass().getName(), jsonSchema)
);
}

SchemaBuilder builder = converter.schemaBuilder(jsonSchema);
if (!Strings.isNullOrEmpty(jsonSchema.getTitle())) {
builder.name(jsonSchema.getTitle());
}
if (!Strings.isNullOrEmpty(jsonSchema.getDescription())) {
builder.doc(jsonSchema.getDescription());
}
if (isOptional) {
builder.optional();
}
Map<String, FromJsonVisitor> visitors = new LinkedHashMap<>();
converter.fromJSON(builder, jsonSchema, visitors);
Schema schema = builder.build();
FromJsonVisitor visitor = converter.jsonVisitor(schema, visitors);
return FromJsonState.of(jsonSchema, schema, visitor);
protected FromJsonSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) {
this.factory = factory;
this.config = config;
}

protected abstract SchemaBuilder schemaBuilder(T schema);
Expand All @@ -116,6 +61,10 @@ public static FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema, b
protected abstract void fromJSON(SchemaBuilder builder, T jsonSchema, Map<String, FromJsonVisitor> visitors);

static class BooleanSchemaConverter extends FromJsonSchemaConverter<BooleanSchema, BooleanNode, Boolean> {
BooleanSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) {
super(factory, config);
}

@Override
protected SchemaBuilder schemaBuilder(BooleanSchema schema) {
return SchemaBuilder.bool();
Expand All @@ -139,6 +88,10 @@ protected void fromJSON(SchemaBuilder builder, BooleanSchema jsonSchema, Map<Str

static class ObjectSchemaConverter extends FromJsonSchemaConverter<ObjectSchema, ObjectNode, Struct> {

ObjectSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) {
super(factory, config);
}

@Override
protected SchemaBuilder schemaBuilder(ObjectSchema schema) {
return SchemaBuilder.struct();
Expand All @@ -154,20 +107,25 @@ protected FromJsonVisitor<ObjectNode, Struct> jsonVisitor(Schema connectSchema,
return new FromJsonVisitor.StructVisitor(connectSchema, visitors);
}


@Override
protected void fromJSON(SchemaBuilder builder, ObjectSchema jsonSchema, Map<String, FromJsonVisitor> visitors) {
Set<String> requiredProperties = ImmutableSet.copyOf(jsonSchema.getRequiredProperties());
jsonSchema.getPropertySchemas()
.entrySet()
.stream()
.filter(e -> {
String schemaLocation = e.getValue().getSchemaLocation();
boolean result = !this.config.excludeLocations.contains(schemaLocation);
log.trace("fromJson() - filtering '{}' location='{}' result = '{}'", e.getKey(), e.getValue().getSchemaLocation(), result);
return result;
})
.sorted(Map.Entry.comparingByKey())
.forEach(e -> {
final String propertyName = e.getKey();
final org.everit.json.schema.Schema propertyJsonSchema = e.getValue();
final boolean isOptional = !requiredProperties.contains(propertyName);
log.trace("fromJson() - Processing property '{}' '{}'", propertyName, propertyJsonSchema);
FromJsonState state = FromJsonSchemaConverter.fromJSON(propertyJsonSchema, isOptional);
FromJsonState state = this.factory.fromJSON(propertyJsonSchema, isOptional);
builder.field(propertyName, state.schema);
visitors.put(propertyName, state.visitor);
});
Expand All @@ -176,6 +134,10 @@ protected void fromJSON(SchemaBuilder builder, ObjectSchema jsonSchema, Map<Stri

static class IntegerSchemaConverter extends FromJsonSchemaConverter<NumberSchema, NumericNode, Number> {

IntegerSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) {
super(factory, config);
}

@Override
protected SchemaBuilder schemaBuilder(NumberSchema schema) {
return SchemaBuilder.int64();
Expand All @@ -201,6 +163,10 @@ protected void fromJSON(SchemaBuilder builder, NumberSchema jsonSchema, Map<Stri

static class FloatSchemaConverter extends FromJsonSchemaConverter<NumberSchema, NumericNode, Number> {

FloatSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) {
super(factory, config);
}

@Override
protected FromJsonVisitor<NumericNode, Number> jsonVisitor(Schema connectSchema, Map<String, FromJsonVisitor> visitors) {
return new FromJsonVisitor.FloatVisitor(connectSchema);
Expand All @@ -226,6 +192,10 @@ protected void fromJSON(SchemaBuilder builder, NumberSchema jsonSchema, Map<Stri

static class StringSchemaConverter extends FromJsonSchemaConverter<StringSchema, TextNode, String> {

StringSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) {
super(factory, config);
}

@Override
protected SchemaBuilder schemaBuilder(StringSchema schema) {
return SchemaBuilder.string();
Expand All @@ -249,6 +219,10 @@ protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map<Stri

static class DateSchemaConverter extends FromJsonSchemaConverter<StringSchema, TextNode, java.util.Date> {

DateSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) {
super(factory, config);
}

@Override
protected FromJsonVisitor<TextNode, java.util.Date> jsonVisitor(Schema connectSchema, Map<String, FromJsonVisitor> visitors) {
return new FromJsonVisitor.DateVisitor(connectSchema);
Expand All @@ -274,6 +248,10 @@ protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map<Stri

static class TimeSchemaConverter extends FromJsonSchemaConverter<StringSchema, TextNode, java.util.Date> {

TimeSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) {
super(factory, config);
}

@Override
protected SchemaBuilder schemaBuilder(StringSchema schema) {
return Time.builder();
Expand All @@ -299,6 +277,10 @@ protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map<Stri

static class DateTimeSchemaConverter extends FromJsonSchemaConverter<StringSchema, TextNode, java.util.Date> {

DateTimeSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) {
super(factory, config);
}

@Override
protected FromJsonVisitor<TextNode, java.util.Date> jsonVisitor(Schema connectSchema, Map<String, FromJsonVisitor> visitors) {
return new FromJsonVisitor.DateTimeVisitor(connectSchema);
Expand All @@ -324,6 +306,10 @@ protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map<Stri

static class BytesSchemaConverter extends FromJsonSchemaConverter<StringSchema, TextNode, byte[]> {

BytesSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) {
super(factory, config);
}

@Override
protected SchemaBuilder schemaBuilder(StringSchema schema) {
return SchemaBuilder.bytes();
Expand All @@ -348,8 +334,8 @@ protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map<Stri
}

static class DecimalSchemaConverter extends FromJsonSchemaConverter<StringSchema, TextNode, Number> {
public DecimalSchemaConverter() {

public DecimalSchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) {
super(factory, config);
}

@Override
Expand Down Expand Up @@ -379,9 +365,13 @@ protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map<Stri

static class ArraySchemaConverter extends FromJsonSchemaConverter<ArraySchema, ArrayNode, List> {

ArraySchemaConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) {
super(factory, config);
}

@Override
protected SchemaBuilder schemaBuilder(ArraySchema schema) {
FromJsonState state = FromJsonSchemaConverter.fromJSON(schema.getAllItemSchema());
FromJsonState state = this.factory.fromJSON(schema.getAllItemSchema());
return SchemaBuilder.array(state.schema);
}

Expand All @@ -398,13 +388,17 @@ protected FromJsonVisitor<ArrayNode, List> jsonVisitor(Schema connectSchema, Map

@Override
protected void fromJSON(SchemaBuilder builder, ArraySchema jsonSchema, Map<String, FromJsonVisitor> visitors) {
FromJsonState state = FromJsonSchemaConverter.fromJSON(jsonSchema.getAllItemSchema());
FromJsonState state = this.factory.fromJSON(jsonSchema.getAllItemSchema());
visitors.put("item", state.visitor);
}
}

static class CustomTimestampConverter extends FromJsonSchemaConverter<StringSchema, TextNode, java.util.Date> {

CustomTimestampConverter(FromJsonSchemaConverterFactory factory, JsonConfig config) {
super(factory, config);
}

@Override
protected FromJsonVisitor<TextNode, java.util.Date> jsonVisitor(Schema connectSchema, Map<String, FromJsonVisitor> visitors) {
return new FromJsonVisitor.CustomDateVisitor(connectSchema);
Expand Down
Loading

0 comments on commit 7960bbc

Please sign in to comment.