Skip to content

Commit

Permalink
handle funky case bigquery models, add tests
Browse files (browse the repository at this point in the history)
  • Loading branch information
Jacob Beck committed Nov 4, 2019
1 parent ca49790 commit 1bbc00c
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 36 deletions.
87 changes: 62 additions & 25 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Dict, List, Optional, Any

import dbt.deprecations
import dbt.exceptions
import dbt.flags as flags
Expand All @@ -6,12 +8,13 @@

from dbt.adapters.base import BaseAdapter, available, RelationType
from dbt.adapters.bigquery.relation import (
BigQueryRelation
BigQueryRelation, BigQueryInformationSchema
)
from dbt.adapters.bigquery import BigQueryColumn
from dbt.adapters.bigquery import BigQueryConnectionManager
from dbt.contracts.connection import Connection
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.utils import filter_null_values

import google.auth
import google.api_core
Expand Down Expand Up @@ -52,14 +55,14 @@ class BigQueryAdapter(BaseAdapter):
###

@classmethod
def date_function(cls) -> str:
    """Return the SQL snippet ``CURRENT_TIMESTAMP()``.

    :return: the literal string ``'CURRENT_TIMESTAMP()'``.
    """
    return 'CURRENT_TIMESTAMP()'

@classmethod
def is_cancelable(cls) -> bool:
    """Report whether this adapter is cancelable.

    :return: always ``False``.
    """
    return False

def drop_relation(self, relation):
def drop_relation(self, relation: BigQueryRelation) -> None:
is_cached = self._schema_is_cached(relation.database, relation.schema)
if is_cached:
self.cache_dropped(relation)
Expand All @@ -72,18 +75,20 @@ def drop_relation(self, relation):
relation_object = dataset.table(relation.identifier)
client.delete_table(relation_object)

def truncate_relation(self, relation: BigQueryRelation) -> None:
    """Reject the request: truncation is not implemented by this adapter.

    :param relation: the relation the caller asked to truncate; unused.
    :raises dbt.exceptions.NotImplementedException: always.
    """
    raise dbt.exceptions.NotImplementedException(
        '`truncate` is not implemented for this adapter!'
    )

def rename_relation(
    self, from_relation: BigQueryRelation, to_relation: BigQueryRelation
) -> None:
    """Reject the request: renaming is not implemented by this adapter.

    :param from_relation: the relation to rename; unused.
    :param to_relation: the requested new relation; unused.
    :raises dbt.exceptions.NotImplementedException: always.
    """
    raise dbt.exceptions.NotImplementedException(
        '`rename_relation` is not implemented for this adapter!'
    )

@available
def list_schemas(self, database):
def list_schemas(self, database: str) -> List[str]:
conn = self.connections.get_thread_connection()
client = conn.handle

Expand Down Expand Up @@ -114,7 +119,9 @@ def check_schema_exists(self, database: str, schema: str) -> bool:
return False
return True

