Skip to content

Commit

Permalink
Add Example local dlt execution with dbt
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Dec 7, 2024
1 parent a0c4dc2 commit 1025768
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 14 deletions.
42 changes: 41 additions & 1 deletion opendbt/examples.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import importlib
import sys
import tempfile
from multiprocessing.context import SpawnContext
from typing import Dict
Expand All @@ -11,6 +12,10 @@ class DuckDBAdapterV2Custom(DuckDBAdapter):

@available
def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
connection = self.connections.get_if_exists()
if not connection:
connection = self.connections.get_thread_connection()

with tempfile.NamedTemporaryFile(suffix=f'.py', delete=True) as model_file:
model_file.write(compiled_code.lstrip().encode('utf-8'))
model_file.flush()
Expand All @@ -22,12 +27,47 @@ def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
# Create a module object
module = importlib.util.module_from_spec(spec)
# Load the module
sys.modules[model_name] = module
spec.loader.exec_module(module)
# Access and call `model` function of the model! NOTE: session argument is None here.
dbt = module.dbtObj(None)
module.model(dbt, None)
# IMPORTANT: here we are passing down duckdb session from the adapter to the model
module.model(dbt=dbt, session=connection.handle)
model_file.close()

@available
def submit_local_dlt_job(self, parsed_model: Dict, compiled_code: str):
    """Run a compiled dlt Python model in-process with a pre-configured dlt pipeline.

    A ``dlt.pipeline`` is built from the model metadata (pipeline name derived
    from ``unique_id``, dataset name taken from ``schema``) with a DuckDB
    destination created from the adapter's current connection. The compiled
    code is written to a temporary ``.py`` file, imported as a module named
    after the model, and its ``model`` function is called with the pipeline.

    :param parsed_model: dbt model node; the keys ``unique_id``, ``schema``
        and ``name`` are read.
    :param compiled_code: compiled Python source of the model. It must define
        ``dbtObj`` and a ``model`` callable accepting ``dbt`` and ``pipeline``
        keyword arguments.
    """
    # Imported explicitly: a bare `import importlib` does not guarantee that
    # the `importlib.util` submodule has been loaded.
    import importlib.util

    import dlt

    connection = self.connections.get_if_exists()
    if not connection:
        connection = self.connections.get_thread_connection()

    # IMPORTANT: here we are pre-configuring and preparing dlt.pipeline for the model!
    _pipeline = dlt.pipeline(
        pipeline_name=str(parsed_model['unique_id']).replace(".", "-"),
        destination=dlt.destinations.duckdb(connection.handle._env.conn),
        dataset_name=parsed_model['schema'],
        dev_mode=False,
    )

    with tempfile.NamedTemporaryFile(suffix='.py', delete=True) as model_file:
        model_file.write(compiled_code.lstrip().encode('utf-8'))
        # Flush so the source is on disk before the import machinery reads it.
        model_file.flush()
        print(f"Created temp py file {model_file.name}")

        # Load the temp file as a module named after the model.
        model_name = parsed_model['name']
        spec = importlib.util.spec_from_file_location(model_name, model_file.name)
        module = importlib.util.module_from_spec(spec)
        # Register the module under the model name before executing it.
        sys.modules[model_name] = module
        spec.loader.exec_module(module)

        # dbtObj is constructed with None as its only argument.
        dbt = module.dbtObj(None)
        # IMPORTANT: here we are passing down the pre-configured dlt pipeline to the model
        module.model(dbt=dbt, pipeline=_pipeline)
    # Leaving the `with` block closes the temp file; no explicit close() is needed.

