Fix int64, ts partitions. Rework tests
jtcohen6 committed Feb 12, 2021
1 parent 66f442a commit e01a10c
Showing 19 changed files with 336 additions and 117 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -23,6 +23,10 @@ Contributors:

## dbt 0.19.1 (Release TBD)

### Fixes

- On BigQuery, fix regressions for `insert_overwrite` incremental strategy with `int64` and `timestamp` partition columns ([#3063](https://github.com/fishtown-analytics/dbt/issues/3063), [#3095](https://github.com/fishtown-analytics/dbt/issues/3095), [#3098](https://github.com/fishtown-analytics/dbt/issues/3098))

### Under the hood
- Bump werkzeug upper bound dependency to `<v2.0` ([#3011](https://github.com/fishtown-analytics/dbt/pull/3011))
- Performance fixes for many different things ([#2862](https://github.com/fishtown-analytics/dbt/issues/2862), [#3034](https://github.com/fishtown-analytics/dbt/pull/3034))
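
For context, the regression described in the "Fixes" entry above involves `insert_overwrite` models partitioned by an `int64` range or by a time-based column. An illustrative pair of `partition_by` configs of that shape (column names here are hypothetical, written as Python dicts for brevity):

    # Hypothetical partition_by configs of the shape covered by the fix above.
    int64_partition = {
        "field": "id",
        "data_type": "int64",
        "range": {"start": 1, "end": 10, "interval": 1},
    }
    timestamp_partition = {
        "field": "loaded_at",  # hypothetical column name
        "data_type": "timestamp",
        "granularity": "day",
    }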
14 changes: 8 additions & 6 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
@@ -58,8 +58,10 @@ def render(self, alias: Optional[str] = None):
if alias:
column = f'{alias}.{self.field}'

if self.data_type.lower() == 'date' and \
self.granularity.lower() == 'day':
if self.data_type.lower() == 'int64' or (
self.data_type.lower() == 'date' and
self.granularity.lower() == 'day'
):
return column
else:
return f'{self.data_type}_trunc({column}, {self.granularity})'
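
A minimal standalone sketch of the branching in this hunk, not dbt's actual class: `int64` columns and day-grained `date` columns are used as-is as the partition expression, while any other combination is truncated to the configured granularity. The function name and example values are illustrative.

    # Standalone sketch of the render() branching above; not the dbt API.
    def render_partition_expr(column: str, data_type: str, granularity: str = "day") -> str:
        if data_type.lower() == "int64" or (
            data_type.lower() == "date" and granularity.lower() == "day"
        ):
            return column
        return f"{data_type}_trunc({column}, {granularity})"

    # Example expressions this yields:
    #   render_partition_expr("date_day", "date")               -> date_day
    #   render_partition_expr("date_int", "int64")              -> date_int
    #   render_partition_expr("date_hour", "datetime", "hour")  -> datetime_trunc(date_hour, hour)
    #   render_partition_expr("date_time", "timestamp")         -> timestamp_trunc(date_time, day)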
@@ -551,10 +553,10 @@ def _partitions_match(
if not is_partitioned and not conf_partition:
return True
elif conf_partition and table.time_partitioning is not None:
table_field = table.time_partitioning.field
table_granularity = table.partitioning_type
return table_field == conf_partition.field \
and table_granularity == conf_partition.granularity
table_field = table.time_partitioning.field.lower()
table_granularity = table.partitioning_type.lower()
return table_field == conf_partition.field.lower() \
and table_granularity == conf_partition.granularity.lower()
elif conf_partition and table.range_partitioning is not None:
dest_part = table.range_partitioning
conf_part = conf_partition.range or {}
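
The added `.lower()` calls make the comparison case-insensitive: the BigQuery API reports granularities such as `DAY` in upper case, while model configs typically spell them in lower case. A tiny standalone sketch of the comparison, not the dbt method itself:

    # Standalone sketch: case-insensitive comparison of partition metadata,
    # assuming the warehouse reports e.g. 'DAY' while the config says 'day'.
    def partitions_match(table_field: str, table_granularity: str,
                         conf_field: str, conf_granularity: str) -> bool:
        return (table_field.lower() == conf_field.lower()
                and table_granularity.lower() == conf_granularity.lower())

    assert partitions_match("date_day", "DAY", "date_day", "day")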
@@ -16,9 +16,6 @@


{% macro bq_insert_overwrite(tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns) %}
{%- set partition_type =
'date' if partition_by.data_type in ('timestamp, datetime')
else partition_by.data_type -%}

{% if partitions is not none and partitions != [] %} {# static #}

@@ -49,7 +46,7 @@
{%- endset -%}

-- generated script to merge partitions into {{ target_relation }}
declare dbt_partitions_for_replacement array<{{ partition_type }}>;
declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;
declare _dbt_max_partition {{ partition_by.data_type }};

set _dbt_max_partition = (
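
With the `partition_type` mapping removed, the scripted merge declares its replacement array using the configured `data_type` directly. A rough Python sketch, illustrative only, of the first `declare` line the template above now renders for a few data types:

    # Illustrative approximation of the first 'declare' line rendered by the
    # macro above for a given partition data_type; not the Jinja macro itself.
    def declare_partitions(data_type: str) -> str:
        return f"declare dbt_partitions_for_replacement array<{data_type}>;"

    # declare_partitions("date")      -> declare dbt_partitions_for_replacement array<date>;
    # declare_partitions("int64")     -> declare dbt_partitions_for_replacement array<int64>;
    # declare_partitions("timestamp") -> declare dbt_partitions_for_replacement array<timestamp>;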
@@ -0,0 +1,5 @@
id,date_time
10,2020-01-01 00:00:00
20,2020-01-01 00:00:00
30,2020-01-02 00:00:00
40,2020-01-02 00:00:00
@@ -0,0 +1,5 @@
id,date_int
10,20200101
20,20200101
30,20200102
40,20200102
@@ -0,0 +1,5 @@
id,date_hour
10,2020-01-01 01:00:00
20,2020-01-01 01:00:00
30,2020-01-01 02:00:00
40,2020-01-01 02:00:00
7 changes: 7 additions & 0 deletions test/integration/022_bigquery_test/data/merge_expected.csv
@@ -0,0 +1,7 @@
id,date_time
1,2020-01-01 00:00:00
2,2020-01-01 00:00:00
3,2020-01-01 00:00:00
4,2020-01-02 00:00:00
5,2020-01-02 00:00:00
6,2020-01-02 00:00:00
@@ -0,0 +1,46 @@

{{
config(
materialized="incremental",
unique_key="id",
cluster_by="id",
partition_by={
"field": "id",
"data_type": "int64",
"range": {
"start": 1,
"end": 10,
"interval": 1
}
}
)
}}


with data as (

{% if not is_incremental() %}

select 1 as id, cast('2020-01-01' as datetime) as date_time union all
select 2 as id, cast('2020-01-01' as datetime) as date_time union all
select 3 as id, cast('2020-01-01' as datetime) as date_time union all
select 4 as id, cast('2020-01-01' as datetime) as date_time

{% else %}

select 1 as id, cast('2020-01-01' as datetime) as date_time union all
select 2 as id, cast('2020-01-01' as datetime) as date_time union all
select 3 as id, cast('2020-01-01' as datetime) as date_time union all
select 4 as id, cast('2020-01-02' as datetime) as date_time union all
select 5 as id, cast('2020-01-02' as datetime) as date_time union all
select 6 as id, cast('2020-01-02' as datetime) as date_time

{% endif %}

)

select * from data

{% if is_incremental() %}
where id >= (select max(id) from {{ this }})
{% endif %}
@@ -0,0 +1,42 @@

{{
config(
materialized="incremental",
unique_key="id",
cluster_by="id",
partition_by={
"field": "date_time",
"data_type": "datetime"
}
)
}}



with data as (

{% if not is_incremental() %}

select 1 as id, cast('2020-01-01' as datetime) as date_time union all
select 2 as id, cast('2020-01-01' as datetime) as date_time union all
select 3 as id, cast('2020-01-01' as datetime) as date_time union all
select 4 as id, cast('2020-01-01' as datetime) as date_time

{% else %}

select 1 as id, cast('2020-01-01' as datetime) as date_time union all
select 2 as id, cast('2020-01-01' as datetime) as date_time union all
select 3 as id, cast('2020-01-01' as datetime) as date_time union all
select 4 as id, cast('2020-01-02' as datetime) as date_time union all
select 5 as id, cast('2020-01-02' as datetime) as date_time union all
select 6 as id, cast('2020-01-02' as datetime) as date_time

{% endif %}

)

select * from data

{% if is_incremental() %}
where date_time > (select max(date_time) from {{ this }})
{% endif %}
@@ -13,24 +13,29 @@


with data as (
select 1 as id, cast('2020-01-01' as date) as date_day union all
select 2 as id, cast('2020-01-01' as date) as date_day union all
select 3 as id, cast('2020-01-01' as date) as date_day union all
select 4 as id, cast('2020-01-01' as date) as date_day

{% if not is_incremental() %}

select 1 as id, cast('2020-01-01' as date) as date_day union all
select 2 as id, cast('2020-01-01' as date) as date_day union all
select 3 as id, cast('2020-01-01' as date) as date_day union all
select 4 as id, cast('2020-01-01' as date) as date_day

{% if is_incremental() %}
union all
{% else %}

-- we want to overwrite the 4 records in the 2020-01-01 partition
-- with the 2 records below, but add two more in the 2020-01-01 partition
-- with the 2 records below, but add two more in the 2020-01-02 partition
select 10 as id, cast('2020-01-01' as date) as date_day union all
select 20 as id, cast('2020-01-01' as date) as date_day union all
select 30 as id, cast('2020-01-02' as date) as date_day union all
select 40 as id, cast('2020-01-02' as date) as date_day

{% endif %}

)

select * from data

{% if is_incremental() %}
where ts >= _dbt_max_partition
where date_day >= _dbt_max_partition
{% endif %}
@@ -0,0 +1,41 @@

{{
config(
materialized="incremental",
incremental_strategy='insert_overwrite',
cluster_by="id",
partition_by={
"field": "date_time",
"data_type": "datetime"
}
)
}}


with data as (

{% if not is_incremental() %}

select 1 as id, cast('2020-01-01' as datetime) as date_time union all
select 2 as id, cast('2020-01-01' as datetime) as date_time union all
select 3 as id, cast('2020-01-01' as datetime) as date_time union all
select 4 as id, cast('2020-01-01' as datetime) as date_time

{% else %}

-- we want to overwrite the 4 records in the 2020-01-01 partition
-- with the 2 records below, but add two more in the 2020-01-02 partition
select 10 as id, cast('2020-01-01' as datetime) as date_time union all
select 20 as id, cast('2020-01-01' as datetime) as date_time union all
select 30 as id, cast('2020-01-02' as datetime) as date_time union all
select 40 as id, cast('2020-01-02' as datetime) as date_time

{% endif %}

)

select * from data

{% if is_incremental() %}
where date_time >= _dbt_max_partition
{% endif %}
@@ -0,0 +1,42 @@

{{
config(
materialized="incremental",
incremental_strategy='insert_overwrite',
cluster_by="id",
partitions=["'2020-01-01'","'2020-01-02'"],
partition_by={
"field": "date_day",
"data_type": "date"
}
)
}}


with data as (

{% if not is_incremental() %}

select 1 as id, cast('2020-01-01' as date) as date_day union all
select 2 as id, cast('2020-01-01' as date) as date_day union all
select 3 as id, cast('2020-01-01' as date) as date_day union all
select 4 as id, cast('2020-01-01' as date) as date_day

{% else %}

-- we want to overwrite the 4 records in the 2020-01-01 partition
-- with the 2 records below, but add two more in the 2020-01-02 partition
select 10 as id, cast('2020-01-01' as date) as date_day union all
select 20 as id, cast('2020-01-01' as date) as date_day union all
select 30 as id, cast('2020-01-02' as date) as date_day union all
select 40 as id, cast('2020-01-02' as date) as date_day

{% endif %}

)

select * from data

{% if is_incremental() %}
where date_day in ({{ config.get("partitions") | join(",") }})
{% endif %}
@@ -0,0 +1,46 @@

{{
config(
materialized="incremental",
incremental_strategy='insert_overwrite',
cluster_by="id",
partition_by={
"field": "date_int",
"data_type": "int64",
"range": {
"start": 20200101,
"end": 20200110,
"interval": 1
}
}
)
}}


with data as (

{% if not is_incremental() %}

select 1 as id, 20200101 as date_int union all
select 2 as id, 20200101 as date_int union all
select 3 as id, 20200101 as date_int union all
select 4 as id, 20200101 as date_int

{% else %}

-- we want to overwrite the 4 records in the 20200101 partition
-- with the 2 records below, but add two more in the 20200102 partition
select 10 as id, 20200101 as date_int union all
select 20 as id, 20200101 as date_int union all
select 30 as id, 20200102 as date_int union all
select 40 as id, 20200102 as date_int

{% endif %}

)

select * from data

{% if is_incremental() %}
where date_int >= _dbt_max_partition
{% endif %}
@@ -0,0 +1,42 @@

{{
config(
materialized="incremental",
incremental_strategy='insert_overwrite',
cluster_by="id",
partition_by={
"field": "date_hour",
"data_type": "datetime",
"granularity": "hour"
}
)
}}


with data as (

{% if not is_incremental() %}

select 1 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
select 2 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
select 3 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
select 4 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour

{% else %}

-- we want to overwrite the 4 records in the 2020-01-01 01:00:00 partition
-- with the 2 records below, but add two more in the 2020-01-01 02:00:00 partition
select 10 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
select 20 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
select 30 as id, cast('2020-01-01 02:00:00' as datetime) as date_hour union all
select 40 as id, cast('2020-01-01 02:00:00' as datetime) as date_hour

{% endif %}

)

select * from data

{% if is_incremental() %}
where date_hour >= _dbt_max_partition
{% endif %}