Merge pull request #2650 from fishtown-analytics/fix/close-connections
Fix: close connections after use
beckjake authored Jul 28, 2020
2 parents 5b40cc4 + 1bfe43f commit 5c18c78
Showing 27 changed files with 373 additions and 153 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,12 +1,19 @@
## dbt 0.17.2 (Release TBD)


### Breaking changes (for plugins)
- The `release` argument to adapter.execute_macro no longer has any effect. It will be removed in a future release of dbt (likely 0.18.0) ([#2650](https://github.com/fishtown-analytics/dbt/pull/2650))


### Fixes
- The fast-fail option with adapters that don't support cancelling queries will now pass through the original error messages ([#2644](https://github.com/fishtown-analytics/dbt/issues/2644), [#2646](https://github.com/fishtown-analytics/dbt/pull/2646))
- `dbt clean` no longer requires a profile ([#2620](https://github.com/fishtown-analytics/dbt/issues/2620), [#2649](https://github.com/fishtown-analytics/dbt/pull/2649))
- Close all connections so Snowflake's keepalive thread will exit. ([#2645](https://github.com/fishtown-analytics/dbt/issues/2645), [#2650](https://github.com/fishtown-analytics/dbt/pull/2650))

Contributors:
- [@joshpeng-quibi](https://github.com/joshpeng-quibi) ([#2646](https://github.com/fishtown-analytics/dbt/pull/2646))


## dbt 0.17.2b1 (July 21, 2020)


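The breaking change above only affects plugin code that passed `release=True` to `adapter.execute_macro`. A minimal before/after sketch, assuming a hypothetical macro name `my_macro` and an already-acquired `adapter` (neither is part of this diff):

```python
def run_my_macro(adapter, schemas):
    # Before 0.17.2, plugin code could ask execute_macro to release the
    # connection when the macro finished:
    #
    #     adapter.execute_macro('my_macro', kwargs={'schemas': schemas},
    #                           release=True)
    #
    # As of this commit the release argument is ignored (and emits a
    # deprecation warning), so the call becomes:
    return adapter.execute_macro('my_macro', kwargs={'schemas': schemas})
```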
26 changes: 15 additions & 11 deletions core/dbt/adapters/base/connections.py
@@ -88,6 +88,11 @@ def clear_transaction(self) -> None:
self.begin()
self.commit()

def rollback_if_open(self) -> None:
conn = self.get_if_exists()
if conn is not None and conn.handle and conn.transaction_open:
self._rollback(conn)

@abc.abstractmethod
def exception_handler(self, sql: str) -> ContextManager:
"""Create a context manager that handles exceptions caused by database
@@ -176,11 +181,9 @@ def release(self) -> None:
return

try:
if conn.state == 'open':
if conn.transaction_open is True:
self._rollback(conn)
else:
self.close(conn)
# always close the connection. close() calls _rollback() if there
# is an open transaction
self.close(conn)
except Exception:
# if rollback or close failed, remove our busted connection
self.clear_thread_connection()
@@ -230,11 +233,10 @@ def _close_handle(cls, connection: Connection) -> None:
"""Perform the actual close operation."""
# On windows, sometimes connection handles don't have a close() attr.
if hasattr(connection.handle, 'close'):
logger.debug('On {}: Close'.format(connection.name))
logger.debug(f'On {connection.name}: Close')
connection.handle.close()
else:
logger.debug('On {}: No close available on handle'
.format(connection.name))
logger.debug(f'On {connection.name}: No close available on handle')

@classmethod
def _rollback(cls, connection: Connection) -> None:
@@ -247,10 +249,11 @@ def _rollback(cls, connection: Connection) -> None:

if connection.transaction_open is False:
raise dbt.exceptions.InternalException(
'Tried to rollback transaction on connection "{}", but '
'it does not have one open!'.format(connection.name))
f'Tried to rollback transaction on connection '
f'"{connection.name}", but it does not have one open!'
)

logger.debug('On {}: ROLLBACK'.format(connection.name))
logger.debug(f'On {connection.name}: ROLLBACK')
cls._rollback_handle(connection)

connection.transaction_open = False
@@ -268,6 +271,7 @@ def close(cls, connection: Connection) -> Connection:
return connection

if connection.transaction_open and connection.handle:
logger.debug('On {}: ROLLBACK'.format(connection.name))
cls._rollback_handle(connection)
connection.transaction_open = False

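A minimal sketch of how the new connection lifecycle fits together, assuming `manager` is any `BaseConnectionManager` subclass (the helper functions themselves are illustrative, not part of this diff):

```python
def teardown_thread_connection(manager) -> None:
    # release() now always closes the current thread's connection; close()
    # itself rolls back any still-open transaction before closing the handle,
    # so no connection (or keepalive thread) is left behind.
    manager.release()


def abort_transaction(manager) -> None:
    # The new rollback_if_open() is a safe guard: it rolls back only when a
    # connection exists, has a live handle, and has an open transaction.
    manager.rollback_if_open()
```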
80 changes: 40 additions & 40 deletions core/dbt/adapters/base/impl.py
@@ -312,13 +312,6 @@ def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap:
# databases
return info_schema_name_map

def _list_relations_get_connection(
self, schema_relation: BaseRelation
) -> List[BaseRelation]:
name = f'list_{schema_relation.database}_{schema_relation.schema}'
with self.connection_named(name):
return self.list_relations_without_caching(schema_relation)

def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
"""Populate the relations cache for the given schemas. Returns an
iterable of the schemas populated, as strings.
@@ -328,10 +321,16 @@ def _relations_cache_for_schemas(self, manifest: Manifest) -> None:

cache_schemas = self._get_cache_schemas(manifest)
with executor(self.config) as tpe:
futures: List[Future[List[BaseRelation]]] = [
tpe.submit(self._list_relations_get_connection, cache_schema)
for cache_schema in cache_schemas
]
futures: List[Future[List[BaseRelation]]] = []
for cache_schema in cache_schemas:
fut = tpe.submit_connected(
self,
f'list_{cache_schema.database}_{cache_schema.schema}',
self.list_relations_without_caching,
cache_schema
)
futures.append(fut)

for future in as_completed(futures):
# if we can't read the relations we need to just raise anyway,
# so just call future.result() and let that raise on failure
@@ -935,8 +934,10 @@ def execute_macro(
execution context.
:param kwargs: An optional dict of keyword args used to pass to the
macro.
:param release: If True, release the connection after executing.
:param release: Ignored.
"""
if release is not False:
deprecations.warn('execute-macro-release')
if kwargs is None:
kwargs = {}
if context_override is None:
@@ -972,11 +973,7 @@ def execute_macro(
macro_function = MacroGenerator(macro, macro_context)

with self.connections.exception_handler(f'macro {macro_name}'):
try:
result = macro_function(**kwargs)
finally:
if release:
self.release_connection()
result = macro_function(**kwargs)
return result

@classmethod
@@ -1001,24 +998,17 @@ def _get_one_catalog(
manifest: Manifest,
) -> agate.Table:

name = '.'.join([
str(information_schema.database),
'information_schema'
])

with self.connection_named(name):
kwargs = {
'information_schema': information_schema,
'schemas': schemas
}
table = self.execute_macro(
GET_CATALOG_MACRO_NAME,
kwargs=kwargs,
release=True,
# pass in the full manifest so we get any local project
# overrides
manifest=manifest,
)
kwargs = {
'information_schema': information_schema,
'schemas': schemas
}
table = self.execute_macro(
GET_CATALOG_MACRO_NAME,
kwargs=kwargs,
# pass in the full manifest so we get any local project
# overrides
manifest=manifest,
)

results = self._catalog_filter_table(table, manifest)
return results
@@ -1029,10 +1019,21 @@ def get_catalog(
schema_map = self._get_catalog_schemas(manifest)

with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = [
tpe.submit(self._get_one_catalog, info, schemas, manifest)
for info, schemas in schema_map.items() if len(schemas) > 0
]
futures: List[Future[agate.Table]] = []
for info, schemas in schema_map.items():
if len(schemas) == 0:
continue
name = '.'.join([
str(info.database),
'information_schema'
])

fut = tpe.submit_connected(
self, name,
self._get_one_catalog, info, schemas, manifest
)
futures.append(fut)

catalogs, exceptions = catch_as_completed(futures)

return catalogs, exceptions
@@ -1059,7 +1060,6 @@ def calculate_freshness(
table = self.execute_macro(
FRESHNESS_MACRO_NAME,
kwargs=kwargs,
release=True,
manifest=manifest
)
# now we have a 1-row table of the maximum `loaded_at_field` value and
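The hunks above replace bare `tpe.submit(...)` calls with `tpe.submit_connected(...)`. That helper is added elsewhere in this commit (its definition is not shown in this excerpt); the sketch below shows the likely shape, with each submitted task wrapped in `connection_named` so the worker thread acquires and releases its own connection:

```python
from concurrent.futures import Future, ThreadPoolExecutor


class ConnectingExecutor(ThreadPoolExecutor):
    """Stand-in for the executor returned by dbt.utils.executor; the real
    submit_connected added in this commit is defined in a file not shown
    in this excerpt."""

    def submit_connected(self, adapter, conn_name, func, *args, **kwargs) -> Future:
        def connected(*c_args, **c_kwargs):
            # Open a named connection for this worker thread, run the task,
            # and release the connection when the task completes.
            with adapter.connection_named(conn_name):
                return func(*c_args, **c_kwargs)

        return self.submit(connected, *args, **kwargs)
```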
7 changes: 5 additions & 2 deletions core/dbt/adapters/sql/connections.py
@@ -6,7 +6,7 @@

import dbt.clients.agate_helper
import dbt.exceptions
from dbt.contracts.connection import Connection
from dbt.contracts.connection import Connection, ConnectionState
from dbt.adapters.base import BaseConnectionManager
from dbt.logger import GLOBAL_LOGGER as logger

@@ -37,7 +37,10 @@ def cancel_open(self) -> List[str]:

# if the connection failed, the handle will be None so we have
# nothing to cancel.
if connection.handle is not None:
if (
connection.handle is not None and
connection.state == ConnectionState.OPEN
):
self.cancel(connection)
if connection.name is not None:
names.append(connection.name)
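The extra condition above keeps `cancel()` from being called on connections that have a handle but are no longer open (for example, ones already closed during release). A small sketch of the guard, with a stand-in enum assumed to match `ConnectionState`:

```python
from enum import Enum


class ConnectionState(Enum):
    # Stand-in for dbt.contracts.connection.ConnectionState (assumed values).
    INIT = 'init'
    OPEN = 'open'
    CLOSED = 'closed'
    FAIL = 'fail'


def is_cancelable(connection) -> bool:
    # Cancel only connections that both have a live handle and are still
    # marked open; closed or failed connections have nothing to cancel.
    return connection.handle is not None and connection.state == ConnectionState.OPEN
```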
10 changes: 10 additions & 0 deletions core/dbt/deprecations.py
@@ -108,6 +108,15 @@ class DbtProjectYamlDeprecation(DBTDeprecation):
'''


class ExecuteMacrosReleaseDeprecation(DBTDeprecation):
_name = 'execute-macro-release'
_description = '''\
The "release" argument to execute_macro is now ignored, and will be removed
in a future release of dbt. At that time, providing a `release` argument
will result in an error.
'''


_adapter_renamed_description = """\
The adapter function `adapter.{old_name}` is deprecated and will be removed in
a future release of dbt. Please use `adapter.{new_name}` instead.
Expand Down Expand Up @@ -151,6 +160,7 @@ def warn(name, *args, **kwargs):
ColumnQuotingDeprecation(),
ModelsKeyNonModelDeprecation(),
DbtProjectYamlDeprecation(),
ExecuteMacrosReleaseDeprecation(),
]

deprecations: Dict[str, DBTDeprecation] = {
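The new deprecation only fires when a caller still passes `release`, via the guard added to `execute_macro` above. A minimal sketch of that call site (the function name here is illustrative):

```python
import dbt.deprecations as deprecations


def execute_macro_guard(release=False):
    # Mirrors the check added to execute_macro: any explicit release value
    # other than False triggers the one-time 'execute-macro-release' warning.
    if release is not False:
        deprecations.warn('execute-macro-release')
```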
48 changes: 27 additions & 21 deletions core/dbt/task/runnable.py
@@ -297,13 +297,15 @@ def _cancel_connections(self, pool):
dbt.ui.printer.print_timestamped_line(msg, yellow)

else:
for conn_name in adapter.cancel_open_connections():
if self.manifest is not None:
node = self.manifest.nodes.get(conn_name)
if node is not None and node.is_ephemeral_model:
continue
# if we don't have a manifest/don't have a node, print anyway.
dbt.ui.printer.print_cancel_line(conn_name)
with adapter.connection_named('master'):
for conn_name in adapter.cancel_open_connections():
if self.manifest is not None:
node = self.manifest.nodes.get(conn_name)
if node is not None and node.is_ephemeral_model:
continue
# if we don't have a manifest/don't have a node, print
# anyway.
dbt.ui.printer.print_cancel_line(conn_name)

pool.join()

@@ -457,18 +459,15 @@ def list_schemas(
db_lowercase = dbt.utils.lowercase(db_only.database)
if db_only.database is None:
database_quoted = None
conn_name = 'list_schemas'
else:
database_quoted = str(db_only)
conn_name = f'list_{db_only.database}'

with adapter.connection_named(conn_name):
# we should never create a null schema, so just filter them out
return [
(db_lowercase, s.lower())
for s in adapter.list_schemas(database_quoted)
if s is not None
]
# we should never create a null schema, so just filter them out
return [
(db_lowercase, s.lower())
for s in adapter.list_schemas(database_quoted)
if s is not None
]

def create_schema(relation: BaseRelation) -> None:
db = relation.database or ''
@@ -480,9 +479,13 @@ def create_schema(relation: BaseRelation) -> None:
create_futures = []

with dbt.utils.executor(self.config) as tpe:
list_futures = [
tpe.submit(list_schemas, db) for db in required_databases
]
for req in required_databases:
if req.database is None:
name = 'list_schemas'
else:
name = f'list_{req.database}'
fut = tpe.submit_connected(adapter, name, list_schemas, req)
list_futures.append(fut)

for ls_future in as_completed(list_futures):
existing_schemas_lowered.update(ls_future.result())
@@ -499,9 +502,12 @@
db_schema = (db_lower, schema.lower())
if db_schema not in existing_schemas_lowered:
existing_schemas_lowered.add(db_schema)
create_futures.append(
tpe.submit(create_schema, info)

fut = tpe.submit_connected(
adapter, f'create_{info.database or ""}_{info.schema}',
create_schema, info
)
create_futures.append(fut)

for create_future in as_completed(create_futures):
# trigger/re-raise any exceptions while creating schemas
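As with the catalog and relation-cache changes, each schema listing or creation task now runs inside its own named connection. An illustrative sketch of dispatching that work, where `create_fn` stands in for the local `create_schema` closure above (the function and parameter names are assumptions, not part of the diff):

```python
def dispatch_schema_creation(adapter, tpe, required_schemas, create_fn):
    # Each future gets a descriptive connection name; submit_connected opens
    # that connection for the worker thread and releases it when the task
    # finishes, so no connection outlives the thread pool.
    futures = []
    for relation in required_schemas:
        conn_name = f'create_{relation.database or ""}_{relation.schema}'
        futures.append(tpe.submit_connected(adapter, conn_name, create_fn, relation))
    return futures
```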