From 7111bbc15f760c6d89cadfb33e4bbcb38110d056 Mon Sep 17 00:00:00 2001 From: Cynthia Yin Date: Tue, 18 Jul 2023 14:59:40 -0700 Subject: [PATCH 01/10] general cleanup - move stuff around, add more comments --- .../typing_deduping/AirbyteType.java | 83 +++++++----- .../typing_deduping/AirbyteTypeUtils.java | 52 +++---- .../typing_deduping/AirbyteTypeTest.java | 127 +++++++++--------- .../typing_deduping/BigQuerySqlGenerator.java | 5 +- 4 files changed, 133 insertions(+), 134 deletions(-) diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java index 4351bcc6cc40..4c3c21e01dbe 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java @@ -4,14 +4,19 @@ package io.airbyte.integrations.base.destination.typing_deduping; +import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteTypeUtils.getAirbyteProtocolType; +import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteTypeUtils.nodeIsType; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableList; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.OneOf; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.UnsupportedOneOf; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import org.slf4j.Logger; @@ -33,40 +38,13 @@ static AirbyteType fromJsonSchema(final JsonNode schema) { final JsonNode topLevelType = schema.get("type"); if (topLevelType != null) { if (topLevelType.isTextual()) { - if (AirbyteTypeUtils.nodeIsType(topLevelType, "object")) { + if (nodeIsType(topLevelType, "object")) { return getStruct(schema); - } else if (AirbyteTypeUtils.nodeIsType(topLevelType, "array")) { + } else if (nodeIsType(topLevelType, "array")) { return getArray(schema); } } else if (topLevelType.isArray()) { - final List typeOptions = new ArrayList<>(); - topLevelType.elements().forEachRemaining(element -> { - // ignore "null" type and remove duplicates - String type = element.asText(""); - if (!"null".equals(type) && !typeOptions.contains(type)) { - typeOptions.add(element.asText()); - } - }); - - // we encounter an array of types that actually represents a single type rather than a OneOf - if (typeOptions.size() == 1) { - if (typeOptions.get(0).equals("object")) { - return getStruct(schema); - } else if (typeOptions.get(0).equals("array")) { - return getArray(schema); - } else { - return AirbyteTypeUtils.getAirbyteProtocolType(schema); - } - } - - final List options = typeOptions.stream().map(typeOption -> { - // Recurse into a schema that forces a specific one of each option - JsonNode schemaClone = schema.deepCopy(); - // schema is guaranteed to be an object here, because we know it has a `type` key - ((ObjectNode) schemaClone).put("type", typeOption); - return fromJsonSchema(schemaClone); - }).toList(); - return new OneOf(options); + return fromArrayJsonSchema(schema, topLevelType); } } else if (schema.hasNonNull("oneOf")) { final List options = new ArrayList<>(); @@ -78,7 +56,7 @@ static AirbyteType fromJsonSchema(final JsonNode schema) { // This is for backwards-compatibility with legacy normalization. return getStruct(schema); } - return AirbyteTypeUtils.getAirbyteProtocolType(schema); + return getAirbyteProtocolType(schema); } catch (final Exception e) { LOGGER.error("Exception parsing JSON schema {}: {}; returning UNKNOWN.", schema, e); return AirbyteProtocolType.UNKNOWN; @@ -107,17 +85,48 @@ private static Array getArray(final JsonNode schema) { } } - enum AirbyteProtocolType implements AirbyteType { + private static AirbyteType fromArrayJsonSchema(final JsonNode schema, final JsonNode array) { + final List typeOptions = new ArrayList<>(); + array.elements().forEachRemaining(element -> { + // ignore "null" type and remove duplicates + final String type = element.asText(""); + if (!"null".equals(type) && !typeOptions.contains(type)) { + typeOptions.add(element.asText()); + } + }); - STRING, - NUMBER, - INTEGER, + // we encounter an array of types that actually represents a single type rather than a OneOf + if (typeOptions.size() == 1) { + if (typeOptions.get(0).equals("object")) { + return getStruct(schema); + } else if (typeOptions.get(0).equals("array")) { + return getArray(schema); + } else { + return getAirbyteProtocolType(schema); + } + } + + final List options = typeOptions.stream().map(typeOption -> { + // Recurse into a schema that forces a specific one of each option + final JsonNode schemaClone = schema.deepCopy(); + // schema is guaranteed to be an object here, because we know it has a `type` key + ((ObjectNode) schemaClone).put("type", typeOption); + return fromJsonSchema(schemaClone); + }).toList(); + return new OneOf(options); + } + + enum AirbyteProtocolType implements AirbyteType { + // Protocol types are ordered by precedence in the case of a OneOf that contains multiple types. BOOLEAN, - TIMESTAMP_WITH_TIMEZONE, + INTEGER, + NUMBER, TIMESTAMP_WITHOUT_TIMEZONE, + TIMESTAMP_WITH_TIMEZONE, + DATE, TIME_WITH_TIMEZONE, TIME_WITHOUT_TIMEZONE, - DATE, + STRING, UNKNOWN; public static AirbyteProtocolType matches(final String type) { diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java index b9673527ed0d..4573ea2a137a 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java @@ -4,6 +4,8 @@ package io.airbyte.integrations.base.destination.typing_deduping; +import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType.*; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.TextNode; import com.google.common.collect.ImmutableList; @@ -31,18 +33,6 @@ public class AirbyteTypeUtils { AirbyteProtocolType.INTEGER, ImmutableList.of(AirbyteProtocolType.STRING, AirbyteProtocolType.NUMBER), AirbyteProtocolType.NUMBER, ImmutableList.of(AirbyteProtocolType.STRING)); - // Protocol types in order of precedence - private static final List ORDERED_PROTOCOL_TYPES = ImmutableList.of( - AirbyteProtocolType.BOOLEAN, - AirbyteProtocolType.INTEGER, - AirbyteProtocolType.NUMBER, - AirbyteProtocolType.TIMESTAMP_WITHOUT_TIMEZONE, - AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE, - AirbyteProtocolType.DATE, - AirbyteProtocolType.TIME_WITH_TIMEZONE, - AirbyteProtocolType.TIME_WITHOUT_TIMEZONE, - AirbyteProtocolType.STRING); - protected static boolean nodeIsType(final JsonNode node, final String type) { if (node == null || !node.isTextual()) { return false; @@ -65,49 +55,48 @@ private static boolean nodeIsOrContainsType(final JsonNode node, final String ty return false; } + // Extracts the appropriate protocol type from the representative JSON protected static AirbyteType getAirbyteProtocolType(final JsonNode node) { + // JSON could be a string (ex: "number") if (node.isTextual()) { - return AirbyteProtocolType.matches(node.asText()); + return matches(node.asText()); } + // Or, JSON could be a node with fields final JsonNode propertyType = node.get("type"); final JsonNode airbyteType = node.get("airbyte_type"); final JsonNode format = node.get("format"); if (nodeIsOrContainsType(propertyType, "boolean")) { - return AirbyteProtocolType.BOOLEAN; + return BOOLEAN; } else if (nodeIsOrContainsType(propertyType, "integer")) { - return AirbyteProtocolType.INTEGER; + return INTEGER; } else if (nodeIsOrContainsType(propertyType, "number")) { - if (nodeIsType(airbyteType, "integer")) { - return AirbyteProtocolType.INTEGER; - } else { - return AirbyteProtocolType.NUMBER; - } + return nodeIsType(airbyteType, "integer") ? INTEGER : NUMBER; } else if (nodeIsOrContainsType(propertyType, "string")) { if (nodeIsOrContainsType(format, "date")) { - return AirbyteProtocolType.DATE; + return DATE; } else if (nodeIsType(format, "time")) { if (nodeIsType(airbyteType, "time_without_timezone")) { - return AirbyteProtocolType.TIME_WITHOUT_TIMEZONE; + return TIME_WITHOUT_TIMEZONE; } else if (nodeIsType(airbyteType, "time_with_timezone")) { - return AirbyteProtocolType.TIME_WITH_TIMEZONE; + return TIME_WITH_TIMEZONE; } } else if (nodeIsOrContainsType(format, "date-time")) { if (nodeIsType(airbyteType, "timestamp_without_timezone")) { - return AirbyteProtocolType.TIMESTAMP_WITHOUT_TIMEZONE; + return TIMESTAMP_WITHOUT_TIMEZONE; } else if (airbyteType == null || nodeIsType(airbyteType, "timestamp_with_timezone")) { - return AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE; + return TIMESTAMP_WITH_TIMEZONE; } } else { - return AirbyteProtocolType.STRING; + return STRING; } } - return AirbyteProtocolType.UNKNOWN; + return UNKNOWN; } - // Pick which type in a OneOf has precedence + // Picks which type in a OneOf takes precedence public static AirbyteType chooseOneOfType(final OneOf o) { final List options = o.options(); @@ -133,10 +122,11 @@ public static AirbyteType chooseOneOfType(final OneOf o) { } else if (foundStructType != null) { return foundStructType; } else { - for (final AirbyteProtocolType protocolType : ORDERED_PROTOCOL_TYPES) { + for (final AirbyteProtocolType protocolType : AirbyteProtocolType.values()) { if (typePresenceMap.getOrDefault(protocolType, false)) { boolean foundExcludedTypes = false; - final List excludedTypes = EXCLUDED_PROTOCOL_TYPES_MAP.getOrDefault(protocolType, Collections.emptyList()); + final List excludedTypes = + EXCLUDED_PROTOCOL_TYPES_MAP.getOrDefault(protocolType, Collections.emptyList()); for (final AirbyteProtocolType excludedType : excludedTypes) { if (typePresenceMap.getOrDefault(excludedType, false)) { foundExcludedTypes = true; @@ -150,7 +140,7 @@ public static AirbyteType chooseOneOfType(final OneOf o) { } } - return AirbyteProtocolType.UNKNOWN; + return UNKNOWN; } } diff --git a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java index 5d4120e5cd21..3ac2516a7a69 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java @@ -4,12 +4,13 @@ package io.airbyte.integrations.base.destination.typing_deduping; +import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType.*; +import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.fromJsonSchema; import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableList; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.OneOf; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; @@ -179,21 +180,21 @@ public void testStruct() { """); final LinkedHashMap propertiesMap = new LinkedHashMap<>(); - propertiesMap.put("key1", AirbyteProtocolType.BOOLEAN); - propertiesMap.put("key2", AirbyteProtocolType.INTEGER); - propertiesMap.put("key3", AirbyteProtocolType.INTEGER); - propertiesMap.put("key4", AirbyteProtocolType.NUMBER); - propertiesMap.put("key5", AirbyteProtocolType.DATE); - propertiesMap.put("key6", AirbyteProtocolType.TIME_WITHOUT_TIMEZONE); - propertiesMap.put("key7", AirbyteProtocolType.TIME_WITH_TIMEZONE); - propertiesMap.put("key8", AirbyteProtocolType.TIMESTAMP_WITHOUT_TIMEZONE); - propertiesMap.put("key9", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); - propertiesMap.put("key10", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); - propertiesMap.put("key11", AirbyteProtocolType.STRING); + propertiesMap.put("key1", BOOLEAN); + propertiesMap.put("key2", INTEGER); + propertiesMap.put("key3", INTEGER); + propertiesMap.put("key4", NUMBER); + propertiesMap.put("key5", DATE); + propertiesMap.put("key6", TIME_WITHOUT_TIMEZONE); + propertiesMap.put("key7", TIME_WITH_TIMEZONE); + propertiesMap.put("key8", TIMESTAMP_WITHOUT_TIMEZONE); + propertiesMap.put("key9", TIMESTAMP_WITH_TIMEZONE); + propertiesMap.put("key10", TIMESTAMP_WITH_TIMEZONE); + propertiesMap.put("key11", STRING); final AirbyteType struct = new Struct(propertiesMap); for (final String schema : structSchema) { - assertEquals(struct, AirbyteType.fromJsonSchema(Jsons.deserialize(schema))); + assertEquals(struct, fromJsonSchema(Jsons.deserialize(schema))); } } @@ -218,7 +219,7 @@ public void testEmptyStruct() { final AirbyteType struct = new Struct(new LinkedHashMap<>()); for (final String schema : structSchema) { - assertEquals(struct, AirbyteType.fromJsonSchema(Jsons.deserialize(schema))); + assertEquals(struct, fromJsonSchema(Jsons.deserialize(schema))); } } @@ -235,10 +236,10 @@ public void testImplicitStruct() { """; final LinkedHashMap propertiesMap = new LinkedHashMap<>(); - propertiesMap.put("key1", AirbyteProtocolType.BOOLEAN); + propertiesMap.put("key1", BOOLEAN); final AirbyteType struct = new Struct(propertiesMap); - assertEquals(struct, AirbyteType.fromJsonSchema(Jsons.deserialize(structSchema))); + assertEquals(struct, fromJsonSchema(Jsons.deserialize(structSchema))); } @Test @@ -275,9 +276,9 @@ public void testArray() { } """); - final AirbyteType array = new Array(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); + final AirbyteType array = new Array(TIMESTAMP_WITH_TIMEZONE); for (final String schema : arraySchema) { - assertEquals(array, AirbyteType.fromJsonSchema(Jsons.deserialize(schema))); + assertEquals(array, fromJsonSchema(Jsons.deserialize(schema))); } } @@ -301,9 +302,9 @@ public void testEmptyArray() { } """); - final AirbyteType array = new Array(AirbyteProtocolType.UNKNOWN); + final AirbyteType array = new Array(UNKNOWN); for (final String schema : arraySchema) { - assertEquals(array, AirbyteType.fromJsonSchema(Jsons.deserialize(schema))); + assertEquals(array, fromJsonSchema(Jsons.deserialize(schema))); } } @@ -316,11 +317,11 @@ public void testUnsupportedOneOf() { """; final List options = new ArrayList<>(); - options.add(AirbyteProtocolType.NUMBER); - options.add(AirbyteProtocolType.STRING); + options.add(NUMBER); + options.add(STRING); final UnsupportedOneOf unsupportedOneOf = new UnsupportedOneOf(options); - assertEquals(unsupportedOneOf, AirbyteType.fromJsonSchema(Jsons.deserialize(unsupportedOneOfSchema))); + assertEquals(unsupportedOneOf, fromJsonSchema(Jsons.deserialize(unsupportedOneOfSchema))); } @Test @@ -333,16 +334,16 @@ public void testOneOf() { """; final List options = new ArrayList<>(); - options.add(AirbyteProtocolType.STRING); - options.add(AirbyteProtocolType.NUMBER); + options.add(STRING); + options.add(NUMBER); final OneOf oneOf = new OneOf(options); - assertEquals(oneOf, AirbyteType.fromJsonSchema(Jsons.deserialize(oneOfSchema))); + assertEquals(oneOf, fromJsonSchema(Jsons.deserialize(oneOfSchema))); } @Test public void testOneOfComplex() { - JsonNode schema = Jsons.deserialize(""" + final JsonNode schema = Jsons.deserialize(""" { "type": ["string", "object", "array", "null", "string", "object", "array", "null"], "properties": { @@ -352,35 +353,35 @@ public void testOneOfComplex() { } """); - AirbyteType parsed = AirbyteType.fromJsonSchema(schema); + final AirbyteType parsed = fromJsonSchema(schema); - AirbyteType expected = new OneOf(List.of( - AirbyteProtocolType.STRING, + final AirbyteType expected = new OneOf(List.of( + STRING, new Struct(new LinkedHashMap<>() { { - put("foo", AirbyteProtocolType.STRING); + put("foo", STRING); } }), - new Array(AirbyteProtocolType.STRING))); + new Array(STRING))); assertEquals(expected, parsed); } @Test public void testOneOfUnderspecifiedNonPrimitives() { - JsonNode schema = Jsons.deserialize(""" + final JsonNode schema = Jsons.deserialize(""" { "type": ["string", "object", "array", "null", "string", "object", "array", "null"] } """); - AirbyteType parsed = AirbyteType.fromJsonSchema(schema); + final AirbyteType parsed = fromJsonSchema(schema); - AirbyteType expected = new OneOf(List.of( - AirbyteProtocolType.STRING, + final AirbyteType expected = new OneOf(List.of( + STRING, new Struct(new LinkedHashMap<>()), - new Array(AirbyteProtocolType.UNKNOWN))); + new Array(UNKNOWN))); assertEquals(expected, parsed); } @@ -391,7 +392,7 @@ public void testInvalidTextualType() { "type": "foo" } """; - assertEquals(AirbyteProtocolType.UNKNOWN, AirbyteType.fromJsonSchema(Jsons.deserialize(invalidTypeSchema))); + assertEquals(UNKNOWN, fromJsonSchema(Jsons.deserialize(invalidTypeSchema))); } @Test @@ -401,7 +402,7 @@ public void testInvalidBooleanType() { "type": true } """; - assertEquals(AirbyteProtocolType.UNKNOWN, AirbyteType.fromJsonSchema(Jsons.deserialize(invalidTypeSchema))); + assertEquals(UNKNOWN, fromJsonSchema(Jsons.deserialize(invalidTypeSchema))); } @Test @@ -417,7 +418,7 @@ public void testInvalid() { invalidSchema.add("{}"); for (final String schema : invalidSchema) { - assertEquals(AirbyteProtocolType.UNKNOWN, AirbyteType.fromJsonSchema(Jsons.deserialize(schema))); + assertEquals(UNKNOWN, fromJsonSchema(Jsons.deserialize(schema))); } } @@ -425,56 +426,56 @@ public void testInvalid() { public void testChooseOneOf() { // test ordering - OneOf o = new OneOf(ImmutableList.of(AirbyteProtocolType.STRING, AirbyteProtocolType.DATE)); - assertEquals(AirbyteProtocolType.DATE, AirbyteTypeUtils.chooseOneOfType(o)); + OneOf o = new OneOf(ImmutableList.of(STRING, DATE)); + assertEquals(DATE, AirbyteTypeUtils.chooseOneOfType(o)); - final Array a = new Array(AirbyteProtocolType.TIME_WITH_TIMEZONE); - o = new OneOf(ImmutableList.of(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE, a)); + final Array a = new Array(TIME_WITH_TIMEZONE); + o = new OneOf(ImmutableList.of(TIMESTAMP_WITH_TIMEZONE, a)); assertEquals(a, AirbyteTypeUtils.chooseOneOfType(o)); final LinkedHashMap properties = new LinkedHashMap<>(); - properties.put("key1", AirbyteProtocolType.UNKNOWN); - properties.put("key2", AirbyteProtocolType.TIME_WITHOUT_TIMEZONE); + properties.put("key1", UNKNOWN); + properties.put("key2", TIME_WITHOUT_TIMEZONE); final Struct s = new Struct(properties); - o = new OneOf(ImmutableList.of(AirbyteProtocolType.TIMESTAMP_WITHOUT_TIMEZONE, s)); + o = new OneOf(ImmutableList.of(TIMESTAMP_WITHOUT_TIMEZONE, s)); assertEquals(s, AirbyteTypeUtils.chooseOneOfType(o)); // test exclusion - o = new OneOf(ImmutableList.of(AirbyteProtocolType.BOOLEAN, AirbyteProtocolType.INTEGER)); - assertEquals(AirbyteProtocolType.INTEGER, AirbyteTypeUtils.chooseOneOfType(o)); + o = new OneOf(ImmutableList.of(BOOLEAN, INTEGER)); + assertEquals(INTEGER, AirbyteTypeUtils.chooseOneOfType(o)); - o = new OneOf(ImmutableList.of(AirbyteProtocolType.INTEGER, AirbyteProtocolType.NUMBER, AirbyteProtocolType.DATE)); - assertEquals(AirbyteProtocolType.NUMBER, AirbyteTypeUtils.chooseOneOfType(o)); + o = new OneOf(ImmutableList.of(INTEGER, NUMBER, DATE)); + assertEquals(NUMBER, AirbyteTypeUtils.chooseOneOfType(o)); - o = new OneOf(ImmutableList.of(AirbyteProtocolType.BOOLEAN, AirbyteProtocolType.NUMBER, AirbyteProtocolType.STRING)); - assertEquals(AirbyteProtocolType.STRING, AirbyteTypeUtils.chooseOneOfType(o)); + o = new OneOf(ImmutableList.of(BOOLEAN, NUMBER, STRING)); + assertEquals(STRING, AirbyteTypeUtils.chooseOneOfType(o)); } @Test public void testAsColumns() { - OneOf o = new OneOf(List.of( - AirbyteProtocolType.STRING, + final OneOf o = new OneOf(List.of( + STRING, new Struct(new LinkedHashMap<>() { { - put("foo", AirbyteProtocolType.STRING); + put("foo", STRING); } }), - new Array(AirbyteProtocolType.STRING), + new Array(STRING), // This is bad behavior, but it matches current behavior so we'll test it. // Ideally, we would recognize that the sub-oneOfs are also objects. new OneOf(List.of(new Struct(new LinkedHashMap<>()))), new UnsupportedOneOf(List.of(new Struct(new LinkedHashMap<>()))))); - LinkedHashMap columns = o.asColumns(); + final LinkedHashMap columns = o.asColumns(); assertEquals( new LinkedHashMap<>() { { - put("foo", AirbyteProtocolType.STRING); + put("foo", STRING); } }, @@ -483,7 +484,7 @@ public void testAsColumns() { @Test public void testAsColumnsMultipleObjects() { - OneOf o = new OneOf(List.of( + final OneOf o = new OneOf(List.of( new Struct(new LinkedHashMap<>()), new Struct(new LinkedHashMap<>()))); @@ -494,9 +495,9 @@ public void testAsColumnsMultipleObjects() { @Test public void testAsColumnsNoObjects() { - OneOf o = new OneOf(List.of( - AirbyteProtocolType.STRING, - new Array(AirbyteProtocolType.STRING), + final OneOf o = new OneOf(List.of( + STRING, + new Array(STRING), new UnsupportedOneOf(new ArrayList<>()), // Similar to testAsColumns(), this is bad behavior. new OneOf(List.of(new Struct(new LinkedHashMap<>()))), diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java index 748e383a2e5c..62ab23396286 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java @@ -115,7 +115,7 @@ public StandardSQLTypeName toDialectType(final AirbyteType type) { } private String extractAndCast(final ColumnId column, final AirbyteType airbyteType) { - if (airbyteType instanceof OneOf o) { + if (airbyteType instanceof final OneOf o) { // This is guaranteed to not be a OneOf, so we won't recurse infinitely final AirbyteType chosenType = AirbyteTypeUtils.chooseOneOfType(o); return extractAndCast(column, chosenType); @@ -160,7 +160,6 @@ ELSE JSON_QUERY(`_airbyte_data`, '$.${column_name}') // TODO maybe make this a BiMap and elevate this method and its inverse (toDestinationSQLType?) to the SQLGenerator? public StandardSQLTypeName toDialectType(final AirbyteProtocolType airbyteProtocolType) { return switch (airbyteProtocolType) { - // TODO doublecheck these case STRING -> StandardSQLTypeName.STRING; case NUMBER -> StandardSQLTypeName.NUMERIC; case INTEGER -> StandardSQLTypeName.INT64; @@ -510,7 +509,7 @@ String cdcDeletes(final StreamConfig stream, } final String pkList = stream.primaryKey().stream().map(columnId -> columnId.name(QUOTE)).collect(joining(",")); - String pkCasts = stream.primaryKey().stream().map(pk -> extractAndCast(pk, streamColumns.get(pk))).collect(joining(",\n")); + final String pkCasts = stream.primaryKey().stream().map(pk -> extractAndCast(pk, streamColumns.get(pk))).collect(joining(",\n")); // we want to grab IDs for deletion from the raw table (not the final table itself) to hand out-of-order record insertions after the delete has been registered return new StringSubstitutor(Map.of( From f7598429cf7980f69e029754422a232a6ec7830a Mon Sep 17 00:00:00 2001 From: Cynthia Yin Date: Tue, 18 Jul 2023 19:22:32 -0700 Subject: [PATCH 02/10] guarantee `getAirbyteProtocolType` won't handle array values for `type` --- .../typing_deduping/AirbyteType.java | 26 ++++++----- .../typing_deduping/AirbyteTypeUtils.java | 45 +++++++------------ .../typing_deduping/AirbyteTypeTest.java | 6 +-- 3 files changed, 33 insertions(+), 44 deletions(-) diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java index 4c3c21e01dbe..c6099a597f76 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java @@ -5,7 +5,7 @@ package io.airbyte.integrations.base.destination.typing_deduping; import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteTypeUtils.getAirbyteProtocolType; -import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteTypeUtils.nodeIsType; +import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteTypeUtils.nodeMatches; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -38,9 +38,9 @@ static AirbyteType fromJsonSchema(final JsonNode schema) { final JsonNode topLevelType = schema.get("type"); if (topLevelType != null) { if (topLevelType.isTextual()) { - if (nodeIsType(topLevelType, "object")) { + if (nodeMatches(topLevelType, "object")) { return getStruct(schema); - } else if (nodeIsType(topLevelType, "array")) { + } else if (nodeMatches(topLevelType, "array")) { return getArray(schema); } } else if (topLevelType.isArray()) { @@ -102,20 +102,24 @@ private static AirbyteType fromArrayJsonSchema(final JsonNode schema, final Json } else if (typeOptions.get(0).equals("array")) { return getArray(schema); } else { - return getAirbyteProtocolType(schema); + return getAirbyteProtocolType(getTrimmedJsonSchema(schema, typeOptions.get(0))); } } - final List options = typeOptions.stream().map(typeOption -> { - // Recurse into a schema that forces a specific one of each option - final JsonNode schemaClone = schema.deepCopy(); - // schema is guaranteed to be an object here, because we know it has a `type` key - ((ObjectNode) schemaClone).put("type", typeOption); - return fromJsonSchema(schemaClone); - }).toList(); + // Recurse into a schema that forces a specific one of each option + final List options = typeOptions.stream().map(typeOption -> + fromJsonSchema(getTrimmedJsonSchema(schema, typeOption))).toList(); return new OneOf(options); } + // Duplicates the JSON schema but keeps only one type + private static JsonNode getTrimmedJsonSchema(final JsonNode schema, final String type) { + final JsonNode schemaClone = schema.deepCopy(); + // schema is guaranteed to be an object here, because we know it has a `type` key + ((ObjectNode) schemaClone).put("type", type); + return schemaClone; + } + enum AirbyteProtocolType implements AirbyteType { // Protocol types are ordered by precedence in the case of a OneOf that contains multiple types. BOOLEAN, diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java index 4573ea2a137a..3f00c7d67ecf 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java @@ -33,26 +33,11 @@ public class AirbyteTypeUtils { AirbyteProtocolType.INTEGER, ImmutableList.of(AirbyteProtocolType.STRING, AirbyteProtocolType.NUMBER), AirbyteProtocolType.NUMBER, ImmutableList.of(AirbyteProtocolType.STRING)); - protected static boolean nodeIsType(final JsonNode node, final String type) { + protected static boolean nodeMatches(final JsonNode node, final String value) { if (node == null || !node.isTextual()) { return false; } - return node.equals(TextNode.valueOf(type)); - } - - private static boolean nodeIsOrContainsType(final JsonNode node, final String type) { - if (node == null) { - return false; - } else if (node.isTextual()) { - return nodeIsType(node, type); - } else if (node.isArray()) { - for (final JsonNode element : node) { - if (nodeIsType(element, type)) { - return true; - } - } - } - return false; + return node.equals(TextNode.valueOf(value)); } // Extracts the appropriate protocol type from the representative JSON @@ -62,30 +47,30 @@ protected static AirbyteType getAirbyteProtocolType(final JsonNode node) { return matches(node.asText()); } - // Or, JSON could be a node with fields + // or, JSON could be a node with fields final JsonNode propertyType = node.get("type"); final JsonNode airbyteType = node.get("airbyte_type"); final JsonNode format = node.get("format"); - if (nodeIsOrContainsType(propertyType, "boolean")) { + if (nodeMatches(propertyType, "boolean")) { return BOOLEAN; - } else if (nodeIsOrContainsType(propertyType, "integer")) { + } else if (nodeMatches(propertyType, "integer")) { return INTEGER; - } else if (nodeIsOrContainsType(propertyType, "number")) { - return nodeIsType(airbyteType, "integer") ? INTEGER : NUMBER; - } else if (nodeIsOrContainsType(propertyType, "string")) { - if (nodeIsOrContainsType(format, "date")) { + } else if (nodeMatches(propertyType, "number")) { + return nodeMatches(airbyteType, "integer") ? INTEGER : NUMBER; + } else if (nodeMatches(propertyType, "string")) { + if (nodeMatches(format, "date")) { return DATE; - } else if (nodeIsType(format, "time")) { - if (nodeIsType(airbyteType, "time_without_timezone")) { + } else if (nodeMatches(format, "time")) { + if (nodeMatches(airbyteType, "time_without_timezone")) { return TIME_WITHOUT_TIMEZONE; - } else if (nodeIsType(airbyteType, "time_with_timezone")) { + } else if (nodeMatches(airbyteType, "time_with_timezone")) { return TIME_WITH_TIMEZONE; } - } else if (nodeIsOrContainsType(format, "date-time")) { - if (nodeIsType(airbyteType, "timestamp_without_timezone")) { + } else if (nodeMatches(format, "date-time")) { + if (nodeMatches(airbyteType, "timestamp_without_timezone")) { return TIMESTAMP_WITHOUT_TIMEZONE; - } else if (airbyteType == null || nodeIsType(airbyteType, "timestamp_with_timezone")) { + } else if (airbyteType == null || nodeMatches(airbyteType, "timestamp_with_timezone")) { return TIMESTAMP_WITH_TIMEZONE; } } else { diff --git a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java index 3ac2516a7a69..aa431f6199f7 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java @@ -63,7 +63,7 @@ public void testStruct() { }, "key9": { "type": "string", - "format": ["date-time", "foo"], + "format": "date-time", "airbyte_type": "timestamp_with_timezone" }, "key10": { @@ -114,7 +114,7 @@ public void testStruct() { }, "key9": { "type": ["string"], - "format": ["date-time", "foo"], + "format": "date-time", "airbyte_type": "timestamp_with_timezone" }, "key10": { @@ -165,7 +165,7 @@ public void testStruct() { }, "key9": { "type": ["null", "string"], - "format": ["date-time", "foo"], + "format": "date-time", "airbyte_type": "timestamp_with_timezone" }, "key10": { From 6e21202b5f56a135f791937870e4159f34492948 Mon Sep 17 00:00:00 2001 From: Cynthia Yin Date: Tue, 18 Jul 2023 19:43:31 -0700 Subject: [PATCH 03/10] rename OneOf to Union --- .../typing_deduping/AirbyteType.java | 16 ++--- .../typing_deduping/AirbyteTypeUtils.java | 8 +-- .../typing_deduping/CatalogParser.java | 29 ++++----- .../typing_deduping/AirbyteTypeTest.java | 63 ++++++++++--------- .../typing_deduping/BigQuerySqlGenerator.java | 12 ++-- .../BigQuerySqlGeneratorTest.java | 14 ++--- 6 files changed, 72 insertions(+), 70 deletions(-) diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java index c6099a597f76..b766b22b7cff 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java @@ -12,8 +12,8 @@ import com.google.common.collect.ImmutableList; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.OneOf; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Union; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.UnsupportedOneOf; import java.util.ArrayList; import java.util.Collections; @@ -22,7 +22,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public sealed interface AirbyteType permits Array,OneOf,Struct,UnsupportedOneOf,AirbyteProtocolType { +public sealed interface AirbyteType permits AirbyteProtocolType,Struct,Array,UnsupportedOneOf,Union { Logger LOGGER = LoggerFactory.getLogger(AirbyteTypeUtils.class); @@ -31,7 +31,7 @@ public sealed interface AirbyteType permits Array,OneOf,Struct,UnsupportedOneOf, * it's an {@link Struct} schema, and then call {@link Struct#properties()} to get the columns. *

* If the top-level schema is not an object, then we can't really do anything with it, and should - * probably fail the sync. (but see also {@link OneOf#asColumns()}). + * probably fail the sync. (but see also {@link Union#asColumns()}). */ static AirbyteType fromJsonSchema(final JsonNode schema) { try { @@ -95,7 +95,7 @@ private static AirbyteType fromArrayJsonSchema(final JsonNode schema, final Json } }); - // we encounter an array of types that actually represents a single type rather than a OneOf + // we encounter an array of types that actually represents a single type rather than a Union if (typeOptions.size() == 1) { if (typeOptions.get(0).equals("object")) { return getStruct(schema); @@ -109,7 +109,7 @@ private static AirbyteType fromArrayJsonSchema(final JsonNode schema, final Json // Recurse into a schema that forces a specific one of each option final List options = typeOptions.stream().map(typeOption -> fromJsonSchema(getTrimmedJsonSchema(schema, typeOption))).toList(); - return new OneOf(options); + return new Union(options); } // Duplicates the JSON schema but keeps only one type @@ -121,7 +121,7 @@ private static JsonNode getTrimmedJsonSchema(final JsonNode schema, final String } enum AirbyteProtocolType implements AirbyteType { - // Protocol types are ordered by precedence in the case of a OneOf that contains multiple types. + // Protocol types are ordered by precedence in the case of a Union that contains multiple types. BOOLEAN, INTEGER, NUMBER, @@ -158,7 +158,7 @@ record Array(AirbyteType items) implements AirbyteType { /** * Represents a {oneOf: [...]} schema. *

- * This is purely a legacy type that we should eventually delete. See also {@link OneOf}. + * This is purely a legacy type that we should eventually delete. See also {@link Union}. */ record UnsupportedOneOf(List options) implements AirbyteType { @@ -177,7 +177,7 @@ record UnsupportedOneOf(List options) implements AirbyteType { *

  • Delete UnsupportedOneOf
  • * */ - record OneOf(List options) implements AirbyteType { + record Union(List options) implements AirbyteType { /** * This is a hack to handle weird schemas like {type: [object, string]}. If a stream's top-level diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java index 3f00c7d67ecf..1fa23ede178b 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java @@ -12,8 +12,8 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.OneOf; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Union; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -81,9 +81,9 @@ protected static AirbyteType getAirbyteProtocolType(final JsonNode node) { return UNKNOWN; } - // Picks which type in a OneOf takes precedence - public static AirbyteType chooseOneOfType(final OneOf o) { - final List options = o.options(); + // Picks which type in a Union takes precedence + public static AirbyteType chooseUnionType(final Union u) { + final List options = u.options(); // record what types are present Array foundArrayType = null; diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java index 8401d925eef4..2a26a32fdf1b 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.base.destination.typing_deduping; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Union; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import java.util.ArrayList; @@ -24,23 +25,23 @@ public CatalogParser(final SqlGenerator sqlGenerator) { this(sqlGenerator, DEFAULT_RAW_TABLE_NAMESPACE); } - public CatalogParser(final SqlGenerator sqlGenerator, String rawNamespaceOverride) { + public CatalogParser(final SqlGenerator sqlGenerator, final String rawNamespaceOverride) { this.sqlGenerator = sqlGenerator; this.rawNamespaceOverride = rawNamespaceOverride; } - public ParsedCatalog parseCatalog(ConfiguredAirbyteCatalog catalog) { + public ParsedCatalog parseCatalog(final ConfiguredAirbyteCatalog catalog) { // this code is bad and I feel bad // it's mostly a port of the old normalization logic to prevent tablename collisions. // tbh I have no idea if it works correctly. final List streamConfigs = new ArrayList<>(); - for (ConfiguredAirbyteStream stream : catalog.getStreams()) { + for (final ConfiguredAirbyteStream stream : catalog.getStreams()) { final StreamConfig originalStreamConfig = toStreamConfig(stream); // Use empty string quote because we don't really care if (streamConfigs.stream().anyMatch(s -> s.id().finalTableId("").equals(originalStreamConfig.id().finalTableId(""))) || streamConfigs.stream().anyMatch(s -> s.id().rawTableId("").equals(originalStreamConfig.id().rawTableId("")))) { - String originalNamespace = stream.getStream().getNamespace(); - String originalName = stream.getStream().getName(); + final String originalNamespace = stream.getStream().getNamespace(); + final String originalName = stream.getStream().getName(); // ... this logic is ported from legacy normalization, and maybe should change? // We're taking a hash of the quoted namespace and the unquoted stream name final String hash = DigestUtils.sha1Hex(originalStreamConfig.id().finalNamespace() + "&airbyte&" + originalName).substring(0, 3); @@ -59,13 +60,13 @@ public ParsedCatalog parseCatalog(ConfiguredAirbyteCatalog catalog) { return new ParsedCatalog(streamConfigs); } - private StreamConfig toStreamConfig(ConfiguredAirbyteStream stream) { - AirbyteType schema = AirbyteType.fromJsonSchema(stream.getStream().getJsonSchema()); - LinkedHashMap airbyteColumns; - if (schema instanceof Struct o) { + private StreamConfig toStreamConfig(final ConfiguredAirbyteStream stream) { + final AirbyteType schema = AirbyteType.fromJsonSchema(stream.getStream().getJsonSchema()); + final LinkedHashMap airbyteColumns; + if (schema instanceof final Struct o) { airbyteColumns = o.properties(); - } else if (schema instanceof AirbyteType.OneOf o) { - airbyteColumns = o.asColumns(); + } else if (schema instanceof final Union u) { + airbyteColumns = u.asColumns(); } else { throw new IllegalArgumentException("Top-level schema must be an object"); } @@ -89,8 +90,8 @@ private StreamConfig toStreamConfig(ConfiguredAirbyteStream stream) { // as with the tablename collisions thing above - we're trying to preserve legacy normalization's // naming conventions here. final LinkedHashMap columns = new LinkedHashMap<>(); - for (Entry entry : airbyteColumns.entrySet()) { - ColumnId originalColumnId = sqlGenerator.buildColumnId(entry.getKey()); + for (final Entry entry : airbyteColumns.entrySet()) { + final ColumnId originalColumnId = sqlGenerator.buildColumnId(entry.getKey()); ColumnId columnId; if (columns.keySet().stream().noneMatch(c -> c.canonicalName().equals(originalColumnId.canonicalName()))) { // None of the existing columns have the same name. We can add this new column as-is. @@ -101,7 +102,7 @@ private StreamConfig toStreamConfig(ConfiguredAirbyteStream stream) { int i = 1; while (true) { columnId = sqlGenerator.buildColumnId(entry.getKey() + "_" + i); - String canonicalName = columnId.canonicalName(); + final String canonicalName = columnId.canonicalName(); if (columns.keySet().stream().noneMatch(c -> c.canonicalName().equals(canonicalName))) { break; } else { diff --git a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java index aa431f6199f7..e18da14ce014 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java @@ -6,14 +6,15 @@ import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType.*; import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.fromJsonSchema; +import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteTypeUtils.chooseUnionType; import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableList; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.OneOf; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Union; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.UnsupportedOneOf; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -325,9 +326,9 @@ public void testUnsupportedOneOf() { } @Test - public void testOneOf() { + public void testUnion() { - final String oneOfSchema = """ + final String unionSchema = """ { "type": ["string", "number"] } @@ -337,12 +338,12 @@ public void testOneOf() { options.add(STRING); options.add(NUMBER); - final OneOf oneOf = new OneOf(options); - assertEquals(oneOf, fromJsonSchema(Jsons.deserialize(oneOfSchema))); + final Union union = new Union(options); + assertEquals(union, fromJsonSchema(Jsons.deserialize(unionSchema))); } @Test - public void testOneOfComplex() { + public void testUnionComplex() { final JsonNode schema = Jsons.deserialize(""" { "type": ["string", "object", "array", "null", "string", "object", "array", "null"], @@ -355,7 +356,7 @@ public void testOneOfComplex() { final AirbyteType parsed = fromJsonSchema(schema); - final AirbyteType expected = new OneOf(List.of( + final AirbyteType expected = new Union(List.of( STRING, new Struct(new LinkedHashMap<>() { @@ -369,7 +370,7 @@ public void testOneOfComplex() { } @Test - public void testOneOfUnderspecifiedNonPrimitives() { + public void testUnionUnderspecifiedNonPrimitives() { final JsonNode schema = Jsons.deserialize(""" { "type": ["string", "object", "array", "null", "string", "object", "array", "null"] @@ -378,7 +379,7 @@ public void testOneOfUnderspecifiedNonPrimitives() { final AirbyteType parsed = fromJsonSchema(schema); - final AirbyteType expected = new OneOf(List.of( + final AirbyteType expected = new Union(List.of( STRING, new Struct(new LinkedHashMap<>()), new Array(UNKNOWN))); @@ -423,38 +424,38 @@ public void testInvalid() { } @Test - public void testChooseOneOf() { + public void testChooseUnion() { // test ordering - OneOf o = new OneOf(ImmutableList.of(STRING, DATE)); - assertEquals(DATE, AirbyteTypeUtils.chooseOneOfType(o)); + Union u = new Union(ImmutableList.of(STRING, DATE)); + assertEquals(DATE, chooseUnionType(u)); final Array a = new Array(TIME_WITH_TIMEZONE); - o = new OneOf(ImmutableList.of(TIMESTAMP_WITH_TIMEZONE, a)); - assertEquals(a, AirbyteTypeUtils.chooseOneOfType(o)); + u = new Union(ImmutableList.of(TIMESTAMP_WITH_TIMEZONE, a)); + assertEquals(a, chooseUnionType(u)); final LinkedHashMap properties = new LinkedHashMap<>(); properties.put("key1", UNKNOWN); properties.put("key2", TIME_WITHOUT_TIMEZONE); final Struct s = new Struct(properties); - o = new OneOf(ImmutableList.of(TIMESTAMP_WITHOUT_TIMEZONE, s)); - assertEquals(s, AirbyteTypeUtils.chooseOneOfType(o)); + u = new Union(ImmutableList.of(TIMESTAMP_WITHOUT_TIMEZONE, s)); + assertEquals(s, chooseUnionType(u)); // test exclusion - o = new OneOf(ImmutableList.of(BOOLEAN, INTEGER)); - assertEquals(INTEGER, AirbyteTypeUtils.chooseOneOfType(o)); + u = new Union(ImmutableList.of(BOOLEAN, INTEGER)); + assertEquals(INTEGER, chooseUnionType(u)); - o = new OneOf(ImmutableList.of(INTEGER, NUMBER, DATE)); - assertEquals(NUMBER, AirbyteTypeUtils.chooseOneOfType(o)); + u = new Union(ImmutableList.of(INTEGER, NUMBER, DATE)); + assertEquals(NUMBER, chooseUnionType(u)); - o = new OneOf(ImmutableList.of(BOOLEAN, NUMBER, STRING)); - assertEquals(STRING, AirbyteTypeUtils.chooseOneOfType(o)); + u = new Union(ImmutableList.of(BOOLEAN, NUMBER, STRING)); + assertEquals(STRING, chooseUnionType(u)); } @Test public void testAsColumns() { - final OneOf o = new OneOf(List.of( + final Union u = new Union(List.of( STRING, new Struct(new LinkedHashMap<>() { @@ -465,11 +466,11 @@ public void testAsColumns() { }), new Array(STRING), // This is bad behavior, but it matches current behavior so we'll test it. - // Ideally, we would recognize that the sub-oneOfs are also objects. - new OneOf(List.of(new Struct(new LinkedHashMap<>()))), + // Ideally, we would recognize that the sub-unions are also objects. + new Union(List.of(new Struct(new LinkedHashMap<>()))), new UnsupportedOneOf(List.of(new Struct(new LinkedHashMap<>()))))); - final LinkedHashMap columns = o.asColumns(); + final LinkedHashMap columns = u.asColumns(); assertEquals( new LinkedHashMap<>() { @@ -484,28 +485,28 @@ public void testAsColumns() { @Test public void testAsColumnsMultipleObjects() { - final OneOf o = new OneOf(List.of( + final Union u = new Union(List.of( new Struct(new LinkedHashMap<>()), new Struct(new LinkedHashMap<>()))); // This prooobably should throw an exception, but for the sake of smooth rollout it just logs a // warning for now. - assertEquals(new LinkedHashMap<>(), o.asColumns()); + assertEquals(new LinkedHashMap<>(), u.asColumns()); } @Test public void testAsColumnsNoObjects() { - final OneOf o = new OneOf(List.of( + final Union u = new Union(List.of( STRING, new Array(STRING), new UnsupportedOneOf(new ArrayList<>()), // Similar to testAsColumns(), this is bad behavior. - new OneOf(List.of(new Struct(new LinkedHashMap<>()))), + new Union(List.of(new Struct(new LinkedHashMap<>()))), new UnsupportedOneOf(List.of(new Struct(new LinkedHashMap<>()))))); // This prooobably should throw an exception, but for the sake of smooth rollout it just logs a // warning for now. - assertEquals(new LinkedHashMap<>(), o.asColumns()); + assertEquals(new LinkedHashMap<>(), u.asColumns()); } } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java index 62ab23396286..447645afef29 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java @@ -17,8 +17,8 @@ import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.OneOf; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Union; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.UnsupportedOneOf; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteTypeUtils; import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport; @@ -99,8 +99,8 @@ public StandardSQLTypeName toDialectType(final AirbyteType type) { return StandardSQLTypeName.JSON; } else if (type instanceof UnsupportedOneOf) { return StandardSQLTypeName.JSON; - } else if (type instanceof final OneOf o) { - final AirbyteType typeWithPrecedence = AirbyteTypeUtils.chooseOneOfType(o); + } else if (type instanceof final Union u) { + final AirbyteType typeWithPrecedence = AirbyteTypeUtils.chooseUnionType(u); final StandardSQLTypeName dialectType; if ((typeWithPrecedence instanceof Struct) || (typeWithPrecedence instanceof Array)) { dialectType = StandardSQLTypeName.JSON; @@ -115,9 +115,9 @@ public StandardSQLTypeName toDialectType(final AirbyteType type) { } private String extractAndCast(final ColumnId column, final AirbyteType airbyteType) { - if (airbyteType instanceof final OneOf o) { - // This is guaranteed to not be a OneOf, so we won't recurse infinitely - final AirbyteType chosenType = AirbyteTypeUtils.chooseOneOfType(o); + if (airbyteType instanceof final Union u) { + // This is guaranteed to not be a Union, so we won't recurse infinitely + final AirbyteType chosenType = AirbyteTypeUtils.chooseUnionType(u); return extractAndCast(column, chosenType); } else if (airbyteType instanceof Struct) { // We need to validate that the struct is actually a struct. diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java index cd290b12ef45..a1acb711874f 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java @@ -14,7 +14,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.OneOf; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Union; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.UnsupportedOneOf; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; @@ -43,12 +43,12 @@ public void testToDialectType() { assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(a)); assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(new UnsupportedOneOf(new ArrayList<>()))); - OneOf o = new OneOf(ImmutableList.of(s)); - assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(o)); - o = new OneOf(ImmutableList.of(a)); - assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(o)); - o = new OneOf(ImmutableList.of(AirbyteProtocolType.BOOLEAN, AirbyteProtocolType.NUMBER)); - assertEquals(StandardSQLTypeName.NUMERIC, generator.toDialectType(o)); + Union u = new Union(ImmutableList.of(s)); + assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(u)); + u = new Union(ImmutableList.of(a)); + assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(u)); + u = new Union(ImmutableList.of(AirbyteProtocolType.BOOLEAN, AirbyteProtocolType.NUMBER)); + assertEquals(StandardSQLTypeName.NUMERIC, generator.toDialectType(u)); } @Test From 46bbcabd619177a5c7357585f4d0d37baf69dc7a Mon Sep 17 00:00:00 2001 From: Cynthia Yin Date: Wed, 19 Jul 2023 02:58:03 -0700 Subject: [PATCH 04/10] simplify union ordering logic --- .../typing_deduping/AirbyteType.java | 15 +++-- .../typing_deduping/AirbyteTypeUtils.java | 65 ++++--------------- 2 files changed, 20 insertions(+), 60 deletions(-) diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java index b766b22b7cff..e7098e850d80 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java @@ -122,15 +122,16 @@ private static JsonNode getTrimmedJsonSchema(final JsonNode schema, final String enum AirbyteProtocolType implements AirbyteType { // Protocol types are ordered by precedence in the case of a Union that contains multiple types. - BOOLEAN, - INTEGER, - NUMBER, - TIMESTAMP_WITHOUT_TIMEZONE, - TIMESTAMP_WITH_TIMEZONE, + // Priority is given to wider scope types over narrower ones. + STRING, DATE, - TIME_WITH_TIMEZONE, TIME_WITHOUT_TIMEZONE, - STRING, + TIME_WITH_TIMEZONE, + TIMESTAMP_WITHOUT_TIMEZONE, + TIMESTAMP_WITH_TIMEZONE, + NUMBER, + INTEGER, + BOOLEAN, UNKNOWN; public static AirbyteProtocolType matches(final String type) { diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java index 1fa23ede178b..a4f55c258e94 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java @@ -8,17 +8,12 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.TextNode; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Union; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; +import java.util.Comparator; import java.util.List; -import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,13 +21,6 @@ public class AirbyteTypeUtils { private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteTypeUtils.class); - // Map from a protocol type to what other protocol types should take precedence over it if present - // in a OneOf - private static final Map> EXCLUDED_PROTOCOL_TYPES_MAP = ImmutableMap.of( - AirbyteProtocolType.BOOLEAN, ImmutableList.of(AirbyteProtocolType.STRING, AirbyteProtocolType.NUMBER, AirbyteProtocolType.INTEGER), - AirbyteProtocolType.INTEGER, ImmutableList.of(AirbyteProtocolType.STRING, AirbyteProtocolType.NUMBER), - AirbyteProtocolType.NUMBER, ImmutableList.of(AirbyteProtocolType.STRING)); - protected static boolean nodeMatches(final JsonNode node, final String value) { if (node == null || !node.isTextual()) { return false; @@ -83,49 +71,20 @@ protected static AirbyteType getAirbyteProtocolType(final JsonNode node) { // Picks which type in a Union takes precedence public static AirbyteType chooseUnionType(final Union u) { - final List options = u.options(); - - // record what types are present - Array foundArrayType = null; - Struct foundStructType = null; - final Map typePresenceMap = new HashMap<>(); - Arrays.stream(AirbyteProtocolType.values()).map(type -> typePresenceMap.put(type, false)); - - // looping through the options only once for efficiency - for (final AirbyteType option : options) { - if (option instanceof final Array a) { - foundArrayType = a; - } else if (option instanceof final Struct s) { - foundStructType = s; - } else if (option instanceof final AirbyteProtocolType p) { - typePresenceMap.put(p, true); - } - } - - if (foundArrayType != null) { - return foundArrayType; - } else if (foundStructType != null) { - return foundStructType; - } else { - for (final AirbyteProtocolType protocolType : AirbyteProtocolType.values()) { - if (typePresenceMap.getOrDefault(protocolType, false)) { - boolean foundExcludedTypes = false; - final List excludedTypes = - EXCLUDED_PROTOCOL_TYPES_MAP.getOrDefault(protocolType, Collections.emptyList()); - for (final AirbyteProtocolType excludedType : excludedTypes) { - if (typePresenceMap.getOrDefault(excludedType, false)) { - foundExcludedTypes = true; - break; - } - } - if (!foundExcludedTypes) { - return protocolType; + final Comparator comparator = Comparator.comparing(t -> + { + if (t instanceof Array) { + return -2; + } else if (t instanceof Struct) { + return -1; + } else if (t instanceof final AirbyteProtocolType p) { + return List.of(AirbyteProtocolType.values()).indexOf(p); } + return Integer.MAX_VALUE; } - } - } + ); - return UNKNOWN; + return u.options().stream().min(comparator).orElse(UNKNOWN); } } From fb6dcc6107a9c0b608493f552d8d6766eee778f9 Mon Sep 17 00:00:00 2001 From: Cynthia Yin Date: Wed, 19 Jul 2023 11:17:36 -0700 Subject: [PATCH 05/10] update testChooseUnion --- .../typing_deduping/AirbyteType.java | 10 +++--- .../typing_deduping/AirbyteTypeTest.java | 31 +++++++------------ 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java index e7098e850d80..c570b8cce362 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java @@ -9,14 +9,12 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.ImmutableList; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Union; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.UnsupportedOneOf; import java.util.ArrayList; -import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import org.slf4j.Logger; @@ -120,9 +118,13 @@ private static JsonNode getTrimmedJsonSchema(final JsonNode schema, final String return schemaClone; } + /** + * Protocol types are ordered by precedence in the case of a Union that contains multiple types. + * Priority is given to wider scope types over narrower ones. + * (Note that because of dedup logic in {@link AirbyteType#fromJsonSchema(JsonNode)}, at most one + * string or date/time type can exist in a Union.) + */ enum AirbyteProtocolType implements AirbyteType { - // Protocol types are ordered by precedence in the case of a Union that contains multiple types. - // Priority is given to wider scope types over narrower ones. STRING, DATE, TIME_WITHOUT_TIMEZONE, diff --git a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java index e18da14ce014..2fce5b31bddd 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java @@ -17,8 +17,10 @@ import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Union; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.UnsupportedOneOf; import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import org.junit.jupiter.api.Test; public class AirbyteTypeTest { @@ -425,32 +427,23 @@ public void testInvalid() { @Test public void testChooseUnion() { - // test ordering + final Map unionToType = new HashMap<>(); - Union u = new Union(ImmutableList.of(STRING, DATE)); - assertEquals(DATE, chooseUnionType(u)); - - final Array a = new Array(TIME_WITH_TIMEZONE); - u = new Union(ImmutableList.of(TIMESTAMP_WITH_TIMEZONE, a)); - assertEquals(a, chooseUnionType(u)); + final Array a = new Array(BOOLEAN); final LinkedHashMap properties = new LinkedHashMap<>(); properties.put("key1", UNKNOWN); - properties.put("key2", TIME_WITHOUT_TIMEZONE); + properties.put("key2", INTEGER); final Struct s = new Struct(properties); - u = new Union(ImmutableList.of(TIMESTAMP_WITHOUT_TIMEZONE, s)); - assertEquals(s, chooseUnionType(u)); - - // test exclusion - - u = new Union(ImmutableList.of(BOOLEAN, INTEGER)); - assertEquals(INTEGER, chooseUnionType(u)); - u = new Union(ImmutableList.of(INTEGER, NUMBER, DATE)); - assertEquals(NUMBER, chooseUnionType(u)); + unionToType.put(new Union(ImmutableList.of(s, a)), a); + unionToType.put(new Union(ImmutableList.of(NUMBER, a)), a); + unionToType.put(new Union(ImmutableList.of(INTEGER, s)), s); + unionToType.put(new Union(ImmutableList.of(NUMBER, DATE, BOOLEAN)), DATE); + unionToType.put(new Union(ImmutableList.of(INTEGER, BOOLEAN, NUMBER)), NUMBER); + unionToType.put(new Union(ImmutableList.of(BOOLEAN, INTEGER)), INTEGER); - u = new Union(ImmutableList.of(BOOLEAN, NUMBER, STRING)); - assertEquals(STRING, chooseUnionType(u)); + unionToType.forEach((u, t) -> assertEquals(t, chooseUnionType(u))); } @Test From 71308078f26353057c9d5d9560674884f9fa07e4 Mon Sep 17 00:00:00 2001 From: Cynthia Yin Date: Wed, 19 Jul 2023 11:25:26 -0700 Subject: [PATCH 06/10] fix docs typos --- .../supported-data-types.md | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/docs/understanding-airbyte/supported-data-types.md b/docs/understanding-airbyte/supported-data-types.md index cb11bf5de539..f588436e6aa6 100644 --- a/docs/understanding-airbyte/supported-data-types.md +++ b/docs/understanding-airbyte/supported-data-types.md @@ -10,20 +10,20 @@ This type system does not constrain values. However, destinations may not fully This table summarizes the available types. See the [Specific Types](#specific-types) section for explanation of optional parameters. -| Airbyte type | JSON Schema | Examples | -| -------------------------------------------------------------- | ----------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------- | -| String | `{"type": "string""}` | `"foo bar"` | -| Boolean | `{"type": "boolean"}` | `true` or `false` | -| Date | `{"type": "string", "format": "date"}` | `"2021-01-23"`, `"2021-01-23 BC"` | -| Timestamp with timezone | `{"type": "string", "format": "date-time", "airbyte_type": "timestamp_with_timezone"}` | `"2022-11-22T01:23:45.123456+05:00"`, `"2022-11-22T01:23:45Z BC"` | -| Timestamp without timezone | `{"type": "string", "format": "date-time", "airbyte_type": "timestamp_without_timezone"}` | `"2022-11-22T01:23:45"`, `"2022-11-22T01:23:45.123456 BC"` | -| Time without timezone | `{"type": "string", "airbyte_type": "time_with_timezone"}` | `"01:23:45.123456"`, `"01:23:45"` | -| Time with timezone | `{"type": "string", "airbyte_type": "time_without_timezone"}` | `"01:23:45.123456+05:00"`, `"01:23:45Z"` | -| Integer | `{"type": "integer"}` or `{"type": "number", "airbyte_type": "integer"}` | `42` | -| Number | `{"type": "number"}` | `1234.56` | -| Array | `{"type": "array"}`; optionally `items` | `[1, 2, 3]` | -| Object | `{"type": "object"}`; optionally `properties` | `{"foo": "bar"}` | -| Union | `{"oneOf": [...]}` | |ß +| Airbyte type | JSON Schema | Examples | +|----------------------------|-----------------------------------------------------------------------------------------------------|-------------------------------------------------------------------| +| String | `{"type": "string"}` | `"foo bar"` | +| Boolean | `{"type": "boolean"}` | `true` or `false` | +| Date | `{"type": "string", "format": "date"}` | `"2021-01-23"`, `"2021-01-23 BC"` | +| Timestamp without timezone | `{"type": "string", "format": "date-time", "airbyte_type": "timestamp_without_timezone"}` | `"2022-11-22T01:23:45"`, `"2022-11-22T01:23:45.123456 BC"` | +| Timestamp with timezone | `{"type": "string", "format": "date-time"}`; optionally `"airbyte_type": "timestamp_with_timezone"` | `"2022-11-22T01:23:45.123456+05:00"`, `"2022-11-22T01:23:45Z BC"` | +| Time without timezone | `{"type": "string", "format": "time", "airbyte_type": "time_without_timezone"}` | `"01:23:45.123456"`, `"01:23:45"` | +| Time with timezone | `{"type": "string", "format": "time", "airbyte_type": "time_with_timezone"}` | `"01:23:45.123456+05:00"`, `"01:23:45Z"` | +| Integer | `{"type": "integer"}` or `{"type": "number", "airbyte_type": "integer"}` | `42` | +| Number | `{"type": "number"}` | `1234.56` | +| Array | `{"type": "array"}`; optionally `items` | `[1, 2, 3]` | +| Object | `{"type": "object"}`; optionally `properties` | `{"foo": "bar"}` | +| Union | `{"oneOf": [...]}` | | ### Record structure As a reminder, sources expose a `discover` command, which returns a list of [`AirbyteStreams`](https://github.com/airbytehq/airbyte/blob/111131a193359027d0081de1290eb4bb846662ef/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml#L122), and a `read` method, which emits a series of [`AirbyteRecordMessages`](https://github.com/airbytehq/airbyte/blob/111131a193359027d0081de1290eb4bb846662ef/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml#L46-L66). The type system determines what a valid `json_schema` is for an `AirbyteStream`, which in turn dictates what messages `read` is allowed to emit. From ceb9a0c9f529096b7869ae46bce98a5c10e185b7 Mon Sep 17 00:00:00 2001 From: cynthiaxyin Date: Wed, 19 Jul 2023 22:00:43 +0000 Subject: [PATCH 07/10] Automated Commit - Format and Process Resources Changes --- .../typing_deduping/AirbyteType.java | 10 ++++---- .../typing_deduping/AirbyteTypeUtils.java | 22 ++++++++--------- .../typing_deduping/AirbyteTypeTest.java | 24 +++++++++---------- 3 files changed, 27 insertions(+), 29 deletions(-) diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java index c570b8cce362..da0f75626563 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java @@ -105,8 +105,7 @@ private static AirbyteType fromArrayJsonSchema(final JsonNode schema, final Json } // Recurse into a schema that forces a specific one of each option - final List options = typeOptions.stream().map(typeOption -> - fromJsonSchema(getTrimmedJsonSchema(schema, typeOption))).toList(); + final List options = typeOptions.stream().map(typeOption -> fromJsonSchema(getTrimmedJsonSchema(schema, typeOption))).toList(); return new Union(options); } @@ -120,11 +119,12 @@ private static JsonNode getTrimmedJsonSchema(final JsonNode schema, final String /** * Protocol types are ordered by precedence in the case of a Union that contains multiple types. - * Priority is given to wider scope types over narrower ones. - * (Note that because of dedup logic in {@link AirbyteType#fromJsonSchema(JsonNode)}, at most one - * string or date/time type can exist in a Union.) + * Priority is given to wider scope types over narrower ones. (Note that because of dedup logic in + * {@link AirbyteType#fromJsonSchema(JsonNode)}, at most one string or date/time type can exist in a + * Union.) */ enum AirbyteProtocolType implements AirbyteType { + STRING, DATE, TIME_WITHOUT_TIMEZONE, diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java index a4f55c258e94..6a9a9d66ab78 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java @@ -71,18 +71,16 @@ protected static AirbyteType getAirbyteProtocolType(final JsonNode node) { // Picks which type in a Union takes precedence public static AirbyteType chooseUnionType(final Union u) { - final Comparator comparator = Comparator.comparing(t -> - { - if (t instanceof Array) { - return -2; - } else if (t instanceof Struct) { - return -1; - } else if (t instanceof final AirbyteProtocolType p) { - return List.of(AirbyteProtocolType.values()).indexOf(p); - } - return Integer.MAX_VALUE; - } - ); + final Comparator comparator = Comparator.comparing(t -> { + if (t instanceof Array) { + return -2; + } else if (t instanceof Struct) { + return -1; + } else if (t instanceof final AirbyteProtocolType p) { + return List.of(AirbyteProtocolType.values()).indexOf(p); + } + return Integer.MAX_VALUE; + }); return u.options().stream().min(comparator).orElse(UNKNOWN); } diff --git a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java index 2fce5b31bddd..eee35d7d013e 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java @@ -347,14 +347,14 @@ public void testUnion() { @Test public void testUnionComplex() { final JsonNode schema = Jsons.deserialize(""" - { - "type": ["string", "object", "array", "null", "string", "object", "array", "null"], - "properties": { - "foo": {"type": "string"} - }, - "items": {"type": "string"} - } - """); + { + "type": ["string", "object", "array", "null", "string", "object", "array", "null"], + "properties": { + "foo": {"type": "string"} + }, + "items": {"type": "string"} + } + """); final AirbyteType parsed = fromJsonSchema(schema); @@ -374,10 +374,10 @@ public void testUnionComplex() { @Test public void testUnionUnderspecifiedNonPrimitives() { final JsonNode schema = Jsons.deserialize(""" - { - "type": ["string", "object", "array", "null", "string", "object", "array", "null"] - } - """); + { + "type": ["string", "object", "array", "null", "string", "object", "array", "null"] + } + """); final AirbyteType parsed = fromJsonSchema(schema); From 0206b4ba711ced9127f294d3046ad0ff4be30c60 Mon Sep 17 00:00:00 2001 From: Cynthia Yin Date: Wed, 19 Jul 2023 17:19:29 -0700 Subject: [PATCH 08/10] address comments --- .../BaseTypingDedupingTest.java | 119 +++++++++--------- .../typing_deduping/RecordDiffer.java | 105 ++++++++-------- .../typing_deduping/AirbyteProtocolType.java | 74 +++++++++++ .../typing_deduping/AirbyteType.java | 111 ++-------------- .../typing_deduping/AirbyteTypeUtils.java | 88 ------------- .../destination/typing_deduping/Array.java | 5 + .../typing_deduping/CatalogParser.java | 2 - .../destination/typing_deduping/Struct.java | 10 ++ .../destination/typing_deduping/Union.java | 61 +++++++++ .../typing_deduping/UnsupportedOneOf.java | 12 ++ .../typing_deduping/AirbyteTypeTest.java | 12 +- .../typing_deduping/BigQuerySqlGenerator.java | 41 +++--- .../BigQuerySqlGeneratorIntegrationTest.java | 24 ++-- .../BigQuerySqlGeneratorTest.java | 10 +- 14 files changed, 325 insertions(+), 349 deletions(-) create mode 100644 airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteProtocolType.java delete mode 100644 airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java create mode 100644 airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Array.java create mode 100644 airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Struct.java create mode 100644 airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Union.java create mode 100644 airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/UnsupportedOneOf.java diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java index 85ca77f81b69..41a99169b866 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java @@ -11,7 +11,6 @@ import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.resources.MoreResources; import io.airbyte.configoss.WorkerDestinationConfig; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; @@ -64,7 +63,7 @@ public abstract class BaseTypingDedupingTest { static { try { SCHEMA = Jsons.deserialize(MoreResources.readResource("schema.json")); - } catch (IOException e) { + } catch (final IOException e) { throw new RuntimeException(e); } } @@ -161,7 +160,7 @@ public void setup() throws Exception { @AfterEach public void teardown() throws Exception { - for (AirbyteStreamNameNamespacePair streamId : streamsToTearDown) { + for (final AirbyteStreamNameNamespacePair streamId : streamsToTearDown) { teardownStreamAndNamespace(streamId.getNamespace(), streamId.getName()); } } @@ -173,7 +172,7 @@ public void teardown() throws Exception { */ @Test public void fullRefreshOverwrite() throws Exception { - ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.FULL_REFRESH) .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) @@ -183,21 +182,21 @@ public void fullRefreshOverwrite() throws Exception { .withJsonSchema(SCHEMA)))); // First sync - List messages1 = readMessages("sync1_messages.jsonl"); + final List messages1 = readMessages("sync1_messages.jsonl"); runSync(catalog, messages1); - List expectedRawRecords1 = readRecords("sync1_expectedrecords_nondedup_raw.jsonl"); - List expectedFinalRecords1 = readRecords("sync1_expectedrecords_nondedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("sync1_expectedrecords_nondedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("sync1_expectedrecords_nondedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); // Second sync - List messages2 = readMessages("sync2_messages.jsonl"); + final List messages2 = readMessages("sync2_messages.jsonl"); runSync(catalog, messages2); - List expectedRawRecords2 = readRecords("sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl"); - List expectedFinalRecords2 = readRecords("sync2_expectedrecords_fullrefresh_overwrite_final.jsonl"); + final List expectedRawRecords2 = readRecords("sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("sync2_expectedrecords_fullrefresh_overwrite_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } @@ -208,7 +207,7 @@ public void fullRefreshOverwrite() throws Exception { */ @Test public void fullRefreshAppend() throws Exception { - ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.FULL_REFRESH) .withDestinationSyncMode(DestinationSyncMode.APPEND) @@ -218,21 +217,21 @@ public void fullRefreshAppend() throws Exception { .withJsonSchema(SCHEMA)))); // First sync - List messages1 = readMessages("sync1_messages.jsonl"); + final List messages1 = readMessages("sync1_messages.jsonl"); runSync(catalog, messages1); - List expectedRawRecords1 = readRecords("sync1_expectedrecords_nondedup_raw.jsonl"); - List expectedFinalRecords1 = readRecords("sync1_expectedrecords_nondedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("sync1_expectedrecords_nondedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("sync1_expectedrecords_nondedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); // Second sync - List messages2 = readMessages("sync2_messages.jsonl"); + final List messages2 = readMessages("sync2_messages.jsonl"); runSync(catalog, messages2); - List expectedRawRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_raw.jsonl"); - List expectedFinalRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_final.jsonl"); + final List expectedRawRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } @@ -245,7 +244,7 @@ public void fullRefreshAppend() throws Exception { */ @Test public void incrementalAppend() throws Exception { - ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( new ConfiguredAirbyteStream() // These two lines are literally the only difference between this test and fullRefreshAppend .withSyncMode(SyncMode.INCREMENTAL) @@ -257,21 +256,21 @@ public void incrementalAppend() throws Exception { .withJsonSchema(SCHEMA)))); // First sync - List messages1 = readMessages("sync1_messages.jsonl"); + final List messages1 = readMessages("sync1_messages.jsonl"); runSync(catalog, messages1); - List expectedRawRecords1 = readRecords("sync1_expectedrecords_nondedup_raw.jsonl"); - List expectedFinalRecords1 = readRecords("sync1_expectedrecords_nondedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("sync1_expectedrecords_nondedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("sync1_expectedrecords_nondedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); // Second sync - List messages2 = readMessages("sync2_messages.jsonl"); + final List messages2 = readMessages("sync2_messages.jsonl"); runSync(catalog, messages2); - List expectedRawRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_raw.jsonl"); - List expectedFinalRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_final.jsonl"); + final List expectedRawRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } @@ -282,7 +281,7 @@ public void incrementalAppend() throws Exception { */ @Test public void incrementalDedup() throws Exception { - ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) .withCursorField(List.of("updated_at")) @@ -294,21 +293,21 @@ public void incrementalDedup() throws Exception { .withJsonSchema(SCHEMA)))); // First sync - List messages1 = readMessages("sync1_messages.jsonl"); + final List messages1 = readMessages("sync1_messages.jsonl"); runSync(catalog, messages1); - List expectedRawRecords1 = readRecords("sync1_expectedrecords_dedup_raw.jsonl"); - List expectedFinalRecords1 = readRecords("sync1_expectedrecords_dedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("sync1_expectedrecords_dedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("sync1_expectedrecords_dedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); // Second sync - List messages2 = readMessages("sync2_messages.jsonl"); + final List messages2 = readMessages("sync2_messages.jsonl"); runSync(catalog, messages2); - List expectedRawRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_raw.jsonl"); - List expectedFinalRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_final.jsonl"); + final List expectedRawRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } @@ -317,7 +316,7 @@ public void incrementalDedup() throws Exception { */ @Test public void incrementalDedupDefaultNamespace() throws Exception { - ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) .withCursorField(List.of("updated_at")) @@ -329,21 +328,21 @@ public void incrementalDedupDefaultNamespace() throws Exception { .withJsonSchema(SCHEMA)))); // First sync - List messages1 = readMessages("sync1_messages.jsonl", null, streamName); + final List messages1 = readMessages("sync1_messages.jsonl", null, streamName); runSync(catalog, messages1); - List expectedRawRecords1 = readRecords("sync1_expectedrecords_dedup_raw.jsonl"); - List expectedFinalRecords1 = readRecords("sync1_expectedrecords_dedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("sync1_expectedrecords_dedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("sync1_expectedrecords_dedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1, null, streamName); // Second sync - List messages2 = readMessages("sync2_messages.jsonl", null, streamName); + final List messages2 = readMessages("sync2_messages.jsonl", null, streamName); runSync(catalog, messages2); - List expectedRawRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_raw.jsonl"); - List expectedFinalRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_final.jsonl"); + final List expectedRawRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2, null, streamName); } @@ -375,7 +374,7 @@ public void testIncrementalSyncDropOneColumn() throws Exception { public void testSyncUsesAirbyteStreamNamespaceIfNotNull() throws Exception { // TODO duplicate this test for each sync mode. Run 1st+2nd syncs using a stream with null // namespace: - ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.FULL_REFRESH) .withCursorField(List.of("updated_at")) @@ -396,9 +395,9 @@ public void testSyncUsesAirbyteStreamNamespaceIfNotNull() throws Exception { */ @Test public void incrementalDedupIdenticalName() throws Exception { - String namespace1 = streamNamespace + "_1"; - String namespace2 = streamNamespace + "_2"; - ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + final String namespace1 = streamNamespace + "_1"; + final String namespace2 = streamNamespace + "_2"; + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) .withCursorField(List.of("updated_at")) @@ -420,26 +419,26 @@ public void incrementalDedupIdenticalName() throws Exception { // First sync // Read the same set of messages for both streams - List messages1 = Stream.concat( + final List messages1 = Stream.concat( readMessages("sync1_messages.jsonl", namespace1, streamName).stream(), readMessages("sync1_messages.jsonl", namespace2, streamName).stream()).toList(); runSync(catalog, messages1); - List expectedRawRecords1 = readRecords("sync1_expectedrecords_dedup_raw.jsonl"); - List expectedFinalRecords1 = readRecords("sync1_expectedrecords_dedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("sync1_expectedrecords_dedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("sync1_expectedrecords_dedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1, namespace1, streamName); verifySyncResult(expectedRawRecords1, expectedFinalRecords1, namespace2, streamName); // Second sync - List messages2 = Stream.concat( + final List messages2 = Stream.concat( readMessages("sync2_messages.jsonl", namespace1, streamName).stream(), readMessages("sync2_messages.jsonl", namespace2, streamName).stream()).toList(); runSync(catalog, messages2); - List expectedRawRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_raw.jsonl"); - List expectedFinalRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_final.jsonl"); + final List expectedRawRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2, namespace1, streamName); verifySyncResult(expectedRawRecords2, expectedFinalRecords2, namespace2, streamName); } @@ -476,21 +475,21 @@ public void testDataTypes() throws Exception { // this test probably needs some configuration per destination to specify what values are supported? } - private void verifySyncResult(List expectedRawRecords, List expectedFinalRecords) throws Exception { + private void verifySyncResult(final List expectedRawRecords, final List expectedFinalRecords) throws Exception { verifySyncResult(expectedRawRecords, expectedFinalRecords, streamNamespace, streamName); } - private void verifySyncResult(List expectedRawRecords, - List expectedFinalRecords, - String streamNamespace, - String streamName) + private void verifySyncResult(final List expectedRawRecords, + final List expectedFinalRecords, + final String streamNamespace, + final String streamName) throws Exception { - List actualRawRecords = dumpRawTableRecords(streamNamespace, streamName); - List actualFinalRecords = dumpFinalTableRecords(streamNamespace, streamName); + final List actualRawRecords = dumpRawTableRecords(streamNamespace, streamName); + final List actualFinalRecords = dumpFinalTableRecords(streamNamespace, streamName); DIFFER.verifySyncResult(expectedRawRecords, actualRawRecords, expectedFinalRecords, actualFinalRecords); } - private static List readRecords(String filename) throws IOException { + private static List readRecords(final String filename) throws IOException { return MoreResources.readResource(filename).lines() .map(String::trim) .filter(line -> !line.isEmpty()) @@ -499,11 +498,11 @@ private static List readRecords(String filename) throws IOException { .toList(); } - private List readMessages(String filename) throws IOException { + private List readMessages(final String filename) throws IOException { return readMessages(filename, streamNamespace, streamName); } - private static List readMessages(String filename, String streamNamespace, String streamName) throws IOException { + private static List readMessages(final String filename, final String streamNamespace, final String streamName) throws IOException { return readRecords(filename).stream() .map(record -> Jsons.convertValue(record, AirbyteMessage.class)) .peek(message -> { @@ -527,7 +526,7 @@ public void setupProcessFactory() throws IOException { Files.createDirectories(testDir); final Path workspaceRoot = Files.createTempDirectory(testDir, "test"); jobRoot = Files.createDirectories(Path.of(workspaceRoot.toString(), "job")); - Path localRoot = Files.createTempDirectory(testDir, "output"); + final Path localRoot = Files.createTempDirectory(testDir, "output"); processFactory = new DockerProcessFactory( workspaceRoot, workspaceRoot.toString(), @@ -536,7 +535,7 @@ public void setupProcessFactory() throws IOException { Collections.emptyMap()); } - private void runSync(ConfiguredAirbyteCatalog catalog, List messages) throws Exception { + private void runSync(final ConfiguredAirbyteCatalog catalog, final List messages) throws Exception { catalog.getStreams().forEach(s -> streamsToTearDown.add(AirbyteStreamNameNamespacePair.fromAirbyteStream(s.getStream()))); final WorkerDestinationConfig destinationConfig = new WorkerDestinationConfig() diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java index 846fb4a88bff..3c92ccd72c87 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java @@ -12,7 +12,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Streams; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; @@ -54,17 +53,17 @@ public RecordDiffer(final Pair... identifyingColumns) { * JSON null is represented as a NullNode. For example, in the JSON blob {"name": null}, the `name` * field is a JSON null, and the `address` field is a SQL null. */ - public void verifySyncResult(List expectedRawRecords, - List actualRawRecords, - List expectedFinalRecords, - List actualFinalRecords) { + public void verifySyncResult(final List expectedRawRecords, + final List actualRawRecords, + final List expectedFinalRecords, + final List actualFinalRecords) { assertAll( () -> diffRawTableRecords(expectedRawRecords, actualRawRecords), () -> diffFinalTableRecords(expectedFinalRecords, actualFinalRecords)); } - public void diffRawTableRecords(List expectedRecords, List actualRecords) { - String diff = diffRecords( + public void diffRawTableRecords(final List expectedRecords, final List actualRecords) { + final String diff = diffRecords( expectedRecords.stream().map(RecordDiffer::copyWithLiftedData).collect(toList()), actualRecords.stream().map(RecordDiffer::copyWithLiftedData).collect(toList()), recordIdentityComparator, @@ -76,8 +75,8 @@ public void diffRawTableRecords(List expectedRecords, List a } } - public void diffFinalTableRecords(List expectedRecords, List actualRecords) { - String diff = diffRecords( + public void diffFinalTableRecords(final List expectedRecords, final List actualRecords) { + final String diff = diffRecords( expectedRecords, actualRecords, recordIdentityComparator, @@ -92,8 +91,8 @@ public void diffFinalTableRecords(List expectedRecords, List /** * @return A copy of the record, but with all fields in _airbyte_data lifted to the top level. */ - private static JsonNode copyWithLiftedData(JsonNode record) { - ObjectNode copy = record.deepCopy(); + private static JsonNode copyWithLiftedData(final JsonNode record) { + final ObjectNode copy = record.deepCopy(); copy.remove("_airbyte_data"); Streams.stream(record.get("_airbyte_data").fields()).forEach(field -> { if (!copy.has(field.getKey())) { @@ -111,10 +110,10 @@ private static JsonNode copyWithLiftedData(JsonNode record) { * Build a Comparator to detect equality between two records. It first compares all the identifying * columns in order, and breaks ties using extracted_at. */ - private Comparator buildIdentityComparator(Pair[] identifyingColumns) { + private Comparator buildIdentityComparator(final Pair[] identifyingColumns) { // Start with a noop comparator for convenience Comparator comp = Comparator.comparing(record -> 0); - for (Pair column : identifyingColumns) { + for (final Pair column : identifyingColumns) { comp = comp.thenComparing(record -> extract(record, column.getKey(), column.getValue())); } comp = comp.thenComparing(record -> asTimestampWithTimezone(record.get("_airbyte_extracted_at"))); @@ -124,14 +123,14 @@ private Comparator buildIdentityComparator(Pair[] /** * See {@link #buildIdentityComparator(Pair[])} for an explanation of dataExtractor. */ - private Function buildIdentityExtractor(Pair[] identifyingColumns) { + private Function buildIdentityExtractor(final Pair[] identifyingColumns) { return record -> Arrays.stream(identifyingColumns) .map(column -> getPrintableFieldIfPresent(record, column.getKey())) .collect(Collectors.joining(", ")) + getPrintableFieldIfPresent(record, "_airbyte_extracted_at"); } - private static String getPrintableFieldIfPresent(JsonNode record, String field) { + private static String getPrintableFieldIfPresent(final JsonNode record, final String field) { if (record.has(field)) { return field + "=" + record.get(field); } else { @@ -156,13 +155,13 @@ private static String getPrintableFieldIfPresent(JsonNode record, String field) * @param recordIdExtractor Dump the record's PK+cursor+extracted_at into a human-readable string * @return The diff, or empty string if there were no differences */ - private static String diffRecords(List originalExpectedRecords, - List originalActualRecords, - Comparator identityComparator, - Comparator sortComparator, - Function recordIdExtractor) { - List expectedRecords = originalExpectedRecords.stream().sorted(sortComparator).toList(); - List actualRecords = originalActualRecords.stream().sorted(sortComparator).toList(); + private static String diffRecords(final List originalExpectedRecords, + final List originalActualRecords, + final Comparator identityComparator, + final Comparator sortComparator, + final Function recordIdExtractor) { + final List expectedRecords = originalExpectedRecords.stream().sorted(sortComparator).toList(); + final List actualRecords = originalActualRecords.stream().sorted(sortComparator).toList(); // Iterate through both lists in parallel and compare each record. // Build up an error message listing any incorrect, missing, or unexpected records. @@ -170,9 +169,9 @@ private static String diffRecords(List originalExpectedRecords, int expectedRecordIndex = 0; int actualRecordIndex = 0; while (expectedRecordIndex < expectedRecords.size() && actualRecordIndex < actualRecords.size()) { - JsonNode expectedRecord = expectedRecords.get(expectedRecordIndex); - JsonNode actualRecord = actualRecords.get(actualRecordIndex); - int compare = identityComparator.compare(expectedRecord, actualRecord); + final JsonNode expectedRecord = expectedRecords.get(expectedRecordIndex); + final JsonNode actualRecord = actualRecords.get(actualRecordIndex); + final int compare = identityComparator.compare(expectedRecord, actualRecord); if (compare == 0) { // These records should be the same. Find the specific fields that are different and move on // to the next records in both lists. @@ -204,23 +203,23 @@ private static String diffRecords(List originalExpectedRecords, return message; } - private static String diffSingleRecord(Function recordIdExtractor, JsonNode expectedRecord, JsonNode actualRecord) { + private static String diffSingleRecord(final Function recordIdExtractor, final JsonNode expectedRecord, final JsonNode actualRecord) { boolean foundMismatch = false; String mismatchedRecordMessage = "Row had incorrect data: " + recordIdExtractor.apply(expectedRecord) + "\n"; // Iterate through each column in the expected record and compare it to the actual record's value. - for (String column : Streams.stream(expectedRecord.fieldNames()).sorted().toList()) { + for (final String column : Streams.stream(expectedRecord.fieldNames()).sorted().toList()) { // For all other columns, we can just compare their values directly. - JsonNode expectedValue = expectedRecord.get(column); - JsonNode actualValue = actualRecord.get(column); + final JsonNode expectedValue = expectedRecord.get(column); + final JsonNode actualValue = actualRecord.get(column); if (!areJsonNodesEquivalent(expectedValue, actualValue)) { mismatchedRecordMessage += generateFieldError("column " + column, expectedValue, actualValue); foundMismatch = true; } } // Then check the entire actual record for any columns that we weren't expecting. - LinkedHashMap extraColumns = checkForExtraOrNonNullFields(expectedRecord, actualRecord); + final LinkedHashMap extraColumns = checkForExtraOrNonNullFields(expectedRecord, actualRecord); if (extraColumns.size() > 0) { - for (Map.Entry extraColumn : extraColumns.entrySet()) { + for (final Map.Entry extraColumn : extraColumns.entrySet()) { mismatchedRecordMessage += generateFieldError("column " + extraColumn.getKey(), null, extraColumn.getValue()); foundMismatch = true; } @@ -232,7 +231,7 @@ private static String diffSingleRecord(Function recordIdExtrac } } - private static boolean areJsonNodesEquivalent(JsonNode expectedValue, JsonNode actualValue) { + private static boolean areJsonNodesEquivalent(final JsonNode expectedValue, final JsonNode actualValue) { if (expectedValue == null || actualValue == null) { // If one of the values is null, then we expect both of them to be null. return expectedValue == null && actualValue == null; @@ -256,9 +255,9 @@ private static boolean areJsonNodesEquivalent(JsonNode expectedValue, JsonNode a * This has the side benefit of detecting completely unexpected columns, which would be a very weird * bug but is probably still useful to catch. */ - private static LinkedHashMap checkForExtraOrNonNullFields(JsonNode expectedRecord, JsonNode actualRecord) { - LinkedHashMap extraFields = new LinkedHashMap<>(); - for (String column : Streams.stream(actualRecord.fieldNames()).sorted().toList()) { + private static LinkedHashMap checkForExtraOrNonNullFields(final JsonNode expectedRecord, final JsonNode actualRecord) { + final LinkedHashMap extraFields = new LinkedHashMap<>(); + for (final String column : Streams.stream(actualRecord.fieldNames()).sorted().toList()) { // loaded_at and raw_id are generated dynamically, so we just ignore them. if (!"_airbyte_loaded_at".equals(column) && !"_airbyte_raw_id".equals(column) && !expectedRecord.has(column)) { extraFields.put(column, actualRecord.get(column)); @@ -272,15 +271,15 @@ private static LinkedHashMap checkForExtraOrNonNullFields(Json * spaces are intentional, to make the message easier to read when it's embedded in a larger * stacktrace. */ - private static String generateFieldError(String fieldname, JsonNode expectedValue, JsonNode actualValue) { - String expectedString = expectedValue == null ? "SQL NULL (i.e. no value)" : expectedValue.toString(); - String actualString = actualValue == null ? "SQL NULL (i.e. no value)" : actualValue.toString(); + private static String generateFieldError(final String fieldname, final JsonNode expectedValue, final JsonNode actualValue) { + final String expectedString = expectedValue == null ? "SQL NULL (i.e. no value)" : expectedValue.toString(); + final String actualString = actualValue == null ? "SQL NULL (i.e. no value)" : actualValue.toString(); return " For " + fieldname + ", expected " + expectedString + " but got " + actualString + "\n"; } // These asFoo methods are used for sorting records, so their defaults are intended to make broken // records stand out. - private static String asString(JsonNode node) { + private static String asString(final JsonNode node) { if (node == null || node.isNull()) { return ""; } else if (node.isTextual()) { @@ -290,7 +289,7 @@ private static String asString(JsonNode node) { } } - private static double asDouble(JsonNode node) { + private static double asDouble(final JsonNode node) { if (node == null || !node.isNumber()) { return Double.MIN_VALUE; } else { @@ -298,7 +297,7 @@ private static double asDouble(JsonNode node) { } } - private static long asInt(JsonNode node) { + private static long asInt(final JsonNode node) { if (node == null || !node.isIntegralNumber()) { return Long.MIN_VALUE; } else { @@ -306,7 +305,7 @@ private static long asInt(JsonNode node) { } } - private static boolean asBoolean(JsonNode node) { + private static boolean asBoolean(final JsonNode node) { if (node == null || !node.isBoolean()) { return false; } else { @@ -314,31 +313,31 @@ private static boolean asBoolean(JsonNode node) { } } - private static Instant asTimestampWithTimezone(JsonNode node) { + private static Instant asTimestampWithTimezone(final JsonNode node) { if (node == null || !node.isTextual()) { return Instant.ofEpochMilli(Long.MIN_VALUE); } else { try { return Instant.parse(node.asText()); - } catch (Exception e) { + } catch (final Exception e) { return Instant.ofEpochMilli(Long.MIN_VALUE); } } } - private static LocalDateTime asTimestampWithoutTimezone(JsonNode node) { + private static LocalDateTime asTimestampWithoutTimezone(final JsonNode node) { if (node == null || !node.isTextual()) { return LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.MIN_VALUE), ZoneOffset.UTC); } else { try { return LocalDateTime.parse(node.asText()); - } catch (Exception e) { + } catch (final Exception e) { return LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.MIN_VALUE), ZoneOffset.UTC); } } } - private static OffsetTime asTimeWithTimezone(JsonNode node) { + private static OffsetTime asTimeWithTimezone(final JsonNode node) { if (node == null || !node.isTextual()) { return OffsetTime.of(0, 0, 0, 0, ZoneOffset.UTC); } else { @@ -346,33 +345,33 @@ private static OffsetTime asTimeWithTimezone(JsonNode node) { } } - private static LocalTime asTimeWithoutTimezone(JsonNode node) { + private static LocalTime asTimeWithoutTimezone(final JsonNode node) { if (node == null || !node.isTextual()) { return LocalTime.of(0, 0, 0); } else { try { return LocalTime.parse(node.asText()); - } catch (Exception e) { + } catch (final Exception e) { return LocalTime.of(0, 0, 0); } } } - private static LocalDate asDate(JsonNode node) { + private static LocalDate asDate(final JsonNode node) { if (node == null || !node.isTextual()) { return LocalDate.ofInstant(Instant.ofEpochMilli(Long.MIN_VALUE), ZoneOffset.UTC); } else { try { return LocalDate.parse(node.asText()); - } catch (Exception e) { + } catch (final Exception e) { return LocalDate.ofInstant(Instant.ofEpochMilli(Long.MIN_VALUE), ZoneOffset.UTC); } } } // Generics? Never heard of 'em. (I'm sorry) - private static Comparable extract(JsonNode node, String field, AirbyteType type) { - if (type instanceof AirbyteProtocolType t) { + private static Comparable extract(final JsonNode node, final String field, final AirbyteType type) { + if (type instanceof final AirbyteProtocolType t) { return switch (t) { case STRING -> asString(node.get(field)); case NUMBER -> asDouble(node.get(field)); diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteProtocolType.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteProtocolType.java new file mode 100644 index 000000000000..f33f78be62e9 --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteProtocolType.java @@ -0,0 +1,74 @@ +package io.airbyte.integrations.base.destination.typing_deduping; + +import com.fasterxml.jackson.databind.JsonNode; + +/** + * Protocol types are ordered by precedence in the case of a Union that contains multiple types. + * Priority is given to wider scope types over narrower ones. (Note that because of dedup logic in + * {@link AirbyteType#fromJsonSchema(JsonNode)}, at most one string or date/time type can exist in a + * Union.) + */ +public enum AirbyteProtocolType implements AirbyteType { + + STRING, + DATE, + TIME_WITHOUT_TIMEZONE, + TIME_WITH_TIMEZONE, + TIMESTAMP_WITHOUT_TIMEZONE, + TIMESTAMP_WITH_TIMEZONE, + NUMBER, + INTEGER, + BOOLEAN, + UNKNOWN; + + private static AirbyteProtocolType matches(final String type) { + try { + return AirbyteProtocolType.valueOf(type.toUpperCase()); + } catch (final IllegalArgumentException e) { + LOGGER.error(String.format("Could not find matching AirbyteProtocolType for \"%s\": %s", type, e)); + return UNKNOWN; + } + } + + // Extracts the appropriate protocol type from the representative JSON + protected static AirbyteProtocolType fromJson(final JsonNode node) { + // JSON could be a string (ex: "number") + if (node.isTextual()) { + return matches(node.asText()); + } + + // or, JSON could be a node with fields + final JsonNode propertyType = node.get("type"); + final JsonNode airbyteType = node.get("airbyte_type"); + final JsonNode format = node.get("format"); + + if (AirbyteType.nodeMatches(propertyType, "boolean")) { + return BOOLEAN; + } else if (AirbyteType.nodeMatches(propertyType, "integer")) { + return INTEGER; + } else if (AirbyteType.nodeMatches(propertyType, "number")) { + return AirbyteType.nodeMatches(airbyteType, "integer") ? INTEGER : NUMBER; + } else if (AirbyteType.nodeMatches(propertyType, "string")) { + if (AirbyteType.nodeMatches(format, "date")) { + return DATE; + } else if (AirbyteType.nodeMatches(format, "time")) { + if (AirbyteType.nodeMatches(airbyteType, "time_without_timezone")) { + return TIME_WITHOUT_TIMEZONE; + } else if (AirbyteType.nodeMatches(airbyteType, "time_with_timezone")) { + return TIME_WITH_TIMEZONE; + } + } else if (AirbyteType.nodeMatches(format, "date-time")) { + if (AirbyteType.nodeMatches(airbyteType, "timestamp_without_timezone")) { + return TIMESTAMP_WITHOUT_TIMEZONE; + } else if (airbyteType == null || AirbyteType.nodeMatches(airbyteType, "timestamp_with_timezone")) { + return TIMESTAMP_WITH_TIMEZONE; + } + } else { + return STRING; + } + } + + return UNKNOWN; + } + +} diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java index da0f75626563..de59c763ed9c 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java @@ -4,16 +4,9 @@ package io.airbyte.integrations.base.destination.typing_deduping; -import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteTypeUtils.getAirbyteProtocolType; -import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteTypeUtils.nodeMatches; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Union; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.UnsupportedOneOf; +import com.fasterxml.jackson.databind.node.TextNode; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -22,7 +15,7 @@ public sealed interface AirbyteType permits AirbyteProtocolType,Struct,Array,UnsupportedOneOf,Union { - Logger LOGGER = LoggerFactory.getLogger(AirbyteTypeUtils.class); + Logger LOGGER = LoggerFactory.getLogger(AirbyteType.class); /** * The most common call pattern is probably to use this method on the stream schema, verify that @@ -54,13 +47,20 @@ static AirbyteType fromJsonSchema(final JsonNode schema) { // This is for backwards-compatibility with legacy normalization. return getStruct(schema); } - return getAirbyteProtocolType(schema); + return AirbyteProtocolType.fromJson(schema); } catch (final Exception e) { LOGGER.error("Exception parsing JSON schema {}: {}; returning UNKNOWN.", schema, e); return AirbyteProtocolType.UNKNOWN; } } + static boolean nodeMatches(final JsonNode node, final String value) { + if (node == null || !node.isTextual()) { + return false; + } + return node.equals(TextNode.valueOf(value)); + } + private static Struct getStruct(final JsonNode schema) { final LinkedHashMap propertiesMap = new LinkedHashMap<>(); final JsonNode properties = schema.get("properties"); @@ -100,7 +100,7 @@ private static AirbyteType fromArrayJsonSchema(final JsonNode schema, final Json } else if (typeOptions.get(0).equals("array")) { return getArray(schema); } else { - return getAirbyteProtocolType(getTrimmedJsonSchema(schema, typeOptions.get(0))); + return AirbyteProtocolType.fromJson(getTrimmedJsonSchema(schema, typeOptions.get(0))); } } @@ -117,93 +117,4 @@ private static JsonNode getTrimmedJsonSchema(final JsonNode schema, final String return schemaClone; } - /** - * Protocol types are ordered by precedence in the case of a Union that contains multiple types. - * Priority is given to wider scope types over narrower ones. (Note that because of dedup logic in - * {@link AirbyteType#fromJsonSchema(JsonNode)}, at most one string or date/time type can exist in a - * Union.) - */ - enum AirbyteProtocolType implements AirbyteType { - - STRING, - DATE, - TIME_WITHOUT_TIMEZONE, - TIME_WITH_TIMEZONE, - TIMESTAMP_WITHOUT_TIMEZONE, - TIMESTAMP_WITH_TIMEZONE, - NUMBER, - INTEGER, - BOOLEAN, - UNKNOWN; - - public static AirbyteProtocolType matches(final String type) { - try { - return AirbyteProtocolType.valueOf(type.toUpperCase()); - } catch (final IllegalArgumentException e) { - LOGGER.error(String.format("Could not find matching AirbyteProtocolType for \"%s\": %s", type, e)); - return UNKNOWN; - } - } - - } - - /** - * @param properties Use LinkedHashMap to preserve insertion order. - */ - record Struct(LinkedHashMap properties) implements AirbyteType { - - } - - record Array(AirbyteType items) implements AirbyteType { - - } - - /** - * Represents a {oneOf: [...]} schema. - *

    - * This is purely a legacy type that we should eventually delete. See also {@link Union}. - */ - record UnsupportedOneOf(List options) implements AirbyteType { - - } - - /** - * Represents a {type: [a, b, ...]} schema. This is theoretically equivalent to {oneOf: [{type: a}, - * {type: b}, ...]} but legacy normalization only handles the {type: [...]} schemas. - *

    - * Eventually we should: - *

      - *
    1. Announce a breaking change to handle both oneOf styles the same
    2. - *
    3. Test against some number of API sources to verify that they won't break badly
    4. - *
    5. Update {@link AirbyteType#fromJsonSchema(JsonNode)} to parse both styles into - * SupportedOneOf
    6. - *
    7. Delete UnsupportedOneOf
    8. - *
    - */ - record Union(List options) implements AirbyteType { - - /** - * This is a hack to handle weird schemas like {type: [object, string]}. If a stream's top-level - * schema looks like this, we still want to be able to extract the object properties (i.e. treat it - * as though the string option didn't exist). - * - * @throws IllegalArgumentException if we cannot extract columns from this schema - */ - public LinkedHashMap asColumns() { - final long numObjectOptions = options.stream().filter(o -> o instanceof Struct).count(); - if (numObjectOptions > 1) { - LOGGER.error("Can't extract columns from a schema with multiple object options"); - return new LinkedHashMap<>(); - } - - return (options.stream().filter(o -> o instanceof Struct).findFirst()) - .map(o -> ((Struct) o).properties()) - .orElseGet(() -> { - LOGGER.error("Can't extract columns from a schema with no object options"); - return new LinkedHashMap<>(); - }); - } - - } - } diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java deleted file mode 100644 index 6a9a9d66ab78..000000000000 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.base.destination.typing_deduping; - -import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType.*; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.TextNode; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Union; -import java.util.Comparator; -import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AirbyteTypeUtils { - - private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteTypeUtils.class); - - protected static boolean nodeMatches(final JsonNode node, final String value) { - if (node == null || !node.isTextual()) { - return false; - } - return node.equals(TextNode.valueOf(value)); - } - - // Extracts the appropriate protocol type from the representative JSON - protected static AirbyteType getAirbyteProtocolType(final JsonNode node) { - // JSON could be a string (ex: "number") - if (node.isTextual()) { - return matches(node.asText()); - } - - // or, JSON could be a node with fields - final JsonNode propertyType = node.get("type"); - final JsonNode airbyteType = node.get("airbyte_type"); - final JsonNode format = node.get("format"); - - if (nodeMatches(propertyType, "boolean")) { - return BOOLEAN; - } else if (nodeMatches(propertyType, "integer")) { - return INTEGER; - } else if (nodeMatches(propertyType, "number")) { - return nodeMatches(airbyteType, "integer") ? INTEGER : NUMBER; - } else if (nodeMatches(propertyType, "string")) { - if (nodeMatches(format, "date")) { - return DATE; - } else if (nodeMatches(format, "time")) { - if (nodeMatches(airbyteType, "time_without_timezone")) { - return TIME_WITHOUT_TIMEZONE; - } else if (nodeMatches(airbyteType, "time_with_timezone")) { - return TIME_WITH_TIMEZONE; - } - } else if (nodeMatches(format, "date-time")) { - if (nodeMatches(airbyteType, "timestamp_without_timezone")) { - return TIMESTAMP_WITHOUT_TIMEZONE; - } else if (airbyteType == null || nodeMatches(airbyteType, "timestamp_with_timezone")) { - return TIMESTAMP_WITH_TIMEZONE; - } - } else { - return STRING; - } - } - - return UNKNOWN; - } - - // Picks which type in a Union takes precedence - public static AirbyteType chooseUnionType(final Union u) { - final Comparator comparator = Comparator.comparing(t -> { - if (t instanceof Array) { - return -2; - } else if (t instanceof Struct) { - return -1; - } else if (t instanceof final AirbyteProtocolType p) { - return List.of(AirbyteProtocolType.values()).indexOf(p); - } - return Integer.MAX_VALUE; - }); - - return u.options().stream().min(comparator).orElse(UNKNOWN); - } - -} diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Array.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Array.java new file mode 100644 index 000000000000..992e975b4b3a --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Array.java @@ -0,0 +1,5 @@ +package io.airbyte.integrations.base.destination.typing_deduping; + +public record Array(AirbyteType items) implements AirbyteType { + +} diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java index 2a26a32fdf1b..8b885eaedaf0 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java @@ -4,8 +4,6 @@ package io.airbyte.integrations.base.destination.typing_deduping; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Union; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import java.util.ArrayList; diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Struct.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Struct.java new file mode 100644 index 000000000000..06d9e0daf2e9 --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Struct.java @@ -0,0 +1,10 @@ +package io.airbyte.integrations.base.destination.typing_deduping; + +import java.util.LinkedHashMap; + +/** + * @param properties Use LinkedHashMap to preserve insertion order. + */ +public record Struct(LinkedHashMap properties) implements AirbyteType { + +} diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Union.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Union.java new file mode 100644 index 000000000000..82fb98f927ce --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Union.java @@ -0,0 +1,61 @@ +package io.airbyte.integrations.base.destination.typing_deduping; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.List; + +/** + * Represents a {type: [a, b, ...]} schema. This is theoretically equivalent to {oneOf: [{type: a}, + * {type: b}, ...]} but legacy normalization only handles the {type: [...]} schemas. + *

    + * Eventually we should: + *

      + *
    1. Announce a breaking change to handle both oneOf styles the same
    2. + *
    3. Test against some number of API sources to verify that they won't break badly
    4. + *
    5. Update {@link AirbyteType#fromJsonSchema(JsonNode)} to parse both styles into + * SupportedOneOf
    6. + *
    7. Delete UnsupportedOneOf
    8. + *
    + */ +public record Union(List options) implements AirbyteType { + + /** + * This is a hack to handle weird schemas like {type: [object, string]}. If a stream's top-level + * schema looks like this, we still want to be able to extract the object properties (i.e. treat it + * as though the string option didn't exist). + * + * @throws IllegalArgumentException if we cannot extract columns from this schema + */ + public LinkedHashMap asColumns() { + final long numObjectOptions = options.stream().filter(o -> o instanceof Struct).count(); + if (numObjectOptions > 1) { + LOGGER.error("Can't extract columns from a schema with multiple object options"); + return new LinkedHashMap<>(); + } + + return (options.stream().filter(o -> o instanceof Struct).findFirst()) + .map(o -> ((Struct) o).properties()) + .orElseGet(() -> { + LOGGER.error("Can't extract columns from a schema with no object options"); + return new LinkedHashMap<>(); + }); + } + + // Picks which type in a Union takes precedence + public AirbyteType chooseType() { + final Comparator comparator = Comparator.comparing(t -> { + if (t instanceof Array) { + return -2; + } else if (t instanceof Struct) { + return -1; + } else if (t instanceof final AirbyteProtocolType p) { + return List.of(AirbyteProtocolType.values()).indexOf(p); + } + return Integer.MAX_VALUE; + }); + + return options.stream().min(comparator).orElse(AirbyteProtocolType.UNKNOWN); + } + +} diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/UnsupportedOneOf.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/UnsupportedOneOf.java new file mode 100644 index 000000000000..d38e0559fcf9 --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/UnsupportedOneOf.java @@ -0,0 +1,12 @@ +package io.airbyte.integrations.base.destination.typing_deduping; + +import java.util.List; + +/** + * Represents a {oneOf: [...]} schema. + *

    + * This is purely a legacy type that we should eventually delete. See also {@link Union}. + */ +public record UnsupportedOneOf(List options) implements AirbyteType { + +} diff --git a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java index eee35d7d013e..85ffa5c8e11f 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java @@ -4,18 +4,14 @@ package io.airbyte.integrations.base.destination.typing_deduping; -import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType.*; +import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType.*; import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.fromJsonSchema; -import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteTypeUtils.chooseUnionType; +import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableList; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Union; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.UnsupportedOneOf; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; @@ -443,7 +439,9 @@ public void testChooseUnion() { unionToType.put(new Union(ImmutableList.of(INTEGER, BOOLEAN, NUMBER)), NUMBER); unionToType.put(new Union(ImmutableList.of(BOOLEAN, INTEGER)), INTEGER); - unionToType.forEach((u, t) -> assertEquals(t, chooseUnionType(u))); + assertAll( + () -> unionToType.forEach((u, t) -> assertEquals(t, u.chooseType())) + ); } @Test diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java index 447645afef29..c5f5a596fcc7 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java @@ -14,22 +14,20 @@ import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TimePartitioning; import com.google.common.annotations.VisibleForTesting; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Union; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.UnsupportedOneOf; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteTypeUtils; import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport; -import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; +import io.airbyte.integrations.base.destination.typing_deduping.Array; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; +import io.airbyte.integrations.base.destination.typing_deduping.Struct; import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException; +import io.airbyte.integrations.base.destination.typing_deduping.Union; +import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf; import io.airbyte.integrations.destination.bigquery.BigQuerySQLNameTransformer; import io.airbyte.protocol.models.v0.DestinationSyncMode; - import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; @@ -38,7 +36,6 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; - import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.StringSubstitutor; import org.slf4j.Logger; @@ -100,7 +97,7 @@ public StandardSQLTypeName toDialectType(final AirbyteType type) { } else if (type instanceof UnsupportedOneOf) { return StandardSQLTypeName.JSON; } else if (type instanceof final Union u) { - final AirbyteType typeWithPrecedence = AirbyteTypeUtils.chooseUnionType(u); + final AirbyteType typeWithPrecedence = u.chooseType(); final StandardSQLTypeName dialectType; if ((typeWithPrecedence instanceof Struct) || (typeWithPrecedence instanceof Array)) { dialectType = StandardSQLTypeName.JSON; @@ -117,7 +114,7 @@ public StandardSQLTypeName toDialectType(final AirbyteType type) { private String extractAndCast(final ColumnId column, final AirbyteType airbyteType) { if (airbyteType instanceof final Union u) { // This is guaranteed to not be a Union, so we won't recurse infinitely - final AirbyteType chosenType = AirbyteTypeUtils.chooseUnionType(u); + final AirbyteType chosenType = u.chooseType(); return extractAndCast(column, chosenType); } else if (airbyteType instanceof Struct) { // We need to validate that the struct is actually a struct. @@ -200,7 +197,7 @@ PARTITION BY (DATE_TRUNC(_airbyte_extracted_at, DAY)) } private List clusteringColumns(final StreamConfig stream) { - List clusterColumns = new ArrayList<>(); + final List clusterColumns = new ArrayList<>(); if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) { // We're doing deduping, therefore we have a primary key. // Cluster on all the PK columns @@ -229,7 +226,7 @@ public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, } boolean tableClusteringMatches = false; boolean tablePartitioningMatches = false; - if (existingTable instanceof StandardTableDefinition standardExistingTable) { + if (existingTable instanceof final StandardTableDefinition standardExistingTable) { tableClusteringMatches = clusteringMatches(stream, standardExistingTable); tablePartitioningMatches = partitioningMatches(standardExistingTable); } @@ -245,14 +242,14 @@ public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, } @VisibleForTesting - public boolean clusteringMatches(StreamConfig stream, StandardTableDefinition existingTable) { + public boolean clusteringMatches(final StreamConfig stream, final StandardTableDefinition existingTable) { return existingTable.getClustering() == null ? false : containsAllIgnoreCase( existingTable.getClustering().getFields().stream().collect(Collectors.toSet()), clusteringColumns(stream)); } @VisibleForTesting - public boolean partitioningMatches(StandardTableDefinition existingTable) { + public boolean partitioningMatches(final StandardTableDefinition existingTable) { return existingTable.getTimePartitioning() == null ? false : existingTable.getTimePartitioning() .getField() .equalsIgnoreCase("_airbyte_extracted_at") && @@ -307,21 +304,21 @@ public AlterTableReport buildAlterTableReport(final StreamConfig stream, final T * @return whether all the {@link SqlGenerator#FINAL_TABLE_AIRBYTE_COLUMNS} are present */ @VisibleForTesting - public static boolean schemaContainAllFinalTableV2AirbyteColumns(Collection columnNames) { + public static boolean schemaContainAllFinalTableV2AirbyteColumns(final Collection columnNames) { return FINAL_TABLE_AIRBYTE_COLUMNS.stream() .allMatch(column -> containsIgnoreCase(columnNames, column)); } @Override public List softReset(final StreamConfig stream) { - String createTempTable = createTable(stream, SOFT_RESET_SUFFIX); - String clearLoadedAt = clearLoadedAt(stream.id()); - String rebuildInTempTable = updateTable(SOFT_RESET_SUFFIX, stream); - String overwriteFinalTable = overwriteFinalTableStatement(stream, SOFT_RESET_SUFFIX); + final String createTempTable = createTable(stream, SOFT_RESET_SUFFIX); + final String clearLoadedAt = clearLoadedAt(stream.id()); + final String rebuildInTempTable = updateTable(SOFT_RESET_SUFFIX, stream); + final String overwriteFinalTable = overwriteFinalTableStatement(stream, SOFT_RESET_SUFFIX); return List.of(createTempTable, clearLoadedAt, rebuildInTempTable, overwriteFinalTable); } - private String clearLoadedAt(StreamId streamId) { + private String clearLoadedAt(final StreamId streamId) { return new StringSubstitutor(Map.of("raw_table_id", streamId.rawTableId(QUOTE))) .replace(""" UPDATE ${raw_table_id} SET _airbyte_loaded_at = NULL WHERE 1=1; @@ -572,7 +569,7 @@ public Optional overwriteFinalTable(final String finalSuffix, final Stre } } - private String overwriteFinalTableStatement(StreamConfig stream, String finalSuffix) { + private String overwriteFinalTableStatement(final StreamConfig stream, final String finalSuffix) { return new StringSubstitutor(Map.of( "final_table_id", stream.id().finalTableId(QUOTE), "tmp_final_table", stream.id().finalTableId(finalSuffix, QUOTE), diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java index a1841a295bbb..8ea522da6233 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java @@ -22,14 +22,14 @@ import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TableResult; import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; -import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; -import io.airbyte.integrations.base.destination.typing_deduping.RecordDiffer; +import io.airbyte.integrations.base.destination.typing_deduping.Array; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; +import io.airbyte.integrations.base.destination.typing_deduping.RecordDiffer; +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; +import io.airbyte.integrations.base.destination.typing_deduping.Struct; import io.airbyte.integrations.destination.bigquery.BigQueryDestination; import io.airbyte.protocol.models.v0.DestinationSyncMode; import io.airbyte.protocol.models.v0.SyncMode; @@ -114,8 +114,8 @@ public class BigQuerySqlGeneratorIntegrationTest { @BeforeAll public static void setup() throws Exception { - String rawConfig = Files.readString(Path.of("secrets/credentials-gcs-staging.json")); - JsonNode config = Jsons.deserialize(rawConfig); + final String rawConfig = Files.readString(Path.of("secrets/credentials-gcs-staging.json")); + final JsonNode config = Jsons.deserialize(rawConfig); bq = BigQueryDestination.getBigQuery(config); destinationHandler = new BigQueryDestinationHandler(bq); @@ -145,7 +145,7 @@ public void teardownDataset() { @Test public void testCreateTableIncremental() throws InterruptedException { - StreamConfig stream = incrementalDedupStreamConfig(); + final StreamConfig stream = incrementalDedupStreamConfig(); destinationHandler.execute(GENERATOR.createTable(stream, "")); @@ -811,7 +811,7 @@ private void createFinalTable() throws InterruptedException { createFinalTable(""); } - private void createFinalTable(String suffix) throws InterruptedException { + private void createFinalTable(final String suffix) throws InterruptedException { bq.query(QueryJobConfiguration.newBuilder( new StringSubstitutor(Map.of( "dataset", testDataset, @@ -877,7 +877,7 @@ PARTITION BY (DATE_TRUNC(_airbyte_extracted_at, DAY)) * TableResult contains records in a somewhat nonintuitive format (and it avoids loading them all into memory). * That's annoying for us since we're working with small test data, so just pull everything into a list. */ - public static List toJsonRecords(TableResult result) { + public static List toJsonRecords(final TableResult result) { return result.streamAll().map(row -> toJson(result.getSchema(), row)).toList(); } @@ -886,12 +886,12 @@ public static List toJsonRecords(TableResult result) { * This method does that conversion, using the schema to determine which type is most appropriate. Then we just dump * everything into a jsonnode for interop with RecordDiffer. */ - private static JsonNode toJson(Schema schema, FieldValueList row) { + private static JsonNode toJson(final Schema schema, final FieldValueList row) { final ObjectNode json = (ObjectNode) Jsons.emptyObject(); for (int i = 0; i < schema.getFields().size(); i++) { final Field field = schema.getFields().get(i); final FieldValue value = row.get(i); - JsonNode typedValue; + final JsonNode typedValue; if (!value.isNull()) { typedValue = switch (field.getType().getStandardType()) { case BOOL -> Jsons.jsonNode(value.getBooleanValue()); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java index a1acb711874f..2313f3773770 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java @@ -11,13 +11,13 @@ import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.TimePartitioning; import com.google.common.collect.ImmutableList; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Union; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.UnsupportedOneOf; +import io.airbyte.integrations.base.destination.typing_deduping.Array; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; +import io.airbyte.integrations.base.destination.typing_deduping.Struct; +import io.airbyte.integrations.base.destination.typing_deduping.Union; +import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; From a3d54dc90ceb45c110c1f1db8ddd228bfaabf6de Mon Sep 17 00:00:00 2001 From: cynthiaxyin Date: Thu, 20 Jul 2023 01:48:32 +0000 Subject: [PATCH 09/10] Automated Commit - Format and Process Resources Changes --- .../base/destination/typing_deduping/RecordDiffer.java | 4 +++- .../base/destination/typing_deduping/AirbyteProtocolType.java | 4 ++++ .../integrations/base/destination/typing_deduping/Array.java | 4 ++++ .../integrations/base/destination/typing_deduping/Struct.java | 4 ++++ .../integrations/base/destination/typing_deduping/Union.java | 4 ++++ .../base/destination/typing_deduping/UnsupportedOneOf.java | 4 ++++ .../base/destination/typing_deduping/AirbyteTypeTest.java | 3 +-- 7 files changed, 24 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java index 3c92ccd72c87..e6b949f1f0eb 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java @@ -203,7 +203,9 @@ private static String diffRecords(final List originalExpectedRecords, return message; } - private static String diffSingleRecord(final Function recordIdExtractor, final JsonNode expectedRecord, final JsonNode actualRecord) { + private static String diffSingleRecord(final Function recordIdExtractor, + final JsonNode expectedRecord, + final JsonNode actualRecord) { boolean foundMismatch = false; String mismatchedRecordMessage = "Row had incorrect data: " + recordIdExtractor.apply(expectedRecord) + "\n"; // Iterate through each column in the expected record and compare it to the actual record's value. diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteProtocolType.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteProtocolType.java index f33f78be62e9..ab800697b0e1 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteProtocolType.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteProtocolType.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.base.destination.typing_deduping; import com.fasterxml.jackson.databind.JsonNode; diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Array.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Array.java index 992e975b4b3a..dccd687e033e 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Array.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Array.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.base.destination.typing_deduping; public record Array(AirbyteType items) implements AirbyteType { diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Struct.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Struct.java index 06d9e0daf2e9..80eb61be79c5 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Struct.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Struct.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.base.destination.typing_deduping; import java.util.LinkedHashMap; diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Union.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Union.java index 82fb98f927ce..e8b62dc36eed 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Union.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Union.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.base.destination.typing_deduping; import com.fasterxml.jackson.databind.JsonNode; diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/UnsupportedOneOf.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/UnsupportedOneOf.java index d38e0559fcf9..3d3c84636a3c 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/UnsupportedOneOf.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/UnsupportedOneOf.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.base.destination.typing_deduping; import java.util.List; diff --git a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java index 85ffa5c8e11f..f37d71754bcb 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java @@ -440,8 +440,7 @@ public void testChooseUnion() { unionToType.put(new Union(ImmutableList.of(BOOLEAN, INTEGER)), INTEGER); assertAll( - () -> unionToType.forEach((u, t) -> assertEquals(t, u.chooseType())) - ); + () -> unionToType.forEach((u, t) -> assertEquals(t, u.chooseType()))); } @Test From ae42899a1b14ceeede806e1d39b363babc7ff5fe Mon Sep 17 00:00:00 2001 From: Cynthia Yin Date: Thu, 20 Jul 2023 12:10:29 -0700 Subject: [PATCH 10/10] fix assertAll --- .../base/destination/typing_deduping/AirbyteTypeTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java index f37d71754bcb..da80eeee31c5 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java @@ -440,7 +440,7 @@ public void testChooseUnion() { unionToType.put(new Union(ImmutableList.of(BOOLEAN, INTEGER)), INTEGER); assertAll( - () -> unionToType.forEach((u, t) -> assertEquals(t, u.chooseType()))); + unionToType.entrySet().stream().map(e -> () -> assertEquals(e.getValue(), e.getKey().chooseType()))); } @Test