Skip to content

Commit

Permalink
v 0.2 (#5)
Browse files Browse the repository at this point in the history
* Added documentation suggesting the use of the ByteArrayConverter or StringConverter. Added configuration setting to enable validation of the json messages. Violations will result in an exception being thrown. Fixes #2. Fixes #3

* Added support for custom date formats. This allows the user to specify the date format for parsing. Fixes #4.
  • Loading branch information
jcustenborder authored Apr 2, 2020
1 parent 16cf9b3 commit c8626c8
Show file tree
Hide file tree
Showing 11 changed files with 276 additions and 37 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<version>2.4.0</version>
</parent>
<artifactId>kafka-connect-json-schema</artifactId>
<version>0.0.2-SNAPSHOT</version>
<version>0.2-SNAPSHOT</version>
<name>kafka-connect-json-schema</name>
<description>A Kafka Connect transformation for parsing JSON data against a user-supplied JSON schema.</description>
<url>https://github.com/jcustenborder/kafka-connect-json-schema</url>
Expand Down Expand Up @@ -106,7 +106,7 @@
</componentTypes>
<tags>
<tag>Transform</tag>
<tag>FIX</tag>
<tag>JSON</tag>
</tags>
<title>Kafka Connect JSON Schema Transformations</title>
<supportUrl>${pom.issueManagement.url}</supportUrl>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.json;

import org.everit.json.schema.FormatValidator;

import java.util.Optional;

/**
 * {@link FormatValidator} registered for the {@code custom-timestamp} format keyword.
 * Validation always succeeds: the value is not checked here, because the actual
 * parsing happens later against the user-supplied date format pattern.
 */
public class CustomTimestampFormatValidator implements FormatValidator {
  @Override
  public String formatName() {
    return "custom-timestamp";
  }

  @Override
  public Optional<String> validate(String value) {
    // Never report a violation message; every string is accepted at this stage.
    return Optional.empty();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.jcustenborder.kafka.connect.utils.config.Description;
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationTip;
import com.github.jcustenborder.kafka.connect.utils.config.Title;
import com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation;
import org.apache.kafka.common.config.ConfigDef;
Expand All @@ -26,15 +27,25 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.everit.json.schema.ValidationException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
import java.util.Map;

@Title("From Json transformation")
@Description("The FromJson will read JSON data that is in string on byte form and parse the data to " +
"a connect structure based on the JSON schema provided.")
@DocumentationTip("This transformation expects data to be in either String or Byte format. You are " +
"most likely going to use the ByteArrayConverter or the StringConverter.")
public class FromJson<R extends ConnectRecord<R>> extends BaseKeyValueTransformation<R> {
private static final Logger log = LoggerFactory.getLogger(FromJson.class);
FromJsonConfig config;

protected FromJson(boolean isKey) {
Expand All @@ -56,9 +67,40 @@ SchemaAndValue processJsonNode(R record, Schema inputSchema, JsonNode node) {
return new SchemaAndValue(this.fromJsonState.schema, result);
}


/**
 * Validates the supplied JSON document against the configured JSON schema.
 * Each individual violation is logged, then all of them are collected into a
 * single {@link DataException} so the Connect framework surfaces full context.
 *
 * @param jsonObject parsed JSON document to validate.
 * @throws DataException if the document does not conform to the schema.
 */
void validateJson(JSONObject jsonObject) {
  try {
    this.fromJsonState.jsonSchema.validate(jsonObject);
  } catch (ValidationException ex) {
    StringBuilder builder = new StringBuilder();
    builder.append(
        String.format(
            "Could not validate JSON. Found %s violation(s).",
            ex.getViolationCount()
        )
    );
    // everit reports a single violation on the root exception itself and
    // leaves getCausingExceptions() empty; fall back to the root message so
    // the detail is not silently dropped in that case.
    if (ex.getCausingExceptions().isEmpty()) {
      log.error("Validation exception", ex);
      builder.append("\n");
      builder.append(ex.getMessage());
    } else {
      for (ValidationException message : ex.getCausingExceptions()) {
        log.error("Validation exception", message);
        builder.append("\n");
        builder.append(message.getMessage());
      }
    }
    throw new DataException(
        builder.toString(),
        ex
    );
  }
}


@Override
protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] input) {
try {
if (this.config.validateJson) {
try (InputStream inputStream = new ByteArrayInputStream(input)) {
JSONObject jsonObject = Utils.loadObject(inputStream);
validateJson(jsonObject);
}
}
JsonNode node = this.objectMapper.readValue(input, JsonNode.class);
return processJsonNode(record, inputSchema, node);
} catch (IOException e) {
Expand All @@ -69,6 +111,12 @@ protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] input
@Override
protected SchemaAndValue processString(R record, Schema inputSchema, String input) {
try {
if (this.config.validateJson) {
try (Reader reader = new StringReader(input)) {
JSONObject jsonObject = Utils.loadObject(reader);
validateJson(jsonObject);
}
}
JsonNode node = this.objectMapper.readValue(input, JsonNode.class);
return processJsonNode(record, inputSchema, node);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder;
import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils;
import com.github.jcustenborder.kafka.connect.utils.config.Description;
import com.github.jcustenborder.kafka.connect.utils.config.recommenders.Recommenders;
import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators;
import org.apache.kafka.common.config.AbstractConfig;
Expand All @@ -27,29 +28,38 @@

class FromJsonConfig extends AbstractConfig {
public static final String SCHEMA_URL_CONF = "json.schema.url";
static final String SCHEMA_URL_DOC = "Url to retrieve the schema from.";
static final String SCHEMA_URL_DOC = "Url to retrieve the schema from. Urls can be anything that is " +
"supported by URL.openStream(). For example the local filesystem file:///schemas/something.json. " +
"A web address https://www.schemas.com/something.json";
public static final String SCHEMA_INLINE_CONF = "json.schema.inline";
static final String SCHEMA_INLINE_DOC = "The JSON schema to use as an escaped string.";
public static final String SCHEMA_LOCATION_CONF = "json.schema.location";
static final String SCHEMA_LOCATION_DOC = "Location to retrieve the schema from. `Url` is used " +
"to retrieve the schema from a url. `Inline` is used to read the schema from the `" +
SCHEMA_INLINE_CONF + "` configuration value.";
static final String SCHEMA_LOCATION_DOC = "Location to retrieve the schema from. " +
ConfigUtils.enumDescription(SchemaLocation.class);

public static final String VALIDATE_JSON_ENABLED_CONF = "json.schema.validation.enabled";
static final String VALIDATE_JSON_ENABLED_DOC = "Flag to determine if the JSON should be validated " +
"against the schema.";

/**
 * Where the JSON schema is read from. The selected value controls which of
 * the other configuration settings is consulted at startup.
 */
public enum SchemaLocation {
@Description("Loads the schema from the url specified in `" + SCHEMA_URL_CONF + "`.")
Url,
@Description("Loads the schema from `" + SCHEMA_INLINE_CONF + "` as an inline string.")
Inline
}

public final URL schemaUrl;
public final SchemaLocation schemaLocation;
public final String schemaText;
public final boolean validateJson;


/**
 * Builds the configuration from the raw settings supplied by the Connect
 * framework. All values are resolved eagerly so misconfiguration fails at
 * startup rather than per record.
 *
 * @param originals raw configuration key/value pairs from the worker.
 */
public FromJsonConfig(Map<?, ?> originals) {
super(config(), originals);
// URL to fetch the schema from when schemaLocation == Url.
this.schemaUrl = ConfigUtils.url(this, SCHEMA_URL_CONF);
// Chooses between Url and Inline schema sources.
this.schemaLocation = ConfigUtils.getEnum(SchemaLocation.class, this, SCHEMA_LOCATION_CONF);
// Escaped schema text used when schemaLocation == Inline (may be empty).
this.schemaText = getString(SCHEMA_INLINE_CONF);
// When true, every record is validated against the schema before conversion.
this.validateJson = getBoolean(VALIDATE_JSON_ENABLED_CONF);
}

public static void addConfigItems(ConfigDef configDef) {
Expand All @@ -58,26 +68,30 @@ public static void addConfigItems(ConfigDef configDef) {
.documentation(SCHEMA_URL_DOC)
.validator(Validators.validUrl())
.importance(ConfigDef.Importance.HIGH)
.recommender(Recommenders.visibleIf(SCHEMA_LOCATION_CONF, SchemaLocation.Url.toString()))
.defaultValue("File:///doesNotExist")
.build()
)
.define(
ConfigKeyBuilder.of(SCHEMA_LOCATION_CONF, ConfigDef.Type.STRING)
.documentation(SCHEMA_LOCATION_DOC)
.validator(Validators.validEnum(SchemaLocation.class))
.recommender(Recommenders.enumValues(SchemaLocation.class))
.importance(ConfigDef.Importance.HIGH)
.defaultValue(SchemaLocation.Url.toString())
.build()
)
.define(
ConfigKeyBuilder.of(SCHEMA_INLINE_CONF, ConfigDef.Type.STRING)
.documentation(SCHEMA_INLINE_DOC)
.recommender(Recommenders.visibleIf(SCHEMA_LOCATION_CONF, SchemaLocation.Inline.toString()))
.importance(ConfigDef.Importance.HIGH)
.defaultValue("")
.build()
);
).define(
ConfigKeyBuilder.of(SCHEMA_LOCATION_CONF, ConfigDef.Type.STRING)
.documentation(SCHEMA_LOCATION_DOC)
.validator(Validators.validEnum(SchemaLocation.class))
.recommender(Recommenders.enumValues(SchemaLocation.class))
.importance(ConfigDef.Importance.HIGH)
.defaultValue(SchemaLocation.Url.toString())
.build()
).define(
ConfigKeyBuilder.of(VALIDATE_JSON_ENABLED_CONF, ConfigDef.Type.BOOLEAN)
.documentation(VALIDATE_JSON_ENABLED_DOC)
.importance(ConfigDef.Importance.MEDIUM)
.defaultValue(false)
.build()
).define(
ConfigKeyBuilder.of(SCHEMA_INLINE_CONF, ConfigDef.Type.STRING)
.documentation(SCHEMA_INLINE_DOC)
.recommender(Recommenders.visibleIf(SCHEMA_LOCATION_CONF, SchemaLocation.Inline.toString()))
.importance(ConfigDef.Importance.HIGH)
.defaultValue("")
.build());
}

public static ConfigDef config() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.node.NumericNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import org.apache.kafka.connect.data.Date;
Expand Down Expand Up @@ -65,7 +66,8 @@ public abstract class FromJsonSchemaConverter<T extends org.everit.json.schema.S
new FloatSchemaConverter(),
new ArraySchemaConverter(),
new BytesSchemaConverter(),
new DecimalSchemaConverter()
new DecimalSchemaConverter(),
new CustomTimestampConverter()
).collect(Collectors.toMap(FromJsonSchemaConverter::key, c -> c));
}

Expand Down Expand Up @@ -102,7 +104,7 @@ public static FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema, b
converter.fromJSON(builder, jsonSchema, visitors);
Schema schema = builder.build();
FromJsonVisitor visitor = converter.jsonVisitor(schema, visitors);
return FromJsonState.of(schema, visitor);
return FromJsonState.of(jsonSchema, schema, visitor);
}

protected abstract SchemaBuilder schemaBuilder(T schema);
Expand Down Expand Up @@ -400,4 +402,32 @@ protected void fromJSON(SchemaBuilder builder, ArraySchema jsonSchema, Map<Strin
visitors.put("item", state.visitor);
}
}

/**
 * Converts JSON string fields declared with the {@code custom-timestamp}
 * format into Connect {@link Timestamp} values. The pattern is taken from
 * the schema's unprocessed {@code dateTimeFormat} property and carried on
 * the Connect schema as the {@code dateFormat} parameter.
 */
static class CustomTimestampConverter extends FromJsonSchemaConverter<StringSchema, TextNode, java.util.Date> {

  @Override
  protected FromJsonConversionKey key() {
    // Matched against string schemas carrying format "custom-timestamp".
    return FromJsonConversionKey.from(StringSchema.class)
        .format("custom-timestamp")
        .build();
  }

  @Override
  protected SchemaBuilder schemaBuilder(StringSchema schema) {
    Map<String, Object> unprocessed = schema.getUnprocessedProperties();
    Object pattern = unprocessed.get("dateTimeFormat");
    Preconditions.checkNotNull(pattern, "dateTimeFormat cannot be null");
    // Expose the pattern to the visitor via a schema parameter.
    SchemaBuilder builder = Timestamp.builder();
    return builder.parameter("dateFormat", pattern.toString());
  }

  @Override
  protected FromJsonVisitor<TextNode, java.util.Date> jsonVisitor(Schema connectSchema, Map<String, FromJsonVisitor> visitors) {
    return new FromJsonVisitor.CustomDateVisitor(connectSchema);
  }

  @Override
  protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map<String, FromJsonVisitor> visitors) {
    // Nothing extra to copy onto the Connect schema for this type.
    log.trace("fromJson() - Processing '{}'", jsonSchema);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@
*/
package com.github.jcustenborder.kafka.connect.json;

import org.apache.kafka.connect.data.Schema;

class FromJsonState {
public final Schema schema;
public final org.everit.json.schema.Schema jsonSchema;
public final org.apache.kafka.connect.data.Schema schema;
public final FromJsonVisitor visitor;

FromJsonState(Schema schema, FromJsonVisitor visitor) {
FromJsonState(org.everit.json.schema.Schema jsonSchema, org.apache.kafka.connect.data.Schema schema, FromJsonVisitor visitor) {
this.jsonSchema = jsonSchema;
this.schema = schema;
this.visitor = visitor;
}

public static FromJsonState of(Schema schema, FromJsonVisitor visitor) {
return new FromJsonState(schema, visitor);
public static FromJsonState of(org.everit.json.schema.Schema jsonSchema, org.apache.kafka.connect.data.Schema schema, FromJsonVisitor visitor) {
return new FromJsonState(jsonSchema, schema, visitor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -95,9 +96,16 @@ public StructVisitor(Schema schema, Map<String, FromJsonVisitor> visitors) {
protected Struct doVisit(ObjectNode node) {
Struct result = new Struct(this.schema);
visitors.forEach((fieldName, visitor) -> {
JsonNode rawValue = node.get(fieldName);
Object convertedValue = visitor.visit(rawValue);
result.put(fieldName, convertedValue);
try {
JsonNode rawValue = node.get(fieldName);
Object convertedValue = visitor.visit(rawValue);
result.put(fieldName, convertedValue);
} catch (Exception ex) {
throw new IllegalStateException(
String.format("Exception thrown while reading %s:%s", this.schema.name(), fieldName),
ex
);
}
});

return result;
Expand Down Expand Up @@ -225,4 +233,24 @@ protected byte[] doVisit(TextNode node) {
return BaseEncoding.base64().decode(node.textValue());
}
}

/**
 * Converts JSON text nodes into {@link Date} values using the pattern stored
 * in the Connect schema's {@code dateFormat} parameter. Values without an
 * explicit offset are interpreted as UTC.
 */
public static class CustomDateVisitor extends FromJsonVisitor<TextNode, java.util.Date> {
  // Bug fix: was LoggerFactory.getLogger(DateTimeVisitor.class), which
  // mislabelled this class's log output as coming from DateTimeVisitor.
  private static final Logger log = LoggerFactory.getLogger(CustomDateVisitor.class);

  // DateTimeFormatter is immutable and thread-safe; built once per schema.
  final DateTimeFormatter dateTimeFormatter;

  /**
   * @param schema Connect schema carrying the {@code dateFormat} parameter
   *               set by {@code CustomTimestampConverter}.
   * @throws IllegalStateException if the parameter is missing, instead of a
   *         bare NullPointerException from {@code ofPattern(null)}.
   */
  public CustomDateVisitor(Schema schema) {
    super(schema);
    String pattern = schema.parameters().get("dateFormat");
    if (pattern == null) {
      throw new IllegalStateException(
          "Schema parameter 'dateFormat' is required for custom-timestamp fields."
      );
    }
    this.dateTimeFormatter = DateTimeFormatter.ofPattern(pattern);
  }

  @Override
  protected Date doVisit(TextNode node) {
    log.trace(node.asText());
    // Patterns handled here carry no zone information, so parse as a
    // LocalDateTime and pin it to UTC before converting to java.util.Date.
    LocalDateTime localDateTime = LocalDateTime.parse(node.asText(), this.dateTimeFormatter);
    Instant instant = localDateTime.toInstant(ZoneOffset.UTC);
    return Date.from(instant);
  }
}
}
Loading

0 comments on commit c8626c8

Please sign in to comment.