Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Fix Snowflake destination normalization to accept any date-time format. #6052

Merged
merged 14 commits into from
Sep 23, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.44
LABEL io.airbyte.version=0.1.45
yaroslav-dudar marked this conversation as resolved.
Show resolved Hide resolved
LABEL io.airbyte.name=airbyte/normalization
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def generate_profile_yaml_file(self, destination_type: DestinationType, test_roo
profiles_config = config_generator.read_json_config(f"../secrets/{destination_type.value.lower()}.json")
# Adapt credential file to look like destination config.json
if destination_type.value == DestinationType.BIGQUERY.value:
credentials = profiles_config
credentials = profiles_config["basic_bigquery_config"]
profiles_config = {
"credentials_json": json.dumps(credentials),
"dataset_id": self.target_schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@ select
cast(DATE as
date
) as DATE,
cast(TIMESTAMP_COL as
timestamp with time zone
) as TIMESTAMP_COL,
case
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{4}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SSTZHTZM')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{2}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SSTZH')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{4}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SS.FFTZHTZM')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{2}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SS.FFTZH')
else to_timestamp_tz(TIMESTAMP_COL)
end as TIMESTAMP_COL
,
cast("HKD@spéçiäl & characters" as
float
) as "HKD@spéçiäl & characters",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@ select
cast(DATE as
date
) as DATE,
cast(TIMESTAMP_COL as
timestamp with time zone
) as TIMESTAMP_COL,
case
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{4}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SSTZHTZM')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{2}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SSTZH')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{4}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SS.FFTZHTZM')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{2}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SS.FFTZH')
else to_timestamp_tz(TIMESTAMP_COL)
end as TIMESTAMP_COL
,
cast("HKD@spéçiäl & characters" as
float
) as "HKD@spéçiäl & characters",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@ select
cast(ID as {{ dbt_utils.type_bigint() }}) as ID,
cast(CURRENCY as {{ dbt_utils.type_string() }}) as CURRENCY,
cast(DATE as {{ type_date() }}) as DATE,
cast(TIMESTAMP_COL as {{ type_timestamp_with_timezone() }}) as TIMESTAMP_COL,
case
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{4}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SSTZHTZM')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{2}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SSTZH')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{4}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SS.FFTZHTZM')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{2}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SS.FFTZH')
else to_timestamp_tz(TIMESTAMP_COL)
end as TIMESTAMP_COL
,
cast({{ adapter.quote('HKD@spéçiäl & characters') }} as {{ dbt_utils.type_float() }}) as {{ adapter.quote('HKD@spéçiäl & characters') }},
cast(HKD_SPECIAL___CHARACTERS as {{ dbt_utils.type_string() }}) as HKD_SPECIAL___CHARACTERS,
cast(NZD as {{ dbt_utils.type_float() }}) as NZD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@ select
cast(ID as {{ dbt_utils.type_bigint() }}) as ID,
cast(CURRENCY as {{ dbt_utils.type_string() }}) as CURRENCY,
cast(DATE as {{ type_date() }}) as DATE,
cast(TIMESTAMP_COL as {{ type_timestamp_with_timezone() }}) as TIMESTAMP_COL,
case
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{4}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SSTZHTZM')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{2}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SSTZH')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{4}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SS.FFTZHTZM')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{2}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SS.FFTZH')
else to_timestamp_tz(TIMESTAMP_COL)
end as TIMESTAMP_COL
,
cast({{ adapter.quote('HKD@spéçiäl & characters') }} as {{ dbt_utils.type_float() }}) as {{ adapter.quote('HKD@spéçiäl & characters') }},
cast(HKD_SPECIAL___CHARACTERS as {{ dbt_utils.type_string() }}) as HKD_SPECIAL___CHARACTERS,
cast(NZD as {{ dbt_utils.type_float() }}) as NZD,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "id": 1, "currency": "USD", "date": "2020-08-29", "timestamp_col": "2020-08-29T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 2.13, "HKD_special___characters": "column name collision?" }}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637689100, "data": { "id": 1, "currency": "USD", "date": "2020-08-30", "timestamp_col": "2020-08-30T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 7.15, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637789200, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00Z", "NZD": 3.89, "HKD@spéçiäl & characters": 7.12, "HKD_special___characters": "column name collision?", "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637889300, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 7.99, "HKD_special___characters": "column name collision?", "USD": 10.99}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "id": 1, "currency": "USD", "date": "2020-08-29", "timestamp_col": "2020-08-29T00:00:00.000000-0000", "NZD": 1.14, "HKD@spéçiäl & characters": 2.13, "HKD_special___characters": "column name collision?" }}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637689100, "data": { "id": 1, "currency": "USD", "date": "2020-08-30", "timestamp_col": "2020-08-30T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 7.15, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637789200, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+00", "NZD": 3.89, "HKD@spéçiäl & characters": 7.12, "HKD_special___characters": "column name collision?", "USD": 10.16}}}
bazarnov marked this conversation as resolved.
Show resolved Hide resolved
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637889300, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+0000", "NZD": 1.14, "HKD@spéçiäl & characters": 7.99, "HKD_special___characters": "column name collision?", "USD": 10.99}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637989400, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 8, "HKD_special___characters": "column name collision?", "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990700, "data": { "id": 1, "currency": "USD", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 10.5, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637991000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 7.02, "HKD_special___characters": "column name collision?"}}}

