Skip to content

Commit

Permalink
Audit potential circular dependencies (#8489)
Browse files Browse the repository at this point in the history
  • Loading branch information
aranke authored Sep 12, 2023
1 parent ad04012 commit 05ef3b6
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 214 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20230906-141616.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Audit potential circular dependencies
time: 2023-09-06T14:16:16.58457+01:00
custom:
Author: aranke
Issue: "8349"
3 changes: 2 additions & 1 deletion core/dbt/context/exceptions_jinja.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from typing import NoReturn

from dbt.events.functions import warn_or_error
from dbt.events.helpers import env_secrets, scrub_secrets
from dbt.events.types import JinjaLogWarning

from dbt.exceptions import (
Expand All @@ -26,6 +25,8 @@
ContractError,
ColumnTypeMissingError,
FailFastError,
scrub_secrets,
env_secrets,
)


Expand Down
183 changes: 3 additions & 180 deletions core/dbt/events/eventmgr.py
Original file line number Diff line number Diff line change
@@ -1,184 +1,10 @@
import os
from colorama import Style
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import json
import logging
from logging.handlers import RotatingFileHandler
import threading
import traceback
from typing import Any, Callable, List, Optional, TextIO, Protocol
from typing import Callable, List, Optional, Protocol
from uuid import uuid4
from dbt.events.format import timestamp_to_datetime_string

from dbt.events.base_types import BaseEvent, EventLevel, msg_from_base_event, EventMsg
import dbt.utils

# A Filter is a function which takes an EventMsg and returns True if the event
# should be logged, False otherwise.
Filter = Callable[[EventMsg], bool]


# Default filter which logs every event
def NoFilter(_: EventMsg) -> bool:
    """Accept every event: the argument is ignored and True is always returned."""
    return True


# A Scrubber removes secrets from an input string, returning a sanitized string.
Scrubber = Callable[[str], str]


# Provide a pass-through scrubber implementation, also used as a default
def NoScrubber(s: str) -> str:
    """Identity scrubber: hand back ``s`` exactly as received."""
    return s


class LineFormat(Enum):
    """Output formats a logger can be configured to emit."""

    # Plain "<time> <message>" lines.
    PlainText = 1
    # Text lines that additionally carry the level and thread name.
    DebugText = 2
    # One JSON document per event.
    Json = 3


# Map from dbt event levels to python log levels
# The stdlib constants carry the same numeric values as the literals they
# replace: DEBUG == 10, INFO == 20, WARNING == 30, ERROR == 40.
_log_level_map = {
    EventLevel.DEBUG: logging.DEBUG,
    EventLevel.TEST: logging.DEBUG,  # TEST events share the DEBUG threshold
    EventLevel.INFO: logging.INFO,
    EventLevel.WARN: logging.WARNING,
    EventLevel.ERROR: logging.ERROR,
}


# We need this function for now because the numeric log severity levels in
# Python do not match those for logbook, so we have to explicitly call the
# correct function by name.
def send_to_logger(l, level: str, log_line: str):
    """Emit ``log_line`` through the method of ``l`` that matches ``level``.

    Args:
        l: Logger-like object exposing ``debug``, ``info``, ``warning`` and
            ``error`` methods.
        level: One of ``"test"``, ``"debug"``, ``"info"``, ``"warn"`` or
            ``"error"``. ``"test"`` is logged through ``debug``.
        log_line: The fully formatted line to emit.

    Raises:
        AssertionError: If ``level`` is not one of the names listed above.
    """
    # Level name -> name of the logger method that handles it.
    method_by_level = {
        "test": "debug",
        "debug": "debug",
        "info": "info",
        "warn": "warning",
        "error": "error",
    }

    method_name = method_by_level.get(level)
    if method_name is None:
        raise AssertionError(
            f"While attempting to log {log_line}, encountered the unhandled level: {level}"
        )

    getattr(l, method_name)(log_line)


@dataclass
class LoggerConfig:
    """Settings from which a logger instance is built."""

    # Name of the underlying Python logger (passed to logging.getLogger).
    name: str
    # Predicate deciding whether a given event is logged; defaults to "log all".
    filter: Filter = NoFilter
    # Callable applied to message text to strip secrets; defaults to identity.
    scrubber: Scrubber = NoScrubber
    # Which line layout to produce.
    line_format: LineFormat = LineFormat.PlainText
    # Threshold level for the underlying Python logger.
    level: EventLevel = EventLevel.WARN
    # When True, text lines are prefixed with a colour-reset sequence.
    use_colors: bool = False
    # If set, lines are written to this stream.
    output_stream: Optional[TextIO] = None
    # If set (non-empty), lines are written to a rotating file at this path.
    output_file_name: Optional[str] = None
    # Rotation threshold for the output file, in bytes.
    output_file_max_bytes: Optional[int] = 10 * 1024 * 1024  # 10 MiB
    # Pre-built logger to use when neither a stream nor a file is configured.
    logger: Optional[Any] = None


class _Logger:
    """Base logger: owns a Python ``logging.Logger`` and writes event lines to it.

    Subclasses supply the line layout by overriding ``create_line``.
    """

    def __init__(self, event_manager: "EventManager", config: LoggerConfig) -> None:
        """Copy settings from ``config`` and attach the configured output handler.

        A stream handler is installed when ``config.output_stream`` is set and
        a rotating file handler when ``config.output_file_name`` is set. Each
        installation replaces the handlers already on the named logger, so if
        both are configured the file handler is the one that remains.
        """
        self.name: str = config.name
        self.filter: Filter = config.filter
        self.scrubber: Scrubber = config.scrubber
        self.level: EventLevel = config.level
        self.event_manager: EventManager = event_manager
        self._python_logger: Optional[logging.Logger] = config.logger

        if config.output_stream is not None:
            self._python_logger = self._get_python_log_for_handler(
                logging.StreamHandler(config.output_stream)
            )

        if config.output_file_name:
            self._python_logger = self._get_python_log_for_handler(
                RotatingFileHandler(
                    filename=str(config.output_file_name),
                    encoding="utf8",
                    maxBytes=config.output_file_max_bytes,  # type: ignore
                    backupCount=5,
                )
            )

    def _get_python_log_for_handler(self, handler: logging.Handler):
        """Return the logger named ``self.name`` with ``handler`` as its only handler."""
        # Lines arrive fully formatted, so the handler emits the message verbatim.
        handler.setFormatter(logging.Formatter(fmt="%(message)s"))

        python_logger = logging.getLogger(self.name)
        python_logger.setLevel(_log_level_map[self.level])
        python_logger.propagate = False  # keep lines out of ancestor loggers
        python_logger.handlers.clear()
        python_logger.addHandler(handler)
        return python_logger

    def create_line(self, msg: EventMsg) -> str:
        """Render ``msg`` as a single log line; subclasses must override."""
        raise NotImplementedError()

    def write_line(self, msg: EventMsg):
        """Render ``msg`` and send it to the Python logger, if one is configured."""
        rendered = self.create_line(msg)
        if self._python_logger is None:
            return
        send_to_logger(self._python_logger, msg.info.level, rendered)

    def flush(self):
        """Flush every handler attached to the Python logger, if one is configured."""
        if self._python_logger is None:
            return
        for attached in self._python_logger.handlers:
            attached.flush()


class _TextLogger(_Logger):
    """Logger producing human-readable text lines in plain or debug layout."""

    def __init__(self, event_manager: "EventManager", config: LoggerConfig) -> None:
        """Initialise the base logger and record the colour and layout choices."""
        super().__init__(event_manager, config)
        self.use_colors = config.use_colors
        self.use_debug_format = config.line_format == LineFormat.DebugText

    def create_line(self, msg: EventMsg) -> str:
        """Render ``msg`` using the debug layout if selected, else the plain one."""
        if self.use_debug_format:
            return self.create_debug_line(msg)
        return self.create_info_line(msg)

    def create_info_line(self, msg: EventMsg) -> str:
        """Return ``"<HH:MM:SS> <scrubbed message>"`` stamped with the current UTC time."""
        clock: str = datetime.utcnow().strftime("%H:%M:%S")
        text: str = self.scrubber(msg.info.msg)  # type: ignore
        return f"{self._get_color_tag()}{clock} {text}"

    def create_debug_line(self, msg: EventMsg) -> str:
        """Return a line with the event timestamp, level, thread name and scrubbed message.

        A ``MainReportVersion`` event is additionally preceded by a separator
        banner carrying the timestamp and the event manager's invocation id.
        """
        ts: str = timestamp_to_datetime_string(msg.info.ts)

        # Create a separator if this is the beginning of an invocation
        # TODO: This is an ugly hack, get rid of it if we can
        banner: str = ""
        if msg.info.name == "MainReportVersion":
            separator = 30 * "="
            banner = f"\n\n{separator} {ts} | {self.event_manager.invocation_id} {separator}\n"

        scrubbed_msg: str = self.scrubber(msg.info.msg)  # type: ignore
        level = msg.info.level
        body = f"{self._get_color_tag()}{ts} [{level:<5}]{self._get_thread_name()} {scrubbed_msg}"
        return banner + body

    def _get_color_tag(self) -> str:
        """Return the style-reset sequence when colours are enabled, else ``""``."""
        return Style.RESET_ALL if self.use_colors else ""

    def _get_thread_name(self) -> str:
        """Return the current thread's name, cut/padded to 10 characters, as ``" [name]:"``."""
        current = threading.current_thread().name or ""
        fixed_width = current[:10].ljust(10, " ")
        return f" [{fixed_width}]:"


class _JsonLogger(_Logger):
    """Logger that renders each event as one JSON document."""

    def create_line(self, msg: EventMsg) -> str:
        """Serialise ``msg`` to key-sorted JSON and scrub the resulting text."""
        # Imported here rather than at module level, as in the original.
        from dbt.events.functions import msg_to_dict

        serialized = json.dumps(
            msg_to_dict(msg), sort_keys=True, cls=dbt.utils.ForgivingJSONEncoder
        )
        return self.scrubber(serialized)  # type: ignore
from dbt.events.logger import LoggerConfig, _Logger, _TextLogger, _JsonLogger, LineFormat


class EventManager:
Expand Down Expand Up @@ -208,11 +34,8 @@ def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None:

def add_logger(self, config: LoggerConfig) -> None:
logger = (
_JsonLogger(self, config)
if config.line_format == LineFormat.Json
else _TextLogger(self, config)
_JsonLogger(config) if config.line_format == LineFormat.Json else _TextLogger(config)
)
logger.event_manager = self
self.loggers.append(logger)

def flush(self):
Expand Down
7 changes: 5 additions & 2 deletions core/dbt/events/functions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from dbt.constants import METADATA_ENV_PREFIX
from dbt.events.base_types import BaseEvent, EventLevel, EventMsg
from dbt.events.eventmgr import EventManager, LoggerConfig, LineFormat, NoFilter, IEventManager
from dbt.events.helpers import env_secrets, scrub_secrets
from dbt.events.eventmgr import EventManager, IEventManager
from dbt.events.logger import LoggerConfig, NoFilter, LineFormat
from dbt.exceptions import scrub_secrets, env_secrets
from dbt.events.types import Note
from dbt.flags import get_flags, ENABLE_LEGACY_LOGGER
from dbt.logger import GLOBAL_LOGGER, make_log_dir_if_missing
Expand Down Expand Up @@ -106,6 +107,7 @@ def _get_stdout_config(
log_cache_events,
line_format,
),
invocation_id=EVENT_MANAGER.invocation_id,
output_stream=sys.stdout,
)

