[BQ] Fix insert_overwrite with int + ts partitions #3098

Merged · 1 commit · Feb 12, 2021
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -23,6 +23,10 @@ Contributors:

## dbt 0.19.1 (Release TBD)

### Fixes

- On BigQuery, fix regressions for `insert_overwrite` incremental strategy with `int64` and `timestamp` partition columns ([#3063](https://github.com/fishtown-analytics/dbt/issues/3063), [#3095](https://github.com/fishtown-analytics/dbt/issues/3095), [#3098](https://github.com/fishtown-analytics/dbt/issues/3098))

### Under the hood
- Bump werkzeug upper bound dependency to `<v2.0` ([#3011](https://github.com/fishtown-analytics/dbt/pull/3011))
- Performance fixes for many different things ([#2862](https://github.com/fishtown-analytics/dbt/issues/2862), [#3034](https://github.com/fishtown-analytics/dbt/pull/3034))
14 changes: 8 additions & 6 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
@@ -58,8 +58,10 @@ def render(self, alias: Optional[str] = None):
if alias:
column = f'{alias}.{self.field}'

-if self.data_type.lower() == 'date' and \
-        self.granularity.lower() == 'day':
+if self.data_type.lower() == 'int64' or (
+    self.data_type.lower() == 'date' and
+    self.granularity.lower() == 'day'
+):
return column
else:
return f'{self.data_type}_trunc({column}, {self.granularity})'
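For reference, the updated rule in a minimal standalone sketch (the function name and signature are illustrative, not dbt's actual PartitionConfig API): int64 columns and day-grained date columns are passed through unchanged, while other data types and granularities are wrapped in the corresponding <data_type>_trunc call. Under the old condition, an int64 partition column fell into the else branch and rendered as int64_trunc(...), which is not a BigQuery function.

# Minimal sketch of the rendering rule above; names are illustrative only.
def render_partition(field, data_type, granularity='day', alias=None):
    column = f'{alias}.{field}' if alias else field
    if data_type.lower() == 'int64' or (
        data_type.lower() == 'date' and granularity.lower() == 'day'
    ):
        return column
    return f'{data_type}_trunc({column}, {granularity})'

# render_partition('id', 'int64')                    -> 'id'
# render_partition('date_day', 'date')               -> 'date_day'
# render_partition('date_time', 'timestamp')         -> 'timestamp_trunc(date_time, day)'
# render_partition('date_hour', 'datetime', 'hour')  -> 'datetime_trunc(date_hour, hour)'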
@@ -551,10 +553,10 @@ def _partitions_match(
if not is_partitioned and not conf_partition:
return True
elif conf_partition and table.time_partitioning is not None:
-table_field = table.time_partitioning.field
-table_granularity = table.partitioning_type
-return table_field == conf_partition.field \
-    and table_granularity == conf_partition.granularity
+table_field = table.time_partitioning.field.lower()
+table_granularity = table.partitioning_type.lower()
Review thread on this line:

Contributor:
do you think table.partitioning_type could ever be None here? I'd think table.time_partitioning is not None also means table.partitioning_type has a value, just being nit-picky

kwigley (Contributor), Feb 12, 2021:
I think this is fine, you can ignore! I dug around the docs a little and I think "table.time_partitioning is not None also means table.partitioning_type has a value" holds true.

Contributor (PR author):
ah, good catch though! if one of our intrepid dbt-bigquery beta testers manages to get an error like "NoneType has no attribute lower", then we'll know who to blame (me) and where to fix it (here)

+return table_field == conf_partition.field.lower() \
+    and table_granularity == conf_partition.granularity.lower()
elif conf_partition and table.range_partitioning is not None:
dest_part = table.range_partitioning
conf_part = conf_partition.range or {}
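Why the added .lower() calls in _partitions_match matter: the BigQuery client reports the time-partitioning granularity in upper case (for example "DAY"), while dbt partition configs are usually written in lower case ("day"), so the old case-sensitive comparison could report a spurious mismatch for an identically partitioned table. A minimal sketch with hypothetical stand-in classes (not the real google-cloud-bigquery objects):

# Hypothetical stand-ins used only to illustrate the case-insensitive comparison above.
from dataclasses import dataclass

@dataclass
class FakeTimePartitioning:
    field: str                  # e.g. "date_time"

@dataclass
class FakeTable:
    time_partitioning: FakeTimePartitioning
    partitioning_type: str      # BigQuery reports e.g. "DAY"

@dataclass
class FakePartitionConfig:
    field: str
    granularity: str            # dbt configs usually use e.g. "day"

def partitions_match(table, conf):
    # mirrors the comparison in _partitions_match after this change
    return (table.time_partitioning.field.lower() == conf.field.lower()
            and table.partitioning_type.lower() == conf.granularity.lower())

tbl = FakeTable(FakeTimePartitioning("date_time"), "DAY")
cfg = FakePartitionConfig(field="date_time", granularity="day")
print(partitions_match(tbl, cfg))  # True; a case-sensitive "DAY" vs "day" comparison would be False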
@@ -16,9 +16,6 @@


{% macro bq_insert_overwrite(tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns) %}
-{%- set partition_type =
-    'date' if partition_by.data_type in ('timestamp, datetime')
-    else partition_by.data_type -%}

{% if partitions is not none and partitions != [] %} {# static #}

@@ -49,7 +46,7 @@
{%- endset -%}

-- generated script to merge partitions into {{ target_relation }}
-declare dbt_partitions_for_replacement array<{{ partition_type }}>;
+declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;
declare _dbt_max_partition {{ partition_by.data_type }};

set _dbt_max_partition = (
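The effect of dropping the partition_type mapping above: the scripting variable that collects partitions to replace is now declared with the partition column's own data type, instead of coercing timestamp/datetime columns to date. A rough illustration of the declaration the generated script starts with, written as a plain Python f-string rather than Jinja (the helper function is not part of dbt):

# Illustrative only: the SQL declaration emitted for a given partition_by.data_type.
def declare_replacement_partitions(data_type: str) -> str:
    return f"declare dbt_partitions_for_replacement array<{data_type}>;"

print(declare_replacement_partitions("timestamp"))
# declare dbt_partitions_for_replacement array<timestamp>;
# (previously a timestamp column would have been declared as array<date>)
print(declare_replacement_partitions("int64"))
# declare dbt_partitions_for_replacement array<int64>;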
@@ -0,0 +1,5 @@
id,date_time
10,2020-01-01 00:00:00
20,2020-01-01 00:00:00
30,2020-01-02 00:00:00
40,2020-01-02 00:00:00
@@ -0,0 +1,5 @@
id,date_int
10,20200101
20,20200101
30,20200102
40,20200102
@@ -0,0 +1,5 @@
id,date_hour
10,2020-01-01 01:00:00
20,2020-01-01 01:00:00
30,2020-01-01 02:00:00
40,2020-01-01 02:00:00
7 changes: 7 additions & 0 deletions test/integration/022_bigquery_test/data/merge_expected.csv
@@ -0,0 +1,7 @@
id,date_time
1,2020-01-01 00:00:00
2,2020-01-01 00:00:00
3,2020-01-01 00:00:00
4,2020-01-02 00:00:00
5,2020-01-02 00:00:00
6,2020-01-02 00:00:00
@@ -0,0 +1,46 @@

{{
config(
materialized="incremental",
unique_key="id",
cluster_by="id",
partition_by={
"field": "id",
"data_type": "int64",
"range": {
"start": 1,
"end": 10,
"interval": 1
}
}
)
}}


with data as (

{% if not is_incremental() %}

select 1 as id, cast('2020-01-01' as datetime) as date_time union all
select 2 as id, cast('2020-01-01' as datetime) as date_time union all
select 3 as id, cast('2020-01-01' as datetime) as date_time union all
select 4 as id, cast('2020-01-01' as datetime) as date_time

{% else %}

select 1 as id, cast('2020-01-01' as datetime) as date_time union all
select 2 as id, cast('2020-01-01' as datetime) as date_time union all
select 3 as id, cast('2020-01-01' as datetime) as date_time union all
select 4 as id, cast('2020-01-02' as datetime) as date_time union all
select 5 as id, cast('2020-01-02' as datetime) as date_time union all
select 6 as id, cast('2020-01-02' as datetime) as date_time

{% endif %}

)

select * from data

{% if is_incremental() %}
where id >= (select max(id) from {{ this }})
{% endif %}
@@ -0,0 +1,42 @@

{{
config(
materialized="incremental",
unique_key="id",
cluster_by="id",
partition_by={
"field": "date_time",
"data_type": "datetime"
}
)
}}



with data as (

{% if not is_incremental() %}

select 1 as id, cast('2020-01-01' as datetime) as date_time union all
select 2 as id, cast('2020-01-01' as datetime) as date_time union all
select 3 as id, cast('2020-01-01' as datetime) as date_time union all
select 4 as id, cast('2020-01-01' as datetime) as date_time

{% else %}

select 1 as id, cast('2020-01-01' as datetime) as date_time union all
select 2 as id, cast('2020-01-01' as datetime) as date_time union all
select 3 as id, cast('2020-01-01' as datetime) as date_time union all
select 4 as id, cast('2020-01-02' as datetime) as date_time union all
select 5 as id, cast('2020-01-02' as datetime) as date_time union all
select 6 as id, cast('2020-01-02' as datetime) as date_time

{% endif %}

)

select * from data

{% if is_incremental() %}
where date_time > (select max(date_time) from {{ this }})
{% endif %}
@@ -13,24 +13,29 @@


with data as (
-select 1 as id, cast('2020-01-01' as date) as date_day union all
-select 2 as id, cast('2020-01-01' as date) as date_day union all
-select 3 as id, cast('2020-01-01' as date) as date_day union all
-select 4 as id, cast('2020-01-01' as date) as date_day

+{% if not is_incremental() %}

+select 1 as id, cast('2020-01-01' as date) as date_day union all
+select 2 as id, cast('2020-01-01' as date) as date_day union all
+select 3 as id, cast('2020-01-01' as date) as date_day union all
+select 4 as id, cast('2020-01-01' as date) as date_day

-{% if is_incremental() %}
-union all
+{% else %}

-- we want to overwrite the 4 records in the 2020-01-01 partition
--- with the 2 records below, but add two more in the 2020-01-01 partition
+-- with the 2 records below, but add two more in the 2020-01-02 partition
select 10 as id, cast('2020-01-01' as date) as date_day union all
select 20 as id, cast('2020-01-01' as date) as date_day union all
select 30 as id, cast('2020-01-02' as date) as date_day union all
select 40 as id, cast('2020-01-02' as date) as date_day

{% endif %}

)

select * from data

{% if is_incremental() %}
-where ts >= _dbt_max_partition
+where date_day >= _dbt_max_partition
{% endif %}
@@ -0,0 +1,41 @@

{{
config(
materialized="incremental",
incremental_strategy='insert_overwrite',
cluster_by="id",
partition_by={
"field": "date_time",
"data_type": "datetime"
}
)
}}


with data as (

{% if not is_incremental() %}

select 1 as id, cast('2020-01-01' as datetime) as date_time union all
select 2 as id, cast('2020-01-01' as datetime) as date_time union all
select 3 as id, cast('2020-01-01' as datetime) as date_time union all
select 4 as id, cast('2020-01-01' as datetime) as date_time

{% else %}

-- we want to overwrite the 4 records in the 2020-01-01 partition
-- with the 2 records below, but add two more in the 2020-01-02 partition
select 10 as id, cast('2020-01-01' as datetime) as date_time union all
select 20 as id, cast('2020-01-01' as datetime) as date_time union all
select 30 as id, cast('2020-01-02' as datetime) as date_time union all
select 40 as id, cast('2020-01-02' as datetime) as date_time

{% endif %}

)

select * from data

{% if is_incremental() %}
where date_time >= _dbt_max_partition
{% endif %}
@@ -0,0 +1,42 @@

{{
config(
materialized="incremental",
incremental_strategy='insert_overwrite',
cluster_by="id",
partitions=["'2020-01-01'","'2020-01-02'"],
partition_by={
"field": "date_day",
"data_type": "date"
}
)
}}


with data as (

{% if not is_incremental() %}

select 1 as id, cast('2020-01-01' as date) as date_day union all
select 2 as id, cast('2020-01-01' as date) as date_day union all
select 3 as id, cast('2020-01-01' as date) as date_day union all
select 4 as id, cast('2020-01-01' as date) as date_day

{% else %}

-- we want to overwrite the 4 records in the 2020-01-01 partition
-- with the 2 records below, but add two more in the 2020-01-02 partition
select 10 as id, cast('2020-01-01' as date) as date_day union all
select 20 as id, cast('2020-01-01' as date) as date_day union all
select 30 as id, cast('2020-01-02' as date) as date_day union all
select 40 as id, cast('2020-01-02' as date) as date_day

{% endif %}

)

select * from data

{% if is_incremental() %}
where date_day in ({{ config.get("partitions") | join(",") }})
{% endif %}
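For the static case above, the configured partitions are already quoted date literals, so the Jinja join in the filter expands to a plain IN list. A quick Python illustration of the resulting string (variable names here are just for the example):

# Illustrative: how the Jinja "partitions | join(',')" filter above expands.
partitions = ["'2020-01-01'", "'2020-01-02'"]
print("where date_day in (" + ",".join(partitions) + ")")
# where date_day in ('2020-01-01','2020-01-02')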
@@ -0,0 +1,46 @@

{{
config(
materialized="incremental",
incremental_strategy='insert_overwrite',
cluster_by="id",
partition_by={
"field": "date_int",
"data_type": "int64",
"range": {
"start": 20200101,
"end": 20200110,
"interval": 1
}
}
)
}}


with data as (

{% if not is_incremental() %}

select 1 as id, 20200101 as date_int union all
select 2 as id, 20200101 as date_int union all
select 3 as id, 20200101 as date_int union all
select 4 as id, 20200101 as date_int

{% else %}

-- we want to overwrite the 4 records in the 20200101 partition
-- with the 2 records below, but add two more in the 20200102 partition
select 10 as id, 20200101 as date_int union all
select 20 as id, 20200101 as date_int union all
select 30 as id, 20200102 as date_int union all
select 40 as id, 20200102 as date_int

{% endif %}

)

select * from data

{% if is_incremental() %}
where date_int >= _dbt_max_partition
{% endif %}
@@ -0,0 +1,42 @@

{{
config(
materialized="incremental",
incremental_strategy='insert_overwrite',
cluster_by="id",
partition_by={
"field": "date_hour",
"data_type": "datetime",
"granularity": "hour"
}
)
}}


with data as (

{% if not is_incremental() %}

select 1 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
select 2 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
select 3 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
select 4 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour

{% else %}

-- we want to overwrite the 4 records in the 2020-01-01 01:00:00 partition
-- with the 2 records below, but add two more in the 2020-01-01 02:00:00 partition
select 10 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
select 20 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
select 30 as id, cast('2020-01-01 02:00:00' as datetime) as date_hour union all
select 40 as id, cast('2020-01-01 02:00:00' as datetime) as date_hour

{% endif %}

)

select * from data

{% if is_incremental() %}
where date_hour >= _dbt_max_partition
{% endif %}