Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Beck committed Jan 31, 2020
1 parent 0bf6eca commit 12f1188
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 15 deletions.
38 changes: 31 additions & 7 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import abc
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import Future # noqa - we use this for typing only
from contextlib import contextmanager
from datetime import datetime
from typing import (
Expand Down Expand Up @@ -118,7 +119,7 @@ def _relation_name(rel: Optional[BaseRelation]) -> str:
return str(rel)


class SchemaSearchMap(Dict[InformationSchema, Set[str]]):
class SchemaSearchMap(Dict[InformationSchema, Set[Optional[str]]]):
"""A utility class to keep track of what information_schema tables to
search for what schemas
"""
Expand Down Expand Up @@ -1044,22 +1045,21 @@ def _get_one_catalog(
results = self._catalog_filter_table(table, manifest)
return results

def get_catalog(self, manifest: Manifest) -> agate.Table:
def get_catalog(
self, manifest: Manifest
) -> Tuple[agate.Table, List[Exception]]:
# snowflake is super slow. split it out into the specified threads
num_threads = self.config.threads
schema_map = self._get_cache_schemas(manifest)
catalogs: agate.Table = agate.Table(rows=[])

with ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [
executor.submit(self._get_one_catalog, info, schemas, manifest)
for info, schemas in schema_map.items() if len(schemas) > 0
]
for future in as_completed(futures):
catalog = future.result()
catalogs = agate.Table.merge([catalogs, catalog])
catalogs, exceptions = catch_as_completed(futures)

return catalogs
return catalogs, exceptions

def cancel_open_connections(self):
"""Cancel all open connections."""
Expand Down Expand Up @@ -1133,3 +1133,27 @@ def post_model_hook(self, config: Mapping[str, Any], context: Any) -> None:
The second parameter is the value returned by pre_model_hook.
"""
pass


def catch_as_completed(
    futures  # typing: List[Future[agate.Table]]
) -> Tuple[agate.Table, List[Exception]]:
    """Drain ``futures`` as they finish, merging results and collecting errors.

    Each future that finished without an exception has its result merged
    into a single running table. Each future that finished with an
    ``Exception`` subclass has that exception recorded instead of raised,
    so one failure does not discard the results of the others.

    :param futures: the futures to wait on; each is expected to resolve
        to an ``agate.Table``.
    :return: a ``(merged_table, exceptions)`` tuple. ``exceptions`` is
        empty when every future succeeded.
    :raises BaseException: immediately re-raised if a future failed with
        something outside the ``Exception`` hierarchy (for example
        ``KeyboardInterrupt`` from ctrl+c).
    """
    merged: agate.Table = agate.Table(rows=[])
    failures: List[Exception] = []

    for finished in as_completed(futures):
        error = finished.exception()

        if error is None:
            # success: fold this future's table into the running result
            merged = agate.Table.merge([merged, finished.result()])
            continue

        # ctrl+c (KeyboardInterrupt) and every other BaseException that
        # does not derive from Exception must propagate, not be collected.
        if not isinstance(error, Exception):
            raise error

        # an ordinary failure: remember it and keep processing the rest
        failures.append(error)

    return merged, failures
10 changes: 9 additions & 1 deletion core/dbt/task/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def run(self):
adapter = get_adapter(self.config)
with adapter.connection_named('generate_catalog'):
dbt.ui.printer.print_timestamped_line("Building catalog")
catalog_table = adapter.get_catalog(self.manifest)
catalog_table, exceptions = adapter.get_catalog(self.manifest)

catalog_data: List[PrimitiveDict] = [
dict(zip(catalog_table.column_names, map(_coerce_decimal, row)))
Expand All @@ -229,6 +229,14 @@ def run(self):
dbt.ui.printer.print_timestamped_line(
'Catalog written to {}'.format(os.path.abspath(path))
)

if exceptions:
# just include the first to avoid spamming
exc_msg = str(exceptions[0])
dbt.exceptions.warn_or_error(
f'Warning: Some parts of catalog generation failed: {exc_msg}'
)

return results

def get_catalog_results(
Expand Down
2 changes: 1 addition & 1 deletion plugins/bigquery/dbt/include/bigquery/macros/catalog.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from {{ information_schema.replace(information_schema_view='SCHEMATA') }}
where (
{%- for schema in schemas -%}
schema_name = '{{ schema }}'{%- if not loop.last %} or {% endif -%}
upper(schema_name) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
),
Expand Down
2 changes: 1 addition & 1 deletion plugins/postgres/dbt/include/postgres/macros/catalog.sql
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
where (
{%- for schema in schemas -%}
sch.nspname = '{{ schema }}'{%- if not loop.last %} or {% endif -%}
upper(sch.nspname) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
and not pg_is_other_temp_schema(sch.oid) -- not a temporary schema belonging to another session
Expand Down
4 changes: 2 additions & 2 deletions plugins/redshift/dbt/include/redshift/macros/catalog.sql
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@

where (
{%- for schema in schemas -%}
table_schema = '{{ schema }}'{%- if not loop.last %} or {% endif -%}
upper(table_schema) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)

Expand Down Expand Up @@ -185,7 +185,7 @@
from svv_table_info
where (
{%- for schema in schemas -%}
schema = '{{ schema }}'{%- if not loop.last %} or {% endif -%}
upper(schema) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)

Expand Down
2 changes: 1 addition & 1 deletion plugins/snowflake/dbt/include/snowflake/macros/catalog.sql
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
join columns using ("table_database", "table_schema", "table_name")
where (
{%- for schema in schemas -%}
"table_schema" = '{{ schema }}'{%- if not loop.last %} or {% endif -%}
upper("table_schema") = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
order by "column_index"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{% macro get_catalog(information_schemas) %}
{% macro get_catalog(information_schema, schemas) %}
{% do exceptions.raise_compiler_error('rejected: no catalogs for you') %}
{% endmacro %}
3 changes: 2 additions & 1 deletion test/unit/test_postgres_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,12 @@ def test_get_catalog_various_schemas(self, mock_get_schemas, mock_execute):
mock_manifest.get_used_schemas.return_value = {('dbt', 'foo'),
('dbt', 'quux')}

catalog = self.adapter.get_catalog(mock_manifest)
catalog, exceptions = self.adapter.get_catalog(mock_manifest)
self.assertEqual(
set(map(tuple, catalog)),
{('dbt', 'foo', 'bar'), ('dbt', 'FOO', 'baz'), ('dbt', 'quux', 'bar')}
)
self.assertEqual(exceptions, [])


class TestConnectingPostgresAdapter(unittest.TestCase):
Expand Down
2 changes: 2 additions & 0 deletions third-party-stubs/agate/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class Table:
def from_object(cls, obj: Iterable[Dict[str, Any]], *, column_types: Optional['TypeTester'] = None) -> 'Table': ...
@classmethod
def from_csv(cls, path: Iterable[str], *, column_types: Optional['TypeTester'] = None) -> 'Table': ...
@classmethod
def merge(cls, tables: Iterable['Table']) -> 'Table': ...


class TypeTester:
Expand Down

0 comments on commit 12f1188

Please sign in to comment.