diff --git a/.changes/unreleased/Features-20220807-164227.yaml b/.changes/unreleased/Features-20220807-164227.yaml
new file mode 100644
index 000000000..9352edc27
--- /dev/null
+++ b/.changes/unreleased/Features-20220807-164227.yaml
@@ -0,0 +1,7 @@
+kind: Features
+body: Support for ingestion time partition table on BigQuery as incremental materialization
+time: 2022-08-07T16:42:27.232818+02:00
+custom:
+  Author: Kayrnt
+  Issue: "75"
+  PR: "136"
diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py
index e9a9dfc04..eee6cdbe6 100644
--- a/dbt/adapters/bigquery/impl.py
+++ b/dbt/adapters/bigquery/impl.py
@@ -66,11 +66,15 @@ class PartitionConfig(dbtClassMixin):
     data_type: str = "date"
     granularity: str = "day"
     range: Optional[Dict[str, Any]] = None
+    time_ingestion_partitioning: bool = False
+
+    def reject_partition_field_column(self, columns: List[Any]) -> List[str]:
+        return [c for c in columns if not c.name.upper() == self.field.upper()]

     def render(self, alias: Optional[str] = None):
-        column: str = self.field
+        column: str = self.field if not self.time_ingestion_partitioning else "_PARTITIONTIME"
         if alias:
-            column = f"{alias}.{self.field}"
+            column = f"{alias}.{column}"

         if self.data_type.lower() == "int64" or (
             self.data_type.lower() == "date" and self.granularity.lower() == "day"
@@ -79,6 +83,13 @@ def render(self, alias: Optional[str] = None):
         else:
             return f"{self.data_type}_trunc({column}, {self.granularity})"

+    def render_wrapped(self, alias: Optional[str] = None):
+        """Wrap the partitioning column when time involved to ensure it is properly casted to matching time."""
+        if self.data_type in ("date", "timestamp", "datetime"):
+            return f"{self.data_type}({self.render(alias)})"
+        else:
+            return self.render(alias)
+
     @classmethod
     def parse(cls, raw_partition_by) -> Optional["PartitionConfig"]:  # type: ignore [return]
         if raw_partition_by is None:
@@ -236,6 +247,12 @@ def get_columns_in_relation(self, relation: BigQueryRelation) -> List[BigQueryCo
             logger.debug("get_columns_in_relation error: {}".format(e))
             return []

+    @available.parse(lambda *a, **k: [])
+    def add_time_ingestion_partition_column(self, columns) -> List[BigQueryColumn]:
+        "Add time ingestion partition column to columns list"
+        columns.append(self.Column("_PARTITIONTIME", "TIMESTAMP", None, "NULLABLE"))
+        return columns
+
     def expand_column_types(self, goal: BigQueryRelation, current: BigQueryRelation) -> None:  # type: ignore[override]
         # This is a no-op on BigQuery
         pass
@@ -434,6 +451,19 @@ def copy_table(self, source, destination, materialization):

         return "COPY TABLE with materialization: {}".format(materialization)

+    @available.parse(lambda *a, **k: False)
+    def get_columns_in_select_sql(self, select_sql: str) -> List[BigQueryColumn]:
+        try:
+            conn = self.connections.get_thread_connection()
+            client = conn.handle
+            query_job, iterator = self.connections.raw_execute(select_sql)
+            query_table = client.get_table(query_job.destination)
+            return self._get_dbt_columns_from_bq_table(query_table)
+
+        except (ValueError, google.cloud.exceptions.NotFound) as e:
+            logger.debug("get_columns_in_select_sql error: {}".format(e))
+            return []
+
     @classmethod
     def poll_until_job_completes(cls, job, timeout):
         retry_count = timeout
@@ -495,7 +525,8 @@ def _partitions_match(self, table, conf_partition: Optional[PartitionConfig]) ->
         if not is_partitioned and not conf_partition:
             return True
         elif conf_partition and table.time_partitioning is not None:
-            table_field = table.time_partitioning.field.lower()
+            partioning_field = table.time_partitioning.field or "_PARTITIONTIME"
+            table_field = partioning_field.lower()
             table_granularity = table.partitioning_type.lower()
             return (
                 table_field == conf_partition.field.lower()
diff --git a/dbt/include/bigquery/macros/materializations/incremental.sql b/dbt/include/bigquery/macros/materializations/incremental.sql
index a594e993e..0e474f2bf 100644
--- a/dbt/include/bigquery/macros/materializations/incremental.sql
+++ b/dbt/include/bigquery/macros/materializations/incremental.sql
@@ -1,18 +1,3 @@
-{% macro declare_dbt_max_partition(relation, partition_by, complied_code, language='sql') %}
-
-  {#-- TODO: revisit partitioning with python models --#}
-  {%- if '_dbt_max_partition' in complied_code and language == 'sql' -%}
-
-    declare _dbt_max_partition {{ partition_by.data_type }} default (
-      select max({{ partition_by.field }}) from {{ this }}
-      where {{ partition_by.field }} is not null
-    );
-
-  {%- endif -%}
-
-{% endmacro %}
-
-
 {% macro dbt_bigquery_validate_get_incremental_strategy(config) %}
   {#-- Find and validate the incremental strategy #}
   {%- set strategy = config.get("incremental_strategy") or 'merge' -%}
@@ -28,107 +13,40 @@
   {% do return(strategy) %}
 {% endmacro %}

+{% macro source_sql_with_partition(partition_by, source_sql) %}

-{% macro bq_insert_overwrite(
-    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
-) %}
-
-  {% if partitions is not none and partitions != [] %} {# static #}
-
-      {% set predicate -%}
-          {{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in (
-              {{ partitions | join (', ') }}
-          )
-      {%- endset %}
-
-      {%- set source_sql -%}
-        (
-          {{sql}}
-        )
-      {%- endset -%}
-
-      {#-- Because we're putting the model SQL _directly_ into the MERGE statement,
-         we need to prepend the MERGE statement with the user-configured sql_header,
-         which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header)
-         in the "dynamic" case, we save the model SQL result as a temp table first, wherein the
-         sql_header is included by the create_table_as macro.
-      #}
-      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}
-
-  {% else %} {# dynamic #}
-
-      {% set predicate -%}
-          {{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
-      {%- endset %}
-
-      {%- set source_sql -%}
-      (
-        select * from {{ tmp_relation }}
-      )
-      {%- endset -%}
-
-      -- generated script to merge partitions into {{ target_relation }}
-      declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;
-
-      {# have we already created the temp table to check for schema changes? #}
-      {% if not tmp_relation_exists %}
-        {{ declare_dbt_max_partition(this, partition_by, sql) }}
-
-        -- 1. create a temp table
-        {{ create_table_as(True, tmp_relation, compiled_code) }}
-      {% else %}
-        -- 1. temp table already exists, we used it to check for schema changes
-      {% endif %}
-
-      -- 2. define partitions to update
-      set (dbt_partitions_for_replacement) = (
-          select as struct
-              array_agg(distinct {{ partition_by.render() }})
-          from {{ tmp_relation }}
-      );
-
-      -- 3. run the merge statement
-      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
-
-      -- 4. clean up the temp table
-      drop table if exists {{ tmp_relation }}
+  {%- if partition_by.time_ingestion_partitioning %}
+    {{ return(wrap_with_time_ingestion_partitioning(build_partition_time_exp(partition_by.field), source_sql, False)) }}
+  {% else %}
+    {{ return(source_sql) }}
+  {%- endif -%}
+{% endmacro %}
+{% macro bq_create_table_as(is_time_ingestion_partitioning, temporary, relation, compiled_code, language='sql') %}
+  {% if is_time_ingestion_partitioning %}
+    {#-- Create the table before inserting data as ingestion time partitioned tables can't be created with the transformed data --#}
+    {% do run_query(create_ingestion_time_partitioned_table_as(temporary, relation, sql)) %}
+    {{ return(bq_insert_into_ingestion_time_partitioned_table(relation, sql)) }}
+  {% else %}
+    {{ return(create_table_as(temporary, relation, sql)) }}
   {% endif %}
-
 {% endmacro %}

-
 {% macro bq_generate_incremental_build_sql(
     strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
 ) %}
   {#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
   {% if strategy == 'insert_overwrite' %}

-    {% set missing_partition_msg -%}
-      The 'insert_overwrite' strategy requires the `partition_by` config.
-    {%- endset %}
-    {% if partition_by is none %}
-      {% do exceptions.raise_compiler_error(missing_partition_msg) %}
-    {% endif %}
-
-    {% set build_sql = bq_insert_overwrite(
+    {% set build_sql = bq_generate_incremental_insert_overwrite_build_sql(
         tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
     ) %}

   {% else %} {# strategy == 'merge' #}
-    {%- set source_sql -%}
-      {%- if tmp_relation_exists -%}
-        (
-          select * from {{ tmp_relation }}
-        )
-      {%- else -%} {#-- wrap sql in parens to make it a subquery --#}
-        (
-          {{sql}}
-        )
-      {%- endif -%}
-    {%- endset -%}
-
-    {% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns) %}
+
+    {% set build_sql = bq_generate_incremental_merge_build_sql(
+        tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists
+    ) %}

   {% endif %}
@@ -163,14 +81,14 @@

   {% if existing_relation is none %}
       {%- call statement('main', language=language) -%}
-        {{ create_table_as(False, target_relation, compiled_code, language) }}
+        {{ bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, compiled_code, language) }}
      {%- endcall -%}

   {% elif existing_relation.is_view %}
       {#-- There's no way to atomically replace a view with a table on BQ --#}
       {{ adapter.drop_relation(existing_relation) }}
       {%- call statement('main', language=language) -%}
-        {{ create_table_as(False, target_relation, compiled_code, language) }}
+        {{ bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, compiled_code, language) }}
      {%- endcall -%}

   {% elif full_refresh_mode %}
@@ -180,7 +98,7 @@
         {{ adapter.drop_relation(existing_relation) }}
       {% endif %}
       {%- call statement('main', language=language) -%}
-        {{ create_table_as(False, target_relation, compiled_code, language) }}
+        {{ bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, compiled_code, language) }}
      {%- endcall -%}

   {% else %}
@@ -198,7 +116,7 @@
       {#-- Python always needs to create a temp table --#}
       {%- call statement('create_tmp_relation', language=language) -%}
         {{ declare_dbt_max_partition(this, partition_by, compiled_code, language) +
-          create_table_as(True, tmp_relation, compiled_code, language)
+          bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, compiled_code, language)
         }}
       {%- endcall -%}
       {% set tmp_relation_exists = true %}
@@ -209,6 +127,9 @@
       {% if not dest_columns %}
         {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
       {% endif %}
+      {% if partition_by.time_ingestion_partitioning %}
+        {% set dest_columns = adapter.add_time_ingestion_partition_column(dest_columns) %}
+      {% endif %}
       {% set build_sql = bq_generate_incremental_build_sql(
           strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
       ) %}
diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql
new file mode 100644
index 000000000..ca9d2a2c5
--- /dev/null
+++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql
@@ -0,0 +1,22 @@
+{% macro build_partition_time_exp(partition_by) %}
+  {% if partition_by.data_type == 'timestamp' %}
+    {% set partition_value = partition_by.field %}
+  {% else %}
+    {% set partition_value = 'timestamp(' + partition_by.field + ')' %}
+  {% endif %}
+  {{ return({'value': partition_value, 'field': partition_by.field}) }}
+{% endmacro %}
+
+{% macro declare_dbt_max_partition(relation, partition_by, complied_code, language='sql') %}
+
+  {#-- TODO: revisit partitioning with python models --#}
+  {%- if '_dbt_max_partition' in complied_code and language == 'sql' -%}
+
+    declare _dbt_max_partition {{ partition_by.data_type }} default (
+      select max({{ partition_by.field }}) from {{ this }}
+      where {{ partition_by.field }} is not null
+    );
+
+  {%- endif -%}
+
+{% endmacro %}
diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql
new file mode 100644
index 000000000..cacdb603e
--- /dev/null
+++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql
@@ -0,0 +1,93 @@
+{% macro bq_generate_incremental_insert_overwrite_build_sql(
+    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change
+) %}
+  {% if partition_by is none %}
+    {% set missing_partition_msg -%}
+      The 'insert_overwrite' strategy requires the `partition_by` config.
+    {%- endset %}
+    {% do exceptions.raise_compiler_error(missing_partition_msg) %}
+  {% endif %}
+
+  {% set build_sql = bq_insert_overwrite(
+      tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change
+  ) %}
+
+  {{ return(build_sql) }}
+
+{% endmacro %}
+
+{% macro bq_insert_overwrite(
+    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
+) %}
+
+  {% if partitions is not none and partitions != [] %} {# static #}
+
+      {% set predicate -%}
+          {{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in (
+              {{ partitions | join (', ') }}
+          )
+      {%- endset %}
+
+      {%- set source_sql -%}
+        (
+          {%- if partition_by.time_ingestion_partitioning -%}
+          {{ wrap_with_time_ingestion_partitioning(build_partition_time_exp(partition_by), sql, True) }}
+          {%- else -%}
+          {{sql}}
+          {%- endif -%}
+        )
+      {%- endset -%}
+
+      {#-- Because we're putting the model SQL _directly_ into the MERGE statement,
+         we need to prepend the MERGE statement with the user-configured sql_header,
+         which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header)
+         in the "dynamic" case, we save the model SQL result as a temp table first, wherein the
+         sql_header is included by the create_table_as macro.
+      #}
+      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}
+
+  {% else %} {# dynamic #}
+
+      {% set predicate -%}
+          {{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
+      {%- endset %}
+
+      {%- set source_sql -%}
+      (
+        select
+        {% if partition_by.time_ingestion_partitioning -%}
+        _PARTITIONTIME,
+        {%- endif -%}
+        * from {{ tmp_relation }}
+      )
+      {%- endset -%}
+
+      -- generated script to merge partitions into {{ target_relation }}
+      declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;
+
+      {# have we already created the temp table to check for schema changes? #}
+      {% if not tmp_relation_exists %}
+        {{ declare_dbt_max_partition(this, partition_by, sql) }}
+
+        -- 1. create a temp table
+        {{ bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, compiled_code) }}
+      {% else %}
+        -- 1. temp table already exists, we used it to check for schema changes
+      {% endif %}
+
+      -- 2. define partitions to update
+      set (dbt_partitions_for_replacement) = (
+          select as struct
+              array_agg(distinct {{ partition_by.render_wrapped() }})
+          from {{ tmp_relation }}
+      );
+
+      -- 3. run the merge statement
+      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
+
+      -- 4. clean up the temp table
+      drop table if exists {{ tmp_relation }}
+
+  {% endif %}
+
+{% endmacro %}
diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/merge.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/merge.sql
new file mode 100644
index 000000000..8a86d1d8f
--- /dev/null
+++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/merge.sql
@@ -0,0 +1,28 @@
+{% macro bq_generate_incremental_merge_build_sql(
+    tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists
+) %}
+    {%- set source_sql -%}
+      {%- if tmp_relation_exists -%}
+        (
+          select
+          {% if partition_by.time_ingestion_partitioning -%}
+          _PARTITIONTIME,
+          {%- endif -%}
+          * from {{ tmp_relation }}
+        )
+      {%- else -%} {#-- wrap sql in parens to make it a subquery --#}
+        (
+          {%- if partition_by.time_ingestion_partitioning -%}
+          {{ wrap_with_time_ingestion_partitioning(build_partition_time_exp(partition_by), sql, True) }}
+          {%- else -%}
+          {{sql}}
+          {%- endif -%}
+        )
+      {%- endif -%}
+    {%- endset -%}
+
+    {% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns) %}
+
+    {{ return(build_sql) }}
+
+{% endmacro %}
diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/time_ingestion_tables.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/time_ingestion_tables.sql
new file mode 100644
index 000000000..4b2cae0b9
--- /dev/null
+++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/time_ingestion_tables.sql
@@ -0,0 +1,69 @@
+{% macro wrap_with_time_ingestion_partitioning(partition_time_exp, sql, is_nested) %}
+
+  select {{ partition_time_exp['value'] }} as _partitiontime, * EXCEPT({{ partition_time_exp['field'] }}) from (
+    {{ sql }}
+  ){%- if not is_nested -%};{%- endif -%}
+
+{% endmacro %}
+
+{% macro create_ingestion_time_partitioned_table_as(temporary, relation, sql) -%}
+  {%- set raw_partition_by = config.get('partition_by', none) -%}
+  {%- set raw_cluster_by = config.get('cluster_by', none) -%}
+  {%- set sql_header = config.get('sql_header', none) -%}
+
+  {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%}
+
+  {%- set columns = get_columns_with_types_in_query(sql) -%}
+  {%- set table_dest_columns_csv = columns_without_partition_fields_csv(partition_config, columns) -%}
+
+  {{ sql_header if sql_header is not none }}
+
+  {% set ingestion_time_partition_config_raw = fromjson(tojson(raw_partition_by)) %}
+  {% do ingestion_time_partition_config_raw.update({'field':'_PARTITIONTIME'}) %}
+
+  {%- set ingestion_time_partition_config = adapter.parse_partition_by(ingestion_time_partition_config_raw) -%}
+
+  create or replace table {{ relation }} ({{table_dest_columns_csv}})
+  {{ partition_by(ingestion_time_partition_config) }}
+  {{ cluster_by(raw_cluster_by) }}
+  {{ bigquery_table_options(config, model, temporary) }}
+
+{%- endmacro -%}
+
+{% macro get_quoted_with_types_csv(columns) %}
+    {% set quoted = [] %}
+    {% for col in columns -%}
+        {%- do quoted.append(adapter.quote(col.name) ~ " " ~ col.data_type) -%}
+    {%- endfor %}
+    {%- set dest_cols_csv = quoted | join(', ') -%}
+    {{ return(dest_cols_csv) }}
+
+{% endmacro %}
+
+{% macro columns_without_partition_fields_csv(partition_config, columns) -%}
+  {%- set columns_no_partition = partition_config.reject_partition_field_column(columns) -%}
+  {% set columns_names = get_quoted_with_types_csv(columns_no_partition) %}
+  {{ return(columns_names) }}
+
+{%- endmacro -%}
+
+{% macro bq_insert_into_ingestion_time_partitioned_table(target_relation, sql) -%}
+  {%- set partition_by = config.get('partition_by', none) -%}
+  {% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
+  {%- set dest_columns_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
+
+  insert into {{ target_relation }} (_partitiontime, {{ dest_columns_csv }})
+  {{ wrap_with_time_ingestion_partitioning(build_partition_time_exp(partition_by), sql, False) }}
+
+{%- endmacro -%}
+
+{% macro get_columns_with_types_in_query(select_sql) %}
+  {% set sql %}
+    select * from (
+      {{ select_sql }}
+    ) as __dbt_sbq
+    where false
+    limit 0
+  {% endset %}
+  {{ return(adapter.get_columns_in_select_sql(sql)) }}
+{% endmacro %}
diff --git a/tests/integration/incremental_schema_tests/models/incremental_time_ingestion_partitioning.sql b/tests/integration/incremental_schema_tests/models/incremental_time_ingestion_partitioning.sql
new file mode 100644
index 000000000..ce064b33c
--- /dev/null
+++ b/tests/integration/incremental_schema_tests/models/incremental_time_ingestion_partitioning.sql
@@ -0,0 +1,38 @@
+
+{{
+    config(
+        materialized="incremental",
+        incremental_strategy='insert_overwrite',
+        partition_by={
+            "field": "date_hour",
+            "data_type": "datetime",
+            "granularity": "hour",
+            "time_ingestion_partitioning": true
+        }
+    )
+}}
+
+
+with data as (
+
+    {% if not is_incremental() %}
+
+        select 1 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
+        select 2 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
+        select 3 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
+        select 4 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour
+
+    {% else %}
+
+        -- we want to overwrite the 4 records in the 2020-01-01 01:00:00 partition
+        -- with the 2 records below, but add two more in the 2020-01-00 02:00:00 partition
+        select 10 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
+        select 20 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
+        select 30 as id, cast('2020-01-01 02:00:00' as datetime) as date_hour union all
+        select 40 as id, cast('2020-01-01 02:00:00' as datetime) as date_hour
+
+    {% endif %}
+
+)
+
+select * from data
diff --git a/tests/integration/incremental_schema_tests/models/incremental_time_ingestion_partitioning_target.sql b/tests/integration/incremental_schema_tests/models/incremental_time_ingestion_partitioning_target.sql
new file mode 100644
index 000000000..2f25229de
--- /dev/null
+++ b/tests/integration/incremental_schema_tests/models/incremental_time_ingestion_partitioning_target.sql
@@ -0,0 +1,24 @@
+
+{{
+    config(
+        materialized="incremental",
+        partition_by={
+            "field": "date_hour",
+            "data_type": "datetime",
+            "granularity": "hour",
+            "time_ingestion_partitioning": true
+        }
+    )
+}}
+
+{% if not is_incremental() %}
+
+    select 10 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
+    select 30 as id, cast('2020-01-01 02:00:00' as datetime) as date_hour
+
+{% else %}
+
+    select 20 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
+    select 40 as id, cast('2020-01-01 02:00:00' as datetime) as date_hour
+
+{% endif %}
\ No newline at end of file
diff --git a/tests/integration/incremental_schema_tests/models/schema.yml b/tests/integration/incremental_schema_tests/models/schema.yml
index 21aa6095f..fd3136102 100644
--- a/tests/integration/incremental_schema_tests/models/schema.yml
+++ b/tests/integration/incremental_schema_tests/models/schema.yml
@@ -63,6 +63,18 @@ models:
         tags: [column_leveL_tag]
         tests:
           - unique
-
-
\ No newline at end of file
+
+  - name: incremental_time_ingestion_partitioning
+    columns:
+      - name: id
+        tags: [column_level_tag]
+        tests:
+          - unique
+
+  - name: incremental_time_ingestion_partitioning_target
+    columns:
+      - name: id
+        tags: [column_level_tag]
+        tests:
+          - unique
+
diff --git a/tests/integration/incremental_schema_tests/test_incremental_schema.py b/tests/integration/incremental_schema_tests/test_incremental_schema.py
index 113a53585..9592c500f 100644
--- a/tests/integration/incremental_schema_tests/test_incremental_schema.py
+++ b/tests/integration/incremental_schema_tests/test_incremental_schema.py
@@ -124,6 +124,22 @@ def run_incremental_fail_on_schema_change(self):
         results_two = self.run_dbt(['run', '--models', select], expect_pass = False)
         self.assertIn('Compilation Error', results_two[1].message)

+    def run_incremental_time_ingestion_partitioning(self):
+        select = 'model_a incremental_time_ingestion_partitioning incremental_time_ingestion_partitioning_target'
+        compare_source = 'incremental_time_ingestion_partitioning'
+        compare_target = 'incremental_time_ingestion_partitioning_target'
+        exclude = None
+        expected = [
+            'select_from_a',
+            'select_from_incremental_time_ingestion_partitioning',
+            'select_from_incremental_time_ingestion_partitioning_target',
+            'unique_model_a_id',
+            'unique_incremental_time_ingestion_partitioning_id',
+            'unique_incremental_time_ingestion_partitioning_target_id'
+        ]
+        self.list_tests_and_assert(select, exclude, expected)
+        self.run_tests_and_assert(select, exclude, expected, compare_source, compare_target)
+
     @use_profile('bigquery')
     def test__bigquery__run_incremental_ignore(self):
         self.run_incremental_ignore()
@@ -140,3 +156,7 @@ def test__bigquery__run_incremental_sync_all_columns(self):
     @use_profile('bigquery')
     def test__bigquery__run_incremental_fail_on_schema_change(self):
         self.run_incremental_fail_on_schema_change()
+
+    @use_profile('bigquery')
+    def test__bigquery__run_incremental_time_ingestion_partitioning(self):
+        self.run_incremental_time_ingestion_partitioning()
\ No newline at end of file
diff --git a/tests/integration/incremental_schema_tests/tests/select_from_incremental_time_ingestion_partitioning.sql b/tests/integration/incremental_schema_tests/tests/select_from_incremental_time_ingestion_partitioning.sql
new file mode 100644
index 000000000..85e653c11
--- /dev/null
+++ b/tests/integration/incremental_schema_tests/tests/select_from_incremental_time_ingestion_partitioning.sql
@@ -0,0 +1 @@
+select * from {{ ref('incremental_time_ingestion_partitioning') }} where false
diff --git a/tests/integration/incremental_schema_tests/tests/select_from_incremental_time_ingestion_partitioning_target.sql b/tests/integration/incremental_schema_tests/tests/select_from_incremental_time_ingestion_partitioning_target.sql
new file mode 100644
index 000000000..e2533dff7
--- /dev/null
+++ b/tests/integration/incremental_schema_tests/tests/select_from_incremental_time_ingestion_partitioning_target.sql
@@ -0,0 +1 @@
+select * from {{ ref('incremental_time_ingestion_partitioning_target') }} where false
diff --git a/tests/unit/test_bigquery_adapter.py b/tests/unit/test_bigquery_adapter.py
index 188c19b7c..36d84fc44 100644
--- a/tests/unit/test_bigquery_adapter.py
+++ b/tests/unit/test_bigquery_adapter.py
@@ -686,7 +686,8 @@ def test_parse_partition_by(self):
             }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "date",
-                "granularity": "day"
+                "granularity": "day",
+                "time_ingestion_partitioning": False
             }
         )

@@ -697,7 +698,8 @@ def test_parse_partition_by(self):
             }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "date",
-                "granularity": "day"
+                "granularity": "day",
+                "time_ingestion_partitioning": False
             }
         )

@@ -710,7 +712,8 @@ def test_parse_partition_by(self):
             }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "date",
-                "granularity": "MONTH"
+                "granularity": "MONTH",
+                "time_ingestion_partitioning": False
             }
         )

@@ -723,7 +726,8 @@ def test_parse_partition_by(self):
             }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "date",
-                "granularity": "YEAR"
+                "granularity": "YEAR",
+                "time_ingestion_partitioning": False
             }
         )

@@ -736,7 +740,8 @@ def test_parse_partition_by(self):
             }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "timestamp",
-                "granularity": "HOUR"
+                "granularity": "HOUR",
+                "time_ingestion_partitioning": False
             }
         )

@@ -750,7 +755,8 @@ def test_parse_partition_by(self):
            }).to_dict(omit_none=True
                       ), {
                 "field": "ts",
                 "data_type": "timestamp",
-                "granularity": "MONTH"
+                "granularity": "MONTH",
+                "time_ingestion_partitioning": False
             }
         )

@@ -763,7 +769,8 @@ def test_parse_partition_by(self):
             }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "timestamp",
-                "granularity": "YEAR"
+                "granularity": "YEAR",
+                "time_ingestion_partitioning": False
             }
         )

@@ -776,7 +783,8 @@ def test_parse_partition_by(self):
             }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "datetime",
-                "granularity": "HOUR"
+                "granularity": "HOUR",
+                "time_ingestion_partitioning": False
             }
         )

@@ -789,7 +797,8 @@ def test_parse_partition_by(self):
             }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "datetime",
-                "granularity": "MONTH"
+                "granularity": "MONTH",
+                "time_ingestion_partitioning": False
             }
         )

@@ -802,7 +811,21 @@ def test_parse_partition_by(self):
             }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "datetime",
-                "granularity": "YEAR"
+                "granularity": "YEAR",
+                "time_ingestion_partitioning": False
+            }
+        )
+
+        self.assertEqual(
+            adapter.parse_partition_by({
+                "field": "ts",
+                "time_ingestion_partitioning": True
+
+            }).to_dict(omit_none=True), {
+                "field": "ts",
+                "data_type": "date",
+                "granularity": "day",
+                "time_ingestion_partitioning": True
             }
         )

@@ -829,7 +852,8 @@ def test_parse_partition_by(self):
                     "start": 1,
                     "end": 100,
                     "interval": 20
-                }
+                },
+                "time_ingestion_partitioning": False
             }
         )
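
Usage sketch (not part of the patch): a minimal model configuration that exercises the new flag, adapted from the integration-test models added above; the column name `date_hour` and the literal values are illustrative only.

    {{
        config(
            materialized="incremental",
            incremental_strategy="insert_overwrite",
            partition_by={
                "field": "date_hour",
                "data_type": "datetime",
                "granularity": "hour",
                "time_ingestion_partitioning": true
            }
        )
    }}

    select 1 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour

With `time_ingestion_partitioning: true`, the materialization first creates the target as an ingestion-time partitioned table (partitioned on `_PARTITIONTIME`, with the partition field excluded from the column list via `reject_partition_field_column`), then `wrap_with_time_ingestion_partitioning` rewrites the model query so the configured field is cast to a timestamp and written to `_PARTITIONTIME` rather than stored as a regular column.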