Skip to content

Commit

Permalink
Merge pull request #2005 from fishtown-analytics/fix/bigquery-catalog…
Browse files Browse the repository at this point in the history
…-info-schema-queries

Fix bigquery catalog queries with nonexistent schemas (#1984)
  • Loading branch information
beckjake authored Dec 13, 2019
2 parents b151e2a + 8311285 commit 02c2926
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 11 deletions.
11 changes: 9 additions & 2 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@

from dbt.adapters.base.connections import BaseConnectionManager, Connection
from dbt.adapters.base.meta import AdapterMeta, available
from dbt.adapters.base.relation import ComponentName, BaseRelation
from dbt.adapters.base.relation import (
ComponentName, BaseRelation, InformationSchema
)
from dbt.adapters.base import Column as BaseColumn
from dbt.adapters.cache import RelationsCache

Expand Down Expand Up @@ -995,11 +997,16 @@ def _catalog_filter_table(
"""
return table.where(_catalog_filter_schemas(manifest))

def _get_catalog_information_schemas(
    self, manifest: Manifest
) -> List[InformationSchema]:
    """Return the information schemas the catalog query should cover.

    The result is the keys of the mapping returned by
    ``self._get_cache_schemas(manifest)``, materialized as a list so
    that callers (and macros) can index into it. Subclasses may override
    this hook to narrow the set of information schemas.
    """
    cache_schemas = self._get_cache_schemas(manifest)
    information_schemas = list(cache_schemas.keys())
    return information_schemas

def get_catalog(self, manifest: Manifest) -> agate.Table:
"""Get the catalog for this manifest by running the get catalog macro.
Returns an agate.Table of catalog information.
"""
information_schemas = list(self._get_cache_schemas(manifest).keys())
information_schemas = self._get_catalog_information_schemas(manifest)
# make it a list so macros can index into it.
kwargs = {'information_schemas': information_schemas}
table = self.execute_macro(GET_CATALOG_MACRO_NAME,
Expand Down
29 changes: 26 additions & 3 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from dbt.adapters.bigquery import BigQueryColumn
from dbt.adapters.bigquery import BigQueryConnectionManager
from dbt.contracts.connection import Connection
from dbt.contracts.graph.manifest import Manifest
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.utils import filter_null_values

Expand Down Expand Up @@ -482,10 +483,32 @@ def load_dataframe(self, database, schema, table_name, agate_table,
with self.connections.exception_handler("LOAD TABLE"):
self.poll_until_job_completes(job, timeout)

def _catalog_filter_table(self, table, manifest):
# BigQuery doesn't allow ":" chars in column names -- remap them here.
@classmethod
def _catalog_filter_table(
    cls, table: agate.Table, manifest: Manifest
) -> agate.Table:
    """Rename catalog columns, then apply the parent class's filter.

    Every ``'__'`` in a column name of ``table`` is replaced with ``':'``
    before the renamed table is handed to the parent implementation of
    ``_catalog_filter_table``, whose result is returned.
    """
    # Map each current column name to its ':'-separated form.
    restored_names = {}
    for column in table.columns:
        restored_names[column.name] = column.name.replace('__', ':')

    renamed_table = table.rename(column_names=restored_names)
    return super()._catalog_filter_table(renamed_table, manifest)

def _get_catalog_information_schemas(
    self, manifest: Manifest
) -> List[BigQueryInformationSchema]:
    """Return the catalog information schemas whose schema exists.

    Starts from the candidates produced by the parent implementation and
    keeps each one whose ``schema`` is among the names returned by
    ``self.list_schemas(database)`` for that candidate's database.
    Candidates that fail the check are skipped with a debug log message.
    """
    candidates = super()._get_catalog_information_schemas(manifest)

    # Schema names per database; each database is listed at most once.
    known_schemas = {}
    existing = []
    for info_schema in candidates:
        db_name = info_schema.database
        if db_name not in known_schemas:
            known_schemas[db_name] = set(self.list_schemas(db_name))

        if info_schema.schema not in known_schemas[db_name]:
            logger.debug(
                'Skipping catalog for {}.{} - schema does not exist'
                .format(db_name, info_schema.schema)
            )
            continue

        existing.append(info_schema)

    return existing
20 changes: 14 additions & 6 deletions plugins/bigquery/dbt/include/bigquery/macros/catalog.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@

{% macro bigquery__get_catalog(information_schemas) -%}

{%- call statement('catalog', fetch_result=True) -%}
{% for information_schema in information_schemas %}
{%- if (information_schemas | length) == 0 -%}
{# Hopefully nothing cares about the columns we return when there are no rows #}
{%- set query = "select 1 as id limit 0" -%}
{%- else -%}

{%- set query -%}
{%- for information_schema in information_schemas -%}
(
with schemas as (

Expand Down Expand Up @@ -214,8 +219,11 @@
)

{% if not loop.last %} union all {% endif %}
{% endfor %}
{%- endcall -%}
{{ return(load_result('catalog').table) }}
{%- endfor -%}
{%- endset -%}

{%- endif -%}

{{ return(run_query(query)) }}

{% endmacro %}
{%- endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Disabled model (disabled=true) configured with the schema 'notrealnotreal'.
-- NOTE(review): presumably that schema is never created, so this model stands
-- in for a manifest entry whose schema does not exist -- confirm.
{{ config(disabled=true, schema='notrealnotreal') }}
select 1 as id
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Single-row model whose target schema comes from the `extra_schema` var,
-- which must be supplied at invocation time (e.g. via --vars).
{{ config(schema=var('extra_schema')) }}
select 1 as id
30 changes: 30 additions & 0 deletions test/integration/029_docs_generate_tests/test_docs_generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -3039,3 +3039,33 @@ def test__presto__run_and_generate(self):
self.verify_run_results(self.expected_run_results(
model_database=self.default_database
))


class TestDocsGenerateMissingSchema(DBTIntegrationTest):
    """Run ``dbt docs generate`` on BigQuery for the no-schema models.

    The test invokes ``docs generate`` against the ``bq_models_noschema``
    model directory, passing a per-test ``extra_schema`` var. It only
    checks that the command completes; no catalog contents are verified.
    """

    @property
    def schema(self):
        """Base schema name used by this test case."""
        return 'docs_generate_029'

    @staticmethod
    def dir(path):
        """Return ``path`` passed through ``normalize``."""
        return normalize(path)

    @property
    def models(self):
        """Model directory used by this test case."""
        return self.dir("bq_models_noschema")

    def setUp(self):
        """Run the base setup, then derive the extra schema name."""
        super().setUp()
        self.extra_schema = self.unique_schema() + '_bq_test'

    def tearDown(self):
        """Drop the extra schema, then always run the base teardown."""
        # try/finally ensures the base-class cleanup still runs when
        # dropping the extra schema raises; the error still propagates.
        try:
            with self.adapter.connection_named('__test'):
                self._drop_schema_named(
                    self.default_database, self.extra_schema
                )
        finally:
            super().tearDown()

    @use_profile('bigquery')
    def test_bigquery_docs_generate_noschema(self):
        """``docs generate`` should run with the ``extra_schema`` var set."""
        self.run_dbt([
            'docs', 'generate',
            '--vars', "{{extra_schema: {}}}".format(self.extra_schema)
        ])

0 comments on commit 02c2926

Please sign in to comment.