Skip to content

Commit

Permalink
🐛 Normalization correctly propagates deletions to the final tables (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Jun 14, 2022
1 parent da60cd4 commit 61ce03a
Show file tree
Hide file tree
Showing 244 changed files with 2,817 additions and 1,526 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.2.3
LABEL io.airbyte.version=0.2.4
LABEL io.airbyte.name=airbyte/normalization
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,43 @@
- incremental_clause controls the predicate to filter on new data to process incrementally
#}
{% macro incremental_clause(col_emitted_at) -%}
{{ adapter.dispatch('incremental_clause')(col_emitted_at) }}
{% macro incremental_clause(col_emitted_at, tablename) -%}
{{ adapter.dispatch('incremental_clause')(col_emitted_at, tablename) }}
{%- endmacro %}
{%- macro default__incremental_clause(col_emitted_at) -%}
{%- macro default__incremental_clause(col_emitted_at, tablename) -%}
{% if is_incremental() %}
and coalesce(
cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >= (select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}),
cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >= (select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ tablename }}),
{# -- if {{ col_emitted_at }} is NULL in either table, the previous comparison would evaluate to NULL, #}
{# -- so we coalesce and make sure the row is always returned for incremental processing instead #}
true)
{% endif %}
{%- endmacro -%}
{# -- see https://on-systems.tech/113-beware-dbt-incremental-updates-against-snowflake-external-tables/ #}
{%- macro snowflake__incremental_clause(col_emitted_at) -%}
{%- macro snowflake__incremental_clause(col_emitted_at, tablename) -%}
{% if is_incremental() %}
{% if get_max_normalized_cursor(col_emitted_at) %}
{% if get_max_normalized_cursor(col_emitted_at, tablename) %}
and cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >=
cast('{{ get_max_normalized_cursor(col_emitted_at) }}' as {{ type_timestamp_with_timezone() }})
cast('{{ get_max_normalized_cursor(col_emitted_at, tablename) }}' as {{ type_timestamp_with_timezone() }})
{% endif %}
{% endif %}
{%- endmacro -%}
{%- macro sqlserver__incremental_clause(col_emitted_at) -%}
{%- macro sqlserver__incremental_clause(col_emitted_at, tablename) -%}
{% if is_incremental() %}
and ((select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}) is null
and ((select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ tablename }}) is null
or cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >=
(select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}))
(select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ tablename }}))
{% endif %}
{%- endmacro -%}
{% macro get_max_normalized_cursor(col_emitted_at) %}
{% macro get_max_normalized_cursor(col_emitted_at, tablename) %}
{% if execute and is_incremental() %}
{% if env_var('INCREMENTAL_CURSOR', 'UNSET') == 'UNSET' %}
{% set query %}
select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}
select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ tablename }}
{% endset %}
{% set max_cursor = run_query(query).columns[0][0] %}
{% do return(max_cursor) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,12 @@ def setup_mysql_db(self):
"MYSQL_INITDB_SKIP_TZINFO=yes",
"-e",
f"MYSQL_DATABASE={config['database']}",
"-e",
"MYSQL_ROOT_HOST=%",
"-p",
f"{config['port']}:3306",
"-d",
"mysql",
"mysql/mysql-server",
]
print("Executing: ", " ".join(commands))
subprocess.call(commands)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,45 +1,29 @@
# This file is necessary to install dbt-utils with dbt deps
# the content will be overwritten by the transform function

# Name your package! Package names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: "airbyte_utils"
name: airbyte_utils
version: "1.0"
config-version: 2

