From 07397edd47085c46cdaf82d9c8c88ed865065175 Mon Sep 17 00:00:00 2001
From: Drew Banin
Date: Sun, 3 Mar 2019 17:06:34 -0500
Subject: [PATCH 01/11] handle unexpected loaded_at field types

---
 core/dbt/adapters/base/impl.py | 24 +++++++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)

diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py
index a77094c2301..f3128bd4e0b 100644
--- a/core/dbt/adapters/base/impl.py
+++ b/core/dbt/adapters/base/impl.py
@@ -79,6 +79,26 @@ def _utc(dt):
     return dt.replace(tzinfo=pytz.UTC)
 
 
+def _parse_max_loaded_at(row, source, field_name):
+    for dt in row:
+        if dt is None:
+            raise dbt.exceptions.raise_database_error(
+                "Expected a non-null value when querying field '{}' of table "
+                " {} but received value 'null' instead".format(
+                    field_name,
+                    source))
+
+        elif not hasattr(dt, 'tzinfo'):
+            raise dbt.exceptions.raise_database_error(
+                "Expected a timestamp value when querying field '{}' of table "
+                "{} but received value of type '{}' instead".format(
+                    field_name,
+                    source,
+                    type(dt).__name__))
+        else:
+            yield _utc(dt)
+
+
 @six.add_metaclass(AdapterMeta)
 class BaseAdapter(object):
     """The BaseAdapter provides an abstract base class for adapters.
@@ -827,7 +847,9 @@ def calculate_freshness(self, source, loaded_at_field, manifest=None,
                 node=node
             )
 
-        max_loaded_at, snapshotted_at = map(_utc, table[0])
+        max_loaded_at, snapshotted_at = _parse_max_loaded_at(table[0],
+                                                             source,
+                                                             loaded_at_field)
         age = (snapshotted_at - max_loaded_at).total_seconds()
         return {
             'max_loaded_at': max_loaded_at,

From 67b56488d300513dc3e227fc6033ba909e55c331 Mon Sep 17 00:00:00 2001
From: Drew Banin
Date: Sun, 3 Mar 2019 16:36:14 -0500
Subject: [PATCH 02/11] 0.13.0 fixes around database quoting and rendering

- do not quote snowflake database identifiers by default
- do not find relations in source schemas in list_relations
- do not render database names in stdout if a custom database is not specified
---
 core/dbt/adapters/base/impl.py                   |  9 ++++++++-
 core/dbt/contracts/graph/manifest.py             |  5 +++--
 core/dbt/node_runners.py                         | 16 +++++++++++-----
 .../snowflake/dbt/adapters/snowflake/relation.py |  2 +-
 4 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py
index a77094c2301..7a7d372f37e 100644
--- a/core/dbt/adapters/base/impl.py
+++ b/core/dbt/adapters/base/impl.py
@@ -229,7 +229,14 @@ def _relations_cache_for_schemas(self, manifest):
         if not dbt.flags.USE_CACHE:
             return
 
-        schemas = manifest.get_used_schemas()
+        # We only really need to cache relations for resources that
+        # dbt will try to build. Even the executable() list is probably
+        # more expansive than necessary. Really, we just want to avoid
+        # caching Sources here, as there could be _many_ different schemas
+        # in the list, and dbt largely doesn't need to know if those sources
+        # exist or not.
+        resource_types = NodeType.executable()
+        schemas = manifest.get_used_schemas(resource_types)
 
         relations = []
         # add all relations
diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py
index 99866cd8ddb..7b9a8508aeb 100644
--- a/core/dbt/contracts/graph/manifest.py
+++ b/core/dbt/contracts/graph/manifest.py
@@ -4,7 +4,7 @@
     PARSED_MACRO_CONTRACT, PARSED_DOCUMENTATION_CONTRACT, \
     PARSED_SOURCE_DEFINITION_CONTRACT
 from dbt.contracts.graph.compiled import COMPILED_NODE_CONTRACT, CompiledNode
-from dbt.exceptions import ValidationException
+from dbt.exceptions import ValidationException, raise_duplicate_resource_name
 from dbt.node_types import NodeType
 from dbt.logger import GLOBAL_LOGGER as logger
 from dbt import tracking
@@ -401,10 +401,11 @@ def __getattr__(self, name):
                 type(self).__name__, name)
         )
 
-    def get_used_schemas(self):
+    def get_used_schemas(self, resource_types=None):
         return frozenset({
             (node.database, node.schema)
             for node in self.nodes.values()
+            if resource_types and node.resource_type in resource_types
         })
 
     def get_used_databases(self):
diff --git a/core/dbt/node_runners.py b/core/dbt/node_runners.py
index b949f0221f5..4c988bb0165 100644
--- a/core/dbt/node_runners.py
+++ b/core/dbt/node_runners.py
@@ -286,11 +286,17 @@ def compile(self, manifest):
 
 
 class ModelRunner(CompileRunner):
+    def get_node_representation(self):
+        if self.config.credentials.database == self.node.database:
+            template = "{0.schema}.{0.alias}"
+        else:
+            template = "{0.database}.{0.schema}.{0.alias}"
+
+        return template.format(self.node)
+
     def describe_node(self):
-        materialization = dbt.utils.get_materialization(self.node)
-        return "{0} model {1.database}.{1.schema}.{1.alias}".format(
-            materialization, self.node
-        )
+        return "{} model {}".format(self.node.get_materialization(),
+                                    self.get_node_representation())
 
     def print_start_line(self):
         description = self.describe_node()
@@ -477,7 +483,7 @@ def print_result_line(self, result):
 
 class SeedRunner(ModelRunner):
     def describe_node(self):
-        return "seed file {0.database}.{0.schema}.{0.alias}".format(self.node)
+        return "seed file {}".format(self.get_node_representation())
 
     def before_execute(self):
         description = self.describe_node()
diff --git a/plugins/snowflake/dbt/adapters/snowflake/relation.py b/plugins/snowflake/dbt/adapters/snowflake/relation.py
index a494fb89363..5563f904c1d 100644
--- a/plugins/snowflake/dbt/adapters/snowflake/relation.py
+++ b/plugins/snowflake/dbt/adapters/snowflake/relation.py
@@ -9,7 +9,7 @@ class SnowflakeRelation(BaseRelation):
         },
         'quote_character': '"',
         'quote_policy': {
-            'database': True,
+            'database': False,
             'schema': False,
             'identifier': False,
         },

From d39a048e6e48ed666cfe3a2ddb1f35d72279bc24 Mon Sep 17 00:00:00 2001
From: Drew Banin
Date: Mon, 4 Mar 2019 20:39:31 -0500
Subject: [PATCH 03/11] pr feedback

---
 core/dbt/adapters/base/impl.py | 48 +++++++++++++++++-------------------
 1 file changed, 21 insertions(+), 27 deletions(-)

diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py
index f3128bd4e0b..09cff955ac5 100644
--- a/core/dbt/adapters/base/impl.py
+++ b/core/dbt/adapters/base/impl.py
@@ -69,36 +69,31 @@ def test(row):
     return test
 
 
-def _utc(dt):
+def _utc(dt, source, field_name):
     """If dt has a timezone, return a new datetime that's in UTC. Otherwise,
     assume the datetime is already for UTC and add the timezone.
""" - if dt.tzinfo: + if dt is None: + raise dbt.exceptions.raise_database_error( + "Expected a non-null value when querying field '{}' of table " + " {} but received value 'null' instead".format( + field_name, + source)) + + elif not hasattr(dt, 'tzinfo'): + raise dbt.exceptions.raise_database_error( + "Expected a timestamp value when querying field '{}' of table " + "{} but received value of type '{}' instead".format( + field_name, + source, + type(dt).__name__)) + + elif dt.tzinfo: return dt.astimezone(pytz.UTC) else: return dt.replace(tzinfo=pytz.UTC) -def _parse_max_loaded_at(row, source, field_name): - for dt in row: - if dt is None: - raise dbt.exceptions.raise_database_error( - "Expected a non-null value when querying field '{}' of table " - " {} but received value 'null' instead".format( - field_name, - source)) - - elif not hasattr(dt, 'tzinfo'): - raise dbt.exceptions.raise_database_error( - "Expected a timestamp value when querying field '{}' of table " - "{} but received value of type '{}' instead".format( - field_name, - source, - type(dt).__name__)) - else: - yield _utc(dt) - - @six.add_metaclass(AdapterMeta) class BaseAdapter(object): """The BaseAdapter provides an abstract base class for adapters. @@ -843,13 +838,12 @@ def calculate_freshness(self, source, loaded_at_field, manifest=None, dbt.exceptions.raise_compiler_error( 'Got an invalid result from "{}" macro: {}'.format( FRESHNESS_MACRO_NAME, [tuple(r) for r in table] - ), - node=node + ) ) - max_loaded_at, snapshotted_at = _parse_max_loaded_at(table[0], - source, - loaded_at_field) + max_loaded_at = _utc(table[0][0], source, loaded_at_field) + snapshotted_at = _utc(table[0][1], source, loaded_at_field) + age = (snapshotted_at - max_loaded_at).total_seconds() return { 'max_loaded_at': max_loaded_at, From 328ce82bae5471f5bda57049234a47860c31c13a Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Mon, 4 Mar 2019 20:43:37 -0500 Subject: [PATCH 04/11] fix unit tests --- test/unit/test_snowflake_adapter.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/unit/test_snowflake_adapter.py b/test/unit/test_snowflake_adapter.py index d97ee2b4c7e..db5394c3fe3 100644 --- a/test/unit/test_snowflake_adapter.py +++ b/test/unit/test_snowflake_adapter.py @@ -69,7 +69,7 @@ def test_quoting_on_drop_schema(self): ) self.mock_execute.assert_has_calls([ - mock.call('drop schema if exists "test_database"."test_schema" cascade', None) + mock.call('drop schema if exists test_database."test_schema" cascade', None) ]) def test_quoting_on_drop(self): @@ -84,7 +84,7 @@ def test_quoting_on_drop(self): self.mock_execute.assert_has_calls([ mock.call( - 'drop table if exists "test_database"."test_schema".test_table cascade', + 'drop table if exists test_database."test_schema".test_table cascade', None ) ]) @@ -100,7 +100,7 @@ def test_quoting_on_truncate(self): self.adapter.truncate_relation(relation) self.mock_execute.assert_has_calls([ - mock.call('truncate table "test_database"."test_schema".test_table', None) + mock.call('truncate table test_database."test_schema".test_table', None) ]) def test_quoting_on_rename(self): @@ -125,7 +125,7 @@ def test_quoting_on_rename(self): ) self.mock_execute.assert_has_calls([ mock.call( - 'alter table "test_database"."test_schema".table_a rename to "test_database"."test_schema".table_b', + 'alter table test_database."test_schema".table_a rename to test_database."test_schema".table_b', None ) ]) From 22a2887df29194d10fb9254f2bd6062b0fef0620 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: 
Mon, 4 Mar 2019 22:50:23 -0500 Subject: [PATCH 05/11] add missing import --- core/dbt/adapters/base/impl.py | 1 + 1 file changed, 1 insertion(+) diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index 7a7d372f37e..5f18f8d7367 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -18,6 +18,7 @@ from dbt.logger import GLOBAL_LOGGER as logger from dbt.schema import Column from dbt.utils import filter_null_values, translate_aliases +from dbt.node_types import NodeType from dbt.adapters.base.meta import AdapterMeta, available, available_raw, \ available_deprecated From 7eb033e71ae7d3335e5282ba1fe4d2d18cfa078f Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 5 Mar 2019 17:54:29 -0500 Subject: [PATCH 06/11] fix incorrect schema filtering logic --- core/dbt/contracts/graph/manifest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index 7b9a8508aeb..06f8a9270a8 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -405,7 +405,7 @@ def get_used_schemas(self, resource_types=None): return frozenset({ (node.database, node.schema) for node in self.nodes.values() - if resource_types and node.resource_type in resource_types + if not resource_types or node.resource_type in resource_types }) def get_used_databases(self): From ec14c6b2dcf91efe05726d553511b0b4e491caca Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Tue, 5 Mar 2019 10:22:06 -0700 Subject: [PATCH 07/11] initial work, unit tests --- core/dbt/adapters/base/relation.py | 13 ++++++------ core/dbt/context/common.py | 5 +++++ core/dbt/contracts/graph/parsed.py | 22 +++++++++++++++++--- core/dbt/contracts/graph/unparsed.py | 25 +++++++++++++++++++++++ core/dbt/parser/schemas.py | 25 ++++++++++++++++++++--- test/unit/test_parser.py | 30 ++++++++++++++++++++++++++-- 6 files changed, 106 insertions(+), 14 deletions(-) diff --git a/core/dbt/adapters/base/relation.py b/core/dbt/adapters/base/relation.py index 0a40346b5be..1488a3615b5 100644 --- a/core/dbt/adapters/base/relation.py +++ b/core/dbt/adapters/base/relation.py @@ -30,7 +30,7 @@ class BaseRelation(APIObject): 'database': True, 'schema': True, 'identifier': True - } + }, } PATH_SCHEMA = { @@ -174,15 +174,16 @@ def quoted(self, identifier): @classmethod def create_from_source(cls, source, **kwargs): + quote_policy = dbt.utils.deep_merge( + cls.DEFAULTS['quote_policy'], + source.quoting, + kwargs.get('quote_policy', {}) + ) return cls.create( database=source.database, schema=source.schema, identifier=source.identifier, - quote_policy={ - 'database': True, - 'schema': True, - 'identifier': True, - }, + quote_policy=quote_policy, **kwargs ) diff --git a/core/dbt/context/common.py b/core/dbt/context/common.py index f528af89ad3..c46a3fb8528 100644 --- a/core/dbt/context/common.py +++ b/core/dbt/context/common.py @@ -36,6 +36,11 @@ def __init__(self, adapter): def __getattr__(self, key): return getattr(self.relation_type, key) + def create_from_source(self, *args, **kwargs): + # bypass our create when creating from source so as not to mess up + # the source quoting + return self.relation_type.create_from_source(*args, **kwargs) + def create(self, *args, **kwargs): kwargs['quote_policy'] = dbt.utils.merge( self.quoting_config, diff --git a/core/dbt/contracts/graph/parsed.py b/core/dbt/contracts/graph/parsed.py index 966ba0f90f4..295db58b771 100644 --- a/core/dbt/contracts/graph/parsed.py +++ 
b/core/dbt/contracts/graph/parsed.py @@ -1,8 +1,6 @@ from dbt.api import APIObject from dbt.utils import deep_merge from dbt.node_types import NodeType -from dbt.exceptions import raise_duplicate_resource_name, \ - raise_patch_targets_not_found import dbt.clients.jinja @@ -558,6 +556,7 @@ def generator(self): # available in this class. should we just generate this here? return dbt.clients.jinja.macro_generator(self._contents) + # This is just the file + its ID PARSED_DOCUMENTATION_CONTRACT = deep_merge( UNPARSED_DOCUMENTATION_FILE_CONTRACT, @@ -634,9 +633,26 @@ class Hook(APIObject): } +QUOTING_CONTRACT = { + 'properties': { + 'quoting': { + 'type': 'object', + 'additionalProperties': False, + 'properties': { + 'database': {'type': 'boolean'}, + 'schema': {'type': 'boolean'}, + 'identifier': {'type': 'boolean'}, + }, + }, + }, + 'required': ['quoting'], +} + + PARSED_SOURCE_DEFINITION_CONTRACT = deep_merge( UNPARSED_BASE_CONTRACT, FRESHNESS_CONTRACT, + QUOTING_CONTRACT, HAS_DESCRIPTION_CONTRACT, HAS_UNIQUE_ID_CONTRACT, HAS_DOCREFS_CONTRACT, @@ -676,7 +692,7 @@ class Hook(APIObject): # the manifest search stuff really requires this, sadly 'resource_type': { 'enum': [NodeType.Source], - } + }, }, # note that while required, loaded_at_field and freshness may be null 'required': [ diff --git a/core/dbt/contracts/graph/unparsed.py b/core/dbt/contracts/graph/unparsed.py index 30de42ef695..dbac0a31e32 100644 --- a/core/dbt/contracts/graph/unparsed.py +++ b/core/dbt/contracts/graph/unparsed.py @@ -218,6 +218,29 @@ class UnparsedNodeUpdate(APIObject): } +_QUOTING_CONTRACT = { + 'type': 'object', + 'additionalProperties': False, + 'properties': { + 'database': {'type': 'boolean'}, + 'schema': {'type': 'boolean'}, + 'identifier': {'type': 'boolean'}, + }, +} + + +QUOTING_CONTRACT = { + 'properties': { + 'quoting': { + 'anyOf': [ + {'type': 'null'}, + _QUOTING_CONTRACT, + ], + }, + }, +} + + FRESHNESS_CONTRACT = { 'properties': { 'loaded_at_field': { @@ -238,6 +261,7 @@ class UnparsedNodeUpdate(APIObject): UNPARSED_NODE_DESCRIPTION_CONTRACT, UNPARSED_COLUMN_DESCRIPTION_CONTRACT, FRESHNESS_CONTRACT, + QUOTING_CONTRACT, { 'description': ( 'A source table definition, as provided in the "tables" ' @@ -256,6 +280,7 @@ class UnparsedNodeUpdate(APIObject): UNPARSED_SOURCE_DEFINITION_CONTRACT = deep_merge( FRESHNESS_CONTRACT, + QUOTING_CONTRACT, { 'type': 'object', 'additionalProperties': False, diff --git a/core/dbt/parser/schemas.py b/core/dbt/parser/schemas.py index 82e92973536..ba548b4a372 100644 --- a/core/dbt/parser/schemas.py +++ b/core/dbt/parser/schemas.py @@ -243,6 +243,10 @@ def _generate_test_name(self, target, test_type, test_args): """Returns a hashed_name, full_name pair.""" raise NotImplementedError + @staticmethod + def _describe_test_target(test_target): + raise NotImplementedError + def build_test_node(self, test_target, package_name, test, root_dir, path, column_name=None): """Build a test node against the given target (a model or a source). 
@@ -257,8 +261,9 @@ def build_test_node(self, test_target, package_name, test, root_dir, path,
 
         source_package = self.all_projects.get(package_name)
         if source_package is None:
-            desc = '"{}" test on model "{}"'.format(test_type,
-                                                    model_name)
+            desc = '"{}" test on {}'.format(
+                test_type, self._describe_test_target(test_target)
+            )
             dbt.exceptions.raise_dep_not_found(None, desc, test_namespace)
 
         test_path = os.path.basename(path)
@@ -306,6 +311,10 @@ def _build_raw_sql(self, test_namespace, target, test_type, test_args):
     def _generate_test_name(self, target, test_type, test_args):
         return get_nice_schema_test_name(test_type, target['name'], test_args)
 
+    @staticmethod
+    def _describe_test_target(test_target):
+        return 'model "{}"'.format(test_target)
+
     def parse_models_entry(self, model_dict, path, package_name, root_dir):
         model_name = model_dict['name']
         refs = ParserRef()
@@ -378,6 +387,10 @@ def _generate_test_name(self, target, test_type, test_args):
             test_args
         )
 
+    @staticmethod
+    def _describe_test_target(test_target):
+        return 'source "{0[source]}.{0[table]}"'.format(test_target)
+
     def get_path(self, *parts):
         return '.'.join(str(s) for s in parts)
 
@@ -392,12 +405,17 @@ def generate_source_node(self, source, table, path, package_name, root_dir,
         get_rendered(description, context)
         get_rendered(source_description, context)
 
-        # we'll fill columns in later.
         freshness = dbt.utils.deep_merge(source.get('freshness', {}),
                                          table.get('freshness', {}))
 
         loaded_at_field = table.get('loaded_at_field',
                                     source.get('loaded_at_field'))
+
+        # use 'or {}' to allow quoting: null
+        source_quoting = source.get('quoting') or {}
+        table_quoting = table.get('quoting') or {}
+        quoting = dbt.utils.deep_merge(source_quoting, table_quoting)
+
         default_database = self.root_project_config.credentials.database
         return ParsedSourceDefinition(
             package_name=package_name,
@@ -417,6 +435,7 @@ def generate_source_node(self, source, table, path, package_name, root_dir,
             docrefs=refs.docrefs,
             loaded_at_field=loaded_at_field,
             freshness=freshness,
+            quoting=quoting,
             resource_type=NodeType.Source
         )
diff --git a/test/unit/test_parser.py b/test/unit/test_parser.py
index 7cce0ca75e4..a4e23b444c3 100644
--- a/test/unit/test_parser.py
+++ b/test/unit/test_parser.py
@@ -206,7 +206,11 @@ def setUp(self):
             database='test',
             schema='foo',
             identifier='bar',
-            resource_type='source'
+            resource_type='source',
+            quoting={
+                'schema': True,
+                'identifier': False,
+            }
         )
 
         self._expected_source_tests = [
@@ -468,6 +472,9 @@ def test__source_schema(self):
               - name: my_source
                 loader: some_loader
                 description: my source description
+                quoting:
+                  schema: True
+                  identifier: True
                 freshness:
                   warn_after:
                     count: 10
                     period: hour
                   error_after:
                     count: 20
                     period: hour
                 loaded_at_field: something
-                schema: foo
+                schema: '{{ var("test_schema_name") }}'
                 tables:
                   - name: my_table
                     description: "my table description"
                     loaded_at_field: another_column
                     freshness:
                       warn_after:
                         count: 7
                         period: hour
+                    quoting:
+                      identifier: False
                     columns:
                       - name: id
                         description: user ID
@@ -586,6 +595,8 @@ def test__model_schema(self):
     def test__mixed_schema(self):
         test_yml = yaml.safe_load('''
             version: 2
+            quoting:
+              database: True
            models:
                - name: model_one
                  description: blah blah
@@ -609,6 +620,9 @@ def test__mixed_schema(self):
               - name: my_source
                 loader: some_loader
                 description: my source description
+                quoting:
+                  schema: True
+                  identifier: True
                 freshness:
                   warn_after:
                     count: 10
@@ -626,6 +640,8 @@ def test__mixed_schema(self):
                       warn_after:
                         count: 7
                         period: hour
+                    quoting:
+                      identifier: False
                     columns:
                       - name: id
                         description: user ID
@@ -681,6 +697,9 @@ def test__source_schema_invalid_test_strict(self):
               - name: my_source
                 loader: some_loader
                 description: my source description
+                quoting:
+                  schema: True
+                  identifier: True
                 freshness:
                   warn_after:
                     count: 10
@@ -698,6 +717,8 @@ def test__source_schema_invalid_test_strict(self):
                       warn_after:
                         count: 7
                         period: hour
+                    quoting:
+                      identifier: False
                     columns:
                       - name: id
                         description: user ID
@@ -738,6 +759,9 @@ def test__source_schema_invalid_test_not_strict(self):
               - name: my_source
                 loader: some_loader
                 description: my source description
+                quoting:
+                  schema: True
+                  identifier: True
                 freshness:
                   warn_after:
                     count: 10
@@ -755,6 +779,8 @@ def test__source_schema_invalid_test_not_strict(self):
                       warn_after:
                         count: 7
                         period: hour
+                    quoting:
+                      identifier: False
                     columns:
                       - name: id
                         description: user ID

From 088442e9c13ee3aa86c71e37278c4aeb0ac9120c Mon Sep 17 00:00:00 2001
From: Jacob Beck
Date: Tue, 5 Mar 2019 10:42:31 -0700
Subject: [PATCH 08/11] test fixes

---
 core/dbt/adapters/base/impl.py                 | 100 +++++++++++++++---
 core/dbt/adapters/base/relation.py             |  40 ++++++-
 core/dbt/adapters/sql/impl.py                  |  10 +-
 core/dbt/contracts/graph/manifest.py           |   2 +-
 .../global_project/macros/adapters/common.sql  |  12 +--
 .../bigquery/dbt/adapters/bigquery/impl.py     |   7 +-
 .../dbt/include/bigquery/macros/adapters.sql   |   4 +-
 .../postgres/dbt/adapters/postgres/impl.py     |   5 +
 .../dbt/include/postgres/macros/adapters.sql   |   8 +-
 .../dbt/include/postgres/macros/catalog.sql    |   8 +-
 .../dbt/include/redshift/macros/adapters.sql   |   6 +-
 .../dbt/include/redshift/macros/catalog.sql    |  14 +--
 .../dbt/include/snowflake/macros/adapters.sql  |  14 +--
 .../dbt/include/snowflake/macros/catalog.sql   |   9 +-
 .../ref_models/schema.yml                      |   5 +
 .../test_docs_generate.py                      |  61 ++++++-----
 .../042_sources_test/models/schema.yml         |   4 +
 test/integration/base.py                       |   2 +-
 18 files changed, 215 insertions(+), 96 deletions(-)

diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py
index 09cff955ac5..f501c119b3d 100644
--- a/core/dbt/adapters/base/impl.py
+++ b/core/dbt/adapters/base/impl.py
@@ -1,7 +1,4 @@
 import abc
-import copy
-import multiprocessing
-import time
 
 import agate
 import pytz
@@ -13,11 +10,11 @@
 import dbt.clients.agate_helper
 
 from dbt.compat import abstractclassmethod, classmethod
-from dbt.contracts.connection import Connection
+from dbt.node_types import NodeType
 from dbt.loader import GraphLoader
 from dbt.logger import GLOBAL_LOGGER as logger
 from dbt.schema import Column
-from dbt.utils import filter_null_values, translate_aliases
+from dbt.utils import filter_null_values
 
 from dbt.adapters.base.meta import AdapterMeta, available, available_raw, \
     available_deprecated
@@ -94,6 +91,53 @@ def _utc(dt, source, field_name):
     return dt.replace(tzinfo=pytz.UTC)
 
 
+class SchemaSearchMap(dict):
+    """A utility class to keep track of what information_schema tables to
+    search for what schemas
+    """
+    def add(self, relation):
+        key = relation.information_schema_only()
+        if key not in self:
+            self[key] = set()
+        self[key].add(relation.schema.lower())
+
+    def search(self):
+        for information_schema_name, schemas in self.items():
+            for schema in schemas:
+                yield information_schema_name, schema
+
+    def schemas_searched(self):
+        result = set()
+        for information_schema_name, schemas in self.items():
+            result.update(
+                (information_schema_name.database, schema)
+                for schema in schemas
+            )
+        return result
+
+    def flatten(self):
+        new = self.__class__()
+
+        database = None
+        # iterate once to look for a database name
+        seen = {r.database.lower() for r in self if r.database}
+        if len(seen) > 1:
+            dbt.exceptions.raise_compiler_error(
+                'flatten() requires <=1 database (got {})'.format(seen)
+            )
+        elif len(seen) == 1:
+            database = list(seen)[0]
+
+        for information_schema_name, schema in self.search():
+            new.add(information_schema_name.incorporate(
+                path={'database': database, 'schema': schema},
+                quote_policy={'database': False},
+                include_policy={'database': False},
+            ))
+
+        return new
+
+
 @six.add_metaclass(AdapterMeta)
 class BaseAdapter(object):
     """The BaseAdapter provides an abstract base class for adapters.
@@ -237,6 +281,25 @@ def _relations_filter_table(cls, table, schemas):
         """
         return table.where(_relations_filter_schemas(schemas))
 
+    def _get_cache_schemas(self, manifest):
+        """Get a mapping of each node's "information_schema" relations to a
+        set of all schemas expected in that information_schema.
+
+        There may be keys that are technically duplicates on the database side,
+        for example all of '"foo", 'foo', '"FOO"' and 'FOO' could coexist as
+        databases, and values could overlap as appropriate. All values are
+        lowercase strings.
+        """
+        info_schema_name_map = SchemaSearchMap()
+        for node in manifest.nodes.values():
+            relation = self.Relation.create_from(self.config, node)
+            info_schema_name_map.add(relation)
+        # result is a map whose keys are information_schema Relations without
+        # identifiers that have appropriate database prefixes, and whose values
+        # are sets of lowercase schema names that are valid members of those
+        # schemas
+        return info_schema_name_map
+
     def _relations_cache_for_schemas(self, manifest):
         """Populate the relations cache for the given schemas. Returns an
         iteratble of the schemas populated, as strings.
@@ -244,17 +307,15 @@ def _relations_cache_for_schemas(self, manifest):
         if not dbt.flags.USE_CACHE:
             return
 
-        schemas = manifest.get_used_schemas()
-
-        relations = []
-        # add all relations
-        for db, schema in schemas:
+        info_schema_name_map = self._get_cache_schemas(manifest)
+        for db, schema in info_schema_name_map.search():
             for relation in self.list_relations_without_caching(db, schema):
                 self.cache.add(relation)
+
         # it's possible that there were no relations in some schemas. We want
         # to insert the schemas we query into the cache's `.schemas` attribute
         # so we can check it later
-        self.cache.update_schemas(schemas)
+        self.cache.update_schemas(info_schema_name_map.schemas_searched())
 
     def set_relations_cache(self, manifest, clear=False):
         """Run a query that gets a populated cache of the relations in the
@@ -415,13 +476,14 @@ def expand_column_types(self, goal, current, model_name=None):
         )
 
     @abc.abstractmethod
-    def list_relations_without_caching(self, database, schema,
+    def list_relations_without_caching(self, information_schema, schema,
                                        model_name=None):
         """List relations in the given schema, bypassing the cache. This is
         used as the underlying behavior to fill the cache.
 
-        :param str database: The name of the database to list relations from.
+        :param Relation information_schema: The information schema to list
+            relations from.
         :param str schema: The name of the schema to list relations from.
         :param Optional[str] model_name: The name of the model to use for
             the connection.
@@ -495,10 +557,15 @@ def list_relations(self, database, schema, model_name=None):
         if self._schema_is_cached(database, schema, model_name):
             return self.cache.get_relations(database, schema)
 
+        information_schema = self.Relation.create(
+            database=database,
+            schema=schema,
+            model_name='').information_schema()
+
         # we can't build the relations cache because we don't have a
         # manifest so we can't run any operations.
         relations = self.list_relations_without_caching(
-            database, schema, model_name=model_name
+            information_schema, schema, model_name=model_name
         )
 
         logger.debug('with schema={}, model_name={}, relations={}'
@@ -802,10 +869,11 @@ def get_catalog(self, manifest):
         """Get the catalog for this manifest by running the get catalog macro.
         Returns an agate.Table of catalog information.
         """
+        information_schemas = list(self._get_cache_schemas(manifest).keys())
         # make it a list so macros can index into it.
-        context = {'databases': list(manifest.get_used_databases())}
+        kwargs = {'information_schemas': information_schemas}
         table = self.execute_macro(GET_CATALOG_MACRO_NAME,
-                                   context_override=context,
+                                   kwargs=kwargs,
                                    release=True)
 
         results = self._catalog_filter_table(table, manifest)
diff --git a/core/dbt/adapters/base/relation.py b/core/dbt/adapters/base/relation.py
index 1488a3615b5..dccfdb3d534 100644
--- a/core/dbt/adapters/base/relation.py
+++ b/core/dbt/adapters/base/relation.py
@@ -1,5 +1,6 @@
 from dbt.api import APIObject
 from dbt.utils import filter_null_values
+from dbt.node_types import NodeType
 
 import dbt.exceptions
@@ -38,7 +39,7 @@ class BaseRelation(APIObject):
         'properties': {
             'database': {'type': ['string', 'null']},
             'schema': {'type': ['string', 'null']},
-            'identifier': {'type': 'string'},
+            'identifier': {'type': ['string', 'null']},
         },
         'required': ['database', 'schema', 'identifier'],
     }
@@ -135,6 +136,36 @@ def include(self, database=None, schema=None, identifier=None):
 
         return self.incorporate(include_policy=policy)
 
+    def information_schema(self, identifier=None):
+        include_db = self.database is not None
+        include_policy = filter_null_values({
+            'database': include_db,
+            'schema': True,
+            'identifier': identifier is not None
+        })
+        quote_policy = filter_null_values({
+            'database': self.quote_policy['database'],
+            'schema': False,
+            'identifier': False,
+        })
+
+        path_update = {
+            'schema': 'information_schema',
+            'identifier': identifier
+        }
+
+        return self.incorporate(
+            quote_policy=quote_policy,
+            include_policy=include_policy,
+            path=path_update,
+            table_name=identifier)
+
+    def information_schema_only(self):
+        return self.information_schema()
+
+    def information_schema_table(self, identifier):
+        return self.information_schema(identifier)
+
     def render(self, use_table_name=True):
         parts = []
 
@@ -203,6 +234,13 @@ def create_from_node(cls, config, node, table_name=None, quote_policy=None,
             quote_policy=quote_policy,
             **kwargs)
 
+    @classmethod
+    def create_from(cls, config, node, **kwargs):
+        if node.resource_type == NodeType.Source:
+            return cls.create_from_source(node, **kwargs)
+        else:
+            return cls.create_from_node(config, node, **kwargs)
+
     @classmethod
     def create(cls, database=None, schema=None,
                identifier=None, table_name=None,
diff --git a/core/dbt/adapters/sql/impl.py b/core/dbt/adapters/sql/impl.py
index c7fa6d79b7f..196d4fad186 100644
--- a/core/dbt/adapters/sql/impl.py
+++ b/core/dbt/adapters/sql/impl.py
@@ -1,15 +1,10 @@
-import abc
-import time
-
 import agate
-import six
 
 import dbt.clients.agate_helper
 import dbt.exceptions
 import dbt.flags
 from dbt.adapters.base import BaseAdapter, available
 from dbt.logger import GLOBAL_LOGGER as logger
-from dbt.compat import abstractclassmethod
 
 
 LIST_RELATIONS_MACRO_NAME = 'list_relations_without_caching'
@@ -196,11 +191,12 @@ def drop_schema(self, database, schema, model_name=None):
                            kwargs=kwargs,
                            connection_name=model_name)
 
-    def list_relations_without_caching(self, database, schema,
+    def list_relations_without_caching(self, information_schema, schema,
                                        model_name=None):
+        kwargs = {'information_schema': information_schema, 'schema': schema}
         results = self.execute_macro(
             LIST_RELATIONS_MACRO_NAME,
-            kwargs={'database': database, 'schema': schema},
+            kwargs=kwargs,
             connection_name=model_name,
             release=True
         )
diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py
index 99866cd8ddb..9adc3b35205 100644
--- a/core/dbt/contracts/graph/manifest.py
+++ b/core/dbt/contracts/graph/manifest.py
@@ -4,7 +4,7 @@
     PARSED_MACRO_CONTRACT, PARSED_DOCUMENTATION_CONTRACT, \
     PARSED_SOURCE_DEFINITION_CONTRACT
 from dbt.contracts.graph.compiled import COMPILED_NODE_CONTRACT, CompiledNode
-from dbt.exceptions import ValidationException
+from dbt.exceptions import raise_duplicate_resource_name
 from dbt.node_types import NodeType
 from dbt.logger import GLOBAL_LOGGER as logger
 from dbt import tracking
diff --git a/core/dbt/include/global_project/macros/adapters/common.sql b/core/dbt/include/global_project/macros/adapters/common.sql
index b14004fa68b..ca2ea6c47a3 100644
--- a/core/dbt/include/global_project/macros/adapters/common.sql
+++ b/core/dbt/include/global_project/macros/adapters/common.sql
@@ -83,11 +83,11 @@
 {% endmacro %}
 
 
-{% macro get_catalog() -%}
-  {{ return(adapter_macro('get_catalog')) }}
+{% macro get_catalog(information_schemas) -%}
+  {{ return(adapter_macro('get_catalog', information_schemas)) }}
 {%- endmacro %}
 
-{% macro default__get_catalog() -%}
+{% macro default__get_catalog(information_schemas) -%}
 
   {% set typename = adapter.type() %}
   {% set msg -%}
@@ -214,12 +214,12 @@
 {% endmacro %}
 
 
-{% macro list_relations_without_caching(database, schema) %}
-  {{ return(adapter_macro('list_relations_without_caching', database, schema)) }}
+{% macro list_relations_without_caching(information_schema, schema) %}
+  {{ return(adapter_macro('list_relations_without_caching', information_schema, schema)) }}
 {% endmacro %}
 
 
-{% macro default__list_relations_without_caching(database, schema) %}
+{% macro default__list_relations_without_caching(information_schema, schema) %}
   {{ dbt.exceptions.raise_not_implemented(
     'list_relations_without_caching macro not implemented for adapter '+adapter.type()) }}
 {% endmacro %}
diff --git a/plugins/bigquery/dbt/adapters/bigquery/impl.py b/plugins/bigquery/dbt/adapters/bigquery/impl.py
index e503148be97..ad5fbf724bf 100644
--- a/plugins/bigquery/dbt/adapters/bigquery/impl.py
+++ b/plugins/bigquery/dbt/adapters/bigquery/impl.py
@@ -116,13 +116,14 @@ def expand_column_types(self, goal, current, model_name=None):
         # This is a no-op on BigQuery
         pass
 
-    def list_relations_without_caching(self, database, schema,
+    def list_relations_without_caching(self, information_schema, schema,
                                        model_name=None):
         connection = self.connections.get(model_name)
         client = connection.handle
 
-        bigquery_dataset = self.connections.dataset(database, schema,
-                                                    connection)
+        bigquery_dataset = self.connections.dataset(
+            information_schema.database, schema, connection
+        )
 
         all_tables = client.list_tables(
             bigquery_dataset,
diff --git a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
index d9700f3591a..e14dd93a120 100644
--- a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
+++ b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
@@ -59,8 +59,8 @@
 {% endmacro %}
 
 
-{% macro bigquery__list_relations_without_caching(database, schema) -%}
-  {{ return(adapter.list_relations_without_caching(database, schema)) }}
+{% macro bigquery__list_relations_without_caching(information_schema, schema) -%}
+  {{ return(adapter.list_relations_without_caching(information_schema, schema)) }}
 {% endmacro %}
 
diff --git a/plugins/postgres/dbt/adapters/postgres/impl.py b/plugins/postgres/dbt/adapters/postgres/impl.py
index a5b0087a711..94e5f92ad37 100644
--- a/plugins/postgres/dbt/adapters/postgres/impl.py
+++ b/plugins/postgres/dbt/adapters/postgres/impl.py
@@ -56,6 +56,11 @@ def _link_cached_database_relations(self, schemas):
             if refed_schema.lower() in schemas:
                 self.cache.add_link(dependent, referenced)
 
+    def _get_cache_schemas(self, manifest):
+        # postgres/redshift only allow one database (the main one)
+        schemas = super(PostgresAdapter, self)._get_cache_schemas(manifest)
+        return schemas.flatten()
+
     def _link_cached_relations(self, manifest):
         schemas = set()
         for db, schema in manifest.get_used_schemas():
diff --git a/plugins/postgres/dbt/include/postgres/macros/adapters.sql b/plugins/postgres/dbt/include/postgres/macros/adapters.sql
index b51df9f42c5..f40ca6ca143 100644
--- a/plugins/postgres/dbt/include/postgres/macros/adapters.sql
+++ b/plugins/postgres/dbt/include/postgres/macros/adapters.sql
@@ -26,7 +26,7 @@
       numeric_precision,
       numeric_scale
 
-    from {{ information_schema_name(relation.database) }}.columns
+    from {{ relation.information_schema('columns') }}
     where table_name = '{{ relation.identifier }}'
       {% if relation.schema %}
       and table_schema = '{{ relation.schema }}'
@@ -39,10 +39,10 @@
 {% endmacro %}
 
 
-{% macro postgres__list_relations_without_caching(database, schema) %}
+{% macro postgres__list_relations_without_caching(information_schema, schema) %}
   {% call statement('list_relations_without_caching', fetch_result=True) -%}
     select
-      '{{ database }}' as database,
+      '{{ information_schema.database.lower() }}' as database,
      tablename as name,
      schemaname as schema,
      'table' as type
    from pg_tables
    where schemaname ilike '{{ schema }}'
    union all
    select
-      '{{ database }}' as database,
+      '{{ information_schema.database.lower() }}' as database,
      viewname as name,
      schemaname as schema,
      'view' as type
diff --git a/plugins/postgres/dbt/include/postgres/macros/catalog.sql b/plugins/postgres/dbt/include/postgres/macros/catalog.sql
index e04e521ea94..3558f3ff649 100644
--- a/plugins/postgres/dbt/include/postgres/macros/catalog.sql
+++ b/plugins/postgres/dbt/include/postgres/macros/catalog.sql
@@ -1,11 +1,11 @@
-{% macro postgres__get_catalog() -%}
+{% macro postgres__get_catalog(information_schemas) -%}
   {%- call statement('catalog', fetch_result=True) -%}
-    {% if (databases | length) != 1 %}
-        exceptions.raise_compiler_error('postgres get_catalog requires exactly one database')
+    {% if (information_schemas | length) != 1 %}
+        {{ exceptions.raise_compiler_error('postgres get_catalog requires exactly one database') }}
     {% endif %}
-    {% set database = databases[0] %}
+    {% set database = information_schemas[0].database %}
     {{ adapter.verify_database(database) }}
 
     with table_owners as (
diff --git a/plugins/redshift/dbt/include/redshift/macros/adapters.sql b/plugins/redshift/dbt/include/redshift/macros/adapters.sql
index 7fd7063bcf5..bb2f6ced58d 100644
--- a/plugins/redshift/dbt/include/redshift/macros/adapters.sql
+++ b/plugins/redshift/dbt/include/redshift/macros/adapters.sql
@@ -88,7 +88,7 @@
       numeric_precision,
       numeric_scale
 
-  from information_schema.columns
+  from {{ relation.information_schema('columns') }}
   where table_name = '{{ relation.identifier }}'
 
   ),
@@ -153,8 +153,8 @@
 {% endmacro %}
 
 
-{% macro redshift__list_relations_without_caching(database, schema) %}
-  {{ return(postgres__list_relations_without_caching(database, schema)) }}
+{% macro redshift__list_relations_without_caching(information_schema, schema) %}
+  {{ return(postgres__list_relations_without_caching(information_schema, schema)) }}
 {% endmacro %}
 
diff --git a/plugins/redshift/dbt/include/redshift/macros/catalog.sql b/plugins/redshift/dbt/include/redshift/macros/catalog.sql
index 34529df8b02..c6788d9c66c 100644
--- a/plugins/redshift/dbt/include/redshift/macros/catalog.sql
+++ b/plugins/redshift/dbt/include/redshift/macros/catalog.sql
@@ -1,10 +1,10 @@
-{% macro redshift__get_base_catalog() -%}
+{% macro redshift__get_base_catalog(information_schemas) -%}
   {%- call statement('base_catalog', fetch_result=True) -%}
-    {% if (databases | length) != 1 %}
-        exceptions.raise_compiler_error('redshift get_catalog requires exactly one database')
+    {% if (information_schemas | length) != 1 %}
+        {{ exceptions.raise_compiler_error('redshift get_catalog requires exactly one database') }}
     {% endif %}
-    {% set database = databases[0] %}
+    {% set database = information_schemas[0].database %}
     {{ adapter.verify_database(database) }}
 
     with late_binding as (
@@ -106,7 +106,7 @@
     {{ return(load_result('base_catalog').table) }}
 {%- endmacro %}
 
-{% macro redshift__get_extended_catalog() %}
+{% macro redshift__get_extended_catalog(information_schemas) %}
   {%- call statement('extended_catalog', fetch_result=True) -%}
 
     select
@@ -218,12 +218,12 @@
 {% endmacro %}
 
 
-{% macro redshift__get_catalog() %}
+{% macro redshift__get_catalog(information_schemas) %}
 
   {#-- Compute a left-outer join in memory. Some Redshift queries are
      -- leader-only, and cannot be joined to other compute-based queries #}
 
-  {% set catalog = redshift__get_base_catalog() %}
+  {% set catalog = redshift__get_base_catalog(information_schemas) %}
 
   {% set select_extended = redshift__can_select_from('svv_table_info') %}
   {% if select_extended %}
diff --git a/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql b/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql
index 4e8c5f3fbdd..2870448b22a 100644
--- a/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql
+++ b/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql
@@ -31,7 +31,7 @@
       numeric_scale
 
     from
-    {{ information_schema_name(relation.database) }}.columns
+    {{ relation.information_schema('columns') }}
     where table_name ilike '{{ relation.identifier }}'
       {% if relation.schema %}
       and table_schema ilike '{{ relation.schema }}'
@@ -50,7 +50,7 @@
 {% endmacro %}
 
 
-{% macro snowflake__list_relations_without_caching(database, schema) %}
+{% macro snowflake__list_relations_without_caching(information_schema, schema) %}
   {% call statement('list_relations_without_caching', fetch_result=True) -%}
     select
       table_catalog as database,
@@ -60,20 +60,20 @@
         when table_type = 'VIEW' then 'view'
         else table_type
       end as table_type
-    from {{ information_schema_name(database) }}.tables
+    from {{ information_schema }}.tables
     where table_schema ilike '{{ schema }}'
-      and table_catalog ilike '{{ database }}'
+      and table_catalog ilike '{{ information_schema.database.lower() }}'
   {% endcall %}
   {{ return(load_result('list_relations_without_caching').table) }}
 {% endmacro %}
 
 
-{% macro snowflake__check_schema_exists(database, schema) -%}
+{% macro snowflake__check_schema_exists(information_schema, schema) -%}
   {% call statement('check_schema_exists', fetch_result=True) -%}
         select count(*)
-        from {{ information_schema_name(database) }}.schemata
+        from {{ information_schema }}.schemata
         where upper(schema_name) = upper('{{ schema }}')
-          and upper(catalog_name) = upper('{{ database }}')
+          and upper(catalog_name) = upper('{{ information_schema.database }}')
  {%- endcall %}
  {{ return(load_result('check_schema_exists').table) }}
{%- endmacro %}
diff --git a/plugins/snowflake/dbt/include/snowflake/macros/catalog.sql b/plugins/snowflake/dbt/include/snowflake/macros/catalog.sql
index d235e687607..fe68cd2e46e 100644
--- a/plugins/snowflake/dbt/include/snowflake/macros/catalog.sql
+++ b/plugins/snowflake/dbt/include/snowflake/macros/catalog.sql
@@ -1,8 +1,8 @@
-{% macro snowflake__get_catalog() -%}
+{% macro snowflake__get_catalog(information_schemas) -%}
 
   {%- call statement('catalog', fetch_result=True) -%}
 
-    {% for database in databases %}
+    {% for information_schema in information_schemas %}
     (
       with tables as (
 
@@ -31,7 +31,7 @@
         'Approximate size of the table as reported by Snowflake' as "stats:bytes:description",
         (bytes is not null) as "stats:bytes:include"
 
-      from {{ information_schema_name(database) }}.tables
+      from {{ information_schema }}.tables
 
     ),
 
@@ -48,7 +48,7 @@
         data_type as "column_type",
         null as "column_comment"
 
-      from {{ adapter.quote_as_configured(database, "database") }}.information_schema.columns
+      from {{ information_schema }}.columns
 
     )
 
@@ -56,7 +56,6 @@
     from tables
     join columns using ("table_database", "table_schema", "table_name")
     where "table_schema" != 'INFORMATION_SCHEMA'
-      and "table_database" = {{ adapter.quote_as_configured(database, "database").replace('"', "'") }}
     order by "column_index"
   ) {% if not loop.last %} union all {% endif %}
diff --git a/test/integration/029_docs_generate_tests/ref_models/schema.yml b/test/integration/029_docs_generate_tests/ref_models/schema.yml
index 087efc30108..0ebd5e3af3f 100644
--- a/test/integration/029_docs_generate_tests/ref_models/schema.yml
+++ b/test/integration/029_docs_generate_tests/ref_models/schema.yml
@@ -17,10 +17,15 @@ sources:
     description: "{{ doc('source_info') }}"
     loader: a_loader
     schema: "{{ var('test_schema') }}"
+    quoting:
+      database: False
+      identifier: False
     tables:
       - name: my_table
         description: "{{ doc('table_info') }}"
         identifier: seed
+        quoting:
+          identifier: True
         columns:
           - name: id
             description: "An ID field"
diff --git a/test/integration/029_docs_generate_tests/test_docs_generate.py b/test/integration/029_docs_generate_tests/test_docs_generate.py
index b71443ad15c..dee9a38c0ef 100644
--- a/test/integration/029_docs_generate_tests/test_docs_generate.py
+++ b/test/integration/029_docs_generate_tests/test_docs_generate.py
@@ -1114,8 +1114,7 @@ def expected_postgres_references_manifest(self, model_database=None):
                     }
                 ],
                 'empty': False,
-                'fqn': ['test',
-                        'ephemeral_summary'],
+                'fqn': ['test', 'ephemeral_summary'],
                 'name': 'ephemeral_summary',
                 'original_file_path': self.dir('ref_models/ephemeral_summary.sql'),
                 'package_name': 'test',
@@ -1236,32 +1235,36 @@ def expected_postgres_references_manifest(self, model_database=None):
                         'name': 'id'
                    }
                },
-                'database': self.default_database,
-                'description': 'My table',
-                'docrefs': [
-                    {
-                        'documentation_name': 'table_info',
-                        'documentation_package': ''
-                    },
-                    {
-                        'documentation_name': 'source_info',
-                        'documentation_package': ''
-                    }
-                ],
-                'freshness': {},
-                'identifier': 'seed',
-                'loaded_at_field': None,
-                'loader': 'a_loader',
-                'name': 'my_table',
-                'original_file_path': self.dir('ref_models/schema.yml'),
-                'package_name': 'test',
-                'path': self.dir('ref_models/schema.yml'),
-                'resource_type': 'source',
-                'root_path': os.getcwd(),
-                'schema': my_schema_name,
-                'source_description': "{{ doc('source_info') }}",
-                'source_name': 'my_source',
-                'unique_id': 'source.test.my_source.my_table'
+                'quoting': {
+                    'database': False,
+                    'identifier': True,
+                },
+                'database': self.default_database,
+                'description': 'My table',
+                'docrefs': [
+                    {
+                        'documentation_name': 'table_info',
+                        'documentation_package': ''
+                    },
+                    {
+                        'documentation_name': 'source_info',
+                        'documentation_package': ''
+                    }
+                ],
+                'freshness': {},
+                'identifier': 'seed',
+                'loaded_at_field': None,
+                'loader': 'a_loader',
+                'name': 'my_table',
+                'original_file_path': self.dir('ref_models/schema.yml'),
+                'package_name': 'test',
+                'path': self.dir('ref_models/schema.yml'),
+                'resource_type': 'source',
+                'root_path': os.getcwd(),
+                'schema': my_schema_name,
+                'source_description': "{{ doc('source_info') }}",
+                'source_name': 'my_source',
+                'unique_id': 'source.test.my_source.my_table'
                }
            },
            'docs': {
@@ -2075,7 +2078,7 @@ def expected_postgres_references_run_results(self):
         )
 
         cte_sql = (
-            ' __dbt__CTE__ephemeral_copy as (\n\n\nselect * from "{}"."{}"."seed"\n)'
+            ' __dbt__CTE__ephemeral_copy as (\n\n\nselect * from {}."{}"."seed"\n)'
         ).format(self.default_database, my_schema_name)
 
         ephemeral_injected_sql = (
diff --git a/test/integration/042_sources_test/models/schema.yml b/test/integration/042_sources_test/models/schema.yml
index 894dbfc03f4..00b32bc6dcd 100644
--- a/test/integration/042_sources_test/models/schema.yml
+++ b/test/integration/042_sources_test/models/schema.yml
@@ -14,6 +14,8 @@ sources:
       warn_after: {count: 10, period: hour}
       error_after: {count: 1, period: day}
     schema: "{{ var(env_var('DBT_TEST_SCHEMA_NAME_VARIABLE')) }}"
+    quoting:
+      identifier: True
     tables:
       - name: test_table
         identifier: source
@@ -46,6 +48,8 @@ sources:
         identifier: other_table
   - name: other_source
     schema: "{{ var('test_run_schema') }}"
+    quoting:
+      identifier: True
     tables:
       - name: test_table
         identifier: other_source_table
diff --git a/test/integration/base.py b/test/integration/base.py
index a4c52326772..137e04afbae 100644
--- a/test/integration/base.py
+++ b/test/integration/base.py
@@ -439,7 +439,7 @@ def run_dbt(self, args=None, expect_pass=True, strict=True):
             args = ["run"]
 
         if strict:
-            args = ["--strict"] + args
+            args = ["--single-threaded", "--strict"] + args
         args.append('--log-cache-events')
 
         logger.info("Invoking dbt with {}".format(args))

From 2501783d62c3b1efebcc28d6ed991986751baaed Mon Sep 17 00:00:00 2001
From: Jacob Beck
Date: Tue, 5 Mar 2019 18:02:13 -0700
Subject: [PATCH 09/11] fix macro kwargs

---
 core/dbt/adapters/sql/impl.py                         |  7 ++++++-
 .../include/global_project/macros/adapters/common.sql | 10 +++++-----
 .../postgres/dbt/include/postgres/macros/adapters.sql |  4 ++--
 .../redshift/dbt/include/redshift/macros/adapters.sql |  4 ++--
 4 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/core/dbt/adapters/sql/impl.py b/core/dbt/adapters/sql/impl.py
index 196d4fad186..8a6ace3bef7 100644
--- a/core/dbt/adapters/sql/impl.py
+++ b/core/dbt/adapters/sql/impl.py
@@ -232,9 +232,14 @@ def list_schemas(self, database, model_name=None):
         return [row[0] for row in results]
 
     def check_schema_exists(self, database, schema, model_name=None):
+        information_schema = self.Relation.create(
+            database=database, schema=schema
+        ).information_schema()
+
+        kwargs = {'information_schema': information_schema, 'schema': schema}
         results = self.execute_macro(
             CHECK_SCHEMA_EXISTS_MACRO_NAME,
-            kwargs={'database': database, 'schema': schema},
+            kwargs=kwargs,
             connection_name=model_name
         )
         return results[0][0] > 0
diff --git a/core/dbt/include/global_project/macros/adapters/common.sql b/core/dbt/include/global_project/macros/adapters/common.sql
index ca2ea6c47a3..b30ba91a85b 100644
--- a/core/dbt/include/global_project/macros/adapters/common.sql
+++ b/core/dbt/include/global_project/macros/adapters/common.sql
@@ -199,15 +199,15 @@
 {% endmacro %}
 
 
-{% macro check_schema_exists(database, schema) -%}
-  {{ return(adapter_macro('check_schema_exists', database, schema)) }}
+{% macro check_schema_exists(information_schema, schema) -%}
+  {{ return(adapter_macro('check_schema_exists', information_schema, schema)) }}
 {% endmacro %}
 
-{% macro default__check_schema_exists(database, schema) -%}
+{% macro default__check_schema_exists(information_schema, schema) -%}
   {% call statement('check_schema_exists', fetch_result=True, auto_begin=False) -%}
        select count(*)
-        from {{ information_schema_name(database) }}.schemata
-        where catalog_name='{{ database }}'
+        from {{ information_schema }}.schemata
+        where catalog_name='{{ information_schema.database }}'
          and schema_name='{{ schema }}'
  {%- endcall %}
  {{ return(load_result('check_schema_exists').table) }}
diff --git a/plugins/postgres/dbt/include/postgres/macros/adapters.sql b/plugins/postgres/dbt/include/postgres/macros/adapters.sql
index f40ca6ca143..0bda7fc9ad4 100644
--- a/plugins/postgres/dbt/include/postgres/macros/adapters.sql
+++ b/plugins/postgres/dbt/include/postgres/macros/adapters.sql
@@ -77,9 +77,9 @@
   {{ return(load_result('list_schemas').table) }}
 {% endmacro %}
 
-{% macro postgres__check_schema_exists(database, schema) -%}
+{% macro postgres__check_schema_exists(information_schema, schema) -%}
   {% if database -%}
-    {{ adapter.verify_database(database) }}
+    {{ adapter.verify_database(information_schema.database) }}
   {%- endif -%}
   {% call statement('check_schema_exists', fetch_result=True, auto_begin=False) %}
     select count(*) from pg_namespace where nspname = '{{ schema }}'
diff --git a/plugins/redshift/dbt/include/redshift/macros/adapters.sql b/plugins/redshift/dbt/include/redshift/macros/adapters.sql
index bb2f6ced58d..0e9baa1a30f 100644
--- a/plugins/redshift/dbt/include/redshift/macros/adapters.sql
+++ b/plugins/redshift/dbt/include/redshift/macros/adapters.sql
@@ -168,8 +168,8 @@
 {%- endmacro %}
 
 
-{% macro redshift__check_schema_exists(database, schema) -%}
-  {{ return(postgres__check_schema_exists(database, schema)) }}
+{% macro redshift__check_schema_exists(information_schema, schema) -%}
+  {{ return(postgres__check_schema_exists(information_schema, schema)) }}
 {%- endmacro %}
list_schemas

From 95a88b9d5da2ab2127629d0e82118339b85cb95f Mon Sep 17 00:00:00 2001
From: Jacob Beck
Date: Tue, 5 Mar 2019 18:12:19 -0700
Subject: [PATCH 10/11] PR feedback

---
 core/dbt/adapters/base/impl.py                 | 12 +++++++-----
 plugins/postgres/dbt/adapters/postgres/impl.py |  5 +++--
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py
index fae17e56b12..4c64c83786c 100644
--- a/core/dbt/adapters/base/impl.py
+++ b/core/dbt/adapters/base/impl.py
@@ -281,7 +281,7 @@ def _relations_filter_table(cls, table, schemas):
         """
         return table.where(_relations_filter_schemas(schemas))
 
-    def _get_cache_schemas(self, manifest):
+    def _get_cache_schemas(self, manifest, exec_only=False):
         """Get a mapping of each node's "information_schema" relations to a
         set of all schemas expected in that information_schema.
 
@@ -292,9 +292,10 @@ def _get_cache_schemas(self, manifest):
         """
         info_schema_name_map = SchemaSearchMap()
         for node in manifest.nodes.values():
-            if node.resource_type in NodeType.executable():
-                relation = self.Relation.create_from_node(self.config, node)
-                info_schema_name_map.add(relation)
+            if exec_only and node.resource_type not in NodeType.executable():
+                continue
+            relation = self.Relation.create_from(self.config, node)
+            info_schema_name_map.add(relation)
         # result is a map whose keys are information_schema Relations without
         # identifiers that have appropriate database prefixes, and whose values
         # are sets of lowercase schema names that are valid members of those
@@ -308,7 +309,8 @@ def _relations_cache_for_schemas(self, manifest):
         if not dbt.flags.USE_CACHE:
             return
 
-        info_schema_name_map = self._get_cache_schemas(manifest)
+        info_schema_name_map = self._get_cache_schemas(manifest,
+                                                       exec_only=True)
         for db, schema in info_schema_name_map.search():
             for relation in self.list_relations_without_caching(db, schema):
                 self.cache.add(relation)
diff --git a/plugins/postgres/dbt/adapters/postgres/impl.py b/plugins/postgres/dbt/adapters/postgres/impl.py
index 94e5f92ad37..fbf2fad89ec 100644
--- a/plugins/postgres/dbt/adapters/postgres/impl.py
+++ b/plugins/postgres/dbt/adapters/postgres/impl.py
@@ -56,9 +56,10 @@ def _link_cached_database_relations(self, schemas):
             if refed_schema.lower() in schemas:
                 self.cache.add_link(dependent, referenced)
 
-    def _get_cache_schemas(self, manifest):
+    def _get_cache_schemas(self, manifest, exec_only=False):
         # postgres/redshift only allow one database (the main one)
-        schemas = super(PostgresAdapter, self)._get_cache_schemas(manifest)
+        superself = super(PostgresAdapter, self)
+        schemas = superself._get_cache_schemas(manifest, exec_only=exec_only)
         return schemas.flatten()
 
     def _link_cached_relations(self, manifest):

From a33585769582665da39da8f81ff1732993477675 Mon Sep 17 00:00:00 2001
From: Jacob Beck
Date: Tue, 5 Mar 2019 21:47:37 -0700
Subject: [PATCH 11/11] PR feedback

---
 core/dbt/adapters/base/impl.py                 | 4 +---
 plugins/postgres/dbt/adapters/postgres/impl.py | 9 ++++++++-
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py
index 4c64c83786c..e290042ce80 100644
--- a/core/dbt/adapters/base/impl.py
+++ b/core/dbt/adapters/base/impl.py
@@ -122,9 +122,7 @@ def flatten(self):
         # iterate once to look for a database name
         seen = {r.database.lower() for r in self if r.database}
         if len(seen) > 1:
-            dbt.exceptions.raise_compiler_error(
-                'flatten() requires <=1 database (got {})'.format(seen)
-            )
+            dbt.exceptions.raise_compiler_error(str(seen))
         elif len(seen) == 1:
             database = list(seen)[0]
 
diff --git a/plugins/postgres/dbt/adapters/postgres/impl.py b/plugins/postgres/dbt/adapters/postgres/impl.py
index fbf2fad89ec..87487c7f791 100644
--- a/plugins/postgres/dbt/adapters/postgres/impl.py
+++ b/plugins/postgres/dbt/adapters/postgres/impl.py
@@ -60,7 +60,14 @@ def _get_cache_schemas(self, manifest, exec_only=False):
         # postgres/redshift only allow one database (the main one)
         superself = super(PostgresAdapter, self)
         schemas = superself._get_cache_schemas(manifest, exec_only=exec_only)
-        return schemas.flatten()
+        try:
+            return schemas.flatten()
+        except dbt.exceptions.RuntimeException as exc:
+            dbt.exceptions.raise_compiler_error(
+                'Cross-db references not allowed in adapter {}: Got {}'.format(
+                    self.type(), exc.msg
+                )
+            )
 
     def _link_cached_relations(self, manifest):
         schemas = set()
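
Reviewer notes. The sketches below are illustrative only: they are not part of any commit in this series, and the helper names they define (normalize_to_utc, node_display_name, resolve_quoting, and the simplified SchemaSearchMap) are invented for the examples.

Patches 01 and 03 make source freshness checks fail with a clear database error instead of crashing when the configured loaded_at_field comes back NULL or as a non-timestamp type. A minimal standalone sketch of the final _utc() behavior, with a plain ValueError standing in for dbt's raise_database_error:

    import datetime

    import pytz

    def normalize_to_utc(value, source, field_name):
        # Reject NULLs and values without a tzinfo attribute, then convert
        # tz-aware datetimes to UTC and stamp naive ones as already-UTC.
        if value is None:
            raise ValueError(
                "Expected a non-null value when querying field '{}' of "
                "table {} but received value 'null' instead".format(
                    field_name, source))
        elif not hasattr(value, 'tzinfo'):
            raise ValueError(
                "Expected a timestamp value when querying field '{}' of "
                "table {} but received value of type '{}' instead".format(
                    field_name, source, type(value).__name__))
        elif value.tzinfo:
            return value.astimezone(pytz.UTC)
        return value.replace(tzinfo=pytz.UTC)

    # A naive timestamp is assumed to already be in UTC:
    print(normalize_to_utc(datetime.datetime(2019, 3, 3, 12, 0),
                           'my_source.my_table', 'loaded_at'))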
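
Patch 02 stops printing the database portion of a relation name when it is just the profile default. The rule from ModelRunner.get_node_representation(), reduced to a free function for illustration (node_display_name is a hypothetical name):

    def node_display_name(credentials_database, node_database, schema, alias):
        # Omit the database when the node lives in the connection's default
        # database; include it only for custom-database nodes.
        if credentials_database == node_database:
            return '{0}.{1}'.format(schema, alias)
        return '{0}.{1}.{2}'.format(node_database, schema, alias)

    print(node_display_name('analytics', 'analytics', 'dbt_prod', 'orders'))
    # -> dbt_prod.orders
    print(node_display_name('analytics', 'raw', 'public', 'events'))
    # -> raw.public.events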
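
Patch 07 lets sources declare quoting at the source level and override it per table; the parser merges the two dicts with the table winning, and "quoting: null" in YAML parses to None, hence the "or {}" guards. dbt itself uses dbt.utils.deep_merge, but since this config is a flat dict of booleans, a shallow merge is enough to show the precedence (resolve_quoting is a hypothetical helper):

    def resolve_quoting(source_quoting, table_quoting):
        # Table-level keys override source-level ones; None means the level
        # specified nothing (YAML 'quoting: null').
        merged = dict(source_quoting or {})
        merged.update(table_quoting or {})
        return merged

    print(resolve_quoting({'schema': True, 'identifier': True},
                          {'identifier': False}))
    # -> {'schema': True, 'identifier': False}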
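
Patch 08's SchemaSearchMap is a dict keyed by an information_schema relation, mapping each key to the set of lowercase schema names to search there, so cache warm-up and catalog queries run once per (information_schema, schema) pair rather than once per node. A simplified sketch of the grouping behavior, using plain strings in place of dbt Relation objects:

    class SchemaSearchMap(dict):
        # Group schemas under the information_schema they will be searched
        # through; schema names are lowercased so duplicates collapse.
        def add(self, database, schema):
            key = '{}.information_schema'.format(database)
            if key not in self:
                self[key] = set()
            self[key].add(schema.lower())

        def search(self):
            for information_schema_name, schemas in self.items():
                for schema in schemas:
                    yield information_schema_name, schema

    m = SchemaSearchMap()
    m.add('analytics', 'DBT_PROD')
    m.add('analytics', 'dbt_prod')   # dedupes with the entry above
    m.add('raw', 'events')
    print(sorted(m.search()))
    # -> [('analytics.information_schema', 'dbt_prod'),
    #     ('raw.information_schema', 'events')]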