Skip to content

Commit

Permalink
Fix insert_overwrite with int64, ts partitions. Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jtcohen6 committed Feb 12, 2021
1 parent 66f442a commit 0f2a3bb
Show file tree
Hide file tree
Showing 14 changed files with 197 additions and 14 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ Contributors:

## dbt 0.19.1 (Release TBD)

### Fixes

- On BigQuery, fix regressions for `insert_overwrite` incremental strategy with `int64` and `timestamp` partition columns ([#3063](https://github.com/fishtown-analytics/dbt/issues/3063), [#3095](https://github.com/fishtown-analytics/dbt/issues/3095), [#3098](https://github.com/fishtown-analytics/dbt/issues/3098))

### Under the hood
- Bump werkzeug upper bound dependency to `<v2.0` ([#3011](https://github.com/fishtown-analytics/dbt/pull/3011))
- Performance fixes for many different things ([#2862](https://github.com/fishtown-analytics/dbt/issues/2862), [#3034](https://github.com/fishtown-analytics/dbt/pull/3034))
Expand Down
14 changes: 8 additions & 6 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ def render(self, alias: Optional[str] = None):
if alias:
column = f'{alias}.{self.field}'

if self.data_type.lower() == 'date' and \
self.granularity.lower() == 'day':
if self.data_type.lower() == 'int64' or (
self.data_type.lower() == 'date' and
self.granularity.lower() == 'day'
):
return column
else:
return f'{self.data_type}_trunc({column}, {self.granularity})'
Expand Down Expand Up @@ -551,10 +553,10 @@ def _partitions_match(
if not is_partitioned and not conf_partition:
return True
elif conf_partition and table.time_partitioning is not None:
table_field = table.time_partitioning.field
table_granularity = table.partitioning_type
return table_field == conf_partition.field \
and table_granularity == conf_partition.granularity
table_field = table.time_partitioning.field.lower()
table_granularity = table.partitioning_type.lower()
return table_field == conf_partition.field.lower() \
and table_granularity == conf_partition.granularity.lower()
elif conf_partition and table.range_partitioning is not None:
dest_part = table.range_partitioning
conf_part = conf_partition.range or {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@


{% macro bq_insert_overwrite(tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns) %}
{%- set partition_type =
'date' if partition_by.data_type in ('timestamp, datetime')
else partition_by.data_type -%}

{% if partitions is not none and partitions != [] %} {# static #}

Expand Down Expand Up @@ -49,7 +46,7 @@
{%- endset -%}

-- generated script to merge partitions into {{ target_relation }}
declare dbt_partitions_for_replacement array<{{ partition_type }}>;
declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;
declare _dbt_max_partition {{ partition_by.data_type }};

set _dbt_max_partition = (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
id,date_time
10,2020-01-01 00:00:00
20,2020-01-01 00:00:00
30,2020-01-02 00:00:00
40,2020-01-02 00:00:00
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
id,date_int
10,20200101
20,20200101
30,20200102
40,20200102
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
id,date_day
10,2020-01-01 01:00:00
20,2020-01-01 01:00:00
30,2020-01-01 02:00:00
40,2020-01-01 02:00:00
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ with data as (
{% if is_incremental() %}
union all
-- we want to overwrite the 4 records in the 2020-01-01 partition
-- with the 2 records below, but add two more in the 2020-01-01 partition
-- with the 2 records below, but add two more in the 2020-01-02 partition
select 10 as id, cast('2020-01-01' as date) as date_day union all
select 20 as id, cast('2020-01-01' as date) as date_day union all
select 30 as id, cast('2020-01-02' as date) as date_day union all
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@

{{
    config(
        materialized="incremental",
        incremental_strategy='insert_overwrite',
        cluster_by="id",
        partition_by={
            "field": "date_time",
            "data_type": "timestamp"
        }
    )
}}

-- Exercises the insert_overwrite incremental strategy against a
-- timestamp-partitioned table (dynamic partition replacement driven by
-- the scripted _dbt_max_partition variable).

with data as (
    select 1 as id, cast('2020-01-01' as timestamp) as date_time union all
    select 2 as id, cast('2020-01-01' as timestamp) as date_time union all
    select 3 as id, cast('2020-01-01' as timestamp) as date_time union all
    select 4 as id, cast('2020-01-01' as timestamp) as date_time

    {% if is_incremental() %}
    union all
    -- we want to overwrite the 4 records in the 2020-01-01 partition
    -- with the 2 records below, but add two more in the 2020-01-02 partition
    select 10 as id, cast('2020-01-01' as timestamp) as date_time union all
    select 20 as id, cast('2020-01-01' as timestamp) as date_time union all
    select 30 as id, cast('2020-01-02' as timestamp) as date_time union all
    select 40 as id, cast('2020-01-02' as timestamp) as date_time
    {% endif %}
)

select * from data

{% if is_incremental() %}
-- FIX: filter on the partition column `date_time`; the original referenced
-- `ts`, a column that does not exist in this model and would fail to
-- compile in BigQuery.
where date_time >= _dbt_max_partition
{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@

{{
    config(
        materialized="incremental",
        incremental_strategy='insert_overwrite',
        cluster_by="id",
        partitions=["'2020-01-01'","'2020-01-02'"],
        partition_by={
            "field": "date_day",
            "data_type": "date"
        }
    )
}}

-- Exercises the *static* insert_overwrite strategy: the partitions to
-- replace are listed explicitly in the config rather than computed from
-- the incoming data.

with data as (
    select 1 as id, cast('2020-01-01' as date) as date_day union all
    select 2 as id, cast('2020-01-01' as date) as date_day union all
    select 3 as id, cast('2020-01-01' as date) as date_day union all
    select 4 as id, cast('2020-01-01' as date) as date_day

    {% if is_incremental() %}
    union all
    -- we want to overwrite the 4 records in the 2020-01-01 partition
    -- with the 2 records below, but add two more in the 2020-01-02 partition
    select 10 as id, cast('2020-01-01' as date) as date_day union all
    select 20 as id, cast('2020-01-01' as date) as date_day union all
    select 30 as id, cast('2020-01-02' as date) as date_day union all
    select 40 as id, cast('2020-01-02' as date) as date_day
    {% endif %}
)

select * from data

{% if is_incremental() %}
-- FIX: filter on the partition column `date_day`; the original referenced
-- `ts`, a column that does not exist in this model and would fail to
-- compile in BigQuery.
where date_day in ({{ config.get("partitions") | join(",") }})
{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@

{{
    config(
        materialized="incremental",
        incremental_strategy='insert_overwrite',
        cluster_by="id",
        partition_by={
            "field": "date_int",
            "data_type": "int64",
            "range": {
                "start": 20200101,
                "end": 20200110,
                "interval": 1
            }
        }
    )
}}

-- Exercises insert_overwrite against an integer-range-partitioned table;
-- with int64 partitioning the partition column is used as-is (no
-- *_trunc wrapping), per the BigQueryPartitionConfig.render fix.

with data as (
    select 1 as id, 20200101 as date_int union all
    select 2 as id, 20200101 as date_int union all
    select 3 as id, 20200101 as date_int union all
    select 4 as id, 20200101 as date_int

    {% if is_incremental() %}
    union all
    -- we want to overwrite the 4 records in the 20200101 partition
    -- with the 2 records below, but add two more in the 20200102 partition
    select 10 as id, 20200101 as date_int union all
    select 20 as id, 20200101 as date_int union all
    select 30 as id, 20200102 as date_int union all
    -- FIX: dropped the stray second alias ("as date_int as date_day"),
    -- which is a SQL syntax error.
    select 40 as id, 20200102 as date_int
    {% endif %}
)

select * from data

{% if is_incremental() %}
-- FIX: filter on the partition column `date_int`; the original referenced
-- `ts`, a column that does not exist in this model and would fail to
-- compile in BigQuery.
where date_int >= _dbt_max_partition
{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@

{{
    config(
        materialized="incremental",
        incremental_strategy='insert_overwrite',
        cluster_by="id",
        partition_by={
            "field": "date_hour",
            "data_type": "timestamp",
            "granularity": "hour"
        }
    )
}}

-- Exercises insert_overwrite with a non-default partition granularity
-- (timestamp_trunc(..., hour)).

with data as (
    select 1 as id, cast('2020-01-01 01:00:00' as timestamp) as date_hour union all
    select 2 as id, cast('2020-01-01 01:00:00' as timestamp) as date_hour union all
    select 3 as id, cast('2020-01-01 01:00:00' as timestamp) as date_hour union all
    select 4 as id, cast('2020-01-01 01:00:00' as timestamp) as date_hour

    {% if is_incremental() %}
    union all
    -- we want to overwrite the 4 records in the 2020-01-01 01:00:00 partition
    -- with the 2 records below, but add two more in the 2020-01-01 02:00:00 partition
    select 10 as id, cast('2020-01-01 01:00:00' as timestamp) as date_hour union all
    select 20 as id, cast('2020-01-01 01:00:00' as timestamp) as date_hour union all
    select 30 as id, cast('2020-01-01 02:00:00' as timestamp) as date_hour union all
    select 40 as id, cast('2020-01-01 02:00:00' as timestamp) as date_hour
    {% endif %}
)

select * from data

{% if is_incremental() %}
-- FIX: filter on the partition column `date_hour`; the original referenced
-- `ts`, a column that does not exist in this model and would fail to
-- compile in BigQuery. Also fixed the comment typo "2020-01-00" above.
where date_hour >= _dbt_max_partition
{% endif %}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ def test_bigquery_add_partition_hour(self):
'require_partition_filter': None}
self.run_changes(before, after)
self.test_partitions({"expected": 1})

@use_profile('bigquery')
def test_bigquery_change_partition_granularity_casing(self):
    # Changing only the casing of the configured granularity
    # ('hour' -> 'HOUR') should NOT be detected as a partition-scheme
    # change: _partitions_match lower()s both the table's granularity and
    # the configured one before comparing (see impl.py change in this
    # commit), so the expected partition count stays at 1.
    # NOTE(review): exact rebuild/no-rebuild semantics depend on
    # run_changes, which is defined elsewhere — confirm against the base
    # test class.
    before = {"partition_by": {'field': 'cur_time', 'data_type': 'timestamp', 'granularity': 'hour'},
              "cluster_by": None}
    after = {"partition_by": {'field': 'cur_time', 'data_type': 'timestamp', 'granularity': 'HOUR'},
             "cluster_by": None}
    self.run_changes(before, after)
    self.test_partitions({"expected": 1})

@use_profile('bigquery')
def test_bigquery_remove_partition(self):
Expand Down
11 changes: 8 additions & 3 deletions test/integration/022_bigquery_test/test_scripting.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@ def profile_config(self):

def assert_incrementals(self):
    """Run the project's 7 incremental models twice (initial build, then an
    incremental run), seed the expected data, and compare each incremental
    table against its expected seed."""
    results = self.run_dbt()
    self.assertEqual(len(results), 7)

    # FIX: capture the second invocation's results before asserting; the
    # original discarded the return value and re-checked the stale
    # `results` from the first run, so the second assertion was vacuous.
    results = self.run_dbt()
    self.assertEqual(len(results), 7)

    self.run_dbt(['seed'])

    self.assertTablesEqual('incremental_overwrite', 'incremental_overwrite_expected')
    self.assertTablesEqual('incremental_overwrite_date', 'incremental_overwrite_date_expected')
    self.assertTablesEqual('incremental_overwrite_partitions', 'incremental_overwrite_date_expected')
    self.assertTablesEqual('incremental_overwrite_day', 'incremental_overwrite_day_expected')
    self.assertTablesEqual('incremental_overwrite_range', 'incremental_overwrite_range_expected')
    self.assertTablesEqual('incremental_overwrite_time', 'incremental_overwrite_time_expected')

0 comments on commit 0f2a3bb

Please sign in to comment.