Commit
Added documentation suggesting the use of the ByteArrayConverter or StringConverter. Added configuration setting to enable validation of the JSON messages. Violations will result in an exception being thrown. Fixes #2. Fixes #3
jcustenborder committed Apr 1, 2020
1 parent 16cf9b3 commit 042fb70
Showing 8 changed files with 142 additions and 36 deletions.
4 changes: 2 additions & 2 deletions pom.xml
@@ -27,7 +27,7 @@
<version>2.4.0</version>
</parent>
<artifactId>kafka-connect-json-schema</artifactId>
<version>0.0.2-SNAPSHOT</version>
<version>0.2-SNAPSHOT</version>
<name>kafka-connect-json-schema</name>
<description>A Kafka Connect connector receiving data from example.</description>
<url>https://github.com/jcustenborder/kafka-connect-json-schema</url>
@@ -106,7 +106,7 @@
</componentTypes>
<tags>
<tag>Transform</tag>
<tag>FIX</tag>
<tag>JSON</tag>
</tags>
<title>Kafka Connect JSON Schema Transformations</title>
<supportUrl>${pom.issueManagement.url}</supportUrl>
src/main/java/com/github/jcustenborder/kafka/connect/json/FromJson.java
@@ -18,6 +18,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.jcustenborder.kafka.connect.utils.config.Description;
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationTip;
import com.github.jcustenborder.kafka.connect.utils.config.Title;
import com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation;
import org.apache.kafka.common.config.ConfigDef;
@@ -26,15 +27,25 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.everit.json.schema.ValidationException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
import java.util.Map;

@Title("From Json transformation")
@Description("The FromJson will read JSON data that is in string on byte form and parse the data to " +
"a connect structure based on the JSON schema provided.")
@DocumentationTip("This transformation expects data to be in either String or Byte format. You are " +
"most likely going to use the ByteArrayConverter or the StringConverter.")
public class FromJson<R extends ConnectRecord<R>> extends BaseKeyValueTransformation<R> {
private static final Logger log = LoggerFactory.getLogger(FromJson.class);
FromJsonConfig config;

protected FromJson(boolean isKey) {
@@ -56,9 +67,40 @@ SchemaAndValue processJsonNode(R record, Schema inputSchema, JsonNode node) {
return new SchemaAndValue(this.fromJsonState.schema, result);
}


void validateJson(JSONObject jsonObject) {
try {
this.fromJsonState.jsonSchema.validate(jsonObject);
} catch (ValidationException ex) {
StringBuilder builder = new StringBuilder();
builder.append(
String.format(
"Could not validate JSON. Found %s violations(s).",
ex.getViolationCount()
)
);
for (ValidationException message : ex.getCausingExceptions()) {
log.error("Validation exception", message);
builder.append("\n");
builder.append(message.getMessage());
}
throw new DataException(
builder.toString(),
ex
);
}
}


@Override
protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] input) {
try {
if (this.config.validateJson) {
try (InputStream inputStream = new ByteArrayInputStream(input)) {
JSONObject jsonObject = Utils.loadObject(inputStream);
validateJson(jsonObject);
}
}
JsonNode node = this.objectMapper.readValue(input, JsonNode.class);
return processJsonNode(record, inputSchema, node);
} catch (IOException e) {
@@ -69,6 +111,12 @@ protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] input
@Override
protected SchemaAndValue processString(R record, Schema inputSchema, String input) {
try {
if (this.config.validateJson) {
try (Reader reader = new StringReader(input)) {
JSONObject jsonObject = Utils.loadObject(reader);
validateJson(jsonObject);
}
}
JsonNode node = this.objectMapper.readValue(input, JsonNode.class);
return processJsonNode(record, inputSchema, node);
} catch (IOException e) {
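For context, a minimal sketch of how this transformation might be wired into a connector configuration, following the documentation tip above. The FromJson$Value class name, schema path, and converter choice are illustrative assumptions, not taken from this commit:

    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    transforms=fromJson
    transforms.fromJson.type=com.github.jcustenborder.kafka.connect.json.FromJson$Value
    transforms.fromJson.json.schema.url=file:///schemas/something.json
    transforms.fromJson.json.schema.validation.enabled=true

With validation enabled, a record that violates the schema causes the transform to throw a DataException listing each violation, as implemented in validateJson above.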
src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonConfig.java
@@ -17,6 +17,7 @@

import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder;
import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils;
import com.github.jcustenborder.kafka.connect.utils.config.Description;
import com.github.jcustenborder.kafka.connect.utils.config.recommenders.Recommenders;
import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators;
import org.apache.kafka.common.config.AbstractConfig;
@@ -27,29 +28,38 @@

class FromJsonConfig extends AbstractConfig {
public static final String SCHEMA_URL_CONF = "json.schema.url";
static final String SCHEMA_URL_DOC = "Url to retrieve the schema from.";
static final String SCHEMA_URL_DOC = "Url to retrieve the schema from. Urls can be anything that is " +
"supported by URL.openStream(). For example the local filesystem file:///schemas/something.json. " +
"A web address https://www.schemas.com/something.json";
public static final String SCHEMA_INLINE_CONF = "json.schema.inline";
static final String SCHEMA_INLINE_DOC = "The JSON schema to use as an escaped string.";
public static final String SCHEMA_LOCATION_CONF = "json.schema.location";
static final String SCHEMA_LOCATION_DOC = "Location to retrieve the schema from. `Url` is used " +
"to retrieve the schema from a url. `Inline` is used to read the schema from the `" +
SCHEMA_INLINE_CONF + "` configuration value.";
static final String SCHEMA_LOCATION_DOC = "Location to retrieve the schema from. " +
ConfigUtils.enumDescription(SchemaLocation.class);

public static final String VALIDATE_JSON_ENABLED_CONF = "json.schema.validation.enabled";
static final String VALIDATE_JSON_ENABLED_DOC = "Flag to determine if the JSON should be validated " +
"against the schema.";

public enum SchemaLocation {
@Description("Loads the schema from the url specified in `" + SCHEMA_URL_CONF + "`.")
Url,
@Description("Loads the schema from `" + SCHEMA_INLINE_CONF + "` as an inline string.")
Inline
}

public final URL schemaUrl;
public final SchemaLocation schemaLocation;
public final String schemaText;
public final boolean validateJson;


public FromJsonConfig(Map<?, ?> originals) {
super(config(), originals);
this.schemaUrl = ConfigUtils.url(this, SCHEMA_URL_CONF);
this.schemaLocation = ConfigUtils.getEnum(SchemaLocation.class, this, SCHEMA_LOCATION_CONF);
this.schemaText = getString(SCHEMA_INLINE_CONF);
this.validateJson = getBoolean(VALIDATE_JSON_ENABLED_CONF);
}

public static void addConfigItems(ConfigDef configDef) {
@@ -58,26 +68,30 @@ public static void addConfigItems(ConfigDef configDef) {
.documentation(SCHEMA_URL_DOC)
.validator(Validators.validUrl())
.importance(ConfigDef.Importance.HIGH)
.recommender(Recommenders.visibleIf(SCHEMA_LOCATION_CONF, SchemaLocation.Url.toString()))
.defaultValue("File:///doesNotExist")
.build()
)
.define(
ConfigKeyBuilder.of(SCHEMA_LOCATION_CONF, ConfigDef.Type.STRING)
.documentation(SCHEMA_LOCATION_DOC)
.validator(Validators.validEnum(SchemaLocation.class))
.recommender(Recommenders.enumValues(SchemaLocation.class))
.importance(ConfigDef.Importance.HIGH)
.defaultValue(SchemaLocation.Url.toString())
.build()
)
.define(
ConfigKeyBuilder.of(SCHEMA_INLINE_CONF, ConfigDef.Type.STRING)
.documentation(SCHEMA_INLINE_DOC)
.recommender(Recommenders.visibleIf(SCHEMA_LOCATION_CONF, SchemaLocation.Inline.toString()))
.importance(ConfigDef.Importance.HIGH)
.defaultValue("")
.build()
);
).define(
ConfigKeyBuilder.of(SCHEMA_LOCATION_CONF, ConfigDef.Type.STRING)
.documentation(SCHEMA_LOCATION_DOC)
.validator(Validators.validEnum(SchemaLocation.class))
.recommender(Recommenders.enumValues(SchemaLocation.class))
.importance(ConfigDef.Importance.HIGH)
.defaultValue(SchemaLocation.Url.toString())
.build()
).define(
ConfigKeyBuilder.of(VALIDATE_JSON_ENABLED_CONF, ConfigDef.Type.BOOLEAN)
.documentation(VALIDATE_JSON_ENABLED_DOC)
.importance(ConfigDef.Importance.MEDIUM)
.defaultValue(false)
.build()
).define(
ConfigKeyBuilder.of(SCHEMA_INLINE_CONF, ConfigDef.Type.STRING)
.documentation(SCHEMA_INLINE_DOC)
.recommender(Recommenders.visibleIf(SCHEMA_LOCATION_CONF, SchemaLocation.Inline.toString()))
.importance(ConfigDef.Importance.HIGH)
.defaultValue("")
.build());
}

public static ConfigDef config() {
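As a hedged illustration of the two schema-location modes these keys describe: json.schema.location defaults to Url, and setting it to Inline instead reads the schema body from json.schema.inline. The schema text below is hypothetical:

    transforms.fromJson.json.schema.location=Inline
    transforms.fromJson.json.schema.inline={"type":"object","required":["latitude","longitude"]}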
src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverter.java
@@ -102,7 +102,7 @@ public static FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema, b
converter.fromJSON(builder, jsonSchema, visitors);
Schema schema = builder.build();
FromJsonVisitor visitor = converter.jsonVisitor(schema, visitors);
return FromJsonState.of(schema, visitor);
return FromJsonState.of(jsonSchema, schema, visitor);
}

protected abstract SchemaBuilder schemaBuilder(T schema);
src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonState.java
@@ -15,18 +15,19 @@
*/
package com.github.jcustenborder.kafka.connect.json;

import org.apache.kafka.connect.data.Schema;

class FromJsonState {
public final Schema schema;
public final org.everit.json.schema.Schema jsonSchema;
public final org.apache.kafka.connect.data.Schema schema;
public final FromJsonVisitor visitor;

FromJsonState(Schema schema, FromJsonVisitor visitor) {
FromJsonState(org.everit.json.schema.Schema jsonSchema, org.apache.kafka.connect.data.Schema schema, FromJsonVisitor visitor) {
this.jsonSchema = jsonSchema;
this.schema = schema;
this.visitor = visitor;
}

public static FromJsonState of(Schema schema, FromJsonVisitor visitor) {
return new FromJsonState(schema, visitor);
public static FromJsonState of(org.everit.json.schema.Schema jsonSchema, org.apache.kafka.connect.data.Schema schema, FromJsonVisitor visitor) {
return new FromJsonState(jsonSchema, schema, visitor);
}
}
src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonVisitor.java
@@ -95,9 +95,16 @@ public StructVisitor(Schema schema, Map<String, FromJsonVisitor> visitors) {
protected Struct doVisit(ObjectNode node) {
Struct result = new Struct(this.schema);
visitors.forEach((fieldName, visitor) -> {
JsonNode rawValue = node.get(fieldName);
Object convertedValue = visitor.visit(rawValue);
result.put(fieldName, convertedValue);
try {
JsonNode rawValue = node.get(fieldName);
Object convertedValue = visitor.visit(rawValue);
result.put(fieldName, convertedValue);
} catch (Exception ex) {
throw new IllegalStateException(
String.format("Exception thrown while reading %s:%s", this.schema.name(), fieldName),
ex
);
}
});

return result;
src/main/java/com/github/jcustenborder/kafka/connect/json/Utils.java
@@ -96,12 +96,18 @@ public static int scale(org.apache.kafka.connect.data.Schema connectSchema) {
return scale(scale);
}

public static JSONObject loadObject(Reader reader) {
return new JSONObject(new JSONTokener(reader));
}

public static Schema loadSchema(String schemaText) {
try (Reader reader = new StringReader(schemaText)) {
JSONObject rawSchema = new JSONObject(new JSONTokener(reader));
JSONObject rawSchema = loadObject(reader);
return loadSchema(rawSchema);
} catch (IOException ex) {
throw new DataException("Could not load schema", ex);
}
}


}
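For reference, a minimal standalone sketch of the everit-org/json-schema calls these helpers wrap (Reader to JSONTokener to JSONObject, then SchemaLoader); the schema and message literals are invented for illustration:

    import org.everit.json.schema.Schema;
    import org.everit.json.schema.ValidationException;
    import org.everit.json.schema.loader.SchemaLoader;
    import org.json.JSONObject;
    import org.json.JSONTokener;

    import java.io.StringReader;

    public class ValidationSketch {
      public static void main(String[] args) {
        // Same pattern as loadObject/loadSchema: Reader -> JSONTokener -> JSONObject -> Schema.
        JSONObject rawSchema = new JSONObject(new JSONTokener(new StringReader(
            "{\"type\":\"object\",\"required\":[\"latitude\",\"longitude\"]}")));
        Schema schema = SchemaLoader.load(rawSchema);
        try {
          // validate(...) throws ValidationException if the document violates the schema.
          schema.validate(new JSONObject("{\"latitude\": 48.8}"));
        } catch (ValidationException ex) {
          // With multiple violations, per-violation messages are in getCausingExceptions();
          // with a single violation that list is empty and getMessage() carries it directly.
          System.out.println(ex.getMessage());
          ex.getCausingExceptions().forEach(cause -> System.out.println(cause.getMessage()));
        }
      }
    }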
src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonTest.java
@@ -5,17 +5,24 @@
import com.google.common.io.ByteStreams;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class FromJsonTest {
private static final Logger log = LoggerFactory.getLogger(FromJsonTest.class);
FromJson<SinkRecord> transform;

@BeforeEach
@@ -34,9 +41,32 @@ public void basic() throws IOException {
);
this.transform.configure(settings);
SinkRecord inputRecord = SinkRecordHelper.write("foo", new SchemaAndValue(Schema.STRING_SCHEMA, "foo"), new SchemaAndValue(Schema.BYTES_SCHEMA, input));
SinkRecord actual = this.transform.apply(inputRecord);
assertNotNull(actual);
SinkRecord transformedRecord = this.transform.apply(inputRecord);
assertNotNull(transformedRecord);
assertNotNull(transformedRecord.value());
assertTrue(transformedRecord.value() instanceof Struct);
Struct actual = (Struct) transformedRecord.value();
log.info("actual = '{}'", actual);
}

@Test
public void validate() throws IOException {
byte[] input = ByteStreams.toByteArray(this.getClass().getResourceAsStream(
"basic.data.json"
));
File schemaFile = new File("src/test/resources/com/github/jcustenborder/kafka/connect/json/geo.schema.json");
Map<String, String> settings = ImmutableMap.of(
FromJsonConfig.SCHEMA_URL_CONF, schemaFile.toURI().toString(),
FromJsonConfig.VALIDATE_JSON_ENABLED_CONF, "true"
);
this.transform.configure(settings);
SinkRecord inputRecord = SinkRecordHelper.write("foo", new SchemaAndValue(Schema.STRING_SCHEMA, "foo"), new SchemaAndValue(Schema.BYTES_SCHEMA, input));
DataException exception = assertThrows(DataException.class, () -> this.transform.apply(inputRecord));

assertTrue(exception.getMessage().contains("required key [latitude] not found"));
assertTrue(exception.getMessage().contains("required key [longitude] not found"));
}

}
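The assertions above imply that geo.schema.json declares latitude and longitude as required properties. A hypothetical reconstruction of that test resource, consistent with the expected violation messages but not copied from this commit:

    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "required": ["latitude", "longitude"],
      "properties": {
        "latitude": {"type": "number"},
        "longitude": {"type": "number"}
      }
    }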
