From 14b17f75733324f20f7159787dec97717d07f649 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 7 Jul 2021 13:30:19 -0700 Subject: [PATCH 1/7] Support combined restrictions in json schema --- .../destination/s3/avro/JsonSchemaType.java | 4 +- .../s3/avro/JsonToAvroSchemaConverter.java | 70 ++++++++++++++----- .../avro/JsonToAvroSchemaConverterTest.java | 17 ++++- .../get_avro_schema.json | 39 +++++++++++ .../json_schema_converter/get_field_type.json | 10 +++ 5 files changed, 119 insertions(+), 21 deletions(-) diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonSchemaType.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonSchemaType.java index 93d4c5633ced..6a64c43a70b5 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonSchemaType.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonSchemaType.java @@ -25,6 +25,7 @@ package io.airbyte.integrations.destination.s3.avro; import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; /** * Mapping of JsonSchema types to Avro types. @@ -37,7 +38,8 @@ public enum JsonSchemaType { BOOLEAN("boolean", true, Schema.Type.BOOLEAN), NULL("null", true, Schema.Type.NULL), OBJECT("object", false, Schema.Type.RECORD), - ARRAY("array", false, Schema.Type.ARRAY); + ARRAY("array", false, Schema.Type.ARRAY), + COMBINED("combined", false, Schema.Type.UNION); private final String jsonSchemaType; private final boolean isPrimitive; diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java index e53f318f58c1..8f0a2d0e40d0 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java @@ -25,6 +25,7 @@ package io.airbyte.integrations.destination.s3.avro; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.base.Preconditions; import io.airbyte.commons.util.MoreIterators; import io.airbyte.integrations.base.JavaBaseConstants; @@ -34,6 +35,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nullable; @@ -64,23 +66,46 @@ public class JsonToAvroSchemaConverter { private final Map standardizedNames = new HashMap<>(); - static List getNonNullTypes(String fieldName, JsonNode typeProperty) { - return getTypes(fieldName, typeProperty).stream() + static List getNonNullTypes(String fieldName, JsonNode fieldDefinition) { + return getTypes(fieldName, fieldDefinition).stream() .filter(type -> type != JsonSchemaType.NULL).collect(Collectors.toList()); } - static List getTypes(String fieldName, JsonNode typeProperty) { - if (typeProperty == null) { + static List getTypes(String fieldName, JsonNode fieldDefinition) { + Optional combinedRestriction = getCombinedRestriction(fieldDefinition); + if (combinedRestriction.isPresent()) { + return Collections.singletonList(JsonSchemaType.COMBINED); + } + + JsonNode typeProperty = fieldDefinition.get("type"); + if (typeProperty == null || typeProperty.isNull()) { throw new IllegalStateException(String.format("Field %s has no type", fieldName)); - } else if (typeProperty.isArray()) { + } + + if (typeProperty.isArray()) { return MoreIterators.toList(typeProperty.elements()).stream() .map(s -> JsonSchemaType.fromJsonSchemaType(s.asText())) .collect(Collectors.toList()); - } else if (typeProperty.isTextual()) { + } + + if (typeProperty.isTextual()) { return Collections.singletonList(JsonSchemaType.fromJsonSchemaType(typeProperty.asText())); - } else { - throw new IllegalStateException("Unexpected type: " + typeProperty); } + + throw new IllegalStateException("Unexpected type: " + typeProperty); + } + + static Optional getCombinedRestriction(JsonNode fieldDefinition) { + if (fieldDefinition.has("anyOf")) { + return Optional.of(fieldDefinition.get("anyOf")); + } + if (fieldDefinition.has("allOf")) { + return Optional.of(fieldDefinition.get("allOf")); + } + if (fieldDefinition.has("oneOf")) { + return Optional.of(fieldDefinition.get("oneOf")); + } + return Optional.empty(); } public Map getStandardizedNames() { @@ -153,21 +178,20 @@ Schema getSingleFieldType(String fieldName, Schema fieldSchema; switch (fieldType) { case STRING, NUMBER, INTEGER, BOOLEAN -> fieldSchema = Schema.create(fieldType.getAvroType()); + case COMBINED -> { + Optional combinedRestriction = getCombinedRestriction(fieldDefinition); + List unionTypes = getSchemaFromTypes(fieldName, (ArrayNode) combinedRestriction.get()); + fieldSchema = Schema.createUnion(unionTypes); + } case ARRAY -> { JsonNode items = fieldDefinition.get("items"); Preconditions.checkNotNull(items, "Array field %s misses the items property.", fieldName); if (items.isObject()) { - fieldSchema = Schema - .createArray(getNullableFieldTypes(String.format("%s.items", fieldName), items)); + fieldSchema = Schema.createArray(getNullableFieldTypes(String.format("%s.items", fieldName), items)); } else if (items.isArray()) { - List arrayElementTypes = MoreIterators.toList(items.elements()) - .stream() - .flatMap(itemDefinition -> getNonNullTypes(fieldName, itemDefinition.get("type")).stream() - .map(type -> getSingleFieldType(fieldName, type, itemDefinition, false))) - .distinct() - .collect(Collectors.toList()); - arrayElementTypes.add(0, Schema.create(Schema.Type.NULL)); + List arrayElementTypes = getSchemaFromTypes(fieldName, (ArrayNode) items); + arrayElementTypes.add(0, Schema.create(Type.NULL)); fieldSchema = Schema.createArray(Schema.createUnion(arrayElementTypes)); } else { throw new IllegalStateException( @@ -181,12 +205,22 @@ Schema getSingleFieldType(String fieldName, return fieldSchema; } + List getSchemaFromTypes(String fieldName, ArrayNode types) { + List schemas = MoreIterators.toList(types.elements()) + .stream() + .flatMap(definition -> getNonNullTypes(fieldName, definition).stream() + .map(type -> getSingleFieldType(fieldName, type, definition, false))) + .distinct() + .collect(Collectors.toList()); + return schemas; + } + /** * @param fieldDefinition - Json schema field definition. E.g. { type: "number" }. */ Schema getNullableFieldTypes(String fieldName, JsonNode fieldDefinition) { // Filter out null types, which will be added back in the end. - List nonNullFieldTypes = getNonNullTypes(fieldName, fieldDefinition.get("type")) + List nonNullFieldTypes = getNonNullTypes(fieldName, fieldDefinition) .stream() .flatMap(fieldType -> { Schema singleFieldSchema = getSingleFieldType(fieldName, fieldType, fieldDefinition, true); diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverterTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverterTest.java index 4a501396804d..551183733b50 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverterTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverterTest.java @@ -25,6 +25,7 @@ package io.airbyte.integrations.destination.s3.avro; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; @@ -47,7 +48,7 @@ public void testGetSingleTypes() { JsonNode input1 = Jsons.deserialize("{ \"type\": \"number\" }"); assertEquals( Collections.singletonList(JsonSchemaType.NUMBER), - JsonToAvroSchemaConverter.getTypes("field", input1.get("type"))); + JsonToAvroSchemaConverter.getTypes("field", input1)); } @Test @@ -55,7 +56,19 @@ public void testGetUnionTypes() { JsonNode input2 = Jsons.deserialize("{ \"type\": [\"null\", \"string\"] }"); assertEquals( Lists.newArrayList(JsonSchemaType.NULL, JsonSchemaType.STRING), - JsonToAvroSchemaConverter.getTypes("field", input2.get("type"))); + JsonToAvroSchemaConverter.getTypes("field", input2)); + } + + @Test + public void testNoCombinedRestriction() { + JsonNode input1 = Jsons.deserialize("{ \"type\": \"number\" }"); + assertTrue(JsonToAvroSchemaConverter.getCombinedRestriction(input1).isEmpty()); + } + + @Test + public void testWithCombinedRestriction() { + JsonNode input2 = Jsons.deserialize("{ \"anyOf\": [{ \"type\": \"string\" }, { \"type\": \"integer\" }] }"); + assertTrue(JsonToAvroSchemaConverter.getCombinedRestriction(input2).isPresent()); } public static class GetFieldTypeTestCaseProvider implements ArgumentsProvider { diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json index 271d01dba038..d7d7dd195b68 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json @@ -253,5 +253,44 @@ } ] } + }, + { + "schemaName": "combined_restriction", + "namespace": "namespace8", + "appendAirbyteFields": false, + "jsonSchema": { + "properties": { + "created_at": { + "anyOf": [ + { + "type": "string", + "format": "date-time" + }, + { + "type": ["null","string"] + }, + { + "type": "integer" + } + ] + } + } + }, + "avroSchema": { + "type": "record", + "name": "combined_restriction", + "namespace": "namespace8", + "fields": [ + { + "name": "created_at", + "type": [ + "null", + "string", + "int" + ], + "default": null + } + ] + } } ] diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json index fe1c5c45aa4a..f6466ebe48aa 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json @@ -103,5 +103,15 @@ ] } ] + }, + { + "fieldName": "combined_restriction_field", + "jsonFieldSchema": { + "anyOf": [ + { "type": "string" }, + { "type": "integer" } + ] + }, + "avroFieldType": ["null", "string", "int"] } ] From 8980a5a40e8ee771f6f3d160e7323d81edef8b9c Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 7 Jul 2021 13:31:35 -0700 Subject: [PATCH 2/7] Bump s3 version --- .../4816b78f-1489-44c1-9060-4b19d5fa9362.json | 2 +- .../init/src/main/resources/seed/destination_definitions.yaml | 2 +- airbyte-integrations/connectors/destination-s3/Dockerfile | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/4816b78f-1489-44c1-9060-4b19d5fa9362.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/4816b78f-1489-44c1-9060-4b19d5fa9362.json index 7a7973a48c81..ccdd21c58f4a 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/4816b78f-1489-44c1-9060-4b19d5fa9362.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/4816b78f-1489-44c1-9060-4b19d5fa9362.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "4816b78f-1489-44c1-9060-4b19d5fa9362", "name": "S3", "dockerRepository": "airbyte/destination-s3", - "dockerImageTag": "0.1.7", + "dockerImageTag": "0.1.8", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/s3" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 5c9759bf8557..ff0766daf29c 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -37,7 +37,7 @@ - destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 name: S3 dockerRepository: airbyte/destination-s3 - dockerImageTag: 0.1.7 + dockerImageTag: 0.1.8 documentationUrl: https://docs.airbyte.io/integrations/destinations/s3 - destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc name: Redshift diff --git a/airbyte-integrations/connectors/destination-s3/Dockerfile b/airbyte-integrations/connectors/destination-s3/Dockerfile index aea3084c0b4e..d9fde8c582b6 100644 --- a/airbyte-integrations/connectors/destination-s3/Dockerfile +++ b/airbyte-integrations/connectors/destination-s3/Dockerfile @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.7 +LABEL io.airbyte.version=0.1.8 LABEL io.airbyte.name=airbyte/destination-s3 From 7658acfca5b2e8a33e273d515e84a6039a17f392 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 7 Jul 2021 13:37:12 -0700 Subject: [PATCH 3/7] Add more test cases --- .../json_schema_converter/get_field_type.json | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json index f6466ebe48aa..df8c7b953bbd 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json @@ -105,7 +105,7 @@ ] }, { - "fieldName": "combined_restriction_field", + "fieldName": "any_of_field", "jsonFieldSchema": { "anyOf": [ { "type": "string" }, @@ -113,5 +113,25 @@ ] }, "avroFieldType": ["null", "string", "int"] + }, + { + "fieldName": "all_of_field", + "jsonFieldSchema": { + "allOf": [ + { "type": "string" }, + { "type": "integer" } + ] + }, + "avroFieldType": ["null", "string", "int"] + }, + { + "fieldName": "one_of_field", + "jsonFieldSchema": { + "oneOf": [ + { "type": "string" }, + { "type": "integer" } + ] + }, + "avroFieldType": ["null", "string", "int"] } ] From 0865272ff3164379d0715f814e1be0d02c3ba516 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 7 Jul 2021 13:37:19 -0700 Subject: [PATCH 4/7] Update changelog --- docs/integrations/destinations/s3.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index d642378a6688..eefdf8be89da 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -356,6 +356,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.8 | 2021-07-07 | [#4613](https://github.com/airbytehq/airbyte/pull/4613) | Patched schema converter to support combined restrictions. | | 0.1.7 | 2021-06-23 | [#4227](https://github.com/airbytehq/airbyte/pull/4227) | Added Avro and JSONL output. | | 0.1.6 | 2021-06-16 | [#4130](https://github.com/airbytehq/airbyte/pull/4130) | Patched the check to verify prefix access instead of full-bucket access. | | 0.1.5 | 2021-06-14 | [#3908](https://github.com/airbytehq/airbyte/pull/3908) | Fixed default `max_padding_size_mb` in `spec.json`. | From 79ad52cab43edce35d258a6de672e3494b1de472 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 7 Jul 2021 14:42:34 -0700 Subject: [PATCH 5/7] Add more test cases --- .../s3/avro/JsonToAvroSchemaConverter.java | 30 ++--- .../get_avro_schema.json | 103 +++++++++++++++++- 2 files changed, 116 insertions(+), 17 deletions(-) diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java index 8f0a2d0e40d0..d4468c62effe 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java @@ -166,21 +166,16 @@ public Schema getAvroSchema(JsonNode jsonSchema, return assembler.endRecord(); } - Schema getSingleFieldType(String fieldName, - JsonSchemaType fieldType, - JsonNode fieldDefinition, - boolean canBeComposite) { + Schema getSingleFieldType(String fieldName, JsonSchemaType fieldType, JsonNode fieldDefinition) { Preconditions .checkState(fieldType != JsonSchemaType.NULL, "Null types should have been filtered out"); - Preconditions - .checkState(canBeComposite || fieldType.isPrimitive(), "Field %s has invalid type %s", - fieldName, fieldType); + Schema fieldSchema; switch (fieldType) { case STRING, NUMBER, INTEGER, BOOLEAN -> fieldSchema = Schema.create(fieldType.getAvroType()); case COMBINED -> { Optional combinedRestriction = getCombinedRestriction(fieldDefinition); - List unionTypes = getSchemaFromTypes(fieldName, (ArrayNode) combinedRestriction.get()); + List unionTypes = getSchemasFromTypes(fieldName, (ArrayNode) combinedRestriction.get()); fieldSchema = Schema.createUnion(unionTypes); } case ARRAY -> { @@ -190,7 +185,7 @@ Schema getSingleFieldType(String fieldName, if (items.isObject()) { fieldSchema = Schema.createArray(getNullableFieldTypes(String.format("%s.items", fieldName), items)); } else if (items.isArray()) { - List arrayElementTypes = getSchemaFromTypes(fieldName, (ArrayNode) items); + List arrayElementTypes = getSchemasFromTypes(fieldName, (ArrayNode) items); arrayElementTypes.add(0, Schema.create(Type.NULL)); fieldSchema = Schema.createArray(Schema.createUnion(arrayElementTypes)); } else { @@ -205,14 +200,19 @@ Schema getSingleFieldType(String fieldName, return fieldSchema; } - List getSchemaFromTypes(String fieldName, ArrayNode types) { - List schemas = MoreIterators.toList(types.elements()) + List getSchemasFromTypes(String fieldName, ArrayNode types) { + return MoreIterators.toList(types.elements()) .stream() - .flatMap(definition -> getNonNullTypes(fieldName, definition).stream() - .map(type -> getSingleFieldType(fieldName, type, definition, false))) + .flatMap(definition -> getNonNullTypes(fieldName, definition).stream().flatMap(type -> { + Schema singleFieldSchema = getSingleFieldType(fieldName, type, definition); + if (singleFieldSchema.isUnion()) { + return singleFieldSchema.getTypes().stream(); + } else { + return Stream.of(singleFieldSchema); + } + })) .distinct() .collect(Collectors.toList()); - return schemas; } /** @@ -223,7 +223,7 @@ Schema getNullableFieldTypes(String fieldName, JsonNode fieldDefinition) { List nonNullFieldTypes = getNonNullTypes(fieldName, fieldDefinition) .stream() .flatMap(fieldType -> { - Schema singleFieldSchema = getSingleFieldType(fieldName, fieldType, fieldDefinition, true); + Schema singleFieldSchema = getSingleFieldType(fieldName, fieldType, fieldDefinition); if (singleFieldSchema.isUnion()) { return singleFieldSchema.getTypes().stream(); } else { diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json index d7d7dd195b68..a307c471591c 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json @@ -255,7 +255,7 @@ } }, { - "schemaName": "combined_restriction", + "schemaName": "field_with_combined_restriction", "namespace": "namespace8", "appendAirbyteFields": false, "jsonSchema": { @@ -278,7 +278,7 @@ }, "avroSchema": { "type": "record", - "name": "combined_restriction", + "name": "field_with_combined_restriction", "namespace": "namespace8", "fields": [ { @@ -292,5 +292,104 @@ } ] } + }, + { + "schemaName": "record_with_combined_restriction_field", + "namespace": "namespace9", + "appendAirbyteFields": false, + "jsonSchema": { + "properties": { + "user": { + "type": "object", + "properties": { + "created_at": { + "anyOf": [ + { + "type": "string", + "format": "date-time" + }, + { + "type": ["null","string"] + }, + { + "type": "integer" + } + ] + } + } + } + } + }, + "avroSchema": { + "type": "record", + "name": "record_with_combined_restriction_field", + "namespace": "namespace9", + "fields": [ + { + "name": "user", + "type": [ + "null", + { + "type": "record", + "name": "user", + "namespace": "", + "fields": [ + { + "name": "created_at", + "type": [ + "null", + "string", + "int" + ], + "default": null + } + ] + } + ], + "default": null + } + ] + } + }, + { + "schemaName": "array_with_combined_restriction_field", + "namespace": "namespace10", + "appendAirbyteFields": false, + "jsonSchema": { + "properties": { + "identifiers": { + "type": "array", + "items": [ + { + "oneOf": [ + { "type": "integer" }, + { "type": "string" } + ] + }, + { + "type": "boolean" + } + ] + } + } + }, + "avroSchema": { + "type": "record", + "name": "array_with_combined_restriction_field", + "namespace": "namespace10", + "fields": [ + { + "name": "identifiers", + "type": [ + "null", + { + "type": "array", + "items": ["null", "int", "string", "boolean"] + } + ], + "default": null + } + ] + } } ] From 85f10d8da73e9007b49c7f0a636a9a66b34c6fa4 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 7 Jul 2021 14:50:03 -0700 Subject: [PATCH 6/7] Update documentation --- docs/integrations/destinations/s3.md | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index eefdf8be89da..f14d85d2ca16 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -113,7 +113,25 @@ Under the hood, an Airbyte data stream in Json schema is converted to an Avro sc | array | array | 2. Built-in Json schema formats are not mapped to Avro logical types at this moment. -2. Json schema compositions ("allOf", "anyOf", and "oneOf") are not supported at this moment. +2. Combined restrictions ("allOf", "anyOf", and "oneOf") will be converted to type unions. The corresponding Avro schema can be less stringent. For example, the following Json schema + + ```json + { + "oneOf": [ + { "type": "string" }, + { "type": "integer" } + ] + } + ``` + will become this in Avro schema: + + ```json + { + "type": ["null", "string", "int"] + } + ``` + +2. Keyword `not` is not supported, as there is no equivalent validation mechanism in Avro schema. 3. Only alphanumeric characters and underscores (`/a-zA-Z0-9_/`) are allowed in a stream or field name. Any special character will be converted to an alphabet or underscore. For example, `spécial:character_names` will become `special_character_names`. The original names will be stored in the `doc` property in this format: `_airbyte_original_name:`. 4. All field will be nullable. For example, a `string` Json field will be typed as `["null", "string"]` in Avro. This is necessary because the incoming data stream may have optional fields. 5. For array fields in Json schema, when the `items` property is an array, it means that each element in the array should follow its own schema sequentially. For example, the following specification means the first item in the array should be a string, and the second a number. From a5d57b499bd2ea1749e8e0500acd6dd3c764fcae Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 7 Jul 2021 15:24:27 -0700 Subject: [PATCH 7/7] Format code --- .../destination/s3/avro/JsonSchemaType.java | 1 - .../get_avro_schema.json | 21 +++++-------------- .../json_schema_converter/get_field_type.json | 15 +++---------- 3 files changed, 8 insertions(+), 29 deletions(-) diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonSchemaType.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonSchemaType.java index 6a64c43a70b5..92b3651d0631 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonSchemaType.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonSchemaType.java @@ -25,7 +25,6 @@ package io.airbyte.integrations.destination.s3.avro; import org.apache.avro.Schema; -import org.apache.avro.Schema.Type; /** * Mapping of JsonSchema types to Avro types. diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json index a307c471591c..409a7ce58e28 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json @@ -267,7 +267,7 @@ "format": "date-time" }, { - "type": ["null","string"] + "type": ["null", "string"] }, { "type": "integer" @@ -283,11 +283,7 @@ "fields": [ { "name": "created_at", - "type": [ - "null", - "string", - "int" - ], + "type": ["null", "string", "int"], "default": null } ] @@ -309,7 +305,7 @@ "format": "date-time" }, { - "type": ["null","string"] + "type": ["null", "string"] }, { "type": "integer" @@ -336,11 +332,7 @@ "fields": [ { "name": "created_at", - "type": [ - "null", - "string", - "int" - ], + "type": ["null", "string", "int"], "default": null } ] @@ -361,10 +353,7 @@ "type": "array", "items": [ { - "oneOf": [ - { "type": "integer" }, - { "type": "string" } - ] + "oneOf": [{ "type": "integer" }, { "type": "string" }] }, { "type": "boolean" diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json index df8c7b953bbd..6dd9a503e984 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json @@ -107,30 +107,21 @@ { "fieldName": "any_of_field", "jsonFieldSchema": { - "anyOf": [ - { "type": "string" }, - { "type": "integer" } - ] + "anyOf": [{ "type": "string" }, { "type": "integer" }] }, "avroFieldType": ["null", "string", "int"] }, { "fieldName": "all_of_field", "jsonFieldSchema": { - "allOf": [ - { "type": "string" }, - { "type": "integer" } - ] + "allOf": [{ "type": "string" }, { "type": "integer" }] }, "avroFieldType": ["null", "string", "int"] }, { "fieldName": "one_of_field", "jsonFieldSchema": { - "oneOf": [ - { "type": "string" }, - { "type": "integer" } - ] + "oneOf": [{ "type": "string" }, { "type": "integer" }] }, "avroFieldType": ["null", "string", "int"] }