From c8626c8176181de192ac62f34b0ff5e13b1e6918 Mon Sep 17 00:00:00 2001 From: Jeremy Custenborder Date: Wed, 1 Apr 2020 19:54:10 -0500 Subject: [PATCH] v 0.2 (#5) * Added documentation suggesting the use of the ByteArrayConverter or StringConverter. Added configuration setting to enable validation of the json messages. Violations will result in an exception being thrown. Fixes #2. Fixes #3 * Added support for custom date formats. This allows the user to specify the date format for parsing. Fixes #4. --- pom.xml | 4 +- .../json/CustomTimestampFormatValidator.java | 32 +++++++++ .../kafka/connect/json/FromJson.java | 48 ++++++++++++++ .../kafka/connect/json/FromJsonConfig.java | 58 +++++++++------- .../connect/json/FromJsonSchemaConverter.java | 34 +++++++++- .../kafka/connect/json/FromJsonState.java | 11 ++-- .../kafka/connect/json/FromJsonVisitor.java | 34 +++++++++- .../kafka/connect/json/Utils.java | 9 ++- .../kafka/connect/json/FromJsonTest.java | 66 ++++++++++++++++++- .../kafka/connect/json/customdate.data.json | 3 + .../kafka/connect/json/customdate.schema.json | 14 ++++ 11 files changed, 276 insertions(+), 37 deletions(-) create mode 100644 src/main/java/com/github/jcustenborder/kafka/connect/json/CustomTimestampFormatValidator.java create mode 100644 src/test/resources/com/github/jcustenborder/kafka/connect/json/customdate.data.json create mode 100644 src/test/resources/com/github/jcustenborder/kafka/connect/json/customdate.schema.json diff --git a/pom.xml b/pom.xml index d3c13d2..6d18ccb 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ 2.4.0 kafka-connect-json-schema - 0.0.2-SNAPSHOT + 0.2-SNAPSHOT kafka-connect-json-schema A Kafka Connect connector receiving data from example. 
https://github.com/jcustenborder/kafka-connect-json-schema @@ -106,7 +106,7 @@ Transform - FIX + JSON Kafka Connect JSON Schema Transformations ${pom.issueManagement.url} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/CustomTimestampFormatValidator.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/CustomTimestampFormatValidator.java new file mode 100644 index 0000000..de69aba --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/CustomTimestampFormatValidator.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.github.jcustenborder.kafka.connect.json; + +import org.everit.json.schema.FormatValidator; + +import java.util.Optional; + +public class CustomTimestampFormatValidator implements FormatValidator { + @Override + public Optional validate(String s) { + return Optional.empty(); + } + + @Override + public String formatName() { + return "custom-timestamp"; + } +} \ No newline at end of file diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJson.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJson.java index 85d1a2a..db1f32a 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJson.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJson.java @@ -18,6 +18,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.jcustenborder.kafka.connect.utils.config.Description; +import com.github.jcustenborder.kafka.connect.utils.config.DocumentationTip; import com.github.jcustenborder.kafka.connect.utils.config.Title; import com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation; import org.apache.kafka.common.config.ConfigDef; @@ -26,15 +27,25 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.DataException; +import org.everit.json.schema.ValidationException; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.Reader; +import java.io.StringReader; import java.util.Map; @Title("From Json transformation") @Description("The FromJson will read JSON data that is in string on byte form and parse the data to " + "a connect structure based on the JSON schema provided.") +@DocumentationTip("This transformation expects data to be in either String or Byte format. 
You are " + + "most likely going to use the ByteArrayConverter or the StringConverter.") public class FromJson> extends BaseKeyValueTransformation { + private static final Logger log = LoggerFactory.getLogger(FromJson.class); FromJsonConfig config; protected FromJson(boolean isKey) { @@ -56,9 +67,40 @@ SchemaAndValue processJsonNode(R record, Schema inputSchema, JsonNode node) { return new SchemaAndValue(this.fromJsonState.schema, result); } + + void validateJson(JSONObject jsonObject) { + try { + this.fromJsonState.jsonSchema.validate(jsonObject); + } catch (ValidationException ex) { + StringBuilder builder = new StringBuilder(); + builder.append( + String.format( + "Could not validate JSON. Found %s violation(s).", + ex.getViolationCount() + ) + ); + for (ValidationException message : ex.getCausingExceptions()) { + log.error("Validation exception", message); + builder.append("\n"); + builder.append(message.getMessage()); + } + throw new DataException( + builder.toString(), + ex + ); + } + } + + @Override protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] input) { try { + if (this.config.validateJson) { + try (InputStream inputStream = new ByteArrayInputStream(input)) { + JSONObject jsonObject = Utils.loadObject(inputStream); + validateJson(jsonObject); + } + } JsonNode node = this.objectMapper.readValue(input, JsonNode.class); return processJsonNode(record, inputSchema, node); } catch (IOException e) { @@ -69,6 +111,12 @@ protected SchemaAndValue processString(R record, Schema inputSchema, String input) { try { + if (this.config.validateJson) { + try (Reader reader = new StringReader(input)) { + JSONObject jsonObject = Utils.loadObject(reader); + validateJson(jsonObject); + } + } JsonNode node = this.objectMapper.readValue(input, JsonNode.class); return processJsonNode(record, inputSchema, node); } catch (IOException e) { diff --git 
a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonConfig.java index 2c36e4d..8133616 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonConfig.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonConfig.java @@ -17,6 +17,7 @@ import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder; import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils; +import com.github.jcustenborder.kafka.connect.utils.config.Description; import com.github.jcustenborder.kafka.connect.utils.config.recommenders.Recommenders; import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators; import org.apache.kafka.common.config.AbstractConfig; @@ -27,22 +28,30 @@ class FromJsonConfig extends AbstractConfig { public static final String SCHEMA_URL_CONF = "json.schema.url"; - static final String SCHEMA_URL_DOC = "Url to retrieve the schema from."; + static final String SCHEMA_URL_DOC = "Url to retrieve the schema from. Urls can be anything that is " + + "supported by URL.openStream(). For example the local filesystem file:///schemas/something.json. " + + "A web address https://www.schemas.com/something.json"; public static final String SCHEMA_INLINE_CONF = "json.schema.inline"; static final String SCHEMA_INLINE_DOC = "The JSON schema to use as an escaped string."; public static final String SCHEMA_LOCATION_CONF = "json.schema.location"; - static final String SCHEMA_LOCATION_DOC = "Location to retrieve the schema from. `Url` is used " + - "to retrieve the schema from a url. `Inline` is used to read the schema from the `" + - SCHEMA_INLINE_CONF + "` configuration value."; + static final String SCHEMA_LOCATION_DOC = "Location to retrieve the schema from. 
" + + ConfigUtils.enumDescription(SchemaLocation.class); + + public static final String VALIDATE_JSON_ENABLED_CONF = "json.schema.validation.enabled"; + static final String VALIDATE_JSON_ENABLED_DOC = "Flag to determine if the JSON should be validated " + + "against the schema."; public enum SchemaLocation { + @Description("Loads the schema from the url specified in `" + SCHEMA_URL_CONF + "`.") Url, + @Description("Loads the schema from `" + SCHEMA_INLINE_CONF + "` as an inline string.") Inline } public final URL schemaUrl; public final SchemaLocation schemaLocation; public final String schemaText; + public final boolean validateJson; public FromJsonConfig(Map originals) { @@ -50,6 +59,7 @@ public FromJsonConfig(Map originals) { this.schemaUrl = ConfigUtils.url(this, SCHEMA_URL_CONF); this.schemaLocation = ConfigUtils.getEnum(SchemaLocation.class, this, SCHEMA_LOCATION_CONF); this.schemaText = getString(SCHEMA_INLINE_CONF); + this.validateJson = getBoolean(VALIDATE_JSON_ENABLED_CONF); } public static void addConfigItems(ConfigDef configDef) { @@ -58,26 +68,30 @@ public static void addConfigItems(ConfigDef configDef) { .documentation(SCHEMA_URL_DOC) .validator(Validators.validUrl()) .importance(ConfigDef.Importance.HIGH) + .recommender(Recommenders.visibleIf(SCHEMA_LOCATION_CONF, SchemaLocation.Url.toString())) .defaultValue("File:///doesNotExist") .build() - ) - .define( - ConfigKeyBuilder.of(SCHEMA_LOCATION_CONF, ConfigDef.Type.STRING) - .documentation(SCHEMA_LOCATION_DOC) - .validator(Validators.validEnum(SchemaLocation.class)) - .recommender(Recommenders.enumValues(SchemaLocation.class)) - .importance(ConfigDef.Importance.HIGH) - .defaultValue(SchemaLocation.Url.toString()) - .build() - ) - .define( - ConfigKeyBuilder.of(SCHEMA_INLINE_CONF, ConfigDef.Type.STRING) - .documentation(SCHEMA_INLINE_DOC) - .recommender(Recommenders.visibleIf(SCHEMA_LOCATION_CONF, SchemaLocation.Inline.toString())) - .importance(ConfigDef.Importance.HIGH) - .defaultValue("") - .build() 
- ); + ).define( + ConfigKeyBuilder.of(SCHEMA_LOCATION_CONF, ConfigDef.Type.STRING) + .documentation(SCHEMA_LOCATION_DOC) + .validator(Validators.validEnum(SchemaLocation.class)) + .recommender(Recommenders.enumValues(SchemaLocation.class)) + .importance(ConfigDef.Importance.HIGH) + .defaultValue(SchemaLocation.Url.toString()) + .build() + ).define( + ConfigKeyBuilder.of(VALIDATE_JSON_ENABLED_CONF, ConfigDef.Type.BOOLEAN) + .documentation(VALIDATE_JSON_ENABLED_DOC) + .importance(ConfigDef.Importance.MEDIUM) + .defaultValue(false) + .build() + ).define( + ConfigKeyBuilder.of(SCHEMA_INLINE_CONF, ConfigDef.Type.STRING) + .documentation(SCHEMA_INLINE_DOC) + .recommender(Recommenders.visibleIf(SCHEMA_LOCATION_CONF, SchemaLocation.Inline.toString())) + .importance(ConfigDef.Importance.HIGH) + .defaultValue("") + .build()); } public static ConfigDef config() { diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverter.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverter.java index 4e8f080..274c4da 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverter.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverter.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.node.NumericNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import org.apache.kafka.connect.data.Date; @@ -65,7 +66,8 @@ public abstract class FromJsonSchemaConverter c)); } @@ -102,7 +104,7 @@ public static FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema, b converter.fromJSON(builder, jsonSchema, visitors); Schema schema = builder.build(); FromJsonVisitor visitor = converter.jsonVisitor(schema, visitors); - return FromJsonState.of(schema, visitor); + 
return FromJsonState.of(jsonSchema, schema, visitor); } protected abstract SchemaBuilder schemaBuilder(T schema); @@ -400,4 +402,32 @@ protected void fromJSON(SchemaBuilder builder, ArraySchema jsonSchema, Map { + + @Override + protected FromJsonVisitor jsonVisitor(Schema connectSchema, Map visitors) { + return new FromJsonVisitor.CustomDateVisitor(connectSchema); + } + + @Override + protected SchemaBuilder schemaBuilder(StringSchema schema) { + Object dateTimeFormat = schema.getUnprocessedProperties().get("dateTimeFormat"); + Preconditions.checkNotNull(dateTimeFormat, "dateTimeFormat cannot be null"); + return Timestamp.builder() + .parameter("dateFormat", dateTimeFormat.toString()); + } + + @Override + protected FromJsonConversionKey key() { + return FromJsonConversionKey.from(StringSchema.class) + .format("custom-timestamp") + .build(); + } + + @Override + protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map visitors) { + log.trace("fromJson() - Processing '{}'", jsonSchema); + } + } } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonState.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonState.java index 8034ab8..7dbbdfd 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonState.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonState.java @@ -15,18 +15,19 @@ */ package com.github.jcustenborder.kafka.connect.json; -import org.apache.kafka.connect.data.Schema; class FromJsonState { - public final Schema schema; + public final org.everit.json.schema.Schema jsonSchema; + public final org.apache.kafka.connect.data.Schema schema; public final FromJsonVisitor visitor; - FromJsonState(Schema schema, FromJsonVisitor visitor) { + FromJsonState(org.everit.json.schema.Schema jsonSchema, org.apache.kafka.connect.data.Schema schema, FromJsonVisitor visitor) { + this.jsonSchema = jsonSchema; this.schema = schema; this.visitor = visitor; } - public 
static FromJsonState of(Schema schema, FromJsonVisitor visitor) { - return new FromJsonState(schema, visitor); + public static FromJsonState of(org.everit.json.schema.Schema jsonSchema, org.apache.kafka.connect.data.Schema schema, FromJsonVisitor visitor) { + return new FromJsonState(jsonSchema, schema, visitor); } } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonVisitor.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonVisitor.java index 5c4ed3f..3ce627b 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonVisitor.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonVisitor.java @@ -35,6 +35,7 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -95,9 +96,16 @@ public StructVisitor(Schema schema, Map visitors) { protected Struct doVisit(ObjectNode node) { Struct result = new Struct(this.schema); visitors.forEach((fieldName, visitor) -> { - JsonNode rawValue = node.get(fieldName); - Object convertedValue = visitor.visit(rawValue); - result.put(fieldName, convertedValue); + try { + JsonNode rawValue = node.get(fieldName); + Object convertedValue = visitor.visit(rawValue); + result.put(fieldName, convertedValue); + } catch (Exception ex) { + throw new IllegalStateException( + String.format("Exception thrown while reading %s:%s", this.schema.name(), fieldName), + ex + ); + } }); return result; @@ -225,4 +233,24 @@ protected byte[] doVisit(TextNode node) { return BaseEncoding.base64().decode(node.textValue()); } } + + public static class CustomDateVisitor extends FromJsonVisitor { + private static final Logger log = LoggerFactory.getLogger(DateTimeVisitor.class); + + final DateTimeFormatter dateTimeFormatter; + + public CustomDateVisitor(Schema schema) { + super(schema); + String pattern = 
schema.parameters().get("dateFormat"); + this.dateTimeFormatter = DateTimeFormatter.ofPattern(pattern); + } + + @Override + protected Date doVisit(TextNode node) { + log.trace(node.asText()); + LocalDateTime localDateTime = LocalDateTime.parse(node.asText(), this.dateTimeFormatter); + Instant instant = localDateTime.toInstant(ZoneOffset.UTC); + return Date.from(instant); + } + } } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/Utils.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/Utils.java index fc0eaea..1bd0ea8 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/json/Utils.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/Utils.java @@ -70,6 +70,7 @@ public static org.everit.json.schema.Schema loadSchema(JSONObject rawSchema) { .addFormatValidator(new TimeFormatValidator()) .addFormatValidator(new DateTimeFormatValidator()) .addFormatValidator(new DecimalFormatValidator()) + .addFormatValidator(new CustomTimestampFormatValidator()) .schemaJson(rawSchema) .build() .load() @@ -96,12 +97,18 @@ public static int scale(org.apache.kafka.connect.data.Schema connectSchema) { return scale(scale); } + public static JSONObject loadObject(Reader reader) { + return new JSONObject(new JSONTokener(reader)); + } + public static Schema loadSchema(String schemaText) { try (Reader reader = new StringReader(schemaText)) { - JSONObject rawSchema = new JSONObject(new JSONTokener(reader)); + JSONObject rawSchema = loadObject(reader); return loadSchema(rawSchema); } catch (IOException ex) { throw new DataException("Could not load schema", ex); } } + + } diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonTest.java index 9420b67..ee020b7 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonTest.java @@ 
-5,17 +5,28 @@ import com.google.common.io.ByteStreams; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class FromJsonTest { + private static final Logger log = LoggerFactory.getLogger(FromJsonTest.class); FromJson transform; @BeforeEach @@ -34,9 +45,60 @@ public void basic() throws IOException { ); this.transform.configure(settings); SinkRecord inputRecord = SinkRecordHelper.write("foo", new SchemaAndValue(Schema.STRING_SCHEMA, "foo"), new SchemaAndValue(Schema.BYTES_SCHEMA, input)); - SinkRecord actual = this.transform.apply(inputRecord); - assertNotNull(actual); + SinkRecord transformedRecord = this.transform.apply(inputRecord); + assertNotNull(transformedRecord); + assertNotNull(transformedRecord.value()); + assertTrue(transformedRecord.value() instanceof Struct); + Struct actual = (Struct) transformedRecord.value(); + log.info("actual = '{}'", actual); } + @Test + public void customdate() throws IOException { + byte[] input = ByteStreams.toByteArray(this.getClass().getResourceAsStream( + "customdate.data.json" + )); + File schemaFile = new File("src/test/resources/com/github/jcustenborder/kafka/connect/json/customdate.schema.json"); + Map settings = ImmutableMap.of( + FromJsonConfig.SCHEMA_URL_CONF, schemaFile.toURI().toString() + ); + 
this.transform.configure(settings); + SinkRecord inputRecord = SinkRecordHelper.write("foo", new SchemaAndValue(Schema.STRING_SCHEMA, "foo"), new SchemaAndValue(Schema.BYTES_SCHEMA, input)); + SinkRecord transformedRecord = this.transform.apply(inputRecord); + assertNotNull(transformedRecord); + assertNotNull(transformedRecord.value()); + assertTrue(transformedRecord.value() instanceof Struct); + Struct actual = (Struct) transformedRecord.value(); + log.info("actual = '{}'", actual); + } + + @Test + public void validate() throws IOException { + byte[] input = ByteStreams.toByteArray(this.getClass().getResourceAsStream( + "basic.data.json" + )); + File schemaFile = new File("src/test/resources/com/github/jcustenborder/kafka/connect/json/geo.schema.json"); + Map settings = ImmutableMap.of( + FromJsonConfig.SCHEMA_URL_CONF, schemaFile.toURI().toString(), + FromJsonConfig.VALIDATE_JSON_ENABLED_CONF, "true" + ); + this.transform.configure(settings); + SinkRecord inputRecord = SinkRecordHelper.write("foo", new SchemaAndValue(Schema.STRING_SCHEMA, "foo"), new SchemaAndValue(Schema.BYTES_SCHEMA, input)); + DataException exception = assertThrows(DataException.class, () -> { + SinkRecord transformedRecord = this.transform.apply(inputRecord); + }); + + assertTrue(exception.getMessage().contains("required key [latitude] not found")); + assertTrue(exception.getMessage().contains("required key [longitude] not found")); + } + + @Test + public void foo() { + String timestamp = "2020-01-07 04:47:05.0000000"; + DateTimeFormatter dateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSS") + .withZone(ZoneId.of("UTC")); + log.info(dateFormat.format(LocalDateTime.now())); + ZonedDateTime dateTime = ZonedDateTime.parse(timestamp, dateFormat); + } } diff --git a/src/test/resources/com/github/jcustenborder/kafka/connect/json/customdate.data.json b/src/test/resources/com/github/jcustenborder/kafka/connect/json/customdate.data.json new file mode 100644 index 0000000..6b177b4 --- 
/dev/null +++ b/src/test/resources/com/github/jcustenborder/kafka/connect/json/customdate.data.json @@ -0,0 +1,3 @@ +{ + "Timestamp": "2020-01-07 04:47:05.0000000" +} \ No newline at end of file diff --git a/src/test/resources/com/github/jcustenborder/kafka/connect/json/customdate.schema.json b/src/test/resources/com/github/jcustenborder/kafka/connect/json/customdate.schema.json new file mode 100644 index 0000000..31af2c1 --- /dev/null +++ b/src/test/resources/com/github/jcustenborder/kafka/connect/json/customdate.schema.json @@ -0,0 +1,14 @@ +{ + "$id": "https://example.com/person.schema.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Person", + "type": "object", + "properties": { + "Timestamp": { + "type": "string", + "description": "Timestamp", + "format": "custom-timestamp", + "dateTimeFormat": "yyyy-MM-dd HH:mm:ss.SSSSSSS" + } + } +} \ No newline at end of file