From 646aae1ac8e63b7a7d562ca077d8dffe06081213 Mon Sep 17 00:00:00 2001 From: LiRen Tu Date: Wed, 7 Jul 2021 15:41:36 -0700 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20Destination=20S3:=20support=20`a?= =?UTF-8?q?nyOf`=20`allOf`=20and=20`oneOf`=20(#4613)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Support combined restrictions in json schema * Bump s3 version * Add more test cases * Update changelog * Add more test cases * Update documentation * Format code --- .../4816b78f-1489-44c1-9060-4b19d5fa9362.json | 2 +- .../seed/destination_definitions.yaml | 2 +- .../connectors/destination-s3/Dockerfile | 2 +- .../destination/s3/avro/JsonSchemaType.java | 3 +- .../s3/avro/JsonToAvroSchemaConverter.java | 86 ++++++++---- .../avro/JsonToAvroSchemaConverterTest.java | 17 ++- .../get_avro_schema.json | 127 ++++++++++++++++++ .../json_schema_converter/get_field_type.json | 21 +++ docs/integrations/destinations/s3.md | 21 ++- 9 files changed, 248 insertions(+), 33 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/4816b78f-1489-44c1-9060-4b19d5fa9362.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/4816b78f-1489-44c1-9060-4b19d5fa9362.json index 7a7973a48c81..ccdd21c58f4a 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/4816b78f-1489-44c1-9060-4b19d5fa9362.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/4816b78f-1489-44c1-9060-4b19d5fa9362.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "4816b78f-1489-44c1-9060-4b19d5fa9362", "name": "S3", "dockerRepository": "airbyte/destination-s3", - "dockerImageTag": "0.1.7", + "dockerImageTag": "0.1.8", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/s3" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index d6a5dd8446bc..61df492a508e 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -37,7 +37,7 @@ - destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 name: S3 dockerRepository: airbyte/destination-s3 - dockerImageTag: 0.1.7 + dockerImageTag: 0.1.8 documentationUrl: https://docs.airbyte.io/integrations/destinations/s3 - destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc name: Redshift diff --git a/airbyte-integrations/connectors/destination-s3/Dockerfile b/airbyte-integrations/connectors/destination-s3/Dockerfile index aea3084c0b4e..d9fde8c582b6 100644 --- a/airbyte-integrations/connectors/destination-s3/Dockerfile +++ b/airbyte-integrations/connectors/destination-s3/Dockerfile @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.7 +LABEL io.airbyte.version=0.1.8 LABEL io.airbyte.name=airbyte/destination-s3 diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonSchemaType.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonSchemaType.java index 93d4c5633ced..92b3651d0631 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonSchemaType.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonSchemaType.java @@ -37,7 +37,8 @@ public enum JsonSchemaType { BOOLEAN("boolean", true, Schema.Type.BOOLEAN), NULL("null", true, Schema.Type.NULL), OBJECT("object", false, Schema.Type.RECORD), - ARRAY("array", false, Schema.Type.ARRAY); + ARRAY("array", false, Schema.Type.ARRAY), + COMBINED("combined", false, Schema.Type.UNION); private final String jsonSchemaType; private final boolean isPrimitive; diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java index e53f318f58c1..d4468c62effe 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java @@ -25,6 +25,7 @@ package io.airbyte.integrations.destination.s3.avro; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.base.Preconditions; import io.airbyte.commons.util.MoreIterators; import io.airbyte.integrations.base.JavaBaseConstants; @@ -34,6 +35,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nullable; @@ -64,23 +66,46 @@ public class JsonToAvroSchemaConverter { private final Map standardizedNames = new HashMap<>(); - static List getNonNullTypes(String fieldName, JsonNode typeProperty) { - return getTypes(fieldName, typeProperty).stream() + static List getNonNullTypes(String fieldName, JsonNode fieldDefinition) { + return getTypes(fieldName, fieldDefinition).stream() .filter(type -> type != JsonSchemaType.NULL).collect(Collectors.toList()); } - static List getTypes(String fieldName, JsonNode typeProperty) { - if (typeProperty == null) { + static List getTypes(String fieldName, JsonNode fieldDefinition) { + Optional combinedRestriction = getCombinedRestriction(fieldDefinition); + if (combinedRestriction.isPresent()) { + return Collections.singletonList(JsonSchemaType.COMBINED); + } + + JsonNode typeProperty = fieldDefinition.get("type"); + if (typeProperty == null || typeProperty.isNull()) { throw new IllegalStateException(String.format("Field %s has no type", fieldName)); - } else if (typeProperty.isArray()) { + } + + if (typeProperty.isArray()) { return MoreIterators.toList(typeProperty.elements()).stream() .map(s -> JsonSchemaType.fromJsonSchemaType(s.asText())) .collect(Collectors.toList()); - } else if (typeProperty.isTextual()) { + } + + if (typeProperty.isTextual()) { return Collections.singletonList(JsonSchemaType.fromJsonSchemaType(typeProperty.asText())); - } else { - throw new IllegalStateException("Unexpected type: " + typeProperty); } + + throw new IllegalStateException("Unexpected type: " + typeProperty); + } + + static Optional getCombinedRestriction(JsonNode fieldDefinition) { + if (fieldDefinition.has("anyOf")) { + return Optional.of(fieldDefinition.get("anyOf")); + } + if (fieldDefinition.has("allOf")) { + return Optional.of(fieldDefinition.get("allOf")); + } + if (fieldDefinition.has("oneOf")) { + return Optional.of(fieldDefinition.get("oneOf")); + } + return Optional.empty(); } public Map getStandardizedNames() { @@ -141,33 +166,27 @@ public Schema getAvroSchema(JsonNode jsonSchema, return assembler.endRecord(); } - Schema getSingleFieldType(String fieldName, - JsonSchemaType fieldType, - JsonNode fieldDefinition, - boolean canBeComposite) { + Schema getSingleFieldType(String fieldName, JsonSchemaType fieldType, JsonNode fieldDefinition) { Preconditions .checkState(fieldType != JsonSchemaType.NULL, "Null types should have been filtered out"); - Preconditions - .checkState(canBeComposite || fieldType.isPrimitive(), "Field %s has invalid type %s", - fieldName, fieldType); + Schema fieldSchema; switch (fieldType) { case STRING, NUMBER, INTEGER, BOOLEAN -> fieldSchema = Schema.create(fieldType.getAvroType()); + case COMBINED -> { + Optional combinedRestriction = getCombinedRestriction(fieldDefinition); + List unionTypes = getSchemasFromTypes(fieldName, (ArrayNode) combinedRestriction.get()); + fieldSchema = Schema.createUnion(unionTypes); + } case ARRAY -> { JsonNode items = fieldDefinition.get("items"); Preconditions.checkNotNull(items, "Array field %s misses the items property.", fieldName); if (items.isObject()) { - fieldSchema = Schema - .createArray(getNullableFieldTypes(String.format("%s.items", fieldName), items)); + fieldSchema = Schema.createArray(getNullableFieldTypes(String.format("%s.items", fieldName), items)); } else if (items.isArray()) { - List arrayElementTypes = MoreIterators.toList(items.elements()) - .stream() - .flatMap(itemDefinition -> getNonNullTypes(fieldName, itemDefinition.get("type")).stream() - .map(type -> getSingleFieldType(fieldName, type, itemDefinition, false))) - .distinct() - .collect(Collectors.toList()); - arrayElementTypes.add(0, Schema.create(Schema.Type.NULL)); + List arrayElementTypes = getSchemasFromTypes(fieldName, (ArrayNode) items); + arrayElementTypes.add(0, Schema.create(Type.NULL)); fieldSchema = Schema.createArray(Schema.createUnion(arrayElementTypes)); } else { throw new IllegalStateException( @@ -181,15 +200,30 @@ Schema getSingleFieldType(String fieldName, return fieldSchema; } + List getSchemasFromTypes(String fieldName, ArrayNode types) { + return MoreIterators.toList(types.elements()) + .stream() + .flatMap(definition -> getNonNullTypes(fieldName, definition).stream().flatMap(type -> { + Schema singleFieldSchema = getSingleFieldType(fieldName, type, definition); + if (singleFieldSchema.isUnion()) { + return singleFieldSchema.getTypes().stream(); + } else { + return Stream.of(singleFieldSchema); + } + })) + .distinct() + .collect(Collectors.toList()); + } + /** * @param fieldDefinition - Json schema field definition. E.g. { type: "number" }. */ Schema getNullableFieldTypes(String fieldName, JsonNode fieldDefinition) { // Filter out null types, which will be added back in the end. - List nonNullFieldTypes = getNonNullTypes(fieldName, fieldDefinition.get("type")) + List nonNullFieldTypes = getNonNullTypes(fieldName, fieldDefinition) .stream() .flatMap(fieldType -> { - Schema singleFieldSchema = getSingleFieldType(fieldName, fieldType, fieldDefinition, true); + Schema singleFieldSchema = getSingleFieldType(fieldName, fieldType, fieldDefinition); if (singleFieldSchema.isUnion()) { return singleFieldSchema.getTypes().stream(); } else { diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverterTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverterTest.java index 4a501396804d..551183733b50 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverterTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverterTest.java @@ -25,6 +25,7 @@ package io.airbyte.integrations.destination.s3.avro; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; @@ -47,7 +48,7 @@ public void testGetSingleTypes() { JsonNode input1 = Jsons.deserialize("{ \"type\": \"number\" }"); assertEquals( Collections.singletonList(JsonSchemaType.NUMBER), - JsonToAvroSchemaConverter.getTypes("field", input1.get("type"))); + JsonToAvroSchemaConverter.getTypes("field", input1)); } @Test @@ -55,7 +56,19 @@ public void testGetUnionTypes() { JsonNode input2 = Jsons.deserialize("{ \"type\": [\"null\", \"string\"] }"); assertEquals( Lists.newArrayList(JsonSchemaType.NULL, JsonSchemaType.STRING), - JsonToAvroSchemaConverter.getTypes("field", input2.get("type"))); + JsonToAvroSchemaConverter.getTypes("field", input2)); + } + + @Test + public void testNoCombinedRestriction() { + JsonNode input1 = Jsons.deserialize("{ \"type\": \"number\" }"); + assertTrue(JsonToAvroSchemaConverter.getCombinedRestriction(input1).isEmpty()); + } + + @Test + public void testWithCombinedRestriction() { + JsonNode input2 = Jsons.deserialize("{ \"anyOf\": [{ \"type\": \"string\" }, { \"type\": \"integer\" }] }"); + assertTrue(JsonToAvroSchemaConverter.getCombinedRestriction(input2).isPresent()); } public static class GetFieldTypeTestCaseProvider implements ArgumentsProvider { diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json index 271d01dba038..409a7ce58e28 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json @@ -253,5 +253,132 @@ } ] } + }, + { + "schemaName": "field_with_combined_restriction", + "namespace": "namespace8", + "appendAirbyteFields": false, + "jsonSchema": { + "properties": { + "created_at": { + "anyOf": [ + { + "type": "string", + "format": "date-time" + }, + { + "type": ["null", "string"] + }, + { + "type": "integer" + } + ] + } + } + }, + "avroSchema": { + "type": "record", + "name": "field_with_combined_restriction", + "namespace": "namespace8", + "fields": [ + { + "name": "created_at", + "type": ["null", "string", "int"], + "default": null + } + ] + } + }, + { + "schemaName": "record_with_combined_restriction_field", + "namespace": "namespace9", + "appendAirbyteFields": false, + "jsonSchema": { + "properties": { + "user": { + "type": "object", + "properties": { + "created_at": { + "anyOf": [ + { + "type": "string", + "format": "date-time" + }, + { + "type": ["null", "string"] + }, + { + "type": "integer" + } + ] + } + } + } + } + }, + "avroSchema": { + "type": "record", + "name": "record_with_combined_restriction_field", + "namespace": "namespace9", + "fields": [ + { + "name": "user", + "type": [ + "null", + { + "type": "record", + "name": "user", + "namespace": "", + "fields": [ + { + "name": "created_at", + "type": ["null", "string", "int"], + "default": null + } + ] + } + ], + "default": null + } + ] + } + }, + { + "schemaName": "array_with_combined_restriction_field", + "namespace": "namespace10", + "appendAirbyteFields": false, + "jsonSchema": { + "properties": { + "identifiers": { + "type": "array", + "items": [ + { + "oneOf": [{ "type": "integer" }, { "type": "string" }] + }, + { + "type": "boolean" + } + ] + } + } + }, + "avroSchema": { + "type": "record", + "name": "array_with_combined_restriction_field", + "namespace": "namespace10", + "fields": [ + { + "name": "identifiers", + "type": [ + "null", + { + "type": "array", + "items": ["null", "int", "string", "boolean"] + } + ], + "default": null + } + ] + } } ] diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json index fe1c5c45aa4a..6dd9a503e984 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json @@ -103,5 +103,26 @@ ] } ] + }, + { + "fieldName": "any_of_field", + "jsonFieldSchema": { + "anyOf": [{ "type": "string" }, { "type": "integer" }] + }, + "avroFieldType": ["null", "string", "int"] + }, + { + "fieldName": "all_of_field", + "jsonFieldSchema": { + "allOf": [{ "type": "string" }, { "type": "integer" }] + }, + "avroFieldType": ["null", "string", "int"] + }, + { + "fieldName": "one_of_field", + "jsonFieldSchema": { + "oneOf": [{ "type": "string" }, { "type": "integer" }] + }, + "avroFieldType": ["null", "string", "int"] } ] diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index d642378a6688..f14d85d2ca16 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -113,7 +113,25 @@ Under the hood, an Airbyte data stream in Json schema is converted to an Avro sc | array | array | 2. Built-in Json schema formats are not mapped to Avro logical types at this moment. -2. Json schema compositions ("allOf", "anyOf", and "oneOf") are not supported at this moment. +2. Combined restrictions ("allOf", "anyOf", and "oneOf") will be converted to type unions. The corresponding Avro schema can be less stringent. For example, the following Json schema + + ```json + { + "oneOf": [ + { "type": "string" }, + { "type": "integer" } + ] + } + ``` + will become this in Avro schema: + + ```json + { + "type": ["null", "string", "int"] + } + ``` + +2. Keyword `not` is not supported, as there is no equivalent validation mechanism in Avro schema. 3. Only alphanumeric characters and underscores (`/a-zA-Z0-9_/`) are allowed in a stream or field name. Any special character will be converted to an alphabet or underscore. For example, `spécial:character_names` will become `special_character_names`. The original names will be stored in the `doc` property in this format: `_airbyte_original_name:`. 4. All field will be nullable. For example, a `string` Json field will be typed as `["null", "string"]` in Avro. This is necessary because the incoming data stream may have optional fields. 5. For array fields in Json schema, when the `items` property is an array, it means that each element in the array should follow its own schema sequentially. For example, the following specification means the first item in the array should be a string, and the second a number. @@ -356,6 +374,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.8 | 2021-07-07 | [#4613](https://github.com/airbytehq/airbyte/pull/4613) | Patched schema converter to support combined restrictions. | | 0.1.7 | 2021-06-23 | [#4227](https://github.com/airbytehq/airbyte/pull/4227) | Added Avro and JSONL output. | | 0.1.6 | 2021-06-16 | [#4130](https://github.com/airbytehq/airbyte/pull/4130) | Patched the check to verify prefix access instead of full-bucket access. | | 0.1.5 | 2021-06-14 | [#3908](https://github.com/airbytehq/airbyte/pull/3908) | Fixed default `max_padding_size_mb` in `spec.json`. |