diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonSchemaType.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonSchemaType.java index 68a86d3ddf1f..593957066e4d 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonSchemaType.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonSchemaType.java @@ -16,6 +16,7 @@ public enum JsonSchemaType { STRING("string", true, null, Schema.Type.STRING), + // We currently don't distinguish between int and long NUMBER_INT("number", true, "integer", Schema.Type.LONG), NUMBER_BIGINT("number", true, "big_integer", Schema.Type.STRING), NUMBER_FLOAT("number", true, "float", Schema.Type.FLOAT), diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java index aae14dae955a..f13e954d8be3 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java @@ -4,6 +4,8 @@ package io.airbyte.integrations.destination.s3.avro; +import static java.util.Collections.singletonList; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.base.Preconditions; @@ -59,7 +61,7 @@ static List getNonNullTypes(final String fieldName, final JsonNo static List getTypes(final String fieldName, final JsonNode fieldDefinition) { final Optional combinedRestriction = getCombinedRestriction(fieldDefinition); if (combinedRestriction.isPresent()) { - return Collections.singletonList(JsonSchemaType.COMBINED); + return singletonList(JsonSchemaType.COMBINED); } final JsonNode typeProperty = fieldDefinition.get(TYPE); @@ -67,7 +69,7 @@ static List getTypes(final String fieldName, final JsonNode fiel final String airbyteType = airbyteTypeProperty == null ? null : airbyteTypeProperty.asText(); if (typeProperty == null || typeProperty.isNull()) { LOGGER.warn("Field \"{}\" has no type specification. It will default to string", fieldName); - return Collections.singletonList(JsonSchemaType.STRING); + return singletonList(JsonSchemaType.STRING); } if (typeProperty.isArray()) { @@ -77,11 +79,11 @@ static List getTypes(final String fieldName, final JsonNode fiel } if (typeProperty.isTextual()) { - return Collections.singletonList(JsonSchemaType.fromJsonSchemaType(typeProperty.asText(), airbyteType)); + return singletonList(JsonSchemaType.fromJsonSchemaType(typeProperty.asText(), airbyteType)); } LOGGER.warn("Field \"{}\" has unexpected type {}. It will default to string.", fieldName, typeProperty); - return Collections.singletonList(JsonSchemaType.STRING); + return singletonList(JsonSchemaType.STRING); } static Optional getCombinedRestriction(final JsonNode fieldDefinition) { @@ -255,6 +257,8 @@ Schema parseSingleType(final String fieldName, } else if (items.isArray()) { final List arrayElementTypes = parseJsonTypeUnion(fieldName, fieldNamespace, (ArrayNode) items, appendExtraProps, addStringToLogicalTypes); + // If any of the array elements were already null, remove them so that we can reinsert it at position 0 + arrayElementTypes.removeAll(singletonList(NULL_SCHEMA)); arrayElementTypes.add(0, NULL_SCHEMA); fieldSchema = Schema.createArray(Schema.createUnion(arrayElementTypes)); } else { @@ -406,6 +410,11 @@ List mergeRecordSchemas(final String fieldName, mergedSchemas.add(assembler.endRecord()); } + // If the schema is nullable, make sure that it's the first item in the list + if (mergedSchemas.removeAll(singletonList(NULL_SCHEMA))) { + mergedSchemas.add(0, NULL_SCHEMA); + } + return mergedSchemas; } diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/type_conversion_test_cases.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/type_conversion_test_cases.json index 2262b4ff76e6..071bc98b0bbb 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/type_conversion_test_cases.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/type_conversion_test_cases.json @@ -12,7 +12,7 @@ "type": "number", "airbyte_type": "integer" }, - "avroFieldType": ["null", "int"] + "avroFieldType": ["null", "long"] }, { "fieldName": "big_integer_field", @@ -20,7 +20,7 @@ "type": "number", "airbyte_type": "big_integer" }, - "avroFieldType": ["null", "long"] + "avroFieldType": ["null", "string"] }, { "fieldName": "float_field", @@ -81,7 +81,7 @@ "null", { "type": "array", - "items": ["null", "string", "double", "long"] + "items": ["null", "string", "double"] } ] }, @@ -115,7 +115,7 @@ }, { "name": "long_id", - "type": ["null", "long"], + "type": ["null", "string"], "default": null }, { @@ -197,7 +197,7 @@ { "type": "number", "airbyte_type": "big_integer" } ] }, - "avroFieldType": ["null", "string", "int", "long"] + "avroFieldType": ["null", "string", "int"] }, { "fieldName": "logical_type_date_time",