# This setting configures which "profile" dbt uses for this project. Profiles contain
# database connection information, and should be configured in the ~/.dbt/profiles.yml file
profile: "normalize"

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that source models can be found
# in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
docs-paths: ["docs"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
seed-paths: ["data"]
macro-paths: ["macros"]

target-path: "../build" # directory which will store compiled SQL files
log-path: "../logs" # directory which will store DBT logs
packages-install-path: "/dbt" # directory which will store external DBT dependencies

clean-targets: # directories to be removed by `dbt clean`
- "build"
- "dbt_modules"

profile: normalize
model-paths:
- models
docs-paths:
- docs
analysis-paths:
- analysis
test-paths:
- tests
seed-paths:
- data
macro-paths:
- macros
target-path: ../build
log-path: ../logs
packages-install-path: /dbt
clean-targets:
- build
- dbt_modules
quoting:
database: true
# Temporarily disabling the behavior of the ExtendedNameTransformer on table/schema names (see issue #1785):
# all schemas should be unquoted
schema: false
identifier: true

# You can define configurations for models in the `model-paths` directory here.
# Using these configurations, you can enable or disable models, change how they
# are materialized, and more!
models:
airbyte_utils:
+materialized: table
Expand All @@ -57,7 +41,77 @@ models:
airbyte_views:
+tags: airbyte_internal_views
+materialized: view

dispatch:
- macro_namespace: dbt_utils
search_order: ["airbyte_utils", "dbt_utils"]
search_order:
- airbyte_utils
- dbt_utils
vars:
json_column: _airbyte_data
models_to_source:
nested_stream_with_complex_columns_resulting_into_long_names_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_stg: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_scd: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
non_nested_stream_without_namespace_resulting_into_long_names_ab1: test_normalization._airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
non_nested_stream_without_namespace_resulting_into_long_names_ab2: test_normalization._airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
non_nested_stream_without_namespace_resulting_into_long_names_ab3: test_normalization._airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
non_nested_stream_without_namespace_resulting_into_long_names: test_normalization._airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
some_stream_that_was_empty_ab1: test_normalization._airbyte_raw_some_stream_that_was_empty
some_stream_that_was_empty_ab2: test_normalization._airbyte_raw_some_stream_that_was_empty
some_stream_that_was_empty_stg: test_normalization._airbyte_raw_some_stream_that_was_empty
some_stream_that_was_empty_scd: test_normalization._airbyte_raw_some_stream_that_was_empty
some_stream_that_was_empty: test_normalization._airbyte_raw_some_stream_that_was_empty
simple_stream_with_namespace_resulting_into_long_names_ab1: test_normalization_namespace._airbyte_raw_simple_stream_with_namespace_resulting_into_long_names
simple_stream_with_namespace_resulting_into_long_names_ab2: test_normalization_namespace._airbyte_raw_simple_stream_with_namespace_resulting_into_long_names
simple_stream_with_namespace_resulting_into_long_names_ab3: test_normalization_namespace._airbyte_raw_simple_stream_with_namespace_resulting_into_long_names
simple_stream_with_namespace_resulting_into_long_names: test_normalization_namespace._airbyte_raw_simple_stream_with_namespace_resulting_into_long_names
conflict_stream_name_ab1: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_ab2: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_ab3: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_scalar_ab1: test_normalization._airbyte_raw_conflict_stream_scalar
conflict_stream_scalar_ab2: test_normalization._airbyte_raw_conflict_stream_scalar
conflict_stream_scalar_ab3: test_normalization._airbyte_raw_conflict_stream_scalar
conflict_stream_scalar: test_normalization._airbyte_raw_conflict_stream_scalar
conflict_stream_array_ab1: test_normalization._airbyte_raw_conflict_stream_array
conflict_stream_array_ab2: test_normalization._airbyte_raw_conflict_stream_array
conflict_stream_array_ab3: test_normalization._airbyte_raw_conflict_stream_array
conflict_stream_array: test_normalization._airbyte_raw_conflict_stream_array
unnest_alias_ab1: test_normalization._airbyte_raw_unnest_alias
unnest_alias_ab2: test_normalization._airbyte_raw_unnest_alias
unnest_alias_ab3: test_normalization._airbyte_raw_unnest_alias
unnest_alias: test_normalization._airbyte_raw_unnest_alias
nested_stream_with_complex_columns_resulting_into_long_names_partition_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_ab3: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
conflict_stream_name_conflict_stream_name_ab1: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_ab2: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_ab3: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name: test_normalization._airbyte_raw_conflict_stream_name
unnest_alias_children_ab1: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_ab2: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_ab3: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children: test_normalization._airbyte_raw_unnest_alias
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab3: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA_ab3: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
conflict_stream_name_conflict_stream_name_conflict_stream_name_ab1: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_conflict_stream_name_ab2: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_conflict_stream_name_ab3: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_conflict_stream_name: test_normalization._airbyte_raw_conflict_stream_name
unnest_alias_children_owner_ab1: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner_ab2: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner_ab3: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner_column___with__quotes_ab1: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner_column___with__quotes_ab2: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner_column___with__quotes_ab3: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner_column___with__quotes: test_normalization._airbyte_raw_unnest_alias
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ select
from {{ source('test_normalization', '_airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names') }} as table_alias
-- nested_stream_with_complex_columns_resulting_into_long_names
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at') }}
{{ incremental_clause('_airbyte_emitted_at', this) }}

Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ select
from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_ab1') }}
-- nested_stream_with_complex_columns_resulting_into_long_names
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at') }}
{{ incremental_clause('_airbyte_emitted_at', this) }}

Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_partit
{{ cross_join_unnest('partition', 'DATA') }}
where 1 = 1
and DATA is not null
{{ incremental_clause('_airbyte_emitted_at') }}
{{ incremental_clause('_airbyte_emitted_at', this) }}

Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_scd')
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition
where 1 = 1
and {{ adapter.quote('partition') }} is not null
{{ incremental_clause('_airbyte_emitted_at') }}
{{ incremental_clause('_airbyte_emitted_at', this) }}

Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_partit
{{ cross_join_unnest('partition', 'double_array_data') }}
where 1 = 1
and double_array_data is not null
{{ incremental_clause('_airbyte_emitted_at') }}
{{ incremental_clause('_airbyte_emitted_at', this) }}

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,53 @@
partition_by = {"field": "_airbyte_active_row", "data_type": "int64", "range": {"start": 0, "end": 1, "interval": 1}},
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ["drop view _airbyte_test_normalization.nested_stream_with_complex_columns_resulting_into_long_names_stg"],
post_hook = ["
{%
set final_table_relation = adapter.get_relation(
database=this.database,
schema=this.schema,
identifier='nested_stream_with_complex_columns_resulting_into_long_names'
)
%}
{#
If the final table doesn't exist, there is nothing to delete from it.
Also, after a reset, the final table is created without the _airbyte_unique_key column (this column is created during the first sync),
so skip this deletion if the column doesn't exist (in that case, the table is guaranteed to be empty anyway).
#}
{%
if final_table_relation is not none and '_airbyte_unique_key' in adapter.get_columns_in_relation(final_table_relation)|map(attribute='name')
%}
-- Delete records which are no longer active.
-- The statement below is equivalent to the following simpler query, but the left join version is more performant:
-- delete from final_table where unique_key in (
-- select unique_key from scd_table where 1 = 1 <incremental_clause(normalized_at, final_table)>
-- ) and unique_key not in (
-- select unique_key from scd_table where active_row = 1 <incremental_clause(normalized_at, final_table)>
-- )
-- We filter incrementally on normalized_at rather than emitted_at because we need to fetch the SCD
-- entries that were _updated_ recently: a deleted record has an SCD record which was emitted
-- a long time ago but was recently re-normalized to have active_row = 0.
delete from {{ final_table_relation }} final_table where final_table._airbyte_unique_key in (
select recent_records.unique_key
from (
select distinct _airbyte_unique_key as unique_key
from {{ this }}
where 1=1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('nested_stream_with_complex_columns_resulting_into_long_names')) }}
) recent_records
left join (
select _airbyte_unique_key as unique_key, count(_airbyte_unique_key) as active_count
from {{ this }}
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('nested_stream_with_complex_columns_resulting_into_long_names')) }}
group by _airbyte_unique_key
) active_counts
on recent_records.unique_key = active_counts.unique_key
where active_count is null or active_count = 0
)
{% else %}
-- We have to have a non-empty query, so just do a noop delete
delete from {{ this }} where 1=0
{% endif %}
","drop view _airbyte_test_normalization.nested_stream_with_complex_columns_resulting_into_long_names_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('nested_stream_with_complex_columns_resulting_into_long_names_stg')
Expand All @@ -16,7 +62,7 @@ new_data as (
from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_stg') }}
-- nested_stream_with_complex_columns_resulting_into_long_names from {{ source('test_normalization', '_airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names') }}
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at') }}
{{ incremental_clause('_airbyte_emitted_at', this) }}
),
new_data_ids as (
-- build a subset of _airbyte_unique_key from rows that are new
Expand Down
Loading

0 comments on commit 61ce03a

Please sign in to comment.