From 6558a5f7c2352bc11d83a53380de4898de07a1b8 Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Mon, 8 Nov 2021 17:42:57 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Minor=20fixes=20to=20incremental?= =?UTF-8?q?=20normalization=20and=20nesting=20(#7669)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bases/base-normalization/.gitignore | 40 +++++++--- .../bases/base-normalization/Dockerfile | 2 +- .../macros/incremental.sql | 4 +- ...d_stream_with_c___long_names_partition.sql | 20 +++++ ...d_stream_with_c___names_partition_data.sql | 18 +++++ .../dedup_exchange_rate_scd.sql | 6 +- .../data_input/catalog.json | 20 ++--- .../data_input/messages.txt | 4 +- .../data_input/messages_incremental.txt | 5 +- .../nested_streams_second_run_row_counts.sql | 3 - .../dbt_schema_tests/schema_test.yml | 14 ++-- .../schema_test.yml | 14 ++-- .../data_input/catalog.json | 3 + .../data_input/catalog_schema_change.json | 3 + .../data_input/messages.txt | 60 ++++++++------- .../data_input/messages_incremental.txt | 22 +++--- .../data_input/messages_schema_change.txt | 16 ++-- .../simple_streams_first_run_row_counts.sql | 10 +-- .../simple_streams_second_run_row_counts.sql | 14 ++-- .../simple_streams_third_run_row_counts.sql | 8 +- .../dbt_schema_tests/schema_test.yml | 18 +---- .../schema_test.yml | 18 +---- .../schema_test.yml | 19 +---- .../integration_tests/test_normalization.py | 12 +-- .../transform_catalog/stream_processor.py | 77 ++++++++++++++----- .../DestinationAcceptanceTest.java | 4 +- .../NormalizationRunnerFactory.java | 2 +- .../basic-normalization.md | 9 ++- 28 files changed, 252 insertions(+), 193 deletions(-) create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/test_normalization/nested_stream_with_c___long_names_partition.sql create mode 100644 
airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/test_normalization/nested_stream_with_c___names_partition_data.sql diff --git a/airbyte-integrations/bases/base-normalization/.gitignore b/airbyte-integrations/bases/base-normalization/.gitignore index 59647e6336ee..707446495c01 100644 --- a/airbyte-integrations/bases/base-normalization/.gitignore +++ b/airbyte-integrations/bases/base-normalization/.gitignore @@ -6,12 +6,34 @@ dbt_modules/ secrets/ dist/ -integration_tests/normalization_test_output/*/*/*.log -integration_tests/normalization_test_output/*/*/*.yml -integration_tests/normalization_test_output/*/*/*.json -integration_tests/normalization_test_output/*/*/*.md -integration_tests/normalization_test_output/*/*/macros/ -integration_tests/normalization_test_output/*/*/tests/ -integration_tests/normalization_test_output/*/*/models/dbt_data_tests/ -integration_tests/normalization_test_output/*/*/models/dbt_schema_tests/ -integration_tests/normalization_test_output/*/*/modified_models/ +integration_tests/normalization_test_output/*/*/macros +integration_tests/normalization_test_output/*/*/tests +integration_tests/normalization_test_output/**/*.json +integration_tests/normalization_test_output/**/*.log +integration_tests/normalization_test_output/**/*.md +integration_tests/normalization_test_output/**/*.sql +integration_tests/normalization_test_output/**/*.yml +!integration_tests/normalization_test_output/**/*dbt_project.yml +!integration_tests/normalization_test_output/**/generated/sources.yml + +# We keep a minimal/restricted subset of sql files for all destinations to avoid noise in diff +# Simple Streams +!integration_tests/normalization_test_output/**/dedup_exchange_rate*.sql +!integration_tests/normalization_test_output/**/exchange_rate.sql +# Nested Streams +# Parent table 
+!integration_tests/normalization_test_output/**/nested_stream_with*_names_ab*.sql +!integration_tests/normalization_test_output/**/nested_stream_with*_names_scd.sql +!integration_tests/normalization_test_output/**/nested_stream_with*_names.sql +# Nested table +!integration_tests/normalization_test_output/**/nested_stream_with_*_partition_ab1.sql +!integration_tests/normalization_test_output/**/nested_stream_with_*_data_ab1.sql +!integration_tests/normalization_test_output/**/nested_stream_with*_partition_scd.sql +!integration_tests/normalization_test_output/**/nested_stream_with*_data_scd.sql +!integration_tests/normalization_test_output/**/nested_stream_with*_partition.sql +!integration_tests/normalization_test_output/**/nested_stream_with*_data.sql + +# but we keep all sql files for Postgres +!integration_tests/normalization_test_output/postgres/**/*.sql +integration_tests/normalization_test_output/postgres/**/dbt_data_tests +integration_tests/normalization_test_output/postgres/**/dbt_schema_tests diff --git a/airbyte-integrations/bases/base-normalization/Dockerfile b/airbyte-integrations/bases/base-normalization/Dockerfile index f00326856c65..d8edc9d9b97a 100644 --- a/airbyte-integrations/bases/base-normalization/Dockerfile +++ b/airbyte-integrations/bases/base-normalization/Dockerfile @@ -27,5 +27,5 @@ WORKDIR /airbyte ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh" ENTRYPOINT ["/airbyte/entrypoint.sh"] -LABEL io.airbyte.version=0.1.58 +LABEL io.airbyte.version=0.1.59 LABEL io.airbyte.name=airbyte/normalization diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/incremental.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/incremental.sql index af02a97f605e..afd6d74f2021 100644 --- a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/incremental.sql +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/incremental.sql @@ -10,14 +10,14 @@ {%- macro 
default__incremental_clause(col_emitted_at) -%} {% if is_incremental() %} -and {{ col_emitted_at }} >= (select max({{ col_emitted_at }}) from {{ this }}) +and cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >= (select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}) {% endif %} {%- endmacro -%} {# -- see https://on-systems.tech/113-beware-dbt-incremental-updates-against-snowflake-external-tables/ #} {%- macro snowflake__incremental_clause(col_emitted_at) -%} {% if is_incremental() %} -and {{ col_emitted_at }} >= cast('{{ get_max_normalized_cursor(col_emitted_at) }}' as {{ type_timestamp_with_timezone() }}) +and cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >= cast('{{ get_max_normalized_cursor(col_emitted_at) }}' as {{ type_timestamp_with_timezone() }}) {% endif %} {%- endmacro -%} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/test_normalization/nested_stream_with_c___long_names_partition.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/test_normalization/nested_stream_with_c___long_names_partition.sql new file mode 100644 index 000000000000..30b9372e1ebf --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/test_normalization/nested_stream_with_c___long_names_partition.sql @@ -0,0 +1,20 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}], + unique_key = '_airbyte_ab_id', + schema = "test_normalization", + tags = [ "nested" ] +) }} +-- Final base SQL model +select + _airbyte_nested_stre__nto_long_names_hashid, + double_array_data, + {{ adapter.quote('DATA') }}, + _airbyte_ab_id, + _airbyte_emitted_at, + {{ 
current_timestamp() }} as _airbyte_normalized_at, + _airbyte_partition_hashid +from {{ ref('nested_stream_with_c___long_names_partition_ab3') }} +-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition from {{ ref('nested_stream_with_c__lting_into_long_names_scd') }} +where 1 = 1 +{{ incremental_clause('_airbyte_emitted_at') }} + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/test_normalization/nested_stream_with_c___names_partition_data.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/test_normalization/nested_stream_with_c___names_partition_data.sql new file mode 100644 index 000000000000..d455163ffb3a --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/test_normalization/nested_stream_with_c___names_partition_data.sql @@ -0,0 +1,18 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}], + schema = "test_normalization", + tags = [ "nested" ] +) }} +-- Final base SQL model +select + _airbyte_partition_hashid, + currency, + _airbyte_ab_id, + _airbyte_emitted_at, + {{ current_timestamp() }} as _airbyte_normalized_at, + _airbyte_data_hashid +from {{ ref('nested_stream_with_c___names_partition_data_ab3') }} +-- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA from {{ ref('nested_stream_with_c___long_names_partition') }} +where 1 = 1 +{{ incremental_clause('_airbyte_emitted_at') }} + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql 
b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql index 88f3125c0583..2af61c0cc3b8 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql @@ -1,7 +1,7 @@ - create table "postgres".test_normalization."dedup_exchange_rate_scd" + create table "postgres"."test_normalization"."dedup_exchange_rate_scd__dbt_tmp" as ( with @@ -42,7 +42,7 @@ scd_data as ( "date" desc, _airbyte_emitted_at desc ) as _airbyte_end_at, - case when lag("date") over ( + case when row_number() over ( partition by "id", currency, cast(nzd as varchar ) @@ -50,7 +50,7 @@ scd_data as ( "date" is null asc, "date" desc, _airbyte_emitted_at desc - ) is null then 1 else 0 end as _airbyte_active_row, + ) = 1 then 1 else 0 end as _airbyte_active_row, _airbyte_ab_id, _airbyte_emitted_at, _airbyte_dedup_exchange_rate_hashid diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/catalog.json b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/catalog.json index 88e890a3028f..d0b5e9f8ce83 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/catalog.json +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/catalog.json @@ -37,16 +37,6 @@ } } } - }, - "column`_'with\"_quotes": { - "type": ["null", "array"], - "items": { - "properties": { - "currency": { - "type": 
["null", "string"] - } - } - } } } } @@ -234,6 +224,16 @@ "properties": { "owner_id": { "type": ["null", "integer"] + }, + "column`_'with\"_quotes": { + "type": ["null", "array"], + "items": { + "properties": { + "currency": { + "type": ["null", "string"] + } + } + } } } } diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages.txt b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages.txt index 758b2c1159ca..15bd2d455767 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages.txt +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages.txt @@ -13,6 +13,6 @@ {"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":1,"conflict_stream_scalar": 2},"emitted_at":1623861660}} {"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":2,"conflict_stream_scalar": 2},"emitted_at":1623861660}} -{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":1, "children": [{"ab_id": 1, "owner": {"owner_id": 1}},{"ab_id": 2, "owner": {"owner_id": 2}}]},"emitted_at":1623861660}} -{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":2, "children": [{"ab_id": 3, "owner": {"owner_id": 3}},{"ab_id": 4, "owner": {"owner_id": 4}}]},"emitted_at":1623861660}} +{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":1, "children": [{"ab_id": 1, "owner": {"owner_id": 1, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 2, "owner": {"owner_id": 2, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}} +{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":2, "children": [{"ab_id": 3, "owner": {"owner_id": 3, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 4, "owner": {"owner_id": 4, "column`_'with\"_quotes": [ 
{"currency": "EUR" } ]}}]},"emitted_at":1623861660}} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages_incremental.txt b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages_incremental.txt index acbcc644ea49..7c356009ae72 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages_incremental.txt +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages_incremental.txt @@ -14,6 +14,5 @@ {"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":1,"conflict_stream_scalar": 2},"emitted_at":1623861660}} {"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":2,"conflict_stream_scalar": 2},"emitted_at":1623861660}} -{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":1, "children": [{"ab_id": 1, "owner": {"owner_id": 1}},{"ab_id": 2, "owner": {"owner_id": 2}}]},"emitted_at":1623861660}} -{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":2, "children": [{"ab_id": 3, "owner": {"owner_id": 3}},{"ab_id": 4, "owner": {"owner_id": 4}}]},"emitted_at":1623861660}} - +{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":1, "children": [{"ab_id": 1, "owner": {"owner_id": 1, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 2, "owner": {"owner_id": 2, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}} +{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":2, "children": [{"ab_id": 3, "owner": {"owner_id": 3, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 4, "owner": {"owner_id": 4, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}} diff --git 
a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_data_tests_tmp_incremental/nested_streams_second_run_row_counts.sql b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_data_tests_tmp_incremental/nested_streams_second_run_row_counts.sql index 1d9623232229..8f73aa30ccff 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_data_tests_tmp_incremental/nested_streams_second_run_row_counts.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_data_tests_tmp_incremental/nested_streams_second_run_row_counts.sql @@ -4,9 +4,6 @@ with table_row_counts as ( union all select distinct 'nested_stream_with_complex_columns_resulting_into_long_names' as label, count(*) as row_count, 3 as expected_count from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names') }} -union all - select distinct 'nested_stream_with_complex_columns_resulting_into_long_names_partition' as label, count(*) as row_count, 3 as expected_count - from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_partition') }} union all select 'nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA' as label, count(distinct currency) as row_count, 1 as expected_count from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA') }} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_schema_tests/schema_test.yml b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_schema_tests/schema_test.yml index 315b65ac1633..2695a9e408bc 100644 --- 
a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_schema_tests/schema_test.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_schema_tests/schema_test.yml @@ -7,15 +7,17 @@ models: expression: "double_array_data is not null" - dbt_utils.expression_is_true: expression: "DATA is not null" - - dbt_utils.expression_is_true: - expression: "\"column`_'with\"\"_quotes\" is not null" - name: nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA columns: - name: currency tests: - not_null - - name: nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data - columns: - - name: id - tests: +# - name: nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data +# columns: +# - name: id +# tests: # - not_null # TODO Fix bug here + - name: unnest_alias_children_owner + tests: + - dbt_utils.expression_is_true: + expression: "\"column`_'with\"\"_quotes\" is not null" diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_schema_tests_incremental/schema_test.yml b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_schema_tests_incremental/schema_test.yml index 315b65ac1633..2695a9e408bc 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_schema_tests_incremental/schema_test.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_schema_tests_incremental/schema_test.yml @@ -7,15 +7,17 @@ models: expression: "double_array_data is not null" - dbt_utils.expression_is_true: expression: "DATA is not null" - - dbt_utils.expression_is_true: - expression: "\"column`_'with\"\"_quotes\" is not null" - 
name: nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA columns: - name: currency tests: - not_null - - name: nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data - columns: - - name: id - tests: +# - name: nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data +# columns: +# - name: id +# tests: # - not_null # TODO Fix bug here + - name: unnest_alias_children_owner + tests: + - dbt_utils.expression_is_true: + expression: "\"column`_'with\"\"_quotes\" is not null" diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog.json b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog.json index f6832f4315be..c8efc48f5b3e 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog.json +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog.json @@ -31,6 +31,9 @@ }, "USD": { "type": "number" + }, + "column`_'with\"_quotes": { + "type": "string" } } }, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog_schema_change.json b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog_schema_change.json index 4d5cd0e00c04..ac8cea023214 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog_schema_change.json +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog_schema_change.json @@ -31,6 +31,9 @@ }, "USD": { "type": "number" + }, + "column`_'with\"_quotes": { + "type": "string" } } }, diff --git 
a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt index 619f3eb35c51..fa3af2c3f254 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt @@ -1,33 +1,37 @@ -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "id": 1, "currency": "USD", "date": "2020-08-29", "timestamp_col": "2020-08-29T00:00:00.000000-0000", "NZD": 1.14, "HKD@spéçiäl & characters": 2.13, "HKD_special___characters": "column name collision?" }}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637689100, "data": { "id": 1, "currency": "USD", "date": "2020-08-30", "timestamp_col": "2020-08-30T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 7.15, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637789200, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+00", "NZD": 3.89, "HKD@spéçiäl & characters": 7.12, "HKD_special___characters": "column name collision?", "USD": 10.16}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637889300, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+0000", "NZD": 1.14, "HKD@spéçiäl & characters": 7.99, "HKD_special___characters": "column name collision?", "USD": 10.99}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637989400, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 8, 
"HKD_special___characters": "column name collision?", "USD": 10.16}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990700, "data": { "id": 1, "currency": "USD", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 10.5, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637991000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 7.02, "HKD_special___characters": "column name collision?"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "id": 1, "currency": "USD", "date": "2020-08-29", "timestamp_col": "2020-08-29T00:00:00.000000-0000", "NZD": 1.14, "HKD@spéçiäl & characters": 2.13, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a" }}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637689100, "data": { "id": 1, "currency": "USD", "date": "2020-08-30", "timestamp_col": "2020-08-30T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 7.15, "HKD_special___characters": "column name 
collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637789200, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+00", "NZD": 3.89, "HKD@spéçiäl & characters": 7.12, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10.16}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637889300, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+0000", "NZD": 1.14, "HKD@spéçiäl & characters": 7.99, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10.99}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637989400, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 8, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10.16}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990700, "data": { "id": 1, "currency": "USD", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 10.5, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?", 
"column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637991000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 7.02, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637991100, "data": { "id": 5, "currency": "USD", "NZD": 0.01, "HKD@spéçiäl & characters": 8.12, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637991200, "data": { "id": 5, "currency": "USD", "NZD": 0.01, "HKD@spéçiäl & characters": 9.23, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637589000, "data": { "id": 1, "currency": "USD", "date": "2020-08-29", "timestamp_col": "2020-08-29T00:00:00.000000-0000", "NZD": 1.14, "HKD@spéçiäl & characters": 2.13, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637689100, "data": { "id": 1, "currency": "USD", "date": "2020-08-30", "timestamp_col": "2020-08-30T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 7.15, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637789200, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+00", "NZD": 3.89, "HKD@spéçiäl & characters": 7.12, "HKD_special___characters": "column name 
collision?", "USD": 10.16}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637889300, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+0000", "NZD": 1.14, "HKD@spéçiäl & characters": 7.99, "HKD_special___characters": "column name collision?", "USD": 10.99}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637989400, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 8, "HKD_special___characters": "column name collision?", "USD": 10.16}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990700, "data": { "id": 1, "currency": "USD", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 10.5, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637991000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 7.02, "HKD_special___characters": "column name collision?"}}} +{"type": "RECORD", 
"record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637589000, "data": { "id": 1, "currency": "USD", "date": "2020-08-29", "timestamp_col": "2020-08-29T00:00:00.000000-0000", "NZD": 1.14, "HKD@spéçiäl & characters": 2.13, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a" }}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637689100, "data": { "id": 1, "currency": "USD", "date": "2020-08-30", "timestamp_col": "2020-08-30T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 7.15, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637789200, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+00", "NZD": 3.89, "HKD@spéçiäl & characters": 7.12, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10.16}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637889300, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+0000", "NZD": 1.14, "HKD@spéçiäl & characters": 7.99, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10.99}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637989400, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 8, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10.16}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990700, "data": { "id": 1, "currency": "USD", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 10.5, "HKD_special___characters": "column name 
collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637991000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 7.02, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637991100, "data": { "id": 5, "currency": "USD", "NZD": 0.01, "HKD@spéçiäl & characters": 8.12, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637991200, "data": { "id": 5, "currency": "USD", "NZD": 0.01, "HKD@spéçiäl & characters": 9.23, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} 
-{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":1,"name":"mazda","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623849130530,"_ab_cdc_lsn":26971624,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}} -{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":2,"name":"toyata","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623849130549,"_ab_cdc_lsn":26971624,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}} -{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":4,"name":"bmw","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623849314535,"_ab_cdc_lsn":26974776,"_ab_cdc_deleted_at":null},"emitted_at":1623860160}} -{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":5,"name":"vw","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623849314663,"_ab_cdc_lsn":26975264,"_ab_cdc_deleted_at":null},"emitted_at":1623860160}} -{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":4,"name":null,"column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623849314791,"_ab_cdc_lsn":26975440,"_ab_cdc_deleted_at":1623849314791},"emitted_at":1623860160}} -{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":6,"name":"opel","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623850868109,"_ab_cdc_lsn":27009440,"_ab_cdc_deleted_at":null},"emitted_at":1623861660}} -{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":7,"name":"lotus","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623850868237,"_ab_cdc_lsn":27010048,"_ab_cdc_deleted_at":null},"emitted_at":1623861660}} -{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":6,"name":null,"column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623850868371,"_ab_cdc_lsn":27010232,"_ab_cdc_deleted_at":1623850868371},"emitted_at":1623861660}} 
+{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":1,"name":"mazda","_ab_cdc_updated_at":1623849130530,"_ab_cdc_lsn":26971624,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}} +{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":2,"name":"toyata","_ab_cdc_updated_at":1623849130549,"_ab_cdc_lsn":26971624,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}} +{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":4,"name":"bmw","_ab_cdc_updated_at":1623849314535,"_ab_cdc_lsn":26974776,"_ab_cdc_deleted_at":null},"emitted_at":1623860160}} +{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":5,"name":"vw","_ab_cdc_updated_at":1623849314663,"_ab_cdc_lsn":26975264,"_ab_cdc_deleted_at":null},"emitted_at":1623860160}} +{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":4,"name":null,"_ab_cdc_updated_at":1623849314791,"_ab_cdc_lsn":26975440,"_ab_cdc_deleted_at":1623849314791},"emitted_at":1623860160}} +{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":6,"name":"opel","_ab_cdc_updated_at":1623850868109,"_ab_cdc_lsn":27009440,"_ab_cdc_deleted_at":null},"emitted_at":1623861660}} +{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":7,"name":"lotus","_ab_cdc_updated_at":1623850868237,"_ab_cdc_lsn":27010048,"_ab_cdc_deleted_at":null},"emitted_at":1623861660}} +{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":6,"name":null,"_ab_cdc_updated_at":1623850868371,"_ab_cdc_lsn":27010232,"_ab_cdc_deleted_at":1623850868371},"emitted_at":1623861660}} {"type":"RECORD","record":{"stream":"pos_dedup_cdcx","data":{"id":1,"name":"mazda","_ab_cdc_updated_at":1623849130530,"_ab_cdc_lsn":26971624,"_ab_cdc_log_pos": 33274,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}} {"type":"RECORD","record":{"stream":"pos_dedup_cdcx","data":{"id":2,"name":"toyata","_ab_cdc_updated_at":1623849130549,"_ab_cdc_lsn":26971624,"_ab_cdc_log_pos": 
33275,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_incremental.txt b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_incremental.txt index 77dbc6f073f1..0f4a6ee16d5e 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_incremental.txt +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_incremental.txt @@ -1,14 +1,16 @@ -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650000000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 14.05, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650010000, "data": { "id": 4, "currency": "HKD", "NZD": 1.19, "HKD@spéçiäl & characters": 0.01, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650011000, "data": { "id": 1, "currency": "USD", "date": "2020-10-14", "timestamp_col": "2020-10-14T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 9.5, "HKD_special___characters": "column name collision?"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990800, "data": 
{ "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650000000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 14.05, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650010000, "data": { "id": 4, "currency": "HKD", "NZD": 1.19, "HKD@spéçiäl & characters": 0.01, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650011000, "data": { "id": 1, "currency": "USD", "date": "2020-10-14", "timestamp_col": "2020-10-14T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 9.5, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650012000, "data": { "id": 5, "currency": "USD", "NZD": 0.01, "HKD@spéçiäl & characters": 6.39, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", 
"NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650000000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 14.05, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650010000, "data": { "id": 4, "currency": "HKD", "NZD": 1.19, "HKD@spéçiäl & characters": 0.01, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650011000, "data": { "id": 1, "currency": "USD", "date": "2020-10-14", "timestamp_col": "2020-10-14T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 9.5, "HKD_special___characters": "column name collision?"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650000000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 14.05, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650010000, "data": { "id": 4, "currency": "HKD", "NZD": 1.19, "HKD@spéçiäl & characters": 0.01, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} 
+{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650011000, "data": { "id": 1, "currency": "USD", "date": "2020-10-14", "timestamp_col": "2020-10-14T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 9.5, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650012000, "data": { "id": 5, "currency": "USD", "NZD": 0.01, "HKD@spéçiäl & characters": 6.39, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} {"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":5,"name":"vw","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623849314663,"_ab_cdc_lsn":26975264,"_ab_cdc_deleted_at":null},"emitted_at":1623860160}} {"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":5,"name":null,"column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623900000000,"_ab_cdc_lsn":28010252,"_ab_cdc_deleted_at":1623900000000},"emitted_at":1623900000000}} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_schema_change.txt b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_schema_change.txt index 4491673d040c..ebe17b33d6e7 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_schema_change.txt +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_schema_change.txt @@ -1,12 +1,12 @@ -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602661281900, "data": { "id": 3.14, "currency": "EUR", "new_column": 2.1, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 2.12, "USD": 7}}} -{"type": "RECORD", "record": 
{"stream": "exchange_rate", "emitted_at": 1602661291900, "data": { "id": 0.12, "currency": "GBP", "new_column": 3.81, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 3.14, "HKD@spéçiäl & characters": 3.01, "USD": 11}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602661381900, "data": { "id": 4.22, "currency": "EUR", "new_column": 89.1, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 3.89, "HKD@spéçiäl & characters": 8.88, "USD": 10}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602661481900, "data": { "id": 1, "currency": "HKD", "new_column": 91.11, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 1.19, "HKD@spéçiäl & characters": 99.1, "USD": 10}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602661281900, "data": { "id": 3.14, "currency": "EUR", "new_column": 2.1, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 2.12, "column`_'with\"_quotes":"ma\"z`d'a", "USD": 7}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602661291900, "data": { "id": 0.12, "currency": "GBP", "new_column": 3.81, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 3.14, "HKD@spéçiäl & characters": 3.01, "column`_'with\"_quotes":"ma\"z`d'a", "USD": 11}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602661381900, "data": { "id": 4.22, "currency": "EUR", "new_column": 89.1, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 3.89, "HKD@spéçiäl & characters": 8.88, "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602661481900, "data": { "id": 1, "currency": "HKD", "new_column": 91.11, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 1.19, "HKD@spéçiäl & characters": 99.1, "column`_'with\"_quotes":"ma\"z`d'a", 
"USD": 10}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602661281900, "data": { "id": 3.14, "currency": "EUR", "new_column": 2.1, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 2.12, "USD": 7}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602661291900, "data": { "id": 0.12, "currency": "GBP", "new_column": 3.81, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 3.14, "HKD@spéçiäl & characters": 3.01, "USD": 11}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602661381900, "data": { "id": 4.22, "currency": "EUR", "new_column": 89.1, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 3.89, "HKD@spéçiäl & characters": 8.88, "USD": 10}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602661481900, "data": { "id": 1, "currency": "HKD", "new_column": 91.11, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 1.19, "HKD@spéçiäl & characters": 99.1, "USD": 10}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602661281900, "data": { "id": 3.14, "currency": "EUR", "new_column": 2.1, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 2.12, "column`_'with\"_quotes":"ma\"z`d'a", "USD": 7}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602661291900, "data": { "id": 0.12, "currency": "GBP", "new_column": 3.81, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 3.14, "HKD@spéçiäl & characters": 3.01, "column`_'with\"_quotes":"ma\"z`d'a", "USD": 11}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602661381900, "data": { "id": 4.22, "currency": "EUR", "new_column": 89.1, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 3.89, "HKD@spéçiäl & 
characters": 8.88, "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602661481900, "data": { "id": 1, "currency": "HKD", "new_column": 91.11, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 1.19, "HKD@spéçiäl & characters": 99.1, "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10}}} {"type":"RECORD","record":{"stream":"renamed_dedup_cdc_excluded","data":{"id":8,"name":"vw","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623949314663,"_ab_cdc_lsn":26985264,"_ab_cdc_deleted_at":null},"emitted_at":1623960160}} {"type":"RECORD","record":{"stream":"renamed_dedup_cdc_excluded","data":{"id":9,"name":"opel","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623950868109,"_ab_cdc_lsn":28009440,"_ab_cdc_deleted_at":null},"emitted_at":1623961660}} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp/simple_streams_first_run_row_counts.sql b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp/simple_streams_first_run_row_counts.sql index 462558881b27..df4fb4f56c49 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp/simple_streams_first_run_row_counts.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp/simple_streams_first_run_row_counts.sql @@ -1,19 +1,19 @@ with table_row_counts as ( - select distinct '_airbyte_raw_exchange_rate' as label, count(*) as row_count, 10 as expected_count + select distinct '_airbyte_raw_exchange_rate' as label, count(*) as row_count, 12 as expected_count from {{ source('test_normalization', '_airbyte_raw_exchange_rate') }} union all - select distinct 'exchange_rate' as label, count(*) as 
row_count, 10 as expected_count + select distinct 'exchange_rate' as label, count(*) as row_count, 12 as expected_count from {{ ref('exchange_rate') }} union all - select distinct '_airbyte_raw_dedup_exchange_rate' as label, count(*) as row_count, 10 as expected_count + select distinct '_airbyte_raw_dedup_exchange_rate' as label, count(*) as row_count, 12 as expected_count from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }} union all - select distinct 'dedup_exchange_rate_scd' as label, count(*) as row_count, 10 as expected_count + select distinct 'dedup_exchange_rate_scd' as label, count(*) as row_count, 12 as expected_count from {{ ref('dedup_exchange_rate_scd') }} union all - select distinct 'dedup_exchange_rate' as label, count(*) as row_count, 5 as expected_count + select distinct 'dedup_exchange_rate' as label, count(*) as row_count, 6 as expected_count from {{ ref('dedup_exchange_rate') }} union all diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_incremental/simple_streams_second_run_row_counts.sql b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_incremental/simple_streams_second_run_row_counts.sql index 28963326e82d..ca5cdfa4fc40 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_incremental/simple_streams_second_run_row_counts.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_incremental/simple_streams_second_run_row_counts.sql @@ -1,19 +1,19 @@ with table_row_counts as ( - select distinct '_airbyte_raw_exchange_rate' as label, count(*) as row_count, 5 as expected_count + select distinct '_airbyte_raw_exchange_rate' as label, count(*) as row_count, 6 as expected_count from {{ 
source('test_normalization', '_airbyte_raw_exchange_rate') }} union all - select distinct 'exchange_rate' as label, count(*) as row_count, 13 as expected_count + select distinct 'exchange_rate' as label, count(*) as row_count, 6 as expected_count from {{ ref('exchange_rate') }} union all - select distinct '_airbyte_raw_dedup_exchange_rate' as label, count(*) as row_count, 5 as expected_count + select distinct '_airbyte_raw_dedup_exchange_rate' as label, count(*) as row_count, 6 as expected_count from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }} union all - select distinct 'dedup_exchange_rate_scd' as label, count(*) as row_count, 13 as expected_count + select distinct 'dedup_exchange_rate_scd' as label, count(*) as row_count, 16 as expected_count from {{ ref('dedup_exchange_rate_scd') }} union all - select distinct 'dedup_exchange_rate' as label, count(*) as row_count, 6 as expected_count + select distinct 'dedup_exchange_rate' as label, count(*) as row_count, 7 as expected_count from {{ ref('dedup_exchange_rate') }} union all @@ -32,10 +32,10 @@ union all select distinct '_airbyte_raw_pos_dedup_cdcx' as label, count(*) as row_count, 6 as expected_count from {{ source('test_normalization', '_airbyte_raw_pos_dedup_cdcx') }} union all - select distinct 'pos_dedup_cdcx_scd' as label, count(*) as row_count, 6 as expected_count + select distinct 'pos_dedup_cdcx_scd' as label, count(*) as row_count, 8 as expected_count from {{ ref('pos_dedup_cdcx_scd') }} union all - select distinct 'pos_dedup_cdcx' as label, count(*) as row_count, 2 as expected_count + select distinct 'pos_dedup_cdcx' as label, count(*) as row_count, 3 as expected_count from {{ ref('pos_dedup_cdcx') }} ) select * diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_schema_change/simple_streams_third_run_row_counts.sql 
b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_schema_change/simple_streams_third_run_row_counts.sql index 186eedf26dc7..cb886df680e9 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_schema_change/simple_streams_third_run_row_counts.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_schema_change/simple_streams_third_run_row_counts.sql @@ -2,18 +2,18 @@ with table_row_counts as ( select distinct '_airbyte_raw_exchange_rate' as label, count(*) as row_count, 4 as expected_count from {{ source('test_normalization', '_airbyte_raw_exchange_rate') }} union all - select distinct 'exchange_rate' as label, count(*) as row_count, 17 as expected_count + select distinct 'exchange_rate' as label, count(*) as row_count, 4 as expected_count from {{ ref('exchange_rate') }} union all - select distinct '_airbyte_raw_dedup_exchange_rate' as label, count(*) as row_count, 9 as expected_count + select distinct '_airbyte_raw_dedup_exchange_rate' as label, count(*) as row_count, 10 as expected_count from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }} union all - select distinct 'dedup_exchange_rate_scd' as label, count(*) as row_count, 17 as expected_count + select distinct 'dedup_exchange_rate_scd' as label, count(*) as row_count, 20 as expected_count from {{ ref('dedup_exchange_rate_scd') }} union all - select distinct 'dedup_exchange_rate' as label, count(*) as row_count, 10 as expected_count + select distinct 'dedup_exchange_rate' as label, count(*) as row_count, 11 as expected_count from {{ ref('dedup_exchange_rate') }} union all diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests/schema_test.yml 
b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests/schema_test.yml index d0192cefe26d..b6b844cfd5f0 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests/schema_test.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests/schema_test.yml @@ -7,22 +7,8 @@ models: # description: check no column collisions # Two columns having similar names especially after removing special characters should remain distincts expression: cast("HKD@spéçiäl & characters" as {{ dbt_utils.type_string() }}) != HKD_special___characters - - dbt_utils.equality: - # description: check_streams_are_equal - # In this integration test, we are sending the same records to both streams - # exchange_rate and dedup_exchange_rate. - # The SCD table of dedup_exchange_rate in append_dedup mode should therefore mirror - # the final table with append or overwrite mode from exchange_rate. 
- compare_model: ref('dedup_exchange_rate_scd') - compare_columns: - - id - - currency - - date - - timestamp_col - - '"HKD@spéçiäl & characters"' - - HKD_special___characters - - NZD - - USD + - dbt_utils.expression_is_true: + expression: "\"column`_'with\"\"_quotes\" is not null" columns: - name: '"HKD@spéçiäl & characters"' # description: check special charactesrs diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests_incremental/schema_test.yml b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests_incremental/schema_test.yml index d0192cefe26d..b6b844cfd5f0 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests_incremental/schema_test.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests_incremental/schema_test.yml @@ -7,22 +7,8 @@ models: # description: check no column collisions # Two columns having similar names especially after removing special characters should remain distincts expression: cast("HKD@spéçiäl & characters" as {{ dbt_utils.type_string() }}) != HKD_special___characters - - dbt_utils.equality: - # description: check_streams_are_equal - # In this integration test, we are sending the same records to both streams - # exchange_rate and dedup_exchange_rate. - # The SCD table of dedup_exchange_rate in append_dedup mode should therefore mirror - # the final table with append or overwrite mode from exchange_rate. 
- compare_model: ref('dedup_exchange_rate_scd') - compare_columns: - - id - - currency - - date - - timestamp_col - - '"HKD@spéçiäl & characters"' - - HKD_special___characters - - NZD - - USD + - dbt_utils.expression_is_true: + expression: "\"column`_'with\"\"_quotes\" is not null" columns: - name: '"HKD@spéçiäl & characters"' # description: check special charactesrs diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests_schema_change/schema_test.yml b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests_schema_change/schema_test.yml index b8367ee02db8..36982388a1ff 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests_schema_change/schema_test.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests_schema_change/schema_test.yml @@ -2,28 +2,15 @@ version: 2 models: - name: exchange_rate + tests: + - dbt_utils.expression_is_true: + expression: "\"column`_'with\"\"_quotes\" is not null" columns: - name: '"HKD@spéçiäl & characters"' # description: check special charactesrs # Use special characters in column names and make sure they are correctly parsed in the JSON blob and populated tests: - not_null - tests: - - dbt_utils.equality: - # description: check_streams_are_equal - # In this integration test, we are sending the same records to both streams - # exchange_rate and dedup_exchange_rate. - # The SCD table of dedup_exchange_rate in append_dedup mode should therefore mirror - # the final table with append or overwrite mode from exchange_rate. 
- compare_model: ref('dedup_exchange_rate_scd') - compare_columns: - - id - - currency - - date - - timestamp_col - - '"HKD@spéçiäl & characters"' - - NZD - - USD - name: dedup_exchange_rate tests: diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py b/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py index 04b59dae2ff1..884e6bb11942 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py +++ b/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py @@ -182,17 +182,7 @@ def setup_test_dir(destination_type: DestinationType, test_resource_name: str) - elif destination_type.value == DestinationType.ORACLE.value: copy_tree("../dbt-project-template-oracle", test_root_dir) dbt_project_yaml = "../dbt-project-template-oracle/dbt_project.yml" - if destination_type.value not in (DestinationType.REDSHIFT.value, DestinationType.ORACLE.value): - # Prefer 'view' to 'ephemeral' for tests so it's easier to debug with dbt - dbt_test_utils.copy_replace( - dbt_project_yaml, - os.path.join(test_root_dir, "dbt_project.yml"), - pattern="ephemeral", - replace_value="view", - ) - else: - # keep ephemeral tables - dbt_test_utils.copy_replace(dbt_project_yaml, os.path.join(test_root_dir, "dbt_project.yml")) + dbt_test_utils.copy_replace(dbt_project_yaml, os.path.join(test_root_dir, "dbt_project.yml")) return test_root_dir diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py index 9af79c4a4e83..a6f1ead46e94 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py @@ -138,15 +138,19 @@ def create_from_parent( The child stream 
processor will create a separate table to contain the unnested data. """ + if parent.destination_sync_mode.value == DestinationSyncMode.append_dedup.value: + # nested streams can't be deduped like their parents (as they may not share the same cursor/primary keys) + parent_sync_mode = DestinationSyncMode.append + else: + parent_sync_mode = parent.destination_sync_mode result = StreamProcessor.create( stream_name=child_name, destination_type=parent.destination_type, raw_schema=parent.raw_schema, default_schema=parent.default_schema, schema=parent.schema, - # Nested Streams don't inherit parents sync modes? - source_sync_mode=SyncMode.full_refresh, - destination_sync_mode=DestinationSyncMode.append, + source_sync_mode=parent.source_sync_mode, + destination_sync_mode=parent_sync_mode, cursor_field=[], primary_key=[], json_column_name=json_column_name, @@ -237,42 +241,58 @@ def process(self) -> List["StreamProcessor"]: from_table = self.add_to_outputs( self.generate_json_parsing_model(from_table, column_names), self.get_model_materialization_mode(is_intermediate=True), + is_intermediate=True, suffix="ab1", ) from_table = self.add_to_outputs( self.generate_column_typing_model(from_table, column_names), self.get_model_materialization_mode(is_intermediate=True, column_count=column_count), + is_intermediate=True, suffix="ab2", ) if self.destination_sync_mode != DestinationSyncMode.append_dedup: from_table = self.add_to_outputs( self.generate_id_hashing_model(from_table, column_names), self.get_model_materialization_mode(is_intermediate=True, column_count=column_count), + is_intermediate=True, suffix="ab3", ) from_table = self.add_to_outputs( self.generate_final_model(from_table, column_names), self.get_model_materialization_mode(is_intermediate=False, column_count=column_count), + is_intermediate=False, ) else: + if self.is_incremental_mode(self.destination_sync_mode): + # Force different materialization here because incremental scd models rely on star* macros that requires 
it + if DestinationType.POSTGRES.value == self.destination_type.value: + # because of https://github.com/dbt-labs/docs.getdbt.com/issues/335, we avoid VIEW for postgres + forced_materialization_type = TableMaterializationType.INCREMENTAL + else: + forced_materialization_type = TableMaterializationType.VIEW + else: + forced_materialization_type = TableMaterializationType.CTE from_table = self.add_to_outputs( self.generate_id_hashing_model(from_table, column_names), - # Force View materialization here because scd models rely on star* macros that requires it - TableMaterializationType.VIEW, + forced_materialization_type, + is_intermediate=True, suffix="ab3", ) from_table = self.add_to_outputs( self.generate_scd_type_2_model(from_table, column_names), self.get_model_materialization_mode(is_intermediate=False, column_count=column_count), + is_intermediate=False, suffix="scd", subdir="scd", unique_key=self.name_transformer.normalize_column_name("_airbyte_unique_key_scd"), partition_by=PartitionScheme.ACTIVE_ROW, ) where_clause = f"\nand {self.name_transformer.normalize_column_name('_airbyte_active_row')} = 1" - from_table = self.add_to_outputs( + # from_table should not use the de-duplicated final table or tables downstream (nested streams) will miss non active rows + self.add_to_outputs( self.generate_final_model(from_table, column_names, self.get_unique_key()) + where_clause, self.get_model_materialization_mode(is_intermediate=False, column_count=column_count), + is_intermediate=False, unique_key=self.get_unique_key(), partition_by=PartitionScheme.UNIQUE_KEY, ) @@ -680,13 +700,13 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup {{ cursor_field }} desc, {{ col_emitted_at }} desc{{ cdc_updated_at_order }} ) as {{ airbyte_end_at }}, - case when lag({{ cursor_field }}) over ( + case when row_number() over ( partition by {{ primary_key_partition | join(", ") }} order by {{ cursor_field }} {{ order_null }}, {{ cursor_field }} desc, {{ 
col_emitted_at }} desc{{ cdc_updated_at_order }} - ) is null {{ cdc_active_row }} then 1 else 0 end as {{ active_row }}, + ) = 1{{ cdc_active_row }} then 1 else 0 end as {{ active_row }}, {{ col_ab_id }}, {{ col_emitted_at }}, {{ hash_id }} @@ -745,7 +765,7 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup col_cdc_updated_at = self.name_transformer.normalize_column_name("_ab_cdc_updated_at") quoted_col_cdc_deleted_at = self.name_transformer.normalize_column_name("_ab_cdc_deleted_at", in_jinja=True) quoted_col_cdc_updated_at = self.name_transformer.normalize_column_name("_ab_cdc_updated_at", in_jinja=True) - cdc_active_row_pattern = f"and {col_cdc_deleted_at} is null " + cdc_active_row_pattern = f" and {col_cdc_deleted_at} is null" cdc_updated_order_pattern = f", {col_cdc_updated_at} desc" cdc_cols = ( f", cast({col_cdc_deleted_at} as " @@ -879,6 +899,10 @@ def generate_final_model(self, from_table: str, column_names: Dict[str, Tuple[st ) return sql + @staticmethod + def is_incremental_mode(destination_sync_mode: DestinationSyncMode) -> bool: + return destination_sync_mode.value in [DestinationSyncMode.append.value, DestinationSyncMode.append_dedup.value] + def add_incremental_clause(self, sql_query: str) -> str: template = Template( """ @@ -900,12 +924,12 @@ def add_to_outputs( self, sql: str, materialization_mode: TableMaterializationType, + is_intermediate: bool = True, suffix: str = "", unique_key: str = "", subdir: str = "", partition_by: PartitionScheme = PartitionScheme.DEFAULT, ) -> str: - is_intermediate = materialization_mode in [TableMaterializationType.CTE, TableMaterializationType.VIEW] schema = self.get_schema(is_intermediate) # MySQL table names need to be manually truncated, because it does not do it automatically truncate_name = self.destination_type == DestinationType.MYSQL @@ -922,9 +946,17 @@ def add_to_outputs( config["schema"] = f'"{self.default_schema}"' else: config["schema"] = f'"{schema}"' - if 
self.source_sync_mode == SyncMode.incremental and suffix != "scd": - # incremental is handled in the SCD SQL already - sql = self.add_incremental_clause(sql) + if self.is_incremental_mode(self.destination_sync_mode): + if suffix == "scd": + if self.destination_type == DestinationType.POSTGRES: + # because of https://github.com/dbt-labs/docs.getdbt.com/issues/335, we avoid VIEW for postgres + # so we need to clean up temporary table materialization... + ab3_schema = self.get_schema(True) + ab3_table = self.tables_registry.get_file_name(schema, self.json_path, self.stream_name, "ab3", truncate_name) + config["post_hook"] = f"['drop table if exists {ab3_schema}.{ab3_table}']" + else: + # incremental is handled in the SCD SQL already + sql = self.add_incremental_clause(sql) template = Template( """ {{ '{{' }} config( @@ -950,7 +982,7 @@ def get_model_materialization_mode(self, is_intermediate: bool, column_count: in # if ephemeral is used with large number of columns, use views instead return TableMaterializationType.VIEW else: - if self.source_sync_mode == SyncMode.incremental: + if self.is_incremental_mode(self.destination_sync_mode): return TableMaterializationType.INCREMENTAL else: return TableMaterializationType.TABLE @@ -968,8 +1000,10 @@ def get_model_partition_config(self, partition_by: PartitionScheme, unique_key: config = {} if self.destination_type == DestinationType.BIGQUERY: # see https://docs.getdbt.com/reference/resource-configs/bigquery-configs - if partition_by in [PartitionScheme.UNIQUE_KEY, PartitionScheme.ACTIVE_ROW]: + if partition_by == PartitionScheme.UNIQUE_KEY: config["cluster_by"] = '["_airbyte_unique_key","_airbyte_emitted_at"]' + elif partition_by == PartitionScheme.ACTIVE_ROW: + config["cluster_by"] = '["_airbyte_unique_key_scd","_airbyte_emitted_at"]' else: config["cluster_by"] = '"_airbyte_emitted_at"' if partition_by == PartitionScheme.ACTIVE_ROW: @@ -983,15 +1017,15 @@ def get_model_partition_config(self, partition_by: PartitionScheme, 
unique_key: elif self.destination_type == DestinationType.POSTGRES: # see https://docs.getdbt.com/reference/resource-configs/postgres-configs if partition_by == PartitionScheme.ACTIVE_ROW: - config["indexes"] = "[{'columns':['_airbyte_active_row','_airbyte_unique_key','_airbyte_emitted_at'],'type': 'btree'}]" + config["indexes"] = "[{'columns':['_airbyte_active_row','_airbyte_unique_key_scd','_airbyte_emitted_at'],'type': 'btree'}]" elif partition_by == PartitionScheme.UNIQUE_KEY: - config["indexes"] = "[{'columns':['_airbyte_unique_key','_airbyte_emitted_at'],'type': 'btree'}]" + config["indexes"] = "[{'columns':['_airbyte_unique_key'],'unique':True}]" else: config["indexes"] = "[{'columns':['_airbyte_emitted_at'],'type':'hash'}]" elif self.destination_type == DestinationType.REDSHIFT: # see https://docs.getdbt.com/reference/resource-configs/redshift-configs if partition_by == PartitionScheme.ACTIVE_ROW: - config["sort"] = '["_airbyte_active_row", "_airbyte_unique_key", "_airbyte_emitted_at"]' + config["sort"] = '["_airbyte_active_row", "_airbyte_unique_key_scd", "_airbyte_emitted_at"]' elif partition_by == PartitionScheme.UNIQUE_KEY: config["sort"] = '["_airbyte_unique_key", "_airbyte_emitted_at"]' elif partition_by == PartitionScheme.NOTHING: @@ -1001,7 +1035,7 @@ def get_model_partition_config(self, partition_by: PartitionScheme, unique_key: elif self.destination_type == DestinationType.SNOWFLAKE: # see https://docs.getdbt.com/reference/resource-configs/snowflake-configs if partition_by == PartitionScheme.ACTIVE_ROW: - config["cluster_by"] = '["_AIRBYTE_ACTIVE_ROW", "_AIRBYTE_UNIQUE_KEY", "_AIRBYTE_EMITTED_AT"]' + config["cluster_by"] = '["_AIRBYTE_ACTIVE_ROW", "_AIRBYTE_UNIQUE_KEY_SCD", "_AIRBYTE_EMITTED_AT"]' elif partition_by == PartitionScheme.UNIQUE_KEY: config["cluster_by"] = '["_AIRBYTE_UNIQUE_KEY", "_AIRBYTE_EMITTED_AT"]' elif partition_by == PartitionScheme.NOTHING: @@ -1010,8 +1044,9 @@ def get_model_partition_config(self, partition_by: 
PartitionScheme, unique_key: config["cluster_by"] = '["_AIRBYTE_EMITTED_AT"]' if unique_key: config["unique_key"] = f'"{unique_key}"' - else: - config["unique_key"] = f"env_var('AIRBYTE_DEFAULT_UNIQUE_KEY', {self.get_ab_id(in_jinja=True)})" + elif not self.is_nested_array: + # in nested arrays, each element is sharing the same _airbyte_ab_id, so it's not unique + config["unique_key"] = self.get_ab_id(in_jinja=True) return config def get_model_tags(self, is_intermediate: bool) -> str: diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java index 8c12d911d770..4c73c2c5f293 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java @@ -1121,7 +1121,9 @@ private void pruneMutate(final JsonNode json) { "EMITTED_AT", "AB_ID", "NORMALIZED_AT", - "HASHID"); + "HASHID", + "unique_key", + "UNIQUE_KEY"); if (airbyteInternalFields.stream().anyMatch(internalField -> key.toLowerCase().contains(internalField.toLowerCase())) || json.get(key).isNull()) { ((ObjectNode) json).remove(key); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java index ba5d090605e3..87baa29daa97 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java @@ -13,7 +13,7 @@ public class NormalizationRunnerFactory { 
public static final String BASE_NORMALIZATION_IMAGE_NAME = "airbyte/normalization"; - public static final String NORMALIZATION_VERSION = "0.1.58"; + public static final String NORMALIZATION_VERSION = "0.1.59"; static final Map> NORMALIZATION_MAPPING = ImmutableMap.>builder() diff --git a/docs/understanding-airbyte/basic-normalization.md b/docs/understanding-airbyte/basic-normalization.md index 1e3c02452a0a..ab67ec78ab12 100644 --- a/docs/understanding-airbyte/basic-normalization.md +++ b/docs/understanding-airbyte/basic-normalization.md @@ -348,13 +348,14 @@ Therefore, in order to "upgrade" to the desired normalization version, you need | Airbyte Version | Normalization Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | :--- | +| 0.30.32-alpha | 0.1.59 | 2021-11-08 | [\#7669](https://github.com/airbytehq/airbyte/pull/7669) | Fix nested incremental dbt | | 0.30.24-alpha | 0.1.57 | 2021-10-26 | [\#7162](https://github.com/airbytehq/airbyte/pull/7162) | Implement incremental dbt updates | | 0.30.16-alpha | 0.1.52 | 2021-10-07 | [\#6379](https://github.com/airbytehq/airbyte/pull/6379) | Handle empty string for date and date-time format | -| 0.30.16-alpha | 0.1.51 | 2021-10-08 | [\#6799](https://github.com/airbytehq/airbyte/pull/6799) | Added support for ad\_cdc\_log\_pos while normalization | -| 0.30.16-alpha | 0.1.50 | 2021-10-07 | [\#6079](https://github.com/airbytehq/airbyte/pull/6079) | Added support for MS SQL Server normalization | -| 0.30.16-alpha | 0.1.49 | 2021-10-06 | [\#6709](https://github.com/airbytehq/airbyte/pull/6709) | Forward destination dataset location to dbt profiles | +| | 0.1.51 | 2021-10-08 | [\#6799](https://github.com/airbytehq/airbyte/pull/6799) | Added support for ad\_cdc\_log\_pos while normalization | +| | 0.1.50 | 2021-10-07 | [\#6079](https://github.com/airbytehq/airbyte/pull/6079) | Added support for MS SQL Server normalization | +| | 0.1.49 | 2021-10-06 | [\#6709](https://github.com/airbytehq/airbyte/pull/6709) | 
Forward destination dataset location to dbt profiles | | 0.29.17-alpha | 0.1.47 | 2021-09-20 | [\#6317](https://github.com/airbytehq/airbyte/pull/6317) | MySQL: updated MySQL normalization with using SSH tunnel | -| 0.29.17-alpha | 0.1.45 | 2021-09-18 | [\#6052](https://github.com/airbytehq/airbyte/pull/6052) | Snowflake: accept any date-time format | +| | 0.1.45 | 2021-09-18 | [\#6052](https://github.com/airbytehq/airbyte/pull/6052) | Snowflake: accept any date-time format | | 0.29.8-alpha | 0.1.40 | 2021-08-18 | [\#5433](https://github.com/airbytehq/airbyte/pull/5433) | Allow optional credentials\_json for BigQuery | | 0.29.5-alpha | 0.1.39 | 2021-08-11 | [\#4557](https://github.com/airbytehq/airbyte/pull/4557) | Handle date times and solve conflict name btw stream/field | | 0.28.2-alpha | 0.1.38 | 2021-07-28 | [\#5027](https://github.com/airbytehq/airbyte/pull/5027) | Handle quotes in column names when parsing JSON blob |