diff --git a/Jenkinsfile.example b/Jenkinsfile similarity index 100% rename from Jenkinsfile.example rename to Jenkinsfile diff --git a/README.md b/README.md index d95e94b..39f18d5 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,18 @@ # Introduction +[Documentation](https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-json-schema) -This is a template repository for creating Kafka Connect Plugins. +Installation through the [Confluent Hub Client](https://docs.confluent.io/current/connect/managing/confluent-hub/client.html) +This plugin is used to add additional JSON parsing functionality to Kafka Connect. +## [From Json transformation](https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-json-schema/transformations/FromJson.html) +The FromJson will read JSON data that is in string on byte form and parse the data to a connect structure based on the JSON schema provided. + +# Development + +## Building the source + +```bash +mvn clean package +``` \ No newline at end of file diff --git a/bin/debug.sh b/bin/debug.sh index 4987a15..e45fa22 100644 --- a/bin/debug.sh +++ b/bin/debug.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash # -# Copyright © 2019 Jeremy Custenborder (jcustenborder@gmail.com) +# Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/config/connect-avro-docker.properties b/config/connect-avro-docker.properties index 8a0eec5..d4b5539 100644 --- a/config/connect-avro-docker.properties +++ b/config/connect-avro-docker.properties @@ -1,5 +1,5 @@ # -# Copyright © 2019 Jeremy Custenborder (jcustenborder@gmail.com) +# Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
diff --git a/docker-compose.yml b/docker-compose.yml index 2f76d59..359b883 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,5 @@ # -# Copyright © 2019 Jeremy Custenborder (jcustenborder@gmail.com) +# Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,13 +17,13 @@ version: "2" services: zookeeper: - image: confluentinc/cp-zookeeper:5.2.2 + image: confluentinc/cp-zookeeper:5.4.0 ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: - image: confluentinc/cp-kafka:5.2.2 + image: confluentinc/cp-kafka:5.4.0 depends_on: - zookeeper ports: @@ -33,7 +33,7 @@ services: KAFKA_ADVERTISED_LISTENERS: "plaintext://kafka:9092" KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 schema-registry: - image: confluentinc/cp-schema-registry:5.2.2 + image: confluentinc/cp-schema-registry:5.4.0 depends_on: - kafka - zookeeper diff --git a/pom.xml b/pom.xml index b4ba3bb..d3c13d2 100644 --- a/pom.xml +++ b/pom.xml @@ -1,7 +1,7 @@ diff --git a/src/main/connect-config-classes/BaseConnectorConfig.json b/src/main/connect-config-classes/BaseConnectorConfig.json deleted file mode 100644 index 2bb1fb2..0000000 --- a/src/main/connect-config-classes/BaseConnectorConfig.json +++ /dev/null @@ -1,61 +0,0 @@ -{ - "name": "com.github.jcustenborder.kafka.connect.example.BaseConnectorConfig", - "prefix": "example.", - "groups": [ - { - "name": "SESSION", - "display": "Session Configuration", - "prefix": "session.", - "configItems": [ - { - "configKey": "servers", - "type": "LIST", - "documentation": "Server(s) to connect to.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": [ - "http://localhost:8081" - ] - } - ] - }, - { - "name": "SSL", - "display": "SSL Configuration", - "prefix": "ssl.", - "configItems": [ - { - "configKey": "keystore.path", - "type": "STRING", - "documentation": "Location of the Java keystore 
to use.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": "" - }, - { - "configKey": "keystore.password", - "type": "PASSWORD", - "documentation": "Location of the Java keystore to use.", - "importance": "HIGH", - "width": "MEDIUM" - }, - { - "configKey": "truststore.path", - "type": "STRING", - "documentation": "Location of the Java truststore to use.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": "" - }, - { - "configKey": "truststore.password", - "type": "PASSWORD", - "documentation": "Location of the Java truststore to use.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": "changeit" - } - ] - } - ] -} \ No newline at end of file diff --git a/src/main/connect-config-classes/ExampleSinkConnectorConfig.json b/src/main/connect-config-classes/ExampleSinkConnectorConfig.json deleted file mode 100644 index ab3677c..0000000 --- a/src/main/connect-config-classes/ExampleSinkConnectorConfig.json +++ /dev/null @@ -1,24 +0,0 @@ -{ - "name": "com.github.jcustenborder.kafka.connect.example.ExampleSinkConnectorConfig", - "extends": "com.github.jcustenborder.kafka.connect.example.BaseConnectorConfig", - "prefix": "myconn.", - "groups": [ - { - "name": "TARGET", - "display": "Target Configuration", - "prefix": "target.", - "configItems": [ - { - "configKey": "servers", - "type": "LIST", - "documentation": "Server(s) to connect to.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": [ - "http://localhost:8081" - ] - } - ] - } - ] -} \ No newline at end of file diff --git a/src/main/connect-config-classes/ExampleSourceConnectorConfig.json b/src/main/connect-config-classes/ExampleSourceConnectorConfig.json deleted file mode 100644 index a178aa7..0000000 --- a/src/main/connect-config-classes/ExampleSourceConnectorConfig.json +++ /dev/null @@ -1,24 +0,0 @@ -{ - "name": "com.github.jcustenborder.kafka.connect.example.ExampleSourceConnectorConfig", - "extends": 
"com.github.jcustenborder.kafka.connect.example.BaseConnectorConfig", - "prefix": "myconn.", - "groups": [ - { - "name": "TARGET", - "display": "Target Configuration", - "prefix": "target.", - "configItems": [ - { - "configKey": "servers", - "type": "LIST", - "documentation": "Server(s) to connect to.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": [ - "http://localhost:8081" - ] - } - ] - } - ] -} \ No newline at end of file diff --git a/src/main/connect-config-classes/ExampleTransformationConfig.json b/src/main/connect-config-classes/ExampleTransformationConfig.json deleted file mode 100644 index 15ae5f3..0000000 --- a/src/main/connect-config-classes/ExampleTransformationConfig.json +++ /dev/null @@ -1,61 +0,0 @@ -{ - "name": "com.github.jcustenborder.kafka.connect.example.ExampleTransformationConfig", - "prefix": "example.", - "groups": [ - { - "name": "SESSION", - "display": "Session Configuration", - "prefix": "session.", - "configItems": [ - { - "configKey": "servers", - "type": "LIST", - "documentation": "Server(s) to connect to.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": [ - "http://localhost:8081" - ] - } - ] - }, - { - "name": "SSL", - "display": "SSL Configuration", - "prefix": "ssl.", - "configItems": [ - { - "configKey": "keystore.path", - "type": "STRING", - "documentation": "Location of the Java keystore to use.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": "" - }, - { - "configKey": "keystore.password", - "type": "PASSWORD", - "documentation": "Location of the Java keystore to use.", - "importance": "HIGH", - "width": "MEDIUM" - }, - { - "configKey": "truststore.path", - "type": "STRING", - "documentation": "Location of the Java truststore to use.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": "" - }, - { - "configKey": "truststore.password", - "type": "PASSWORD", - "documentation": "Location of the Java truststore to use.", - "importance": "HIGH", - "width": "MEDIUM", - 
"defaultValue": "changeit" - } - ] - } - ] -} \ No newline at end of file diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSinkConnector.java b/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSinkConnector.java deleted file mode 100644 index b811770..0000000 --- a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSinkConnector.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Copyright © 2019 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.github.jcustenborder.kafka.connect.example; - -import com.github.jcustenborder.kafka.connect.utils.VersionUtil; -import com.github.jcustenborder.kafka.connect.utils.config.Description; -import com.github.jcustenborder.kafka.connect.utils.config.TaskConfigs; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.connect.connector.Task; -import org.apache.kafka.connect.sink.SinkConnector; - -import java.util.List; -import java.util.Map; - -@Description("This is the description of the connector.") -public class ExampleSinkConnector extends SinkConnector { - Map settings; - - @Override - public void start(Map settings) { - ExampleSinkConnectorConfig config = new ExampleSinkConnectorConfig(settings); - - /** - * Do whatever you need to do to setup your connector on a global scale. This is something that - * will execute once per connector instance. 
- */ - - this.settings = settings; - } - - @Override - public Class taskClass() { - return ExampleSinkTask.class; - } - - @Override - public List> taskConfigs(int maxTasks) { - return TaskConfigs.multiple(this.settings, maxTasks); - } - - @Override - public void stop() { - - } - - @Override - public ConfigDef config() { - return ExampleSinkConnectorConfig.config(ExampleSinkConnectorConfig.DEFAULT_CONFIG_OPTIONS); - } - - @Override - public String version() { - return VersionUtil.version(this.getClass()); - } -} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSinkTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSinkTask.java deleted file mode 100644 index 26fe51b..0000000 --- a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSinkTask.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Copyright © 2019 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.github.jcustenborder.kafka.connect.example; - -import com.github.jcustenborder.kafka.connect.utils.VersionUtil; -import org.apache.kafka.connect.sink.SinkRecord; -import org.apache.kafka.connect.sink.SinkTask; - -import java.util.Collection; -import java.util.Map; - -public class ExampleSinkTask extends SinkTask { - @Override - public String version() { - return VersionUtil.version(this.getClass()); - } - - ExampleSinkConnectorConfig config; - - @Override - public void start(Map settings) { - this.config = new ExampleSinkConnectorConfig(settings); - /** - * Do whatever you need to do to setup each of the tasks that the connector launches. - */ - } - - @Override - public void put(Collection records) { - - } - - @Override - public void stop() { - - } -} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSourceConnector.java b/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSourceConnector.java deleted file mode 100644 index ff1a75f..0000000 --- a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSourceConnector.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Copyright © 2019 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.github.jcustenborder.kafka.connect.example; - -import com.github.jcustenborder.kafka.connect.utils.VersionUtil; -import com.github.jcustenborder.kafka.connect.utils.config.Description; -import com.github.jcustenborder.kafka.connect.utils.config.TaskConfigs; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.connect.connector.Task; -import org.apache.kafka.connect.source.SourceConnector; - -import java.util.List; -import java.util.Map; - -@Description("This connector is used to pull data from a source system and write it " + - "to Kafka.") -public class ExampleSourceConnector extends SourceConnector { - - Map settings; - - @Override - public void start(Map settings) { - ExampleSourceConnectorConfig config = new ExampleSourceConnectorConfig(settings); - /** - * Do whatever you need to do to setup your connector on a global scale. This is something that - * will execute once per connector instance. - */ - this.settings = settings; - } - - @Override - public Class taskClass() { - return ExampleSourceTask.class; - } - - @Override - public List> taskConfigs(int maxTasks) { - return TaskConfigs.multiple(this.settings, maxTasks); - } - - @Override - public void stop() { - - } - - @Override - public ConfigDef config() { - return ExampleSourceConnectorConfig.config(ExampleSourceConnectorConfig.DEFAULT_CONFIG_OPTIONS); - } - - @Override - public String version() { - return VersionUtil.version(this.getClass()); - } -} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSourceTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSourceTask.java deleted file mode 100644 index a8d2436..0000000 --- a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSourceTask.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Copyright © 2019 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file 
except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.github.jcustenborder.kafka.connect.example; - -import com.github.jcustenborder.kafka.connect.utils.VersionUtil; -import org.apache.kafka.connect.source.SourceRecord; -import org.apache.kafka.connect.source.SourceTask; - -import java.util.List; -import java.util.Map; - -public class ExampleSourceTask extends SourceTask { - @Override - public String version() { - return VersionUtil.version(this.getClass()); - } - - ExampleSourceConnectorConfig config; - - @Override - public void start(Map settings) { - this.config = new ExampleSourceConnectorConfig(settings); - } - - @Override - public List poll() throws InterruptedException { - - return null; - } - - @Override - public void stop() { - - } -} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleTransformation.java b/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleTransformation.java deleted file mode 100644 index 8fe0349..0000000 --- a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleTransformation.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Copyright © 2019 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.github.jcustenborder.kafka.connect.example; - -import com.github.jcustenborder.kafka.connect.utils.config.Description; -import com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.connect.connector.ConnectRecord; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.data.Struct; - -import java.util.Map; - -@Description("This is an example transformation.") -public abstract class ExampleTransformation> extends BaseKeyValueTransformation { - protected ExampleTransformation(boolean isKey) { - super(isKey); - } - - public static class Key> extends ExampleTransformation { - public Key() { - super(true); - } - } - - public static class Value> extends ExampleTransformation { - public Value() { - super(false); - } - } - - @Override - public ConfigDef config() { - return new ConfigDef(); - } - - @Override - public void close() { - - } - - @Override - public void configure(Map map) { - - } - - @Override - protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) { - //TODO; Do something - return null; - } -} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/DecimalFormatValidator.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/DecimalFormatValidator.java new file mode 100644 index 0000000..97afbf9 --- /dev/null +++ 
b/src/main/java/com/github/jcustenborder/kafka/connect/json/DecimalFormatValidator.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.json; + +import org.everit.json.schema.FormatValidator; + +import java.util.Optional; + +class DecimalFormatValidator implements FormatValidator { + @Override + public Optional validate(String s) { + return Optional.empty(); + } + + @Override + public String formatName() { + return "decimal"; + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromConnectConversionKey.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromConnectConversionKey.java new file mode 100644 index 0000000..8967e42 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromConnectConversionKey.java @@ -0,0 +1,81 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.json; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Objects; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; + +public class FromConnectConversionKey { + public final Schema.Type type; + public final String schemaName; + public final Integer scale; + + private FromConnectConversionKey(Schema.Type type, String schemaName, Integer scale) { + this.type = type; + this.schemaName = schemaName; + this.scale = scale; + } + + public static FromConnectConversionKey of(Schema schema) { + Integer scale; + if (Decimal.LOGICAL_NAME.equals(schema.name())) { + String scaleText = schema.parameters().get(Decimal.SCALE_FIELD); + scale = Integer.parseInt(scaleText); + } else { + scale = null; + } + return of(schema.type(), schema.name(), scale); + } + + public static FromConnectConversionKey of(Schema.Type type) { + return new FromConnectConversionKey(type, null, null); + } + + public static FromConnectConversionKey of(Schema.Type type, String schemaName) { + return new FromConnectConversionKey(type, schemaName, null); + } + + public static FromConnectConversionKey of(Schema.Type type, String schemaName, Integer scale) { + return new FromConnectConversionKey(type, schemaName, scale); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FromConnectConversionKey that = (FromConnectConversionKey) o; + return type == that.type && + Objects.equal(schemaName, that.schemaName) && + Objects.equal(scale, that.scale); + } + + @Override + public int hashCode() { + return Objects.hashCode(type, schemaName, scale); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .omitNullValues() + .add("type", type) + .add("schemaName", 
schemaName) + .add("scale", scale) + .toString(); + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromConnectSchemaConverter.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromConnectSchemaConverter.java new file mode 100644 index 0000000..b23997f --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromConnectSchemaConverter.java @@ -0,0 +1,267 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.github.jcustenborder.kafka.connect.json; + +import com.google.common.base.Charsets; +import com.google.common.base.MoreObjects; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Schema.Type; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +public class FromConnectSchemaConverter { + static final Map> PRIMITIVE_TYPES; + static final Map PRIMITIVE_VISITORS; + private static final Logger log = LoggerFactory.getLogger(FromConnectSchemaConverter.class); + + private static void addType( + Map primitiveVisitors, + Map> primitiveTypes, + FromConnectConversionKey key, + FromConnectVisitor visitor, + ImmutableMap properties) { + + primitiveTypes.put(key, properties); + primitiveVisitors.put(key, visitor); + } + + static { + Map visitors = new LinkedHashMap<>(); + Map> types = new LinkedHashMap<>(); + addType( + visitors, types, + FromConnectConversionKey.of(Type.BOOLEAN), + new FromConnectVisitor.BooleanVisitor(), + ImmutableMap.of("type", "boolean") + ); + addType( + visitors, types, + FromConnectConversionKey.of(Type.BYTES), + new FromConnectVisitor.BytesVisitor(), + ImmutableMap.of("type", "string", "contentEncoding", "base64") + ); + addType( + visitors, types, + FromConnectConversionKey.of(Type.FLOAT32), + new FromConnectVisitor.FloatVisitor(), + ImmutableMap.of("type", "number") + ); + addType( + visitors, types, + 
FromConnectConversionKey.of(Type.FLOAT64), + new FromConnectVisitor.FloatVisitor(), + ImmutableMap.of("type", "number") + ); + Stream.of(Type.INT8, Type.INT16, Type.INT32, Type.INT64) + .forEach(type -> { + addType( + visitors, types, + FromConnectConversionKey.of(type), + new FromConnectVisitor.IntegerVisitor(), + ImmutableMap.of("type", "integer") + ); + }); + IntStream.range(0, 100) + .forEach(scale -> { + Schema decimalSchema = Decimal.schema(scale); + addType( + visitors, types, + FromConnectConversionKey.of(decimalSchema), + new FromConnectVisitor.DecimalVisitor(scale), + ImmutableMap.of( + "type", "string", + "format", "decimal", + "scale", Integer.toString(scale) + ) + ); + }); + + visitors.put(FromConnectConversionKey.of(Type.STRING), new FromConnectVisitor.StringVisitor()); + types.put(FromConnectConversionKey.of(Type.STRING), ImmutableMap.of("type", "string")); + + visitors.put(FromConnectConversionKey.of(Date.SCHEMA), new FromConnectVisitor.DateVisitor()); + types.put(FromConnectConversionKey.of(Date.SCHEMA), ImmutableMap.of("type", "string", "format", "date")); + + visitors.put(FromConnectConversionKey.of(Time.SCHEMA), new FromConnectVisitor.TimeVisitor()); + types.put(FromConnectConversionKey.of(Time.SCHEMA), ImmutableMap.of("type", "string", "format", "time")); + + visitors.put(FromConnectConversionKey.of(Timestamp.SCHEMA), new FromConnectVisitor.DateTimeVisitor()); + types.put(FromConnectConversionKey.of(Timestamp.SCHEMA), ImmutableMap.of("type", "string", "format", "date-time")); + PRIMITIVE_TYPES = ImmutableMap.copyOf(types); + PRIMITIVE_VISITORS = ImmutableMap.copyOf(visitors); + } + + public static FromConnectState toJsonSchema(org.apache.kafka.connect.data.Schema schema, String headerName) { + Map definitions = new LinkedHashMap<>(); + List visitors = new ArrayList<>(); + JSONObject result = toJsonSchema(schema, definitions, visitors); + result.put("$schema", "http://json-schema.org/draft-07/schema#"); + if (!definitions.isEmpty()) { + 
//definitions + JSONObject definitionsObject = new JSONObject(); + definitions.forEach((definitionName, definition) -> { + definitionsObject.put(definition.name(), definition.jsonSchema()); + }); + result.put("definitions", definitionsObject); + } + + Header header = new RecordHeader( + headerName, + result.toString().getBytes(Charsets.UTF_8) + ); + + FromConnectVisitor visitor = visitors.get(0); + return FromConnectState.of(header, visitor); + } + + private static JSONObject toJsonSchema(org.apache.kafka.connect.data.Schema schema, Map definitions, List visitors) { + JSONObject result = new JSONObject(); + if (!Strings.isNullOrEmpty(schema.doc())) { + result.put("description", schema.doc()); + } + FromConnectConversionKey key = FromConnectConversionKey.of(schema); + log.trace("toJsonSchema() - Checking for '{}'", key); + Map primitiveType = PRIMITIVE_TYPES.get(key); + if (null != primitiveType) { + primitiveType.forEach(result::put); + FromConnectVisitor visitor = PRIMITIVE_VISITORS.get(key); + visitors.add(visitor); + return result; + } + + if (!Strings.isNullOrEmpty(schema.name())) { + result.put("title", schema.name()); + } + + + if (Type.ARRAY == schema.type()) { + result.put("type", "array"); + FromConnectVisitor elementVisitor; + if (Type.STRUCT == schema.valueSchema().type()) { + Definition definition = definitions.computeIfAbsent(schema.valueSchema(), s -> { + List childVisitors = new ArrayList<>(); + JSONObject fieldJsonSchema = toJsonSchema(schema.valueSchema(), definitions, childVisitors); + String definitionName = schema.valueSchema().name().toLowerCase(); + return Definition.of(fieldJsonSchema, definitionName, childVisitors); + }); + result.put("items", definition.ref()); + elementVisitor = definition.visitors().get(0); + } else { + List childVisitors = new ArrayList<>(); + JSONObject arrayValueSchema = toJsonSchema(schema.valueSchema(), definitions, childVisitors); + elementVisitor = childVisitors.get(0); + result.put("items", arrayValueSchema); + } 
+ visitors.add(new FromConnectVisitor.ArrayVisitor(elementVisitor)); + } + if (Type.STRUCT == schema.type()) { + List requiredFields = new ArrayList<>(schema.fields().size()); + Map properties = new LinkedHashMap<>(schema.fields().size()); + Map structVisitors = new LinkedHashMap<>(); + schema.fields().forEach(field -> { + log.trace("toJsonSchema() - field:{} type:{}", field.name(), field.schema().type()); + List childVisitors = new ArrayList<>(); + if (!field.schema().isOptional()) { + requiredFields.add(field.name()); + } + if (Type.STRUCT == field.schema().type()) { + Definition definition = definitions.computeIfAbsent(field.schema(), s -> { + List definitionVisitors = new ArrayList<>(); + JSONObject fieldJsonSchema = toJsonSchema(field.schema(), definitions, definitionVisitors); + String definitionName = field.schema().name().toLowerCase(); + return Definition.of(fieldJsonSchema, definitionName, definitionVisitors); + }); + childVisitors.addAll(definition.visitors()); + properties.put(field.name(), definition.ref()); + } else { + JSONObject fieldJsonSchema = toJsonSchema(field.schema(), definitions, childVisitors); + properties.put(field.name(), fieldJsonSchema); + } + FromConnectVisitor fieldVisitor = childVisitors.get(0); + structVisitors.put(field.name(), fieldVisitor); + }); + result.put("properties", properties); + result.put("required", requiredFields); + result.put("type", "object"); + visitors.add(new FromConnectVisitor.StructVisitor(structVisitors)); + } + + + log.trace("toJsonSchema() - '{}' is not primitive.", schema.type()); + + return result; + } + + static class Definition { + private final JSONObject jsonSchema; + private final String name; + private final List visitors; + + + private Definition(JSONObject jsonSchema, String name, List visitors) { + this.jsonSchema = jsonSchema; + this.name = name; + this.visitors = visitors; + } + + public static Definition of(JSONObject jsonSchema, String ref, List visitors) { + return new 
Definition(jsonSchema, ref, visitors); + } + + public JSONObject jsonSchema() { + return this.jsonSchema; + } + + public String name() { + return this.name; + } + + public JSONObject ref() { + return new JSONObject() + .put("$ref", String.format("#/definitions/%s", this.name)); + } + + public List visitors() { + return this.visitors; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("jsonSchema", jsonSchema) + .add("name", name) + .toString(); + } + } + + +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromConnectState.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromConnectState.java new file mode 100644 index 0000000..fe6ef02 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromConnectState.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.github.jcustenborder.kafka.connect.json; + +import org.apache.kafka.common.header.Header; + +class FromConnectState { + final Header header; + final FromConnectVisitor visitor; + + + private FromConnectState(Header header, FromConnectVisitor visitor) { + this.header = header; + this.visitor = visitor; + } + + public static FromConnectState of(Header header, FromConnectVisitor visitor) { + return new FromConnectState(header, visitor); + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromConnectVisitor.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromConnectVisitor.java new file mode 100644 index 0000000..f953353 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromConnectVisitor.java @@ -0,0 +1,158 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.github.jcustenborder.kafka.connect.json; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.google.common.io.BaseEncoding; +import org.apache.kafka.connect.data.Struct; + +import java.io.IOException; +import java.math.BigDecimal; +import java.text.DecimalFormat; +import java.util.Date; +import java.util.List; +import java.util.Map; + +public abstract class FromConnectVisitor { + public abstract void doVisit(JsonGenerator jsonGenerator, T value) throws IOException; + + public static class StringVisitor extends FromConnectVisitor { + @Override + public void doVisit(JsonGenerator jsonGenerator, String value) throws IOException { + jsonGenerator.writeString(value); + } + } + + public static class StructVisitor extends FromConnectVisitor { + final Map visitors; + + public StructVisitor(Map visitors) { + this.visitors = visitors; + } + + @Override + public void doVisit(JsonGenerator jsonGenerator, Struct value) throws IOException { + jsonGenerator.writeStartObject(); + for (Map.Entry e : this.visitors.entrySet()) { + final String fieldName = e.getKey(); + final FromConnectVisitor visitor = e.getValue(); + final Object fieldValue = value.get(fieldName); + jsonGenerator.writeFieldName(fieldName); + visitor.doVisit(jsonGenerator, fieldValue); + } + jsonGenerator.writeEndObject(); + } + } + + public static class BooleanVisitor extends FromConnectVisitor { + @Override + public void doVisit(JsonGenerator jsonGenerator, Boolean value) throws IOException { + jsonGenerator.writeBoolean(value); + } + } + + public static class BytesVisitor extends FromConnectVisitor { + @Override + public void doVisit(JsonGenerator jsonGenerator, byte[] value) throws IOException { + jsonGenerator.writeString( + BaseEncoding.base64().encode(value) + ); + } + } + + public static class FloatVisitor extends FromConnectVisitor { + + @Override + public void doVisit(JsonGenerator jsonGenerator, Number value) throws IOException { + 
jsonGenerator.writeNumber(value.doubleValue()); + } + } + + public static class DateTimeVisitor extends FromConnectVisitor { + @Override + public void doVisit(JsonGenerator jsonGenerator, Date value) throws IOException { + jsonGenerator.writeString( + Utils.TIMESTAMP_FORMATTER.format( + value.toInstant() + ) + ); + } + } + + public static class DateVisitor extends FromConnectVisitor { + @Override + public void doVisit(JsonGenerator jsonGenerator, Date value) throws IOException { + jsonGenerator.writeString( + Utils.DATE_FORMATTER.format( + value.toInstant() + ) + ); + } + } + + public static class TimeVisitor extends FromConnectVisitor { + @Override + public void doVisit(JsonGenerator jsonGenerator, Date value) throws IOException { + jsonGenerator.writeString( + Utils.TIME_FORMATTER.format( + value.toInstant() + ) + ); + } + } + + public static class IntegerVisitor extends FromConnectVisitor { + + + @Override + public void doVisit(JsonGenerator jsonGenerator, Number value) throws IOException { + jsonGenerator.writeNumber(value.longValue()); + } + } + + public static class ArrayVisitor extends FromConnectVisitor { + final FromConnectVisitor elementVisitor; + + public ArrayVisitor(FromConnectVisitor elementVisitor) { + this.elementVisitor = elementVisitor; + } + + @Override + public void doVisit(JsonGenerator jsonGenerator, List value) throws IOException { + jsonGenerator.writeStartArray(); + for (Object o : value) { + this.elementVisitor.doVisit(jsonGenerator, o); + } + jsonGenerator.writeEndArray(); + } + } + + public static class DecimalVisitor extends FromConnectVisitor { + DecimalFormat decimalFormat; + + public DecimalVisitor(int scale) { + this.decimalFormat = new DecimalFormat("#"); + this.decimalFormat.setMaximumFractionDigits(scale); + } + + @Override + public void doVisit(JsonGenerator jsonGenerator, BigDecimal value) throws IOException { + jsonGenerator.writeString( + this.decimalFormat.format(value) + ); + } + } +} diff --git 
a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJson.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJson.java new file mode 100644 index 0000000..85d1a2a --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJson.java @@ -0,0 +1,122 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.json; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.jcustenborder.kafka.connect.utils.config.Description; +import com.github.jcustenborder.kafka.connect.utils.config.Title; +import com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.errors.DataException; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + +@Title("From Json transformation") +@Description("The FromJson will read JSON data that is in string on byte form and parse the data to " + + "a connect structure based on the JSON schema provided.") +public class FromJson> extends 
BaseKeyValueTransformation { + FromJsonConfig config; + + protected FromJson(boolean isKey) { + super(isKey); + } + + @Override + public ConfigDef config() { + return FromJsonConfig.config(); + } + + @Override + public void close() { + + } + + SchemaAndValue processJsonNode(R record, Schema inputSchema, JsonNode node) { + Object result = this.fromJsonState.visitor.visit(node); + return new SchemaAndValue(this.fromJsonState.schema, result); + } + + @Override + protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] input) { + try { + JsonNode node = this.objectMapper.readValue(input, JsonNode.class); + return processJsonNode(record, inputSchema, node); + } catch (IOException e) { + throw new DataException(e); + } + } + + @Override + protected SchemaAndValue processString(R record, Schema inputSchema, String input) { + try { + JsonNode node = this.objectMapper.readValue(input, JsonNode.class); + return processJsonNode(record, inputSchema, node); + } catch (IOException e) { + throw new DataException(e); + } + } + + FromJsonState fromJsonState; + ObjectMapper objectMapper; + + @Override + public void configure(Map map) { + this.config = new FromJsonConfig(map); + + org.everit.json.schema.Schema schema; + if (FromJsonConfig.SchemaLocation.Url == this.config.schemaLocation) { + try { + try (InputStream inputStream = this.config.schemaUrl.openStream()) { + schema = Utils.loadSchema(inputStream); + } + } catch (IOException e) { + ConfigException exception = new ConfigException(FromJsonConfig.SCHEMA_URL_CONF, this.config.schemaUrl, "exception while loading schema"); + exception.initCause(e); + throw exception; + } + } else if (FromJsonConfig.SchemaLocation.Inline == this.config.schemaLocation) { + schema = Utils.loadSchema(this.config.schemaText); + } else { + throw new ConfigException( + FromJsonConfig.SCHEMA_LOCATION_CONF, + this.config.schemaLocation.toString(), + "Location is not supported" + ); + } + + this.fromJsonState = 
FromJsonSchemaConverter.fromJSON(schema); + this.objectMapper = JacksonFactory.create(); + } + + public static class Key> extends FromJson { + public Key() { + super(true); + } + } + + public static class Value> extends FromJson { + public Value() { + super(false); + } + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonConfig.java new file mode 100644 index 0000000..2c36e4d --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonConfig.java @@ -0,0 +1,88 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.github.jcustenborder.kafka.connect.json; + +import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder; +import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils; +import com.github.jcustenborder.kafka.connect.utils.config.recommenders.Recommenders; +import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; + +import java.net.URL; +import java.util.Map; + +class FromJsonConfig extends AbstractConfig { + public static final String SCHEMA_URL_CONF = "json.schema.url"; + static final String SCHEMA_URL_DOC = "Url to retrieve the schema from."; + public static final String SCHEMA_INLINE_CONF = "json.schema.inline"; + static final String SCHEMA_INLINE_DOC = "The JSON schema to use as an escaped string."; + public static final String SCHEMA_LOCATION_CONF = "json.schema.location"; + static final String SCHEMA_LOCATION_DOC = "Location to retrieve the schema from. `Url` is used " + + "to retrieve the schema from a url. 
`Inline` is used to read the schema from the `" + + SCHEMA_INLINE_CONF + "` configuration value."; + + public enum SchemaLocation { + Url, + Inline + } + + public final URL schemaUrl; + public final SchemaLocation schemaLocation; + public final String schemaText; + + + public FromJsonConfig(Map originals) { + super(config(), originals); + this.schemaUrl = ConfigUtils.url(this, SCHEMA_URL_CONF); + this.schemaLocation = ConfigUtils.getEnum(SchemaLocation.class, this, SCHEMA_LOCATION_CONF); + this.schemaText = getString(SCHEMA_INLINE_CONF); + } + + public static void addConfigItems(ConfigDef configDef) { + configDef.define( + ConfigKeyBuilder.of(SCHEMA_URL_CONF, ConfigDef.Type.STRING) + .documentation(SCHEMA_URL_DOC) + .validator(Validators.validUrl()) + .importance(ConfigDef.Importance.HIGH) + .defaultValue("File:///doesNotExist") + .build() + ) + .define( + ConfigKeyBuilder.of(SCHEMA_LOCATION_CONF, ConfigDef.Type.STRING) + .documentation(SCHEMA_LOCATION_DOC) + .validator(Validators.validEnum(SchemaLocation.class)) + .recommender(Recommenders.enumValues(SchemaLocation.class)) + .importance(ConfigDef.Importance.HIGH) + .defaultValue(SchemaLocation.Url.toString()) + .build() + ) + .define( + ConfigKeyBuilder.of(SCHEMA_INLINE_CONF, ConfigDef.Type.STRING) + .documentation(SCHEMA_INLINE_DOC) + .recommender(Recommenders.visibleIf(SCHEMA_LOCATION_CONF, SchemaLocation.Inline.toString())) + .importance(ConfigDef.Importance.HIGH) + .defaultValue("") + .build() + ); + } + + public static ConfigDef config() { + ConfigDef configDef = new ConfigDef(); + addConfigItems(configDef); + return configDef; + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonConversionKey.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonConversionKey.java new file mode 100644 index 0000000..f7d2851 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonConversionKey.java @@ -0,0 +1,140 @@ +/** + * Copyright © 2020 
Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.json; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Objects; +import org.everit.json.schema.NumberSchema; +import org.everit.json.schema.Schema; +import org.everit.json.schema.StringSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class FromJsonConversionKey { + static final String UNNAMED_FORMAT = "unnamed-format"; + private static final Logger log = LoggerFactory.getLogger(FromJsonConversionKey.class); + final Class schemaClass; + final String format; + final Boolean requiresInteger; + final String contentEncoding; + + private FromJsonConversionKey(Class schemaClass, String format, Boolean requiresInteger, String contentEncoding) { + this.schemaClass = schemaClass; + this.format = format; + this.requiresInteger = requiresInteger; + this.contentEncoding = contentEncoding; + } + + public static FromJsonConversionKey of(org.everit.json.schema.Schema jsonSchema) { + String format; + Boolean requiresInteger; + String contentEncoding; + if (jsonSchema instanceof StringSchema) { + StringSchema stringSchema = (StringSchema) jsonSchema; + format = UNNAMED_FORMAT.equals(stringSchema.getFormatValidator().formatName()) ? 
null : stringSchema.getFormatValidator().formatName(); + contentEncoding = (String) stringSchema.getUnprocessedProperties().get("contentEncoding"); + requiresInteger = null; + log.trace("jsonSchema = '{}' format = '{}'", jsonSchema, format); + } else if (jsonSchema instanceof NumberSchema) { + NumberSchema numberSchema = (NumberSchema) jsonSchema; + requiresInteger = numberSchema.requiresInteger(); + format = null; + contentEncoding = null; + } else { + format = null; + requiresInteger = null; + contentEncoding = null; + } + + return new FromJsonConversionKey(jsonSchema.getClass(), format, requiresInteger, contentEncoding); + } + + public static Builder from(Class schemaClass) { + return new Builder().schemaClass(schemaClass); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FromJsonConversionKey that = (FromJsonConversionKey) o; + return Objects.equal(schemaClass, that.schemaClass) && + Objects.equal(format, that.format) && + Objects.equal(requiresInteger, that.requiresInteger) && + Objects.equal(contentEncoding, that.contentEncoding); + } + + @Override + public int hashCode() { + return Objects.hashCode(schemaClass, format, requiresInteger, contentEncoding); + } + + // public static ConversionKey of(Class schemaClass) { +// return new ConversionKey(schemaClass, null, null, contentMediaType); +// } +// +// public static ConversionKey of(Class schemaClass, String format) { +// return new ConversionKey(schemaClass, format, null, contentMediaType); +// } +// +// public static ConversionKey of(Class schemaClass, Boolean requiesInteger) { +// return new ConversionKey(schemaClass, null, requiesInteger, contentMediaType); +// } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("schemaClass", schemaClass) + .add("format", format) + .add("requiresInteger", requiresInteger) + .add("contentEncoding", contentEncoding) + .toString(); + } + 
+ static class Builder { + Class schemaClass; + String format; + Boolean requiresInteger; + String contentEncoding; + private Builder() { + } + + public Builder schemaClass(Class schemaClass) { + this.schemaClass = schemaClass; + return this; + } + + public Builder format(String format) { + this.format = format; + return this; + } + + public Builder requiresInteger(Boolean requiresInteger) { + this.requiresInteger = requiresInteger; + return this; + } + + public Builder contentEncoding(String contentMediaType) { + this.contentEncoding = contentMediaType; + return this; + } + + public FromJsonConversionKey build() { + return new FromJsonConversionKey(this.schemaClass, this.format, this.requiresInteger, this.contentEncoding); + } + } + +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverter.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverter.java new file mode 100644 index 0000000..4e8f080 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverter.java @@ -0,0 +1,403 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.github.jcustenborder.kafka.connect.json; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.BooleanNode; +import com.fasterxml.jackson.databind.node.NumericNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; +import org.everit.json.schema.ArraySchema; +import org.everit.json.schema.BooleanSchema; +import org.everit.json.schema.NumberSchema; +import org.everit.json.schema.ObjectSchema; +import org.everit.json.schema.ReferenceSchema; +import org.everit.json.schema.StringSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public abstract class FromJsonSchemaConverter { + static final Map< + FromJsonConversionKey, + FromJsonSchemaConverter + > LOOKUP; + private static final Logger log = LoggerFactory.getLogger(FromJsonSchemaConverter.class); + + static { + LOOKUP = Stream.of( + new ObjectSchemaConverter(), + new IntegerSchemaConverter(), + new StringSchemaConverter(), + new BooleanSchemaConverter(), + new TimeSchemaConverter(), + new DateSchemaConverter(), + new DateTimeSchemaConverter(), + new FloatSchemaConverter(), + new ArraySchemaConverter(), + new BytesSchemaConverter(), + new DecimalSchemaConverter() + ).collect(Collectors.toMap(FromJsonSchemaConverter::key, c -> c)); + } + + public 
static FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema) { + return fromJSON(jsonSchema, false); + } + + public static FromJsonState fromJSON(org.everit.json.schema.Schema jsonSchema, boolean isOptional) { + if (jsonSchema instanceof ReferenceSchema) { + ReferenceSchema referenceSchema = (ReferenceSchema) jsonSchema; + jsonSchema = referenceSchema.getReferredSchema(); + } + FromJsonConversionKey key = FromJsonConversionKey.of(jsonSchema); + + FromJsonSchemaConverter converter = LOOKUP.get(key); + + if (null == converter) { + throw new UnsupportedOperationException( + String.format("Schema type is not supported. %s:%s", jsonSchema.getClass().getName(), jsonSchema) + ); + } + + SchemaBuilder builder = converter.schemaBuilder(jsonSchema); + if (!Strings.isNullOrEmpty(jsonSchema.getTitle())) { + builder.name(jsonSchema.getTitle()); + } + if (!Strings.isNullOrEmpty(jsonSchema.getDescription())) { + builder.doc(jsonSchema.getDescription()); + } + if (isOptional) { + builder.optional(); + } + Map visitors = new LinkedHashMap<>(); + converter.fromJSON(builder, jsonSchema, visitors); + Schema schema = builder.build(); + FromJsonVisitor visitor = converter.jsonVisitor(schema, visitors); + return FromJsonState.of(schema, visitor); + } + + protected abstract SchemaBuilder schemaBuilder(T schema); + + protected abstract FromJsonConversionKey key(); + + protected abstract FromJsonVisitor jsonVisitor(Schema connectSchema, Map visitors); + + protected abstract void fromJSON(SchemaBuilder builder, T jsonSchema, Map visitors); + + static class BooleanSchemaConverter extends FromJsonSchemaConverter { + @Override + protected SchemaBuilder schemaBuilder(BooleanSchema schema) { + return SchemaBuilder.bool(); + } + + @Override + protected FromJsonConversionKey key() { + return FromJsonConversionKey.from(BooleanSchema.class).build(); + } + + @Override + protected FromJsonVisitor jsonVisitor(Schema connectSchema, Map visitors) { + return new 
FromJsonVisitor.BooleanVisitor(connectSchema); + } + + @Override + protected void fromJSON(SchemaBuilder builder, BooleanSchema jsonSchema, Map visitors) { + + } + } + + static class ObjectSchemaConverter extends FromJsonSchemaConverter { + + @Override + protected SchemaBuilder schemaBuilder(ObjectSchema schema) { + return SchemaBuilder.struct(); + } + + @Override + protected FromJsonConversionKey key() { + return FromJsonConversionKey.from(ObjectSchema.class).build(); + } + + @Override + protected FromJsonVisitor jsonVisitor(Schema connectSchema, Map visitors) { + return new FromJsonVisitor.StructVisitor(connectSchema, visitors); + } + + + @Override + protected void fromJSON(SchemaBuilder builder, ObjectSchema jsonSchema, Map visitors) { + Set requiredProperties = ImmutableSet.copyOf(jsonSchema.getRequiredProperties()); + jsonSchema.getPropertySchemas() + .entrySet() + .stream() + .sorted(Map.Entry.comparingByKey()) + .forEach(e -> { + final String propertyName = e.getKey(); + final org.everit.json.schema.Schema propertyJsonSchema = e.getValue(); + final boolean isOptional = !requiredProperties.contains(propertyName); + log.trace("fromJson() - Processing property '{}' '{}'", propertyName, propertyJsonSchema); + FromJsonState state = FromJsonSchemaConverter.fromJSON(propertyJsonSchema, isOptional); + builder.field(propertyName, state.schema); + visitors.put(propertyName, state.visitor); + }); + } + } + + static class IntegerSchemaConverter extends FromJsonSchemaConverter { + + @Override + protected SchemaBuilder schemaBuilder(NumberSchema schema) { + return SchemaBuilder.int64(); + } + + @Override + protected FromJsonConversionKey key() { + return FromJsonConversionKey.from(NumberSchema.class) + .requiresInteger(true) + .build(); + } + + @Override + protected FromJsonVisitor jsonVisitor(Schema connectSchema, Map visitors) { + return new FromJsonVisitor.IntegerVisitor(connectSchema); + } + + @Override + protected void fromJSON(SchemaBuilder builder, NumberSchema 
jsonSchema, Map visitors) { + log.trace("fromJson() - Processing '{}'", jsonSchema); + } + } + + static class FloatSchemaConverter extends FromJsonSchemaConverter { + + @Override + protected FromJsonVisitor jsonVisitor(Schema connectSchema, Map visitors) { + return new FromJsonVisitor.FloatVisitor(connectSchema); + } + + @Override + protected SchemaBuilder schemaBuilder(NumberSchema schema) { + return SchemaBuilder.float64(); + } + + @Override + protected FromJsonConversionKey key() { + return FromJsonConversionKey.from(NumberSchema.class) + .requiresInteger(false) + .build(); + } + + @Override + protected void fromJSON(SchemaBuilder builder, NumberSchema jsonSchema, Map visitors) { + log.trace("fromJson() - Processing '{}'", jsonSchema); + } + } + + static class StringSchemaConverter extends FromJsonSchemaConverter { + + @Override + protected SchemaBuilder schemaBuilder(StringSchema schema) { + return SchemaBuilder.string(); + } + + @Override + protected FromJsonConversionKey key() { + return FromJsonConversionKey.from(StringSchema.class).build(); + } + + @Override + protected FromJsonVisitor jsonVisitor(Schema connectSchema, Map visitors) { + return new FromJsonVisitor.StringVisitor(connectSchema); + } + + @Override + protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map visitors) { + log.trace("fromJson() - Processing '{}'", jsonSchema); + } + } + + static class DateSchemaConverter extends FromJsonSchemaConverter { + + @Override + protected FromJsonVisitor jsonVisitor(Schema connectSchema, Map visitors) { + return new FromJsonVisitor.DateVisitor(connectSchema); + } + + @Override + protected SchemaBuilder schemaBuilder(StringSchema schema) { + return Date.builder(); + } + + @Override + protected FromJsonConversionKey key() { + return FromJsonConversionKey.from(StringSchema.class) + .format("date") + .build(); + } + + @Override + protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map visitors) { + log.trace("fromJson() - 
Processing '{}'", jsonSchema); + } + } + + static class TimeSchemaConverter extends FromJsonSchemaConverter { + + @Override + protected SchemaBuilder schemaBuilder(StringSchema schema) { + return Time.builder(); + } + + @Override + protected FromJsonConversionKey key() { + return FromJsonConversionKey.from(StringSchema.class) + .format("time") + .build(); + } + + @Override + protected FromJsonVisitor jsonVisitor(Schema connectSchema, Map visitors) { + return new FromJsonVisitor.TimeVisitor(connectSchema); + } + + @Override + protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map visitors) { + log.trace("fromJson() - Processing '{}'", jsonSchema); + } + } + + static class DateTimeSchemaConverter extends FromJsonSchemaConverter { + + @Override + protected FromJsonVisitor jsonVisitor(Schema connectSchema, Map visitors) { + return new FromJsonVisitor.DateTimeVisitor(connectSchema); + } + + @Override + protected SchemaBuilder schemaBuilder(StringSchema schema) { + return Timestamp.builder(); + } + + @Override + protected FromJsonConversionKey key() { + return FromJsonConversionKey.from(StringSchema.class) + .format("date-time") + .build(); + } + + @Override + protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map visitors) { + log.trace("fromJson() - Processing '{}'", jsonSchema); + } + } + + static class BytesSchemaConverter extends FromJsonSchemaConverter { + + @Override + protected SchemaBuilder schemaBuilder(StringSchema schema) { + return SchemaBuilder.bytes(); + } + + @Override + protected FromJsonConversionKey key() { + return FromJsonConversionKey.from(StringSchema.class) + .contentEncoding("base64") + .build(); + } + + @Override + protected FromJsonVisitor jsonVisitor(Schema connectSchema, Map visitors) { + return new FromJsonVisitor.BytesVisitor(connectSchema); + } + + @Override + protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map visitors) { + + } + } + + static class DecimalSchemaConverter 
extends FromJsonSchemaConverter { + public DecimalSchemaConverter() { + + } + + @Override + protected SchemaBuilder schemaBuilder(StringSchema schema) { + int scale = Utils.scale(schema); + return Decimal.builder(scale); + } + + @Override + protected FromJsonConversionKey key() { + return FromJsonConversionKey.from(StringSchema.class) + .format("decimal") + .build(); + } + + @Override + protected FromJsonVisitor jsonVisitor(Schema connectSchema, Map visitors) { + int scale = Utils.scale(connectSchema); + return new FromJsonVisitor.DecimalVisitor(connectSchema, scale); + } + + @Override + protected void fromJSON(SchemaBuilder builder, StringSchema jsonSchema, Map visitors) { + + } + } + + static class ArraySchemaConverter extends FromJsonSchemaConverter { + + @Override + protected SchemaBuilder schemaBuilder(ArraySchema schema) { + FromJsonState state = FromJsonSchemaConverter.fromJSON(schema.getAllItemSchema()); + return SchemaBuilder.array(state.schema); + } + + @Override + protected FromJsonConversionKey key() { + return FromJsonConversionKey.from(ArraySchema.class).build(); + } + + @Override + protected FromJsonVisitor jsonVisitor(Schema connectSchema, Map visitors) { + FromJsonVisitor visitor = visitors.get("item"); + return new FromJsonVisitor.ArrayVisitor(connectSchema, visitor); + } + + @Override + protected void fromJSON(SchemaBuilder builder, ArraySchema jsonSchema, Map visitors) { + FromJsonState state = FromJsonSchemaConverter.fromJSON(jsonSchema.getAllItemSchema()); + visitors.put("item", state.visitor); + } + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonState.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonState.java new file mode 100644 index 0000000..8034ab8 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonState.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, 
Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.json; + +import org.apache.kafka.connect.data.Schema; + +class FromJsonState { + public final Schema schema; + public final FromJsonVisitor visitor; + + FromJsonState(Schema schema, FromJsonVisitor visitor) { + this.schema = schema; + this.visitor = visitor; + } + + public static FromJsonState of(Schema schema, FromJsonVisitor visitor) { + return new FromJsonState(schema, visitor); + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonVisitor.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonVisitor.java new file mode 100644 index 0000000..5c4ed3f --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/FromJsonVisitor.java @@ -0,0 +1,228 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.github.jcustenborder.kafka.connect.json; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.BooleanNode; +import com.fasterxml.jackson.databind.node.NumericNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import com.google.common.io.BaseEncoding; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.DecimalFormat; +import java.text.ParseException; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; + +public abstract class FromJsonVisitor { + protected final Schema schema; + + protected FromJsonVisitor(Schema schema) { + this.schema = schema; + } + + public V visit(T node) { + V result; + + if (null == node || node.isNull()) { + result = null; + } else { + result = doVisit(node); + } + + return result; + } + + protected abstract V doVisit(T node); + + public static class StringVisitor extends FromJsonVisitor { + public StringVisitor(Schema schema) { + super(schema); + } + + @Override + public String doVisit(TextNode node) { + return node.textValue(); + } + } + + public static class BooleanVisitor extends FromJsonVisitor { + public BooleanVisitor(Schema schema) { + super(schema); + } + + @Override + protected Boolean doVisit(BooleanNode node) { + return node.booleanValue(); + } + } + + public static class StructVisitor extends FromJsonVisitor { + private final Map visitors; + + public StructVisitor(Schema schema, Map visitors) { + super(schema); + this.visitors = visitors; + } + + @Override + protected Struct 
doVisit(ObjectNode node) { + Struct result = new Struct(this.schema); + visitors.forEach((fieldName, visitor) -> { + JsonNode rawValue = node.get(fieldName); + Object convertedValue = visitor.visit(rawValue); + result.put(fieldName, convertedValue); + }); + + return result; + } + } + + public static class IntegerVisitor extends FromJsonVisitor { + public IntegerVisitor(Schema schema) { + super(schema); + } + + @Override + protected Number doVisit(NumericNode node) { + return node.longValue(); + } + } + + public static class FloatVisitor extends FromJsonVisitor { + public FloatVisitor(Schema schema) { + super(schema); + } + + @Override + protected Number doVisit(NumericNode node) { + return node.doubleValue(); + } + } + + public static class DateTimeVisitor extends FromJsonVisitor { + private static final Logger log = LoggerFactory.getLogger(DateTimeVisitor.class); + + public DateTimeVisitor(Schema schema) { + super(schema); + } + + @Override + protected Date doVisit(TextNode node) { + log.trace(node.asText()); + LocalDateTime localDateTime = LocalDateTime.parse(node.asText(), Utils.TIMESTAMP_FORMATTER); + Instant instant = localDateTime.toInstant(ZoneOffset.UTC); + return Date.from(instant); + } + } + + public static class DateVisitor extends FromJsonVisitor { + private static final Logger log = LoggerFactory.getLogger(DateTimeVisitor.class); + + public DateVisitor(Schema schema) { + super(schema); + } + + @Override + protected Date doVisit(TextNode node) { + log.trace(node.asText()); + LocalDate localDateTime = LocalDate.parse(node.asText(), Utils.DATE_FORMATTER); + Instant instant = localDateTime.atStartOfDay().toInstant(ZoneOffset.UTC); + return Date.from(instant); + } + } + + public static class TimeVisitor extends FromJsonVisitor { + private static final Logger log = LoggerFactory.getLogger(DateTimeVisitor.class); + + public TimeVisitor(Schema schema) { + super(schema); + } + + @Override + protected Date doVisit(TextNode node) { + log.trace(node.asText()); + 
LocalTime localDateTime = LocalTime.parse(node.asText(), Utils.TIME_FORMATTER); + Instant instant = LocalDate.ofEpochDay(0).atTime(localDateTime).toInstant(ZoneOffset.UTC); + return Date.from(instant); + } + } + + public static class DecimalVisitor extends FromJsonVisitor { + final int scale; + final DecimalFormat decimalFormat; + + protected DecimalVisitor(Schema schema, int scale) { + super(schema); + this.scale = scale; + this.decimalFormat = new DecimalFormat("#"); + this.decimalFormat.setParseBigDecimal(true); + this.decimalFormat.setMinimumFractionDigits(scale); + } + + @Override + protected Number doVisit(TextNode node) { + try { + return this.decimalFormat.parse(node.asText()); + } catch (ParseException e) { + throw new DataException(e); + } + } + } + + public static class ArrayVisitor extends FromJsonVisitor { + final FromJsonVisitor itemVisitor; + + public ArrayVisitor(Schema schema, FromJsonVisitor itemVisitor) { + super(schema); + this.itemVisitor = itemVisitor; + } + + @Override + protected List doVisit(ArrayNode node) { + List result = new ArrayList(); + for (JsonNode jsonNode : node) { + Object value = itemVisitor.visit(jsonNode); + result.add(value); + } + return result; + } + } + + public static class BytesVisitor extends FromJsonVisitor { + public BytesVisitor(Schema schema) { + super(schema); + } + + @Override + protected byte[] doVisit(TextNode node) { + return BaseEncoding.base64().decode(node.textValue()); + } + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/JacksonFactory.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/JacksonFactory.java new file mode 100644 index 0000000..c5420d4 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/JacksonFactory.java @@ -0,0 +1,56 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the 
License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.json; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.module.SimpleModule; + +import java.io.IOException; +import java.text.DecimalFormat; + +public class JacksonFactory { + + public static ObjectMapper create() { + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper; + } + + static class SerializationModule extends SimpleModule { + public SerializationModule() { + addSerializer(double.class, new DoubleSerializer()); + } + } + + static class DoubleSerializer extends JsonSerializer { + final DecimalFormat decimalFormat; + + public DoubleSerializer() { + this.decimalFormat = new DecimalFormat("#"); +// this.df.setMaximumFractionDigits(8); + } + + + @Override + public void serialize(Double aDouble, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeRaw(this.decimalFormat.format(aDouble)); + } + } + + +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverter.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverter.java new file mode 100644 index 0000000..7afadda --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverter.java @@ -0,0 +1,162 @@ +/** + * Copyright © 2020 Jeremy Custenborder 
(jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.json; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Charsets; +import com.google.common.hash.Hashing; +import com.google.common.io.ByteStreams; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.storage.Converter; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; + +public class JsonSchemaConverter implements Converter { + private static final String KEY_HEADER = "json.key.schema"; + private static final String VALUE_HEADER = "json.value.schema"; + JsonSchemaConverterConfig config; + String jsonSchemaHeader; + Charset encodingCharset; + ObjectMapper objectMapper; + Map fromConnectStateLookup = new HashMap<>(); + Map toConnectStateLookup = new HashMap<>(); + 
Header fallbackHeader; + + @Override + public void configure(Map settings, boolean isKey) { + this.config = new JsonSchemaConverterConfig(settings); + this.jsonSchemaHeader = isKey ? KEY_HEADER : VALUE_HEADER; + this.encodingCharset = Charsets.UTF_8; + this.objectMapper = JacksonFactory.create(); + + if (this.config.insertSchema) { + byte[] headerValue; + if (FromJsonConfig.SchemaLocation.Url == this.config.schemaLocation) { + try { + try (InputStream inputStream = this.config.schemaUrl.openStream()) { + headerValue = ByteStreams.toByteArray(inputStream); + } + } catch (IOException e) { + ConfigException exception = new ConfigException(FromJsonConfig.SCHEMA_URL_CONF, this.config.schemaUrl, "exception while loading schema"); + exception.initCause(e); + throw exception; + } + } else if (FromJsonConfig.SchemaLocation.Inline == this.config.schemaLocation) { + headerValue = this.config.schemaText.getBytes(Charsets.UTF_8); + } else { + throw new ConfigException( + FromJsonConfig.SCHEMA_LOCATION_CONF, + this.config.schemaLocation.toString(), + "Location is not supported" + ); + } + this.fallbackHeader = new RecordHeader(this.jsonSchemaHeader, headerValue); + } else { + fallbackHeader = null; + } + } + + @Override + public byte[] fromConnectData(String s, Schema schema, Object o) { + throw new UnsupportedOperationException( + "This converter requires Kafka 2.4.0 or higher with header support."
+ ); + } + + @Override + public byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) { + if (null == value) { + return null; + } + + + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + try (JsonGenerator jsonGenerator = objectMapper.getFactory().createGenerator(outputStream)) { + FromConnectState fromConnectState = fromConnectStateLookup.computeIfAbsent(schema, s -> FromConnectSchemaConverter.toJsonSchema(schema, jsonSchemaHeader)); + headers.add(fromConnectState.header); + fromConnectState.visitor.doVisit(jsonGenerator, value); + } + return outputStream.toByteArray(); + } catch (IOException ex) { + throw new SerializationException(ex); + } + } + + @Override + public SchemaAndValue toConnectData(String s, byte[] bytes) { + throw new UnsupportedOperationException( + "This converter requires Kafka 2.4.0 or higher with header support." + ); + } + + Header schemaHeader(Headers headers) { + Header schemaHeader = headers.lastHeader(this.jsonSchemaHeader); + if (null == schemaHeader) { + schemaHeader = this.fallbackHeader; + } + return schemaHeader; + } + + @Override + public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) { + if (null == value) { + return SchemaAndValue.NULL; + } + + final Header schemaHeader = schemaHeader(headers); + + if (null == schemaHeader) { + throw new DataException( + String.format( + "Record does not have '%s' header and '%s' is not enabled.", + this.jsonSchemaHeader, + JsonSchemaConverterConfig.INSERT_SCHEMA_ENABLED_CONF + ) + ); + } + + String hash = Hashing.goodFastHash(32) + .hashBytes(schemaHeader.value()) + .toString(); + FromJsonState state = this.toConnectStateLookup.computeIfAbsent(hash, h -> { + org.everit.json.schema.Schema schema = Utils.loadSchema(schemaHeader); + return FromJsonSchemaConverter.fromJSON(schema); + }); + + JsonNode jsonNode; + try { + jsonNode = this.objectMapper.readValue(value, JsonNode.class); + } catch (IOException ex) { + throw new
SerializationException(ex); + } + Object result = state.visitor.visit(jsonNode); + return new SchemaAndValue(state.schema, result); + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverterConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverterConfig.java new file mode 100644 index 0000000..8bdb967 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverterConfig.java @@ -0,0 +1,57 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.json; + +import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder; +import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; + +import java.net.URL; +import java.util.Map; + +class JsonSchemaConverterConfig extends AbstractConfig { + public final URL schemaUrl; + public final FromJsonConfig.SchemaLocation schemaLocation; + public final String schemaText; + public final boolean insertSchema; + + public final static String INSERT_SCHEMA_ENABLED_CONF = "json.insert.schema.enabled"; + final static String INSERT_SCHEMA_ENABLED_DOC = "Flag to determine if the schema specified should be " + + "used if there is no schema header found. 
This allows a connector to consume a topic " + + "that does not have schema headers and apply an external header."; + + public JsonSchemaConverterConfig(Map originals) { + super(config(), originals); + this.schemaUrl = ConfigUtils.url(this, FromJsonConfig.SCHEMA_URL_CONF); + this.schemaLocation = ConfigUtils.getEnum(FromJsonConfig.SchemaLocation.class, this, FromJsonConfig.SCHEMA_LOCATION_CONF); + this.schemaText = getString(FromJsonConfig.SCHEMA_INLINE_CONF); + this.insertSchema = getBoolean(INSERT_SCHEMA_ENABLED_CONF); + } + + public static ConfigDef config() { + ConfigDef configDef = new ConfigDef(); + configDef.define( + ConfigKeyBuilder.of(INSERT_SCHEMA_ENABLED_CONF, ConfigDef.Type.BOOLEAN) + .documentation(INSERT_SCHEMA_ENABLED_DOC) + .importance(ConfigDef.Importance.HIGH) + .defaultValue(false) + .build() + ); + FromJsonConfig.addConfigItems(configDef); + return configDef; + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/json/Utils.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/Utils.java new file mode 100644 index 0000000..fc0eaea --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/Utils.java @@ -0,0 +1,107 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.github.jcustenborder.kafka.connect.json; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.errors.DataException; +import org.everit.json.schema.Schema; +import org.everit.json.schema.StringSchema; +import org.everit.json.schema.internal.DateFormatValidator; +import org.everit.json.schema.internal.DateTimeFormatValidator; +import org.everit.json.schema.internal.TimeFormatValidator; +import org.everit.json.schema.loader.SchemaLoader; +import org.json.JSONObject; +import org.json.JSONTokener; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.io.StringReader; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; + +public class Utils { + + static final ZoneId ZONE_ID = ZoneId.of("UTC"); + public static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ISO_INSTANT + .withZone(ZONE_ID); + public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ISO_LOCAL_DATE + .withZone(ZONE_ID); + public static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ISO_LOCAL_TIME + .withZone(ZONE_ID); + + + public static JSONObject loadObject(Header header) { + try (InputStream inputStream = new ByteArrayInputStream(header.value())) { + return loadObject(inputStream); + } catch (IOException ex) { + throw new DataException("Could not load schema", ex); + } + } + + public static JSONObject loadObject(InputStream inputStream) { + return new JSONObject(new JSONTokener(inputStream)); + } + + public static Schema loadSchema(InputStream inputStream) { + JSONObject rawSchema = loadObject(inputStream); + return loadSchema(rawSchema); + } + + public static org.everit.json.schema.Schema loadSchema(JSONObject rawSchema) { + return SchemaLoader.builder() + .draftV7Support() + .addFormatValidator(new DateFormatValidator()) + .addFormatValidator(new TimeFormatValidator()) 
+ .addFormatValidator(new DateTimeFormatValidator()) + .addFormatValidator(new DecimalFormatValidator()) + .schemaJson(rawSchema) + .build() + .load() + .build(); + } + + public static org.everit.json.schema.Schema loadSchema(Header header) { + JSONObject rawSchema = loadObject(header); + return loadSchema(rawSchema); + } + + + public static int scale(StringSchema schema) { + String scale = schema.getUnprocessedProperties().get("scale").toString(); + return scale(scale); + } + + private static int scale(String scale) { + return Integer.parseInt(scale); + } + + public static int scale(org.apache.kafka.connect.data.Schema connectSchema) { + String scale = connectSchema.parameters().get(Decimal.SCALE_FIELD); + return scale(scale); + } + + public static Schema loadSchema(String schemaText) { + try (Reader reader = new StringReader(schemaText)) { + JSONObject rawSchema = new JSONObject(new JSONTokener(reader)); + return loadSchema(rawSchema); + } catch (IOException ex) { + throw new DataException("Could not load schema", ex); + } + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/example/package-info.java b/src/main/java/com/github/jcustenborder/kafka/connect/json/package-info.java similarity index 71% rename from src/main/java/com/github/jcustenborder/kafka/connect/example/package-info.java rename to src/main/java/com/github/jcustenborder/kafka/connect/json/package-info.java index 50f401b..2720738 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/example/package-info.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/json/package-info.java @@ -1,5 +1,5 @@ /** - * Copyright © 2019 Jeremy Custenborder (jcustenborder@gmail.com) + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -13,14 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -@Introduction("This is information about the connector.") -@Title("Title") -@DocumentationWarning("This is a warning") +@Introduction("This plugin is used to add additional JSON parsing functionality to Kafka Connect.") +@Title("Json Schema") @PluginOwner("jcustenborder") -@PluginName("kafka-connect-example") -package com.github.jcustenborder.kafka.connect.example; +@PluginName("kafka-connect-json-schema") +package com.github.jcustenborder.kafka.connect.json; -import com.github.jcustenborder.kafka.connect.utils.config.DocumentationWarning; import com.github.jcustenborder.kafka.connect.utils.config.Introduction; import com.github.jcustenborder.kafka.connect.utils.config.PluginName; import com.github.jcustenborder.kafka.connect.utils.config.PluginOwner; diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/json/DocumentationTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/json/DocumentationTest.java new file mode 100644 index 0000000..f767e65 --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/json/DocumentationTest.java @@ -0,0 +1,7 @@ +package com.github.jcustenborder.kafka.connect.json; + +import com.github.jcustenborder.kafka.connect.utils.BaseDocumentationTest; + +public class DocumentationTest extends BaseDocumentationTest { + +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/json/FromConnectSchemaConverterTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/json/FromConnectSchemaConverterTest.java new file mode 100644 index 0000000..66223cb --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/json/FromConnectSchemaConverterTest.java @@ -0,0 +1,110 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in 
compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.json; + +import com.google.common.base.Strings; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.json.JSONObject; +import org.junit.jupiter.api.DynamicTest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.DynamicTest.dynamicTest; + +public class FromConnectSchemaConverterTest { + private static final Logger log = LoggerFactory.getLogger(FromConnectSchemaConverterTest.class); + + @Test + public void array() { + Schema addressSchema = SchemaBuilder.struct() + .name("Address") + .optional() + .field("city", SchemaBuilder.string().build()) + .field("state", SchemaBuilder.string().build()) + .field("street_address", SchemaBuilder.string().build()) + .build(); + Schema arraySchema = SchemaBuilder.array(addressSchema); + Schema expected = SchemaBuilder.struct() + .name("Person") + .field("previous_addresses", arraySchema) + .build(); + FromConnectState state = FromConnectSchemaConverter.toJsonSchema(expected, "foo"); + assertNotNull(state, "rawJsonSchema should not be null."); + JSONObject rawJsonSchema = Utils.loadObject(state.header); + log.trace("rawJsonSchema = {}", rawJsonSchema.toString(2)); + org.everit.json.schema.Schema 
jsonSchema = TestUtils.jsonSchema(rawJsonSchema); + assertNotNull(jsonSchema); + } + + @Test + public void struct() { + Schema addressSchema = SchemaBuilder.struct() + .name("Address") + .doc("An object to store an address.") + .optional() + .field("city", SchemaBuilder.string().doc("city of the address.").build()) + .field("state", SchemaBuilder.string().doc("state of the address.").build()) + .field("street_address", SchemaBuilder.string().doc("street address of the address.").build()) + .build(); + Schema expected = SchemaBuilder.struct() + .name("Customer") + .field("first_name", SchemaBuilder.string().doc("First name of the customer").build()) + .field("billing_address", addressSchema) + .field("shipping_address", addressSchema) + .build(); + + FromConnectState state = FromConnectSchemaConverter.toJsonSchema(expected, "foo"); + assertNotNull(state, "rawJsonSchema should not be null."); + JSONObject rawJsonSchema = Utils.loadObject(state.header); + log.trace("rawJsonSchema = {}", rawJsonSchema.toString(2)); + org.everit.json.schema.Schema jsonSchema = TestUtils.jsonSchema(rawJsonSchema); + assertNotNull(jsonSchema); + } + + @TestFactory + public Stream primitives() { + return FromConnectSchemaConverter.PRIMITIVE_TYPES.entrySet() + .stream() + .filter(e-> Strings.isNullOrEmpty(e.getKey().schemaName)) + .map(e -> dynamicTest(e.getKey().toString(), () -> { + String description = String.format("This schema represents a %s", e.getKey()); + Schema expected = SchemaBuilder.type(e.getKey().type) + .doc(description) + .build(); + FromConnectState state = FromConnectSchemaConverter.toJsonSchema(expected, "foo"); + assertNotNull(state, "rawJsonSchema should not be null."); + JSONObject rawJsonSchema = Utils.loadObject(state.header); + assertNotNull(rawJsonSchema, "rawJsonSchema should not be null."); + log.trace("rawJsonSchema = {}", rawJsonSchema.toString(2)); + e.getValue().forEach((propertyName, expectedValue) -> { + Object propertyValue = 
rawJsonSchema.get(propertyName); + assertEquals(expectedValue, propertyValue); + }); + assertEquals(description, rawJsonSchema.getString("description")); + org.everit.json.schema.Schema jsonSchema = TestUtils.jsonSchema(rawJsonSchema); + + })); + } + + +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterTest.java new file mode 100644 index 0000000..ff75b1c --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonSchemaConverterTest.java @@ -0,0 +1,162 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.github.jcustenborder.kafka.connect.json; + +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; +import org.everit.json.schema.internal.DateFormatValidator; +import org.everit.json.schema.internal.DateTimeFormatValidator; +import org.everit.json.schema.internal.TimeFormatValidator; +import org.everit.json.schema.loader.SchemaLoader; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.InputStream; + +import static com.github.jcustenborder.kafka.connect.utils.AssertSchema.assertSchema; + +public class FromJsonSchemaConverterTest { + + org.everit.json.schema.Schema jsonSchema(String type) { + JSONObject rawSchema = new JSONObject(); + rawSchema.put("type", type); + return TestUtils.jsonSchema(rawSchema); + } + + org.everit.json.schema.Schema jsonSchema(String type, String key1, String value1) { + JSONObject rawSchema = new JSONObject(); + rawSchema.put("type", type); + rawSchema.put(key1, value1); + return TestUtils.jsonSchema(rawSchema); + } + + org.everit.json.schema.Schema jsonSchema(String type, String key1, String value1, String key2, String value2) { + JSONObject rawSchema = new JSONObject(); + rawSchema.put("type", type); + rawSchema.put(key1, value1); + rawSchema.put(key2, value2); + return TestUtils.jsonSchema(rawSchema); + } + + void assertJsonSchema(org.apache.kafka.connect.data.Schema expected, org.everit.json.schema.Schema input) { + FromJsonState state = FromJsonSchemaConverter.fromJSON(input); + assertSchema(expected, state.schema); + } + + @Test + public void booleanSchema() { + org.everit.json.schema.Schema jsonSchema = jsonSchema("boolean"); + assertJsonSchema(Schema.BOOLEAN_SCHEMA, jsonSchema); + } + + @Test + public void stringSchema() { + 
org.everit.json.schema.Schema jsonSchema = jsonSchema("string"); + assertJsonSchema(Schema.STRING_SCHEMA, jsonSchema); + } + + @Test + public void integerSchema() { + org.everit.json.schema.Schema jsonSchema = jsonSchema("integer"); + assertJsonSchema(Schema.INT64_SCHEMA, jsonSchema); + } + + @Test + public void numberSchema() { + org.everit.json.schema.Schema jsonSchema = jsonSchema("number"); + assertJsonSchema(Schema.FLOAT64_SCHEMA, jsonSchema); + } + + @Test + public void dateSchema() { + org.everit.json.schema.Schema jsonSchema = jsonSchema("string", "format", "date"); + assertJsonSchema(Date.SCHEMA, jsonSchema); + } + + @Test + public void timeSchema() { + org.everit.json.schema.Schema jsonSchema = jsonSchema("string", "format", "time"); + assertJsonSchema(Time.SCHEMA, jsonSchema); + } + + @Test + public void datetimeSchema() { + org.everit.json.schema.Schema jsonSchema = jsonSchema("string", "format", "date-time"); + assertJsonSchema(Timestamp.SCHEMA, jsonSchema); + } + + org.everit.json.schema.Schema loadSchema(String name) throws IOException { + try (InputStream inputStream = this.getClass().getResourceAsStream(name)) { + JSONObject rawSchema = new JSONObject(new JSONTokener(inputStream)); + return SchemaLoader.builder() + .draftV7Support() + .addFormatValidator(new DateFormatValidator()) + .addFormatValidator(new TimeFormatValidator()) + .addFormatValidator(new DateTimeFormatValidator()) + .schemaJson(rawSchema) + .build() + .load() + .build(); + } + } + + @Test + public void productSchema() throws IOException { + org.everit.json.schema.Schema jsonSchema = loadSchema("SchemaConverterTest/product.schema.json"); + Schema expected = SchemaBuilder.struct() + .name("Product") + .doc("A product from Acme's catalog") + .field("price", SchemaBuilder.float64().doc("The price of the product").build()) + .field("productId", SchemaBuilder.int64().doc("The unique identifier for a product").build()) + .field("productName", SchemaBuilder.string().doc("Name of the 
product").build()) + .build(); + assertJsonSchema(expected, jsonSchema); + } + + @Test + public void nested() throws IOException { + org.everit.json.schema.Schema jsonSchema = loadSchema("SchemaConverterTest/nested.schema.json"); + Schema addressSchema = SchemaBuilder.struct() + .name("Address") + .optional() + .field("city", SchemaBuilder.string().build()) + .field("state", SchemaBuilder.string().build()) + .field("street_address", SchemaBuilder.string().build()) + .build(); + Schema expected = SchemaBuilder.struct() + .name("Customer") + .field("billing_address", addressSchema) + .field("shipping_address", addressSchema) + .build(); + assertJsonSchema(expected, jsonSchema); + } + + @Test + public void array() { + JSONObject rawSchema = new JSONObject() + .put("type", "array") + .put("items", new JSONObject().put("type", "number")); + org.everit.json.schema.Schema jsonSchema = TestUtils.jsonSchema(rawSchema); + assertJsonSchema(SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build(), jsonSchema); + } + + +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonTest.java new file mode 100644 index 0000000..9420b67 --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/json/FromJsonTest.java @@ -0,0 +1,42 @@ +package com.github.jcustenborder.kafka.connect.json; + +import com.github.jcustenborder.kafka.connect.utils.SinkRecordHelper; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.ByteStreams; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class FromJsonTest { + FromJson transform; + + @BeforeEach + 
public void beforeEach() { + this.transform = new FromJson.Value<>(); + } + + @Test + public void basic() throws IOException { + byte[] input = ByteStreams.toByteArray(this.getClass().getResourceAsStream( + "basic.data.json" + )); + File schemaFile = new File("src/test/resources/com/github/jcustenborder/kafka/connect/json/basic.schema.json"); + Map settings = ImmutableMap.of( + FromJsonConfig.SCHEMA_URL_CONF, schemaFile.toURI().toString() + ); + this.transform.configure(settings); + SinkRecord inputRecord = SinkRecordHelper.write("foo", new SchemaAndValue(Schema.STRING_SCHEMA, "foo"), new SchemaAndValue(Schema.BYTES_SCHEMA, input)); + SinkRecord actual = this.transform.apply(inputRecord); + assertNotNull(actual); + } + + +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverterTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverterTest.java new file mode 100644 index 0000000..752c5e1 --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverterTest.java @@ -0,0 +1,250 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.github.jcustenborder.kafka.connect.json; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DynamicTest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.ZoneOffset; +import java.util.Date; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static com.github.jcustenborder.kafka.connect.utils.AssertSchema.assertSchema; +import static com.github.jcustenborder.kafka.connect.utils.AssertStruct.assertStruct; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.DynamicTest.dynamicTest; + +public class JsonSchemaConverterTest { + private static final Logger log = LoggerFactory.getLogger(JsonSchemaConverterTest.class); + JsonSchemaConverter converter; + + @BeforeEach + public void beforeEach() { + this.converter = new JsonSchemaConverter(); + } + + @Test + public void nulls() { + this.converter.configure( 
+ ImmutableMap.of(), + false + ); + SchemaAndValue expected = SchemaAndValue.NULL; + Headers headers = new RecordHeaders(); + byte[] buffer = this.converter.fromConnectData("topic", headers, expected.schema(), expected.value()); + assertNull(buffer, "buffer should be null."); + } + + @Test + public void roundTripString() { + this.converter.configure( + ImmutableMap.of(), + false + ); + SchemaAndValue expected = new SchemaAndValue( + Schema.STRING_SCHEMA, + "This is a test" + ); + Headers headers = new RecordHeaders(); + byte[] buffer = this.converter.fromConnectData("topic", headers, expected.schema(), expected.value()); + assertNotNull(buffer, "buffer should not be null."); + assertTrue(buffer.length > 0, "buffer should be longer than zero."); + Header schemaHeader = headers.lastHeader(this.converter.jsonSchemaHeader); + assertNotNull(schemaHeader, "schemaHeader should not be null."); + SchemaAndValue actual = this.converter.toConnectData("topic", headers, buffer); + assertNotNull(actual, "actual should not be null."); + assertSchema(expected.schema(), actual.schema()); + assertEquals(expected.value(), actual.value()); + } + + @TestFactory + public Stream roundtrip() { + Map tests = new LinkedHashMap<>(); + Schema arraySchema = SchemaBuilder.array(Schema.STRING_SCHEMA).build(); + tests.put( + new SchemaAndValue(arraySchema, ImmutableList.of("one", "two", "three")), + new SchemaAndValue(arraySchema, ImmutableList.of("one", "two", "three")) + ); + tests.put( + new SchemaAndValue(Schema.STRING_SCHEMA, "This is a test"), + new SchemaAndValue(Schema.STRING_SCHEMA, "This is a test") + ); + tests.put( + new SchemaAndValue(Schema.BYTES_SCHEMA, "This is a test".getBytes(Charsets.UTF_8)), + new SchemaAndValue(Schema.BYTES_SCHEMA, "This is a test".getBytes(Charsets.UTF_8)) + ); + tests.put( + new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), + new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true) + ); + tests.put( + new SchemaAndValue(Schema.INT8_SCHEMA, Byte.MAX_VALUE), + new 
SchemaAndValue(Schema.INT64_SCHEMA, (long) Byte.MAX_VALUE) + ); + tests.put( + new SchemaAndValue(Schema.INT16_SCHEMA, Short.MAX_VALUE), + new SchemaAndValue(Schema.INT64_SCHEMA, (long) Short.MAX_VALUE) + ); + tests.put( + new SchemaAndValue(Schema.INT32_SCHEMA, Integer.MAX_VALUE), + new SchemaAndValue(Schema.INT64_SCHEMA, (long) Integer.MAX_VALUE) + ); + tests.put( + new SchemaAndValue(Schema.INT64_SCHEMA, Long.MAX_VALUE), + new SchemaAndValue(Schema.INT64_SCHEMA, Long.MAX_VALUE) + ); + tests.put( + new SchemaAndValue(Schema.FLOAT32_SCHEMA, Float.MAX_VALUE), + new SchemaAndValue(Schema.FLOAT64_SCHEMA, (double) Float.MAX_VALUE) + ); + tests.put( + new SchemaAndValue(Schema.FLOAT64_SCHEMA, Double.MAX_VALUE), + new SchemaAndValue(Schema.FLOAT64_SCHEMA, Double.MAX_VALUE) + ); + Date date = Date.from(LocalDate.of(2020, 02, 02).atTime(LocalTime.MIDNIGHT).toInstant(ZoneOffset.UTC)); + tests.put( + new SchemaAndValue(org.apache.kafka.connect.data.Date.SCHEMA, date), + new SchemaAndValue(org.apache.kafka.connect.data.Date.SCHEMA, date) + ); + + Date time = date.from(LocalTime.MIDNIGHT.atDate(LocalDate.of(1970, 1, 1)).toInstant(ZoneOffset.UTC)); + tests.put( + new SchemaAndValue(org.apache.kafka.connect.data.Time.SCHEMA, time), + new SchemaAndValue(org.apache.kafka.connect.data.Time.SCHEMA, time) + ); + + Date timestamp = new Date(1583363608123L); + tests.put( + new SchemaAndValue(org.apache.kafka.connect.data.Timestamp.SCHEMA, timestamp), + new SchemaAndValue(org.apache.kafka.connect.data.Timestamp.SCHEMA, timestamp) + ); + + IntStream.range(0, 30) + .forEach(scale -> { + BigDecimal input = BigDecimal.valueOf(Long.MAX_VALUE, scale); + SchemaAndValue schemaAndValue = new SchemaAndValue(Decimal.schema(scale), input); + tests.put(schemaAndValue, schemaAndValue); + }); + return tests.entrySet().stream() + .map(p -> dynamicTest(p.getKey().schema().toString(), () -> { + assertRoundTrip(p.getKey(), p.getValue()); + })); + } + + + void assertRoundTrip(SchemaAndValue input, 
SchemaAndValue expected) { + this.converter.configure( + ImmutableMap.of(), + false + ); + Headers headers = new RecordHeaders(); + byte[] buffer = this.converter.fromConnectData("topic", headers, input.schema(), input.value()); + log.trace(new String(buffer, Charsets.UTF_8)); + assertNotNull(buffer, "buffer should not be null."); + assertTrue(buffer.length > 0, "buffer should be longer than zero."); + Header schemaHeader = headers.lastHeader(this.converter.jsonSchemaHeader); + assertNotNull(schemaHeader, "schemaHeader should not be null."); + SchemaAndValue actual = this.converter.toConnectData("topic", headers, buffer); + assertNotNull(actual, "actual should not be null."); + assertSchema(expected.schema(), actual.schema()); + + if (Decimal.LOGICAL_NAME.equals(expected.schema().name())) { + assertEquals(expected.value(), actual.value()); + } else { + switch (expected.schema().type()) { + case BYTES: + assertArrayEquals((byte[]) expected.value(), (byte[]) actual.value()); + break; + case STRUCT: + assertStruct((Struct) expected.value(), (Struct) actual.value()); + break; + default: + assertEquals(expected.value(), actual.value()); + break; + } + } + } + + @Test + public void nested() throws IOException { + this.converter.configure( + ImmutableMap.of(), + false + ); + Schema addressSchema = SchemaBuilder.struct() + .name("Address") + .optional() + .field("city", SchemaBuilder.string().build()) + .field("state", SchemaBuilder.string().build()) + .field("street_address", SchemaBuilder.string().build()) + .build(); + Schema customer = SchemaBuilder.struct() + .name("Customer") + .field("billing_address", addressSchema) + .field("shipping_address", addressSchema) + .build(); + Struct billingAddress = new Struct(addressSchema) + .put("city", "Austin") + .put("state", "TX") + .put("street_address", "123 Main St"); + Struct shippingAddress = new Struct(addressSchema) + .put("city", "Dallas") + .put("state", "TX") + .put("street_address", "321 Something St"); + Struct 
struct = new Struct(customer) + .put("billing_address", billingAddress) + .put("shipping_address", shippingAddress); + SchemaAndValue expected = new SchemaAndValue(customer, struct); + Headers headers = new RecordHeaders(); + byte[] buffer = this.converter.fromConnectData("topic", headers, expected.schema(), expected.value()); + log.trace(new String(buffer, Charsets.UTF_8)); + assertNotNull(buffer, "buffer should not be null."); + assertTrue(buffer.length > 0, "buffer should be longer than zero."); + Header schemaHeader = headers.lastHeader(this.converter.jsonSchemaHeader); + assertNotNull(schemaHeader, "schemaHeader should not be null."); + SchemaAndValue actual = this.converter.toConnectData("topic", headers, buffer); + assertNotNull(actual, "actual should not be null."); + assertSchema(expected.schema(), actual.schema()); + assertEquals(expected.value(), actual.value()); + } + +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/json/TestUtils.java b/src/test/java/com/github/jcustenborder/kafka/connect/json/TestUtils.java new file mode 100644 index 0000000..c22718e --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/json/TestUtils.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2020 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.github.jcustenborder.kafka.connect.json; + +import org.everit.json.schema.internal.DateFormatValidator; +import org.everit.json.schema.internal.DateTimeFormatValidator; +import org.everit.json.schema.internal.TimeFormatValidator; +import org.everit.json.schema.loader.SchemaLoader; +import org.json.JSONObject; + +public class TestUtils { + public static org.everit.json.schema.Schema jsonSchema(JSONObject rawSchema) { + return SchemaLoader.builder() + .draftV7Support() + .addFormatValidator(new DateFormatValidator()) + .addFormatValidator(new TimeFormatValidator()) + .addFormatValidator(new DateTimeFormatValidator()) + .schemaJson(rawSchema) + .build() + .load() + .build(); + } +} diff --git a/src/test/resources/com/github/jcustenborder/kafka/connect/json/FromJson/inline.json b/src/test/resources/com/github/jcustenborder/kafka/connect/json/FromJson/inline.json new file mode 100644 index 0000000..49ad140 --- /dev/null +++ b/src/test/resources/com/github/jcustenborder/kafka/connect/json/FromJson/inline.json @@ -0,0 +1,28 @@ +{ + "title" : "Inline Config", + "input" : { + "topic" : "foo", + "kafkaPartition" : 1, + "keySchema" : { + "type" : "STRING", + "isOptional" : false + }, + "key" : "foo", + "valueSchema" : { + "type" : "BYTES", + "isOptional" : false + }, + "value" : "ewogICJmaXJzdE5hbWUiOiAiSm9obiIsCiAgImxhc3ROYW1lIjogIkRvZSIsCiAgImFnZSI6IDIxCn0=", + "timestamp" : 1530286549123, + "timestampType" : "CREATE_TIME", + "offset" : 91283741, + "headers" : [ ] + }, + "description" : "This example takes an input value that is a byte array and reads this value based on the supplied schema to a Kafka Connect value. 
The result is data that is based on the schema", + "name" : "Inline Config", + "config" : { + "json.schema.location" : "Inline", + "json.schema.inline" : "{\n \"$id\": \"https://example.com/person.schema.json\",\n \"$schema\": \"http://json-schema.org/draft-07/schema#\",\n \"title\": \"Person\",\n \"type\": \"object\",\n \"properties\": {\n \"firstName\": {\n \"type\": \"string\",\n \"description\": \"The person's first name.\"\n },\n \"lastName\": {\n \"type\": \"string\",\n \"description\": \"The person's last name.\"\n },\n \"age\": {\n \"description\": \"Age in years which must be equal to or greater than zero.\",\n \"type\": \"integer\",\n \"minimum\": 0\n }\n }\n}" + }, + "childClass" : "Value" +} \ No newline at end of file diff --git a/src/test/resources/com/github/jcustenborder/kafka/connect/json/SchemaConverterTest/nested.schema.json b/src/test/resources/com/github/jcustenborder/kafka/connect/json/SchemaConverterTest/nested.schema.json new file mode 100644 index 0000000..b3409b3 --- /dev/null +++ b/src/test/resources/com/github/jcustenborder/kafka/connect/json/SchemaConverterTest/nested.schema.json @@ -0,0 +1,23 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Customer", + "definitions": { + "address": { + "title": "Address", + "type": "object", + "properties": { + "street_address": { "type": "string" }, + "city": { "type": "string" }, + "state": { "type": "string" } + }, + "required": ["street_address", "city", "state"] + } + }, + + "type": "object", + + "properties": { + "billing_address": { "$ref": "#/definitions/address" }, + "shipping_address": { "$ref": "#/definitions/address" } + } +} \ No newline at end of file diff --git a/src/test/resources/com/github/jcustenborder/kafka/connect/json/SchemaConverterTest/product.schema.json b/src/test/resources/com/github/jcustenborder/kafka/connect/json/SchemaConverterTest/product.schema.json new file mode 100644 index 0000000..265d904 --- /dev/null +++ 
b/src/test/resources/com/github/jcustenborder/kafka/connect/json/SchemaConverterTest/product.schema.json @@ -0,0 +1,23 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "http://example.com/product.schema.json", + "title": "Product", + "description": "A product from Acme's catalog", + "type": "object", + "properties": { + "price": { + "description": "The price of the product", + "type": "number", + "exclusiveMinimum": 0 + }, + "productId": { + "description": "The unique identifier for a product", + "type": "integer" + }, + "productName": { + "description": "Name of the product", + "type": "string" + } + }, + "required": [ "productId", "productName", "price" ] +} \ No newline at end of file diff --git a/src/test/resources/com/github/jcustenborder/kafka/connect/json/basic.data.json b/src/test/resources/com/github/jcustenborder/kafka/connect/json/basic.data.json new file mode 100644 index 0000000..c8228d6 --- /dev/null +++ b/src/test/resources/com/github/jcustenborder/kafka/connect/json/basic.data.json @@ -0,0 +1,5 @@ +{ + "firstName": "John", + "lastName": "Doe", + "age": 21 +} \ No newline at end of file diff --git a/src/test/resources/com/github/jcustenborder/kafka/connect/json/basic.schema.json b/src/test/resources/com/github/jcustenborder/kafka/connect/json/basic.schema.json new file mode 100644 index 0000000..69ec678 --- /dev/null +++ b/src/test/resources/com/github/jcustenborder/kafka/connect/json/basic.schema.json @@ -0,0 +1,21 @@ +{ + "$id": "https://example.com/person.schema.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Person", + "type": "object", + "properties": { + "firstName": { + "type": "string", + "description": "The person's first name." + }, + "lastName": { + "type": "string", + "description": "The person's last name." 
+ }, + "age": { + "description": "Age in years which must be equal to or greater than zero.", + "type": "integer", + "minimum": 0 + } + } +} \ No newline at end of file diff --git a/src/test/resources/com/github/jcustenborder/kafka/connect/json/geo.data.json b/src/test/resources/com/github/jcustenborder/kafka/connect/json/geo.data.json new file mode 100644 index 0000000..e05fec9 --- /dev/null +++ b/src/test/resources/com/github/jcustenborder/kafka/connect/json/geo.data.json @@ -0,0 +1,4 @@ +{ + "latitude": 48.858093, + "longitude": 2.294694 +} \ No newline at end of file diff --git a/src/test/resources/com/github/jcustenborder/kafka/connect/json/geo.schema.json b/src/test/resources/com/github/jcustenborder/kafka/connect/json/geo.schema.json new file mode 100644 index 0000000..605cefd --- /dev/null +++ b/src/test/resources/com/github/jcustenborder/kafka/connect/json/geo.schema.json @@ -0,0 +1,23 @@ +{ + "$id": "https://example.com/geographical-location.schema.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Longitude and Latitude Values", + "description": "A geographical coordinate.", + "required": [ + "latitude", + "longitude" + ], + "type": "object", + "properties": { + "latitude": { + "type": "number", + "minimum": -90, + "maximum": 90 + }, + "longitude": { + "type": "number", + "minimum": -180, + "maximum": 180 + } + } +} \ No newline at end of file diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml new file mode 100644 index 0000000..07d5536 --- /dev/null +++ b/src/test/resources/logback.xml @@ -0,0 +1,12 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n + + + + + + + + \ No newline at end of file