# NOTE! used for testing
class DuckDBAdapterTestingOnlyDbt17(DuckDBAdapter):
Expand Down
26 changes: 26 additions & 0 deletions opendbt/macros/executedlt.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{% materialization executedlt, supported_languages=['python']%}
{# Materialization for Python models: the compiled model code is handed to adapter.submit_local_dlt_job. #}

{%- set identifier = model['alias'] -%}
{# NOTE: language is set here but not referenced again in this materialization. #}
{%- set language = model['language'] -%}

{% set grant_config = config.get('grants') %}

{# Look up any existing relation and describe the target as a relation of type 'table'. #}
{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
schema=schema,
database=database, type='table') -%}
{{ run_hooks(pre_hooks) }}

{# The dlt job is submitted inside a noop_statement named 'main'; res is not used afterwards in this macro. #}
{% call noop_statement(name='main', message='Executed DLT pipeline', code=compiled_code, rows_affected=-1, res=None) %}
{%- set res = adapter.submit_local_dlt_job(model, compiled_code) -%}
{% endcall %}
{{ run_hooks(post_hooks) }}

{# Grants are applied to target_relation; should_revoke is computed with full_refresh_mode=True. #}
{% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

{# target_relation is returned as the only relation of this materialization. #}
{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
11 changes: 0 additions & 11 deletions tests/resources/dbttest/models/my_dlt_import_model.py

This file was deleted.

36 changes: 36 additions & 0 deletions tests/resources/dbttest/models/my_executedlt_model.py.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import dlt
from dlt.pipeline import TPipeline


@dlt.resource(
    columns={"event_tstamp": {"data_type": "timestamp", "precision": 3}},
    primary_key="event_id",
)
def events():
    """Yield one batch containing two sample event records.

    Each record has an integer ``event_id`` (declared as the primary key) and
    an ISO-8601 ``event_tstamp`` string, which the resource hints declare as a
    timestamp column with precision 3.
    """
    yield [
        {"event_id": 1, "event_tstamp": "2024-07-30T10:00:00.123"},
        # Must be a real calendar date (February has no 30th) so the value
        # can be interpreted as a timestamp.
        {"event_id": 2, "event_tstamp": "2025-02-28T10:00:00.321"},
    ]


def model(dbt, pipeline: TPipeline):
    """
    Run the sample ``events`` resource through the supplied dlt pipeline.

    :param dbt: dbt model object; used here only to set the materialization.
    :param pipeline: Pre-configured dlt pipeline. dlt target connection and dataset is pre-set using the model config!
    :return: None
    """
    separator = "========================================================"
    # Attributes are printed one at a time, in this order, labelled by their own name.
    reported_attributes = (
        "pipeline_name",
        "dataset_name",
        "staging",
        "destination",
        "_pipeline_storage",
        "_schema_storage",
        "state",
    )

    dbt.config(materialized="executedlt")
    print(separator)
    for attribute in reported_attributes:
        print(f"INFO: DLT Pipeline {attribute}:{getattr(pipeline, attribute)}")
    print(separator)
    # Print the load info returned by the run, then the last normalize info from the trace.
    print(pipeline.run(events()))
    print(pipeline.last_trace.last_normalize_info)
    print(separator)
    return None
31 changes: 31 additions & 0 deletions tests/resources/dbttest/models/my_executepython_dlt_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import dlt


@dlt.resource(
    columns={"event_tstamp": {"data_type": "timestamp", "precision": 3}},
    primary_key="event_id",
)
def events():
    """Yield one batch containing two sample event records.

    Each record has an integer ``event_id`` (declared as the primary key) and
    an ISO-8601 ``event_tstamp`` string, which the resource hints declare as a
    timestamp column with precision 3.
    """
    yield [
        {"event_id": 1, "event_tstamp": "2024-07-30T10:00:00.123"},
        # Must be a real calendar date (February has no 30th) so the value
        # can be interpreted as a timestamp.
        {"event_id": 2, "event_tstamp": "2025-02-28T10:00:00.321"},
    ]


def model(dbt, session):
    """
    Build a dlt pipeline on top of the given session and load the sample events.

    :param dbt: dbt model object; supplies the config call and ``dbt.this.schema``.
    :param session: session object passed in by the caller; its ``_env.conn``
        is used to create the dlt DuckDB destination.
    :return: None
    """
    separator = "========================================================"

    dbt.config(materialized="executepython")
    print(separator)
    print(f"INFO: DLT Version:{dlt.version.__version__}")
    print(f"INFO: DBT Duckdb Session:{type(session)}")
    print(f"INFO: DBT Duckdb Connection:{type(session._env.conn)}")
    print(separator)

    # The pipeline writes into the schema of the current dbt model.
    pipeline = dlt.pipeline(
        pipeline_name="dbt_dlt",
        destination=dlt.destinations.duckdb(session._env.conn),
        dataset_name=dbt.this.schema,
        dev_mode=False,
    )
    # Print the load info returned by the run, then the last normalize info from the trace.
    print(pipeline.run(events()))
    print(pipeline.last_trace.last_normalize_info)
    print(separator)
    return None
14 changes: 14 additions & 0 deletions tests/test_executedlt_materialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from pathlib import Path
from unittest import TestCase

from opendbt import OpenDbtProject


class TestOpenDbtProject(TestCase):
    """Runs the ``executedlt`` example model through dbt using the test project."""

    # Test fixtures live next to this file under resources/dbttest.
    RESOURCES_DIR = Path(__file__).parent / "resources"
    DBTTEST_DIR = RESOURCES_DIR / "dbttest"

    def test_run_executedlt_materialization(self):
        # The same directory serves as both the dbt project dir and the profiles dir.
        adapter_args = ['--vars', 'dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom']
        project = OpenDbtProject(
            project_dir=self.DBTTEST_DIR,
            profiles_dir=self.DBTTEST_DIR,
            args=adapter_args,
        )
        project.run(command="run", args=['--select', 'my_executedlt_model'])
9 changes: 7 additions & 2 deletions tests/test_executepython_materialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@ class TestOpenDbtProject(TestCase):
def test_run_executepython_materialization(self):
dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR,
args=['--vars', 'dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom'])
dp.run(command="run", args=['--select', 'my_executepython_dbt_model'])
dp.run(command="run", args=['--select', 'my_executepython_model'])

def test_run_executepython_dlt_pipeline(self):
    # Runs only the dlt-based executepython model, using the custom DuckDB adapter.
    adapter_args = ['--vars', 'dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom']
    project = OpenDbtProject(
        project_dir=self.DBTTEST_DIR,
        profiles_dir=self.DBTTEST_DIR,
        args=adapter_args,
    )
    project.run(command="run", args=['--select', 'my_executepython_dlt_model'])

def test_run_executepython_materialization_subprocess(self):
dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR,
args=['--vars', 'dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom'])
dp.run(command="run", args=['--select', 'my_executepython_dbt_model'], use_subprocess=True)
dp.run(command="run", args=['--select', 'my_executepython_model'], use_subprocess=True)

0 comments on commit 1025768

Please sign in to comment.