From db5c5a3056724720a8077e7f9c3d5c762a3f33bd Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Fri, 27 May 2022 21:19:39 +0300 Subject: [PATCH 01/10] json_extract_array -> json_extract_string_array Signed-off-by: Sergey Chvalyuk --- airbyte-integrations/bases/base-normalization/Dockerfile | 2 +- .../macros/cross_db_utils/json_operations.sql | 2 +- .../workers/normalization/NormalizationRunnerFactory.java | 2 +- docs/understanding-airbyte/basic-normalization.md | 1 + 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/bases/base-normalization/Dockerfile b/airbyte-integrations/bases/base-normalization/Dockerfile index 7485fdd1bed5..affef4165c0d 100644 --- a/airbyte-integrations/bases/base-normalization/Dockerfile +++ b/airbyte-integrations/bases/base-normalization/Dockerfile @@ -28,5 +28,5 @@ WORKDIR /airbyte ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh" ENTRYPOINT ["/airbyte/entrypoint.sh"] -LABEL io.airbyte.version=0.2.1 +LABEL io.airbyte.version=0.2.2 LABEL io.airbyte.name=airbyte/normalization diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql index e1e54439e657..1dc905f49499 100644 --- a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql @@ -206,7 +206,7 @@ {%- endmacro %} {% macro bigquery__json_extract_array(json_column, json_path_list, normalized_json_path) -%} - json_extract_array({{ json_column }}, {{ format_json_path(normalized_json_path) }}) + json_extract_string_array({{ json_column }}, {{ format_json_path(normalized_json_path) }}) {%- endmacro %} {% macro postgres__json_extract_array(json_column, json_path_list, normalized_json_path) -%} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java index b7dc0c0ce1e8..df51f8073a1c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java @@ -14,7 +14,7 @@ public class NormalizationRunnerFactory { public static final String BASE_NORMALIZATION_IMAGE_NAME = "airbyte/normalization"; - public static final String NORMALIZATION_VERSION = "0.2.1"; + public static final String NORMALIZATION_VERSION = "0.2.2"; static final Map> NORMALIZATION_MAPPING = ImmutableMap.>builder() diff --git a/docs/understanding-airbyte/basic-normalization.md b/docs/understanding-airbyte/basic-normalization.md index 720b4fd214c6..b0561868537a 100644 --- a/docs/understanding-airbyte/basic-normalization.md +++ b/docs/understanding-airbyte/basic-normalization.md @@ -352,6 +352,7 @@ Therefore, in order to "upgrade" to the desired normalization version, you need | Airbyte Version | Normalization Version | Date | Pull Request | Subject | |:----------------| :--- | :--- | :--- | :--- | +| | 0.2.2 | 2022-05-27 | []() | Improve array parsing for BigQuery | | | 0.2.1 | 2022-05-17 | [\#12924](https://github.com/airbytehq/airbyte/pull/12924) | Fixed checking --event-buffer-size on old dbt crashed entrypoint.sh | | | 0.2.0 | 2022-05-15 | [\#12745](https://github.com/airbytehq/airbyte/pull/12745) | Snowflake: add datetime without timezone | | | 0.1.78 | 2022-05-06 | [\#12305](https://github.com/airbytehq/airbyte/pull/12305) | Mssql: use NVARCHAR and datetime2 by default | From 722a7b544e50e7533f99df47ae928f8f6baf8138 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Fri, 27 May 2022 21:22:28 +0300 Subject: [PATCH 02/10] basic-normalization.md updated Signed-off-by: Sergey Chvalyuk --- docs/understanding-airbyte/basic-normalization.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/understanding-airbyte/basic-normalization.md b/docs/understanding-airbyte/basic-normalization.md index b0561868537a..0f8626917c4c 100644 --- a/docs/understanding-airbyte/basic-normalization.md +++ b/docs/understanding-airbyte/basic-normalization.md @@ -352,7 +352,7 @@ Therefore, in order to "upgrade" to the desired normalization version, you need | Airbyte Version | Normalization Version | Date | Pull Request | Subject | |:----------------| :--- | :--- | :--- | :--- | -| | 0.2.2 | 2022-05-27 | []() | Improve array parsing for BigQuery | +| | 0.2.2 | 2022-05-27 | [\#13289](https://github.com/airbytehq/airbyte/pull/13289) | Improve array parsing for BigQuery | | | 0.2.1 | 2022-05-17 | [\#12924](https://github.com/airbytehq/airbyte/pull/12924) | Fixed checking --event-buffer-size on old dbt crashed entrypoint.sh | | | 0.2.0 | 2022-05-15 | [\#12745](https://github.com/airbytehq/airbyte/pull/12745) | Snowflake: add datetime without timezone | | | 0.1.78 | 2022-05-06 | [\#12305](https://github.com/airbytehq/airbyte/pull/12305) | Mssql: use NVARCHAR and datetime2 by default | From f2e1aa3018d69a25774a39fa21c6c2b12fafffe3 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Fri, 27 May 2022 21:28:51 +0300 Subject: [PATCH 03/10] REF -> ref (revert me) Signed-off-by: Sergey Chvalyuk --- .../redshift_normalization_migration/test_pokemon_super.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/redshift_normalization_migration/test_pokemon_super.sql b/airbyte-integrations/bases/base-normalization/integration_tests/resources/redshift_normalization_migration/test_pokemon_super.sql index ffd4737034b3..72d9b8ae8461 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/redshift_normalization_migration/test_pokemon_super.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/redshift_normalization_migration/test_pokemon_super.sql @@ -1,6 +1,6 @@ SELECT forms FROM - {{ REF('pokemon') }} + {{ ref('pokemon') }} WHERE - forms != json_parse('[{"name":"ditto","url":"https://pokeapi.co/api/v2/pokemon-form/132/"}]') \ No newline at end of file + forms != json_parse('[{"name":"ditto","url":"https://pokeapi.co/api/v2/pokemon-form/132/"}]') From 3e8211f93f50e4e9e7a628f70899da98005f181c Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Sat, 28 May 2022 14:11:52 +0300 Subject: [PATCH 04/10] json_extract_string_array added Signed-off-by: Sergey Chvalyuk --- .../macros/cross_db_utils/json_operations.sql | 17 ++++++++++++++++- .../transform_catalog/stream_processor.py | 2 ++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql index 1dc905f49499..82ca9655b3ff 100644 --- a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql @@ -206,7 +206,7 @@ {%- endmacro %} {% macro bigquery__json_extract_array(json_column, json_path_list, normalized_json_path) -%} - json_extract_string_array({{ json_column }}, {{ format_json_path(normalized_json_path) }}) + json_extract_array({{ json_column }}, {{ format_json_path(normalized_json_path) }}) {%- endmacro %} {% macro postgres__json_extract_array(json_column, json_path_list, normalized_json_path) -%} @@ -236,3 +236,18 @@ {% macro clickhouse__json_extract_array(json_column, json_path_list, normalized_json_path) -%} JSONExtractArrayRaw(assumeNotNull({{ json_column }}), {{ format_json_path(json_path_list) }}) {%- endmacro %} + +{# json_extract_string_array ------------------------------------------------- #} + +{% macro json_extract_string_array(json_column, json_path_list, normalized_json_path) -%} + {{ adapter.dispatch('json_extract_string_array')(json_column, json_path_list, normalized_json_path) }} +{%- endmacro %} + +{% macro default__json_extract_string_array(json_column, json_path_list, normalized_json_path) -%} + json_extract_array({{ json_column }}, {{ format_json_path(json_path_list) }}) +{%- endmacro %} + +# https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions#json_extract_string_array +{% macro bigquery__json_extract_string_array(json_column, json_path_list, normalized_json_path) -%} + json_extract_string_array({{ json_column }}, {{ format_json_path(normalized_json_path) }}) +{%- endmacro %} diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py index 161e9bcfae38..8a805cd58b3f 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py @@ -455,6 +455,8 @@ def extract_json_column(property_name: str, json_column_name: str, definition: D if "type" in definition: if is_array(definition["type"]): json_extract = jinja_call(f"json_extract_array({json_column_name}, {json_path}, {normalized_json_path})") + if is_simple_property(definition["items"]["type"]): + json_extract = jinja_call(f"json_extract_string_array({json_column_name}, {json_path}, {normalized_json_path})") elif is_object(definition["type"]): json_extract = jinja_call(f"json_extract('{table_alias}', {json_column_name}, {json_path}, {normalized_json_path})") elif is_simple_property(definition["type"]): From 5c7fa0ec91c50aeb7a1ac8c3c7a0c5c6b5b9e965 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Sat, 28 May 2022 17:11:34 +0300 Subject: [PATCH 05/10] improve checking items.type Signed-off-by: Sergey Chvalyuk --- .../normalization/transform_catalog/stream_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py index 8a805cd58b3f..d00652d09016 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py @@ -455,7 +455,7 @@ def extract_json_column(property_name: str, json_column_name: str, definition: D if "type" in definition: if is_array(definition["type"]): json_extract = jinja_call(f"json_extract_array({json_column_name}, {json_path}, {normalized_json_path})") - if is_simple_property(definition["items"]["type"]): + if is_simple_property(definition.get("items", {"type": "object"}).get("type", "object")): json_extract = jinja_call(f"json_extract_string_array({json_column_name}, {json_path}, {normalized_json_path})") elif is_object(definition["type"]): json_extract = jinja_call(f"json_extract('{table_alias}', {json_column_name}, {json_path}, {normalized_json_path})") From 205d3b28e2b6df82c23741f1170180f25086ab4e Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Sun, 29 May 2022 18:11:45 +0300 Subject: [PATCH 06/10] turn on tests supportArrayDataTypeTest --- .../bigquery/BigQueryDestinationAcceptanceTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java index 45a1074d0706..edda58225624 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java @@ -112,8 +112,7 @@ protected boolean supportBasicDataTypeTest() { @Override protected boolean supportArrayDataTypeTest() { - // #13154 Normalization issue - return false; + return true; } @Override From bb1863bd9eb73665bec32c2085099a1b153f134c Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Thu, 2 Jun 2022 12:28:40 +0300 Subject: [PATCH 07/10] updated basic-normalization.md Signed-off-by: Sergey Chvalyuk --- docs/understanding-airbyte/basic-normalization.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/understanding-airbyte/basic-normalization.md b/docs/understanding-airbyte/basic-normalization.md index 0f8626917c4c..5061de5c5722 100644 --- a/docs/understanding-airbyte/basic-normalization.md +++ b/docs/understanding-airbyte/basic-normalization.md @@ -352,7 +352,7 @@ Therefore, in order to "upgrade" to the desired normalization version, you need | Airbyte Version | Normalization Version | Date | Pull Request | Subject | |:----------------| :--- | :--- | :--- | :--- | -| | 0.2.2 | 2022-05-27 | [\#13289](https://github.com/airbytehq/airbyte/pull/13289) | Improve array parsing for BigQuery | +| | 0.2.2 | 2022-06-02 | [\#13289](https://github.com/airbytehq/airbyte/pull/13289) | BigQuery use `json_extract_string_array` for array of simple type elements | | | 0.2.1 | 2022-05-17 | [\#12924](https://github.com/airbytehq/airbyte/pull/12924) | Fixed checking --event-buffer-size on old dbt crashed entrypoint.sh | | | 0.2.0 | 2022-05-15 | [\#12745](https://github.com/airbytehq/airbyte/pull/12745) | Snowflake: add datetime without timezone | | | 0.1.78 | 2022-05-06 | [\#12305](https://github.com/airbytehq/airbyte/pull/12305) | Mssql: use NVARCHAR and datetime2 by default | From 9b87d77cfdc2e404fd8b6290187acfafad207e55 Mon Sep 17 00:00:00 2001 From: "andrii.leonets" Date: Thu, 2 Jun 2022 13:46:20 +0300 Subject: [PATCH 08/10] Fix reading of array values --- .../db/bigquery/BigQuerySourceOperations.java | 29 ++++---- .../java/io/airbyte/db/util/JsonUtil.java | 70 +++++++++++++++++++ .../BigQueryDestinationAcceptanceTest.java | 2 +- .../bigquery/BigQueryTestDataComparator.java | 18 +++-- 4 files changed, 99 insertions(+), 20 deletions(-) create mode 100644 airbyte-db/lib/src/main/java/io/airbyte/db/util/JsonUtil.java diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/bigquery/BigQuerySourceOperations.java b/airbyte-db/lib/src/main/java/io/airbyte/db/bigquery/BigQuerySourceOperations.java index f71bad6ff012..8581885528b2 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/bigquery/BigQuerySourceOperations.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/bigquery/BigQuerySourceOperations.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ContainerNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.FieldList; @@ -19,6 +20,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.db.DataTypeUtils; import io.airbyte.db.SourceOperations; +import io.airbyte.db.util.JsonUtil; import io.airbyte.protocol.models.JsonSchemaType; import java.text.DateFormat; import java.text.ParseException; @@ -43,20 +45,19 @@ public JsonNode rowToJson(final BigQueryResultSet bigQueryResultSet) { return jsonNode; } - private void fillObjectNode(final String fieldName, final StandardSQLTypeName fieldType, final FieldValue fieldValue, final ObjectNode node) { + private void fillObjectNode(final String fieldName, final StandardSQLTypeName fieldType, final FieldValue fieldValue, final ContainerNode node) { switch (fieldType) { - case BOOL -> node.put(fieldName, fieldValue.getBooleanValue()); - case INT64 -> node.put(fieldName, fieldValue.getLongValue()); - case FLOAT64 -> node.put(fieldName, fieldValue.getDoubleValue()); - case NUMERIC -> node.put(fieldName, fieldValue.getNumericValue()); - case BIGNUMERIC -> node.put(fieldName, returnNullIfInvalid(fieldValue::getNumericValue)); - case STRING -> node.put(fieldName, fieldValue.getStringValue()); - case BYTES -> node.put(fieldName, fieldValue.getBytesValue()); - case DATE -> node.put(fieldName, toISO8601String(getDateValue(fieldValue, BIG_QUERY_DATE_FORMAT))); - case DATETIME -> node.put(fieldName, toISO8601String(getDateValue(fieldValue, BIG_QUERY_DATETIME_FORMAT))); - case TIMESTAMP -> node.put(fieldName, toISO8601String(fieldValue.getTimestampValue() / 1000)); - case TIME -> node.put(fieldName, fieldValue.getStringValue()); - default -> node.put(fieldName, fieldValue.getStringValue()); + case BOOL -> JsonUtil.putBooleanValueIntoJson(node, fieldValue.getBooleanValue(), fieldName); + case INT64 -> JsonUtil.putLongValueIntoJson(node, fieldValue.getLongValue(), fieldName); + case FLOAT64 -> JsonUtil.putDoubleValueIntoJson(node, fieldValue.getDoubleValue(), fieldName); + case NUMERIC -> JsonUtil.putBigDecimalValueIntoJson(node, fieldValue.getNumericValue(), fieldName); + case BIGNUMERIC -> JsonUtil.putBigDecimalValueIntoJson(node, returnNullIfInvalid(fieldValue::getNumericValue), fieldName); + case STRING, TIME -> JsonUtil.putStringValueIntoJson(node, fieldValue.getStringValue(), fieldName); + case BYTES -> JsonUtil.putBytesValueIntoJson(node, fieldValue.getBytesValue(), fieldName); + case DATE -> JsonUtil.putStringValueIntoJson(node, toISO8601String(getDateValue(fieldValue, BIG_QUERY_DATE_FORMAT)), fieldName); + case DATETIME -> JsonUtil.putStringValueIntoJson(node, toISO8601String(getDateValue(fieldValue, BIG_QUERY_DATETIME_FORMAT)), fieldName); + case TIMESTAMP -> JsonUtil.putStringValueIntoJson(node, toISO8601String(fieldValue.getTimestampValue() / 1000), fieldName); + default -> JsonUtil.putStringValueIntoJson(node, fieldValue.getStringValue(), fieldName); } } @@ -74,7 +75,7 @@ private void setJsonField(final Field field, final FieldValue fieldValue, final final FieldList subFields = field.getSubFields(); // Array of primitive if (subFields == null || subFields.isEmpty()) { - fieldValue.getRepeatedValue().forEach(arrayFieldValue -> fillObjectNode(fieldName, fieldType, arrayFieldValue, arrayNode.addObject())); + fieldValue.getRepeatedValue().forEach(arrayFieldValue -> fillObjectNode(fieldName, fieldType, arrayFieldValue, arrayNode)); // Array of records } else { for (final FieldValue arrayFieldValue : fieldValue.getRepeatedValue()) { diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/util/JsonUtil.java b/airbyte-db/lib/src/main/java/io/airbyte/db/util/JsonUtil.java new file mode 100644 index 000000000000..967c3f49e4db --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/util/JsonUtil.java @@ -0,0 +1,70 @@ +package io.airbyte.db.util; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ContainerNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.math.BigDecimal; + +public class JsonUtil { + + public static void putBooleanValueIntoJson(final ContainerNode node, final boolean value, final String fieldName) { + if (node instanceof ArrayNode) { + ((ArrayNode) node).add(value); + } else if (node instanceof ObjectNode) { + ((ObjectNode) node).put(fieldName, value); + } else { + throw new RuntimeException("Can't populate the node type : " + node.getClass().getName()); + } + } + + public static void putLongValueIntoJson(final ContainerNode node, final long value, final String fieldName) { + if (node instanceof ArrayNode) { + ((ArrayNode) node).add(value); + } else if (node instanceof ObjectNode) { + ((ObjectNode) node).put(fieldName, value); + } else { + throw new RuntimeException("Can't populate the node type : " + node.getClass().getName()); + } + } + + public static void putDoubleValueIntoJson(final ContainerNode node, final double value, final String fieldName) { + if (node instanceof ArrayNode) { + ((ArrayNode) node).add(value); + } else if (node instanceof ObjectNode) { + ((ObjectNode) node).put(fieldName, value); + } else { + throw new RuntimeException("Can't populate the node type : " + node.getClass().getName()); + } + } + + public static void putBigDecimalValueIntoJson(final ContainerNode node, final BigDecimal value, final String fieldName) { + if (node instanceof ArrayNode) { + ((ArrayNode) node).add(value); + } else if (node instanceof ObjectNode) { + ((ObjectNode) node).put(fieldName, value); + } else { + throw new RuntimeException("Can't populate the node type : " + node.getClass().getName()); + } + } + + public static void putStringValueIntoJson(final ContainerNode node, final String value, final String fieldName) { + if (node instanceof ArrayNode) { + ((ArrayNode) node).add(value); + } else if (node instanceof ObjectNode) { + ((ObjectNode) node).put(fieldName, value); + } else { + throw new RuntimeException("Can't populate the node type : " + node.getClass().getName()); + } + } + + public static void putBytesValueIntoJson(final ContainerNode node, final byte[] value, final String fieldName) { + if (node instanceof ArrayNode) { + ((ArrayNode) node).add(value); + } else if (node instanceof ObjectNode) { + ((ObjectNode) node).put(fieldName, value); + } else { + throw new RuntimeException("Can't populate the node type : " + node.getClass().getName()); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java index edda58225624..61f9be7d225e 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java @@ -181,7 +181,7 @@ private List retrieveRecordsFromTable(final String tableName, final St final FieldList fields = queryResults.getSchema().getFields(); BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations(); - return Streams.stream(queryResults.iterateAll()) + return Streams.stream(queryResults.iterateAll()) .map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList()); } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java index e2223f5494e2..b3dc1b58b6e5 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java @@ -49,6 +49,19 @@ private LocalDateTime parseDateTime(String dateTimeValue) { } } + @Override + protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) { + if (destinationValue != null) { + if (destinationValue.matches(".+Z")) { + return ZonedDateTime.of(LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(BIGQUERY_DATETIME_FORMAT)), ZoneOffset.UTC); + } else { + return ZonedDateTime.parse(destinationValue, getAirbyteDateTimeWithTzFormatter()).withZoneSameInstant(ZoneOffset.UTC); + } + } else { + return null; + } + } + @Override protected boolean compareDateTimeValues(String expectedValue, String actualValue) { var destinationDate = parseDateTime(actualValue); @@ -70,11 +83,6 @@ protected boolean compareDateValues(String expectedValue, String actualValue) { return expectedDate.equals(destinationDate); } - @Override - protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) { - return ZonedDateTime.of(LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(BIGQUERY_DATETIME_FORMAT)), ZoneOffset.UTC); - } - @Override protected boolean compareDateTimeWithTzValues(String airbyteMessageValue, String destinationValue) { // #13123 Normalization issue From b1bddf1d9541304c5537c017845abaea6d5a7f01 Mon Sep 17 00:00:00 2001 From: "andrii.leonets" Date: Wed, 8 Jun 2022 13:18:28 +0300 Subject: [PATCH 09/10] Fix desirelization of a complex type during tests. --- .../destination/comparator/AdvancedTestDataComparator.java | 5 +++-- .../destination/bigquery/BigQueryTestDataComparator.java | 7 +++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.java index 79cdb083508b..dd775e0d1026 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.java @@ -89,12 +89,13 @@ protected boolean compareJsonNodes(final JsonNode expectedValue, final JsonNode return compareDateTimeValues(expectedValue.asText(), actualValue.asText()); } else if (isDateValue(expectedValue.asText())) { return compareDateValues(expectedValue.asText(), actualValue.asText()); - } else if (expectedValue.isArray() && actualValue.isArray()) { + } else if (expectedValue.isArray()) { return compareArrays(expectedValue, actualValue); - } else if (expectedValue.isObject() && actualValue.isObject()) { + } else if (expectedValue.isObject()) { compareObjects(expectedValue, actualValue); return true; } else { + LOGGER.warn("Default comparison method!"); return compareString(expectedValue, actualValue); } } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java index b3dc1b58b6e5..8c7be65f6fad 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryTestDataComparator.java @@ -4,6 +4,8 @@ package io.airbyte.integrations.destination.bigquery; +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.destination.StandardNameTransformer; import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; import java.time.LocalDate; @@ -100,4 +102,9 @@ private ZonedDateTime getBrokenDate() { return ZonedDateTime.of(1583, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); } + @Override + protected void compareObjects(JsonNode expectedObject, JsonNode actualObject) { + JsonNode actualJsonNode = (actualObject.isTextual() ? Jsons.deserialize(actualObject.textValue()) : actualObject); + super.compareObjects(expectedObject, actualJsonNode); + } } From a65c06e2ea4183be340f05a8b2dd51a1170ec5bb Mon Sep 17 00:00:00 2001 From: "andrii.leonets" Date: Wed, 8 Jun 2022 20:03:49 +0300 Subject: [PATCH 10/10] Fix source test --- .../source/bigquery/BigQuerySourceDatatypeTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-bigquery/src/test-integration/java/io/airbyte/integrations/source/bigquery/BigQuerySourceDatatypeTest.java b/airbyte-integrations/connectors/source-bigquery/src/test-integration/java/io/airbyte/integrations/source/bigquery/BigQuerySourceDatatypeTest.java index 05bc5b55ccff..fb7fad3e5231 100644 --- a/airbyte-integrations/connectors/source-bigquery/src/test-integration/java/io/airbyte/integrations/source/bigquery/BigQuerySourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-bigquery/src/test-integration/java/io/airbyte/integrations/source/bigquery/BigQuerySourceDatatypeTest.java @@ -284,7 +284,7 @@ protected void initTests() { .airbyteType(JsonSchemaType.STRING) .createTablePatternSql(CREATE_SQL_PATTERN) .addInsertValues("['a', 'b']") - .addExpectedValues("[{\"test_column\":\"a\"},{\"test_column\":\"b\"}]") + .addExpectedValues("[\"a\",\"b\"]") .build()); addDataTypeTestData(