{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637589000, "data": { "id": 1, "currency": "USD", "date": "2020-08-29", "timestamp_col": "2020-08-29T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 2.13, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637689100, "data": { "id": 1, "currency": "USD", "date": "2020-08-30", "timestamp_col": "2020-08-30T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 7.15, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637789200, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00Z", "NZD": 3.89, "HKD@spéçiäl & characters": 7.12, "HKD_special___characters": "column name collision?", "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637889300, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 7.99, "HKD_special___characters": "column name collision?", "USD": 10.99}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637589000, "data": { "id": 1, "currency": "USD", "date": "2020-08-29", "timestamp_col": "2020-08-29T00:00:00.000000-0000", "NZD": 1.14, "HKD@spéçiäl & characters": 2.13, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637689100, "data": { "id": 1, "currency": "USD", "date": "2020-08-30", "timestamp_col": "2020-08-30T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 7.15, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637789200, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+00", "NZD": 3.89, "HKD@spéçiäl & characters": 7.12, "HKD_special___characters": "column name collision?", "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637889300, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+0000", "NZD": 1.14, "HKD@spéçiäl & characters": 7.99, "HKD_special___characters": "column name collision?", "USD": 10.99}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637989400, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 8, "HKD_special___characters": "column name collision?", "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990700, "data": { "id": 1, "currency": "USD", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 10.5, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?"}}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,12 @@ def cast_property_type(self, property_name: str, column_name: str, jinja_column:
elif is_number(definition["type"]):
sql_type = jinja_call("dbt_utils.type_float()")
elif is_timestamp_with_time_zone(definition):
if self.destination_type == DestinationType.SNOWFLAKE:
# snowflake uses case when statement to parse timestamp field
# in this case [cast] operator is not needed as data already converted to timestamp type
return self.generate_snowflake_timestamp_statement(column_name)
sql_type = jinja_call("type_timestamp_with_timezone()")

elif is_date(definition):
sql_type = jinja_call("type_date()")
elif is_string(definition["type"]):
Expand All @@ -446,6 +451,31 @@ def cast_property_type(self, property_name: str, column_name: str, jinja_column:

return f"cast({column_name} as {sql_type}) as {column_name}"

def generate_snowflake_timestamp_statement(self, column_name: str) -> str:
"""
Generates snowflake DB specific timestamp case when statement
"""
formats = [
{"regex": r"\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{4}", "format": "YYYY-MM-DDTHH24:MI:SSTZHTZM"},
{"regex": r"\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{2}", "format": "YYYY-MM-DDTHH24:MI:SSTZH"},
{
"regex": r"\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{4}",
"format": "YYYY-MM-DDTHH24:MI:SS.FFTZHTZM",
},
{"regex": r"\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{2}", "format": "YYYY-MM-DDTHH24:MI:SS.FFTZH"},
]
template = Template(
"""
case
{% for format_item in formats %}
when {{column_name}} regexp '{{format_item['regex']}}' then to_timestamp_tz({{column_name}}, '{{format_item['format']}}')
{% endfor %}
else to_timestamp_tz({{column_name}})
end as {{column_name}}
"""
)
return template.render(formats=formats, column_name=column_name)

def generate_id_hashing_model(self, from_table: str, column_names: Dict[str, Tuple[str, str]]) -> str:

template = Template(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class DefaultNormalizationRunner implements NormalizationRunner {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultNormalizationRunner.class);

public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.44";
public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.45";

private final DestinationType destinationType;
private final ProcessFactory processFactory;
Expand Down
3 changes: 2 additions & 1 deletion docs/understanding-airbyte/basic-normalization.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ Most modern data warehouses have name lengths limits on the longer side, so this
However, in the rare cases where these limits are reached:

1. Truncate only the `Json path` to fit into destination's character limits
2. Truncate the `Json path` to at least the 10 first characters, then truncate the nested column name starting in the middle to preserve prefix/suffix substrings intact \(whenever a truncate in the middle is made, two '\_\_' characters are also inserted to denote where it happened\) to fit into destination's character limits
2. Truncate the `Json path` to at least the 10 first characters, then truncate the nested column name starting in the middle to preserve prefix/suffix substrings intact \(whenever a truncate in the middle is made, two '\_\_' characters are also inserted to denote where it happened\) to fit into destination's character limits

As an example from the hubspot source, we could have the following tables with nested columns:

Expand Down Expand Up @@ -294,6 +294,7 @@ Therefore, in order to "upgrade" to the desired normalization version, you need

| Airbyte Version | Normalization Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- | :--- |
| 0.29.17-alpha | 0.1.45 | 2021-09-18 | [#6052](https://github.com/airbytehq/airbyte/pull/6052) | Snowflake: accept any date-time format |
| 0.29.8-alpha | 0.1.40 | 2021-08-18 | [#5433](https://github.com/airbytehq/airbyte/pull/5433) | Allow optional credentials_json for BigQuery |
| 0.29.5-alpha | 0.1.39 | 2021-08-11 | [#4557](https://github.com/airbytehq/airbyte/pull/4557) | Handle date times and solve conflict name btw stream/field |
| 0.28.2-alpha | 0.1.38 | 2021-07-28 | [#5027](https://github.com/airbytehq/airbyte/pull/5027) | Handle quotes in column names when parsing JSON blob |
Expand Down