Skip to content

Commit

Permalink
report bytes billed for scripts, add _dbt_max_partition field
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbanin committed Feb 15, 2020
1 parent 7106a7c commit 1ae17d5
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 6 deletions.
9 changes: 9 additions & 0 deletions core/dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,3 +480,12 @@ def __init__(self, func):

def __get__(self, obj, objtype):
return self.func(objtype)


def format_bytes(num_bytes):
for unit in ['Bytes', 'KB', 'MB', 'GB', 'TB']:
if abs(num_bytes) < 1024.0:
return f"{num_bytes:3.1f} {unit}"
num_bytes /= 1024.0

return "> 1024 TB"
6 changes: 6 additions & 0 deletions plugins/bigquery/dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import dbt.clients.agate_helper
import dbt.exceptions
import dbt.utils
from dbt.adapters.base import BaseConnectionManager, Credentials
from dbt.logger import GLOBAL_LOGGER as logger

Expand Down Expand Up @@ -230,13 +231,18 @@ def execute(self, sql, auto_begin=False, fetch=None):
table = client.get_table(query_job.destination)
status = 'CREATE TABLE ({})'.format(table.num_rows)

elif query_job.statement_type == 'SCRIPT':
billed = query_job.total_bytes_billed
status = 'SCRIPT ({} billed)'.format(dbt.utils.format_bytes(billed))

elif query_job.statement_type in ['INSERT', 'DELETE', 'MERGE']:
status = '{} ({})'.format(
query_job.statement_type,
query_job.num_dml_affected_rows
)

else:
import ipdb; ipdb.set_trace()
status = 'OK'

return status, res
Expand Down
2 changes: 1 addition & 1 deletion plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
{{ relation.include(database=(not is_scripting), schema=(not is_scripting)) }}
{{ partition_by(partition_by_dict) }}
{{ cluster_by(raw_cluster_by) }}
{%- if not temporary -%}
{%- if not temporary %}
{{ bigquery_table_options(
persist_docs=raw_persist_docs,
kms_key_name=raw_kms_key_name,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,35 @@

{% macro bq_partition_merge(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns) %}
{%- set array_datatype =
{%- set partition_type =
'date' if partition_by.data_type in ('timestamp, datetime')
else partition_by.data_type -%}

{% set predicate -%}
{{ pprint_partition_field(
partition_by,
alias = 'DBT_INTERNAL_DEST')
}} in unnest(partitions_for_upsert)
}} in unnest(dbt_partitions_for_upsert)
{%- endset %}

{%- set source_sql -%}
(
select * from {{tmp_relation.identifier}}
select * from {{tmp_relation}}
)
{%- endset -%}

-- generated script to merge partitions into {{ target_relation }}
declare partitions_for_upsert array<{{array_datatype}}>;
declare dbt_partitions_for_upsert array<{{ partition_type }}>;
declare _dbt_max_partition {{ partition_by.data_type }};

set _dbt_max_partition = (
select max({{ partition_by.field }}) from {{ this }}
);

-- 1. create a temp table
{{ create_table_as(True, tmp_relation, sql) }}

-- 2. define partitions to update
set (partitions_for_upsert) = (
set (dbt_partitions_for_upsert) = (
select as struct
array_agg(distinct {{pprint_partition_field(partition_by)}})
from {{tmp_relation.identifier}}
Expand Down

0 comments on commit 1ae17d5

Please sign in to comment.