def get_columns_in_relation(self, relation):
def get_columns_in_relation(
self, relation: BigQueryRelation
) -> List[BigQueryColumn]:
try:
table = self.connections.get_bq_table(
database=relation.database,
Expand All @@ -127,15 +134,21 @@ def get_columns_in_relation(self, relation):
logger.debug("get_columns_in_relation error: {}".format(e))
return []

def expand_column_types(
    self, goal: BigQueryRelation, current: BigQueryRelation
) -> None:
    """Do nothing; column type expansion is a no-op on BigQuery.

    :param goal: unused.
    :param current: unused.
    """
    # This is a no-op on BigQuery
    pass

def expand_target_column_types(
    self, from_relation: BigQueryRelation, to_relation: BigQueryRelation
) -> None:
    """Do nothing; target column type expansion is a no-op on BigQuery.

    :param from_relation: unused.
    :param to_relation: unused.
    """
    # This is a no-op on BigQuery
    pass

def list_relations_without_caching(self, information_schema, schema):
def list_relations_without_caching(
self, information_schema: BigQueryInformationSchema, schema: str
) -> List[BigQueryRelation]:
connection = self.connections.get_thread_connection()
client = connection.handle

Expand All @@ -162,7 +175,9 @@ def list_relations_without_caching(self, information_schema, schema):
except google.api_core.exceptions.NotFound:
return []

def get_relation(self, database, schema, identifier):
def get_relation(
self, database: str, schema: str, identifier: str
) -> BigQueryRelation:
if self._schema_is_cached(database, schema):
# if it's in the cache, use the parent's model of going through
# the relations cache and picking out the relation
Expand All @@ -178,47 +193,62 @@ def get_relation(self, database, schema, identifier):
table = None
return self._bq_table_to_relation(table)

def create_schema(self, database: str, schema: str) -> None:
    """Log the request, then call ``self.connections.create_dataset``.

    :param database: first component of the logged ``database.schema``
        name; forwarded unchanged.
    :param schema: second component of the logged name; forwarded
        unchanged.
    """
    # NOTE(review): brace-style placeholders with positional arguments
    # assume the project logger interpolates with str.format -- confirm.
    logger.debug('Creating schema "{}.{}".', database, schema)
    self.connections.create_dataset(database, schema)

def drop_schema(self, database: str, schema: str) -> None:
    """Log the request, then call ``self.connections.drop_dataset``.

    :param database: first component of the logged ``database.schema``
        name; forwarded unchanged.
    :param schema: second component of the logged name; forwarded
        unchanged.
    """
    # NOTE(review): brace-style placeholders with positional arguments
    # assume the project logger interpolates with str.format -- confirm.
    logger.debug('Dropping schema "{}.{}".', database, schema)
    self.connections.drop_dataset(database, schema)

@classmethod
def quote(cls, identifier: str) -> str:
    """Wrap ``identifier`` in backticks.

    The identifier is inserted verbatim; nothing inside it is escaped.

    :param identifier: the name to quote.
    :return: ``identifier`` with one backtick on each side.
    """
    return '`{}`'.format(identifier)

@classmethod
def convert_text_type(cls, agate_table: agate.Table, col_idx: int) -> str:
    """Return the type name used for a text column.

    :param agate_table: unused.
    :param col_idx: unused.
    :return: always ``"string"``.
    """
    return "string"

@classmethod
def convert_number_type(
    cls, agate_table: agate.Table, col_idx: int
) -> str:
    """Choose the type name for a numeric column.

    :param agate_table: table whose column is aggregated.
    :param col_idx: column passed to ``agate.MaxPrecision``.
    :return: ``"float64"`` when the ``MaxPrecision`` aggregate of the
        column is truthy, otherwise ``"int64"``.
    """
    decimals = agate_table.aggregate(agate.MaxPrecision(col_idx))
    return "float64" if decimals else "int64"

@classmethod
def convert_boolean_type(
    cls, agate_table: agate.Table, col_idx: int
) -> str:
    """Return the type name used for a boolean column.

    :param agate_table: unused.
    :param col_idx: unused.
    :return: always ``"bool"``.
    """
    return "bool"

@classmethod
def convert_datetime_type(
    cls, agate_table: agate.Table, col_idx: int
) -> str:
    """Return the type name used for a datetime column.

    :param agate_table: unused.
    :param col_idx: unused.
    :return: always ``"datetime"``.
    """
    return "datetime"

@classmethod
def convert_date_type(cls, agate_table: agate.Table, col_idx: int) -> str:
    """Return the type name used for a date column.

    :param agate_table: unused.
    :param col_idx: unused.
    :return: always ``"date"``.
    """
    return "date"

@classmethod
def convert_time_type(cls, agate_table: agate.Table, col_idx: int) -> str:
    """Return the type name used for a time column.

    :param agate_table: unused.
    :param col_idx: unused.
    :return: always ``"time"``.
    """
    return "time"

###
# Implementation details
###
def _get_dbt_columns_from_bq_table(self, table):
def _make_match_kwargs(
    self, database: str, schema: str, identifier: str
) -> Dict[str, str]:
    """Build the keyword arguments used to match a relation by name.

    The three name parts are handed to ``filter_null_values`` exactly as
    received; this method applies no case changes to them.

    :param database: value stored under the ``'database'`` key.
    :param schema: value stored under the ``'schema'`` key.
    :param identifier: value stored under the ``'identifier'`` key.
    :return: whatever ``filter_null_values`` returns for that mapping
        (presumably the mapping minus ``None`` entries -- confirm).
    """
    return filter_null_values({
        'database': database,
        'identifier': identifier,
        'schema': schema,
    })

def _get_dbt_columns_from_bq_table(self, table) -> List[BigQueryColumn]:
"Translates BQ SchemaField dicts into dbt BigQueryColumn objects"

columns = []
Expand All @@ -231,7 +261,9 @@ def _get_dbt_columns_from_bq_table(self, table):

return columns

def _agate_to_schema(self, agate_table, column_override):
def _agate_to_schema(
self, agate_table: agate.Table, column_override: Dict[str, str]
) -> List[google.cloud.bigquery.SchemaField]:
"""Convert agate.Table with column names to a list of bigquery schemas.
"""
bq_schema = []
Expand All @@ -243,7 +275,7 @@ def _agate_to_schema(self, agate_table, column_override):
)
return bq_schema

