diff --git a/core/dbt/adapters/base/connections.py b/core/dbt/adapters/base/connections.py
index 67789c9834f..218aa287bf9 100644
--- a/core/dbt/adapters/base/connections.py
+++ b/core/dbt/adapters/base/connections.py
@@ -18,7 +18,17 @@
 from dbt.adapters.base.query_headers import (
     MacroQueryStringSetter,
 )
-from dbt.logger import GLOBAL_LOGGER as logger
+from dbt.events.functions import fire_event
+from dbt.events.types import (
+    NewConnection,
+    ConnectionReused,
+    ConnectionLeftOpen,
+    ConnectionLeftOpen2,
+    ConnectionClosed,
+    ConnectionClosed2,
+    Rollback,
+    RollbackFailed
+)
 from dbt import flags
@@ -136,14 +146,10 @@ def set_connection_name(self, name: Optional[str] = None) -> Connection:
         if conn.name == conn_name and conn.state == 'open':
             return conn
 
-        logger.debug(
-            'Acquiring new {} connection "{}".'.format(self.TYPE, conn_name))
+        fire_event(NewConnection(conn_name=conn_name, conn_type=self.TYPE))
 
         if conn.state == 'open':
-            logger.debug(
-                'Re-using an available connection from the pool (formerly {}).'
-                .format(conn.name)
-            )
+            fire_event(ConnectionReused(conn_name=conn.name))
         else:
             conn.handle = LazyHandle(self.open)
 
@@ -190,11 +196,9 @@ def cleanup_all(self) -> None:
         with self.lock:
             for connection in self.thread_connections.values():
                 if connection.state not in {'closed', 'init'}:
-                    logger.debug("Connection '{}' was left open."
-                                 .format(connection.name))
+                    fire_event(ConnectionLeftOpen(conn_name=connection.name))
                 else:
-                    logger.debug("Connection '{}' was properly closed."
-                                 .format(connection.name))
+                    fire_event(ConnectionClosed(conn_name=connection.name))
                 self.close(connection)
 
             # garbage collect these connections
@@ -220,20 +224,17 @@ def _rollback_handle(cls, connection: Connection) -> None:
         try:
             connection.handle.rollback()
         except Exception:
-            logger.debug(
-                'Failed to rollback {}'.format(connection.name),
-                exc_info=True
-            )
+            fire_event(RollbackFailed(conn_name=connection.name))
 
     @classmethod
     def _close_handle(cls, connection: Connection) -> None:
         """Perform the actual close operation."""
         # On windows, sometimes connection handles don't have a close() attr.
         if hasattr(connection.handle, 'close'):
-            logger.debug(f'On {connection.name}: Close')
+            fire_event(ConnectionClosed2(conn_name=connection.name))
             connection.handle.close()
         else:
-            logger.debug(f'On {connection.name}: No close available on handle')
+            fire_event(ConnectionLeftOpen2(conn_name=connection.name))
 
     @classmethod
     def _rollback(cls, connection: Connection) -> None:
@@ -244,7 +245,7 @@ def _rollback(cls, connection: Connection) -> None:
                 f'"{connection.name}", but it does not have one open!'
            )
 
-        logger.debug(f'On {connection.name}: ROLLBACK')
+        fire_event(Rollback(conn_name=connection.name))
         cls._rollback_handle(connection)
 
         connection.transaction_open = False
@@ -256,7 +257,7 @@ def close(cls, connection: Connection) -> Connection:
             return connection
 
         if connection.transaction_open and connection.handle:
-            logger.debug('On {}: ROLLBACK'.format(connection.name))
+            fire_event(Rollback(conn_name=connection.name))
             cls._rollback_handle(connection)
         connection.transaction_open = False
 
diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py
index 732b2e59914..f8ac6d5d4fa 100644
--- a/core/dbt/adapters/base/impl.py
+++ b/core/dbt/adapters/base/impl.py
@@ -29,7 +29,8 @@
 from dbt.contracts.graph.manifest import Manifest, MacroManifest
 from dbt.contracts.graph.parsed import ParsedSeedNode
 from dbt.exceptions import warn_or_error
-from dbt.logger import GLOBAL_LOGGER as logger
+from dbt.events.functions import fire_event
+from dbt.events.types import CacheMiss, ListRelations
 from dbt.utils import filter_null_values, executor
 
 from dbt.adapters.base.connections import Connection, AdapterResponse
@@ -288,9 +289,12 @@ def _schema_is_cached(self, database: Optional[str], schema: str) -> bool:
         """Check if the schema is cached, and by default logs if it is not."""
 
         if (database, schema) not in self.cache:
-            logger.debug(
-                'On "{}": cache miss for schema "{}.{}", this is inefficient'
-                .format(self.nice_connection_name(), database, schema)
+            fire_event(
+                CacheMiss(
+                    conn_name=self.nice_connection_name(),
+                    database=database,
+                    schema=schema
+                )
             )
             return False
         else:
@@ -672,9 +676,8 @@ def list_relations(
             relations = self.list_relations_without_caching(
                 schema_relation
             )
+        fire_event(ListRelations(database=database, schema=schema, relations=relations))
 
-        logger.debug('with database={}, schema={}, relations={}'
-                     .format(database, schema, relations))
         return relations
 
     def _make_match_kwargs(
diff --git a/core/dbt/adapters/cache.py b/core/dbt/adapters/cache.py
index 8bbac5cc95f..07a4d875715 100644
--- a/core/dbt/adapters/cache.py
+++ b/core/dbt/adapters/cache.py
@@ -1,11 +1,26 @@
+import threading
 from collections import namedtuple
 from copy import deepcopy
-from typing import List, Iterable, Optional, Dict, Set, Tuple, Any
-import threading
+from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
 
-from dbt.logger import CACHE_LOGGER as logger
-from dbt.utils import lowercase
 import dbt.exceptions
+from dbt.events.functions import fire_event
+from dbt.events.types import (
+    AddLink,
+    AddRelation,
+    DropCascade,
+    DropMissingRelation,
+    DropRelation,
+    DumpAfterAddGraph,
+    DumpAfterRenameSchema,
+    DumpBeforeAddGraph,
+    DumpBeforeRenameSchema,
+    RenameSchema,
+    TemporaryRelation,
+    UncachedRelation,
+    UpdateReference
+)
+from dbt.utils import lowercase
 
 _ReferenceKey = namedtuple('_ReferenceKey', 'database schema identifier')
@@ -157,12 +172,6 @@ def dump_graph_entry(self):
         return [dot_separated(r) for r in self.referenced_by]
 
 
-def lazy_log(msg, func):
-    if logger.disabled:
-        return
-    logger.debug(msg.format(func()))
-
-
 class RelationsCache:
     """A cache of the relations known to dbt. Keeps track of relationships
     declared between tables and handles renames/drops as a real database would.
@@ -278,6 +287,7 @@ def _add_link(self, referenced_key, dependent_key):
 
         referenced.add_reference(dependent)
 
+    # TODO: Is this dead code? I can't seem to find it grepping the codebase.
     def add_link(self, referenced, dependent):
         """Add a link between two relations to the database.
If either relation does not exist, it will be added as an "external" relation. @@ -297,11 +307,7 @@ def add_link(self, referenced, dependent): # if we have not cached the referenced schema at all, we must be # referring to a table outside our control. There's no need to make # a link - we will never drop the referenced relation during a run. - logger.debug( - '{dep!s} references {ref!s} but {ref.database}.{ref.schema} ' - 'is not in the cache, skipping assumed external relation' - .format(dep=dependent, ref=ref_key) - ) + fire_event(UncachedRelation(dep_key=dependent, ref_key=ref_key)) return if ref_key not in self.relations: # Insert a dummy "external" relation. @@ -317,9 +323,7 @@ def add_link(self, referenced, dependent): type=referenced.External ) self.add(dependent) - logger.debug( - 'adding link, {!s} references {!s}'.format(dep_key, ref_key) - ) + fire_event(AddLink(dep_key=dep_key, ref_key=ref_key)) with self.lock: self._add_link(ref_key, dep_key) @@ -330,14 +334,12 @@ def add(self, relation): :param BaseRelation relation: The underlying relation. """ cached = _CachedRelation(relation) - logger.debug('Adding relation: {!s}'.format(cached)) - - lazy_log('before adding: {!s}', self.dump_graph) + fire_event(AddRelation(relation=cached)) + fire_event(DumpBeforeAddGraph(graph_func=self.dump_graph)) with self.lock: self._setdefault(cached) - - lazy_log('after adding: {!s}', self.dump_graph) + fire_event(DumpAfterAddGraph(graph_func=self.dump_graph)) def _remove_refs(self, keys): """Removes all references to all entries in keys. This does not @@ -359,13 +361,10 @@ def _drop_cascade_relation(self, dropped): :param _CachedRelation dropped: An existing _CachedRelation to drop. """ if dropped not in self.relations: - logger.debug('dropped a nonexistent relationship: {!s}' - .format(dropped)) + fire_event(DropMissingRelation(relation=dropped)) return consequences = self.relations[dropped].collect_consequences() - logger.debug( - 'drop {} is cascading to {}'.format(dropped, consequences) - ) + fire_event(DropCascade(dropped=dropped, consequences=consequences)) self._remove_refs(consequences) def drop(self, relation): @@ -380,7 +379,7 @@ def drop(self, relation): :param str identifier: The identifier of the relation to drop. 
""" dropped = _make_key(relation) - logger.debug('Dropping relation: {!s}'.format(dropped)) + fire_event(DropRelation(dropped=dropped)) with self.lock: self._drop_cascade_relation(dropped) @@ -403,9 +402,8 @@ def _rename_relation(self, old_key, new_relation): # update all the relations that refer to it for cached in self.relations.values(): if cached.is_referenced_by(old_key): - logger.debug( - 'updated reference from {0} -> {2} to {1} -> {2}' - .format(old_key, new_key, cached.key()) + fire_event( + UpdateReference(old_key=old_key, new_key=new_key, cached_key=cached.key()) ) cached.rename_key(old_key, new_key) @@ -435,10 +433,7 @@ def _check_rename_constraints(self, old_key, new_key): ) if old_key not in self.relations: - logger.debug( - 'old key {} not found in self.relations, assuming temporary' - .format(old_key) - ) + fire_event(TemporaryRelation(key=old_key)) return False return True @@ -456,11 +451,9 @@ def rename(self, old, new): """ old_key = _make_key(old) new_key = _make_key(new) - logger.debug('Renaming relation {!s} to {!s}'.format( - old_key, new_key - )) + fire_event(RenameSchema(old_key=old_key, new_key=new_key)) - lazy_log('before rename: {!s}', self.dump_graph) + fire_event(DumpBeforeRenameSchema(graph_func=self.dump_graph)) with self.lock: if self._check_rename_constraints(old_key, new_key): @@ -468,7 +461,7 @@ def rename(self, old, new): else: self._setdefault(_CachedRelation(new)) - lazy_log('after rename: {!s}', self.dump_graph) + fire_event(DumpAfterRenameSchema(graph_func=self.dump_graph)) def get_relations( self, database: Optional[str], schema: Optional[str] diff --git a/core/dbt/adapters/factory.py b/core/dbt/adapters/factory.py index 0de3f2561dc..64021f10536 100644 --- a/core/dbt/adapters/factory.py +++ b/core/dbt/adapters/factory.py @@ -8,10 +8,9 @@ PACKAGE_PATH as GLOBAL_PROJECT_PATH, PROJECT_NAME as GLOBAL_PROJECT_NAME, ) -from dbt.logger import GLOBAL_LOGGER as logger +from dbt.events.functions import fire_event +from dbt.events.types import AdapterImportError, PluginLoadError from dbt.contracts.connection import Credentials, AdapterRequiredConfig - - from dbt.adapters.protocol import ( AdapterProtocol, AdapterConfig, @@ -67,11 +66,12 @@ def load_plugin(self, name: str) -> Type[Credentials]: # if we failed to import the target module in particular, inform # the user about it via a runtime error if exc.name == 'dbt.adapters.' + name: + fire_event(AdapterImportError(exc=exc)) raise RuntimeException(f'Could not find adapter type {name}!') - logger.info(f'Error importing adapter: {exc}') # otherwise, the error had to have come from some underlying # library. Log the stack trace. - logger.debug('', exc_info=True) + + fire_event(PluginLoadError()) raise plugin: AdapterPlugin = mod.Plugin plugin_type = plugin.adapter.type() diff --git a/core/dbt/adapters/sql/connections.py b/core/dbt/adapters/sql/connections.py index d44764b7b69..cb477f2abc2 100644 --- a/core/dbt/adapters/sql/connections.py +++ b/core/dbt/adapters/sql/connections.py @@ -10,7 +10,8 @@ from dbt.contracts.connection import ( Connection, ConnectionState, AdapterResponse ) -from dbt.logger import GLOBAL_LOGGER as logger +from dbt.events.functions import fire_event +from dbt.events.types import ConnectionUsed, SQLQuery, SQLCommit, SQLQueryStatus class SQLConnectionManager(BaseConnectionManager): @@ -58,9 +59,7 @@ def add_query( connection = self.get_thread_connection() if auto_begin and connection.transaction_open is False: self.begin() - - logger.debug('Using {} connection "{}".' 
- .format(self.TYPE, connection.name)) + fire_event(ConnectionUsed(conn_type=self.TYPE, conn_name=connection.name)) with self.exception_handler(sql): if abridge_sql_log: @@ -68,19 +67,16 @@ def add_query( else: log_sql = sql - logger.debug( - 'On {connection_name}: {sql}', - connection_name=connection.name, - sql=log_sql, - ) + fire_event(SQLQuery(conn_name=connection.name, sql=log_sql)) pre = time.time() cursor = connection.handle.cursor() cursor.execute(sql, bindings) - logger.debug( - "SQL status: {status} in {elapsed:0.2f} seconds", - status=self.get_response(cursor), - elapsed=(time.time() - pre) + + fire_event( + SQLQueryStatus( + status=self.get_response(cursor), elapsed=round((time.time() - pre), 2) + ) ) return connection, cursor @@ -160,7 +156,7 @@ def commit(self): 'Tried to commit transaction on connection "{}", but ' 'it does not have one open!'.format(connection.name)) - logger.debug('On {}: COMMIT'.format(connection.name)) + fire_event(SQLCommit(conn_name=connection.name)) self.add_commit_query() connection.transaction_open = False diff --git a/core/dbt/adapters/sql/impl.py b/core/dbt/adapters/sql/impl.py index 3377453d451..5dc54f3a6b6 100644 --- a/core/dbt/adapters/sql/impl.py +++ b/core/dbt/adapters/sql/impl.py @@ -6,7 +6,9 @@ import dbt.exceptions from dbt.adapters.base import BaseAdapter, available from dbt.adapters.sql import SQLConnectionManager -from dbt.logger import GLOBAL_LOGGER as logger +from dbt.events.functions import fire_event +from dbt.events.types import ColTypeChange, SchemaCreation, SchemaDrop + from dbt.adapters.base.relation import BaseRelation @@ -116,8 +118,13 @@ def expand_column_types(self, goal, current): target_column.can_expand_to(reference_column): col_string_size = reference_column.string_size() new_type = self.Column.string_type(col_string_size) - logger.debug("Changing col type from {} to {} in table {}", - target_column.data_type, new_type, current) + fire_event( + ColTypeChange( + orig_type=target_column.data_type, + new_type=new_type, + table=current, + ) + ) self.alter_column_type(current, column_name, new_type) @@ -175,7 +182,7 @@ def get_columns_in_relation(self, relation): def create_schema(self, relation: BaseRelation) -> None: relation = relation.without_identifier() - logger.debug('Creating schema "{}"', relation) + fire_event(SchemaCreation(relation=relation)) kwargs = { 'relation': relation, } @@ -186,7 +193,7 @@ def create_schema(self, relation: BaseRelation) -> None: def drop_schema(self, relation: BaseRelation) -> None: relation = relation.without_identifier() - logger.debug('Dropping schema "{}".', relation) + fire_event(SchemaDrop(relation=relation)) kwargs = { 'relation': relation, } diff --git a/core/dbt/events/stubs.py b/core/dbt/events/stubs.py new file mode 100644 index 00000000000..b2329ce2787 --- /dev/null +++ b/core/dbt/events/stubs.py @@ -0,0 +1,44 @@ +from typing import ( + Any, + Optional, + NamedTuple, +) + +# N.B.: +# These stubs were autogenerated by stubgen and then hacked +# to pieces to ensure we had something other than "Any" types +# where using external classes to instantiate event subclasses +# in events/types.py. +# +# This goes away when we turn mypy on for everything. +# +# Don't trust them too much at all! 
+
+
+class _ReferenceKey(NamedTuple):
+    database: Any
+    schema: Any
+    identifier: Any
+
+
+class _CachedRelation:
+    referenced_by: Any
+    inner: Any
+
+
+class AdapterResponse:
+    code: Optional[str]
+    rows_affected: Optional[int]
+
+
+class BaseRelation:
+    path: Any
+    type: Optional[Any]
+    quote_character: str
+    include_policy: Any
+    quote_policy: Any
+    dbt_created: bool
+
+
+class InformationSchema(BaseRelation):
+    information_schema_view: Optional[str]
diff --git a/core/dbt/events/types.py b/core/dbt/events/types.py
index fd608cc27fb..733e74f5535 100644
--- a/core/dbt/events/types.py
+++ b/core/dbt/events/types.py
@@ -1,7 +1,8 @@
 from abc import ABCMeta, abstractmethod
 import argparse
 from dataclasses import dataclass
-from typing import Any, List, Optional, Dict
+from typing import Any, Callable, cast, Dict, List, Optional, Set, Union
+from dbt.events.stubs import _CachedRelation, AdapterResponse, BaseRelation, _ReferenceKey
 from dbt import ui
 from dbt.node_types import NodeType
 from dbt.events.format import format_fancy_output_line, pluralize
@@ -310,7 +311,7 @@ def cli_msg(self) -> str:
 
 
 @dataclass
-class MacroEventInfo(InfoLevel, CliEventABC):
+class MacroEventDebug(DebugLevel, CliEventABC):
     msg: str
 
     def cli_msg(self) -> str:
@@ -318,13 +319,315 @@ def cli_msg(self) -> str:
 
 
 @dataclass
-class MacroEventDebug(DebugLevel, CliEventABC):
+class MacroEventInfo(InfoLevel, CliEventABC):
     msg: str
 
     def cli_msg(self) -> str:
         return self.msg
 
 
+@dataclass
+class NewConnection(DebugLevel, CliEventABC):
+    conn_type: str
+    conn_name: str
+
+    def cli_msg(self) -> str:
+        return f'Acquiring new {self.conn_type} connection "{self.conn_name}"'
+
+
+@dataclass
+class ConnectionReused(DebugLevel, CliEventABC):
+    conn_name: str
+
+    def cli_msg(self) -> str:
+        return f"Re-using an available connection from the pool (formerly {self.conn_name})"
+
+
+@dataclass
+class ConnectionLeftOpen(DebugLevel, CliEventABC):
+    conn_name: Optional[str]
+
+    def cli_msg(self) -> str:
+        return f"Connection '{self.conn_name}' was left open."
+
+
+@dataclass
+class ConnectionClosed(DebugLevel, CliEventABC):
+    conn_name: Optional[str]
+
+    def cli_msg(self) -> str:
+        return f"Connection '{self.conn_name}' was properly closed."
+
+
+@dataclass
+class RollbackFailed(ShowException, DebugLevel, CliEventABC):
+    conn_name: Optional[str]
+
+    def cli_msg(self) -> str:
+        return f"Failed to rollback '{self.conn_name}'"
+
+
+# TODO: can we combine this with ConnectionClosed?
+@dataclass
+class ConnectionClosed2(DebugLevel, CliEventABC):
+    conn_name: Optional[str]
+
+    def cli_msg(self) -> str:
+        return f"On {self.conn_name}: Close"
+
+
+# TODO: can we combine this with ConnectionLeftOpen?
+@dataclass
+class ConnectionLeftOpen2(DebugLevel, CliEventABC):
+    conn_name: Optional[str]
+
+    def cli_msg(self) -> str:
+        return f"On {self.conn_name}: No close available on handle"
+
+
+@dataclass
+class Rollback(DebugLevel, CliEventABC):
+    conn_name: Optional[str]
+
+    def cli_msg(self) -> str:
+        return f"On {self.conn_name}: ROLLBACK"
+
+
+@dataclass
+class CacheMiss(DebugLevel, CliEventABC):
+    conn_name: str
+    database: Optional[str]
+    schema: str
+
+    def cli_msg(self) -> str:
+        return (
+            f'On "{self.conn_name}": cache miss for schema '
+            f'"{self.database}.{self.schema}", this is inefficient'
+        )
+
+
+@dataclass
+class ListRelations(DebugLevel, CliEventABC):
+    database: Optional[str]
+    schema: str
+    relations: List[BaseRelation]
+
+    def cli_msg(self) -> str:
+        return f"with database={self.database}, schema={self.schema}, relations={self.relations}"
+
+
+@dataclass
+class ConnectionUsed(DebugLevel, CliEventABC):
+    conn_type: str
+    conn_name: Optional[str]
+
+    def cli_msg(self) -> str:
+        return f'Using {self.conn_type} connection "{self.conn_name}"'
+
+
+@dataclass
+class SQLQuery(DebugLevel, CliEventABC):
+    conn_name: Optional[str]
+    sql: str
+
+    def cli_msg(self) -> str:
+        return f"On {self.conn_name}: {self.sql}"
+
+
+@dataclass
+class SQLQueryStatus(DebugLevel, CliEventABC):
+    status: Union[AdapterResponse, str]
+    elapsed: float
+
+    def cli_msg(self) -> str:
+        return f"SQL status: {self.status} in {self.elapsed} seconds"
+
+
+@dataclass
+class SQLCommit(DebugLevel, CliEventABC):
+    conn_name: str
+
+    def cli_msg(self) -> str:
+        return f"On {self.conn_name}: COMMIT"
+
+
+@dataclass
+class ColTypeChange(DebugLevel, CliEventABC):
+    orig_type: str
+    new_type: str
+    table: str
+
+    def cli_msg(self) -> str:
+        return f"Changing col type from {self.orig_type} to {self.new_type} in table {self.table}"
+
+
+@dataclass
+class SchemaCreation(DebugLevel, CliEventABC):
+    relation: BaseRelation
+
+    def cli_msg(self) -> str:
+        return f'Creating schema "{self.relation}"'
+
+
+@dataclass
+class SchemaDrop(DebugLevel, CliEventABC):
+    relation: BaseRelation
+
+    def cli_msg(self) -> str:
+        return f'Dropping schema "{self.relation}".'
+
+
+# TODO pretty sure this is only ever called in dead code
+# see: core/dbt/adapters/cache.py _add_link vs add_link
+@dataclass
+class UncachedRelation(DebugLevel, CliEventABC):
+    dep_key: _ReferenceKey
+    ref_key: _ReferenceKey
+
+    def cli_msg(self) -> str:
+        return (
+            f"{self.dep_key} references {str(self.ref_key)} "
+            f"but {self.ref_key.database}.{self.ref_key.schema} "
+            "is not in the cache, skipping assumed external relation"
+        )
+
+
+@dataclass
+class AddLink(DebugLevel, CliEventABC):
+    dep_key: _ReferenceKey
+    ref_key: _ReferenceKey
+
+    def cli_msg(self) -> str:
+        return f"adding link, {self.dep_key} references {self.ref_key}"
+
+
+@dataclass
+class AddRelation(DebugLevel, CliEventABC):
+    relation: _CachedRelation
+
+    def cli_msg(self) -> str:
+        return f"Adding relation: {str(self.relation)}"
+
+
+@dataclass
+class DropMissingRelation(DebugLevel, CliEventABC):
+    relation: _ReferenceKey
+
+    def cli_msg(self) -> str:
+        return f"dropped a nonexistent relationship: {str(self.relation)}"
+
+
+@dataclass
+class DropCascade(DebugLevel, CliEventABC):
+    dropped: _ReferenceKey
+    consequences: Set[_ReferenceKey]
+
+    def cli_msg(self) -> str:
+        return f"drop {self.dropped} is cascading to {self.consequences}"
+
+
+@dataclass
+class DropRelation(DebugLevel, CliEventABC):
+    dropped: _ReferenceKey
+
+    def cli_msg(self) -> str:
+        return f"Dropping relation: {self.dropped}"
+
+
+@dataclass
+class UpdateReference(DebugLevel, CliEventABC):
+    old_key: _ReferenceKey
+    new_key: _ReferenceKey
+    cached_key: _ReferenceKey
+
+    def cli_msg(self) -> str:
+        return f"updated reference from {self.old_key} -> {self.cached_key} to "\
+            f"{self.new_key} -> {self.cached_key}"
+
+
+@dataclass
+class TemporaryRelation(DebugLevel, CliEventABC):
+    key: _ReferenceKey
+
+    def cli_msg(self) -> str:
return f"old key {self.key} not found in self.relations, assuming temporary" + + +@dataclass +class RenameSchema(DebugLevel, CliEventABC): + old_key: _ReferenceKey + new_key: _ReferenceKey + + def cli_msg(self) -> str: + return f"Renaming relation {self.old_key} to {self.new_key}" + + +@dataclass +class DumpBeforeAddGraph(DebugLevel, CliEventABC): + graph_func: Callable[[], Dict[str, List[str]]] + + def cli_msg(self) -> str: + # workaround for https://github.com/python/mypy/issues/6910 + # TODO remove when we've upgraded to a mypy version without that bug + func_returns = cast(Callable[[], Dict[str, List[str]]], getattr(self, "graph_func")) + return f"before adding : {func_returns}" + + +@dataclass +class DumpAfterAddGraph(DebugLevel, CliEventABC): + graph_func: Callable[[], Dict[str, List[str]]] + + def cli_msg(self) -> str: + # workaround for https://github.com/python/mypy/issues/6910 + func_returns = cast(Callable[[], Dict[str, List[str]]], getattr(self, "graph_func")) + return f"after adding: {func_returns}" + + +@dataclass +class DumpBeforeRenameSchema(DebugLevel, CliEventABC): + graph_func: Callable[[], Dict[str, List[str]]] + + def cli_msg(self) -> str: + # workaround for https://github.com/python/mypy/issues/6910 + func_returns = cast(Callable[[], Dict[str, List[str]]], getattr(self, "graph_func")) + return f"before rename: {func_returns}" + + +@dataclass +class DumpAfterRenameSchema(DebugLevel, CliEventABC): + graph_func: Callable[[], Dict[str, List[str]]] + + def cli_msg(self) -> str: + # workaround for https://github.com/python/mypy/issues/6910 + func_returns = cast(Callable[[], Dict[str, List[str]]], getattr(self, "graph_func")) + return f"after rename: {func_returns}" + + +@dataclass +class AdapterImportError(InfoLevel, CliEventABC): + exc: ModuleNotFoundError + + def cli_msg(self) -> str: + return f"Error importing adapter: {self.exc}" + + +@dataclass +class PluginLoadError(ShowException, DebugLevel, CliEventABC): + def cli_msg(self): + pass + + +# since mypy doesn't run on every file we need to suggest to mypy that every +# class gets instantiated. But we don't actually want to run this code. +# making the conditional `if False` causes mypy to skip it as dead code so +# we need to skirt around that by computing something it doesn't check statically. +# +# TODO remove these lines once we run mypy everywhere. + +def dump_callable(): + return {"": [""]} # for instantiating `Dump...` methods which take callables. 
+
+
 @dataclass
 class NewConnectionOpening(DebugLevel, CliEventABC):
     connection_state: str
@@ -1779,6 +2082,72 @@ def cli_msg(self) -> str:
     ManifestLoaded()
     ManifestChecked()
     ManifestFlatGraphBuilt()
+    NewConnection(conn_type="", conn_name="")
+    ConnectionReused(conn_name="")
+    ConnectionLeftOpen(conn_name="")
+    ConnectionClosed(conn_name="")
+    RollbackFailed(conn_name="")
+    ConnectionClosed2(conn_name="")
+    ConnectionLeftOpen2(conn_name="")
+    Rollback(conn_name="")
+    CacheMiss(conn_name="", database="", schema="")
+    ListRelations(database="", schema="", relations=[])
+    ConnectionUsed(conn_type="", conn_name="")
+    SQLQuery(conn_name="", sql="")
+    SQLQueryStatus(status="", elapsed=0.1)
+    SQLCommit(conn_name="")
+    ColTypeChange(orig_type="", new_type="", table="")
+    SchemaCreation(relation=BaseRelation())
+    SchemaDrop(relation=BaseRelation())
+    UncachedRelation(
+        dep_key=_ReferenceKey(database="", schema="", identifier=""),
+        ref_key=_ReferenceKey(database="", schema="", identifier=""),
+    )
+    AddLink(
+        dep_key=_ReferenceKey(database="", schema="", identifier=""),
+        ref_key=_ReferenceKey(database="", schema="", identifier=""),
+    )
+    AddRelation(relation=_CachedRelation())
+    DropMissingRelation(relation=_ReferenceKey(database="", schema="", identifier=""))
+    DropCascade(
+        dropped=_ReferenceKey(database="", schema="", identifier=""),
+        consequences={_ReferenceKey(database="", schema="", identifier="")},
+    )
+    DropRelation(dropped=_ReferenceKey(database="", schema="", identifier=""))
+    UpdateReference(
+        old_key=_ReferenceKey(database="", schema="", identifier=""),
+        new_key=_ReferenceKey(database="", schema="", identifier=""),
+        cached_key=_ReferenceKey(database="", schema="", identifier=""),
+    )
+    TemporaryRelation(key=_ReferenceKey(database="", schema="", identifier=""))
+    RenameSchema(
+        old_key=_ReferenceKey(database="", schema="", identifier=""),
+        new_key=_ReferenceKey(database="", schema="", identifier="")
+    )
+    DumpBeforeAddGraph(dump_callable)
+    DumpAfterAddGraph(dump_callable)
+    DumpBeforeRenameSchema(dump_callable)
+    DumpAfterRenameSchema(dump_callable)
+    AdapterImportError(ModuleNotFoundError())
+    PluginLoadError()
     ReportPerformancePath(path='')
     GitSparseCheckoutSubdirectory(subdir='')
     GitProgressCheckoutRevision(revision='')
diff --git a/test/integration/051_query_comments_test/test_query_comments.py b/test/integration/051_query_comments_test/test_query_comments.py
index 6eed09f416a..4ada01aa991 100644
--- a/test/integration/051_query_comments_test/test_query_comments.py
+++ b/test/integration/051_query_comments_test/test_query_comments.py
@@ -2,6 +2,7 @@
 import io
 import json
 import os
+import re
 
 import dbt.exceptions
 from dbt.version import __version__ as dbt_version
@@ -70,10 +71,12 @@ def run_get_json(self, expect_pass=True):
         return logs
 
     def query_comment(self, model_name, log):
+        # N.B.: a temporary regex to strip the HH:MM:SS timestamp from the log line, if present.
+        # TODO: make this go away when structured logging is stable
+        log_msg = re.sub(r"(?:[01]\d|2[0123]):(?:[012345]\d):(?:[012345]\d \| )", "", log['message'])
         prefix = 'On {}: '.format(model_name)
-
-        if log['message'].startswith(prefix):
-            msg = log['message'][len(prefix):]
+        if log_msg.startswith(prefix):
+            msg = log_msg[len(prefix):]
             if msg in {'COMMIT', 'BEGIN', 'ROLLBACK'}:
                 return None
             return msg
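
Reviewer note: for anyone unfamiliar with the eventing scheme this diff migrates to, here is a minimal, self-contained sketch of the pattern. The names `DebugLevel`, `CliEventABC`, and `fire_event` mirror the ones used above, but these stand-in definitions are illustrative only; the real implementations live in `core/dbt/events/` and also handle log routing, event levels, and exception display.

```python
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass


class CliEventABC(metaclass=ABCMeta):
    """An event that knows how to render itself for CLI output."""

    @abstractmethod
    def cli_msg(self) -> str:
        raise NotImplementedError


class DebugLevel:
    level: str = "debug"


@dataclass
class ConnectionReused(DebugLevel, CliEventABC):
    conn_name: str

    def cli_msg(self) -> str:
        # Formatting happens here, lazily, when a handler asks for the
        # message -- rather than eagerly at the call site the way the
        # removed logger.debug(...format(...)) calls did.
        return f"Re-using an available connection from the pool (formerly {self.conn_name})"


def fire_event(e: CliEventABC) -> None:
    # Stand-in for dbt.events.functions.fire_event, which routes the
    # structured event to the configured loggers.
    print(f"[{e.level}] {e.cli_msg()}")


if __name__ == "__main__":
    fire_event(ConnectionReused(conn_name="master"))
    # [debug] Re-using an available connection from the pool (formerly master)
```

One payoff of this design is that call sites pass structured fields instead of pre-rendered strings, so handlers can serialize events as JSON or skip rendering entirely when the level is filtered out.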
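The `getattr` + `cast` dance in the `Dump*` events deserves a note, since it looks redundant. Because of python/mypy#6910, mypy treats a `Callable`-typed dataclass field accessed as `self.graph_func` as if it were a method; at runtime the plain attribute lookup is fine. A small sketch of the workaround, with a hypothetical event name standing in for the `Dump*` classes above:

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, cast


@dataclass
class DumpGraph:  # hypothetical stand-in for the Dump* events in this diff
    graph_func: Callable[[], Dict[str, List[str]]]

    def cli_msg(self) -> str:
        # `self.graph_func()` works at runtime, but mypy (python/mypy#6910)
        # flags the Callable field as if it were a bound method. Fetching
        # the attribute by name and casting it sidesteps the false positive.
        func = cast(Callable[[], Dict[str, List[str]]], getattr(self, "graph_func"))
        return f"graph: {func()}"


print(DumpGraph(graph_func=lambda: {"model.a": ["model.b"]}).cli_msg())
# graph: {'model.a': ['model.b']}
```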
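Finally, the timestamp-stripping regex added to the query-comments test is easier to review with a concrete input. Assuming the structured logger prefixes lines with `HH:MM:SS | ` (the premise of the test change), the substitution behaves like this:

```python
import re

# Same pattern as in query_comment above, written as a raw string.
TIMESTAMP = r"(?:[01]\d|2[0123]):(?:[012345]\d):(?:[012345]\d \| )"

print(re.sub(TIMESTAMP, "", "13:45:09 | On model.test.x: COMMIT"))
# On model.test.x: COMMIT

# Lines without a timestamp pass through unchanged.
print(re.sub(TIMESTAMP, "", "On model.test.x: COMMIT"))
# On model.test.x: COMMIT
```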