Skip to content

Commit

Permalink
🐛 Fix Snowflake destination normalization to accept any date-time for…
Browse files Browse the repository at this point in the history
…mat. (#6052)

snowflake date-time format parser
  • Loading branch information
yaroslav-dudar authored Sep 23, 2021
1 parent c183410 commit a6ecfda
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 20 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.44
LABEL io.airbyte.version=0.1.45
LABEL io.airbyte.name=airbyte/normalization
1 change: 1 addition & 0 deletions airbyte-integrations/bases/base-normalization/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs
}

integrationTest.dependsOn("customIntegrationTestPython")
customIntegrationTests.dependsOn("customIntegrationTestPython")

// TODO fix and use https://github.com/airbytehq/airbyte/issues/3192 instead
task('mypyCheck', type: PythonTask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def generate_profile_yaml_file(self, destination_type: DestinationType, test_roo
profiles_config = config_generator.read_json_config(f"../secrets/{destination_type.value.lower()}.json")
# Adapt credential file to look like destination config.json
if destination_type.value == DestinationType.BIGQUERY.value:
credentials = profiles_config
credentials = profiles_config["basic_bigquery_config"]
profiles_config = {
"credentials_json": json.dumps(credentials),
"dataset_id": self.target_schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@ select
cast(DATE as
date
) as DATE,
cast(TIMESTAMP_COL as
timestamp with time zone
) as TIMESTAMP_COL,
case
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{4}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SSTZHTZM')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{2}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SSTZH')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{4}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SS.FFTZHTZM')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{2}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SS.FFTZH')
else to_timestamp_tz(TIMESTAMP_COL)
end as TIMESTAMP_COL
,
cast("HKD@spéçiäl & characters" as
float
) as "HKD@spéçiäl & characters",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@ select
cast(DATE as
date
) as DATE,
cast(TIMESTAMP_COL as
timestamp with time zone
) as TIMESTAMP_COL,
case
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{4}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SSTZHTZM')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{2}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SSTZH')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{4}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SS.FFTZHTZM')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{2}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SS.FFTZH')
else to_timestamp_tz(TIMESTAMP_COL)
end as TIMESTAMP_COL
,
cast("HKD@spéçiäl & characters" as
float
) as "HKD@spéçiäl & characters",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@ select
cast(ID as {{ dbt_utils.type_bigint() }}) as ID,
cast(CURRENCY as {{ dbt_utils.type_string() }}) as CURRENCY,
cast(DATE as {{ type_date() }}) as DATE,
cast(TIMESTAMP_COL as {{ type_timestamp_with_timezone() }}) as TIMESTAMP_COL,
case
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{4}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SSTZHTZM')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{2}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SSTZH')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{4}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SS.FFTZHTZM')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{2}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SS.FFTZH')
else to_timestamp_tz(TIMESTAMP_COL)
end as TIMESTAMP_COL
,
cast({{ adapter.quote('HKD@spéçiäl & characters') }} as {{ dbt_utils.type_float() }}) as {{ adapter.quote('HKD@spéçiäl & characters') }},
cast(HKD_SPECIAL___CHARACTERS as {{ dbt_utils.type_string() }}) as HKD_SPECIAL___CHARACTERS,
cast(NZD as {{ dbt_utils.type_float() }}) as NZD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@ select
cast(ID as {{ dbt_utils.type_bigint() }}) as ID,
cast(CURRENCY as {{ dbt_utils.type_string() }}) as CURRENCY,
cast(DATE as {{ type_date() }}) as DATE,
cast(TIMESTAMP_COL as {{ type_timestamp_with_timezone() }}) as TIMESTAMP_COL,
case
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{4}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SSTZHTZM')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{2}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SSTZH')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{4}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SS.FFTZHTZM')
when TIMESTAMP_COL regexp '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{2}' then to_timestamp_tz(TIMESTAMP_COL, 'YYYY-MM-DDTHH24:MI:SS.FFTZH')
else to_timestamp_tz(TIMESTAMP_COL)
end as TIMESTAMP_COL
,
cast({{ adapter.quote('HKD@spéçiäl & characters') }} as {{ dbt_utils.type_float() }}) as {{ adapter.quote('HKD@spéçiäl & characters') }},
cast(HKD_SPECIAL___CHARACTERS as {{ dbt_utils.type_string() }}) as HKD_SPECIAL___CHARACTERS,
cast(NZD as {{ dbt_utils.type_float() }}) as NZD,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "id": 1, "currency": "USD", "date": "2020-08-29", "timestamp_col": "2020-08-29T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 2.13, "HKD_special___characters": "column name collision?" }}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637689100, "data": { "id": 1, "currency": "USD", "date": "2020-08-30", "timestamp_col": "2020-08-30T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 7.15, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637789200, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00Z", "NZD": 3.89, "HKD@spéçiäl & characters": 7.12, "HKD_special___characters": "column name collision?", "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637889300, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 7.99, "HKD_special___characters": "column name collision?", "USD": 10.99}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "id": 1, "currency": "USD", "date": "2020-08-29", "timestamp_col": "2020-08-29T00:00:00.000000-0000", "NZD": 1.14, "HKD@spéçiäl & characters": 2.13, "HKD_special___characters": "column name collision?" }}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637689100, "data": { "id": 1, "currency": "USD", "date": "2020-08-30", "timestamp_col": "2020-08-30T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 7.15, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637789200, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+00", "NZD": 3.89, "HKD@spéçiäl & characters": 7.12, "HKD_special___characters": "column name collision?", "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637889300, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+0000", "NZD": 1.14, "HKD@spéçiäl & characters": 7.99, "HKD_special___characters": "column name collision?", "USD": 10.99}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637989400, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 8, "HKD_special___characters": "column name collision?", "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990700, "data": { "id": 1, "currency": "USD", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 10.5, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637991000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 7.02, "HKD_special___characters": "column name collision?"}}}

