Struct log for adapter call sites #4189

Merged: 16 commits, Nov 8, 2021
39 changes: 20 additions & 19 deletions core/dbt/adapters/base/connections.py
@@ -18,7 +18,17 @@
 from dbt.adapters.base.query_headers import (
     MacroQueryStringSetter,
 )
-from dbt.logger import GLOBAL_LOGGER as logger
+from dbt.events.functions import fire_event
+from dbt.events.types import (
+    NewConnection,
+    ConnectionReused,
+    ConnectionLeftOpen,
+    ConnectionLeftOpen2,
+    ConnectionClosed,
+    ConnectionClosed2,
+    Rollback,
+    RollbackFailed
+)
 from dbt import flags

@@ -136,14 +146,10 @@ def set_connection_name(self, name: Optional[str] = None) -> Connection:
         if conn.name == conn_name and conn.state == 'open':
             return conn

-        logger.debug(
-            'Acquiring new {} connection "{}".'.format(self.TYPE, conn_name))
+        fire_event(NewConnection(conn_name=conn_name, conn_type=self.TYPE))

         if conn.state == 'open':
-            logger.debug(
-                'Re-using an available connection from the pool (formerly {}).'
-                .format(conn.name)
-            )
+            fire_event(ConnectionReused(conn_name=conn_name))
         else:
             conn.handle = LazyHandle(self.open)

@@ -190,11 +196,9 @@ def cleanup_all(self) -> None:
         with self.lock:
             for connection in self.thread_connections.values():
                 if connection.state not in {'closed', 'init'}:
-                    logger.debug("Connection '{}' was left open."
-                                 .format(connection.name))
+                    fire_event(ConnectionLeftOpen(conn_name=connection.name))
                 else:
-                    logger.debug("Connection '{}' was properly closed."
-                                 .format(connection.name))
+                    fire_event(ConnectionClosed(conn_name=connection.name))
                 self.close(connection)

             # garbage collect these connections
@@ -220,20 +224,17 @@ def _rollback_handle(cls, connection: Connection) -> None:
         try:
             connection.handle.rollback()
         except Exception:
-            logger.debug(
-                'Failed to rollback {}'.format(connection.name),
-                exc_info=True
-            )
+            fire_event(RollbackFailed(conn_name=connection.name))

     @classmethod
     def _close_handle(cls, connection: Connection) -> None:
         """Perform the actual close operation."""
         # On windows, sometimes connection handles don't have a close() attr.
         if hasattr(connection.handle, 'close'):
-            logger.debug(f'On {connection.name}: Close')
+            fire_event(ConnectionClosed2(conn_name=connection.name))
             connection.handle.close()
         else:
-            logger.debug(f'On {connection.name}: No close available on handle')
+            fire_event(ConnectionLeftOpen2(conn_name=connection.name))

     @classmethod
     def _rollback(cls, connection: Connection) -> None:
@@ -244,7 +245,7 @@ def _rollback(cls, connection: Connection) -> None:
                 f'"{connection.name}", but it does not have one open!'
             )

-        logger.debug(f'On {connection.name}: ROLLBACK')
+        fire_event(Rollback(conn_name=connection.name))
         cls._rollback_handle(connection)

         connection.transaction_open = False
@@ -256,7 +257,7 @@ def close(cls, connection: Connection) -> Connection:
             return connection

         if connection.transaction_open and connection.handle:
-            logger.debug('On {}: ROLLBACK'.format(connection.name))
+            fire_event(Rollback(conn_name=connection.name))
             cls._rollback_handle(connection)
             connection.transaction_open = False
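For context, a minimal sketch of what one of the imported event types plausibly looks like. The field names match the call sites in this diff; the plain-dataclass shape, the message() method, and the exact message text are assumptions about dbt's events module, not taken from this PR.

from dataclasses import dataclass


# Sketch only: a structured event carries typed fields instead of a
# pre-formatted string; the logging layer decides how and when to render it.
@dataclass
class NewConnection:
    conn_name: str
    conn_type: str

    def message(self) -> str:
        return f'Acquiring new {self.conn_type} connection "{self.conn_name}".'


# Call site, as in the diff above:
# fire_event(NewConnection(conn_name=conn_name, conn_type=self.TYPE))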
16 changes: 10 additions & 6 deletions core/dbt/adapters/base/impl.py
@@ -29,7 +29,8 @@
 from dbt.contracts.graph.manifest import Manifest, MacroManifest
 from dbt.contracts.graph.parsed import ParsedSeedNode
 from dbt.exceptions import warn_or_error
-from dbt.logger import GLOBAL_LOGGER as logger
+from dbt.events.functions import fire_event
+from dbt.events.types import CacheMiss, ListRelations
 from dbt.utils import filter_null_values, executor

 from dbt.adapters.base.connections import Connection, AdapterResponse
@@ -288,9 +289,13 @@ def _schema_is_cached(self, database: Optional[str], schema: str) -> bool:
         """Check if the schema is cached, and by default logs if it is not."""

         if (database, schema) not in self.cache:
-            logger.debug(
-                'On "{}": cache miss for schema "{}.{}", this is inefficient'
-                .format(self.nice_connection_name(), database, schema)
+            # TODO: This makes mypy angry, fix before turning it on
+            fire_event(
+                CacheMiss(
+                    conn_name=self.nice_connection_name,
+                    database=database,
+                    schema=schema
+                )
             )
             return False
         else:
@@ -672,9 +677,8 @@ def list_relations(
         relations = self.list_relations_without_caching(
             schema_relation
         )
+        fire_event(ListRelations(database=database, schema=schema, relations=relations))

-        logger.debug('with database={}, schema={}, relations={}'
-                     .format(database, schema, relations))
         return relations

     def _make_match_kwargs(
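On the CacheMiss TODO above: mypy's likely complaint is that self.nice_connection_name is passed without parentheses, handing the event a bound method where a string is presumably expected. A self-contained sketch of the fix, with illustrative values (CacheMiss's real field types are an assumption):

from dataclasses import dataclass


@dataclass
class CacheMiss:
    conn_name: str
    database: str
    schema: str


class Demo:
    def nice_connection_name(self) -> str:
        return 'master'

    def event(self) -> CacheMiss:
        # Passing self.nice_connection_name (no parentheses) would hand mypy a
        # bound method where a str is expected; calling it fixes the error.
        return CacheMiss(
            conn_name=self.nice_connection_name(),
            database='analytics',
            schema='dbt_schema',
        )


print(Demo().event())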
78 changes: 38 additions & 40 deletions core/dbt/adapters/cache.py
@@ -1,11 +1,26 @@
-import threading
 from collections import namedtuple
 from copy import deepcopy
-from typing import List, Iterable, Optional, Dict, Set, Tuple, Any
+import threading
+from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

-from dbt.logger import CACHE_LOGGER as logger
-from dbt.utils import lowercase
 import dbt.exceptions
+from dbt.events.functions import fire_event
+from dbt.events.types import (
+    AddLink,
+    AddRelation,
+    DropCascade,
+    DropMissingRelation,
+    DropRelation,
+    DumpAfterAddGraph,
+    DumpAfterRenameSchema,
+    DumpBeforeAddGraph,
+    DumpBeforeRenameSchema,
+    RenameSchema,
+    TemporaryRelation,
+    UncachedRelation,
+    UpdateReference
+)
+from dbt.utils import lowercase

 _ReferenceKey = namedtuple('_ReferenceKey', 'database schema identifier')

@@ -157,12 +172,6 @@ def dump_graph_entry(self):
         return [dot_separated(r) for r in self.referenced_by]


-def lazy_log(msg, func):
-    if logger.disabled:
-        return
-    logger.debug(msg.format(func()))
-
-
 class RelationsCache:
     """A cache of the relations known to dbt. Keeps track of relationships
     declared between tables and handles renames/drops as a real database would.
@@ -278,6 +287,7 @@ def _add_link(self, referenced_key, dependent_key):

         referenced.add_reference(dependent)

+    # TODO: Is this dead code? I can't seem to find it grepping the codebase.
     def add_link(self, referenced, dependent):
         """Add a link between two relations to the database. If either relation
         does not exist, it will be added as an "external" relation.
@@ -297,11 +307,7 @@ def add_link(self, referenced, dependent):
             # if we have not cached the referenced schema at all, we must be
             # referring to a table outside our control. There's no need to make
             # a link - we will never drop the referenced relation during a run.
-            logger.debug(
-                '{dep!s} references {ref!s} but {ref.database}.{ref.schema} '
-                'is not in the cache, skipping assumed external relation'
-                .format(dep=dependent, ref=ref_key)
-            )
+            fire_event(UncachedRelation(dep_key=dependent, ref_key=ref_key))
             return
         if ref_key not in self.relations:
             # Insert a dummy "external" relation.
@@ -317,9 +323,7 @@
                 type=referenced.External
             )
             self.add(dependent)
-        logger.debug(
-            'adding link, {!s} references {!s}'.format(dep_key, ref_key)
-        )
+        fire_event(AddLink(dep_key=dep_key, ref_key=ref_key))
         with self.lock:
             self._add_link(ref_key, dep_key)
@@ -330,14 +334,15 @@ def add(self, relation):
         :param BaseRelation relation: The underlying relation.
         """
         cached = _CachedRelation(relation)
-        logger.debug('Adding relation: {!s}'.format(cached))
-
-        lazy_log('before adding: {!s}', self.dump_graph)
+        fire_event(AddRelation(relation=cached))
+        # TODO: conditionally fire this event (logger.disabled, if it was std python logger)
+        fire_event(DumpBeforeAddGraph(graph=self.dump_graph()))
Review comment (Contributor):
OH, I see what lazy_log is doing here. I think it's guarding against log levels: if we're on the info level but this message is at the debug level, strict evaluation means the long-running dump_graph call still runs even though the message will never be displayed. The way the event module works right now, we force the message early, so lazy logging won't help. This might be worth solving inside fire_event, so we as developers don't have to decide whether constructing a message is long-running or not; each message would then be constructed only as needed.

Review comment (Contributor):
That being said, we're going to have the same problem with creating the event type: it will evaluate self.dump_graph() as part of __init__. We should change the event types to take a function value instead and apply the empty argument list at message-creation time. Let's cache the result in the event type so it doesn't have to be called again.

We're basically threading laziness farther down the pipeline.
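A sketch of that idea (hypothetical names, not this PR's implementation): the event holds the zero-argument callable and evaluates it at most once, when the message is first rendered.

from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional


@dataclass
class DumpBeforeAddGraphLazy:
    dump: Callable[[], Dict[str, Any]]  # e.g. self.dump_graph, not called yet
    _graph: Optional[Dict[str, Any]] = field(default=None, init=False, repr=False)

    def graph(self) -> Dict[str, Any]:
        if self._graph is None:
            self._graph = self.dump()  # evaluated lazily, at most once
        return self._graph

    def message(self) -> str:
        return f'before adding: {self.graph()}'


# The call site would then pass the function itself, not its result:
# fire_event(DumpBeforeAddGraphLazy(dump=self.dump_graph))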


with self.lock:
self._setdefault(cached)

lazy_log('after adding: {!s}', self.dump_graph)
# TODO: conditionally fire this event (logger.disabled, if it was std python logger)
fire_event(DumpAfterAddGraph(graph=self.dump_graph()))

     def _remove_refs(self, keys):
         """Removes all references to all entries in keys. This does not

@@ -359,13 +364,10 @@ def _drop_cascade_relation(self, dropped):
         :param _CachedRelation dropped: An existing _CachedRelation to drop.
         """
         if dropped not in self.relations:
-            logger.debug('dropped a nonexistent relationship: {!s}'
-                         .format(dropped))
+            fire_event(DropMissingRelation(relation=dropped))
             return
         consequences = self.relations[dropped].collect_consequences()
-        logger.debug(
-            'drop {} is cascading to {}'.format(dropped, consequences)
-        )
+        fire_event(DropCascade(dropped=dropped, consequences=consequences))
         self._remove_refs(consequences)

     def drop(self, relation):
@@ -380,7 +382,7 @@
         :param str identifier: The identifier of the relation to drop.
         """
         dropped = _make_key(relation)
-        logger.debug('Dropping relation: {!s}'.format(dropped))
+        fire_event(DropRelation(dropped=dropped))
         with self.lock:
             self._drop_cascade_relation(dropped)
@@ -403,9 +405,8 @@ def _rename_relation(self, old_key, new_relation):
         # update all the relations that refer to it
         for cached in self.relations.values():
             if cached.is_referenced_by(old_key):
-                logger.debug(
-                    'updated reference from {0} -> {2} to {1} -> {2}'
-                    .format(old_key, new_key, cached.key())
+                fire_event(
+                    UpdateReference(old_key=old_key, new_key=new_key, cached_key=cached.key())
                 )
                 cached.rename_key(old_key, new_key)
@@ -435,10 +436,7 @@ def _check_rename_constraints(self, old_key, new_key):
             )

         if old_key not in self.relations:
-            logger.debug(
-                'old key {} not found in self.relations, assuming temporary'
-                .format(old_key)
-            )
+            fire_event(TemporaryRelation(key=old_key))
             return False
         return True
@@ -456,19 +454,19 @@ def rename(self, old, new):
         """
         old_key = _make_key(old)
         new_key = _make_key(new)
-        logger.debug('Renaming relation {!s} to {!s}'.format(
-            old_key, new_key
-        ))
+        fire_event(RenameSchema(old_key=old_key, new_key=new_key))

-        lazy_log('before rename: {!s}', self.dump_graph)
+        # TODO: conditionally fire this event (logger.disabled, if it was std python logger)
+        fire_event(DumpBeforeRenameSchema(graph=self.dump_graph()))

         with self.lock:
             if self._check_rename_constraints(old_key, new_key):
                 self._rename_relation(old_key, _CachedRelation(new))
             else:
                 self._setdefault(_CachedRelation(new))

-        lazy_log('after rename: {!s}', self.dump_graph)
+        # TODO: conditionally fire this event (logger.disabled, if it was std python logger)
+        fire_event(DumpAfterRenameSchema(graph=self.dump_graph()))

     def get_relations(
         self, database: Optional[str], schema: Optional[str]
10 changes: 5 additions & 5 deletions core/dbt/adapters/factory.py
@@ -8,10 +8,9 @@
     PACKAGE_PATH as GLOBAL_PROJECT_PATH,
     PROJECT_NAME as GLOBAL_PROJECT_NAME,
 )
-from dbt.logger import GLOBAL_LOGGER as logger
+from dbt.events.functions import fire_event
+from dbt.events.types import AdapterImportError, PluginLoadError
 from dbt.contracts.connection import Credentials, AdapterRequiredConfig
-
-
 from dbt.adapters.protocol import (
     AdapterProtocol,
     AdapterConfig,
@@ -67,11 +66,12 @@ def load_plugin(self, name: str) -> Type[Credentials]:
             # if we failed to import the target module in particular, inform
             # the user about it via a runtime error
             if exc.name == 'dbt.adapters.' + name:
+                fire_event(AdapterImportError(exc=exc))
                 raise RuntimeException(f'Could not find adapter type {name}!')
-            logger.info(f'Error importing adapter: {exc}')
             # otherwise, the error had to have come from some underlying
             # library. Log the stack trace.
-            logger.debug('', exc_info=True)

+            fire_event(PluginLoadError())
             raise
         plugin: AdapterPlugin = mod.Plugin
         plugin_type = plugin.adapter.type()
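A condensed, self-contained sketch of the resulting control flow in load_plugin, under stated assumptions: RuntimeError stands in for dbt's RuntimeException, the fire_event calls are shown as comments, and the import shape mirrors the hunk above rather than quoting it.

from importlib import import_module


def load_plugin_sketch(name: str):
    try:
        mod = import_module('.' + name, 'dbt.adapters')
    except ModuleNotFoundError as exc:
        if exc.name == 'dbt.adapters.' + name:
            # fire_event(AdapterImportError(exc=exc)), then a user-facing error
            raise RuntimeError(f'Could not find adapter type {name}!')
        # fire_event(PluginLoadError()), then re-raise with the original traceback
        raise
    return mod.Plugin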
24 changes: 10 additions & 14 deletions core/dbt/adapters/sql/connections.py
@@ -10,7 +10,8 @@
 from dbt.contracts.connection import (
     Connection, ConnectionState, AdapterResponse
 )
-from dbt.logger import GLOBAL_LOGGER as logger
+from dbt.events.functions import fire_event
+from dbt.events.types import ConnectionUsed, SQLQuery, SQLCommit, SQLQueryStatus


 class SQLConnectionManager(BaseConnectionManager):
@@ -58,29 +59,24 @@ def add_query(
         connection = self.get_thread_connection()
         if auto_begin and connection.transaction_open is False:
             self.begin()
-
-        logger.debug('Using {} connection "{}".'
-                     .format(self.TYPE, connection.name))
+        fire_event(ConnectionUsed(conn_type=self.TYPE, conn_name=connection.name))

         with self.exception_handler(sql):
             if abridge_sql_log:
                 log_sql = '{}...'.format(sql[:512])
             else:
                 log_sql = sql

-            logger.debug(
-                'On {connection_name}: {sql}',
-                connection_name=connection.name,
-                sql=log_sql,
-            )
+            fire_event(SQLQuery(conn_name=connection.name, sql=log_sql))
             pre = time.time()

             cursor = connection.handle.cursor()
             cursor.execute(sql, bindings)
-            logger.debug(
-                "SQL status: {status} in {elapsed:0.2f} seconds",
-                status=self.get_response(cursor),
-                elapsed=(time.time() - pre)
-            )
+
+            fire_event(
+                SQLQueryStatus(
+                    status=self.get_response(cursor), elapsed=round((time.time() - pre), 2)
+                )
+            )

             return connection, cursor
@@ -160,7 +156,7 @@ def commit(self):
                 'Tried to commit transaction on connection "{}", but '
                 'it does not have one open!'.format(connection.name))

-        logger.debug('On {}: COMMIT'.format(connection.name))
+        fire_event(SQLCommit(conn_name=connection.name))
         self.add_commit_query()

         connection.transaction_open = False
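Back in add_query above, the abridge_sql_log branch keeps SQLQuery events bounded. A minimal illustration of that truncation rule:

# Long statements are cut to their first 512 characters plus an ellipsis
# before being attached to the SQLQuery event.
sql = 'SELECT ' + ', '.join(f'col_{i}' for i in range(200))
abridge_sql_log = True

log_sql = '{}...'.format(sql[:512]) if abridge_sql_log else sql

print(len(sql), len(log_sql))  # 1695 515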