From 15a3872886b1ad33a2a392112f59bd69c451f9c2 Mon Sep 17 00:00:00 2001 From: Austin Weisgrau Date: Mon, 22 Jul 2024 15:21:30 -0700 Subject: [PATCH] dbtLoggerDatabase class for logging to database --- parsons/utilities/dbt/logging.py | 47 +++++++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/parsons/utilities/dbt/logging.py b/parsons/utilities/dbt/logging.py index f408efbcc7..9ee8ff7338 100644 --- a/parsons/utilities/dbt/logging.py +++ b/parsons/utilities/dbt/logging.py @@ -6,11 +6,13 @@ from abc import ABC, abstractmethod from typing import Optional +from dbt.contracts.graph.manifest import Manifest from rich.console import Console from rich.logging import RichHandler from rich.markdown import Markdown -from dbt.contracts.graph.manifest import Manifest +from parsons import Table +from parsons.databases.database_connector import DatabaseConnector logger = logging.getLogger(__name__) @@ -175,3 +177,46 @@ def send(self, manifests: list[Manifest]) -> None: from parsons.notifications.slack import Slack Slack.message(channel=self.slack_channel, text=log_text, webhook=self.slack_webhook) + + +class dbtLoggerDatabase(dbtLogger, ABC): + """Log dbt artifacts by loading to a database.""" + + def __init__(self, database_connector: DatabaseConnector, destination_table: str) -> None: + self.db_connector = database_connector + self.destination_table = destination_table + + def format_command_result(self, manifest: Manifest) -> Table: + """Loads all artifact results into a Parsons Table.""" + run_metadata = { + key: getattr(manifest, key) + for key in ( + "command", + "args", + "generated_at", + ) + } + rows = [] + for result in manifest.results: + row = run_metadata.copy() + row.update( + { + key: value + for key, value in result.__dict__.items + if key in ("status", "execution_time", "message", "node") + } + ) + rows.append(row) + tbl = Table(rows) + return tbl + + def format_result(self) -> Table: + tbls = [self.format_command_result(command) for command in self.commands] + tbl = tbls[0].concat(*tbls[1:]) + return tbl + + def send(self, manifests: list[Manifest]) -> None: + self.commands = manifests + log_tbl = self.format_result() + + self.db_connector.copy(log_tbl, self.destination_table, if_exists="append")