{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637589000, "data": { "id": 1, "currency": "USD", "date": "2020-08-29", "timestamp_col": "2020-08-29T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 2.13, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637689100, "data": { "id": 1, "currency": "USD", "date": "2020-08-30", "timestamp_col": "2020-08-30T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 7.15, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637789200, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00Z", "NZD": 3.89, "HKD@spéçiäl & characters": 7.12, "HKD_special___characters": "column name collision?", "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637889300, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 7.99, "HKD_special___characters": "column name collision?", "USD": 10.99}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637589000, "data": { "id": 1, "currency": "USD", "date": "2020-08-29", "timestamp_col": "2020-08-29T00:00:00.000000-0000", "NZD": 1.14, "HKD@spéçiäl & characters": 2.13, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637689100, "data": { "id": 1, "currency": "USD", "date": "2020-08-30", "timestamp_col": "2020-08-30T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 7.15, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637789200, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+00", "NZD": 3.89, "HKD@spéçiäl & characters": 7.12, "HKD_special___characters": "column name collision?", "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637889300, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31", "timestamp_col": "2020-08-31T00:00:00+0000", "NZD": 1.14, "HKD@spéçiäl & characters": 7.99, "HKD_special___characters": "column name collision?", "USD": 10.99}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637989400, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 8, "HKD_special___characters": "column name collision?", "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990700, "data": { "id": 1, "currency": "USD", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD@spéçiäl & characters": 10.5, "HKD_special___characters": "column name collision?"}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01", "timestamp_col": "2020-09-01T00:00:00Z", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?"}}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,12 @@ def cast_property_type(self, property_name: str, column_name: str, jinja_column:
elif is_number(definition["type"]):
sql_type = jinja_call("dbt_utils.type_float()")
elif is_timestamp_with_time_zone(definition):
if self.destination_type == DestinationType.SNOWFLAKE:
# snowflake uses case when statement to parse timestamp field
# in this case [cast] operator is not needed as data already converted to timestamp type
return self.generate_snowflake_timestamp_statement(column_name)
sql_type = jinja_call("type_timestamp_with_timezone()")

elif is_date(definition):
sql_type = jinja_call("type_date()")
elif is_string(definition["type"]):
Expand All @@ -446,6 +451,31 @@ def cast_property_type(self, property_name: str, column_name: str, jinja_column:

return f"cast({column_name} as {sql_type}) as {column_name}"

def generate_snowflake_timestamp_statement(self, column_name: str) -> str:
"""
Generates snowflake DB specific timestamp case when statement
"""
formats = [
{"regex": r"\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{4}", "format": "YYYY-MM-DDTHH24:MI:SSTZHTZM"},
{"regex": r"\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{2}", "format": "YYYY-MM-DDTHH24:MI:SSTZH"},
{
"regex": r"\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{4}",
"format": "YYYY-MM-DDTHH24:MI:SS.FFTZHTZM",
},
{"regex": r"\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{2}", "format": "YYYY-MM-DDTHH24:MI:SS.FFTZH"},
]
template = Template(
"""
case
{% for format_item in formats %}
when {{column_name}} regexp '{{format_item['regex']}}' then to_timestamp_tz({{column_name}}, '{{format_item['format']}}')
{% endfor %}
else to_timestamp_tz({{column_name}})
end as {{column_name}}
"""
)
return template.render(formats=formats, column_name=column_name)

def generate_id_hashing_model(self, from_table: str, column_names: Dict[str, Tuple[str, str]]) -> str:

template = Template(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class DefaultNormalizationRunner implements NormalizationRunner {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultNormalizationRunner.class);

public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.44";
public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.45";

private final DestinationType destinationType;
private final ProcessFactory processFactory;
Expand Down
3 changes: 2 additions & 1 deletion docs/understanding-airbyte/basic-normalization.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ Most modern data warehouses have name lengths limits on the longer side, so this
However, in the rare cases where these limits are reached:

1. Truncate only the `Json path` to fit into destination's character limits
2. Truncate the `Json path` to at least the 10 first characters, then truncate the nested column name starting in the middle to preserve prefix/suffix substrings intact \(whenever a truncate in the middle is made, two '\_\_' characters are also inserted to denote where it happened\) to fit into destination's character limits
2. Truncate the `Json path` to at least the 10 first characters, then truncate the nested column name starting in the middle to preserve prefix/suffix substrings intact \(whenever a truncate in the middle is made, two '\_\_' characters are also inserted to denote where it happened\) to fit into destination's character limits

As an example from the hubspot source, we could have the following tables with nested columns:

Expand Down Expand Up @@ -294,6 +294,7 @@ Therefore, in order to "upgrade" to the desired normalization version, you need

| Airbyte Version | Normalization Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- | :--- |
| 0.29.17-alpha | 0.1.45 | 2021-09-18 | [#6052](https://github.com/airbytehq/airbyte/pull/6052) | Snowflake: accept any date-time format |
| 0.29.8-alpha | 0.1.40 | 2021-08-18 | [#5433](https://github.com/airbytehq/airbyte/pull/5433) | Allow optional credentials_json for BigQuery |
| 0.29.5-alpha | 0.1.39 | 2021-08-11 | [#4557](https://github.com/airbytehq/airbyte/pull/4557) | Handle date times and solve conflict name btw stream/field |
| 0.28.2-alpha | 0.1.38 | 2021-07-28 | [#5027](https://github.com/airbytehq/airbyte/pull/5027) | Handle quotes in column names when parsing JSON blob |
Expand Down

0 comments on commit a6ecfda

Please sign in to comment.