From e392212c0e34ee1fe8759d46b402d13342d874f7 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Wed, 6 May 2020 13:20:07 -0600 Subject: [PATCH 1/2] Change list_relations_without_caching macro to take a single argument The argument is a Relation object with no identifier field, configured with the appropriate quoting information Unique quoted/unquoted representations will be treated as distinct The logic for generating what schemas to search for relations is now distinct from the catalog search logic. Schema creation/dropping takes a similar relation argument Add tests --- CHANGELOG.md | 7 ++- core/dbt/adapters/base/impl.py | 58 +++++++++++------- core/dbt/adapters/base/relation.py | 37 ++++++------ core/dbt/adapters/sql/impl.py | 22 +++---- .../global_project/macros/adapters/common.sql | 22 +++---- core/dbt/task/generate.py | 1 + core/dbt/task/runnable.py | 59 +++++++++---------- .../bigquery/dbt/adapters/bigquery/impl.py | 22 +++---- .../dbt/include/bigquery/macros/adapters.sql | 8 +-- .../postgres/dbt/adapters/postgres/impl.py | 24 ++++---- .../dbt/include/postgres/macros/adapters.sql | 26 ++++---- .../dbt/include/redshift/macros/adapters.sql | 12 ++-- .../snowflake/dbt/adapters/snowflake/impl.py | 4 +- .../dbt/include/snowflake/macros/adapters.sql | 8 +-- .../001_simple_copy_test/test_simple_copy.py | 31 +++++++++- .../test_external_reference.py | 4 +- .../054_adapter_methods_test/models/model.sql | 4 +- test/integration/base.py | 6 +- test/unit/test_bigquery_adapter.py | 6 +- test/unit/test_context.py | 13 ++-- test/unit/test_postgres_adapter.py | 8 ++- test/unit/test_snowflake_adapter.py | 6 +- 22 files changed, 218 insertions(+), 170 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 223f566da52..2522d397169 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,20 +1,23 @@ ## dbt 0.17.0 (Release TBD) +### Breaking changes +- The `list_relations_without_caching`, `drop_schema`, and `create_schema` macros and methods now accept a single argument of a Relation object with no identifier field. ([#2411](https://github.com/fishtown-analytics/dbt/pull/2411)) + ### Features - Added warning to nodes selector if nothing was matched ([#2115](https://github.com/fishtown-analytics/dbt/issues/2115), [#2343](https://github.com/fishtown-analytics/dbt/pull/2343)) - Suport column descriptions for BigQuery models ([#2335](https://github.com/fishtown-analytics/dbt/issues/2335), [#2402](https://github.com/fishtown-analytics/dbt/pull/2402)) + ### Fixes - When tracking is disabled due to errors, do not reset the invocation ID ([#2398](https://github.com/fishtown-analytics/dbt/issues/2398), [#2400](https://github.com/fishtown-analytics/dbt/pull/2400)) - Fix for logic error in compilation errors for duplicate data test names ([#2406](https://github.com/fishtown-analytics/dbt/issues/2406), [#2407](https://github.com/fishtown-analytics/dbt/pull/2407)) - Fix list_schemas macro failing for BigQuery ([#2412](https://github.com/fishtown-analytics/dbt/issues/2412), [#2413](https://github.com/fishtown-analytics/dbt/issues/2413)) - Fix for making schema tests work for community plugin [dbt-sqlserver](https://github.com/mikaelene/dbt-sqlserver) [#2414](https://github.com/fishtown-analytics/dbt/pull/2414) +- Fix a bug where quoted uppercase schemas on snowflake were not processed properly during cache building. ([#2403](https://github.com/fishtown-analytics/dbt/issues/2403), [#2411](https://github.com/fishtown-analytics/dbt/pull/2411)) Contributors: - [@azhard](https://github.com/azhard) [#2413](https://github.com/fishtown-analytics/dbt/pull/2413) - [@mikaelene](https://github.com/mikaelene) [#2414](https://github.com/fishtown-analytics/dbt/pull/2414) - -Contributors: - [@raalsky](https://github.com/Raalsky) ([#2343](https://github.com/fishtown-analytics/dbt/pull/2343)) ## dbt 0.17.0b1 (May 5, 2020) diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index b7ea8212e87..479d1dac673 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -278,9 +278,18 @@ def _schema_is_cached(self, database: str, schema: str) -> bool: else: return True - def _get_cache_schemas( - self, manifest: Manifest, exec_only: bool = False - ) -> SchemaSearchMap: + def _get_cache_schemas(self, manifest: Manifest) -> Set[BaseRelation]: + """Get the set of schema relations that the cache logic needs to + populate. This means only executable nodes are included. + """ + # the cache only cares about executable nodes + return { + self.Relation.create_from(self.config, node).without_identifier() + for node in manifest.nodes.values() + if node.resource_type in NodeType.executable() + } + + def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap: """Get a mapping of each node's "information_schema" relations to a set of all schemas expected in that information_schema. @@ -295,8 +304,6 @@ def _get_cache_schemas( manifest.sources.values(), ) for node in nodes: - if exec_only and node.resource_type not in NodeType.executable(): - continue relation = self.Relation.create_from(self.config, node) info_schema_name_map.add(relation) # result is a map whose keys are information_schema Relations without @@ -306,10 +313,11 @@ def _get_cache_schemas( return info_schema_name_map def _list_relations_get_connection( - self, db: BaseRelation, schema: str + self, schema_relation: BaseRelation ) -> List[BaseRelation]: - with self.connection_named(f'list_{db.database}_{schema}'): - return self.list_relations_without_caching(db, schema) + name = f'list_{schema_relation.database}_{schema_relation.schema}' + with self.connection_named(name): + return self.list_relations_without_caching(schema_relation) def _relations_cache_for_schemas(self, manifest: Manifest) -> None: """Populate the relations cache for the given schemas. Returns an @@ -318,11 +326,11 @@ def _relations_cache_for_schemas(self, manifest: Manifest) -> None: if not dbt.flags.USE_CACHE: return - schema_map = self._get_cache_schemas(manifest, exec_only=True) + cache_schemas = self._get_cache_schemas(manifest) with executor(self.config) as tpe: futures: List[Future[List[BaseRelation]]] = [ - tpe.submit(self._list_relations_get_connection, db, schema) - for db, schema in schema_map.search() + tpe.submit(self._list_relations_get_connection, cache_schema) + for cache_schema in cache_schemas ] for future in as_completed(futures): # if we can't read the relations we need to just raise anyway, @@ -333,7 +341,14 @@ def _relations_cache_for_schemas(self, manifest: Manifest) -> None: # it's possible that there were no relations in some schemas. We want # to insert the schemas we query into the cache's `.schemas` attribute # so we can check it later - self.cache.update_schemas(schema_map.schemas_searched()) + cache_update: Set[Tuple[str, Optional[str]]] = set() + for relation in cache_schemas: + if relation.database is None: + raise InternalException( + 'Got a None database in a cached schema!' + ) + cache_update.add((relation.database, relation.schema)) + self.cache.update_schemas(cache_update) def set_relations_cache( self, manifest: Manifest, clear: bool = False @@ -512,15 +527,14 @@ def expand_column_types( @abc.abstractmethod def list_relations_without_caching( - self, information_schema: BaseRelation, schema: str + self, schema_relation: BaseRelation ) -> List[BaseRelation]: """List relations in the given schema, bypassing the cache. This is used as the underlying behavior to fill the cache. - :param Relation information_schema: The information schema to list - relations from. - :param str schema: The name of the schema to list relations from. + :param schema_relation: A relation containing the database and schema + as appropraite for the underlying data warehouse :return: The relations in schema :rtype: List[self.Relation] """ @@ -636,17 +650,17 @@ def list_relations(self, database: str, schema: str) -> List[BaseRelation]: if self._schema_is_cached(database, schema): return self.cache.get_relations(database, schema) - information_schema = self.Relation.create( + schema_relation = self.Relation.create( database=database, schema=schema, identifier='', quote_policy=self.config.quoting - ).information_schema() + ).without_identifier() # we can't build the relations cache because we don't have a # manifest so we can't run any operations. relations = self.list_relations_without_caching( - information_schema, schema + schema_relation ) logger.debug('with database={}, schema={}, relations={}' @@ -727,7 +741,7 @@ def already_exists(self, schema: str, name: str) -> bool: ### @abc.abstractmethod @available.parse_none - def create_schema(self, database: str, schema: str): + def create_schema(self, relation: BaseRelation): """Create the given schema if it does not exist.""" raise NotImplementedException( '`create_schema` is not implemented for this adapter!' @@ -735,7 +749,7 @@ def create_schema(self, database: str, schema: str): @abc.abstractmethod @available.parse_none - def drop_schema(self, database: str, schema: str): + def drop_schema(self, relation: BaseRelation): """Drop the given schema (and everything in it) if it exists.""" raise NotImplementedException( '`drop_schema` is not implemented for this adapter!' @@ -1014,7 +1028,7 @@ def _get_one_catalog( def get_catalog( self, manifest: Manifest ) -> Tuple[agate.Table, List[Exception]]: - schema_map = self._get_cache_schemas(manifest) + schema_map = self._get_catalog_schemas(manifest) with executor(self.config) as tpe: futures: List[Future[agate.Table]] = [ diff --git a/core/dbt/adapters/base/relation.py b/core/dbt/adapters/base/relation.py index d223741de01..6591c48980b 100644 --- a/core/dbt/adapters/base/relation.py +++ b/core/dbt/adapters/base/relation.py @@ -255,6 +255,16 @@ def information_schema(self, view_name=None) -> 'InformationSchema': def information_schema_only(self) -> 'InformationSchema': return self.information_schema() + def without_identifier(self) -> 'BaseRelation': + """Return a form of this relation that only has the database and schema + set to included. To get the appropriately-quoted form the schema out of + the result (for use as part of a query), use `.render()`. To get the + raw database or schema name, use `.database` or `.schema`. + + The hash of the returned object is the result of render(). + """ + return self.include(identifier=False).replace_path(identifier=None) + def _render_iterator( self ) -> Iterator[Tuple[Optional[ComponentName], Optional[str]]]: @@ -501,38 +511,25 @@ def _render_iterator(self): class SchemaSearchMap(Dict[InformationSchema, Set[Optional[str]]]): """A utility class to keep track of what information_schema tables to - search for what schemas + search for what schemas. The schema values are all lowercased to avoid + duplication. """ - def add(self, relation: BaseRelation, preserve_case=False): + def add(self, relation: BaseRelation): key = relation.information_schema_only() if key not in self: self[key] = set() schema: Optional[str] = None if relation.schema is not None: - if preserve_case: - schema = relation.schema - else: - schema = relation.schema.lower() + schema = relation.schema.lower() self[key].add(schema) - def search(self) -> Iterator[Tuple[InformationSchema, Optional[str]]]: + def search( + self + ) -> Iterator[Tuple[InformationSchema, Optional[str]]]: for information_schema_name, schemas in self.items(): for schema in schemas: yield information_schema_name, schema - def schemas_searched(self) -> Set[Tuple[str, Optional[str]]]: - result: Set[Tuple[str, Optional[str]]] = set() - for information_schema_name, schemas in self.items(): - if information_schema_name.database is None: - raise InternalException( - 'Got a None database in an information schema!' - ) - result.update( - (information_schema_name.database, schema) - for schema in schemas - ) - return result - def flatten(self): new = self.__class__() diff --git a/core/dbt/adapters/sql/impl.py b/core/dbt/adapters/sql/impl.py index cb79ff9348a..84268b34d16 100644 --- a/core/dbt/adapters/sql/impl.py +++ b/core/dbt/adapters/sql/impl.py @@ -174,31 +174,31 @@ def get_columns_in_relation(self, relation): kwargs={'relation': relation} ) - def create_schema(self, database: str, schema: str) -> None: - logger.debug('Creating schema "{}"."{}".', database, schema) + def create_schema(self, relation: BaseRelation) -> None: + relation = relation.without_identifier() + logger.debug('Creating schema "{}"', relation) kwargs = { - 'database_name': self.quote_as_configured(database, 'database'), - 'schema_name': self.quote_as_configured(schema, 'schema'), + 'relation': relation, } self.execute_macro(CREATE_SCHEMA_MACRO_NAME, kwargs=kwargs) self.commit_if_has_connection() # we can't update the cache here, as if the schema already existed we # don't want to (incorrectly) say that it's empty - def drop_schema(self, database: str, schema: str) -> None: - logger.debug('Dropping schema "{}"."{}".', database, schema) + def drop_schema(self, relation: BaseRelation) -> None: + relation = relation.without_identifier() + logger.debug('Dropping schema "{}".', relation) kwargs = { - 'database_name': self.quote_as_configured(database, 'database'), - 'schema_name': self.quote_as_configured(schema, 'schema'), + 'relation': relation, } self.execute_macro(DROP_SCHEMA_MACRO_NAME, kwargs=kwargs) # we can update the cache here - self.cache.drop_schema(database, schema) + self.cache.drop_schema(relation.database, relation.schema) def list_relations_without_caching( - self, information_schema, schema + self, schema_relation: BaseRelation, ) -> List[BaseRelation]: - kwargs = {'information_schema': information_schema, 'schema': schema} + kwargs = {'schema_relation': schema_relation} results = self.execute_macro( LIST_RELATIONS_MACRO_NAME, kwargs=kwargs diff --git a/core/dbt/include/global_project/macros/adapters/common.sql b/core/dbt/include/global_project/macros/adapters/common.sql index 36258c0ee4c..23f548eb2cb 100644 --- a/core/dbt/include/global_project/macros/adapters/common.sql +++ b/core/dbt/include/global_project/macros/adapters/common.sql @@ -44,23 +44,23 @@ {{ return(load_result('get_columns_in_query').table.columns | map(attribute='name') | list) }} {% endmacro %} -{% macro create_schema(database_name, schema_name) -%} - {{ adapter_macro('create_schema', database_name, schema_name) }} +{% macro create_schema(relation) -%} + {{ adapter_macro('create_schema', relation) }} {% endmacro %} -{% macro default__create_schema(database_name, schema_name) -%} +{% macro default__create_schema(relation) -%} {%- call statement('create_schema') -%} - create schema if not exists {{database_name}}.{{schema_name}} + create schema if not exists {{ relation.without_identifier() }} {% endcall %} {% endmacro %} -{% macro drop_schema(database_name, schema_name) -%} - {{ adapter_macro('drop_schema', database_name, schema_name) }} +{% macro drop_schema(relation) -%} + {{ adapter_macro('drop_schema', relation) }} {% endmacro %} -{% macro default__drop_schema(database_name, schema_name) -%} +{% macro default__drop_schema(relation) -%} {%- call statement('drop_schema') -%} - drop schema if exists {{database_name}}.{{schema_name}} cascade + drop schema if exists {{ relation.without_identifier() }} cascade {% endcall %} {% endmacro %} @@ -262,12 +262,12 @@ {% endmacro %} -{% macro list_relations_without_caching(information_schema, schema) %} - {{ return(adapter_macro('list_relations_without_caching', information_schema, schema)) }} +{% macro list_relations_without_caching(schema_relation) %} + {{ return(adapter_macro('list_relations_without_caching', schema_relation)) }} {% endmacro %} -{% macro default__list_relations_without_caching(information_schema, schema) %} +{% macro default__list_relations_without_caching(schema_relation) %} {{ exceptions.raise_not_implemented( 'list_relations_without_caching macro not implemented for adapter '+adapter.type()) }} {% endmacro %} diff --git a/core/dbt/task/generate.py b/core/dbt/task/generate.py index 50d505c6f10..d3167d93429 100644 --- a/core/dbt/task/generate.py +++ b/core/dbt/task/generate.py @@ -98,6 +98,7 @@ def make_unique_id_map( sources: Dict[str, CatalogTable] = {} node_map, source_map = get_unique_id_mapping(manifest) + table: CatalogTable for table in self.values(): key = table.key() if key in node_map: diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index d86bd2c744e..5a0b42b4529 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -6,8 +6,7 @@ from typing import Optional, Dict, List, Set, Tuple, Iterable from dbt.task.base import ConfiguredTask -from dbt.adapters.base import SchemaSearchMap -from dbt.adapters.base.relation import InformationSchema +from dbt.adapters.base import BaseRelation from dbt.adapters.factory import get_adapter from dbt.logger import ( GLOBAL_LOGGER as logger, @@ -431,48 +430,42 @@ def interpret_results(self, results): def get_model_schemas( self, adapter, selected_uids: Iterable[str] - ) -> SchemaSearchMap: + ) -> Set[BaseRelation]: if self.manifest is None: raise InternalException('manifest was None in get_model_schemas') - search_map = SchemaSearchMap() + result: Set[BaseRelation] = set() for node in self.manifest.nodes.values(): if node.unique_id not in selected_uids: continue if node.is_refable and not node.is_ephemeral: relation = adapter.Relation.create_from(self.config, node) - # we're going to be creating these schemas, so preserve the - # case. - search_map.add(relation, preserve_case=True) + result.add(relation.without_identifier()) - return search_map + return result def create_schemas(self, adapter, selected_uids: Iterable[str]): required_schemas = self.get_model_schemas(adapter, selected_uids) # we want the string form of the information schema database - required_databases: List[str] = [] - for info in required_schemas: - include_policy = info.include_policy.replace( - schema=False, identifier=False, database=True - ) - db_only = info.replace( - include_policy=include_policy, - information_schema_view=None, + required_databases: Set[BaseRelation] = set() + for required in required_schemas: + db_only = required.include( + database=True, schema=False, identifier=False ) - required_databases.append(db_only) + required_databases.add(db_only) existing_schemas_lowered: Set[Tuple[str, Optional[str]]] = set() - def list_schemas(info: InformationSchema) -> List[Tuple[str, str]]: + def list_schemas(db_only: BaseRelation) -> List[Tuple[str, str]]: # the database name should never be None here (or where are we # listing schemas from?) - if info.database is None: + if db_only.database is None: raise InternalException( - f'Got an invalid information schema of {info} (database ' - f'was None)' + f'Got an invalid database-only portion of {db_only} ' + f'(database was None)' ) - database_name = info.database - database_quoted = str(info) + database_name: str = db_only.database + database_quoted = str(db_only) with adapter.connection_named(f'list_{database_name}'): # we should never create a null schema, so just filter them out return [ @@ -481,9 +474,11 @@ def list_schemas(info: InformationSchema) -> List[Tuple[str, str]]: if s is not None ] - def create_schema(db: str, schema: str) -> None: + def create_schema(relation: BaseRelation) -> None: + db = relation.database + schema = relation.schema with adapter.connection_named(f'create_{db}_{schema}'): - adapter.create_schema(db, schema) + adapter.create_schema(relation) list_futures = [] create_futures = [] @@ -496,21 +491,23 @@ def create_schema(db: str, schema: str) -> None: for ls_future in as_completed(list_futures): existing_schemas_lowered.update(ls_future.result()) - for info, schema in required_schemas.search(): + for info in required_schemas: if info.database is None: raise InternalException( 'Got an information schema with no database!' ) + if info.schema is None: + # we are not in teh business of creating null schemas, so + # skip this + continue db: str = info.database - lower_schema: Optional[str] = None - if schema is not None: - lower_schema = schema.lower() + schema: str = info.schema - db_schema = (db.lower(), lower_schema) + db_schema = (db.lower(), schema.lower()) if db_schema not in existing_schemas_lowered: existing_schemas_lowered.add(db_schema) create_futures.append( - tpe.submit(create_schema, db, schema) + tpe.submit(create_schema, info) ) for create_future in as_completed(create_futures): diff --git a/plugins/bigquery/dbt/adapters/bigquery/impl.py b/plugins/bigquery/dbt/adapters/bigquery/impl.py index eb8083390b4..c1ea5aab78e 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/impl.py +++ b/plugins/bigquery/dbt/adapters/bigquery/impl.py @@ -12,9 +12,7 @@ from dbt.adapters.base import ( BaseAdapter, available, RelationType, SchemaSearchMap, AdapterConfig ) -from dbt.adapters.bigquery.relation import ( - BigQueryRelation, BigQueryInformationSchema -) +from dbt.adapters.bigquery.relation import BigQueryRelation from dbt.adapters.bigquery import BigQueryColumn from dbt.adapters.bigquery import BigQueryConnectionManager from dbt.contracts.connection import Connection @@ -215,13 +213,13 @@ def expand_target_column_types( pass def list_relations_without_caching( - self, information_schema: BigQueryInformationSchema, schema: str + self, schema_relation: BigQueryRelation ) -> List[BigQueryRelation]: connection = self.connections.get_thread_connection() client = connection.handle bigquery_dataset = self.connections.dataset( - information_schema.database, information_schema.schema, connection + schema_relation.database, schema_relation.schema, connection ) all_tables = client.list_tables( @@ -261,11 +259,15 @@ def get_relation( table = None return self._bq_table_to_relation(table) - def create_schema(self, database: str, schema: str) -> None: + def create_schema(self, relation: BigQueryRelation) -> None: + database = relation.database + schema = relation.schema logger.debug('Creating schema "{}.{}".', database, schema) self.connections.create_dataset(database, schema) - def drop_schema(self, database: str, schema: str) -> None: + def drop_schema(self, relation: BigQueryRelation) -> None: + database = relation.database + schema = relation.schema logger.debug('Dropping schema "{}.{}".', database, schema) self.connections.drop_dataset(database, schema) self.cache.drop_schema(database, schema) @@ -632,10 +634,8 @@ def _catalog_filter_table( }) return super()._catalog_filter_table(table, manifest) - def _get_cache_schemas( - self, manifest: Manifest, exec_only: bool = False - ) -> SchemaSearchMap: - candidates = super()._get_cache_schemas(manifest, exec_only) + def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap: + candidates = super()._get_catalog_schemas(manifest) db_schemas: Dict[str, Set[str]] = {} result = SchemaSearchMap() diff --git a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql index dd1d9c46955..c52172f81e3 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql @@ -74,8 +74,8 @@ {{ adapter.create_schema(database_name, schema_name) }} {% endmacro %} -{% macro bigquery__drop_schema(database_name, schema_name) -%} - {{ adapter.drop_schema(database_name, schema_name) }} +{% macro bigquery__drop_schema(relation) -%} + {{ adapter.drop_schema(relation) }} {% endmacro %} {% macro bigquery__drop_relation(relation) -%} @@ -89,8 +89,8 @@ {% endmacro %} -{% macro bigquery__list_relations_without_caching(information_schema, schema) -%} - {{ return(adapter.list_relations_without_caching(information_schema, schema)) }} +{% macro bigquery__list_relations_without_caching(schema_relation) -%} + {{ return(adapter.list_relations_without_caching(schema_relation)) }} {%- endmacro %} diff --git a/plugins/postgres/dbt/adapters/postgres/impl.py b/plugins/postgres/dbt/adapters/postgres/impl.py index 7f7fd1d78dd..a267a1ed5d2 100644 --- a/plugins/postgres/dbt/adapters/postgres/impl.py +++ b/plugins/postgres/dbt/adapters/postgres/impl.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Optional +from typing import Optional, Set from dbt.adapters.base.meta import available from dbt.adapters.base.impl import AdapterConfig from dbt.adapters.sql import SQLAdapter @@ -40,11 +40,9 @@ def verify_database(self, database): # return an empty string on success so macros can call this return '' - def _link_cached_database_relations(self, schemas): + def _link_cached_database_relations(self, schemas: Set[str]): """ - - :param Set[str] schemas: The set of schemas that should have links - added. + :param schemas: The set of schemas that should have links added. """ database = self.config.credentials.database table = self.execute_macro(GET_RELATIONS_MACRO_NAME) @@ -66,9 +64,9 @@ def _link_cached_database_relations(self, schemas): if refed_schema.lower() in schemas: self.cache.add_link(referenced, dependent) - def _get_cache_schemas(self, manifest, exec_only=False): + def _get_catalog_schemas(self, manifest): # postgres/redshift only allow one database (the main one) - schemas = super()._get_cache_schemas(manifest, exec_only=exec_only) + schemas = super()._get_catalog_schemas(manifest) try: return schemas.flatten() except dbt.exceptions.RuntimeException as exc: @@ -79,13 +77,11 @@ def _get_cache_schemas(self, manifest, exec_only=False): ) def _link_cached_relations(self, manifest): - schemas = set() - # only link executable nodes - info_schema_name_map = self._get_cache_schemas(manifest, - exec_only=True) - for db, schema in info_schema_name_map.search(): - self.verify_database(db.database) - schemas.add(schema) + schemas: Set[str] = set() + relations_schemas = self._get_cache_schemas(manifest) + for relation in relations_schemas: + self.verify_database(relation.database) + schemas.add(relation.schema.lower()) self._link_cached_database_relations(schemas) diff --git a/plugins/postgres/dbt/include/postgres/macros/adapters.sql b/plugins/postgres/dbt/include/postgres/macros/adapters.sql index 52b40edaa24..12e7cca47df 100644 --- a/plugins/postgres/dbt/include/postgres/macros/adapters.sql +++ b/plugins/postgres/dbt/include/postgres/macros/adapters.sql @@ -14,21 +14,21 @@ ); {%- endmacro %} -{% macro postgres__create_schema(database_name, schema_name) -%} - {% if database_name -%} - {{ adapter.verify_database(database_name) }} +{% macro postgres__create_schema(relation) -%} + {% if relation.database -%} + {{ adapter.verify_database(relation.database) }} {%- endif -%} {%- call statement('create_schema') -%} - create schema if not exists {{ schema_name }} + create schema if not exists {{ relation.without_identifier().include(database=False) }} {%- endcall -%} {% endmacro %} -{% macro postgres__drop_schema(database_name, schema_name) -%} - {% if database_name -%} - {{ adapter.verify_database(database_name) }} +{% macro postgres__drop_schema(relation) -%} + {% if relation.database -%} + {{ adapter.verify_database(relation.database) }} {%- endif -%} {%- call statement('drop_schema') -%} - drop schema if exists {{ schema_name }} cascade + drop schema if exists {{ relation.without_identifier().include(database=False) }} cascade {%- endcall -%} {% endmacro %} @@ -54,23 +54,23 @@ {% endmacro %} -{% macro postgres__list_relations_without_caching(information_schema, schema) %} +{% macro postgres__list_relations_without_caching(schema_relation) %} {% call statement('list_relations_without_caching', fetch_result=True) -%} select - '{{ information_schema.database }}' as database, + '{{ schema_relation.database }}' as database, tablename as name, schemaname as schema, 'table' as type from pg_tables - where schemaname ilike '{{ schema }}' + where schemaname ilike '{{ schema_relation.schema }}' union all select - '{{ information_schema.database }}' as database, + '{{ schema_relation.database }}' as database, viewname as name, schemaname as schema, 'view' as type from pg_views - where schemaname ilike '{{ schema }}' + where schemaname ilike '{{ schema_relation.schema }}' {% endcall %} {{ return(load_result('list_relations_without_caching').table) }} {% endmacro %} diff --git a/plugins/redshift/dbt/include/redshift/macros/adapters.sql b/plugins/redshift/dbt/include/redshift/macros/adapters.sql index e1b359873d8..a6bc82c2b8e 100644 --- a/plugins/redshift/dbt/include/redshift/macros/adapters.sql +++ b/plugins/redshift/dbt/include/redshift/macros/adapters.sql @@ -66,13 +66,13 @@ {% endmacro %} -{% macro redshift__create_schema(database_name, schema_name) -%} - {{ postgres__create_schema(database_name, schema_name) }} +{% macro redshift__create_schema(relation) -%} + {{ postgres__create_schema(relation) }} {% endmacro %} -{% macro redshift__drop_schema(database_name, schema_name) -%} - {{ postgres__drop_schema(database_name, schema_name) }} +{% macro redshift__drop_schema(relation) -%} + {{ postgres__drop_schema(relation) }} {% endmacro %} @@ -153,8 +153,8 @@ {% endmacro %} -{% macro redshift__list_relations_without_caching(information_schema, schema) %} - {{ return(postgres__list_relations_without_caching(information_schema, schema)) }} +{% macro redshift__list_relations_without_caching(schema_relation) %} + {{ return(postgres__list_relations_without_caching(schema_relation)) }} {% endmacro %} diff --git a/plugins/snowflake/dbt/adapters/snowflake/impl.py b/plugins/snowflake/dbt/adapters/snowflake/impl.py index 614326bdf99..901bb803f6c 100644 --- a/plugins/snowflake/dbt/adapters/snowflake/impl.py +++ b/plugins/snowflake/dbt/adapters/snowflake/impl.py @@ -113,9 +113,9 @@ def list_schemas(self, database: str) -> List[str]: return [row['name'] for row in results] def list_relations_without_caching( - self, information_schema, schema + self, schema_relation: SnowflakeRelation ) -> List[SnowflakeRelation]: - kwargs = {'information_schema': information_schema, 'schema': schema} + kwargs = {'schema_relation': schema_relation} try: results = self.execute_macro( LIST_RELATIONS_MACRO_NAME, diff --git a/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql b/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql index 57eab7daf9e..1d80180e746 100644 --- a/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql +++ b/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql @@ -92,18 +92,16 @@ {% endmacro %} -{% macro snowflake__list_relations_without_caching(information_schema, schema) %} - {%- set db_name = adapter.quote_as_configured(information_schema.database, 'database') -%} - {%- set schema_name = adapter.quote_as_configured(schema, 'schema') -%} +{% macro snowflake__list_relations_without_caching(schema_relation) %} {%- set sql -%} - show terse objects in {{ db_name }}.{{ schema_name }} + show terse objects in {{ schema_relation }} {%- endset -%} {%- set result = run_query(sql) -%} {% set maximum = 10000 %} {% if (result | length) >= maximum %} {% set msg %} - Too many schemas in schema {{ database }}.{{ schema }}! dbt can only get + Too many schemas in schema {{ schema_relation }}! dbt can only get information about schemas with fewer than {{ maximum }} objects. {% endset %} {% do exceptions.raise_compiler_error(msg) %} diff --git a/test/integration/001_simple_copy_test/test_simple_copy.py b/test/integration/001_simple_copy_test/test_simple_copy.py index 0705fbba93f..45725e0627f 100644 --- a/test/integration/001_simple_copy_test/test_simple_copy.py +++ b/test/integration/001_simple_copy_test/test_simple_copy.py @@ -290,11 +290,14 @@ def project_config(self): }) @use_profile("snowflake") - def test__snowflake__seed__quoting_switch_schema(self): + def test__snowflake__seed__quoting_switch_schema_lower(self): self.use_default_project({ "data-paths": [self.dir("snowflake-seed-initial")], }) + results = self.run_dbt(["seed"]) + self.assertEqual(len(results), 1) + # this is intentional - should not error! results = self.run_dbt(["seed"]) self.assertEqual(len(results), 1) @@ -305,6 +308,32 @@ def test__snowflake__seed__quoting_switch_schema(self): results = self.run_dbt(["seed"], expect_pass=False) +class TestSnowflakeSimpleUppercasedSchemaQuoted(BaseTestSimpleCopy): + @property + def project_config(self): + return self.seed_quote_cfg_with({ + 'quoting': {'identifier': False, 'schema': True} + }) + + @use_profile("snowflake") + def test__snowflake__seed__quoting_switch_schema_upper(self): + self.use_default_project({ + "data-paths": [self.dir("snowflake-seed-initial")], + }) + + results = self.run_dbt(["seed"]) + self.assertEqual(len(results), 1) + # this is intentional - should not error! + results = self.run_dbt(["seed"]) + self.assertEqual(len(results), 1) + + self.use_default_project({ + "data-paths": [self.dir("snowflake-seed-update")], + "quoting": {"identifier": False, "schema": False}, + }) + results = self.run_dbt(["seed"]) + + class TestSnowflakeIncrementalOverwrite(BaseTestSimpleCopy): @property def models(self): diff --git a/test/integration/037_external_reference_test/test_external_reference.py b/test/integration/037_external_reference_test/test_external_reference.py index c0c73014741..d5a7e129e3a 100644 --- a/test/integration/037_external_reference_test/test_external_reference.py +++ b/test/integration/037_external_reference_test/test_external_reference.py @@ -30,7 +30,7 @@ def tearDown(self): # otherwise postgres hangs forever. self._drop_schemas() with self.get_connection(): - self.adapter.drop_schema(self.default_database, self.external_schema) + self._drop_schema_named(self.default_database, self.external_schema) super().tearDown() @use_profile('postgres') @@ -56,7 +56,7 @@ def tearDown(self): # otherwise postgres hangs forever. self._drop_schemas() with self.get_connection(): - self.adapter.drop_schema(self.default_database, self.external_schema) + self._drop_schema_named(self.default_database, self.external_schema) super().tearDown() @use_profile('postgres') diff --git a/test/integration/054_adapter_methods_test/models/model.sql b/test/integration/054_adapter_methods_test/models/model.sql index 455b2ab3f2d..bcf11d81c3d 100644 --- a/test/integration/054_adapter_methods_test/models/model.sql +++ b/test/integration/054_adapter_methods_test/models/model.sql @@ -3,13 +3,13 @@ {% if execute %} {# don't ever do any of this #} - {%- do adapter.drop_schema(upstream.database, upstream.schema) -%} + {%- do adapter.drop_schema(upstream) -%} {% set existing = adapter.get_relation(upstream.database, upstream.schema, upstream.identifier) %} {% if existing is not none %} {% do exceptions.raise_compiler_error('expected ' ~ ' to not exist, but it did') %} {% endif %} - {%- do adapter.create_schema(upstream.database, upstream.schema) -%} + {%- do adapter.create_schema(upstream) -%} {% set sql = create_view_as(upstream, 'select 2 as id') %} {% do run_query(sql) %} diff --git a/test/integration/base.py b/test/integration/base.py index d8dff6aeaea..e6720db4f9f 100644 --- a/test/integration/base.py +++ b/test/integration/base.py @@ -466,7 +466,8 @@ def _get_schema_fqn(self, database, schema): def _create_schema_named(self, database, schema): if self.adapter_type == 'bigquery': - self.adapter.create_schema(database, schema) + relation = self.adapter.Relation.create(database=database, schema=schema) + self.adapter.create_schema(relation) else: schema_fqn = self._get_schema_fqn(database, schema) self.run_sql(self.CREATE_SCHEMA_STATEMENT.format(schema_fqn)) @@ -474,7 +475,8 @@ def _create_schema_named(self, database, schema): def _drop_schema_named(self, database, schema): if self.adapter_type == 'bigquery' or self.adapter_type == 'presto': - self.adapter.drop_schema(database, schema) + relation = self.adapter.Relation.create(database=database, schema=schema) + self.adapter.drop_schema(relation) else: schema_fqn = self._get_schema_fqn(database, schema) self.run_sql(self.DROP_SCHEMA_STATEMENT.format(schema_fqn)) diff --git a/test/unit/test_bigquery_adapter.py b/test/unit/test_bigquery_adapter.py index 5fc8fd87e2d..456b6d79ded 100644 --- a/test/unit/test_bigquery_adapter.py +++ b/test/unit/test_bigquery_adapter.py @@ -216,13 +216,15 @@ def test_get_relation(self): self.mock_connection_manager.get_bq_table.assert_called_once_with('db', 'schema', 'my_model') def test_create_schema(self): - self.adapter.create_schema('db', 'schema') + relation = BigQueryRelation.create(database='db', schema='schema') + self.adapter.create_schema(relation) self.mock_connection_manager.create_dataset.assert_called_once_with('db', 'schema') @patch.object(BigQueryAdapter, 'check_schema_exists') def test_drop_schema(self, mock_check_schema): mock_check_schema.return_value = True - self.adapter.drop_schema('db', 'schema') + relation = BigQueryRelation.create(database='db', schema='schema') + self.adapter.drop_schema(relation) self.mock_connection_manager.drop_dataset.assert_called_once_with('db', 'schema') def test_get_columns_in_relation(self): diff --git a/test/unit/test_context.py b/test/unit/test_context.py index 2ef99f4e248..d8d25c92d48 100644 --- a/test/unit/test_context.py +++ b/test/unit/test_context.py @@ -145,11 +145,14 @@ def test_wrapped_method(self): found = self.wrapper.get_relation('database', 'schema', 'identifier') self.assertEqual(found, rel) - # it gets called with an information schema relation as the first arg, - # which is hard to mock. - self.responder.list_relations_without_caching.assert_called_once_with( - mock.ANY, 'schema' - ) + + self.responder.list_relations_without_caching.assert_called_once_with(mock.ANY) + # extract the argument + assert len(self.responder.list_relations_without_caching.mock_calls) == 1 + assert len(self.responder.list_relations_without_caching.call_args[0]) == 1 + arg = self.responder.list_relations_without_caching.call_args[0][0] + assert arg.database == 'database' + assert arg.schema == 'schema' def assert_has_keys( diff --git a/test/unit/test_postgres_adapter.py b/test/unit/test_postgres_adapter.py index 9a27790ab08..3b1ef63567e 100644 --- a/test/unit/test_postgres_adapter.py +++ b/test/unit/test_postgres_adapter.py @@ -209,7 +209,7 @@ def test_set_zero_keepalive(self, psycopg2): connect_timeout=10) @mock.patch.object(PostgresAdapter, 'execute_macro') - @mock.patch.object(PostgresAdapter, '_get_cache_schemas') + @mock.patch.object(PostgresAdapter, '_get_catalog_schemas') def test_get_catalog_various_schemas(self, mock_get_schemas, mock_execute): column_names = ['table_database', 'table_schema', 'table_name'] rows = [ @@ -297,7 +297,11 @@ def tearDown(self): self.load_patch.stop() def test_quoting_on_drop_schema(self): - self.adapter.drop_schema(database='postgres', schema='test_schema') + relation = self.adapter.Relation.create( + database='postgres', schema='test_schema', + quote_policy=self.adapter.config.quoting, + ) + self.adapter.drop_schema(relation) self.mock_execute.assert_has_calls([ mock.call('/* dbt */\ndrop schema if exists "test_schema" cascade', None) diff --git a/test/unit/test_snowflake_adapter.py b/test/unit/test_snowflake_adapter.py index 79c5bfe0d4d..7e41feeb589 100644 --- a/test/unit/test_snowflake_adapter.py +++ b/test/unit/test_snowflake_adapter.py @@ -82,10 +82,12 @@ def tearDown(self): self.load_patch.stop() def test_quoting_on_drop_schema(self): - self.adapter.drop_schema( + relation = SnowflakeAdapter.Relation.create( database='test_database', - schema='test_schema' + schema='test_schema', + quote_policy=self.adapter.config.quoting ) + self.adapter.drop_schema(relation) self.mock_execute.assert_has_calls([ mock.call('/* dbt */\ndrop schema if exists test_database."test_schema" cascade', None) From ab8392b856a9a47e62a49d41dd26f5ad63dab9b7 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Thu, 7 May 2020 20:02:41 -0600 Subject: [PATCH 2/2] Update core/dbt/task/runnable.py Co-authored-by: Drew Banin --- core/dbt/task/runnable.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index 5a0b42b4529..6fd8562fa90 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -497,7 +497,7 @@ def create_schema(relation: BaseRelation) -> None: 'Got an information schema with no database!' ) if info.schema is None: - # we are not in teh business of creating null schemas, so + # we are not in the business of creating null schemas, so # skip this continue db: str = info.database