Support for ingestion time partition table on BigQuery as incremental materialization #136

Merged: 6 commits, Oct 20, 2022

7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20220807-164227.yaml
@@ -0,0 +1,7 @@
kind: Features
body: Support for ingestion time partition table on BigQuery as incremental materialization
time: 2022-08-07T16:42:27.232818+02:00
custom:
Author: Kayrnt
Issue: "75"
PR: "136"
37 changes: 34 additions & 3 deletions dbt/adapters/bigquery/impl.py
@@ -66,11 +66,15 @@ class PartitionConfig(dbtClassMixin):
data_type: str = "date"
granularity: str = "day"
range: Optional[Dict[str, Any]] = None
time_ingestion_partitioning: bool = False

def reject_partition_field_column(self, columns: List[Any]) -> List[str]:
return [c for c in columns if not c.name.upper() == self.field.upper()]

def render(self, alias: Optional[str] = None):
column: str = self.field
column: str = self.field if not self.time_ingestion_partitioning else "_PARTITIONTIME"
if alias:
column = f"{alias}.{self.field}"
column = f"{alias}.{column}"

if self.data_type.lower() == "int64" or (
self.data_type.lower() == "date" and self.granularity.lower() == "day"
@@ -79,6 +83,13 @@ def render(self, alias: Optional[str] = None):
else:
return f"{self.data_type}_trunc({column}, {self.granularity})"

def render_wrapped(self, alias: Optional[str] = None):
"""Wrap the partitioning column when time involved to ensure it is properly casted to matching time."""
if self.data_type in ("date", "timestamp", "datetime"):
[inline review thread]
Contributor: do we need to include "time" type here as well?
Contributor: As far as I know, it's not possible to partition by "time", see https://cloud.google.com/bigquery/docs/partitioned-tables?hl=fr
Contributor: ahh makes sense, thanks for the clarification!

return f"{self.data_type}({self.render(alias)})"
else:
return self.render(alias)

@classmethod
def parse(cls, raw_partition_by) -> Optional["PartitionConfig"]: # type: ignore [return]
if raw_partition_by is None:
@@ -236,6 +247,12 @@ def get_columns_in_relation(self, relation: BigQueryRelation) -> List[BigQueryColumn]:
logger.debug("get_columns_in_relation error: {}".format(e))
return []

@available.parse(lambda *a, **k: [])
def add_time_ingestion_partition_column(self, columns) -> List[BigQueryColumn]:
"Add time ingestion partition column to columns list"
columns.append(self.Column("_PARTITIONTIME", "TIMESTAMP", None, "NULLABLE"))
return columns

def expand_column_types(self, goal: BigQueryRelation, current: BigQueryRelation) -> None: # type: ignore[override]
# This is a no-op on BigQuery
pass
@@ -434,6 +451,19 @@ def copy_table(self, source, destination, materialization):

return "COPY TABLE with materialization: {}".format(materialization)

@available.parse(lambda *a, **k: False)
def get_columns_in_select_sql(self, select_sql: str) -> List[BigQueryColumn]:
try:
conn = self.connections.get_thread_connection()
client = conn.handle
query_job, iterator = self.connections.raw_execute(select_sql)
query_table = client.get_table(query_job.destination)
return self._get_dbt_columns_from_bq_table(query_table)

except (ValueError, google.cloud.exceptions.NotFound) as e:
logger.debug("get_columns_in_select_sql error: {}".format(e))
return []

@classmethod
def poll_until_job_completes(cls, job, timeout):
retry_count = timeout
@@ -495,7 +525,8 @@ def _partitions_match(self, table, conf_partition: Optional[PartitionConfig]) -> bool:
if not is_partitioned and not conf_partition:
return True
elif conf_partition and table.time_partitioning is not None:
table_field = table.time_partitioning.field.lower()
partioning_field = table.time_partitioning.field or "_PARTITIONTIME"
table_field = partioning_field.lower()
table_granularity = table.partitioning_type.lower()
return (
table_field == conf_partition.field.lower()
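To make the new rendering behaviour concrete: for a partition_by config with data_type timestamp, granularity day, and time_ingestion_partitioning enabled, render() substitutes the _PARTITIONTIME pseudo column for the configured field, and render_wrapped() additionally casts the truncated value back to the partition data type. A static insert_overwrite predicate built from it would therefore look roughly like this (the partition literals are illustrative):

-- render(alias='DBT_INTERNAL_DEST')         -> timestamp_trunc(DBT_INTERNAL_DEST._PARTITIONTIME, day)
-- render_wrapped(alias='DBT_INTERNAL_DEST') -> timestamp(timestamp_trunc(DBT_INTERNAL_DEST._PARTITIONTIME, day))
timestamp(timestamp_trunc(DBT_INTERNAL_DEST._PARTITIONTIME, day)) in (
  timestamp('2022-10-19'),
  timestamp('2022-10-20')
)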
131 changes: 26 additions & 105 deletions dbt/include/bigquery/macros/materializations/incremental.sql
@@ -1,18 +1,3 @@
{% macro declare_dbt_max_partition(relation, partition_by, complied_code, language='sql') %}

{#-- TODO: revisit partitioning with python models --#}
{%- if '_dbt_max_partition' in complied_code and language == 'sql' -%}

declare _dbt_max_partition {{ partition_by.data_type }} default (
select max({{ partition_by.field }}) from {{ this }}
where {{ partition_by.field }} is not null
);

{%- endif -%}

{% endmacro %}


{% macro dbt_bigquery_validate_get_incremental_strategy(config) %}
{#-- Find and validate the incremental strategy #}
{%- set strategy = config.get("incremental_strategy") or 'merge' -%}
@@ -28,107 +13,40 @@
{% do return(strategy) %}
{% endmacro %}

{% macro source_sql_with_partition(partition_by, source_sql) %}

{% macro bq_insert_overwrite(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
) %}

{% if partitions is not none and partitions != [] %} {# static #}

{% set predicate -%}
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in (
{{ partitions | join (', ') }}
)
{%- endset %}

{%- set source_sql -%}
(
{{sql}}
)
{%- endset -%}

{#-- Because we're putting the model SQL _directly_ into the MERGE statement,
we need to prepend the MERGE statement with the user-configured sql_header,
which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header)
in the "dynamic" case, we save the model SQL result as a temp table first, wherein the
sql_header is included by the create_table_as macro.
#}
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}

{% else %} {# dynamic #}

{% set predicate -%}
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
{%- endset %}

{%- set source_sql -%}
(
select * from {{ tmp_relation }}
)
{%- endset -%}

-- generated script to merge partitions into {{ target_relation }}
declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;

{# have we already created the temp table to check for schema changes? #}
{% if not tmp_relation_exists %}
{{ declare_dbt_max_partition(this, partition_by, sql) }}

-- 1. create a temp table
{{ create_table_as(True, tmp_relation, compiled_code) }}
{% else %}
-- 1. temp table already exists, we used it to check for schema changes
{% endif %}

-- 2. define partitions to update
set (dbt_partitions_for_replacement) = (
select as struct
array_agg(distinct {{ partition_by.render() }})
from {{ tmp_relation }}
);

-- 3. run the merge statement
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};

-- 4. clean up the temp table
drop table if exists {{ tmp_relation }}
{%- if partition_by.time_ingestion_partitioning %}
{{ return(wrap_with_time_ingestion_partitioning(build_partition_time_exp(partition_by.field), source_sql, False)) }}
{% else %}
{{ return(source_sql) }}
{%- endif -%}

{% endmacro %}
{% macro bq_create_table_as(is_time_ingestion_partitioning, temporary, relation, compiled_code, language='sql') %}
{% if is_time_ingestion_partitioning %}
{#-- Create the table before inserting data as ingestion time partitioned tables can't be created with the transformed data --#}
{% do run_query(create_ingestion_time_partitioned_table_as(temporary, relation, sql)) %}
{{ return(bq_insert_into_ingestion_time_partitioned_table(relation, sql)) }}
{% else %}
{{ return(create_table_as(temporary, relation, sql)) }}
{% endif %}

{% endmacro %}


{% macro bq_generate_incremental_build_sql(
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
) %}
{#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
{% if strategy == 'insert_overwrite' %}

{% set missing_partition_msg -%}
The 'insert_overwrite' strategy requires the `partition_by` config.
{%- endset %}
{% if partition_by is none %}
{% do exceptions.raise_compiler_error(missing_partition_msg) %}
{% endif %}

{% set build_sql = bq_insert_overwrite(
{% set build_sql = bq_generate_incremental_insert_overwrite_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
) %}

{% else %} {# strategy == 'merge' #}
{%- set source_sql -%}
{%- if tmp_relation_exists -%}
(
select * from {{ tmp_relation }}
)
{%- else -%} {#-- wrap sql in parens to make it a subquery --#}
(
{{sql}}
)
{%- endif -%}
{%- endset -%}

{% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns) %}

{% set build_sql = bq_generate_incremental_merge_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists
) %}

{% endif %}

@@ -163,14 +81,14 @@

{% if existing_relation is none %}
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{{ bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, compiled_code, language) }}
{%- endcall -%}

{% elif existing_relation.is_view %}
{#-- There's no way to atomically replace a view with a table on BQ --#}
{{ adapter.drop_relation(existing_relation) }}
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{{ bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, compiled_code, language) }}
{%- endcall -%}

{% elif full_refresh_mode %}
@@ -180,7 +98,7 @@
{{ adapter.drop_relation(existing_relation) }}
{% endif %}
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{{ bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, compiled_code, language) }}
{%- endcall -%}

{% else %}
@@ -198,7 +116,7 @@
{#-- Python always needs to create a temp table --#}
{%- call statement('create_tmp_relation', language=language) -%}
{{ declare_dbt_max_partition(this, partition_by, compiled_code, language) +
create_table_as(True, tmp_relation, compiled_code, language)
bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, compiled_code, language)
}}
{%- endcall -%}
{% set tmp_relation_exists = true %}
@@ -209,6 +127,9 @@
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}
{% if partition_by.time_ingestion_partitioning %}
{% set dest_columns = adapter.add_time_ingestion_partition_column(dest_columns) %}
{% endif %}
{% set build_sql = bq_generate_incremental_build_sql(
strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
) %}
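As context for bq_create_table_as above: an ingestion-time partitioned target is built in two steps, since such a table cannot be created directly from the transformed data. The helper macros it calls (create_ingestion_time_partitioned_table_as and bq_insert_into_ingestion_time_partitioned_table) are not part of this diff, so the statements below are only an approximation with hypothetical table and column names:

-- 1. create the empty ingestion-time partitioned table first
create table `my-project.my_dataset.my_model`
(
  id int64,
  created_at timestamp,
  payload string
)
partition by _PARTITIONDATE;

-- 2. insert the transformed rows, filling the _PARTITIONTIME pseudo column
--    from the configured partition field
insert into `my-project.my_dataset.my_model` (_partitiontime, id, created_at, payload)
select timestamp_trunc(created_at, day), id, created_at, payload
from (
  select id, created_at, payload from `my-project.my_dataset.events`
);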
@@ -0,0 +1,22 @@
{% macro build_partition_time_exp(partition_by) %}
{% if partition_by.data_type == 'timestamp' %}
{% set partition_value = partition_by.field %}
{% else %}
{% set partition_value = 'timestamp(' + partition_by.field + ')' %}
{% endif %}
{{ return({'value': partition_value, 'field': partition_by.field}) }}
{% endmacro %}

{% macro declare_dbt_max_partition(relation, partition_by, complied_code, language='sql') %}

{#-- TODO: revisit partitioning with python models --#}
{%- if '_dbt_max_partition' in complied_code and language == 'sql' -%}

declare _dbt_max_partition {{ partition_by.data_type }} default (
select max({{ partition_by.field }}) from {{ this }}
where {{ partition_by.field }} is not null
);

{%- endif -%}

{% endmacro %}
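For reference, when a model's compiled SQL references _dbt_max_partition, this macro expands into a BigQuery scripting declaration along these lines (relation and field names are hypothetical; the data type comes from the partition config):

declare _dbt_max_partition timestamp default (
  select max(created_at) from `my-project`.`my_dataset`.`my_model`
  where created_at is not null
);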
@@ -0,0 +1,93 @@
{% macro bq_generate_incremental_insert_overwrite_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change
) %}
{% if partition_by is none %}
{% set missing_partition_msg -%}
The 'insert_overwrite' strategy requires the `partition_by` config.
{%- endset %}
{% do exceptions.raise_compiler_error(missing_partition_msg) %}
{% endif %}

{% set build_sql = bq_insert_overwrite(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change
) %}

{{ return(build_sql) }}

{% endmacro %}

{% macro bq_insert_overwrite(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
) %}

{% if partitions is not none and partitions != [] %} {# static #}

{% set predicate -%}
{{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in (
{{ partitions | join (', ') }}
)
{%- endset %}

{%- set source_sql -%}
(
{%- if partition_by.time_ingestion_partitioning -%}
{{ wrap_with_time_ingestion_partitioning(build_partition_time_exp(partition_by), sql, True) }}
{%- else -%}
{{sql}}
{%- endif -%}
)
{%- endset -%}

{#-- Because we're putting the model SQL _directly_ into the MERGE statement,
we need to prepend the MERGE statement with the user-configured sql_header,
which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header)
in the "dynamic" case, we save the model SQL result as a temp table first, wherein the
sql_header is included by the create_table_as macro.
#}
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}

{% else %} {# dynamic #}

{% set predicate -%}
{{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
{%- endset %}

{%- set source_sql -%}
(
select
{% if partition_by.time_ingestion_partitioning -%}
_PARTITIONTIME,
{%- endif -%}
* from {{ tmp_relation }}
)
{%- endset -%}

-- generated script to merge partitions into {{ target_relation }}
declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;

{# have we already created the temp table to check for schema changes? #}
{% if not tmp_relation_exists %}
{{ declare_dbt_max_partition(this, partition_by, sql) }}

-- 1. create a temp table
{{ bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, compiled_code) }}
{% else %}
-- 1. temp table already exists, we used it to check for schema changes
{% endif %}

-- 2. define partitions to update
set (dbt_partitions_for_replacement) = (
select as struct
array_agg(distinct {{ partition_by.render_wrapped() }})
from {{ tmp_relation }}
);

-- 3. run the merge statement
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};

-- 4. clean up the temp table
drop table if exists {{ tmp_relation }}

{% endif %}

{% endmacro %}
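Putting the dynamic branch together: for a timestamp/day ingestion-time partitioned model, the generated script resembles the sketch below. Relation and column names are hypothetical, the temp-table creation follows the two-step flow sketched earlier, and the merge skeleton is the shape produced by get_insert_overwrite_merge_sql:

-- generated script to merge partitions into `my-project.my_dataset.my_model`
declare dbt_partitions_for_replacement array<timestamp>;

-- 1. create a temp table with the transformed data (via bq_create_table_as)

-- 2. define partitions to update
set (dbt_partitions_for_replacement) = (
  select as struct
    array_agg(distinct timestamp(timestamp_trunc(_PARTITIONTIME, day)))
  from `my-project.my_dataset.my_model__dbt_tmp`
);

-- 3. run the merge statement, restricted to the partitions being replaced
merge into `my-project.my_dataset.my_model` as DBT_INTERNAL_DEST
using (
  select _PARTITIONTIME, * from `my-project.my_dataset.my_model__dbt_tmp`
) as DBT_INTERNAL_SOURCE
on FALSE
when not matched by source
  and timestamp(timestamp_trunc(DBT_INTERNAL_DEST._PARTITIONTIME, day)) in unnest(dbt_partitions_for_replacement)
  then delete
when not matched then
  insert (_partitiontime, id, created_at, payload)
  values (_partitiontime, id, created_at, payload)
;

-- 4. clean up the temp table
drop table if exists `my-project.my_dataset.my_model__dbt_tmp`;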