
Commit

Support for incremental materialization with ingestion time partition… tables
Kayrnt committed Apr 12, 2022
1 parent 3d69869 commit 88d550b
Showing 6 changed files with 263 additions and 35 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,10 @@
- Use dbt.tests.adapter.basic in tests (new test framework) ([#135](https://github.com/dbt-labs/dbt-bigquery/issues/135), [#142](https://github.com/dbt-labs/dbt-bigquery/pull/142))
- Adding pre-commit and black formatter hooks ([#147](https://github.com/dbt-labs/dbt-bigquery/pull/147))
- Adding pre-commit code changes ([#148](https://github.com/dbt-labs/dbt-bigquery/pull/148))
- Add support for ingestion time partitioned table using incremental materialization ([#136](https://github.com/dbt-labs/dbt-bigquery/pull/136))

### Contributors
- [@Kayrnt](https://github.com/Kayrnt) ([#136](https://github.com/dbt-labs/dbt-bigquery/pull/136))

## dbt-bigquery 1.1.0b1 (March 23, 2022)
### Features
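For orientation, the flag lands in the model-level `partition_by` config that dbt-bigquery parses into `PartitionConfig` (see impl.py below). A minimal sketch of such a config, assuming an illustrative `created_at` column:

```python
# Illustrative partition_by config for an incremental model; "created_at" is a
# made-up column name. The time_ingestion_partitioning flag is what this commit
# adds; the other keys already existed.
partition_by = {
    "field": "created_at",
    "data_type": "timestamp",
    "granularity": "day",
    "time_ingestion_partitioning": True,
}
```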
24 changes: 19 additions & 5 deletions dbt/adapters/bigquery/connections.py
@@ -1,11 +1,11 @@
import json
import re
from contextlib import contextmanager
from dataclasses import dataclass
from dataclasses import dataclass, field
from functools import lru_cache
import agate
from requests.exceptions import ConnectionError
from typing import Optional, Any, Dict, Tuple
from typing import Optional, Any, Dict, Tuple, List

import google.auth
import google.auth.exceptions
@@ -86,6 +86,7 @@ class BigQueryConnectionMethod(StrEnum):
@dataclass
class BigQueryAdapterResponse(AdapterResponse):
bytes_processed: Optional[int] = None
fields: List[Any] = field(default_factory=list)


@dataclass
@@ -434,6 +435,7 @@ def execute(
code = None
num_rows = None
bytes_processed = None
fields = list()

if query_job.statement_type == "CREATE_VIEW":
code = "CREATE VIEW"
@@ -448,6 +450,7 @@
bytes_processed = query_job.total_bytes_processed
processed_bytes = self.format_bytes(bytes_processed)
message = f"{code} ({num_rows_formated} rows, {processed_bytes} processed)"
fields = query_table.schema

elif query_job.statement_type == "SCRIPT":
code = "SCRIPT"
@@ -473,9 +476,14 @@
bytes_processed = query_job.total_bytes_processed
processed_bytes = self.format_bytes(bytes_processed)
message = f"{code} ({num_rows_formated} rows, {processed_bytes} processed)"
fields = query_table.schema

response = BigQueryAdapterResponse( # type: ignore[call-arg]
_message=message, rows_affected=num_rows, code=code, bytes_processed=bytes_processed
_message=message,
rows_affected=num_rows,
code=code,
bytes_processed=bytes_processed,
fields=fields,
)

return response, table
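A small sketch of what the enriched response exposes to callers, assuming `SchemaField` entries like those returned by `query_table.schema`; the column names and message text are illustrative:

```python
from google.cloud.bigquery import SchemaField
from dbt.adapters.bigquery.connections import BigQueryAdapterResponse

# The response now carries the result schema alongside the usual stats, so
# downstream code can inspect column metadata without another API call.
response = BigQueryAdapterResponse(  # type: ignore[call-arg]
    _message="CREATE TABLE (10 rows, 1.2 KB processed)",
    rows_affected=10,
    code="CREATE TABLE",
    bytes_processed=1234,
    fields=[SchemaField("id", "INT64"), SchemaField("created_at", "TIMESTAMP")],
)
print([f.name for f in response.fields])  # ['id', 'created_at']
```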
@@ -529,7 +537,8 @@ def copy_and_results():

self._retry_and_handle(
msg='copy table "{}" to "{}"'.format(
", ".join(source_ref.path for source_ref in source_ref_array), destination_ref.path
", ".join(source_ref.path for source_ref in source_ref_array),
destination_ref.path,
),
conn=conn,
fn=copy_and_results,
@@ -571,7 +580,12 @@ def fn():
self._retry_and_handle(msg="create dataset", conn=conn, fn=fn)

def _query_and_results(
self, client, sql, job_params, job_creation_timeout=None, job_execution_timeout=None
self,
client,
sql,
job_params,
job_creation_timeout=None,
job_execution_timeout=None,
):
"""Query the client and wait for results."""
# Cannot reuse job_config if destination is set and ddl is used
56 changes: 46 additions & 10 deletions dbt/adapters/bigquery/impl.py
@@ -7,7 +7,13 @@
import dbt.clients.agate_helper

from dbt import ui # type: ignore
from dbt.adapters.base import BaseAdapter, available, RelationType, SchemaSearchMap, AdapterConfig
from dbt.adapters.base import (
BaseAdapter,
available,
RelationType,
SchemaSearchMap,
AdapterConfig,
)
from dbt.adapters.bigquery.relation import BigQueryRelation
from dbt.adapters.bigquery import BigQueryColumn
from dbt.adapters.bigquery import BigQueryConnectionManager
@@ -47,11 +53,15 @@ class PartitionConfig(dbtClassMixin):
data_type: str = "date"
granularity: str = "day"
range: Optional[Dict[str, Any]] = None
time_ingestion_partitioning: bool = False

def reject_partition_field_column(self, columns: List[Any]) -> List[str]:
return [c for c in columns if not c.name.upper() == self.field.upper()]

def render(self, alias: Optional[str] = None):
column: str = self.field
column: str = self.field if not self.time_ingestion_partitioning else "_PARTITIONTIME"
if alias:
column = f"{alias}.{self.field}"
column = f"{alias}.{column}"

if self.data_type.lower() == "int64" or (
self.data_type.lower() == "date" and self.granularity.lower() == "day"
@@ -89,7 +99,11 @@ def render(self):

def _stub_relation(*args, **kwargs):
return BigQueryRelation.create(
database="", schema="", identifier="", quote_policy={}, type=BigQueryRelation.Table
database="",
schema="",
identifier="",
quote_policy={},
type=BigQueryRelation.Table,
)


@@ -209,14 +223,22 @@ def check_schema_exists(self, database: str, schema: str) -> bool:
def get_columns_in_relation(self, relation: BigQueryRelation) -> List[BigQueryColumn]:
try:
table = self.connections.get_bq_table(
database=relation.database, schema=relation.schema, identifier=relation.identifier
database=relation.database,
schema=relation.schema,
identifier=relation.identifier,
)
return self._get_dbt_columns_from_bq_table(table)

except (ValueError, google.cloud.exceptions.NotFound) as e:
logger.debug("get_columns_in_relation error: {}".format(e))
return []

@available.parse(lambda *a, **k: [])
def add_time_ingestion_partition_column(self, columns) -> List[BigQueryColumn]:
"Add time ingestion partition column to columns list"
columns.append(self.Column("_PARTITIONTIME", "TIMESTAMP", None, "NULLABLE"))
return columns

def expand_column_types(self, goal: BigQueryRelation, current: BigQueryRelation) -> None: # type: ignore[override]
# This is a no-op on BigQuery
pass
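A rough stand-in for what `add_time_ingestion_partition_column` does to a column list; the starting column is illustrative:

```python
from dbt.adapters.bigquery import BigQueryColumn

# Mirror of the helper's behavior: append the ingestion-time pseudo-column so
# the table built for an incremental run carries a slot for it.
columns = [BigQueryColumn("id", "INT64", None, "NULLABLE")]
columns.append(BigQueryColumn("_PARTITIONTIME", "TIMESTAMP", None, "NULLABLE"))
print([c.name for c in columns])  # ['id', '_PARTITIONTIME']
```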
@@ -358,7 +380,10 @@ def _materialize_as_view(self, model: Dict[str, Any]) -> str:

logger.debug("Model SQL ({}):\n{}".format(model_alias, model_sql))
self.connections.create_view(
database=model_database, schema=model_schema, table_name=model_alias, sql=model_sql
database=model_database,
schema=model_schema,
table_name=model_alias,
sql=model_sql,
)
return "CREATE VIEW"

@@ -379,7 +404,10 @@ def _materialize_as_table(

logger.debug("Model SQL ({}):\n{}".format(table_name, model_sql))
self.connections.create_table(
database=model_database, schema=model_schema, table_name=table_name, sql=model_sql
database=model_database,
schema=model_schema,
table_name=table_name,
sql=model_sql,
)

return "CREATE TABLE"
@@ -462,7 +490,8 @@ def _partitions_match(self, table, conf_partition: Optional[PartitionConfig]) ->
if not is_partitioned and not conf_partition:
return True
elif conf_partition and table.time_partitioning is not None:
table_field = table.time_partitioning.field.lower()
partioning_field = table.time_partitioning.field or "_PARTITIONTIME"
table_field = partioning_field.lower()
table_granularity = table.partitioning_type.lower()
return (
table_field == conf_partition.field.lower()
@@ -508,7 +537,9 @@ def is_replaceable(

try:
table = self.connections.get_bq_table(
database=relation.database, schema=relation.schema, identifier=relation.identifier
database=relation.database,
schema=relation.schema,
identifier=relation.identifier,
)
except google.cloud.exceptions.NotFound:
return True
@@ -630,7 +661,12 @@ def load_dataframe(self, database, schema, table_name, agate_table, column_overr

@available.parse_none
def upload_file(
self, local_file_path: str, database: str, table_schema: str, table_name: str, **kwargs
self,
local_file_path: str,
database: str,
table_schema: str,
table_name: str,
**kwargs,
) -> None:
conn = self.connections.get_thread_connection()
client = conn.handle