Minor fixes to incremental normalization and nesting #7669

Merged
merged 10 commits into from
Nov 8, 2021
Changes from 2 commits
40 changes: 31 additions & 9 deletions airbyte-integrations/bases/base-normalization/.gitignore
@@ -6,12 +6,34 @@ dbt_modules/
secrets/
dist/

integration_tests/normalization_test_output/*/*/*.log
integration_tests/normalization_test_output/*/*/*.yml
integration_tests/normalization_test_output/*/*/*.json
integration_tests/normalization_test_output/*/*/*.md
integration_tests/normalization_test_output/*/*/macros/
integration_tests/normalization_test_output/*/*/tests/
integration_tests/normalization_test_output/*/*/models/dbt_data_tests/
integration_tests/normalization_test_output/*/*/models/dbt_schema_tests/
integration_tests/normalization_test_output/*/*/modified_models/
integration_tests/normalization_test_output/*/*/macros
integration_tests/normalization_test_output/*/*/tests
integration_tests/normalization_test_output/**/*.json
integration_tests/normalization_test_output/**/*.log
integration_tests/normalization_test_output/**/*.md
integration_tests/normalization_test_output/**/*.sql
integration_tests/normalization_test_output/**/*.yml
!integration_tests/normalization_test_output/**/*dbt_project.yml
!integration_tests/normalization_test_output/**/generated/sources.yml

# We keep a minimal/restricted subset of sql files for all destinations to avoid noise in diff
# Simple Streams
!integration_tests/normalization_test_output/**/dedup_exchange_rate*.sql
!integration_tests/normalization_test_output/**/exchange_rate.sql
# Nested Streams
# Parent table
!integration_tests/normalization_test_output/**/nested_stream_with*_names_ab*.sql
!integration_tests/normalization_test_output/**/nested_stream_with*_names_scd.sql
!integration_tests/normalization_test_output/**/nested_stream_with*_names.sql
# Nested table
!integration_tests/normalization_test_output/**/nested_stream_with_*_partition_ab1.sql
!integration_tests/normalization_test_output/**/nested_stream_with_*_data_ab1.sql
!integration_tests/normalization_test_output/**/nested_stream_with*_partition_scd.sql
!integration_tests/normalization_test_output/**/nested_stream_with*_data_scd.sql
!integration_tests/normalization_test_output/**/nested_stream_with*_partition.sql
!integration_tests/normalization_test_output/**/nested_stream_with*_data.sql

# but we keep all sql files for Postgres
!integration_tests/normalization_test_output/postgres/**/*.sql
integration_tests/normalization_test_output/postgres/**/dbt_data_tests
integration_tests/normalization_test_output/postgres/**/dbt_schema_tests
@@ -10,14 +10,14 @@

{%- macro default__incremental_clause(col_emitted_at) -%}
{% if is_incremental() %}
and {{ col_emitted_at }} >= (select max({{ col_emitted_at }}) from {{ this }})
and cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >= (select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }})
{% endif %}
{%- endmacro -%}

{# -- see https://on-systems.tech/113-beware-dbt-incremental-updates-against-snowflake-external-tables/ #}
{%- macro snowflake__incremental_clause(col_emitted_at) -%}
{% if is_incremental() %}
and {{ col_emitted_at }} >= cast('{{ get_max_normalized_cursor(col_emitted_at) }}' as {{ type_timestamp_with_timezone() }})
and cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >= cast('{{ get_max_normalized_cursor(col_emitted_at) }}' as {{ type_timestamp_with_timezone() }})
{% endif %}
{%- endmacro -%}

@@ -0,0 +1,20 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}],
unique_key = '_airbyte_ab_id',
schema = "test_normalization",
tags = [ "nested" ]
) }}
Comment on lines +1 to +6
Contributor Author

Un-nesting an object-type column from the parent stream can reuse the parent's _airbyte_ab_id column as unique_key, because each parent row yields at most one un-nested row.

-- Final base SQL model
select
_airbyte_nested_stre__nto_long_names_hashid,
double_array_data,
{{ adapter.quote('DATA') }},
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at,
_airbyte_partition_hashid
from {{ ref('nested_stream_with_c___long_names_partition_ab3') }}
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition from {{ ref('nested_stream_with_c__lting_into_long_names_scd') }}
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at') }}

@@ -0,0 +1,18 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}],
schema = "test_normalization",
tags = [ "nested" ]
) }}
Comment on lines +1 to +5
Contributor Author

@ChristopheDuong ChristopheDuong Nov 5, 2021

Un-nesting an array-type column from the parent stream can NOT reuse the same _airbyte_ab_id column as unique_key: every element of the nested array would carry the same unique_key, so it would no longer be unique.

(This would fail with exceptions on certain destinations.)

-- Final base SQL model
select
_airbyte_partition_hashid,
currency,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at,
_airbyte_data_hashid
from {{ ref('nested_stream_with_c___names_partition_data_ab3') }}
-- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA from {{ ref('nested_stream_with_c___long_names_partition') }}
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at') }}
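
The review comments on these two nested models contrast un-nesting an object with un-nesting an array. The following is a minimal Python sketch of that difference, not code from base-normalization: the rows are made up, and the helper names (`unnest_object`, `unnest_array`, `duplicated_keys`) are hypothetical.

```python
from collections import Counter

# Made-up parent rows; only the column names echo the models above.
parent_rows = [
    {"_airbyte_ab_id": "a1",
     "partition": {"DATA": [{"currency": "EUR"}, {"currency": "USD"}]}},
    {"_airbyte_ab_id": "b2",
     "partition": {"DATA": [{"currency": "EUR"}]}},
]


def unnest_object(rows, column):
    """Object column: at most one output row per parent row."""
    return [
        {"_airbyte_ab_id": row["_airbyte_ab_id"], **row[column]}
        for row in rows
        if row.get(column) is not None
    ]


def unnest_array(rows, column):
    """Array column: one output row per element, all sharing the parent's id."""
    return [
        {"_airbyte_ab_id": row["_airbyte_ab_id"], **element}
        for row in rows
        for element in row.get(column) or []
    ]


def duplicated_keys(rows, key="_airbyte_ab_id"):
    """Return the key values that appear on more than one row."""
    counts = Counter(row[key] for row in rows)
    return sorted(value for value, n in counts.items() if n > 1)


partition_rows = unnest_object(parent_rows, "partition")
data_rows = unnest_array(partition_rows, "DATA")

print(duplicated_keys(partition_rows))  # no duplicates after un-nesting the object
print(duplicated_keys(data_rows))       # "a1" repeats after un-nesting the array
```

In this sketch the object-derived rows keep one `_airbyte_ab_id` each, while the array-derived rows repeat `a1`, which is why the second model above omits `unique_key` from its config.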

@@ -1,7 +1,7 @@



create table "postgres".test_normalization."dedup_exchange_rate_scd"
create table "postgres"."test_normalization"."dedup_exchange_rate_scd__dbt_tmp"
as (

with
@@ -42,15 +42,15 @@ scd_data as (
"date" desc,
_airbyte_emitted_at desc
) as _airbyte_end_at,
case when lag("date") over (
case when row_number() over (
partition by "id", currency, cast(nzd as
varchar
)
order by
"date" is null asc,
"date" desc,
_airbyte_emitted_at desc
) is null then 1 else 0 end as _airbyte_active_row,
Contributor Author

When multiple rows for the same primary key all have a NULL cursor ("date" here), lag("date") returns NULL for each of them, so the lag-based check flags multiple rows as active.

) = 1 then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_dedup_exchange_rate_hashid
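
The review comment on the `_airbyte_active_row` change can be reproduced outside the database. The sketch below is an illustration in plain Python, not the generated SQL. It assumes a single partition (one primary key), uses made-up rows, and all helper names are hypothetical. It mimics `lag("date") ... is null` and `row_number() ... = 1` over the same ordering.

```python
def order_partition(rows):
    """Mimic: order by "date" is null asc, "date" desc, _airbyte_emitted_at desc.

    Python sorts are stable, so sorting by the least significant key first
    and the most significant key last gives the combined ordering.
    """
    ordered = sorted(rows, key=lambda r: r["emitted_at"], reverse=True)
    ordered.sort(key=lambda r: r["date"] or "", reverse=True)
    ordered.sort(key=lambda r: r["date"] is None)
    return ordered


def active_flags_lag(rows):
    """Old check: a row is active when the previous row's cursor is NULL."""
    ordered = order_partition(rows)
    flags = []
    for i in range(len(ordered)):
        # lag() yields NULL on the first row, and also whenever the
        # previous row's cursor is itself NULL.
        previous_date = ordered[i - 1]["date"] if i > 0 else None
        flags.append(1 if previous_date is None else 0)
    return flags


def active_flags_row_number(rows):
    """New check: only the first row of the ordered partition is active."""
    ordered = order_partition(rows)
    return [1 if position == 1 else 0
            for position, _ in enumerate(ordered, start=1)]


# Made-up rows for one primary key.
dated_rows = [
    {"date": "2021-01-01", "emitted_at": 1},
    {"date": "2021-02-01", "emitted_at": 2},
]
null_rows = [
    {"date": None, "emitted_at": 1},
    {"date": None, "emitted_at": 2},
    {"date": None, "emitted_at": 3},
]

print(active_flags_lag(dated_rows), active_flags_row_number(dated_rows))
print(active_flags_lag(null_rows), active_flags_row_number(null_rows))
```

With non-NULL cursors both checks agree. When every cursor is NULL, the lag-style check marks every row as active, while the row_number-style check marks exactly one.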
@@ -37,16 +37,6 @@
}
}
}
},
"column`_'with\"_quotes": {
"type": ["null", "array"],
"items": {
"properties": {
"currency": {
"type": ["null", "string"]
}
}
}
}
}
}
@@ -234,6 +224,16 @@
"properties": {
"owner_id": {
"type": ["null", "integer"]
},
"column`_'with\"_quotes": {
"type": ["null", "array"],
"items": {
"properties": {
"currency": {
"type": ["null", "string"]
}
}
}
}
}
}
@@ -13,6 +13,6 @@
{"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":1,"conflict_stream_scalar": 2},"emitted_at":1623861660}}
{"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":2,"conflict_stream_scalar": 2},"emitted_at":1623861660}}

{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":1, "children": [{"ab_id": 1, "owner": {"owner_id": 1}},{"ab_id": 2, "owner": {"owner_id": 2}}]},"emitted_at":1623861660}}
{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":2, "children": [{"ab_id": 3, "owner": {"owner_id": 3}},{"ab_id": 4, "owner": {"owner_id": 4}}]},"emitted_at":1623861660}}
{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":1, "children": [{"ab_id": 1, "owner": {"owner_id": 1, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 2, "owner": {"owner_id": 2, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}}
{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":2, "children": [{"ab_id": 3, "owner": {"owner_id": 3, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 4, "owner": {"owner_id": 4, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}}

@@ -14,6 +14,5 @@
{"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":1,"conflict_stream_scalar": 2},"emitted_at":1623861660}}
{"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":2,"conflict_stream_scalar": 2},"emitted_at":1623861660}}

{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":1, "children": [{"ab_id": 1, "owner": {"owner_id": 1}},{"ab_id": 2, "owner": {"owner_id": 2}}]},"emitted_at":1623861660}}
{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":2, "children": [{"ab_id": 3, "owner": {"owner_id": 3}},{"ab_id": 4, "owner": {"owner_id": 4}}]},"emitted_at":1623861660}}

{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":1, "children": [{"ab_id": 1, "owner": {"owner_id": 1, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 2, "owner": {"owner_id": 2, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}}
{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":2, "children": [{"ab_id": 3, "owner": {"owner_id": 3, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 4, "owner": {"owner_id": 4, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}}
@@ -4,9 +4,6 @@ with table_row_counts as (
union all
select distinct 'nested_stream_with_complex_columns_resulting_into_long_names' as label, count(*) as row_count, 3 as expected_count
from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names') }}
union all
select distinct 'nested_stream_with_complex_columns_resulting_into_long_names_partition' as label, count(*) as row_count, 3 as expected_count
from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_partition') }}
union all
select 'nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA' as label, count(distinct currency) as row_count, 1 as expected_count
from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA') }}
@@ -7,15 +7,17 @@ models:
expression: "double_array_data is not null"
- dbt_utils.expression_is_true:
expression: "DATA is not null"
- dbt_utils.expression_is_true:
expression: "\"column`_'with\"\"_quotes\" is not null"
- name: nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA
columns:
- name: currency
tests:
- not_null
- name: nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data
columns:
- name: id
tests:
# - name: nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data
# columns:
# - name: id
# tests:
# - not_null # TODO Fix bug here
- name: unnest_alias_children_owner
tests:
- dbt_utils.expression_is_true:
expression: "\"column`_'with\"\"_quotes\" is not null"
@@ -7,15 +7,17 @@ models:
expression: "double_array_data is not null"
- dbt_utils.expression_is_true:
expression: "DATA is not null"
- dbt_utils.expression_is_true:
expression: "\"column`_'with\"\"_quotes\" is not null"
- name: nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA
columns:
- name: currency
tests:
- not_null
- name: nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data
columns:
- name: id
tests:
# - name: nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data
# columns:
# - name: id
# tests:
# - not_null # TODO Fix bug here
- name: unnest_alias_children_owner
tests:
- dbt_utils.expression_is_true:
expression: "\"column`_'with\"\"_quotes\" is not null"
@@ -31,6 +31,9 @@
},
"USD": {
"type": "number"
},
"column`_'with\"_quotes": {
"type": "string"
}
}
},
@@ -31,6 +31,9 @@
},
"USD": {
"type": "number"
},
"column`_'with\"_quotes": {
"type": "string"
}
}
},