From 53539941e51d9b7e85974d6cc863db214058bb29 Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Thu, 4 Nov 2021 19:22:39 +0100 Subject: [PATCH 01/10] Fix incremental and add safety cast --- .../bases/base-normalization/.gitignore | 40 ++++++++++--- .../macros/incremental.sql | 4 +- .../data_input/catalog.json | 20 +++---- .../data_input/messages.txt | 4 +- .../data_input/messages_incremental.txt | 5 +- .../nested_streams_second_run_row_counts.sql | 3 - .../dbt_schema_tests/schema_test.yml | 14 +++-- .../schema_test.yml | 14 +++-- .../data_input/catalog.json | 3 + .../data_input/catalog_schema_change.json | 3 + .../data_input/messages.txt | 56 +++++++++---------- .../data_input/messages_incremental.txt | 20 +++---- .../data_input/messages_schema_change.txt | 16 +++--- .../simple_streams_second_run_row_counts.sql | 6 +- .../simple_streams_third_run_row_counts.sql | 2 +- .../dbt_schema_tests/schema_test.yml | 18 +----- .../schema_test.yml | 18 +----- .../schema_test.yml | 19 +------ .../integration_tests/test_normalization.py | 12 +--- .../transform_catalog/stream_processor.py | 48 +++++++++++----- 20 files changed, 162 insertions(+), 163 deletions(-) diff --git a/airbyte-integrations/bases/base-normalization/.gitignore b/airbyte-integrations/bases/base-normalization/.gitignore index 59647e6336ee..707446495c01 100644 --- a/airbyte-integrations/bases/base-normalization/.gitignore +++ b/airbyte-integrations/bases/base-normalization/.gitignore @@ -6,12 +6,34 @@ dbt_modules/ secrets/ dist/ -integration_tests/normalization_test_output/*/*/*.log -integration_tests/normalization_test_output/*/*/*.yml -integration_tests/normalization_test_output/*/*/*.json -integration_tests/normalization_test_output/*/*/*.md -integration_tests/normalization_test_output/*/*/macros/ -integration_tests/normalization_test_output/*/*/tests/ -integration_tests/normalization_test_output/*/*/models/dbt_data_tests/ -integration_tests/normalization_test_output/*/*/models/dbt_schema_tests/ -integration_tests/normalization_test_output/*/*/modified_models/ +integration_tests/normalization_test_output/*/*/macros +integration_tests/normalization_test_output/*/*/tests +integration_tests/normalization_test_output/**/*.json +integration_tests/normalization_test_output/**/*.log +integration_tests/normalization_test_output/**/*.md +integration_tests/normalization_test_output/**/*.sql +integration_tests/normalization_test_output/**/*.yml +!integration_tests/normalization_test_output/**/*dbt_project.yml +!integration_tests/normalization_test_output/**/generated/sources.yml + +# We keep a minimal/restricted subset of sql files for all destinations to avoid noise in diff +# Simple Streams +!integration_tests/normalization_test_output/**/dedup_exchange_rate*.sql +!integration_tests/normalization_test_output/**/exchange_rate.sql +# Nested Streams +# Parent table +!integration_tests/normalization_test_output/**/nested_stream_with*_names_ab*.sql +!integration_tests/normalization_test_output/**/nested_stream_with*_names_scd.sql +!integration_tests/normalization_test_output/**/nested_stream_with*_names.sql +# Nested table +!integration_tests/normalization_test_output/**/nested_stream_with_*_partition_ab1.sql +!integration_tests/normalization_test_output/**/nested_stream_with_*_data_ab1.sql +!integration_tests/normalization_test_output/**/nested_stream_with*_partition_scd.sql +!integration_tests/normalization_test_output/**/nested_stream_with*_data_scd.sql +!integration_tests/normalization_test_output/**/nested_stream_with*_partition.sql +!integration_tests/normalization_test_output/**/nested_stream_with*_data.sql + +# but we keep all sql files for Postgres +!integration_tests/normalization_test_output/postgres/**/*.sql +integration_tests/normalization_test_output/postgres/**/dbt_data_tests +integration_tests/normalization_test_output/postgres/**/dbt_schema_tests diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/incremental.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/incremental.sql index af02a97f605e..afd6d74f2021 100644 --- a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/incremental.sql +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/incremental.sql @@ -10,14 +10,14 @@ {%- macro default__incremental_clause(col_emitted_at) -%} {% if is_incremental() %} -and {{ col_emitted_at }} >= (select max({{ col_emitted_at }}) from {{ this }}) +and cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >= (select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}) {% endif %} {%- endmacro -%} {# -- see https://on-systems.tech/113-beware-dbt-incremental-updates-against-snowflake-external-tables/ #} {%- macro snowflake__incremental_clause(col_emitted_at) -%} {% if is_incremental() %} -and {{ col_emitted_at }} >= cast('{{ get_max_normalized_cursor(col_emitted_at) }}' as {{ type_timestamp_with_timezone() }}) +and cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >= cast('{{ get_max_normalized_cursor(col_emitted_at) }}' as {{ type_timestamp_with_timezone() }}) {% endif %} {%- endmacro -%} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/catalog.json b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/catalog.json index 88e890a3028f..d0b5e9f8ce83 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/catalog.json +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/catalog.json @@ -37,16 +37,6 @@ } } } - }, - "column`_'with\"_quotes": { - "type": ["null", "array"], - "items": { - "properties": { - "currency": { - "type": ["null", "string"] - } - } - } } } } @@ -234,6 +224,16 @@ "properties": { "owner_id": { "type": ["null", "integer"] + }, + "column`_'with\"_quotes": { + "type": ["null", "array"], + "items": { + "properties": { + "currency": { + "type": ["null", "string"] + } + } + } } } } diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages.txt b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages.txt index 758b2c1159ca..15bd2d455767 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages.txt +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages.txt @@ -13,6 +13,6 @@ {"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":1,"conflict_stream_scalar": 2},"emitted_at":1623861660}} {"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":2,"conflict_stream_scalar": 2},"emitted_at":1623861660}} -{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":1, "children": [{"ab_id": 1, "owner": {"owner_id": 1}},{"ab_id": 2, "owner": {"owner_id": 2}}]},"emitted_at":1623861660}} -{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":2, "children": [{"ab_id": 3, "owner": {"owner_id": 3}},{"ab_id": 4, "owner": {"owner_id": 4}}]},"emitted_at":1623861660}} +{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":1, "children": [{"ab_id": 1, "owner": {"owner_id": 1, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 2, "owner": {"owner_id": 2, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}} +{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":2, "children": [{"ab_id": 3, "owner": {"owner_id": 3, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 4, "owner": {"owner_id": 4, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages_incremental.txt b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages_incremental.txt index acbcc644ea49..7c356009ae72 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages_incremental.txt +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/messages_incremental.txt @@ -14,6 +14,5 @@ {"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":1,"conflict_stream_scalar": 2},"emitted_at":1623861660}} {"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":2,"conflict_stream_scalar": 2},"emitted_at":1623861660}} -{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":1, "children": [{"ab_id": 1, "owner": {"owner_id": 1}},{"ab_id": 2, "owner": {"owner_id": 2}}]},"emitted_at":1623861660}} -{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":2, "children": [{"ab_id": 3, "owner": {"owner_id": 3}},{"ab_id": 4, "owner": {"owner_id": 4}}]},"emitted_at":1623861660}} - +{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":1, "children": [{"ab_id": 1, "owner": {"owner_id": 1, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 2, "owner": {"owner_id": 2, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}} +{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":2, "children": [{"ab_id": 3, "owner": {"owner_id": 3, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 4, "owner": {"owner_id": 4, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_data_tests_tmp_incremental/nested_streams_second_run_row_counts.sql b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_data_tests_tmp_incremental/nested_streams_second_run_row_counts.sql index 1d9623232229..8f73aa30ccff 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_data_tests_tmp_incremental/nested_streams_second_run_row_counts.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_data_tests_tmp_incremental/nested_streams_second_run_row_counts.sql @@ -4,9 +4,6 @@ with table_row_counts as ( union all select distinct 'nested_stream_with_complex_columns_resulting_into_long_names' as label, count(*) as row_count, 3 as expected_count from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names') }} -union all - select distinct 'nested_stream_with_complex_columns_resulting_into_long_names_partition' as label, count(*) as row_count, 3 as expected_count - from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_partition') }} union all select 'nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA' as label, count(distinct currency) as row_count, 1 as expected_count from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA') }} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_schema_tests/schema_test.yml b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_schema_tests/schema_test.yml index 315b65ac1633..2695a9e408bc 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_schema_tests/schema_test.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_schema_tests/schema_test.yml @@ -7,15 +7,17 @@ models: expression: "double_array_data is not null" - dbt_utils.expression_is_true: expression: "DATA is not null" - - dbt_utils.expression_is_true: - expression: "\"column`_'with\"\"_quotes\" is not null" - name: nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA columns: - name: currency tests: - not_null - - name: nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data - columns: - - name: id - tests: +# - name: nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data +# columns: +# - name: id +# tests: # - not_null # TODO Fix bug here + - name: unnest_alias_children_owner + tests: + - dbt_utils.expression_is_true: + expression: "\"column`_'with\"\"_quotes\" is not null" diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_schema_tests_incremental/schema_test.yml b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_schema_tests_incremental/schema_test.yml index 315b65ac1633..2695a9e408bc 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_schema_tests_incremental/schema_test.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/dbt_test_config/dbt_schema_tests_incremental/schema_test.yml @@ -7,15 +7,17 @@ models: expression: "double_array_data is not null" - dbt_utils.expression_is_true: expression: "DATA is not null" - - dbt_utils.expression_is_true: - expression: "\"column`_'with\"\"_quotes\" is not null" - name: nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA columns: - name: currency tests: - not_null - - name: nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data - columns: - - name: id - tests: +# - name: nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data +# columns: +# - name: id +# tests: # - not_null # TODO Fix bug here + - name: unnest_alias_children_owner + tests: + - dbt_utils.expression_is_true: + expression: "\"column`_'with\"\"_quotes\" is not null" diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog.json b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog.json index f6832f4315be..c8efc48f5b3e 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog.json +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog.json @@ -31,6 +31,9 @@ }, "USD": { "type": "number" + }, + "column`_'with\"_quotes": { + "type": "string" } } }, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog_schema_change.json b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog_schema_change.json index 4d5cd0e00c04..ac8cea023214 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog_schema_change.json +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog_schema_change.json @@ -31,6 +31,9 @@ }, "USD": { "type": "number" + }, + "column`_'with\"_quotes": { + "type": "string" } } }, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt index 619f3eb35c51..23a4ea1ab58e 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt @@ -1,33 +1,33 @@ -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "id": 1, "currency": "USD", "date": "2020-08-29", "timestamp_col": "2020-08-29T00:00:00.000000-0000", "NZD": 1.14, "HKD@spéçiäl & characters": 2.13, "HKD_special___characters": "column name collision?" }}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637689100, "data": { "id": 1, "currency": "USD", "date": "2020-08-30", "timestamp_col": "2020-08-30T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 7.15, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637789200, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+00", "NZD": 3.89, "HKD@spéçiäl & characters": 7.12, "HKD_special___characters": "column name collision?", "USD": 10.16}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637889300, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+0000", "NZD": 1.14, "HKD@spéçiäl & characters": 7.99, "HKD_special___characters": "column name collision?", "USD": 10.99}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637989400, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 8, "HKD_special___characters": "column name collision?", "USD": 10.16}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990700, "data": { "id": 1, "currency": "USD", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 10.5, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637991000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 7.02, "HKD_special___characters": "column name collision?"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "id": 1, "currency": "USD", "date": "2020-08-29", "timestamp_col": "2020-08-29T00:00:00.000000-0000", "NZD": 1.14, "HKD@spéçiäl & characters": 2.13, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a" }}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637689100, "data": { "id": 1, "currency": "USD", "date": "2020-08-30", "timestamp_col": "2020-08-30T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 7.15, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637789200, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+00", "NZD": 3.89, "HKD@spéçiäl & characters": 7.12, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10.16}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637889300, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+0000", "NZD": 1.14, "HKD@spéçiäl & characters": 7.99, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10.99}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637989400, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 8, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10.16}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990700, "data": { "id": 1, "currency": "USD", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 10.5, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637991000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 7.02, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637589000, "data": { "id": 1, "currency": "USD", "date": "2020-08-29", "timestamp_col": "2020-08-29T00:00:00.000000-0000", "NZD": 1.14, "HKD@spéçiäl & characters": 2.13, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637689100, "data": { "id": 1, "currency": "USD", "date": "2020-08-30", "timestamp_col": "2020-08-30T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 7.15, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637789200, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+00", "NZD": 3.89, "HKD@spéçiäl & characters": 7.12, "HKD_special___characters": "column name collision?", "USD": 10.16}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637889300, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+0000", "NZD": 1.14, "HKD@spéçiäl & characters": 7.99, "HKD_special___characters": "column name collision?", "USD": 10.99}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637989400, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 8, "HKD_special___characters": "column name collision?", "USD": 10.16}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990700, "data": { "id": 1, "currency": "USD", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 10.5, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637991000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 7.02, "HKD_special___characters": "column name collision?"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637589000, "data": { "id": 1, "currency": "USD", "date": "2020-08-29", "timestamp_col": "2020-08-29T00:00:00.000000-0000", "NZD": 1.14, "HKD@spéçiäl & characters": 2.13, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a" }}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637689100, "data": { "id": 1, "currency": "USD", "date": "2020-08-30", "timestamp_col": "2020-08-30T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 7.15, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637789200, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+00", "NZD": 3.89, "HKD@spéçiäl & characters": 7.12, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10.16}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637889300, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+0000", "NZD": 1.14, "HKD@spéçiäl & characters": 7.99, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10.99}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637989400, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 8, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10.16}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990700, "data": { "id": 1, "currency": "USD", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 10.5, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637991000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 7.02, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} -{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":1,"name":"mazda","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623849130530,"_ab_cdc_lsn":26971624,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}} -{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":2,"name":"toyata","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623849130549,"_ab_cdc_lsn":26971624,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}} -{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":4,"name":"bmw","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623849314535,"_ab_cdc_lsn":26974776,"_ab_cdc_deleted_at":null},"emitted_at":1623860160}} -{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":5,"name":"vw","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623849314663,"_ab_cdc_lsn":26975264,"_ab_cdc_deleted_at":null},"emitted_at":1623860160}} -{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":4,"name":null,"column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623849314791,"_ab_cdc_lsn":26975440,"_ab_cdc_deleted_at":1623849314791},"emitted_at":1623860160}} -{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":6,"name":"opel","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623850868109,"_ab_cdc_lsn":27009440,"_ab_cdc_deleted_at":null},"emitted_at":1623861660}} -{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":7,"name":"lotus","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623850868237,"_ab_cdc_lsn":27010048,"_ab_cdc_deleted_at":null},"emitted_at":1623861660}} -{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":6,"name":null,"column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623850868371,"_ab_cdc_lsn":27010232,"_ab_cdc_deleted_at":1623850868371},"emitted_at":1623861660}} +{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":1,"name":"mazda","_ab_cdc_updated_at":1623849130530,"_ab_cdc_lsn":26971624,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}} +{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":2,"name":"toyata","_ab_cdc_updated_at":1623849130549,"_ab_cdc_lsn":26971624,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}} +{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":4,"name":"bmw","_ab_cdc_updated_at":1623849314535,"_ab_cdc_lsn":26974776,"_ab_cdc_deleted_at":null},"emitted_at":1623860160}} +{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":5,"name":"vw","_ab_cdc_updated_at":1623849314663,"_ab_cdc_lsn":26975264,"_ab_cdc_deleted_at":null},"emitted_at":1623860160}} +{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":4,"name":null,"_ab_cdc_updated_at":1623849314791,"_ab_cdc_lsn":26975440,"_ab_cdc_deleted_at":1623849314791},"emitted_at":1623860160}} +{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":6,"name":"opel","_ab_cdc_updated_at":1623850868109,"_ab_cdc_lsn":27009440,"_ab_cdc_deleted_at":null},"emitted_at":1623861660}} +{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":7,"name":"lotus","_ab_cdc_updated_at":1623850868237,"_ab_cdc_lsn":27010048,"_ab_cdc_deleted_at":null},"emitted_at":1623861660}} +{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":6,"name":null,"_ab_cdc_updated_at":1623850868371,"_ab_cdc_lsn":27010232,"_ab_cdc_deleted_at":1623850868371},"emitted_at":1623861660}} {"type":"RECORD","record":{"stream":"pos_dedup_cdcx","data":{"id":1,"name":"mazda","_ab_cdc_updated_at":1623849130530,"_ab_cdc_lsn":26971624,"_ab_cdc_log_pos": 33274,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}} {"type":"RECORD","record":{"stream":"pos_dedup_cdcx","data":{"id":2,"name":"toyata","_ab_cdc_updated_at":1623849130549,"_ab_cdc_lsn":26971624,"_ab_cdc_log_pos": 33275,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_incremental.txt b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_incremental.txt index 77dbc6f073f1..689782e19ecc 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_incremental.txt +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_incremental.txt @@ -1,14 +1,14 @@ -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650000000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 14.05, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650010000, "data": { "id": 4, "currency": "HKD", "NZD": 1.19, "HKD@spéçiäl & characters": 0.01, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650011000, "data": { "id": 1, "currency": "USD", "date": "2020-10-14", "timestamp_col": "2020-10-14T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 9.5, "HKD_special___characters": "column name collision?"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650000000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 14.05, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650010000, "data": { "id": 4, "currency": "HKD", "NZD": 1.19, "HKD@spéçiäl & characters": 0.01, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650011000, "data": { "id": 1, "currency": "USD", "date": "2020-10-14", "timestamp_col": "2020-10-14T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 9.5, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650000000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 14.05, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650010000, "data": { "id": 4, "currency": "HKD", "NZD": 1.19, "HKD@spéçiäl & characters": 0.01, "HKD_special___characters": "column name collision?"}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650011000, "data": { "id": 1, "currency": "USD", "date": "2020-10-14", "timestamp_col": "2020-10-14T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 9.5, "HKD_special___characters": "column name collision?"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650000000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 14.05, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650010000, "data": { "id": 4, "currency": "HKD", "NZD": 1.19, "HKD@spéçiäl & characters": 0.01, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650011000, "data": { "id": 1, "currency": "USD", "date": "2020-10-14", "timestamp_col": "2020-10-14T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 9.5, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} {"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":5,"name":"vw","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623849314663,"_ab_cdc_lsn":26975264,"_ab_cdc_deleted_at":null},"emitted_at":1623860160}} {"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":5,"name":null,"column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623900000000,"_ab_cdc_lsn":28010252,"_ab_cdc_deleted_at":1623900000000},"emitted_at":1623900000000}} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_schema_change.txt b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_schema_change.txt index 4491673d040c..ebe17b33d6e7 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_schema_change.txt +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_schema_change.txt @@ -1,12 +1,12 @@ -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602661281900, "data": { "id": 3.14, "currency": "EUR", "new_column": 2.1, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 2.12, "USD": 7}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602661291900, "data": { "id": 0.12, "currency": "GBP", "new_column": 3.81, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 3.14, "HKD@spéçiäl & characters": 3.01, "USD": 11}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602661381900, "data": { "id": 4.22, "currency": "EUR", "new_column": 89.1, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 3.89, "HKD@spéçiäl & characters": 8.88, "USD": 10}}} -{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602661481900, "data": { "id": 1, "currency": "HKD", "new_column": 91.11, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 1.19, "HKD@spéçiäl & characters": 99.1, "USD": 10}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602661281900, "data": { "id": 3.14, "currency": "EUR", "new_column": 2.1, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 2.12, "column`_'with\"_quotes":"ma\"z`d'a", "USD": 7}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602661291900, "data": { "id": 0.12, "currency": "GBP", "new_column": 3.81, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 3.14, "HKD@spéçiäl & characters": 3.01, "column`_'with\"_quotes":"ma\"z`d'a", "USD": 11}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602661381900, "data": { "id": 4.22, "currency": "EUR", "new_column": 89.1, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 3.89, "HKD@spéçiäl & characters": 8.88, "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602661481900, "data": { "id": 1, "currency": "HKD", "new_column": 91.11, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 1.19, "HKD@spéçiäl & characters": 99.1, "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602661281900, "data": { "id": 3.14, "currency": "EUR", "new_column": 2.1, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 2.12, "USD": 7}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602661291900, "data": { "id": 0.12, "currency": "GBP", "new_column": 3.81, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 3.14, "HKD@spéçiäl & characters": 3.01, "USD": 11}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602661381900, "data": { "id": 4.22, "currency": "EUR", "new_column": 89.1, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 3.89, "HKD@spéçiäl & characters": 8.88, "USD": 10}}} -{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602661481900, "data": { "id": 1, "currency": "HKD", "new_column": 91.11, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 1.19, "HKD@spéçiäl & characters": 99.1, "USD": 10}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602661281900, "data": { "id": 3.14, "currency": "EUR", "new_column": 2.1, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 2.12, "column`_'with\"_quotes":"ma\"z`d'a", "USD": 7}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602661291900, "data": { "id": 0.12, "currency": "GBP", "new_column": 3.81, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 3.14, "HKD@spéçiäl & characters": 3.01, "column`_'with\"_quotes":"ma\"z`d'a", "USD": 11}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602661381900, "data": { "id": 4.22, "currency": "EUR", "new_column": 89.1, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 3.89, "HKD@spéçiäl & characters": 8.88, "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602661481900, "data": { "id": 1, "currency": "HKD", "new_column": 91.11, "date": "2020-11-01", "timestamp_col": "2020-11-01T00:00:00Z", "NZD": 1.19, "HKD@spéçiäl & characters": 99.1, "column`_'with\"_quotes":"ma\"z`d'a", "USD": 10}}} {"type":"RECORD","record":{"stream":"renamed_dedup_cdc_excluded","data":{"id":8,"name":"vw","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623949314663,"_ab_cdc_lsn":26985264,"_ab_cdc_deleted_at":null},"emitted_at":1623960160}} {"type":"RECORD","record":{"stream":"renamed_dedup_cdc_excluded","data":{"id":9,"name":"opel","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623950868109,"_ab_cdc_lsn":28009440,"_ab_cdc_deleted_at":null},"emitted_at":1623961660}} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_incremental/simple_streams_second_run_row_counts.sql b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_incremental/simple_streams_second_run_row_counts.sql index 28963326e82d..00a08016a446 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_incremental/simple_streams_second_run_row_counts.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_incremental/simple_streams_second_run_row_counts.sql @@ -2,7 +2,7 @@ with table_row_counts as ( select distinct '_airbyte_raw_exchange_rate' as label, count(*) as row_count, 5 as expected_count from {{ source('test_normalization', '_airbyte_raw_exchange_rate') }} union all - select distinct 'exchange_rate' as label, count(*) as row_count, 13 as expected_count + select distinct 'exchange_rate' as label, count(*) as row_count, 5 as expected_count from {{ ref('exchange_rate') }} union all @@ -32,10 +32,10 @@ union all select distinct '_airbyte_raw_pos_dedup_cdcx' as label, count(*) as row_count, 6 as expected_count from {{ source('test_normalization', '_airbyte_raw_pos_dedup_cdcx') }} union all - select distinct 'pos_dedup_cdcx_scd' as label, count(*) as row_count, 6 as expected_count + select distinct 'pos_dedup_cdcx_scd' as label, count(*) as row_count, 8 as expected_count from {{ ref('pos_dedup_cdcx_scd') }} union all - select distinct 'pos_dedup_cdcx' as label, count(*) as row_count, 2 as expected_count + select distinct 'pos_dedup_cdcx' as label, count(*) as row_count, 3 as expected_count from {{ ref('pos_dedup_cdcx') }} ) select * diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_schema_change/simple_streams_third_run_row_counts.sql b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_schema_change/simple_streams_third_run_row_counts.sql index 186eedf26dc7..c5e8aea4f709 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_schema_change/simple_streams_third_run_row_counts.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_schema_change/simple_streams_third_run_row_counts.sql @@ -2,7 +2,7 @@ with table_row_counts as ( select distinct '_airbyte_raw_exchange_rate' as label, count(*) as row_count, 4 as expected_count from {{ source('test_normalization', '_airbyte_raw_exchange_rate') }} union all - select distinct 'exchange_rate' as label, count(*) as row_count, 17 as expected_count + select distinct 'exchange_rate' as label, count(*) as row_count, 4 as expected_count from {{ ref('exchange_rate') }} union all diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests/schema_test.yml b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests/schema_test.yml index d0192cefe26d..b6b844cfd5f0 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests/schema_test.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests/schema_test.yml @@ -7,22 +7,8 @@ models: # description: check no column collisions # Two columns having similar names especially after removing special characters should remain distincts expression: cast("HKD@spéçiäl & characters" as {{ dbt_utils.type_string() }}) != HKD_special___characters - - dbt_utils.equality: - # description: check_streams_are_equal - # In this integration test, we are sending the same records to both streams - # exchange_rate and dedup_exchange_rate. - # The SCD table of dedup_exchange_rate in append_dedup mode should therefore mirror - # the final table with append or overwrite mode from exchange_rate. - compare_model: ref('dedup_exchange_rate_scd') - compare_columns: - - id - - currency - - date - - timestamp_col - - '"HKD@spéçiäl & characters"' - - HKD_special___characters - - NZD - - USD + - dbt_utils.expression_is_true: + expression: "\"column`_'with\"\"_quotes\" is not null" columns: - name: '"HKD@spéçiäl & characters"' # description: check special charactesrs diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests_incremental/schema_test.yml b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests_incremental/schema_test.yml index d0192cefe26d..b6b844cfd5f0 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests_incremental/schema_test.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests_incremental/schema_test.yml @@ -7,22 +7,8 @@ models: # description: check no column collisions # Two columns having similar names especially after removing special characters should remain distincts expression: cast("HKD@spéçiäl & characters" as {{ dbt_utils.type_string() }}) != HKD_special___characters - - dbt_utils.equality: - # description: check_streams_are_equal - # In this integration test, we are sending the same records to both streams - # exchange_rate and dedup_exchange_rate. - # The SCD table of dedup_exchange_rate in append_dedup mode should therefore mirror - # the final table with append or overwrite mode from exchange_rate. - compare_model: ref('dedup_exchange_rate_scd') - compare_columns: - - id - - currency - - date - - timestamp_col - - '"HKD@spéçiäl & characters"' - - HKD_special___characters - - NZD - - USD + - dbt_utils.expression_is_true: + expression: "\"column`_'with\"\"_quotes\" is not null" columns: - name: '"HKD@spéçiäl & characters"' # description: check special charactesrs diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests_schema_change/schema_test.yml b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests_schema_change/schema_test.yml index b8367ee02db8..36982388a1ff 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests_schema_change/schema_test.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_schema_tests_schema_change/schema_test.yml @@ -2,28 +2,15 @@ version: 2 models: - name: exchange_rate + tests: + - dbt_utils.expression_is_true: + expression: "\"column`_'with\"\"_quotes\" is not null" columns: - name: '"HKD@spéçiäl & characters"' # description: check special charactesrs # Use special characters in column names and make sure they are correctly parsed in the JSON blob and populated tests: - not_null - tests: - - dbt_utils.equality: - # description: check_streams_are_equal - # In this integration test, we are sending the same records to both streams - # exchange_rate and dedup_exchange_rate. - # The SCD table of dedup_exchange_rate in append_dedup mode should therefore mirror - # the final table with append or overwrite mode from exchange_rate. - compare_model: ref('dedup_exchange_rate_scd') - compare_columns: - - id - - currency - - date - - timestamp_col - - '"HKD@spéçiäl & characters"' - - NZD - - USD - name: dedup_exchange_rate tests: diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py b/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py index 04b59dae2ff1..884e6bb11942 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py +++ b/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py @@ -182,17 +182,7 @@ def setup_test_dir(destination_type: DestinationType, test_resource_name: str) - elif destination_type.value == DestinationType.ORACLE.value: copy_tree("../dbt-project-template-oracle", test_root_dir) dbt_project_yaml = "../dbt-project-template-oracle/dbt_project.yml" - if destination_type.value not in (DestinationType.REDSHIFT.value, DestinationType.ORACLE.value): - # Prefer 'view' to 'ephemeral' for tests so it's easier to debug with dbt - dbt_test_utils.copy_replace( - dbt_project_yaml, - os.path.join(test_root_dir, "dbt_project.yml"), - pattern="ephemeral", - replace_value="view", - ) - else: - # keep ephemeral tables - dbt_test_utils.copy_replace(dbt_project_yaml, os.path.join(test_root_dir, "dbt_project.yml")) + dbt_test_utils.copy_replace(dbt_project_yaml, os.path.join(test_root_dir, "dbt_project.yml")) return test_root_dir diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py index 9af79c4a4e83..03d21d3c6feb 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py @@ -138,15 +138,19 @@ def create_from_parent( The child stream processor will create a separate table to contain the unnested data. """ + if parent.destination_sync_mode.value == DestinationSyncMode.append_dedup.value: + # nested streams can't be deduped like their parents (as they may not share the same cursor/primary keys) + parent_sync_mode = DestinationSyncMode.append + else: + parent_sync_mode = parent.destination_sync_mode result = StreamProcessor.create( stream_name=child_name, destination_type=parent.destination_type, raw_schema=parent.raw_schema, default_schema=parent.default_schema, schema=parent.schema, - # Nested Streams don't inherit parents sync modes? - source_sync_mode=SyncMode.full_refresh, - destination_sync_mode=DestinationSyncMode.append, + source_sync_mode=parent.source_sync_mode, + destination_sync_mode=parent_sync_mode, cursor_field=[], primary_key=[], json_column_name=json_column_name, @@ -237,42 +241,55 @@ def process(self) -> List["StreamProcessor"]: from_table = self.add_to_outputs( self.generate_json_parsing_model(from_table, column_names), self.get_model_materialization_mode(is_intermediate=True), + is_intermediate=True, suffix="ab1", ) from_table = self.add_to_outputs( self.generate_column_typing_model(from_table, column_names), self.get_model_materialization_mode(is_intermediate=True, column_count=column_count), + is_intermediate=True, suffix="ab2", ) if self.destination_sync_mode != DestinationSyncMode.append_dedup: from_table = self.add_to_outputs( self.generate_id_hashing_model(from_table, column_names), self.get_model_materialization_mode(is_intermediate=True, column_count=column_count), + is_intermediate=True, suffix="ab3", ) from_table = self.add_to_outputs( self.generate_final_model(from_table, column_names), self.get_model_materialization_mode(is_intermediate=False, column_count=column_count), + is_intermediate=False, ) else: + if DestinationType.POSTGRES.value == self.destination_type: + # because of https://github.com/dbt-labs/docs.getdbt.com/issues/335, we have to use tables for postgres + forced_materialization_type = TableMaterializationType.TABLE + else: + forced_materialization_type = TableMaterializationType.VIEW from_table = self.add_to_outputs( self.generate_id_hashing_model(from_table, column_names), # Force View materialization here because scd models rely on star* macros that requires it - TableMaterializationType.VIEW, + forced_materialization_type, + is_intermediate=True, suffix="ab3", ) from_table = self.add_to_outputs( self.generate_scd_type_2_model(from_table, column_names), self.get_model_materialization_mode(is_intermediate=False, column_count=column_count), + is_intermediate=False, suffix="scd", subdir="scd", unique_key=self.name_transformer.normalize_column_name("_airbyte_unique_key_scd"), partition_by=PartitionScheme.ACTIVE_ROW, ) where_clause = f"\nand {self.name_transformer.normalize_column_name('_airbyte_active_row')} = 1" - from_table = self.add_to_outputs( + # from_table should not use the de-duplicated final table or tables downstream (nested streams) will miss non active rows + self.add_to_outputs( self.generate_final_model(from_table, column_names, self.get_unique_key()) + where_clause, self.get_model_materialization_mode(is_intermediate=False, column_count=column_count), + is_intermediate=False, unique_key=self.get_unique_key(), partition_by=PartitionScheme.UNIQUE_KEY, ) @@ -680,13 +697,13 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup {{ cursor_field }} desc, {{ col_emitted_at }} desc{{ cdc_updated_at_order }} ) as {{ airbyte_end_at }}, - case when lag({{ cursor_field }}) over ( + case when row_number() over ( partition by {{ primary_key_partition | join(", ") }} order by {{ cursor_field }} {{ order_null }}, {{ cursor_field }} desc, {{ col_emitted_at }} desc{{ cdc_updated_at_order }} - ) is null {{ cdc_active_row }} then 1 else 0 end as {{ active_row }}, + ) = 1{{ cdc_active_row }} then 1 else 0 end as {{ active_row }}, {{ col_ab_id }}, {{ col_emitted_at }}, {{ hash_id }} @@ -745,7 +762,7 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup col_cdc_updated_at = self.name_transformer.normalize_column_name("_ab_cdc_updated_at") quoted_col_cdc_deleted_at = self.name_transformer.normalize_column_name("_ab_cdc_deleted_at", in_jinja=True) quoted_col_cdc_updated_at = self.name_transformer.normalize_column_name("_ab_cdc_updated_at", in_jinja=True) - cdc_active_row_pattern = f"and {col_cdc_deleted_at} is null " + cdc_active_row_pattern = f" and {col_cdc_deleted_at} is null" cdc_updated_order_pattern = f", {col_cdc_updated_at} desc" cdc_cols = ( f", cast({col_cdc_deleted_at} as " @@ -879,6 +896,10 @@ def generate_final_model(self, from_table: str, column_names: Dict[str, Tuple[st ) return sql + @staticmethod + def is_incremental_mode(destination_sync_mode: DestinationSyncMode) -> bool: + return destination_sync_mode.value in [DestinationSyncMode.append.value, DestinationSyncMode.append_dedup.value] + def add_incremental_clause(self, sql_query: str) -> str: template = Template( """ @@ -900,12 +921,12 @@ def add_to_outputs( self, sql: str, materialization_mode: TableMaterializationType, + is_intermediate: bool = True, suffix: str = "", unique_key: str = "", subdir: str = "", partition_by: PartitionScheme = PartitionScheme.DEFAULT, ) -> str: - is_intermediate = materialization_mode in [TableMaterializationType.CTE, TableMaterializationType.VIEW] schema = self.get_schema(is_intermediate) # MySQL table names need to be manually truncated, because it does not do it automatically truncate_name = self.destination_type == DestinationType.MYSQL @@ -922,7 +943,7 @@ def add_to_outputs( config["schema"] = f'"{self.default_schema}"' else: config["schema"] = f'"{schema}"' - if self.source_sync_mode == SyncMode.incremental and suffix != "scd": + if self.is_incremental_mode(self.destination_sync_mode) and suffix != "scd": # incremental is handled in the SCD SQL already sql = self.add_incremental_clause(sql) template = Template( @@ -950,7 +971,7 @@ def get_model_materialization_mode(self, is_intermediate: bool, column_count: in # if ephemeral is used with large number of columns, use views instead return TableMaterializationType.VIEW else: - if self.source_sync_mode == SyncMode.incremental: + if self.is_incremental_mode(self.destination_sync_mode): return TableMaterializationType.INCREMENTAL else: return TableMaterializationType.TABLE @@ -1010,8 +1031,9 @@ def get_model_partition_config(self, partition_by: PartitionScheme, unique_key: config["cluster_by"] = '["_AIRBYTE_EMITTED_AT"]' if unique_key: config["unique_key"] = f'"{unique_key}"' - else: - config["unique_key"] = f"env_var('AIRBYTE_DEFAULT_UNIQUE_KEY', {self.get_ab_id(in_jinja=True)})" + elif not self.is_nested_array: + # in nested arrays, each element is sharing the same _airbyte_ab_id, so it's not unique + config["unique_key"] = self.get_ab_id(in_jinja=True) return config def get_model_tags(self, is_intermediate: bool) -> str: From b2b3f576590a58582d91c1392eef126253374e63 Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Fri, 5 Nov 2021 12:47:41 +0100 Subject: [PATCH 02/10] add sql examples --- ...d_stream_with_c___long_names_partition.sql | 20 +++++++++++++++++++ ...d_stream_with_c___names_partition_data.sql | 18 +++++++++++++++++ .../dedup_exchange_rate_scd.sql | 6 +++--- 3 files changed, 41 insertions(+), 3 deletions(-) create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/test_normalization/nested_stream_with_c___long_names_partition.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/test_normalization/nested_stream_with_c___names_partition_data.sql diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/test_normalization/nested_stream_with_c___long_names_partition.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/test_normalization/nested_stream_with_c___long_names_partition.sql new file mode 100644 index 000000000000..30b9372e1ebf --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/test_normalization/nested_stream_with_c___long_names_partition.sql @@ -0,0 +1,20 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}], + unique_key = '_airbyte_ab_id', + schema = "test_normalization", + tags = [ "nested" ] +) }} +-- Final base SQL model +select + _airbyte_nested_stre__nto_long_names_hashid, + double_array_data, + {{ adapter.quote('DATA') }}, + _airbyte_ab_id, + _airbyte_emitted_at, + {{ current_timestamp() }} as _airbyte_normalized_at, + _airbyte_partition_hashid +from {{ ref('nested_stream_with_c___long_names_partition_ab3') }} +-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition from {{ ref('nested_stream_with_c__lting_into_long_names_scd') }} +where 1 = 1 +{{ incremental_clause('_airbyte_emitted_at') }} + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/test_normalization/nested_stream_with_c___names_partition_data.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/test_normalization/nested_stream_with_c___names_partition_data.sql new file mode 100644 index 000000000000..d455163ffb3a --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_nested_streams/models/generated/airbyte_incremental/test_normalization/nested_stream_with_c___names_partition_data.sql @@ -0,0 +1,18 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}], + schema = "test_normalization", + tags = [ "nested" ] +) }} +-- Final base SQL model +select + _airbyte_partition_hashid, + currency, + _airbyte_ab_id, + _airbyte_emitted_at, + {{ current_timestamp() }} as _airbyte_normalized_at, + _airbyte_data_hashid +from {{ ref('nested_stream_with_c___names_partition_data_ab3') }} +-- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA from {{ ref('nested_stream_with_c___long_names_partition') }} +where 1 = 1 +{{ incremental_clause('_airbyte_emitted_at') }} + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql index 88f3125c0583..2af61c0cc3b8 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql @@ -1,7 +1,7 @@ - create table "postgres".test_normalization."dedup_exchange_rate_scd" + create table "postgres"."test_normalization"."dedup_exchange_rate_scd__dbt_tmp" as ( with @@ -42,7 +42,7 @@ scd_data as ( "date" desc, _airbyte_emitted_at desc ) as _airbyte_end_at, - case when lag("date") over ( + case when row_number() over ( partition by "id", currency, cast(nzd as varchar ) @@ -50,7 +50,7 @@ scd_data as ( "date" is null asc, "date" desc, _airbyte_emitted_at desc - ) is null then 1 else 0 end as _airbyte_active_row, + ) = 1 then 1 else 0 end as _airbyte_active_row, _airbyte_ab_id, _airbyte_emitted_at, _airbyte_dedup_exchange_rate_hashid From 90fbf86e19aee4763f4858068582d3a7c87e42ec Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Fri, 5 Nov 2021 13:13:39 +0100 Subject: [PATCH 03/10] Fix mypy check --- .../normalization/transform_catalog/stream_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py index 03d21d3c6feb..77f2619805d7 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py @@ -263,7 +263,7 @@ def process(self) -> List["StreamProcessor"]: is_intermediate=False, ) else: - if DestinationType.POSTGRES.value == self.destination_type: + if DestinationType.POSTGRES.value == self.destination_type.value: # because of https://github.com/dbt-labs/docs.getdbt.com/issues/335, we have to use tables for postgres forced_materialization_type = TableMaterializationType.TABLE else: From 05f28a9f1f245451e73cda901fba6e89ec02afc8 Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Fri, 5 Nov 2021 13:23:11 +0100 Subject: [PATCH 04/10] Postgres materialization --- .../transform_catalog/stream_processor.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py index 77f2619805d7..52051fa89421 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py @@ -263,14 +263,17 @@ def process(self) -> List["StreamProcessor"]: is_intermediate=False, ) else: - if DestinationType.POSTGRES.value == self.destination_type.value: - # because of https://github.com/dbt-labs/docs.getdbt.com/issues/335, we have to use tables for postgres - forced_materialization_type = TableMaterializationType.TABLE + if self.is_incremental_mode(self.destination_sync_mode): + # Force different materialization here because incremental scd models rely on star* macros that requires it + if DestinationType.POSTGRES.value == self.destination_type.value: + # because of https://github.com/dbt-labs/docs.getdbt.com/issues/335, we avoid VIEW for postgres + forced_materialization_type = TableMaterializationType.INCREMENTAL + else: + forced_materialization_type = TableMaterializationType.VIEW else: - forced_materialization_type = TableMaterializationType.VIEW + forced_materialization_type = TableMaterializationType.CTE from_table = self.add_to_outputs( self.generate_id_hashing_model(from_table, column_names), - # Force View materialization here because scd models rely on star* macros that requires it forced_materialization_type, is_intermediate=True, suffix="ab3", From f332c28ced90c98a23293c6e46888300f1fe7678 Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Fri, 5 Nov 2021 18:48:20 +0100 Subject: [PATCH 05/10] make postgres index unique for dedup table --- .../normalization/transform_catalog/stream_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py index 52051fa89421..b908b0698a06 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py @@ -1009,7 +1009,7 @@ def get_model_partition_config(self, partition_by: PartitionScheme, unique_key: if partition_by == PartitionScheme.ACTIVE_ROW: config["indexes"] = "[{'columns':['_airbyte_active_row','_airbyte_unique_key','_airbyte_emitted_at'],'type': 'btree'}]" elif partition_by == PartitionScheme.UNIQUE_KEY: - config["indexes"] = "[{'columns':['_airbyte_unique_key','_airbyte_emitted_at'],'type': 'btree'}]" + config["indexes"] = "[{'columns':['_airbyte_unique_key'],'unique':True}]" else: config["indexes"] = "[{'columns':['_airbyte_emitted_at'],'type':'hash'}]" elif self.destination_type == DestinationType.REDSHIFT: From b4edde0a071d162656002c2257e1d9afe3349af1 Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Mon, 8 Nov 2021 10:27:34 +0100 Subject: [PATCH 06/10] Fix destination acceptance tests --- .../standardtest/destination/DestinationAcceptanceTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java index 2959aab70089..dd89ad2c8834 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java @@ -1122,7 +1122,9 @@ private void pruneMutate(final JsonNode json) { "EMITTED_AT", "AB_ID", "NORMALIZED_AT", - "HASHID"); + "HASHID", + "unique_key", + "UNIQUE_KEY"); if (airbyteInternalFields.stream().anyMatch(internalField -> key.toLowerCase().contains(internalField.toLowerCase())) || json.get(key).isNull()) { ((ObjectNode) json).remove(key); From bb84c845c325993de7254939757e2b3e7c52ef95 Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Mon, 8 Nov 2021 12:16:05 +0100 Subject: [PATCH 07/10] Add tests for dedup on null cursor --- .../test_simple_streams/data_input/messages.txt | 4 ++++ .../data_input/messages_incremental.txt | 2 ++ .../simple_streams_first_run_row_counts.sql | 10 +++++----- .../simple_streams_second_run_row_counts.sql | 10 +++++----- .../simple_streams_third_run_row_counts.sql | 6 +++--- .../transform_catalog/stream_processor.py | 10 ++++++---- 6 files changed, 25 insertions(+), 17 deletions(-) diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt index 23a4ea1ab58e..fa3af2c3f254 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt @@ -8,6 +8,8 @@ {"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} {"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} {"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637991000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 7.02, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637991100, "data": { "id": 5, "currency": "USD", "NZD": 0.01, "HKD@spéçiäl & characters": 8.12, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637991200, "data": { "id": 5, "currency": "USD", "NZD": 0.01, "HKD@spéçiäl & characters": 9.23, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} {"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637589000, "data": { "id": 1, "currency": "USD", "date": "2020-08-29", "timestamp_col": "2020-08-29T00:00:00.000000-0000", "NZD": 1.14, "HKD@spéçiäl & characters": 2.13, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a" }}} {"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637689100, "data": { "id": 1, "currency": "USD", "date": "2020-08-30", "timestamp_col": "2020-08-30T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 7.15, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} @@ -19,6 +21,8 @@ {"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} {"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} {"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637991000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 7.02, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637991100, "data": { "id": 5, "currency": "USD", "NZD": 0.01, "HKD@spéçiäl & characters": 8.12, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637991200, "data": { "id": 5, "currency": "USD", "NZD": 0.01, "HKD@spéçiäl & characters": 9.23, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} {"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":1,"name":"mazda","_ab_cdc_updated_at":1623849130530,"_ab_cdc_lsn":26971624,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}} {"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":2,"name":"toyata","_ab_cdc_updated_at":1623849130549,"_ab_cdc_lsn":26971624,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_incremental.txt b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_incremental.txt index 689782e19ecc..0f4a6ee16d5e 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_incremental.txt +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages_incremental.txt @@ -3,12 +3,14 @@ {"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650000000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 14.05, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} {"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650010000, "data": { "id": 4, "currency": "HKD", "NZD": 1.19, "HKD@spéçiäl & characters": 0.01, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} {"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650011000, "data": { "id": 1, "currency": "USD", "date": "2020-10-14", "timestamp_col": "2020-10-14T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 9.5, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650012000, "data": { "id": 5, "currency": "USD", "NZD": 0.01, "HKD@spéçiäl & characters": 6.39, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} {"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} {"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} {"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650000000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 14.05, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} {"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650010000, "data": { "id": 4, "currency": "HKD", "NZD": 1.19, "HKD@spéçiäl & characters": 0.01, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} {"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650011000, "data": { "id": 1, "currency": "USD", "date": "2020-10-14", "timestamp_col": "2020-10-14T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 9.5, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} +{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650012000, "data": { "id": 5, "currency": "USD", "NZD": 0.01, "HKD@spéçiäl & characters": 6.39, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}} {"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":5,"name":"vw","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623849314663,"_ab_cdc_lsn":26975264,"_ab_cdc_deleted_at":null},"emitted_at":1623860160}} {"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":5,"name":null,"column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623900000000,"_ab_cdc_lsn":28010252,"_ab_cdc_deleted_at":1623900000000},"emitted_at":1623900000000}} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp/simple_streams_first_run_row_counts.sql b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp/simple_streams_first_run_row_counts.sql index 462558881b27..df4fb4f56c49 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp/simple_streams_first_run_row_counts.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp/simple_streams_first_run_row_counts.sql @@ -1,19 +1,19 @@ with table_row_counts as ( - select distinct '_airbyte_raw_exchange_rate' as label, count(*) as row_count, 10 as expected_count + select distinct '_airbyte_raw_exchange_rate' as label, count(*) as row_count, 12 as expected_count from {{ source('test_normalization', '_airbyte_raw_exchange_rate') }} union all - select distinct 'exchange_rate' as label, count(*) as row_count, 10 as expected_count + select distinct 'exchange_rate' as label, count(*) as row_count, 12 as expected_count from {{ ref('exchange_rate') }} union all - select distinct '_airbyte_raw_dedup_exchange_rate' as label, count(*) as row_count, 10 as expected_count + select distinct '_airbyte_raw_dedup_exchange_rate' as label, count(*) as row_count, 12 as expected_count from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }} union all - select distinct 'dedup_exchange_rate_scd' as label, count(*) as row_count, 10 as expected_count + select distinct 'dedup_exchange_rate_scd' as label, count(*) as row_count, 12 as expected_count from {{ ref('dedup_exchange_rate_scd') }} union all - select distinct 'dedup_exchange_rate' as label, count(*) as row_count, 5 as expected_count + select distinct 'dedup_exchange_rate' as label, count(*) as row_count, 6 as expected_count from {{ ref('dedup_exchange_rate') }} union all diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_incremental/simple_streams_second_run_row_counts.sql b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_incremental/simple_streams_second_run_row_counts.sql index 00a08016a446..ca5cdfa4fc40 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_incremental/simple_streams_second_run_row_counts.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_incremental/simple_streams_second_run_row_counts.sql @@ -1,19 +1,19 @@ with table_row_counts as ( - select distinct '_airbyte_raw_exchange_rate' as label, count(*) as row_count, 5 as expected_count + select distinct '_airbyte_raw_exchange_rate' as label, count(*) as row_count, 6 as expected_count from {{ source('test_normalization', '_airbyte_raw_exchange_rate') }} union all - select distinct 'exchange_rate' as label, count(*) as row_count, 5 as expected_count + select distinct 'exchange_rate' as label, count(*) as row_count, 6 as expected_count from {{ ref('exchange_rate') }} union all - select distinct '_airbyte_raw_dedup_exchange_rate' as label, count(*) as row_count, 5 as expected_count + select distinct '_airbyte_raw_dedup_exchange_rate' as label, count(*) as row_count, 6 as expected_count from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }} union all - select distinct 'dedup_exchange_rate_scd' as label, count(*) as row_count, 13 as expected_count + select distinct 'dedup_exchange_rate_scd' as label, count(*) as row_count, 16 as expected_count from {{ ref('dedup_exchange_rate_scd') }} union all - select distinct 'dedup_exchange_rate' as label, count(*) as row_count, 6 as expected_count + select distinct 'dedup_exchange_rate' as label, count(*) as row_count, 7 as expected_count from {{ ref('dedup_exchange_rate') }} union all diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_schema_change/simple_streams_third_run_row_counts.sql b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_schema_change/simple_streams_third_run_row_counts.sql index c5e8aea4f709..cb886df680e9 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_schema_change/simple_streams_third_run_row_counts.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/dbt_test_config/dbt_data_tests_tmp_schema_change/simple_streams_third_run_row_counts.sql @@ -7,13 +7,13 @@ union all union all - select distinct '_airbyte_raw_dedup_exchange_rate' as label, count(*) as row_count, 9 as expected_count + select distinct '_airbyte_raw_dedup_exchange_rate' as label, count(*) as row_count, 10 as expected_count from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }} union all - select distinct 'dedup_exchange_rate_scd' as label, count(*) as row_count, 17 as expected_count + select distinct 'dedup_exchange_rate_scd' as label, count(*) as row_count, 20 as expected_count from {{ ref('dedup_exchange_rate_scd') }} union all - select distinct 'dedup_exchange_rate' as label, count(*) as row_count, 10 as expected_count + select distinct 'dedup_exchange_rate' as label, count(*) as row_count, 11 as expected_count from {{ ref('dedup_exchange_rate') }} union all diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py index b908b0698a06..7de9adf8ebfd 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py @@ -992,8 +992,10 @@ def get_model_partition_config(self, partition_by: PartitionScheme, unique_key: config = {} if self.destination_type == DestinationType.BIGQUERY: # see https://docs.getdbt.com/reference/resource-configs/bigquery-configs - if partition_by in [PartitionScheme.UNIQUE_KEY, PartitionScheme.ACTIVE_ROW]: + if partition_by == PartitionScheme.UNIQUE_KEY: config["cluster_by"] = '["_airbyte_unique_key","_airbyte_emitted_at"]' + elif partition_by == PartitionScheme.ACTIVE_ROW: + config["cluster_by"] = '["_airbyte_unique_key_scd","_airbyte_emitted_at"]' else: config["cluster_by"] = '"_airbyte_emitted_at"' if partition_by == PartitionScheme.ACTIVE_ROW: @@ -1007,7 +1009,7 @@ def get_model_partition_config(self, partition_by: PartitionScheme, unique_key: elif self.destination_type == DestinationType.POSTGRES: # see https://docs.getdbt.com/reference/resource-configs/postgres-configs if partition_by == PartitionScheme.ACTIVE_ROW: - config["indexes"] = "[{'columns':['_airbyte_active_row','_airbyte_unique_key','_airbyte_emitted_at'],'type': 'btree'}]" + config["indexes"] = "[{'columns':['_airbyte_active_row','_airbyte_unique_key_scd','_airbyte_emitted_at'],'type': 'btree'}]" elif partition_by == PartitionScheme.UNIQUE_KEY: config["indexes"] = "[{'columns':['_airbyte_unique_key'],'unique':True}]" else: @@ -1015,7 +1017,7 @@ def get_model_partition_config(self, partition_by: PartitionScheme, unique_key: elif self.destination_type == DestinationType.REDSHIFT: # see https://docs.getdbt.com/reference/resource-configs/redshift-configs if partition_by == PartitionScheme.ACTIVE_ROW: - config["sort"] = '["_airbyte_active_row", "_airbyte_unique_key", "_airbyte_emitted_at"]' + config["sort"] = '["_airbyte_active_row", "_airbyte_unique_key_scd", "_airbyte_emitted_at"]' elif partition_by == PartitionScheme.UNIQUE_KEY: config["sort"] = '["_airbyte_unique_key", "_airbyte_emitted_at"]' elif partition_by == PartitionScheme.NOTHING: @@ -1025,7 +1027,7 @@ def get_model_partition_config(self, partition_by: PartitionScheme, unique_key: elif self.destination_type == DestinationType.SNOWFLAKE: # see https://docs.getdbt.com/reference/resource-configs/snowflake-configs if partition_by == PartitionScheme.ACTIVE_ROW: - config["cluster_by"] = '["_AIRBYTE_ACTIVE_ROW", "_AIRBYTE_UNIQUE_KEY", "_AIRBYTE_EMITTED_AT"]' + config["cluster_by"] = '["_AIRBYTE_ACTIVE_ROW", "_AIRBYTE_UNIQUE_KEY_SCD", "_AIRBYTE_EMITTED_AT"]' elif partition_by == PartitionScheme.UNIQUE_KEY: config["cluster_by"] = '["_AIRBYTE_UNIQUE_KEY", "_AIRBYTE_EMITTED_AT"]' elif partition_by == PartitionScheme.NOTHING: From 40a0f0f3a80c339aca149d28a0b6ae905d3b930d Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Fri, 5 Nov 2021 14:25:36 +0100 Subject: [PATCH 08/10] Bumpversion of normalization --- airbyte-integrations/bases/base-normalization/Dockerfile | 2 +- .../normalization/NormalizationRunnerFactory.java | 2 +- docs/understanding-airbyte/basic-normalization.md | 9 +++++---- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/bases/base-normalization/Dockerfile b/airbyte-integrations/bases/base-normalization/Dockerfile index f00326856c65..d8edc9d9b97a 100644 --- a/airbyte-integrations/bases/base-normalization/Dockerfile +++ b/airbyte-integrations/bases/base-normalization/Dockerfile @@ -27,5 +27,5 @@ WORKDIR /airbyte ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh" ENTRYPOINT ["/airbyte/entrypoint.sh"] -LABEL io.airbyte.version=0.1.58 +LABEL io.airbyte.version=0.1.59 LABEL io.airbyte.name=airbyte/normalization diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java index ba5d090605e3..87baa29daa97 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java @@ -13,7 +13,7 @@ public class NormalizationRunnerFactory { public static final String BASE_NORMALIZATION_IMAGE_NAME = "airbyte/normalization"; - public static final String NORMALIZATION_VERSION = "0.1.58"; + public static final String NORMALIZATION_VERSION = "0.1.59"; static final Map> NORMALIZATION_MAPPING = ImmutableMap.>builder() diff --git a/docs/understanding-airbyte/basic-normalization.md b/docs/understanding-airbyte/basic-normalization.md index 1e3c02452a0a..ab67ec78ab12 100644 --- a/docs/understanding-airbyte/basic-normalization.md +++ b/docs/understanding-airbyte/basic-normalization.md @@ -348,13 +348,14 @@ Therefore, in order to "upgrade" to the desired normalization version, you need | Airbyte Version | Normalization Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | :--- | +| 0.30.32-alpha | 0.1.59 | 2021-11-08 | [\#7669](https://github.com/airbytehq/airbyte/pull/7169) | Fix nested incremental dbt | | 0.30.24-alpha | 0.1.57 | 2021-10-26 | [\#7162](https://github.com/airbytehq/airbyte/pull/7162) | Implement incremental dbt updates | | 0.30.16-alpha | 0.1.52 | 2021-10-07 | [\#6379](https://github.com/airbytehq/airbyte/pull/6379) | Handle empty string for date and date-time format | -| 0.30.16-alpha | 0.1.51 | 2021-10-08 | [\#6799](https://github.com/airbytehq/airbyte/pull/6799) | Added support for ad\_cdc\_log\_pos while normalization | -| 0.30.16-alpha | 0.1.50 | 2021-10-07 | [\#6079](https://github.com/airbytehq/airbyte/pull/6079) | Added support for MS SQL Server normalization | -| 0.30.16-alpha | 0.1.49 | 2021-10-06 | [\#6709](https://github.com/airbytehq/airbyte/pull/6709) | Forward destination dataset location to dbt profiles | +| | 0.1.51 | 2021-10-08 | [\#6799](https://github.com/airbytehq/airbyte/pull/6799) | Added support for ad\_cdc\_log\_pos while normalization | +| | 0.1.50 | 2021-10-07 | [\#6079](https://github.com/airbytehq/airbyte/pull/6079) | Added support for MS SQL Server normalization | +| | 0.1.49 | 2021-10-06 | [\#6709](https://github.com/airbytehq/airbyte/pull/6709) | Forward destination dataset location to dbt profiles | | 0.29.17-alpha | 0.1.47 | 2021-09-20 | [\#6317](https://github.com/airbytehq/airbyte/pull/6317) | MySQL: updated MySQL normalization with using SSH tunnel | -| 0.29.17-alpha | 0.1.45 | 2021-09-18 | [\#6052](https://github.com/airbytehq/airbyte/pull/6052) | Snowflake: accept any date-time format | +| | 0.1.45 | 2021-09-18 | [\#6052](https://github.com/airbytehq/airbyte/pull/6052) | Snowflake: accept any date-time format | | 0.29.8-alpha | 0.1.40 | 2021-08-18 | [\#5433](https://github.com/airbytehq/airbyte/pull/5433) | Allow optional credentials\_json for BigQuery | | 0.29.5-alpha | 0.1.39 | 2021-08-11 | [\#4557](https://github.com/airbytehq/airbyte/pull/4557) | Handle date times and solve conflict name btw stream/field | | 0.28.2-alpha | 0.1.38 | 2021-07-28 | [\#5027](https://github.com/airbytehq/airbyte/pull/5027) | Handle quotes in column names when parsing JSON blob | From 5bfbdb212a8055ced658d73d39cb42284dab515a Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Mon, 8 Nov 2021 16:29:30 +0100 Subject: [PATCH 09/10] Fix acceptance test --- .../java/io/airbyte/test/acceptance/AcceptanceTests.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 8f8584934a95..ba13796e01f4 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -920,6 +920,8 @@ private Set addAirbyteGeneratedTables(final boolean withScd String.format("_airbyte_raw_%s%s", OUTPUT_STREAM_PREFIX, cleanedNameStream)), new SchemaTableNamePair(OUTPUT_NAMESPACE_PREFIX + x.schemaName, String.format("%s%s", OUTPUT_STREAM_PREFIX, cleanedNameStream)))); if (withScdTable) { + explodedStreamNames + .add(new SchemaTableNamePair("_airbyte_" + OUTPUT_NAMESPACE_PREFIX + x.schemaName, String.format("%s%s_ab3", OUTPUT_STREAM_PREFIX, cleanedNameStream))); explodedStreamNames .add(new SchemaTableNamePair(OUTPUT_NAMESPACE_PREFIX + x.schemaName, String.format("%s%s_scd", OUTPUT_STREAM_PREFIX, cleanedNameStream))); } From 62c2992b793d40d75c362bffa2207e77dab74b4c Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Mon, 8 Nov 2021 16:56:49 +0100 Subject: [PATCH 10/10] Fix acceptance tests --- .../transform_catalog/stream_processor.py | 14 +++++++++++--- .../airbyte/test/acceptance/AcceptanceTests.java | 2 -- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py index 7de9adf8ebfd..a6f1ead46e94 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py @@ -946,9 +946,17 @@ def add_to_outputs( config["schema"] = f'"{self.default_schema}"' else: config["schema"] = f'"{schema}"' - if self.is_incremental_mode(self.destination_sync_mode) and suffix != "scd": - # incremental is handled in the SCD SQL already - sql = self.add_incremental_clause(sql) + if self.is_incremental_mode(self.destination_sync_mode): + if suffix == "scd": + if self.destination_type == DestinationType.POSTGRES: + # because of https://github.com/dbt-labs/docs.getdbt.com/issues/335, we avoid VIEW for postgres + # so we need to clean up temporary table materialization... + ab3_schema = self.get_schema(True) + ab3_table = self.tables_registry.get_file_name(schema, self.json_path, self.stream_name, "ab3", truncate_name) + config["post_hook"] = f"['drop table if exists {ab3_schema}.{ab3_table}']" + else: + # incremental is handled in the SCD SQL already + sql = self.add_incremental_clause(sql) template = Template( """ {{ '{{' }} config( diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index ba13796e01f4..8f8584934a95 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -920,8 +920,6 @@ private Set addAirbyteGeneratedTables(final boolean withScd String.format("_airbyte_raw_%s%s", OUTPUT_STREAM_PREFIX, cleanedNameStream)), new SchemaTableNamePair(OUTPUT_NAMESPACE_PREFIX + x.schemaName, String.format("%s%s", OUTPUT_STREAM_PREFIX, cleanedNameStream)))); if (withScdTable) { - explodedStreamNames - .add(new SchemaTableNamePair("_airbyte_" + OUTPUT_NAMESPACE_PREFIX + x.schemaName, String.format("%s%s_ab3", OUTPUT_STREAM_PREFIX, cleanedNameStream))); explodedStreamNames .add(new SchemaTableNamePair(OUTPUT_NAMESPACE_PREFIX + x.schemaName, String.format("%s%s_scd", OUTPUT_STREAM_PREFIX, cleanedNameStream))); }