diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/bigquery/BigQuerySourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/bigquery/BigQuerySourceOperations.java index f71bad6ff012..8581885528b2 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/bigquery/BigQuerySourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/bigquery/BigQuerySourceOperations.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ContainerNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.FieldList; @@ -19,6 +20,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.db.DataTypeUtils; import io.airbyte.db.SourceOperations; +import io.airbyte.db.util.JsonUtil; import io.airbyte.protocol.models.JsonSchemaType; import java.text.DateFormat; import java.text.ParseException; @@ -43,20 +45,19 @@ public JsonNode rowToJson(final BigQueryResultSet bigQueryResultSet) { return jsonNode; } - private void fillObjectNode(final String fieldName, final StandardSQLTypeName fieldType, final FieldValue fieldValue, final ObjectNode node) { + private void fillObjectNode(final String fieldName, final StandardSQLTypeName fieldType, final FieldValue fieldValue, final ContainerNode node) { switch (fieldType) { - case BOOL -> node.put(fieldName, fieldValue.getBooleanValue()); - case INT64 -> node.put(fieldName, fieldValue.getLongValue()); - case FLOAT64 -> node.put(fieldName, fieldValue.getDoubleValue()); - case NUMERIC -> node.put(fieldName, fieldValue.getNumericValue()); - case BIGNUMERIC -> node.put(fieldName, returnNullIfInvalid(fieldValue::getNumericValue)); - case STRING -> node.put(fieldName, fieldValue.getStringValue()); - case BYTES -> node.put(fieldName, fieldValue.getBytesValue()); - case DATE -> node.put(fieldName, toISO8601String(getDateValue(fieldValue, BIG_QUERY_DATE_FORMAT))); - case DATETIME -> node.put(fieldName, toISO8601String(getDateValue(fieldValue, BIG_QUERY_DATETIME_FORMAT))); - case TIMESTAMP -> node.put(fieldName, toISO8601String(fieldValue.getTimestampValue() / 1000)); - case TIME -> node.put(fieldName, fieldValue.getStringValue()); - default -> node.put(fieldName, fieldValue.getStringValue()); + case BOOL -> JsonUtil.putBooleanValueIntoJson(node, fieldValue.getBooleanValue(), fieldName); + case INT64 -> JsonUtil.putLongValueIntoJson(node, fieldValue.getLongValue(), fieldName); + case FLOAT64 -> JsonUtil.putDoubleValueIntoJson(node, fieldValue.getDoubleValue(), fieldName); + case NUMERIC -> JsonUtil.putBigDecimalValueIntoJson(node, fieldValue.getNumericValue(), fieldName); + case BIGNUMERIC -> JsonUtil.putBigDecimalValueIntoJson(node, returnNullIfInvalid(fieldValue::getNumericValue), fieldName); + case STRING, TIME -> JsonUtil.putStringValueIntoJson(node, fieldValue.getStringValue(), fieldName); + case BYTES -> JsonUtil.putBytesValueIntoJson(node, fieldValue.getBytesValue(), fieldName); + case DATE -> JsonUtil.putStringValueIntoJson(node, toISO8601String(getDateValue(fieldValue, BIG_QUERY_DATE_FORMAT)), fieldName); + case DATETIME -> JsonUtil.putStringValueIntoJson(node, toISO8601String(getDateValue(fieldValue, BIG_QUERY_DATETIME_FORMAT)), fieldName); + case TIMESTAMP -> JsonUtil.putStringValueIntoJson(node, toISO8601String(fieldValue.getTimestampValue() / 1000), fieldName); + default -> JsonUtil.putStringValueIntoJson(node, fieldValue.getStringValue(), fieldName); } } @@ -74,7 +75,7 @@ private void setJsonField(final Field field, final FieldValue fieldValue, final final FieldList subFields = field.getSubFields(); // Array of primitive if (subFields == null || subFields.isEmpty()) { - fieldValue.getRepeatedValue().forEach(arrayFieldValue -> fillObjectNode(fieldName, fieldType, arrayFieldValue, arrayNode.addObject())); + fieldValue.getRepeatedValue().forEach(arrayFieldValue -> fillObjectNode(fieldName, fieldType, arrayFieldValue, arrayNode)); // Array of records } else { for (final FieldValue arrayFieldValue : fieldValue.getRepeatedValue()) { diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/util/JsonUtil.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/util/JsonUtil.java new file mode 100644 index 000000000000..967c3f49e4db --- /dev/null +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/util/JsonUtil.java @@ -0,0 +1,70 @@ +package io.airbyte.db.util; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ContainerNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.math.BigDecimal; + +public class JsonUtil { + + public static void putBooleanValueIntoJson(final ContainerNode node, final boolean value, final String fieldName) { + if (node instanceof ArrayNode) { + ((ArrayNode) node).add(value); + } else if (node instanceof ObjectNode) { + ((ObjectNode) node).put(fieldName, value); + } else { + throw new RuntimeException("Can't populate the node type : " + node.getClass().getName()); + } + } + + public static void putLongValueIntoJson(final ContainerNode node, final long value, final String fieldName) { + if (node instanceof ArrayNode) { + ((ArrayNode) node).add(value); + } else if (node instanceof ObjectNode) { + ((ObjectNode) node).put(fieldName, value); + } else { + throw new RuntimeException("Can't populate the node type : " + node.getClass().getName()); + } + } + + public static void putDoubleValueIntoJson(final ContainerNode node, final double value, final String fieldName) { + if (node instanceof ArrayNode) { + ((ArrayNode) node).add(value); + } else if (node instanceof ObjectNode) { + ((ObjectNode) node).put(fieldName, value); + } else { + throw new RuntimeException("Can't populate the node type : " + node.getClass().getName()); + } + } + + public static void putBigDecimalValueIntoJson(final ContainerNode node, final BigDecimal value, final String fieldName) { + if (node instanceof ArrayNode) { + ((ArrayNode) node).add(value); + } else if (node instanceof ObjectNode) { + ((ObjectNode) node).put(fieldName, value); + } else { + throw new RuntimeException("Can't populate the node type : " + node.getClass().getName()); + } + } + + public static void putStringValueIntoJson(final ContainerNode node, final String value, final String fieldName) { + if (node instanceof ArrayNode) { + ((ArrayNode) node).add(value); + } else if (node instanceof ObjectNode) { + ((ObjectNode) node).put(fieldName, value); + } else { + throw new RuntimeException("Can't populate the node type : " + node.getClass().getName()); + } + } + + public static void putBytesValueIntoJson(final ContainerNode node, final byte[] value, final String fieldName) { + if (node instanceof ArrayNode) { + ((ArrayNode) node).add(value); + } else if (node instanceof ObjectNode) { + ((ObjectNode) node).put(fieldName, value); + } else { + throw new RuntimeException("Can't populate the node type : " + node.getClass().getName()); + } + } + +} diff --git a/airbyte-integrations/bases/base-normalization/Dockerfile b/airbyte-integrations/bases/base-normalization/Dockerfile index 7485fdd1bed5..affef4165c0d 100644 --- a/airbyte-integrations/bases/base-normalization/Dockerfile +++ b/airbyte-integrations/bases/base-normalization/Dockerfile @@ -28,5 +28,5 @@ WORKDIR /airbyte ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh" ENTRYPOINT ["/airbyte/entrypoint.sh"] -LABEL io.airbyte.version=0.2.1 +LABEL io.airbyte.version=0.2.2 LABEL io.airbyte.name=airbyte/normalization diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql index e1e54439e657..82ca9655b3ff 100644 --- a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql @@ -236,3 +236,18 @@ {% macro clickhouse__json_extract_array(json_column, json_path_list, normalized_json_path) -%} JSONExtractArrayRaw(assumeNotNull({{ json_column }}), {{ format_json_path(json_path_list) }}) {%- endmacro %} + +{# json_extract_string_array ------------------------------------------------- #} + +{% macro json_extract_string_array(json_column, json_path_list, normalized_json_path) -%} + {{ adapter.dispatch('json_extract_string_array')(json_column, json_path_list, normalized_json_path) }} +{%- endmacro %} + +{% macro default__json_extract_string_array(json_column, json_path_list, normalized_json_path) -%} + json_extract_array({{ json_column }}, {{ format_json_path(json_path_list) }}) +{%- endmacro %} + +# https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions#json_extract_string_array +{% macro bigquery__json_extract_string_array(json_column, json_path_list, normalized_json_path) -%} + json_extract_string_array({{ json_column }}, {{ format_json_path(normalized_json_path) }}) +{%- endmacro %} diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py index 161e9bcfae38..d00652d09016 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py @@ -455,6 +455,8 @@ def extract_json_column(property_name: str, json_column_name: str, definition: D if "type" in definition: if is_array(definition["type"]): json_extract = jinja_call(f"json_extract_array({json_column_name}, {json_path}, {normalized_json_path})") + if is_simple_property(definition.get("items", {"type": "object"}).get("type", "object")): + json_extract = jinja_call(f"json_extract_string_array({json_column_name}, {json_path}, {normalized_json_path})") elif is_object(definition["type"]): json_extract = jinja_call(f"json_extract('{table_alias}', {json_column_name}, {json_path}, {normalized_json_path})") elif is_simple_property(definition["type"]): diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.java index 79cdb083508b..dd775e0d1026 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.java @@ -89,12 +89,13 @@ protected boolean compareJsonNodes(final JsonNode expectedValue, final JsonNode return compareDateTimeValues(expectedValue.asText(), actualValue.asText()); } else if (isDateValue(expectedValue.asText())) { return compareDateValues(expectedValue.asText(), actualValue.asText()); - } else if (expectedValue.isArray() && actualValue.isArray()) { + } else if (expectedValue.isArray()) { return compareArrays(expectedValue, actualValue); - } else if (expectedValue.isObject() && actualValue.isObject()) { + } else if (expectedValue.isObject()) { compareObjects(expectedValue, actualValue); return true; } else { + LOGGER.warn("Default comparison method!"); return compareString(expectedValue, actualValue); } } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java index 45a1074d0706..61f9be7d225e 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java @@ -112,8 +112,7 @@ protected boolean supportBasicDataTypeTest() { @Override protected boolean supportArrayDataTypeTest() { - // #13154 Normalization issue - return false; + return true; } @Override @@ -182,7 +181,7 @@ private List retrieveRecordsFromTable(final String tableName, final St final FieldList fields = queryResults.getSchema().getFields(); BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations(); - return Streams.stream(queryResults.iterateAll()) + return Streams.stream(queryResults.iterateAll()) .map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList()); } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java index e2223f5494e2..8c7be65f6fad 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java @@ -4,6 +4,8 @@ package io.airbyte.integrations.destination.bigquery; +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.destination.StandardNameTransformer; import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; import java.time.LocalDate; @@ -49,6 +51,19 @@ private LocalDateTime parseDateTime(String dateTimeValue) { } } + @Override + protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) { + if (destinationValue != null) { + if (destinationValue.matches(".+Z")) { + return ZonedDateTime.of(LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(BIGQUERY_DATETIME_FORMAT)), ZoneOffset.UTC); + } else { + return ZonedDateTime.parse(destinationValue, getAirbyteDateTimeWithTzFormatter()).withZoneSameInstant(ZoneOffset.UTC); + } + } else { + return null; + } + } + @Override protected boolean compareDateTimeValues(String expectedValue, String actualValue) { var destinationDate = parseDateTime(actualValue); @@ -70,11 +85,6 @@ protected boolean compareDateValues(String expectedValue, String actualValue) { return expectedDate.equals(destinationDate); } - @Override - protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) { - return ZonedDateTime.of(LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(BIGQUERY_DATETIME_FORMAT)), ZoneOffset.UTC); - } - @Override protected boolean compareDateTimeWithTzValues(String airbyteMessageValue, String destinationValue) { // #13123 Normalization issue @@ -92,4 +102,9 @@ private ZonedDateTime getBrokenDate() { return ZonedDateTime.of(1583, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); } + @Override + protected void compareObjects(JsonNode expectedObject, JsonNode actualObject) { + JsonNode actualJsonNode = (actualObject.isTextual() ? Jsons.deserialize(actualObject.textValue()) : actualObject); + super.compareObjects(expectedObject, actualJsonNode); + } } diff --git a/airbyte-integrations/connectors/source-bigquery/src/test-integration/java/io/airbyte/integrations/source/bigquery/BigQuerySourceDatatypeTest.java b/airbyte-integrations/connectors/source-bigquery/src/test-integration/java/io/airbyte/integrations/source/bigquery/BigQuerySourceDatatypeTest.java index 05bc5b55ccff..fb7fad3e5231 100644 --- a/airbyte-integrations/connectors/source-bigquery/src/test-integration/java/io/airbyte/integrations/source/bigquery/BigQuerySourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-bigquery/src/test-integration/java/io/airbyte/integrations/source/bigquery/BigQuerySourceDatatypeTest.java @@ -284,7 +284,7 @@ protected void initTests() { .airbyteType(JsonSchemaType.STRING) .createTablePatternSql(CREATE_SQL_PATTERN) .addInsertValues("['a', 'b']") - .addExpectedValues("[{\"test_column\":\"a\"},{\"test_column\":\"b\"}]") + .addExpectedValues("[\"a\",\"b\"]") .build()); addDataTypeTestData( diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java index b7dc0c0ce1e8..df51f8073a1c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java @@ -14,7 +14,7 @@ public class NormalizationRunnerFactory { public static final String BASE_NORMALIZATION_IMAGE_NAME = "airbyte/normalization"; - public static final String NORMALIZATION_VERSION = "0.2.1"; + public static final String NORMALIZATION_VERSION = "0.2.2"; static final Map> NORMALIZATION_MAPPING = ImmutableMap.>builder() diff --git a/docs/understanding-airbyte/basic-normalization.md b/docs/understanding-airbyte/basic-normalization.md index 720b4fd214c6..5061de5c5722 100644 --- a/docs/understanding-airbyte/basic-normalization.md +++ b/docs/understanding-airbyte/basic-normalization.md @@ -352,6 +352,7 @@ Therefore, in order to "upgrade" to the desired normalization version, you need | Airbyte Version | Normalization Version | Date | Pull Request | Subject | |:----------------| :--- | :--- | :--- | :--- | +| | 0.2.2 | 2022-06-02 | [\#13289](https://github.com/airbytehq/airbyte/pull/13289) | BigQuery use `json_extract_string_array` for array of simple type elements | | | 0.2.1 | 2022-05-17 | [\#12924](https://github.com/airbytehq/airbyte/pull/12924) | Fixed checking --event-buffer-size on old dbt crashed entrypoint.sh | | | 0.2.0 | 2022-05-15 | [\#12745](https://github.com/airbytehq/airbyte/pull/12745) | Snowflake: add datetime without timezone | | | 0.1.78 | 2022-05-06 | [\#12305](https://github.com/airbytehq/airbyte/pull/12305) | Mssql: use NVARCHAR and datetime2 by default |