
Redshift SUPER type (#12064)
* 🎉 Destination Redshift: Use SUPER data type on Redshift destination for raw JSON data (#9407)

Co-authored-by: Oleksandr Tsukanov <alexander.tsukanovvv@gmail.com>
Co-authored-by: Sergey Chvalyuk <grubberr@gmail.com>
Co-authored-by: Christophe Duong <christophe.duong@gmail.com>
Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
3 people authored Apr 20, 2022
1 parent a70d28d commit 7023fbd
Showing 50 changed files with 1,078 additions and 198 deletions.
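At a glance: the destination now writes the raw JSON payload into a native SUPER column instead of a wide VARCHAR, and normalization detects at compile time which of the two it is reading. A minimal sketch of the new raw-table shape (names follow Airbyte's raw-table convention; the exact DDL is generated by the connector and may differ):

    CREATE TABLE airbyte._airbyte_raw_users (
        _airbyte_ab_id      VARCHAR(36),
        _airbyte_data       SUPER,        -- previously a wide VARCHAR holding JSON text
        _airbyte_emitted_at TIMESTAMPTZ
    );
    -- rows are loaded via JSON_PARSE(...) rather than stored as JSON text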
@@ -191,7 +191,7 @@
 - name: Redshift
   destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
   dockerRepository: airbyte/destination-redshift
-  dockerImageTag: 0.3.28
+  dockerImageTag: 0.3.31
   documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
   icon: redshift.svg
   resourceRequirements:
@@ -3414,7 +3414,7 @@
     supported_destination_sync_modes:
     - "overwrite"
     - "append"
-- dockerImage: "airbyte/destination-redshift:0.3.28"
+- dockerImage: "airbyte/destination-redshift:0.3.31"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
     connectionSpecification:
@@ -44,7 +44,9 @@ public JdbcDatabase(final JdbcCompatibleSourceOperations<?> sourceOperations) {

   @Override
   public void execute(final String sql) throws SQLException {
-    execute(connection -> connection.createStatement().execute(sql));
+    execute(connection -> {
+      connection.createStatement().execute(sql);
+    });
   }

   public void executeWithinTransaction(final List<String> queries) throws SQLException {
@@ -10,3 +10,4 @@
 !dbt-project-template-oracle
 !dbt-project-template-clickhouse
 !dbt-project-template-snowflake
+!dbt-project-template-redshift
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
@@ -28,5 +28,5 @@ WORKDIR /airbyte
 ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
 ENTRYPOINT ["/airbyte/entrypoint.sh"]

-LABEL io.airbyte.version=0.1.75
+LABEL io.airbyte.version=0.1.77
 LABEL io.airbyte.name=airbyte/normalization
5 changes: 5 additions & 0 deletions airbyte-integrations/bases/base-normalization/build.gradle
@@ -77,12 +77,17 @@ task airbyteDockerSnowflake(type: Exec, dependsOn: checkSshScriptCopy) {
   configure buildAirbyteDocker('snowflake')
   dependsOn assemble
 }
+task airbyteDockerRedshift(type: Exec, dependsOn: checkSshScriptCopy) {
+  configure buildAirbyteDocker('redshift')
+  dependsOn assemble
+}

 airbyteDocker.dependsOn(airbyteDockerMSSql)
 airbyteDocker.dependsOn(airbyteDockerMySql)
 airbyteDocker.dependsOn(airbyteDockerOracle)
 airbyteDocker.dependsOn(airbyteDockerClickhouse)
 airbyteDocker.dependsOn(airbyteDockerSnowflake)
+airbyteDocker.dependsOn(airbyteDockerRedshift)

 task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs) {
   module = "pytest"
@@ -0,0 +1,66 @@
# This file is necessary to install dbt-utils with dbt deps
# the content will be overwritten by the transform function

# Name your package! Package names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: "airbyte_utils"
version: "1.0"
config-version: 2

# This setting configures which "profile" dbt uses for this project. Profiles contain
# database connection information, and should be configured in the ~/.dbt/profiles.yml file
profile: "normalize"

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that source models can be found
# in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
docs-paths: ["docs"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
seed-paths: ["data"]
macro-paths: ["macros"]

target-path: "../build"  # directory which will store compiled SQL files
log-path: "../logs"  # directory which will store DBT logs
packages-install-path: "/dbt"  # directory which will store external DBT dependencies

clean-targets:  # directories to be removed by `dbt clean`
  - "build"
  - "dbt_modules"

quoting:
  database: true
  # Temporarily disabling the behavior of the ExtendedNameTransformer on table/schema names, see (issue #1785)
  # all schemas should be unquoted
  schema: false
  identifier: true

# You can define configurations for models in the `model-paths` directory here.
# Using these configurations, you can enable or disable models, change how they
# are materialized, and more!
models:
  +transient: false
  # https://docs.aws.amazon.com/redshift/latest/dg/super-configurations.html
  +pre-hook: "SET enable_case_sensitive_identifier to TRUE"
  airbyte_utils:
    +materialized: table
    generated:
      airbyte_ctes:
        +tags: airbyte_internal_cte
        +materialized: ephemeral
      airbyte_incremental:
        +tags: incremental_tables
        +materialized: incremental
        +on_schema_change: sync_all_columns
      airbyte_tables:
        +tags: normalized_tables
        +materialized: table
      airbyte_views:
        +tags: airbyte_internal_views
        +materialized: view

dispatch:
  - macro_namespace: dbt_utils
    search_order: ["airbyte_utils", "dbt_utils"]
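The +pre-hook above is load-bearing for SUPER: per the linked AWS page, Redshift folds identifiers in SUPER path navigation to lowercase unless enable_case_sensitive_identifier is set, so mixed-case JSON keys would otherwise resolve to NULL. A sketch (table and key names hypothetical):

    SET enable_case_sensitive_identifier TO TRUE;
    SELECT _airbyte_data."userId" FROM airbyte._airbyte_raw_users;
    -- without the setting, "userId" would be matched as "userid" and miss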
@@ -0,0 +1,14 @@
{%- macro redshift_super_type() -%}
    {%- if not execute -%}
        {{ return("") }}
    {%- endif -%}

    {%- set table_schema, _, table_name = var("models_to_source")[this.identifier].partition(".") -%}

    {%- call statement("get_column_type", fetch_result=True) -%}
        select data_type from SVV_COLUMNS where table_name = '{{ table_name }}' and column_name = '{{ var("json_column") }}' and table_schema = '{{ table_schema }}';
    {%- endcall -%}

    {%- set column_type = load_result("get_column_type")["data"][0][0] -%}
    {{ return(column_type == "super") }}
{%- endmacro -%}
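At compile time this macro probes the catalog so the same models run against both legacy VARCHAR and new SUPER raw tables. Rendered, the probe looks roughly like this (schema and table names hypothetical; '_airbyte_data' is the default json_column variable):

    select data_type
    from SVV_COLUMNS
    where table_name = '_airbyte_raw_users'
      and column_name = '_airbyte_data'
      and table_schema = 'airbyte';
    -- 'super' for tables written by the new destination,
    -- 'character varying' for legacy raw tables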
@@ -104,8 +104,19 @@

{% macro default__unnest_cte(from_table, stream_name, column_col) -%}{%- endmacro %}

-{# -- based on https://blog.getdbt.com/how-to-unnest-arrays-in-redshift/ #}
 {% macro redshift__unnest_cte(from_table, stream_name, column_col) -%}
+
+    {# -- based on https://docs.aws.amazon.com/redshift/latest/dg/query-super.html #}
+    {% if redshift_super_type() -%}
+    with joined as (
+        select
+            table_alias._airbyte_{{ stream_name }}_hashid as _airbyte_hashid,
+            _airbyte_nested_data
+        from {{ from_table }} as table_alias, table_alias.{{ column_col }} as _airbyte_nested_data
+    )
+    {%- else -%}
+
+    {# -- based on https://blog.getdbt.com/how-to-unnest-arrays-in-redshift/ #}
     {%- if not execute -%}
         {{ return('') }}
     {% endif %}
@@ -134,6 +145,7 @@ joined as (
         -- to the number of items in {{ from_table }}.{{ column_col }}
         where numbers.generated_number <= json_array_length({{ column_col }}, true)
     )
+    {%- endif %}
{%- endmacro %}
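For SUPER columns the CTE relies on Redshift's PartiQL-style unnesting, where aliasing the array column in the FROM clause iterates its elements; the numbers-table trick survives only in the VARCHAR branch. A sketch of what the SUPER branch renders for a hypothetical stream 'children' on table airbyte.users:

    with joined as (
        select
            table_alias._airbyte_children_hashid as _airbyte_hashid,
            _airbyte_nested_data
        from airbyte.users as table_alias, table_alias.children as _airbyte_nested_data
    )
    -- each element of the SUPER array becomes one row of _airbyte_nested_data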

{% macro mysql__unnest_cte(from_table, stream_name, column_col) -%}
@@ -0,0 +1,16 @@
{% macro redshift__alter_column_type(relation, column_name, new_column_type) -%}

    {%- set tmp_column = column_name + "__dbt_alter" -%}

    {% call statement('alter_column_type') %}
        alter table {{ relation }} add column {{ adapter.quote(tmp_column) }} {{ new_column_type }};
        {% if new_column_type.lower() == "super" %}
            update {{ relation }} set {{ adapter.quote(tmp_column) }} = JSON_PARSE({{ adapter.quote(column_name) }});
        {% else %}
            update {{ relation }} set {{ adapter.quote(tmp_column) }} = {{ adapter.quote(column_name) }};
        {% endif %}
        alter table {{ relation }} drop column {{ adapter.quote(column_name) }} cascade;
        alter table {{ relation }} rename column {{ adapter.quote(tmp_column) }} to {{ adapter.quote(column_name) }}
    {% endcall %}

{% endmacro %}
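Because Redshift's ALTER COLUMN ... TYPE support does not cover converting a column to SUPER in place, the macro goes through an add/copy/drop/rename cycle, converting stored text to SUPER with JSON_PARSE along the way. Roughly the rendered statements for a hypothetical relation and column:

    alter table airbyte.users add column "_airbyte_data__dbt_alter" super;
    update airbyte.users set "_airbyte_data__dbt_alter" = JSON_PARSE("_airbyte_data");
    alter table airbyte.users drop column "_airbyte_data" cascade;
    alter table airbyte.users rename column "_airbyte_data__dbt_alter" to "_airbyte_data"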
@@ -9,7 +9,11 @@
{% endmacro %}

 {%- macro redshift__type_json() -%}
+    {%- if redshift_super_type() -%}
+        super
+    {%- else -%}
         varchar
+    {%- endif -%}
 {%- endmacro -%}

{% macro postgres__type_json() %}
@@ -43,11 +43,12 @@
{%- endmacro %}

 {% macro redshift__format_json_path(json_path_list) -%}
+    {%- set quote = '"' if redshift_super_type() else "'" -%}
     {%- set str_list = [] -%}
     {%- for json_path in json_path_list -%}
-        {%- if str_list.append(json_path.replace("'", "''")) -%} {%- endif -%}
+        {%- if str_list.append(json_path.replace(quote, quote + quote)) -%} {%- endif -%}
     {%- endfor -%}
-    {{ "'" ~ str_list|join("','") ~ "'" }}
+    {{ quote ~ str_list|join(quote + "," + quote) ~ quote }}
 {%- endmacro %}
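The quote character flips because SUPER navigation takes double-quoted path identifiers while json_extract_path_text takes single-quoted string arguments. For a hypothetical single-element path ['user'] (with _airbyte_data as the JSON column):

    -- SUPER:   _airbyte_data."user"
    -- VARCHAR: json_extract_path_text(_airbyte_data, 'user', true)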

{% macro snowflake__format_json_path(json_path_list) -%}
Expand Down Expand Up @@ -114,11 +115,14 @@
{%- endmacro %}

 {% macro redshift__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
-    {%- if from_table|string() == '' %}
+    {%- if from_table|string() != '' -%}
+        {%- set json_column = from_table|string() + "." + json_column|string() -%}
+    {%- endif -%}
+    {%- if redshift_super_type() -%}
+        case when {{ json_column }}.{{ format_json_path(json_path_list) }} != '' then {{ json_column }}.{{ format_json_path(json_path_list) }} end
+    {%- else -%}
         case when json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) end
-    {% else %}
-        case when json_extract_path_text({{ from_table }}.{{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ from_table }}.{{ json_column }}, {{ format_json_path(json_path_list) }}, true) end
-    {% endif -%}
+    {%- endif -%}
 {%- endmacro %}

{% macro snowflake__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
@@ -168,7 +172,11 @@
{%- endmacro %}

 {% macro redshift__json_extract_scalar(json_column, json_path_list, normalized_json_path) -%}
+    {%- if redshift_super_type() -%}
+        case when {{ json_column }}.{{ format_json_path(json_path_list) }} != '' then {{ json_column }}.{{ format_json_path(json_path_list) }} end
+    {%- else -%}
         case when json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) end
+    {%- endif -%}
 {%- endmacro %}

{% macro snowflake__json_extract_scalar(json_column, json_path_list, normalized_json_path) -%}
@@ -210,7 +218,11 @@
{%- endmacro %}

 {% macro redshift__json_extract_array(json_column, json_path_list, normalized_json_path) -%}
+    {%- if redshift_super_type() -%}
+        {{ json_column }}.{{ format_json_path(json_path_list) }}
+    {%- else -%}
         json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true)
+    {%- endif -%}
 {%- endmacro %}
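Taken together, the extract macros render along these lines for a hypothetical scalar field 'name' and array field 'children':

    -- SUPER raw column:
    case when _airbyte_data."name" != '' then _airbyte_data."name" end
    _airbyte_data."children"
    -- legacy VARCHAR raw column:
    case when json_extract_path_text(_airbyte_data, 'name', true) != ''
         then json_extract_path_text(_airbyte_data, 'name', true) end
    json_extract_path_text(_airbyte_data, 'children', true)

With SUPER the array expression stays a navigable SUPER value, which is what feeds the PartiQL unnest above; with VARCHAR it comes back as JSON text.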

{% macro snowflake__json_extract_array(json_column, json_path_list, normalized_json_path) -%}
@@ -33,6 +33,31 @@
     cast({{ array_column }} as {{ dbt_utils.type_string() }})
 {%- endmacro %}

+{% macro redshift__array_to_string(array_column) -%}
+    {% if redshift_super_type() -%}
+        json_serialize({{ array_column }})
+    {%- else -%}
+        {{ array_column }}
+    {%- endif %}
+{%- endmacro %}
+
+{# object_to_string ------------------------------------------------- #}
+{% macro object_to_string(object_column) -%}
+    {{ adapter.dispatch('object_to_string')(object_column) }}
+{%- endmacro %}
+
+{% macro default__object_to_string(object_column) -%}
+    {{ object_column }}
+{%- endmacro %}
+
+{% macro redshift__object_to_string(object_column) -%}
+    {% if redshift_super_type() -%}
+        json_serialize({{ object_column }})
+    {%- else -%}
+        {{ object_column }}
+    {%- endif %}
+{%- endmacro %}
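Both overrides exist because a SUPER array or object must be explicitly serialized before it can be treated as text, whereas legacy VARCHAR data already is text and passes through unchanged. Sketch of the SUPER branch in use (names hypothetical):

    select json_serialize(_airbyte_data."tags") as tags_text
    from airbyte._airbyte_raw_users;
    -- JSON_SERIALIZE turns a SUPER value into its varchar JSON representation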

 {# cast_to_boolean ------------------------------------------------- #}
 {% macro cast_to_boolean(field) -%}
     {{ adapter.dispatch('cast_to_boolean')(field) }}
@@ -49,7 +74,11 @@

 {# -- Redshift does not support converting string directly to boolean, it must go through int first #}
 {% macro redshift__cast_to_boolean(field) -%}
+    {% if redshift_super_type() -%}
+        cast({{ field }} as boolean)
+    {%- else -%}
         cast(decode({{ field }}, 'true', '1', 'false', '0')::integer as boolean)
+    {%- endif %}
 {%- endmacro %}
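A SUPER field already carries a JSON boolean, so a direct cast works; only text data needs the decode-to-integer detour. The two branches render roughly as (field names hypothetical):

    cast(_airbyte_data."active" as boolean)                                    -- SUPER
    cast(decode(active_text, 'true', '1', 'false', '0')::integer as boolean)   -- VARCHAR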

{# -- MS SQL Server does not support converting string directly to boolean, it must be casted as bit #}
@@ -70,3 +99,7 @@
 {%- macro default__empty_string_to_null(field) -%}
     nullif({{ field }}, '')
 {%- endmacro %}
+
+{%- macro redshift__empty_string_to_null(field) -%}
+    nullif({{ field }}::varchar, '')
+{%- endmacro %}
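The explicit ::varchar in the Redshift override appears to make the empty-string comparison well-defined when the field is a SUPER expression rather than text — a reading of the change, not stated in the commit. Rendered (field name hypothetical):

    nullif(_airbyte_data."name"::varchar, '')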
@@ -43,3 +43,10 @@ services:
       context: .
       labels:
         io.airbyte.git-revision: ${GIT_REVISION}
+  normalization-redshift:
+    image: airbyte/normalization-redshift:${VERSION}
+    build:
+      dockerfile: redshift.Dockerfile
+      context: .
+      labels:
+        io.airbyte.git-revision: ${GIT_REVISION}
@@ -14,3 +14,5 @@ services:
     image: airbyte/normalization-clickhouse:${VERSION}
   normalization-snowflake:
     image: airbyte/normalization-snowflake:${VERSION}
+  normalization-redshift:
+    image: airbyte/normalization-redshift:${VERSION}
@@ -14,9 +14,10 @@
 import threading
 import time
 from copy import copy
-from typing import Any, Dict, List
+from typing import Any, Callable, Dict, List

 from normalization.destination_type import DestinationType
+from normalization.transform_catalog.transform import read_yaml_config, write_yaml_config
 from normalization.transform_config.transform import TransformConfig

NORMALIZATION_TEST_TARGET = "NORMALIZATION_TEST_TARGET"
@@ -309,7 +310,9 @@ def change_current_test_dir(request):
     else:
         os.chdir(request.fspath.dirname)

-    def generate_profile_yaml_file(self, destination_type: DestinationType, test_root_dir: str) -> Dict[str, Any]:
+    def generate_profile_yaml_file(
+        self, destination_type: DestinationType, test_root_dir: str, random_schema: bool = False
+    ) -> Dict[str, Any]:
"""
Each destination requires different settings to connect to. This step generates the adequate profiles.yml
as described here: https://docs.getdbt.com/reference/profiles.yml
@@ -326,6 +329,10 @@ def generate_profile_yaml_file(self, destination_type: DestinationType, test_root_dir: str) -> Dict[str, Any]:
             }
         elif destination_type.value == DestinationType.MYSQL.value:
             profiles_config["database"] = self.target_schema
+        elif destination_type.value == DestinationType.REDSHIFT.value:
+            profiles_config["schema"] = self.target_schema
+            if random_schema:
+                profiles_config["schema"] = self.target_schema + "_" + "".join(random.choices(string.ascii_lowercase, k=5))
         else:
             profiles_config["schema"] = self.target_schema
         if destination_type.value == DestinationType.CLICKHOUSE.value:
@@ -376,6 +383,8 @@ def get_normalization_image(destination_type: DestinationType) -> str:
return "airbyte/normalization-clickhouse:dev"
elif DestinationType.SNOWFLAKE.value == destination_type.value:
return "airbyte/normalization-snowflake:dev"
elif DestinationType.REDSHIFT.value == destination_type.value:
return "airbyte/normalization-redshift:dev"
else:
return "airbyte/normalization:dev"

@@ -527,3 +536,10 @@ def get_test_targets() -> List[str]:
         return [d.value for d in {DestinationType.from_string(s.strip()) for s in target_str.split(",")}]
     else:
         return [d.value for d in DestinationType]
+
+    @staticmethod
+    def update_yaml_file(filename: str, callback: Callable):
+        config = read_yaml_config(filename)
+        updated, config = callback(config)
+        if updated:
+            write_yaml_config(config, filename)