From 8311285adb010db5d20d488f29934a8d105b56a2 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Thu, 12 Dec 2019 15:49:59 -0700 Subject: [PATCH] In bigquery, filter out missing schemas before querying the catalog --- core/dbt/adapters/base/impl.py | 11 +++++-- .../bigquery/dbt/adapters/bigquery/impl.py | 29 ++++++++++++++++-- .../dbt/include/bigquery/macros/catalog.sql | 20 +++++++++---- .../bq_models_noschema/disabled.sql | 2 ++ .../bq_models_noschema/model.sql | 2 ++ .../test_docs_generate.py | 30 +++++++++++++++++++ 6 files changed, 83 insertions(+), 11 deletions(-) create mode 100644 test/integration/029_docs_generate_tests/bq_models_noschema/disabled.sql create mode 100644 test/integration/029_docs_generate_tests/bq_models_noschema/model.sql diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index 83b159269af..cbc06311553 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -27,7 +27,9 @@ from dbt.adapters.base.connections import BaseConnectionManager, Connection from dbt.adapters.base.meta import AdapterMeta, available -from dbt.adapters.base.relation import ComponentName, BaseRelation +from dbt.adapters.base.relation import ( + ComponentName, BaseRelation, InformationSchema +) from dbt.adapters.base import Column as BaseColumn from dbt.adapters.cache import RelationsCache @@ -995,11 +997,16 @@ def _catalog_filter_table( """ return table.where(_catalog_filter_schemas(manifest)) + def _get_catalog_information_schemas( + self, manifest: Manifest + ) -> List[InformationSchema]: + return list(self._get_cache_schemas(manifest).keys()) + def get_catalog(self, manifest: Manifest) -> agate.Table: """Get the catalog for this manifest by running the get catalog macro. Returns an agate.Table of catalog information. """ - information_schemas = list(self._get_cache_schemas(manifest).keys()) + information_schemas = self._get_catalog_information_schemas(manifest) # make it a list so macros can index into it. kwargs = {'information_schemas': information_schemas} table = self.execute_macro(GET_CATALOG_MACRO_NAME, diff --git a/plugins/bigquery/dbt/adapters/bigquery/impl.py b/plugins/bigquery/dbt/adapters/bigquery/impl.py index 05e22772c8b..689eb9d7075 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/impl.py +++ b/plugins/bigquery/dbt/adapters/bigquery/impl.py @@ -13,6 +13,7 @@ from dbt.adapters.bigquery import BigQueryColumn from dbt.adapters.bigquery import BigQueryConnectionManager from dbt.contracts.connection import Connection +from dbt.contracts.graph.manifest import Manifest from dbt.logger import GLOBAL_LOGGER as logger from dbt.utils import filter_null_values @@ -479,10 +480,32 @@ def load_dataframe(self, database, schema, table_name, agate_table, with self.connections.exception_handler("LOAD TABLE"): self.poll_until_job_completes(job, timeout) - def _catalog_filter_table(self, table, manifest): - # BigQuery doesn't allow ":" chars in column names -- remap them here. + @classmethod + def _catalog_filter_table( + cls, table: agate.Table, manifest: Manifest + ) -> agate.Table: table = table.rename(column_names={ col.name: col.name.replace('__', ':') for col in table.columns }) - return super()._catalog_filter_table(table, manifest) + + def _get_catalog_information_schemas( + self, manifest: Manifest + ) -> List[BigQueryInformationSchema]: + + candidates = super()._get_catalog_information_schemas(manifest) + information_schemas = [] + db_schemas = {} + for candidate in candidates: + database = candidate.database + if database not in db_schemas: + db_schemas[database] = set(self.list_schemas(database)) + if candidate.schema in db_schemas[database]: + information_schemas.append(candidate) + else: + logger.debug( + 'Skipping catalog for {}.{} - schema does not exist' + .format(database, candidate.schema) + ) + + return information_schemas diff --git a/plugins/bigquery/dbt/include/bigquery/macros/catalog.sql b/plugins/bigquery/dbt/include/bigquery/macros/catalog.sql index a20d9865e07..bc013b2bd37 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/catalog.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/catalog.sql @@ -1,8 +1,13 @@ {% macro bigquery__get_catalog(information_schemas) -%} - {%- call statement('catalog', fetch_result=True) -%} - {% for information_schema in information_schemas %} + {%- if (information_schemas | length) == 0 -%} + {# Hopefully nothing cares about the columns we return when there are no rows #} + {%- set query = "select 1 as id limit 0" -%} + {%- else -%} + + {%- set query -%} + {%- for information_schema in information_schemas -%} ( with schemas as ( @@ -214,8 +219,11 @@ ) {% if not loop.last %} union all {% endif %} - {% endfor %} - {%- endcall -%} - {{ return(load_result('catalog').table) }} + {%- endfor -%} + {%- endset -%} + + {%- endif -%} + + {{ return(run_query(query)) }} -{% endmacro %} +{%- endmacro %} diff --git a/test/integration/029_docs_generate_tests/bq_models_noschema/disabled.sql b/test/integration/029_docs_generate_tests/bq_models_noschema/disabled.sql new file mode 100644 index 00000000000..e4368a859ca --- /dev/null +++ b/test/integration/029_docs_generate_tests/bq_models_noschema/disabled.sql @@ -0,0 +1,2 @@ +{{ config(disabled=true, schema='notrealnotreal') }} +select 1 as id diff --git a/test/integration/029_docs_generate_tests/bq_models_noschema/model.sql b/test/integration/029_docs_generate_tests/bq_models_noschema/model.sql new file mode 100644 index 00000000000..2fb872e8446 --- /dev/null +++ b/test/integration/029_docs_generate_tests/bq_models_noschema/model.sql @@ -0,0 +1,2 @@ +{{ config(schema=var('extra_schema')) }} +select 1 as id diff --git a/test/integration/029_docs_generate_tests/test_docs_generate.py b/test/integration/029_docs_generate_tests/test_docs_generate.py index 8080bf9dad3..a8cf4196672 100644 --- a/test/integration/029_docs_generate_tests/test_docs_generate.py +++ b/test/integration/029_docs_generate_tests/test_docs_generate.py @@ -3005,3 +3005,33 @@ def test__presto__run_and_generate(self): self.verify_run_results(self.expected_run_results( model_database=self.default_database )) + + +class TestDocsGenerateMissingSchema(DBTIntegrationTest): + @property + def schema(self): + return 'docs_generate_029' + + @staticmethod + def dir(path): + return normalize(path) + + @property + def models(self): + return self.dir("bq_models_noschema") + + def setUp(self): + super().setUp() + self.extra_schema = self.unique_schema() + '_bq_test' + + def tearDown(self): + with self.adapter.connection_named('__test'): + self._drop_schema_named(self.default_database, self.extra_schema) + super().tearDown() + + @use_profile('bigquery') + def test_bigquery_docs_generate_noschema(self): + self.run_dbt([ + 'docs', 'generate', + '--vars', "{{extra_schema: {}}}".format(self.extra_schema) + ])