Normalization: Upgrade MySQL to dbt 1.0.0 #11470

Merged Jun 15, 2022 · 23 commits

Commits:
6ac01c0  Upgrade MySQL to dbt 1.0.0 (alafanechere, Mar 28, 2022)
111580c  update changelog (alafanechere, Mar 28, 2022)
ca38ffa  update dbt_project.yml and packages.yml (alafanechere, Mar 29, 2022)
cb6b9b3  Merge branch 'master' into augustin/normalization/upgrade-mysql-adapter (alafanechere, Mar 29, 2022)
3b1bc29  fix entrypoint (alafanechere, Mar 30, 2022)
ca450c1  Merge branch 'master' into augustin/normalization/upgrade-mysql-adapter (alafanechere, Mar 31, 2022)
afe752a  Merge branch 'master' into augustin/normalization/upgrade-mysql-adapter (alafanechere, Apr 1, 2022)
0971ef7  Merge branch 'master' into augustin/normalization/upgrade-mysql-adapter (edgao, Apr 5, 2022)
0c0cb5d  fix concat + regenerate output (edgao, Apr 7, 2022)
58ee603  tmp max workers = 1 (alafanechere, Apr 11, 2022)
ad5ead5  Merge branch 'master' into augustin/normalization/upgrade-mysql-adapter (alafanechere, Apr 11, 2022)
e4bb4ea  Merge branch 'master' into augustin/normalization/upgrade-mysql-adapter (alafanechere, Apr 12, 2022)
ed93465  Merge branch 'master' into augustin/normalization/upgrade-mysql-adapter (edgao, Apr 12, 2022)
4402c20  rebump (edgao, Apr 12, 2022)
7bf48fd  Merge branch 'master' into augustin/normalization/upgrade-mysql-adapter (alafanechere, Apr 20, 2022)
ab9cde8  Merge branch 'master' into augustin/normalization/upgrade-mysql-adapter (edgao, Apr 20, 2022)
355adfe  Merge branch 'master' into augustin/normalization/upgrade-mysql-adapter (alafanechere, Apr 22, 2022)
6b5b34c  explicit airbyteDocker before runnig tests? (edgao, Apr 22, 2022)
37cfd39  only run normalization docker? (edgao, Apr 22, 2022)
61ab575  Merge branch 'master' into augustin/normalization/upgrade-mysql-adapter (alafanechere, May 19, 2022)
ad7ad53  Merge branch 'master' into augustin/normalization/upgrade-mysql-adapter (edgao, Jun 15, 2022)
8a7c1d4  regenerate mysql output (edgao, Jun 15, 2022)
d72be95  bump version (edgao, Jun 15, 2022)
----
airbyte-integrations/bases/base-normalization/Dockerfile (1 addition, 1 deletion)
@@ -28,5 +28,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

-LABEL io.airbyte.version=0.2.4
+LABEL io.airbyte.version=0.2.5
LABEL io.airbyte.name=airbyte/normalization
----
@@ -2,7 +2,7 @@
# the content will be overwritten by the transform function

# Name your package! Package names should contain only lowercase characters
-# and underscores. A good package name should reflect your organization's
+# and underscores. A good package name should reflect your organization"s
# name or the intended use of these models
name: "airbyte_utils"
version: "1.0"
@@ -13,18 +13,18 @@ config-version: 2
profile: "normalize"

# These configurations specify where dbt should look for different types of files.
-# The `source-paths` config, for example, states that source models can be found
-# in the "models/" directory. You probably won't need to change these!
-source-paths: ["models"]
+# The `model-paths` config, for example, states that source models can be found
+# in the "models/" directory. You probably won"t need to change these!
+model-paths: ["models"]
docs-paths: ["docs"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
-data-paths: ["data"]
+seed-paths: ["data"]
macro-paths: ["macros"]

target-path: "../build" # directory which will store compiled SQL files
log-path: "../logs" # directory which will store DBT logs
-modules-path: "/dbt" # directory which will store external DBT dependencies
+packages-install-path: "/dbt" # directory which will store external DBT dependencies

clean-targets: # directories to be removed by `dbt clean`
- "build"
@@ -37,7 +37,7 @@ quoting:
schema: false
identifier: true

-# You can define configurations for models in the `source-paths` directory here.
+# You can define configurations for models in the `model-paths` directory here.
# Using these configurations, you can enable or disable models, change how they
# are materialized, and more!
models:
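For reference, the edits above are exactly the three project-config renames required by the dbt 1.0 migration (key names per dbt's migration notes; the values mirror this template):

    # dbt 0.x                      dbt >= 1.0
    # source-paths: ["models"] ->  model-paths: ["models"]
    # data-paths: ["data"]     ->  seed-paths: ["data"]
    # modules-path: "/dbt"     ->  packages-install-path: "/dbt"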
----
@@ -2,4 +2,4 @@

packages:
  - git: "https://github.com/fishtown-analytics/dbt-utils.git"
-    revision: 0.6.4
+    revision: 0.8.2
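dbt-utils 0.8.x is the release line compatible with dbt-core 1.0, which is why the pin moves off 0.6.4. As an aside, the package now also lives on the dbt Hub under the dbt-labs org; a hub-style pin (an alternative spec, not what this PR uses) would look roughly like:

    packages:
      - package: dbt-labs/dbt_utils
        version: 0.8.2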
----
@@ -12,6 +12,11 @@
concat({{ fields|join(', ') }})
{%- endmacro %}

+{% macro mysql__concat(fields) -%}
+{#-- MySQL doesn't support the '||' operator as concatenation by default --#}
+    concat({{ fields|join(', ') }})
+{%- endmacro %}
+
{% macro sqlserver__concat(fields) -%}
{#-- CONCAT() in SQL SERVER accepts from 2 to 254 arguments, we use batches for the main concat, to overcome the limit. --#}
{% set concat_chunks = [] %}
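The new macro is picked up by dbt's adapter dispatch: a call to the generic concat macro resolves to mysql__concat when the active adapter is MySQL, keeping the generated SQL on concat(...) instead of the unsupported || operator. A minimal sketch of the effect, with made-up column names:

    -- model source (Jinja-SQL):
    --   {{ concat(['first_name', "'-'", 'last_name']) }}
    -- compiled for MySQL (via mysql__concat):
    concat(first_name, '-', last_name)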
----
@@ -120,6 +120,7 @@ function main() {
openssh "${PROJECT_DIR}/ssh.json"
trap 'closessh' EXIT

+set +e # allow script to continue running even if next commands fail to run properly
# We don't run dbt 1.0.x on all destinations (because their plugins don't support it yet)
# So we need to only pass `--event-buffer-size` if it's supported by DBT.
check_dbt_event_buffer_size
@@ -130,7 +131,6 @@ function main() {
dbt_additional_args=""
fi

-set +e # allow script to continue running even if next commands fail to run properly
# Run dbt to compile and execute the generated normalization models
dbt ${dbt_additional_args} run --profiles-dir "${PROJECT_DIR}" --project-dir "${PROJECT_DIR}"
DBT_EXIT_CODE=$?
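Moving set +e above the check matters because check_dbt_event_buffer_size itself shells out to dbt, which exits non-zero on older dbt versions that lack the flag; with set -e still active, that probe would kill the whole entrypoint. A sketch of the resulting control flow (the probe's exact contract is assumed here, not shown in this diff):

    set +e                       # failures below are inspected, not fatal
    check_dbt_event_buffer_size  # assumed: probes dbt for --event-buffer-size support
    if [ "$dbt_event_buffer_size_supported" = "true" ]; then  # hypothetical flag variable
        dbt_additional_args="--event-buffer-size=10000"
    else
        dbt_additional_args=""
    fi
    dbt ${dbt_additional_args} run --profiles-dir "${PROJECT_DIR}" --project-dir "${PROJECT_DIR}"
    DBT_EXIT_CODE=$?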
----
@@ -419,7 +419,7 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str, forc
"""
Run dbt subprocess while checking and counting for "ERROR", "FAIL" or "WARNING" printed in its outputs
"""
-if normalization_image.startswith("airbyte/normalization-oracle") or normalization_image.startswith("airbyte/normalization-mysql"):
+if normalization_image.startswith("airbyte/normalization-oracle"):
dbtAdditionalArgs = []
else:
dbtAdditionalArgs = ["--event-buffer-size=10000"]
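After this change only the Oracle image is treated as pre-1.0 dbt; MySQL joins the adapters that accept --event-buffer-size. A simplified sketch of how the branch shapes the dbt invocation (the helper name and command assembly are illustrative, not the harness's real code):

    def build_dbt_args(normalization_image: str, command: str) -> list:
        # Only Oracle normalization still runs dbt < 1.0 without --event-buffer-size.
        if normalization_image.startswith("airbyte/normalization-oracle"):
            dbt_additional_args = []
        else:
            dbt_additional_args = ["--event-buffer-size=10000"]
        return ["dbt", *dbt_additional_args, command]

    # build_dbt_args("airbyte/normalization-mysql:0.2.5", "run")
    # -> ['dbt', '--event-buffer-size=10000', 'run']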
----
@@ -1,25 +1,25 @@
name: airbyte_utils
version: "1.0"
version: '1.0'
config-version: 2
profile: normalize
-source-paths:
-- models
+model-paths:
+- models
docs-paths:
-- docs
+- docs
analysis-paths:
-- analysis
+- analysis
test-paths:
-- tests
-data-paths:
-- data
+- tests
+seed-paths:
+- data
macro-paths:
-- macros
+- macros
target-path: ../build
log-path: ../logs
-modules-path: /dbt
+packages-install-path: /dbt
clean-targets:
-- build
-- dbt_modules
+- build
+- dbt_modules
quoting:
database: true
schema: false
@@ -42,7 +42,7 @@ models:
+materialized: view
vars:
dbt_utils_dispatch_list:
-- airbyte_utils
+- airbyte_utils
json_column: _airbyte_data
models_to_source:
nested_stream_with_co_1g_into_long_names_ab1: test_normalization._airbyte_raw_nested_s__lting_into_long_names
----
@@ -4,7 +4,7 @@
test_normalization.`nested_stream_with_co___long_names_partition__dbt_tmp`
as (

-with __dbt__CTE__nested_stream_with_co_2g_names_partition_ab1 as (
+with __dbt__cte__nested_stream_with_co_2g_names_partition_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: test_normalization.`nested_stream_with_co_1g_into_long_names_scd`
@@ -24,10 +24,10 @@ from test_normalization.`nested_stream_with_co_1g_into_long_names_scd` as table_
where 1 = 1
and `partition` is not null

-), __dbt__CTE__nested_stream_with_co_2g_names_partition_ab2 as (
+), __dbt__cte__nested_stream_with_co_2g_names_partition_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
--- depends_on: __dbt__CTE__nested_stream_with_co_2g_names_partition_ab1
+-- depends_on: __dbt__cte__nested_stream_with_co_2g_names_partition_ab1
select
_airbyte_nested_strea__nto_long_names_hashid,
double_array_data,
@@ -37,23 +37,23 @@ select

CURRENT_TIMESTAMP
as _airbyte_normalized_at
-from __dbt__CTE__nested_stream_with_co_2g_names_partition_ab1
+from __dbt__cte__nested_stream_with_co_2g_names_partition_ab1
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition
where 1 = 1

-), __dbt__CTE__nested_stream_with_co_2g_names_partition_ab3 as (
+), __dbt__cte__nested_stream_with_co_2g_names_partition_ab3 as (

-- SQL model to build a hash column based on the values of this record
--- depends_on: __dbt__CTE__nested_stream_with_co_2g_names_partition_ab2
+-- depends_on: __dbt__cte__nested_stream_with_co_2g_names_partition_ab2
select
md5(cast(concat(coalesce(cast(_airbyte_nested_strea__nto_long_names_hashid as char), ''), '-', coalesce(cast(double_array_data as char), ''), '-', coalesce(cast(`DATA` as char), '')) as char)) as _airbyte_partition_hashid,
tmp.*
-from __dbt__CTE__nested_stream_with_co_2g_names_partition_ab2 tmp
+from __dbt__cte__nested_stream_with_co_2g_names_partition_ab2 tmp
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition
where 1 = 1

)-- Final base SQL model
--- depends_on: __dbt__CTE__nested_stream_with_co_2g_names_partition_ab3
+-- depends_on: __dbt__cte__nested_stream_with_co_2g_names_partition_ab3
select
_airbyte_nested_strea__nto_long_names_hashid,
double_array_data,
Expand All @@ -64,7 +64,7 @@ select
CURRENT_TIMESTAMP
as _airbyte_normalized_at,
_airbyte_partition_hashid
-from __dbt__CTE__nested_stream_with_co_2g_names_partition_ab3
+from __dbt__cte__nested_stream_with_co_2g_names_partition_ab3
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition from test_normalization.`nested_stream_with_co_1g_into_long_names_scd`
where 1 = 1

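The only change in this regenerated model is mechanical: dbt 1.0 names the CTEs it inlines for ephemeral models with a lowercase __dbt__cte__ prefix, where dbt 0.x emitted __dbt__CTE__. Schematically, with a shortened model name:

    -- dbt 0.x compiled output:
    --   with __dbt__CTE__my_model_ab1 as (...)
    -- dbt 1.0 compiled output:
    with __dbt__cte__my_model_ab1 as (select 1 as id)
    select * from __dbt__cte__my_model_ab1;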
----
@@ -4,7 +4,7 @@
test_normalization.`nested_stream_with_co___names_partition_data__dbt_tmp`
as (

-with __dbt__CTE__nested_stream_with_co_3es_partition_data_ab1 as (
+with __dbt__cte__nested_stream_with_co_3es_partition_data_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: test_normalization.`nested_stream_with_co___long_names_partition`
@@ -20,7 +20,7 @@ with numbers as (
select


-p0.generated_number * pow(2, 0)
+p0.generated_number * power(2, 0)


+ 1
@@ -68,10 +68,10 @@ left join joined on _airbyte_partition_hashid = joined._airbyte_hashid
where 1 = 1
and `DATA` is not null

-), __dbt__CTE__nested_stream_with_co_3es_partition_data_ab2 as (
+), __dbt__cte__nested_stream_with_co_3es_partition_data_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
--- depends_on: __dbt__CTE__nested_stream_with_co_3es_partition_data_ab1
+-- depends_on: __dbt__cte__nested_stream_with_co_3es_partition_data_ab1
select
_airbyte_partition_hashid,
cast(currency as char(1024)) as currency,
@@ -80,23 +80,23 @@ select

CURRENT_TIMESTAMP
as _airbyte_normalized_at
-from __dbt__CTE__nested_stream_with_co_3es_partition_data_ab1
+from __dbt__cte__nested_stream_with_co_3es_partition_data_ab1
-- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA
where 1 = 1

-), __dbt__CTE__nested_stream_with_co_3es_partition_data_ab3 as (
+), __dbt__cte__nested_stream_with_co_3es_partition_data_ab3 as (

-- SQL model to build a hash column based on the values of this record
--- depends_on: __dbt__CTE__nested_stream_with_co_3es_partition_data_ab2
+-- depends_on: __dbt__cte__nested_stream_with_co_3es_partition_data_ab2
select
md5(cast(concat(coalesce(cast(_airbyte_partition_hashid as char), ''), '-', coalesce(cast(currency as char), '')) as char)) as _airbyte_data_hashid,
tmp.*
-from __dbt__CTE__nested_stream_with_co_3es_partition_data_ab2 tmp
+from __dbt__cte__nested_stream_with_co_3es_partition_data_ab2 tmp
-- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA
where 1 = 1

)-- Final base SQL model
--- depends_on: __dbt__CTE__nested_stream_with_co_3es_partition_data_ab3
+-- depends_on: __dbt__cte__nested_stream_with_co_3es_partition_data_ab3
select
_airbyte_partition_hashid,
currency,
Expand All @@ -106,7 +106,7 @@ select
CURRENT_TIMESTAMP
as _airbyte_normalized_at,
_airbyte_data_hashid
-from __dbt__CTE__nested_stream_with_co_3es_partition_data_ab3
+from __dbt__cte__nested_stream_with_co_3es_partition_data_ab3
-- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA from test_normalization.`nested_stream_with_co___long_names_partition`
where 1 = 1

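The pow -> power swap comes from the regenerated cross-database helper that unnests arrays: the numbers CTE enumerates row indexes as sums of powers of two. MySQL treats POW and POWER as synonyms, so behavior is unchanged; only the emitted spelling differs. A trimmed illustration of the idiom, with generic names in place of the generated ones:

    -- enumerate 1..4 by combining bits, as the numbers CTE above does:
    with p as (select 0 as g union all select 1)
    select p0.g * power(2, 0) + p1.g * power(2, 1) + 1 as generated_number
    from p as p0 cross join p as p1
    order by generated_number;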
----
@@ -4,7 +4,7 @@
test_normalization.`nested_stream_with_co__ion_double_array_data__dbt_tmp`
as (

-with __dbt__CTE__nested_stream_with_co_3double_array_data_ab1 as (
+with __dbt__cte__nested_stream_with_co_3double_array_data_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: test_normalization.`nested_stream_with_co___long_names_partition`
@@ -20,7 +20,7 @@ with numbers as (
select


-p0.generated_number * pow(2, 0)
+p0.generated_number * power(2, 0)


+ 1
@@ -68,10 +68,10 @@ left join joined on _airbyte_partition_hashid = joined._airbyte_hashid
where 1 = 1
and double_array_data is not null

-), __dbt__CTE__nested_stream_with_co_3double_array_data_ab2 as (
+), __dbt__cte__nested_stream_with_co_3double_array_data_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
--- depends_on: __dbt__CTE__nested_stream_with_co_3double_array_data_ab1
+-- depends_on: __dbt__cte__nested_stream_with_co_3double_array_data_ab1
select
_airbyte_partition_hashid,
cast(id as char(1024)) as id,
@@ -80,23 +80,23 @@ select

CURRENT_TIMESTAMP
as _airbyte_normalized_at
-from __dbt__CTE__nested_stream_with_co_3double_array_data_ab1
+from __dbt__cte__nested_stream_with_co_3double_array_data_ab1
-- double_array_data at nested_stream_with_complex_columns_resulting_into_long_names/partition/double_array_data
where 1 = 1

-), __dbt__CTE__nested_stream_with_co_3double_array_data_ab3 as (
+), __dbt__cte__nested_stream_with_co_3double_array_data_ab3 as (

-- SQL model to build a hash column based on the values of this record
--- depends_on: __dbt__CTE__nested_stream_with_co_3double_array_data_ab2
+-- depends_on: __dbt__cte__nested_stream_with_co_3double_array_data_ab2
select
md5(cast(concat(coalesce(cast(_airbyte_partition_hashid as char), ''), '-', coalesce(cast(id as char), '')) as char)) as _airbyte_double_array_data_hashid,
tmp.*
-from __dbt__CTE__nested_stream_with_co_3double_array_data_ab2 tmp
+from __dbt__cte__nested_stream_with_co_3double_array_data_ab2 tmp
-- double_array_data at nested_stream_with_complex_columns_resulting_into_long_names/partition/double_array_data
where 1 = 1

)-- Final base SQL model
--- depends_on: __dbt__CTE__nested_stream_with_co_3double_array_data_ab3
+-- depends_on: __dbt__cte__nested_stream_with_co_3double_array_data_ab3
select
_airbyte_partition_hashid,
id,
Expand All @@ -106,7 +106,7 @@ select
CURRENT_TIMESTAMP
as _airbyte_normalized_at,
_airbyte_double_array_data_hashid
-from __dbt__CTE__nested_stream_with_co_3double_array_data_ab3
+from __dbt__cte__nested_stream_with_co_3double_array_data_ab3
-- double_array_data at nested_stream_with_complex_columns_resulting_into_long_names/partition/double_array_data from test_normalization.`nested_stream_with_co___long_names_partition`
where 1 = 1

----
@@ -4,7 +4,7 @@
test_normalization.`nested_stream_with_co___long_names_partition__dbt_tmp`
as (

-with __dbt__CTE__nested_stream_with_co_2g_names_partition_ab1 as (
+with __dbt__cte__nested_stream_with_co_2g_names_partition_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: test_normalization.`nested_stream_with_co_1g_into_long_names_scd`
@@ -24,10 +24,10 @@ from test_normalization.`nested_stream_with_co_1g_into_long_names_scd` as table_
where 1 = 1
and `partition` is not null

-), __dbt__CTE__nested_stream_with_co_2g_names_partition_ab2 as (
+), __dbt__cte__nested_stream_with_co_2g_names_partition_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
--- depends_on: __dbt__CTE__nested_stream_with_co_2g_names_partition_ab1
+-- depends_on: __dbt__cte__nested_stream_with_co_2g_names_partition_ab1
select
_airbyte_nested_strea__nto_long_names_hashid,
double_array_data,
@@ -37,23 +37,23 @@ select

CURRENT_TIMESTAMP
as _airbyte_normalized_at
-from __dbt__CTE__nested_stream_with_co_2g_names_partition_ab1
+from __dbt__cte__nested_stream_with_co_2g_names_partition_ab1
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition
where 1 = 1

-), __dbt__CTE__nested_stream_with_co_2g_names_partition_ab3 as (
+), __dbt__cte__nested_stream_with_co_2g_names_partition_ab3 as (

-- SQL model to build a hash column based on the values of this record
--- depends_on: __dbt__CTE__nested_stream_with_co_2g_names_partition_ab2
+-- depends_on: __dbt__cte__nested_stream_with_co_2g_names_partition_ab2
select
md5(cast(concat(coalesce(cast(_airbyte_nested_strea__nto_long_names_hashid as char), ''), '-', coalesce(cast(double_array_data as char), ''), '-', coalesce(cast(`DATA` as char), '')) as char)) as _airbyte_partition_hashid,
tmp.*
-from __dbt__CTE__nested_stream_with_co_2g_names_partition_ab2 tmp
+from __dbt__cte__nested_stream_with_co_2g_names_partition_ab2 tmp
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition
where 1 = 1

)-- Final base SQL model
--- depends_on: __dbt__CTE__nested_stream_with_co_2g_names_partition_ab3
+-- depends_on: __dbt__cte__nested_stream_with_co_2g_names_partition_ab3
select
_airbyte_nested_strea__nto_long_names_hashid,
double_array_data,
Expand All @@ -64,7 +64,7 @@ select
CURRENT_TIMESTAMP
as _airbyte_normalized_at,
_airbyte_partition_hashid
-from __dbt__CTE__nested_stream_with_co_2g_names_partition_ab3
+from __dbt__cte__nested_stream_with_co_2g_names_partition_ab3
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition from test_normalization.`nested_stream_with_co_1g_into_long_names_scd`
where 1 = 1