def _materialize_as_view(self, model):
def _materialize_as_view(self, model: Dict[str, Any]) -> str:
model_database = model.get('database')
model_schema = model.get('schema')
model_alias = model.get('alias')
Expand All @@ -258,7 +290,12 @@ def _materialize_as_view(self, model):
)
return "CREATE VIEW"

def _materialize_as_table(self, model, model_sql, decorator=None):
def _materialize_as_table(
self,
model: Dict[str, Any],
model_sql: str,
decorator: Optional[str] = None,
) -> str:
model_database = model.get('database')
model_schema = model.get('schema')
model_alias = model.get('alias')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Incremental model whose is_incremental() branch is intentionally not SQL.
-- NOTE(review): presumably the fixture behind a "second run must fail"
-- integration test -- confirm which models directory this file lives in.
{{ config(materialized='incremental') }}
select 1 as id
{% if is_incremental() %}
this is a syntax error!
{% endif %}
1 change: 1 addition & 0 deletions test/integration/022_bigquery_test/models/fUnKyCaSe.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Single-row model with one column, id.
-- NOTE(review): the mixed-case file name (fUnKyCaSe) appears to be the
-- point of this fixture rather than the query itself -- confirm.
select 1 as id
10 changes: 10 additions & 0 deletions test/integration/022_bigquery_test/models/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,13 @@ models:
- was_materialized:
name: table_model
type: table
- name: fUnKyCaSe
columns:
- name: id
tests:
- not_null
- unique
tests:
- was_materialized:
name: fUnKyCaSe
type: view
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
from test.integration.base import DBTIntegrationTest, use_profile


class TestCaseSensitiveModelBigQueryRun(DBTIntegrationTest):
    """Run the ``case-sensitive-models`` project twice on BigQuery.

    The first run must yield exactly one result; the second run over the
    same project is expected to fail.
    """

    @property
    def schema(self):
        # Schema name used by this test case.
        return "bigquery_test_022"

    @property
    def models(self):
        # Models directory used by this test case.
        return "case-sensitive-models"

    @use_profile('bigquery')
    def test__bigquery_double_run_fails(self):
        # First run: exactly one result is expected.
        results = self.run_dbt()
        self.assertEqual(len(results), 1)
        # Second run: dbt is expected not to pass.
        self.run_dbt(expect_pass=False)


class TestCaseSensitiveSchemaBigQueryRun(TestCaseSensitiveModelBigQueryRun):
# same test, but the schema is funky instead of the model name
@property
def schema(self):
return "BigQuerY_test_022"
Expand All @@ -15,10 +31,4 @@ def unique_schema(self):

@property
def models(self):
return "case-sensitive-models"

@use_profile('bigquery')
def test__bigquery_double_run_fails(self):
results = self.run_dbt()
self.assertEqual(len(results), 1)
self.run_dbt(expect_pass=False)
return "case-sensitive-schemas"
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def test__bigquery_simple_run(self):
self.run_dbt(['seed'])
self.run_dbt(['seed', '--full-refresh'])
results = self.run_dbt()
self.assertEqual(len(results), 5)
self.assertEqual(len(results), 6)
self.assert_nondupes_pass()


Expand All @@ -60,7 +60,7 @@ class TestUnderscoreBigQueryRun(TestBaseBigQueryRun):
def test_bigquery_run_twice(self):
self.run_dbt(['seed'])
results = self.run_dbt()
self.assertEqual(len(results), 5)
self.assertEqual(len(results), 6)
results = self.run_dbt()
self.assertEqual(len(results), 5)
self.assertEqual(len(results), 6)
self.assert_nondupes_pass()

0 comments on commit 1bbc00c

Please sign in to comment.