Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Organize code structure #16

Merged
merged 11 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 48 additions & 22 deletions opendbt/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
import argparse
import logging
import os
import subprocess
import sys
from pathlib import Path

from dbt.cli.main import dbtRunner as DbtCliRunner
from dbt.cli.main import dbtRunnerResult
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.results import RunResult
from dbt.exceptions import DbtRuntimeError

import opendbt.client
from opendbt.overrides import patch_dbt
from opendbt.utils import Utils

######################
patch_dbt()


######################

class OpenDbtLogger:
_log = None
Expand All @@ -27,6 +35,39 @@ def log(self) -> logging.Logger:
return self._log


class OpenDbtCli:

@staticmethod
def run(args: list) -> dbtRunnerResult:
"""
Run dbt with the given arguments.

:param args: The arguments to pass to dbt.
:return: The result of the dbt run.
"""
# https://docs.getdbt.com/reference/programmatic-invocations
dbt = DbtCliRunner()
result: dbtRunnerResult = dbt.invoke(args)
if result.success:
return result

# print query for user to run and see the failing rows
rer: RunResult

_exception = result.exception if result.exception else None
if (_exception is None and result.result and result.result.results and
len(result.result.results) > 0 and result.result.results[0].message
):
_exception = DbtRuntimeError(result.result.results[0].message)

if _exception is None:
DbtRuntimeError(f"DBT execution failed!")
if _exception:
raise _exception
else:
return result


class OpenDbtProject(OpenDbtLogger):
"""
This class is used to take action on a dbt project.
Expand Down Expand Up @@ -54,11 +95,14 @@ def run(self, command: str = "build", target: str = None, args: list = None, use
run_args.remove("--no-write-json")

if use_subprocess:
shell = False
self.log.info("Working dir is %s" % os.getcwd())
self.log.info("Running command (shell=%s) `%s`" % (shell, " ".join(command)))
Utils.runcommand(command=['opendbt'] + run_args)
return None
else:
self.log.info(f"Running `dbt {' '.join(run_args)}`")
return client.OpenDbtCli.run(args=run_args)
return OpenDbtCli.run(args=run_args)

def manifest(self, partial_parse=True, no_write_manifest=True) -> Manifest:
args = []
Expand All @@ -78,28 +122,10 @@ def generate_docs(self, args: list = None):
self.run(command="docs", args=_args)


class Utils(object):

@staticmethod
def runcommand(command: list, shell=False):
logger = OpenDbtLogger()

logger.log.info("Working dir is %s" % os.getcwd())
logger.log.info("Running command (shell=%s) `%s`" % (shell, " ".join(command)))
with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1,
universal_newlines=True, shell=shell) as p:
for line in p.stdout:
if line:
print(line.strip())

if p.returncode != 0:
raise subprocess.CalledProcessError(p.returncode, p.args)


def main():
p = argparse.ArgumentParser()
_, args = p.parse_known_args()
client.OpenDbtCli.run(args=args)
OpenDbtCli.run(args=args)


if __name__ == "__main__":
Expand Down
62 changes: 0 additions & 62 deletions opendbt/client.py

This file was deleted.

33 changes: 0 additions & 33 deletions opendbt/dbt17.py

This file was deleted.

38 changes: 0 additions & 38 deletions opendbt/dbt18.py

This file was deleted.

48 changes: 0 additions & 48 deletions opendbt/dbtcommon.py

This file was deleted.

2 changes: 1 addition & 1 deletion opendbt/examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dbt.adapters.base import available
from dbt.adapters.duckdb import DuckDBAdapter

from opendbt import Utils
from opendbt.utils import Utils


class DuckDBAdapterV2Custom(DuckDBAdapter):
Expand Down
15 changes: 15 additions & 0 deletions opendbt/overrides/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import dbt
from packaging.version import Version

from opendbt.overrides import common


def patch_dbt():
# ================================================================================================================
# Monkey Patching! Override dbt lib AdapterContainer.register_adapter method with new one above
# ================================================================================================================
if Version(dbt.version.get_installed_version().to_version_string(skip_matcher=True)) < Version("1.8.0"):
dbt.task.generate.GenerateTask = common.OpenDbtGenerateTask
else:
dbt.task.docs.generate.GenerateTask = common.OpenDbtGenerateTask
dbt.adapters.factory.FACTORY = common.OpenDbtAdapterContainer()
65 changes: 65 additions & 0 deletions opendbt/overrides/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import importlib

DBT_CUSTOM_ADAPTER_VAR = 'dbt_custom_adapter'
import shutil
from pathlib import Path

import click
from packaging.version import Version
import dbt

if Version(dbt.version.get_installed_version().to_version_string(skip_matcher=True)) < Version("1.8.0"):
from dbt.task.generate import GenerateTask
from opendbt.overrides.dbt17 import AdapterContainerDbtOverride
else:
from dbt.task.docs.generate import GenerateTask
from opendbt.overrides.dbt18 import AdapterContainerDbtOverride


class OpenDbtAdapterContainer(AdapterContainerDbtOverride):

def get_custom_adapter_config_value(self, config: 'AdapterRequiredConfig') -> str:
# FIRST: it's set as cli value: dbt run --vars {'dbt_custom_adapter': 'custom_adapters.DuckDBAdapterV1Custom'}
if hasattr(config, 'cli_vars') and DBT_CUSTOM_ADAPTER_VAR in config.cli_vars:
custom_adapter_class_name: str = config.cli_vars[DBT_CUSTOM_ADAPTER_VAR]
if custom_adapter_class_name and custom_adapter_class_name.strip():
return custom_adapter_class_name
# SECOND: it's set inside dbt_project.yml
if hasattr(config, 'vars') and DBT_CUSTOM_ADAPTER_VAR in config.vars.to_dict():
custom_adapter_class_name: str = config.vars.to_dict()[DBT_CUSTOM_ADAPTER_VAR]
if custom_adapter_class_name and custom_adapter_class_name.strip():
return custom_adapter_class_name

return None

def get_custom_adapter_class_by_name(self, custom_adapter_class_name: str):
if "." not in custom_adapter_class_name:
raise ValueError(f"Unexpected adapter class name: `{custom_adapter_class_name}` ,"
f"Expecting something like:`my.sample.library.MyAdapterClass`")

__module, __class = custom_adapter_class_name.rsplit('.', 1)
try:
user_adapter_module = importlib.import_module(__module)
user_adapter_class = getattr(user_adapter_module, __class)
return user_adapter_class
except ModuleNotFoundError as mnfe:
raise Exception(f"Module of provided adapter not found, provided: {custom_adapter_class_name}") from mnfe


class OpenDbtGenerateTask(GenerateTask):

def deploy_user_index_html(self):
# run custom code
target = Path(self.config.project_target_path).joinpath("index.html")
for dir in self.config.docs_paths:
index_html = Path(self.config.project_root).joinpath(dir).joinpath("index.html")
if index_html.is_file() and index_html.exists():
# override default dbt provided index.html with user index.html file
shutil.copyfile(index_html, target)
click.echo(f"Using user provided documentation page: {index_html.as_posix()}")
break

def run(self):
# Call the original dbt run method
super().run()
self.deploy_user_index_html()
Loading
Loading