🎉 MySQL destination: normalization #4163

Merged 46 commits on Jul 4, 2021. The diff below shows changes from 12 of the 46 commits.

Commits (46):
- 55752a9 Add mysql dbt package (tuliren, Jun 16, 2021)
- 049cfa2 Add mysql normalization support in java (tuliren, Jun 16, 2021)
- 9165e80 Add mysql normalization support in python (tuliren, Jun 16, 2021)
- 94c10be Fix unit tests (tuliren, Jun 16, 2021)
- 173ad67 Update readme (tuliren, Jun 16, 2021)
- 3d9cbf7 Setup mysql container in integration test (tuliren, Jun 18, 2021)
- 9e8e939 Add macros (tuliren, Jun 18, 2021)
- 47945bf Depend on dbt-mysql from git repo (tuliren, Jun 18, 2021)
- 7f04076 Remove mysql limitation test (tuliren, Jun 18, 2021)
- b9e3066 Test normalization (tuliren, Jun 18, 2021)
- f13a95b Revert protocol format change (tuliren, Jun 18, 2021)
- dee3479 Fix mysel json macros (tuliren, Jun 18, 2021)
- abe8eb7 Fix two more macros (tuliren, Jun 22, 2021)
- a6acc91 Fix table name length (tuliren, Jun 22, 2021)
- bbc3905 Fix array macro (tuliren, Jun 23, 2021)
- 51fcb2a Fix equality test macro (tuliren, Jun 23, 2021)
- 5cbe550 Update replace-identifiers (tuliren, Jun 23, 2021)
- 2396590 Merge branch 'master' into liren/mysql-destination-normalization (tuliren, Jun 25, 2021)
- b7fe9cc Add more identifiers to replace (tuliren, Jun 27, 2021)
- 4a50b25 Fix unnest macro (tuliren, Jun 27, 2021)
- 5c6642a Fix equality macro (tuliren, Jun 28, 2021)
- b71bd6d Check in mysql test output (tuliren, Jun 28, 2021)
- 5dc1cbb Merge branch 'master' into liren/mysql-destination-normalization (tuliren, Jun 28, 2021)
- b681fd0 Update column limit test for mysql (tuliren, Jun 28, 2021)
- 02a8670 Escape parentheses (tuliren, Jun 28, 2021)
- ffa5789 Remove unnecessary mysql test (tuliren, Jun 28, 2021)
- 0cf0a17 Remove mysql output for easier code review (tuliren, Jun 28, 2021)
- 411425f Remove unnecessary mysql test (tuliren, Jun 28, 2021)
- c43d06e Remove parentheses (tuliren, Jun 28, 2021)
- 091f137 Update dependencies (tuliren, Jun 28, 2021)
- 78e030c Skip mysql instead of manually write out types (tuliren, Jun 28, 2021)
- e89122d Bump version (tuliren, Jul 1, 2021)
- ebf5cc4 Merge branch 'master' into liren/mysql-destination-normalization (tuliren, Jul 1, 2021)
- f94b1c8 Check in unit test for mysql name transformer (tuliren, Jul 1, 2021)
- 9310950 Fix type conversion (tuliren, Jul 1, 2021)
- 1c68996 Use json_value to extract scalar json fields (tuliren, Jul 1, 2021)
- 4e2a0dd Move dbt-mysql to Dockerfile (#4459) (ChristopheDuong, Jul 1, 2021)
- 5c49cc3 Format code (tuliren, Jul 1, 2021)
- 46cab59 Check in mysql dbt output (tuliren, Jul 1, 2021)
- aefc479 Remove unnecessary quote (tuliren, Jul 1, 2021)
- 84e6e1c Update mysql equality test to match 0.19.0 (tuliren, Jul 1, 2021)
- 0202369 Check in schema_test update (tuliren, Jul 1, 2021)
- 967e1d8 Update readme (tuliren, Jul 4, 2021)
- 73a1d25 Merge branch 'master' into liren/mysql-destination-normalization (tuliren, Jul 4, 2021)
- f972e4b Bump base normalization version (tuliren, Jul 4, 2021)
- 289ba24 Update document (tuliren, Jul 4, 2021)
Files changed
airbyte-integrations/bases/base-normalization/README.md (1 addition, 0 deletions)

@@ -54,6 +54,7 @@ allowed characters, if quotes are needed or not, and the length limitations:
- [postgres](../../../docs/integrations/destinations/postgres.md)
- [redshift](../../../docs/integrations/destinations/redshift.md)
- [snowflake](../../../docs/integrations/destinations/snowflake.md)
- [mysql](../../../docs/integrations/destinations/mysql.md)

Rules about truncations, for example for both of these strings which are too long for the postgres 64 limit:
- `Aaaa_Bbbb_Cccc_Dddd_Eeee_Ffff_Gggg_Hhhh_Iiii`

airbyte-integrations/bases/base-normalization/build.gradle (1 addition, 0 deletions)

@@ -25,6 +25,7 @@ task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs
dependsOn ':airbyte-integrations:connectors:destination-postgres:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-redshift:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-snowflake:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-mysql:airbyteDocker'
}

integrationTest.dependsOn("customIntegrationTestPython")
@@ -28,6 +28,18 @@
) as _airbyte_nested_data
{%- endmacro %}

{% macro mysql__cross_join_unnest(stream_name, array_col) -%}
cross join json_table(
{{ array_col }},
'$[*]'
COLUMNS(
"{{ array_col }}_col"
json
path '$'
)
) as airbyte_nested_data
{%- endmacro %}

{% macro redshift__cross_join_unnest(stream_name, array_col) -%}
left join joined on _airbyte_{{ stream_name }}_hashid = joined._airbyte_hashid
{%- endmacro %}
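
For illustration only, here is a sketch of a json_table unnest of the shape the new mysql__cross_join_unnest macro renders, embedded in a full query. The table and column names are hypothetical, and the generated column is left unquoted here because MySQL only accepts the macro's double-quoted "{{ array_col }}_col" form when the ANSI_QUOTES SQL mode is enabled:

```sql
-- Unnest a hypothetical `children` JSON array column, one row per array element.
select
    u.id,
    airbyte_nested_data.children_col
from users as u
cross join json_table(
    u.children,
    '$[*]'
    columns (
        children_col json path '$'
    )
) as airbyte_nested_data;
```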
@@ -19,3 +19,11 @@
{% macro snowflake__type_json() %}
variant
{% endmacro %}

{%- macro mysql__type_json() -%}
json
{%- endmacro -%}

{%- macro mysql__type_string() -%}
char
{%- endmacro -%}
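
As a small, purely illustrative sketch (hypothetical table and column names) of the two MySQL types these adapter macros hand back to dbt:

```sql
-- `json` is what mysql__type_json() renders; bare `char` is what mysql__type_string() renders.
-- Note that in a column definition MySQL reads bare char as char(1).
create table test_normalization.users_ab1 (
    _airbyte_data json,
    name char
);
```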
@@ -2,8 +2,9 @@
Adapter Macros for the following functions:
- Bigquery: JSON_EXTRACT(json_string_expr, json_path_format) -> https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions
- Snowflake: JSON_EXTRACT_PATH_TEXT( <column_identifier> , '<path_name>' ) -> https://docs.snowflake.com/en/sql-reference/functions/json_extract_path_text.html
- Redshift: json_extract_path_text('json_string', 'path_elem' [,'path_elem'[, ] ] [, null_if_invalid ] ) -> https://docs.aws.amazon.com/redshift/latest/dg/JSON_EXTRACT_PATH_TEXT.html
- Redshift: json_extract_path_text('json_string', 'path_elem' [,'path_elem'[, ...] ] [, null_if_invalid ] ) -> https://docs.aws.amazon.com/redshift/latest/dg/JSON_EXTRACT_PATH_TEXT.html
- Postgres: json_extract_path_text(<from_json>, 'path' [, 'path' [, ...]]) -> https://www.postgresql.org/docs/12/functions-json.html
- MySQL: JSON_EXTRACT(json_doc, 'path' [, 'path'] ...) -> https://dev.mysql.com/doc/refman/8.0/en/json-search-functions.html
#}

{# format_json_path -------------------------------------------------- #}
@@ -23,6 +24,10 @@
{{ "'" ~ json_path_list|join("','") ~ "'" }}
{%- endmacro %}

{% macro mysql__format_json_path(json_path_list) -%}
{{ "\"$." ~ json_path_list|join('.') ~ "\"" }}
{%- endmacro %}

{% macro redshift__format_json_path(json_path_list) -%}
{{ "'" ~ json_path_list|join("','") ~ "'" }}
{%- endmacro %}
@@ -49,6 +54,10 @@
jsonb_extract_path({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro mysql__json_extract(json_column, json_path_list) -%}
json_extract({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro redshift__json_extract(json_column, json_path_list) -%}
case when json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) end
{%- endmacro %}
@@ -75,6 +84,10 @@
jsonb_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro mysql__json_extract_scalar(json_column, json_path_list) -%}
json_extract({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro redshift__json_extract_scalar(json_column, json_path_list) -%}
case when json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) end
{%- endmacro %}
@@ -101,6 +114,10 @@
jsonb_extract_path({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro mysql__json_extract_array(json_column, json_path_list) -%}
json_extract({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro redshift__json_extract_array(json_column, json_path_list) -%}
json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true)
{%- endmacro %}
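
For illustration, with a JSON path list of ['user', 'name'] the MySQL macros above produce the path literal "$.user.name", so an extraction renders roughly as below (the raw table and column names are hypothetical). Note that json_extract keeps scalar results JSON-quoted, which is presumably why a later commit in this PR (1c68996) switches the scalar case to json_value:

```sql
-- mysql__format_json_path(['user', 'name']) -> "$.user.name"
-- mysql__json_extract(...)                  -> json_extract(<column>, "$.user.name")
select
    json_extract(_airbyte_data, "$.user.name") as user_name
from test_normalization._airbyte_raw_users;
```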
@@ -42,12 +42,17 @@
class DbtIntegrationTest(object):
def __init__(self):
self.target_schema = "test_normalization"
self.container_name = "test_normalization_db_" + self.random_string(3)
self.container_prefix = f"test_normalization_db_{self.random_string(3)}"
self.db_names = ["postgres", "mysql"]

@staticmethod
def random_string(length: int) -> str:
return "".join(random.choice(string.ascii_lowercase) for i in range(length))

def setup_db(self):
self.setup_postgres_db()
self.setup_mysql_db()

def setup_postgres_db(self):
print("Starting localhost postgres container for tests")
port = self.find_free_port()
@@ -64,7 +69,7 @@ def setup_postgres_db(self):
"run",
"--rm",
"--name",
f"{self.container_name}",
f"{self.container_prefix}_postgres",
"-e",
f"POSTGRES_USER={config['username']}",
"-e",
@@ -81,6 +86,42 @@ def setup_postgres_db(self):
with open("../secrets/postgres.json", "w") as fh:
fh.write(json.dumps(config))

def setup_mysql_db(self):
print("Starting localhost mysql container for tests")
port = self.find_free_port()
config = {
"type": "mysql",
"host": "localhost",
"port": port,
"database": self.target_schema,
"username": "root",
"password": "",
}
commands = [
"docker",
"run",
"--rm",
"--name",
f"{self.container_prefix}_mysql",
"-e",
"MYSQL_ALLOW_EMPTY_PASSWORD=yes",
"-e",
"MYSQL_INITDB_SKIP_TZINFO=yes",
"-e",
f"MYSQL_DATABASE={config['database']}",
"-p",
f"{config['port']}:3306",
"-d",
"mysql",
]
print("Executing: ", " ".join(commands))
subprocess.call(commands)

if not os.path.exists("../secrets"):
os.makedirs("../secrets")
with open("../secrets/mysql.json", "w") as fh:
fh.write(json.dumps(config))
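
As a quick manual check outside the test suite (the port is whatever find_free_port() picked and wrote into ../secrets/mysql.json), one could connect to the container as root with the empty password and confirm the test database exists:

```sql
-- e.g. connect with: mysql -h 127.0.0.1 -P <port> -u root
show databases like 'test_normalization';
select version();
```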

@staticmethod
def find_free_port():
"""
@@ -92,12 +133,13 @@ def find_free_port():
s.close()
return addr[1]

def tear_down_postgres_db(self):
print("Stopping localhost postgres container for tests")
try:
subprocess.call(["docker", "kill", f"{self.container_name}"])
except Exception as e:
print(f"WARN: Exception while shutting down postgres db: {e}")
def tear_down_db(self):
for db_name in self.db_names:
print(f"Stopping localhost {db_name} container for tests")
try:
subprocess.call(["docker", "kill", f"{self.container_prefix}_{db_name}"])
except Exception as e:
print(f"WARN: Exception while shutting down {db_name}: {e}")

@staticmethod
def change_current_test_dir(request):
@@ -42,10 +42,10 @@
@pytest.fixture(scope="module", autouse=True)
def before_all_tests(request):
dbt_test_utils.change_current_test_dir(request)
dbt_test_utils.setup_postgres_db()
dbt_test_utils.setup_db()
os.environ["PATH"] = os.path.abspath("../.venv/bin/") + ":" + os.environ["PATH"]
yield
dbt_test_utils.tear_down_postgres_db()
dbt_test_utils.tear_down_db()
for folder in temporary_folders:
print(f"Deleting temporary test folder {folder}")
shutil.rmtree(folder, ignore_errors=True)
@@ -47,10 +47,10 @@
@pytest.fixture(scope="module", autouse=True)
def before_all_tests(request):
dbt_test_utils.change_current_test_dir(request)
dbt_test_utils.setup_postgres_db()
dbt_test_utils.setup_db()
os.environ["PATH"] = os.path.abspath("../.venv/bin/") + ":" + os.environ["PATH"]
yield
dbt_test_utils.tear_down_postgres_db()
dbt_test_utils.tear_down_db()
for folder in temporary_folders:
print(f"Deleting temporary test folder {folder}")
shutil.rmtree(folder, ignore_errors=True)
@@ -75,7 +75,6 @@ def setup_test_path(request):
),
)
@pytest.mark.parametrize("destination_type", list(DestinationType))
# @pytest.mark.parametrize("destination_type", [DestinationType.POSTGRES])
def test_normalization(destination_type: DestinationType, test_resource_name: str, setup_test_path):
print("Testing normalization")
integration_type = destination_type.value
@@ -31,6 +31,7 @@ class DestinationType(Enum):
POSTGRES = "postgres"
REDSHIFT = "redshift"
SNOWFLAKE = "snowflake"
MYSQL = "mysql"

@classmethod
def from_string(cls, string_value: str) -> "DestinationType":
@@ -31,10 +31,16 @@
from normalization.transform_catalog.utils import jinja_call

DESTINATION_SIZE_LIMITS = {
# https://cloud.google.com/bigquery/quotas#all_tables
DestinationType.BIGQUERY.value: 1024,
# https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html
DestinationType.SNOWFLAKE.value: 255,
# https://docs.aws.amazon.com/redshift/latest/dg/r_names.html
DestinationType.REDSHIFT.value: 127,
# https://www.postgresql.org/docs/12/limits.html
DestinationType.POSTGRES.value: 63,
# https://dev.mysql.com/doc/refman/8.0/en/identifier-length.html
DestinationType.MYSQL.value: 64,
}

# DBT also needs to generate suffix to table names, so we need to make sure it has enough characters to do so...
@@ -176,6 +182,9 @@ def __normalize_identifier_case(self, input_name: str, is_quoted: bool = False)
elif self.destination_type.value == DestinationType.SNOWFLAKE.value:
if not is_quoted and not self.needs_quotes(input_name):
result = input_name.upper()
elif self.destination_type.value == DestinationType.MYSQL.value:
if not is_quoted and not self.needs_quotes(input_name):
result = input_name.lower()
else:
raise KeyError(f"Unknown destination type {self.destination_type}")
return result
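
To put the two MySQL entries above in concrete terms (the identifier names below are made up): MySQL caps identifier length at 64 characters, and the transformer lower-cases unquoted names, so a generated table name is only usable once it fits within that limit:

```sql
-- 65-character name: MySQL rejects it with ER_TOO_LONG_IDENT ("Identifier name ... is too long").
create table abcdefghijabcdefghijabcdefghijabcdefghijabcdefghijabcdefghijklmno (id int);

-- The same name trimmed to 64 characters (already lower case) is accepted.
create table abcdefghijabcdefghijabcdefghijabcdefghijabcdefghijabcdefghijklmn (id int);
```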