Expand All @@ -132,6 +134,7 @@ def _get_logfile_config(
level=level, # File log is *always* debug level
scrubber=env_scrubber,
filter=partial(_logfile_filter, bool(get_flags().LOG_CACHE_EVENTS), line_format),
invocation_id=EVENT_MANAGER.invocation_id,
output_file_name=log_path,
output_file_max_bytes=log_file_max_bytes,
)
Expand Down
16 changes: 0 additions & 16 deletions core/dbt/events/helpers.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,6 @@
import os
from typing import List
from dbt.constants import SECRET_ENV_PREFIX
from datetime import datetime


def env_secrets() -> List[str]:
    """Collect the values of environment variables whose names start with SECRET_ENV_PREFIX.

    Values that are empty or whitespace-only are left out.
    """
    found: List[str] = []
    for name, value in os.environ.items():
        if name.startswith(SECRET_ENV_PREFIX) and value.strip():
            found.append(value)
    return found


def scrub_secrets(msg: str, secrets: List[str]) -> str:
    """Return ``msg`` with every occurrence of each secret replaced by ``*****``.

    Args:
        msg: Text to sanitise. It is coerced with ``str()``, so non-string
            values are accepted.
        secrets: Secret values to mask. Empty strings are ignored.

    Returns:
        The sanitised text.
    """
    scrubbed = str(msg)

    # Skip empty secrets: str.replace("", ...) would insert the mask between
    # every character. Mask longest-first so that a secret containing another
    # one is removed whole instead of leaving its tail behind.
    for secret in sorted((s for s in secrets if s), key=len, reverse=True):
        scrubbed = scrubbed.replace(secret, "*****")

    return scrubbed


# This converts a datetime to a json format datetime string which
# is used in constructing protobuf message timestamps.
def datetime_to_json_string(dt: datetime) -> str:
Expand Down
Loading

0 comments on commit 05ef3b6

Please sign in to comment.