Skip to content

Commit

Permalink
update avro type mappings
Browse files Browse the repository at this point in the history
{type:number, airbyte_type:integer} -> long
{type:number, airbyte_type:big_integer} -> string (i.e. "unbounded integer")
  • Loading branch information
edgao committed Jul 15, 2022
1 parent 018efd4 commit 5b0349b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
public enum JsonSchemaType {

STRING("string", true, null, Schema.Type.STRING),
// We currently don't distinguish between int and long
NUMBER_INT("number", true, "integer", Schema.Type.LONG),
NUMBER_BIGINT("number", true, "big_integer", Schema.Type.STRING),
NUMBER_FLOAT("number", true, "float", Schema.Type.FLOAT),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.destination.s3.avro;

import static java.util.Collections.singletonList;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -59,15 +61,15 @@ static List<JsonSchemaType> getNonNullTypes(final String fieldName, final JsonNo
static List<JsonSchemaType> getTypes(final String fieldName, final JsonNode fieldDefinition) {
final Optional<JsonNode> combinedRestriction = getCombinedRestriction(fieldDefinition);
if (combinedRestriction.isPresent()) {
return Collections.singletonList(JsonSchemaType.COMBINED);
return singletonList(JsonSchemaType.COMBINED);
}

final JsonNode typeProperty = fieldDefinition.get(TYPE);
final JsonNode airbyteTypeProperty = fieldDefinition.get(AIRBYTE_TYPE);
final String airbyteType = airbyteTypeProperty == null ? null : airbyteTypeProperty.asText();
if (typeProperty == null || typeProperty.isNull()) {
LOGGER.warn("Field \"{}\" has no type specification. It will default to string", fieldName);
return Collections.singletonList(JsonSchemaType.STRING);
return singletonList(JsonSchemaType.STRING);
}

if (typeProperty.isArray()) {
Expand All @@ -77,11 +79,11 @@ static List<JsonSchemaType> getTypes(final String fieldName, final JsonNode fiel
}

if (typeProperty.isTextual()) {
return Collections.singletonList(JsonSchemaType.fromJsonSchemaType(typeProperty.asText(), airbyteType));
return singletonList(JsonSchemaType.fromJsonSchemaType(typeProperty.asText(), airbyteType));
}

LOGGER.warn("Field \"{}\" has unexpected type {}. It will default to string.", fieldName, typeProperty);
return Collections.singletonList(JsonSchemaType.STRING);
return singletonList(JsonSchemaType.STRING);
}

static Optional<JsonNode> getCombinedRestriction(final JsonNode fieldDefinition) {
Expand Down Expand Up @@ -255,6 +257,8 @@ Schema parseSingleType(final String fieldName,
} else if (items.isArray()) {
final List<Schema> arrayElementTypes =
parseJsonTypeUnion(fieldName, fieldNamespace, (ArrayNode) items, appendExtraProps, addStringToLogicalTypes);
// If any of the array element types are already null, remove them so that a single null can be reinserted at position 0
arrayElementTypes.removeAll(singletonList(NULL_SCHEMA));
arrayElementTypes.add(0, NULL_SCHEMA);
fieldSchema = Schema.createArray(Schema.createUnion(arrayElementTypes));
} else {
Expand Down Expand Up @@ -406,6 +410,11 @@ List<Schema> mergeRecordSchemas(final String fieldName,
mergedSchemas.add(assembler.endRecord());
}

// If the merged schemas include null (i.e. the field is nullable), make sure that null is the first item in the list
if (mergedSchemas.removeAll(singletonList(NULL_SCHEMA))) {
mergedSchemas.add(0, NULL_SCHEMA);
}

return mergedSchemas;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
"type": "number",
"airbyte_type": "integer"
},
"avroFieldType": ["null", "int"]
"avroFieldType": ["null", "long"]
},
{
"fieldName": "big_integer_field",
"jsonFieldSchema": {
"type": "number",
"airbyte_type": "big_integer"
},
"avroFieldType": ["null", "long"]
"avroFieldType": ["null", "string"]
},
{
"fieldName": "float_field",
Expand Down Expand Up @@ -81,7 +81,7 @@
"null",
{
"type": "array",
"items": ["null", "string", "double", "long"]
"items": ["null", "string", "double"]
}
]
},
Expand Down Expand Up @@ -115,7 +115,7 @@
},
{
"name": "long_id",
"type": ["null", "long"],
"type": ["null", "string"],
"default": null
},
{
Expand Down Expand Up @@ -197,7 +197,7 @@
{ "type": "number", "airbyte_type": "big_integer" }
]
},
"avroFieldType": ["null", "string", "int", "long"]
"avroFieldType": ["null", "string", "int"]
},
{
"fieldName": "logical_type_date_time",
Expand Down

0 comments on commit 5b0349b

Please sign in to comment.