Fix bigquery catalog queries with nonexistent schemas (#1984) #2005

Merged · 1 commit · Dec 13, 2019
11 changes: 9 additions & 2 deletions core/dbt/adapters/base/impl.py
@@ -27,7 +27,9 @@

from dbt.adapters.base.connections import BaseConnectionManager, Connection
from dbt.adapters.base.meta import AdapterMeta, available
from dbt.adapters.base.relation import ComponentName, BaseRelation
from dbt.adapters.base.relation import (
ComponentName, BaseRelation, InformationSchema
)
from dbt.adapters.base import Column as BaseColumn
from dbt.adapters.cache import RelationsCache

@@ -995,11 +997,16 @@ def _catalog_filter_table(
"""
return table.where(_catalog_filter_schemas(manifest))

def _get_catalog_information_schemas(
self, manifest: Manifest
) -> List[InformationSchema]:
return list(self._get_cache_schemas(manifest).keys())

def get_catalog(self, manifest: Manifest) -> agate.Table:
"""Get the catalog for this manifest by running the get catalog macro.
Returns an agate.Table of catalog information.
"""
information_schemas = list(self._get_cache_schemas(manifest).keys())
information_schemas = self._get_catalog_information_schemas(manifest)
# make it a list so macros can index into it.
kwargs = {'information_schemas': information_schemas}
table = self.execute_macro(GET_CATALOG_MACRO_NAME,
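
The base-adapter change is small but deliberate: get_catalog previously built the information-schema list inline, so plugins had no seam to filter it before the catalog macro ran. Extracting _get_catalog_information_schemas provides that seam. A rough illustration of how an adapter can use the hook (hypothetical ExampleAdapter, not code from this PR; the real BigQuery override, which also caches list_schemas results per database, appears in the next file):

from typing import List

from dbt.adapters.sql import SQLAdapter
from dbt.adapters.base.relation import InformationSchema
from dbt.contracts.graph.manifest import Manifest


class ExampleAdapter(SQLAdapter):
    """Hypothetical adapter, not part of this PR; illustrates the new hook."""

    def _get_catalog_information_schemas(
        self, manifest: Manifest
    ) -> List[InformationSchema]:
        candidates = super()._get_catalog_information_schemas(manifest)
        # Keep only the information schemas whose schema actually exists, so
        # the catalog query never references a schema that isn't there.
        return [
            c for c in candidates
            if c.schema in set(self.list_schemas(c.database))
        ]
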
29 changes: 26 additions & 3 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
@@ -13,6 +13,7 @@
from dbt.adapters.bigquery import BigQueryColumn
from dbt.adapters.bigquery import BigQueryConnectionManager
from dbt.contracts.connection import Connection
from dbt.contracts.graph.manifest import Manifest
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.utils import filter_null_values

@@ -479,10 +480,32 @@ def load_dataframe(self, database, schema, table_name, agate_table,
with self.connections.exception_handler("LOAD TABLE"):
self.poll_until_job_completes(job, timeout)

def _catalog_filter_table(self, table, manifest):
# BigQuery doesn't allow ":" chars in column names -- remap them here.
@classmethod
def _catalog_filter_table(
cls, table: agate.Table, manifest: Manifest
) -> agate.Table:
table = table.rename(column_names={
col.name: col.name.replace('__', ':') for col in table.columns
})

return super()._catalog_filter_table(table, manifest)

def _get_catalog_information_schemas(
self, manifest: Manifest
) -> List[BigQueryInformationSchema]:

candidates = super()._get_catalog_information_schemas(manifest)
information_schemas = []
db_schemas = {}
for candidate in candidates:
database = candidate.database
if database not in db_schemas:
db_schemas[database] = set(self.list_schemas(database))
if candidate.schema in db_schemas[database]:
information_schemas.append(candidate)
else:
logger.debug(
'Skipping catalog for {}.{} - schema does not exist'
.format(database, candidate.schema)
)

return information_schemas
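
For context, the _catalog_filter_table override above is only adjacent to the missing-schema fix: BigQuery doesn't allow ':' characters in column names, so the catalog SQL aliases such columns with '__' and the adapter renames them back before the base-class filtering runs. A minimal sketch of that rename using agate alone (the column name below is made up for illustration, not taken from the catalog query):

import agate

# Hypothetical column alias as the BigQuery catalog SQL might emit it.
table = agate.Table(
    rows=[(42,)],
    column_names=['stats__num_rows'],
    column_types=[agate.Number()],
)
# Same rename the override performs: map '__' back to ':'.
table = table.rename(column_names={
    col.name: col.name.replace('__', ':') for col in table.columns
})
print(table.column_names)  # ('stats:num_rows',)
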
20 changes: 14 additions & 6 deletions plugins/bigquery/dbt/include/bigquery/macros/catalog.sql
@@ -1,8 +1,13 @@

{% macro bigquery__get_catalog(information_schemas) -%}

{%- call statement('catalog', fetch_result=True) -%}
{% for information_schema in information_schemas %}
{%- if (information_schemas | length) == 0 -%}
{# Hopefully nothing cares about the columns we return when there are no rows #}
{%- set query = "select 1 as id limit 0" -%}
Contributor: Can we return(none) here and make the get_catalog method error with a sensible message?

Contributor Author: Should it really error instead of the catalog just being empty? As it is this works, it just generates an empty catalog.

Contributor: oh, ok, that's great. Ship this as-is :D

{%- else -%}

{%- set query -%}
{%- for information_schema in information_schemas -%}
(
with schemas as (

@@ -214,8 +219,11 @@
)

{% if not loop.last %} union all {% endif %}
{% endfor %}
{%- endcall -%}
{{ return(load_result('catalog').table) }}
{%- endfor -%}
{%- endset -%}

{%- endif -%}

{{ return(run_query(query)) }}

{% endmacro %}
{%- endmacro %}
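
The macro change pairs with the Python changes above: the per-schema SELECTs are now assembled into a single query string, and when the adapter has filtered every schema out, the macro falls back to "select 1 as id limit 0" rather than emitting invalid SQL. As the review thread notes, that fallback simply yields a zero-row result, so docs generation produces an empty catalog instead of failing. A small agate-only sketch of what that zero-row result looks like downstream (illustrative, not dbt code):

import agate

# Roughly what the fallback query returns: one arbitrary column, zero rows.
empty = agate.Table(rows=[], column_names=['id'], column_types=[agate.Number()])
print(len(empty.rows))  # 0 -- an empty catalog is written rather than an error raised
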
@@ -0,0 +1,2 @@
{{ config(disabled=true, schema='notrealnotreal') }}
select 1 as id
@@ -0,0 +1,2 @@
{{ config(schema=var('extra_schema')) }}
select 1 as id
30 changes: 30 additions & 0 deletions test/integration/029_docs_generate_tests/test_docs_generate.py
@@ -3005,3 +3005,33 @@ def test__presto__run_and_generate(self):
self.verify_run_results(self.expected_run_results(
model_database=self.default_database
))


class TestDocsGenerateMissingSchema(DBTIntegrationTest):
@property
def schema(self):
return 'docs_generate_029'

@staticmethod
def dir(path):
return normalize(path)

@property
def models(self):
return self.dir("bq_models_noschema")

def setUp(self):
super().setUp()
self.extra_schema = self.unique_schema() + '_bq_test'

def tearDown(self):
with self.adapter.connection_named('__test'):
self._drop_schema_named(self.default_database, self.extra_schema)
super().tearDown()

@use_profile('bigquery')
def test_bigquery_docs_generate_noschema(self):
self.run_dbt([
'docs', 'generate',
'--vars', "{{extra_schema: {}}}".format(self.extra_schema)
])
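
The doubled braces in the --vars argument are just Python format-string escaping; the test passes a small YAML mapping to dbt. A quick check of what that string renders to (the schema name here is hypothetical):

# What "{{extra_schema: {}}}".format(...) evaluates to for an example schema name:
print("{{extra_schema: {}}}".format('docs_generate_029_bq_test'))
# -> {extra_schema: docs_generate_029_bq_test}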