Skip to content

Commit

Permalink
🎉 Base-normalization: Implement normalization for `MSSQL-destinatio…
Browse files Browse the repository at this point in the history
…n` (airbytehq#6079)

See the attached PR (airbytehq#6079)
  • Loading branch information
bazarnov authored and schlattk committed Jan 4, 2022
1 parent 3469dfd commit c7d853c
Show file tree
Hide file tree
Showing 131 changed files with 4,246 additions and 50 deletions.
24 changes: 21 additions & 3 deletions airbyte-integrations/bases/base-normalization/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,25 @@ USER root
WORKDIR /tmp
RUN apt-get update && apt-get install -y \
wget \
curl \
unzip \
libaio-dev \
libaio1
libaio1 \
gnupg \
gnupg1 \
gnupg2

# Install MS SQL Server dependencies
RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
RUN curl https://packages.microsoft.com/config/debian/10/prod.list > /etc/apt/sources.list.d/mssql-release.list
RUN apt-get update && ACCEPT_EULA=Y apt-get install -y \
libgssapi-krb5-2 \
unixodbc-dev \
msodbcsql17 \
mssql-tools
ENV PATH=$PATH:/opt/mssql-tools/bin

# Install Oracle dependencies
RUN mkdir -p /opt/oracle
RUN wget https://download.oracle.com/otn_software/linux/instantclient/19600/instantclient-basic-linux.x64-19.6.0.0.0dbru.zip
RUN unzip instantclient-basic-linux.x64-19.6.0.0.0dbru.zip -d /opt/oracle
Expand All @@ -17,8 +33,8 @@ RUN pip install cx_Oracle

COPY --from=airbyte/base-airbyte-protocol-python:0.1.1 /airbyte /airbyte

# Install SSH Tunneling dependencies
RUN apt-get update && apt-get install -y jq sshpass

WORKDIR /airbyte
COPY entrypoint.sh .
COPY build/sshtunneling.sh .
Expand All @@ -28,13 +44,15 @@ COPY normalization ./normalization
COPY setup.py .
COPY dbt-project-template/ ./dbt-template/

# Install python dependencies
WORKDIR /airbyte/base_python_structs
RUN pip install .

WORKDIR /airbyte/normalization_code
RUN pip install .
RUN pip install dbt-oracle==0.4.3
RUN pip install git+https://github.com/dbeatty10/dbt-mysql@96655ea9f7fca7be90c9112ce8ffbb5aac1d3716#egg=dbt-mysql
RUN pip install dbt-sqlserver==0.19.3


WORKDIR /airbyte/normalization_code/dbt-template/
Expand All @@ -45,5 +63,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.49
LABEL io.airbyte.version=0.1.50
LABEL io.airbyte.name=airbyte/normalization
20 changes: 20 additions & 0 deletions airbyte-integrations/bases/base-normalization/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,25 @@ Related documentation on normalization is available here:

Below are short descriptions of the kind of tests that may be affected by changes to the normalization code.

### Build & Activate Virtual Environment and install dependencies
From this connector directory, create a virtual environment:
```
python3 -m venv .venv
```

This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your
development environment of choice. To activate it from the terminal, run:
```
source .venv/bin/activate
pip install -r requirements.txt
```
If you are in an IDE, follow your IDE's instructions to activate the virtualenv.

Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is
used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`.
If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything
should work as you expect.

## Unit Tests

Unit tests are automatically included when building the normalization project.
Expand Down Expand Up @@ -56,6 +75,7 @@ allowed characters, if quotes are needed or not, and the length limitations:
- [snowflake](../../../docs/integrations/destinations/snowflake.md)
- [mysql](../../../docs/integrations/destinations/mysql.md)
- [oracle](../../../docs/integrations/destinations/oracle.md)
- [mssql](../../../docs/integrations/destinations/mssql.md)

Rules about truncations, for example for both of these strings which are too long for the postgres 64 limit:
- `Aaaa_Bbbb_Cccc_Dddd_Eeee_Ffff_Gggg_Hhhh_Iiii`
Expand Down
1 change: 1 addition & 0 deletions airbyte-integrations/bases/base-normalization/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs
dependsOn ':airbyte-integrations:connectors:destination-redshift:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-snowflake:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-oracle:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-mssql:airbyteDocker'

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- Snowflake: flatten() -> https://docs.snowflake.com/en/sql-reference/functions/flatten.html
- Redshift: -> https://blog.getdbt.com/how-to-unnest-arrays-in-redshift/
- postgres: unnest() -> https://www.postgresqltutorial.com/postgresql-array/
- MSSQL: openjson() –> https://docs.microsoft.com/en-us/sql/relational-databases/json/validate-query-and-change-json-data-with-built-in-functions-sql-server?view=sql-server-ver15
#}

{# cross_join_unnest ------------------------------------------------- #}
Expand Down Expand Up @@ -44,6 +45,17 @@
cross join table(flatten({{ array_col }})) as {{ array_col }}
{%- endmacro %}

{% macro sqlserver__cross_join_unnest(stream_name, array_col) -%}
{# https://docs.microsoft.com/en-us/sql/relational-databases/json/convert-json-data-to-rows-and-columns-with-openjson-sql-server?view=sql-server-ver15#option-1---openjson-with-the-default-output #}
CROSS APPLY (
SELECT [value] = CASE
WHEN [type] = 4 THEN (SELECT [value] FROM OPENJSON([value]))
WHEN [type] = 5 THEN [value]
END
FROM OPENJSON({{ array_col }})
) AS {{ array_col }}
{%- endmacro %}

{# unnested_column_value -- this macro is related to unnest_cte #}

{% macro unnested_column_value(column_col) -%}
Expand Down Expand Up @@ -74,6 +86,11 @@
{{ column_col }}
{%- endmacro %}

{% macro sqlserver__unnested_column_value(column_col) -%}
{# unnested array/sub_array will be located in `value` column afterwards, we need to address to it #}
{{ column_col }}.value
{%- endmacro %}

{# unnest_cte ------------------------------------------------- #}

{% macro unnest_cte(table_name, stream_name, column_col) -%}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,13 @@
{% macro postgres__concat(fields) %}
{{ dbt_utils.alternative_concat(fields) }}
{% endmacro %}

{% macro sqlserver__concat(fields) -%}
{#-- CONCAT() in SQL SERVER accepts from 2 to 254 arguments, we use batches for the main concat, to overcome the limit. --#}
{% set concat_chunks = [] %}
{% for chunk in fields|batch(253) -%}
{% set _ = concat_chunks.append( "concat(" ~ chunk|join(', ') ~ ",'')" ) %}
{% endfor %}

concat({{ concat_chunks|join(', ') }}, '')
{%- endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
json
{%- endmacro -%}

{%- macro sqlserver__type_json() -%}
VARCHAR(max)
{%- endmacro -%}


{# string ------------------------------------------------- #}

Expand All @@ -39,6 +43,10 @@
varchar2(4000)
{%- endmacro -%}

{% macro sqlserver__type_string() %}
VARCHAR(max)
{%- endmacro -%}


{# float ------------------------------------------------- #}
{% macro mysql__type_float() %}
Expand Down Expand Up @@ -69,17 +77,23 @@
{% endmacro %}


{# numeric ------------------------------------------------- #}
{# numeric ------------------------------------------------- --#}
{% macro mysql__type_numeric() %}
float
{% endmacro %}


{# timestamp ------------------------------------------------- #}
{# timestamp ------------------------------------------------- --#}
{% macro mysql__type_timestamp() %}
time
{% endmacro %}

{%- macro sqlserver__type_timestamp() -%}
{#-- in TSQL timestamp is really datetime --#}
{#-- https://docs.microsoft.com/en-us/sql/t-sql/functions/date-and-time-data-types-and-functions-transact-sql?view=sql-server-ver15#DateandTimeDataTypes --#}
datetime
{%- endmacro -%}


{# timestamp with time zone ------------------------------------------------- #}

Expand All @@ -95,7 +109,7 @@
timestamp
{% endmacro %}

{# MySQL doesnt allow cast operation to work with TIMESTAMP so we have to use char #}
{#-- MySQL doesnt allow cast operation to work with TIMESTAMP so we have to use char --#}
{%- macro mysql__type_timestamp_with_timezone() -%}
char
{%- endmacro -%}
Expand All @@ -104,6 +118,12 @@
varchar2(4000)
{% endmacro %}

{%- macro sqlserver__type_timestamp_with_timezone() -%}
{#-- in TSQL timestamp is really datetime or datetime2 --#}
{#-- https://docs.microsoft.com/en-us/sql/t-sql/functions/date-and-time-data-types-and-functions-transact-sql?view=sql-server-ver15#DateandTimeDataTypes --#}
datetime
{%- endmacro -%}


{# date ------------------------------------------------- #}

Expand All @@ -118,3 +138,7 @@
{% macro oracle__type_date() %}
varchar2(4000)
{% endmacro %}

{%- macro sqlserver__type_date() -%}
date
{%- endmacro -%}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{# converting hash in varchar _macro #}

{% macro sqlserver__hash(field) -%}
convert(varchar(32), HashBytes('md5', coalesce(cast({{field}} as {{dbt_utils.type_string()}}), '')), 2)
{%- endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@
{{ "'\"" ~ str_list|join('"."') ~ "\"'" }}
{%- endmacro %}

{% macro sqlserver__format_json_path(json_path_list) -%}
{# -- '$."x"."y"."z"' #}
{%- set str_list = [] -%}
{%- for json_path in json_path_list -%}
{%- if str_list.append(json_path.replace("'", "''").replace('"', '\\"')) -%} {%- endif -%}
{%- endfor -%}
{{ "'$.\"" ~ str_list|join(".") ~ "\"'" }}
{%- endmacro %}

{# json_extract ------------------------------------------------- #}

{% macro json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
Expand Down Expand Up @@ -111,6 +120,10 @@
{% endif -%}
{%- endmacro %}

{% macro sqlserver__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
json_query({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{# json_extract_scalar ------------------------------------------------- #}

{% macro json_extract_scalar(json_column, json_path_list, normalized_json_path) -%}
Expand Down Expand Up @@ -145,6 +158,10 @@
to_varchar(get_path(parse_json({{ json_column }}), {{ format_json_path(json_path_list) }}))
{%- endmacro %}

{% macro sqlserver__json_extract_scalar(json_column, json_path_list, normalized_json_path) -%}
json_value({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{# json_extract_array ------------------------------------------------- #}

{% macro json_extract_array(json_column, json_path_list, normalized_json_path) -%}
Expand Down Expand Up @@ -178,3 +195,7 @@
{% macro snowflake__json_extract_array(json_column, json_path_list, normalized_json_path) -%}
get_path(parse_json({{ json_column }}), {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro sqlserver__json_extract_array(json_column, json_path_list, normalized_json_path) -%}
json_query({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
cast({{ array_column }} as varchar2(4000))
{%- endmacro %}

{% macro sqlserver__array_to_string(array_column) -%}
cast({{ array_column }} as {{dbt_utils.type_string()}})
{%- endmacro %}

{# cast_to_boolean ------------------------------------------------- #}
{% macro cast_to_boolean(field) -%}
{{ adapter.dispatch('cast_to_boolean')(field) }}
Expand All @@ -47,3 +51,8 @@
{% macro redshift__cast_to_boolean(field) -%}
cast(decode({{ field }}, 'true', '1', 'false', '0')::integer as boolean)
{%- endmacro %}

{# -- MS SQL Server does not support converting string directly to boolean, it must be casted as bit #}
{% macro sqlserver__cast_to_boolean(field) -%}
cast({{ field }} as bit)
{%- endmacro %}
Loading

0 comments on commit c7d853c

Please sign in to comment.