🎉 MySQL destination: normalization #4163

Merged
merged 46 commits into master from liren/mysql-destination-normalization on Jul 4, 2021
Changes from 17 commits

Commits
55752a9
Add mysql dbt package
tuliren Jun 16, 2021
049cfa2
Add mysql normalization support in java
tuliren Jun 16, 2021
9165e80
Add mysql normalization support in python
tuliren Jun 16, 2021
94c10be
Fix unit tests
tuliren Jun 16, 2021
173ad67
Update readme
tuliren Jun 16, 2021
3d9cbf7
Setup mysql container in integration test
tuliren Jun 18, 2021
9e8e939
Add macros
tuliren Jun 18, 2021
47945bf
Depend on dbt-mysql from git repo
tuliren Jun 18, 2021
7f04076
Remove mysql limitation test
tuliren Jun 18, 2021
b9e3066
Test normalization
tuliren Jun 18, 2021
f13a95b
Revert protocol format change
tuliren Jun 18, 2021
dee3479
Fix mysel json macros
tuliren Jun 18, 2021
abe8eb7
Fix two more macros
tuliren Jun 22, 2021
a6acc91
Fix table name length
tuliren Jun 22, 2021
bbc3905
Fix array macro
tuliren Jun 23, 2021
51fcb2a
Fix equality test macro
tuliren Jun 23, 2021
5cbe550
Update replace-identifiers
tuliren Jun 23, 2021
2396590
Merge branch 'master' into liren/mysql-destination-normalization
tuliren Jun 25, 2021
b7fe9cc
Add more identifiers to replace
tuliren Jun 27, 2021
4a50b25
Fix unnest macro
tuliren Jun 27, 2021
5c6642a
Fix equality macro
tuliren Jun 28, 2021
b71bd6d
Check in mysql test output
tuliren Jun 28, 2021
5dc1cbb
Merge branch 'master' into liren/mysql-destination-normalization
tuliren Jun 28, 2021
b681fd0
Update column limit test for mysql
tuliren Jun 28, 2021
02a8670
Escape parentheses
tuliren Jun 28, 2021
ffa5789
Remove unnecessary mysql test
tuliren Jun 28, 2021
0cf0a17
Remove mysql output for easier code review
tuliren Jun 28, 2021
411425f
Remove unnecessary mysql test
tuliren Jun 28, 2021
c43d06e
Remove parentheses
tuliren Jun 28, 2021
091f137
Update dependencies
tuliren Jun 28, 2021
78e030c
Skip mysql instead of manually write out types
tuliren Jun 28, 2021
e89122d
Bump version
tuliren Jul 1, 2021
ebf5cc4
Merge branch 'master' into liren/mysql-destination-normalization
tuliren Jul 1, 2021
f94b1c8
Check in unit test for mysql name transformer
tuliren Jul 1, 2021
9310950
Fix type conversion
tuliren Jul 1, 2021
1c68996
Use json_value to extract scalar json fields
tuliren Jul 1, 2021
4e2a0dd
Move dbt-mysql to Dockerfile (#4459)
ChristopheDuong Jul 1, 2021
5c49cc3
Format code
tuliren Jul 1, 2021
46cab59
Check in mysql dbt output
tuliren Jul 1, 2021
aefc479
Remove unnecessary quote
tuliren Jul 1, 2021
84e6e1c
Update mysql equality test to match 0.19.0
tuliren Jul 1, 2021
0202369
Check in schema_test update
tuliren Jul 1, 2021
967e1d8
Update readme
tuliren Jul 4, 2021
73a1d25
Merge branch 'master' into liren/mysql-destination-normalization
tuliren Jul 4, 2021
f972e4b
Bump base normalization version
tuliren Jul 4, 2021
289ba24
Update document
tuliren Jul 4, 2021
1 change: 1 addition & 0 deletions airbyte-integrations/bases/base-normalization/README.md
@@ -54,6 +54,7 @@ allowed characters, if quotes are needed or not, and the length limitations:
- [postgres](../../../docs/integrations/destinations/postgres.md)
- [redshift](../../../docs/integrations/destinations/redshift.md)
- [snowflake](../../../docs/integrations/destinations/snowflake.md)
- [mysql](../../../docs/integrations/destinations/mysql.md)

Rules about truncations, for example for both of these strings which are too long for the postgres 64 limit:
- `Aaaa_Bbbb_Cccc_Dddd_Eeee_Ffff_Gggg_Hhhh_Iiii`
1 change: 1 addition & 0 deletions airbyte-integrations/bases/base-normalization/build.gradle
@@ -25,6 +25,7 @@ task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs
dependsOn ':airbyte-integrations:connectors:destination-postgres:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-redshift:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-snowflake:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-mysql:airbyteDocker'
}

integrationTest.dependsOn("customIntegrationTestPython")
@@ -28,6 +28,10 @@
) as _airbyte_nested_data
{%- endmacro %}

{% macro mysql__cross_join_unnest(stream_name, array_col) -%}
left join joined on _airbyte_{{ stream_name }}_hashid = joined._airbyte_hashid
{%- endmacro %}

{% macro redshift__cross_join_unnest(stream_name, array_col) -%}
left join joined on _airbyte_{{ stream_name }}_hashid = joined._airbyte_hashid
{%- endmacro %}
@@ -97,3 +101,37 @@ joined as (
where numbers.generated_number <= json_array_length({{ column_col }}, true)
)
{%- endmacro %}

{% macro mysql__unnest_cte(table_name, stream_name, column_col) -%}
{%- if not execute -%}
{{ return('') }}
{% endif %}

{%- call statement('max_json_array_length', fetch_result=True) -%}
with max_value as (
select max(json_length({{ column_col }})) as max_number_of_items
from {{ ref(table_name) }}
)
select
case when max_number_of_items is not null and max_number_of_items > 1
then max_number_of_items
else 1 end as max_number_of_items
from max_value
{%- endcall -%}

{%- set max_length = load_result('max_json_array_length') -%}
with numbers as (
{{ dbt_utils.generate_series(max_length["data"][0][0]) }}
),
joined as (
select
_airbyte_{{ stream_name }}_hashid as _airbyte_hashid,
{# -- json_extract(column_col, '$[i]') as _airbyte_nested_data #}
json_extract({{ column_col }}, concat("$[", numbers.generated_number - 1, "]")) as _airbyte_nested_data
from {{ ref(table_name) }}
cross join numbers
-- only generate the number of records in the cross join that corresponds
-- to the number of items in {{ table_name }}.{{ column_col }}
where numbers.generated_number <= json_length({{ column_col }})
)
{%- endmacro %}
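For orientation: this macro queries the warehouse at compile time for the longest array in the column, builds a numbers CTE of that size, and cross-joins it against the table to index into each array; mysql__cross_join_unnest above then simply left-joins the expanded rows back to the parent stream. A rough sketch of the rendered SQL for a hypothetical stream children with a JSON array column data, assuming the longest array held 3 items (all names here are illustrative, not from this PR):

    with numbers as (
        -- dbt_utils.generate_series(3)
        select 1 as generated_number union all select 2 union all select 3
    ),
    joined as (
        select
            _airbyte_children_hashid as _airbyte_hashid,
            json_extract(data, concat("$[", numbers.generated_number - 1, "]")) as _airbyte_nested_data
        from test_normalization.children
        cross join numbers
        -- keep only as many rows per record as that record's array has items
        where numbers.generated_number <= json_length(data)
    )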
@@ -19,3 +19,11 @@
{% macro snowflake__type_json() %}
variant
{% endmacro %}

{%- macro mysql__type_json() -%}
json
{%- endmacro -%}

{%- macro mysql__type_string() -%}
char
{%- endmacro -%}
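Note that these dbt type macros resolve to CAST targets, and MySQL's CAST() accepts only a fixed set of target types (CHAR, SIGNED, JSON, DECIMAL, ...) — presumably why char is used here rather than varchar. A minimal sketch of the SQL this yields (table and column names invented for illustration):

    select
        cast(_airbyte_data as json) as raw_json,
        cast(name as char) as name_str
    from some_raw_table;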
@@ -0,0 +1,3 @@
{% macro mysql__except() %}
{% do exceptions.warn("MySQL does not support EXCEPT operator") %}
{% endmacro %}
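Since this macro emits a warning and no SQL, anything that relied on EXCEPT needs a MySQL-specific rewrite — which is what the custom equality test later in this diff does with NOT IN. An illustrative sketch (made-up table names; MySQL itself only gained EXCEPT much later, in 8.0.31):

    -- standard SQL, rejected by the MySQL versions targeted here:
    -- select id from table_a except select id from table_b;
    -- MySQL-compatible equivalent:
    select id from table_a where (id) not in (select id from table_b);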
@@ -2,8 +2,9 @@
Adapter Macros for the following functions:
- Bigquery: JSON_EXTRACT(json_string_expr, json_path_format) -> https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions
- Snowflake: JSON_EXTRACT_PATH_TEXT( <column_identifier> , '<path_name>' ) -> https://docs.snowflake.com/en/sql-reference/functions/json_extract_path_text.html
- Redshift: json_extract_path_text('json_string', 'path_elem' [,'path_elem'[, ] ] [, null_if_invalid ] ) -> https://docs.aws.amazon.com/redshift/latest/dg/JSON_EXTRACT_PATH_TEXT.html
- Redshift: json_extract_path_text('json_string', 'path_elem' [,'path_elem'[, ...] ] [, null_if_invalid ] ) -> https://docs.aws.amazon.com/redshift/latest/dg/JSON_EXTRACT_PATH_TEXT.html
- Postgres: json_extract_path_text(<from_json>, 'path' [, 'path' [, ...]]) -> https://www.postgresql.org/docs/12/functions-json.html
- MySQL: JSON_EXTRACT(json_doc, 'path' [, 'path'] ...) -> https://dev.mysql.com/doc/refman/8.0/en/json-search-functions.html
#}

{# format_json_path -------------------------------------------------- #}
@@ -23,6 +24,11 @@
{{ "'" ~ json_path_list|join("','") ~ "'" }}
{%- endmacro %}

{% macro mysql__format_json_path(json_path_list) -%}
{# -- '$."x"."y"."z"' #}
{{ "'$.\"" ~ json_path_list|join(".") ~ "\"'" }}
{%- endmacro %}
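Concretely, this renders the JSON path as a single SQL string literal, e.g. format_json_path(["answer"]) becomes '$."answer"'. One caveat: for a multi-element list the join(".") produces one quoted key such as '$."x.y.z"' rather than the nested '$."x"."y"."z"' shown in the comment; normalization appears to pass single, already-flattened field names, so in practice the two coincide. For example:

    -- json_extract(_airbyte_data, format_json_path(["answer"])) renders as:
    json_extract(_airbyte_data, '$."answer"')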

{% macro redshift__format_json_path(json_path_list) -%}
{{ "'" ~ json_path_list|join("','") ~ "'" }}
{%- endmacro %}
@@ -49,6 +55,10 @@
jsonb_extract_path({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro mysql__json_extract(json_column, json_path_list) -%}
json_extract({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro redshift__json_extract(json_column, json_path_list) -%}
case when json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) end
{%- endmacro %}
@@ -75,6 +85,10 @@
jsonb_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro mysql__json_extract_scalar(json_column, json_path_list) -%}
json_extract({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}
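A subtlety worth flagging: MySQL's json_extract returns a JSON fragment rather than an SQL scalar, so extracted strings keep their surrounding double quotes — not ideal for a scalar macro. A later commit in this PR ("Use json_value to extract scalar json fields") switches to JSON_VALUE, which unquotes scalars. Roughly, with an illustrative input:

    -- given _airbyte_data = '{"name": "foo"}'
    select json_extract(_airbyte_data, '$."name"');  -- returns '"foo"' (JSON fragment, quotes kept)
    select json_value(_airbyte_data, '$."name"');    -- returns 'foo' (scalar; MySQL 8.0.21+)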

{% macro redshift__json_extract_scalar(json_column, json_path_list) -%}
case when json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) end
{%- endmacro %}
@@ -101,6 +115,10 @@
jsonb_extract_path({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro mysql__json_extract_array(json_column, json_path_list) -%}
json_extract({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro redshift__json_extract_array(json_column, json_path_list) -%}
json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true)
{%- endmacro %}
@@ -38,3 +38,8 @@
{% macro redshift__cast_to_boolean(field) -%}
cast(decode({{ field }}, 'true', '1', 'false', '0')::integer as boolean)
{%- endmacro %}

{# cast_to_bigint ------------------------------------------------- #}
{% macro mysql__type_bigint() %}
signed
{% endmacro %}
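As with char above, signed is a CAST target: MySQL has no CAST(... AS BIGINT) and spells 64-bit integer casts as CAST(expr AS SIGNED). For instance (hypothetical column):

    select cast(json_extract(_airbyte_data, '$."id"') as signed) as id
    from some_raw_table;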
@@ -0,0 +1,58 @@
{#
-- Adapted from https://github.com/fishtown-analytics/dbt-utils/blob/master/macros/schema_tests/equality.sql
-- This is needed because MySQL does not support the EXCEPT operator!
#}

{% macro mysql__test_equality(model, compare_model, compare_columns=None) %}

{% set set_diff %}
count(*) + coalesce(abs(
sum(case when which_diff = 'a_minus_b' then 1 else 0 end) -
sum(case when which_diff = 'b_minus_a' then 1 else 0 end)
), 0)
{% endset %}

{{ config(fail_calc = set_diff) }}

{%- if not execute -%}
{{ return('') }}
{% endif %}

{%- do dbt_utils._is_relation(model, 'test_equality') -%}

{%- if not compare_columns -%}
{%- do dbt_utils._is_ephemeral(model, 'test_equality') -%}
{%- set compare_columns = adapter.get_columns_in_relation(model) | map(attribute='quoted') -%}
{%- endif -%}

{% set compare_cols_csv = compare_columns | join(', ') %}

with a as (
select * from {{ model }}
),

b as (
select * from {{ compare_model }}
),

a_minus_b as (
select {{ compare_cols_csv }} from a
where ({{ compare_cols_csv }}) not in
(select {{ compare_cols_csv }} from b)
),

b_minus_a as (
select {{ compare_cols_csv }} from b
where ({{ compare_cols_csv }}) not in
(select {{ compare_cols_csv }} from a)
),

unioned as (
select 'a_minus_b' as which_diff from a_minus_b
union all
select 'b_minus_a' as which_diff from b_minus_a
)

select * from unioned

{% endmacro %}
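To see how the fail_calc above behaves: if model has one row that compare_model lacks, a_minus_b yields a single 'a_minus_b' row and b_minus_a none, so count(*) = 1 and abs(1 - 0) = 1 — fail_calc is 2 and the test fails; when the relations match, unioned is empty, both terms evaluate to 0, and the test passes. The tuple NOT IN is the stand-in for EXCEPT; for two hypothetical models compared on (id, currency) it renders along these lines:

    select id, currency from a
    where (id, currency) not in (select id, currency from b)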
@@ -42,12 +42,17 @@
class DbtIntegrationTest(object):
def __init__(self):
self.target_schema = "test_normalization"
self.container_name = "test_normalization_db_" + self.random_string(3)
self.container_prefix = f"test_normalization_db_{self.random_string(3)}"
self.db_names = ["postgres", "mysql"]

@staticmethod
def random_string(length: int) -> str:
return "".join(random.choice(string.ascii_lowercase) for i in range(length))

def setup_db(self):
self.setup_postgres_db()
self.setup_mysql_db()

def setup_postgres_db(self):
print("Starting localhost postgres container for tests")
port = self.find_free_port()
@@ -64,7 +69,7 @@ def setup_postgres_db(self):
"run",
"--rm",
"--name",
f"{self.container_name}",
f"{self.container_prefix}_postgres",
"-e",
f"POSTGRES_USER={config['username']}",
"-e",
@@ -81,6 +86,42 @@
with open("../secrets/postgres.json", "w") as fh:
fh.write(json.dumps(config))

def setup_mysql_db(self):
print("Starting localhost mysql container for tests")
port = self.find_free_port()
config = {
"type": "mysql",
"host": "localhost",
"port": port,
"database": self.target_schema,
"username": "root",
"password": "",
}
commands = [
"docker",
"run",
"--rm",
"--name",
f"{self.container_prefix}_mysql",
"-e",
"MYSQL_ALLOW_EMPTY_PASSWORD=yes",
"-e",
"MYSQL_INITDB_SKIP_TZINFO=yes",
"-e",
f"MYSQL_DATABASE={config['database']}",
"-p",
f"{config['port']}:3306",
"-d",
"mysql",
]
print("Executing: ", " ".join(commands))
subprocess.call(commands)

if not os.path.exists("../secrets"):
os.makedirs("../secrets")
with open("../secrets/mysql.json", "w") as fh:
fh.write(json.dumps(config))

@staticmethod
def find_free_port():
"""
@@ -92,12 +133,13 @@ def find_free_port():
s.close()
return addr[1]

def tear_down_postgres_db(self):
print("Stopping localhost postgres container for tests")
try:
subprocess.call(["docker", "kill", f"{self.container_name}"])
except Exception as e:
print(f"WARN: Exception while shutting down postgres db: {e}")
def tear_down_db(self):
for db_name in self.db_names:
print(f"Stopping localhost {db_name} container for tests")
try:
subprocess.call(["docker", "kill", f"{self.container_prefix}_{db_name}"])
except Exception as e:
print(f"WARN: Exception while shutting down {db_name}: {e}")

@staticmethod
def change_current_test_dir(request):
@@ -22,5 +22,23 @@
{ "#quote: true in postgres": "quote: true" }
],
"snowflake": [{ "HKD@SPÉÇIÄL & CHARACTERS": "HKD@spéçiäl & characters" }],
"redshift": []
"redshift": [],
"mysql": [
{
"nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data": "nested_stream_with_co__ion_double_array_data"
},
{
"nested_stream_with_complex_columns_resulting_into_long_names_partition_data": "nested_stream_with_co___names_partition_data"
},
{
"nested_stream_with_complex_columns_resulting_into_long_names_partition": "nested_stream_with_co___long_names_partition"
},
{
"'nested_stream_with_complex_columns_resulting_into_long_names'": "'nested_stream_with_co__lting_into_long_names'"
},
{
"'non_nested_stream_without_namespace_resulting_into_long_names'": "'non_nested_stream_wit__lting_into_long_names'"
},
{ "#quote: true in mysql": "quote: true" }
]
}
@@ -4,11 +4,11 @@ models:
- name: exchange_rate
tests:
- dbt_utils.equality:
description: check_streams_are_equal
In this integration test, we are sending the same records to both streams
exchange_rate and dedup_exchange_rate.
The SCD table of dedup_exchange_rate in append_dedup mode should therefore mirror
the final table with append or overwrite mode from exchange_rate.
# description: check_streams_are_equal
# In this integration test, we are sending the same records to both streams
# exchange_rate and dedup_exchange_rate.
# The SCD table of dedup_exchange_rate in append_dedup mode should therefore mirror
# the final table with append or overwrite mode from exchange_rate.
compare_model: ref('dedup_exchange_rate_scd')
compare_columns:
- id
@@ -42,10 +42,10 @@
@pytest.fixture(scope="module", autouse=True)
def before_all_tests(request):
dbt_test_utils.change_current_test_dir(request)
dbt_test_utils.setup_postgres_db()
dbt_test_utils.setup_db()
os.environ["PATH"] = os.path.abspath("../.venv/bin/") + ":" + os.environ["PATH"]
yield
dbt_test_utils.tear_down_postgres_db()
dbt_test_utils.tear_down_db()
for folder in temporary_folders:
print(f"Deleting temporary test folder {folder}")
shutil.rmtree(folder, ignore_errors=True)
@@ -81,6 +81,7 @@ def test_destination_supported_limits(integration_type: DestinationType, column_
"Operation failed because soft limit on objects of type 'Column' per table was exceeded.",
),
("Redshift", 1665, "target lists can have at most 1664 entries"),
# MySQL allows up to 4096 columns, so this limit is not worth testing
],
)
def test_destination_failure_over_limits(integration_type: str, column_count: int, expected_exception_message: str, setup_test_path):
@@ -47,10 +47,10 @@
@pytest.fixture(scope="module", autouse=True)
def before_all_tests(request):
dbt_test_utils.change_current_test_dir(request)
dbt_test_utils.setup_postgres_db()
dbt_test_utils.setup_db()
os.environ["PATH"] = os.path.abspath("../.venv/bin/") + ":" + os.environ["PATH"]
yield
dbt_test_utils.tear_down_postgres_db()
dbt_test_utils.tear_down_db()
for folder in temporary_folders:
print(f"Deleting temporary test folder {folder}")
shutil.rmtree(folder, ignore_errors=True)
@@ -75,7 +75,6 @@ def setup_test_path(request):
),
)
@pytest.mark.parametrize("destination_type", list(DestinationType))
# @pytest.mark.parametrize("destination_type", [DestinationType.POSTGRES])
def test_normalization(destination_type: DestinationType, test_resource_name: str, setup_test_path):
print("Testing normalization")
integration_type = destination_type.value
@@ -31,6 +31,7 @@ class DestinationType(Enum):
POSTGRES = "postgres"
REDSHIFT = "redshift"
SNOWFLAKE = "snowflake"
MYSQL = "mysql"

@classmethod
def from_string(cls, string_value: str) -> "DestinationType":