From 10257688ca76a1be54d10c3103d7a489ee8e48fc Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 7 Dec 2024 15:09:31 +0100 Subject: [PATCH] Add Example local dlt execution with dbt --- opendbt/examples.py | 42 ++++++++++++++++++- opendbt/macros/executedlt.sql | 26 ++++++++++++ .../dbttest/models/my_dlt_import_model.py | 11 ----- .../dbttest/models/my_executedlt_model.py.py | 36 ++++++++++++++++ .../models/my_executepython_dlt_model.py | 31 ++++++++++++++ ...dbt_model.py => my_executepython_model.py} | 0 tests/test_executedlt_materialization.py | 14 +++++++ tests/test_executepython_materialization.py | 9 +++- 8 files changed, 155 insertions(+), 14 deletions(-) create mode 100644 opendbt/macros/executedlt.sql delete mode 100644 tests/resources/dbttest/models/my_dlt_import_model.py create mode 100644 tests/resources/dbttest/models/my_executedlt_model.py.py create mode 100644 tests/resources/dbttest/models/my_executepython_dlt_model.py rename tests/resources/dbttest/models/{my_executepython_dbt_model.py => my_executepython_model.py} (100%) create mode 100644 tests/test_executedlt_materialization.py diff --git a/opendbt/examples.py b/opendbt/examples.py index 5392a3c..416c458 100644 --- a/opendbt/examples.py +++ b/opendbt/examples.py @@ -1,4 +1,5 @@ import importlib +import sys import tempfile from multiprocessing.context import SpawnContext from typing import Dict @@ -11,6 +12,10 @@ class DuckDBAdapterV2Custom(DuckDBAdapter): @available def submit_local_python_job(self, parsed_model: Dict, compiled_code: str): + connection = self.connections.get_if_exists() + if not connection: + connection = self.connections.get_thread_connection() + with tempfile.NamedTemporaryFile(suffix=f'.py', delete=True) as model_file: model_file.write(compiled_code.lstrip().encode('utf-8')) model_file.flush() @@ -22,12 +27,47 @@ def submit_local_python_job(self, parsed_model: Dict, compiled_code: str): # Create a module object module = 
importlib.util.module_from_spec(spec) # Load the module + sys.modules[model_name] = module spec.loader.exec_module(module) # Access and call `model` function of the model! NOTE: session argument is None here. dbt = module.dbtObj(None) - module.model(dbt, None) + # IMPORTANT: here we are passing down duckdb session from the adapter to the model + module.model(dbt=dbt, session=connection.handle) model_file.close() + @available + def submit_local_dlt_job(self, parsed_model: Dict, compiled_code: str): + connection = self.connections.get_if_exists() + if not connection: + connection = self.connections.get_thread_connection() + + import dlt + # IMPORTANT: here we are pre-configuring and preparing dlt.pipeline for the model! + _pipeline = dlt.pipeline( + pipeline_name=str(parsed_model['unique_id']).replace(".", "-"), + destination=dlt.destinations.duckdb(connection.handle._env.conn), + dataset_name=parsed_model['schema'], + dev_mode=False, + ) + + with tempfile.NamedTemporaryFile(suffix=f'.py', delete=True) as model_file: + model_file.write(compiled_code.lstrip().encode('utf-8')) + model_file.flush() + print(f"Created temp py file {model_file.name}") + # load and execute python code! + model_name = parsed_model['name'] + # Load the module spec + spec = importlib.util.spec_from_file_location(model_name, model_file.name) + # Create a module object + module = importlib.util.module_from_spec(spec) + # Load the module + sys.modules[model_name] = module + spec.loader.exec_module(module) + # Access and call `model` function of the model! NOTE: session argument is None here. + dbt = module.dbtObj(None) + # IMPORTANT: here we are passing down duckdb session from the adapter to the model + module.model(dbt=dbt, pipeline=_pipeline) + model_file.close() # NOTE! 
used for testing class DuckDBAdapterTestingOnlyDbt17(DuckDBAdapter): diff --git a/opendbt/macros/executedlt.sql b/opendbt/macros/executedlt.sql new file mode 100644 index 0000000..2f20731 --- /dev/null +++ b/opendbt/macros/executedlt.sql @@ -0,0 +1,26 @@ +{% materialization executedlt, supported_languages=['python']%} + + {%- set identifier = model['alias'] -%} + {%- set language = model['language'] -%} + + {% set grant_config = config.get('grants') %} + + {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} + {%- set target_relation = api.Relation.create(identifier=identifier, + schema=schema, + database=database, type='table') -%} + {{ run_hooks(pre_hooks) }} + + {% call noop_statement(name='main', message='Executed DLT pipeline', code=compiled_code, rows_affected=-1, res=None) %} + {%- set res = adapter.submit_local_dlt_job(model, compiled_code) -%} + {% endcall %} + {{ run_hooks(post_hooks) }} + + {% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + + {% do persist_docs(target_relation, model) %} + + {{ return({'relations': [target_relation]}) }} + +{% endmaterialization %} diff --git a/tests/resources/dbttest/models/my_dlt_import_model.py b/tests/resources/dbttest/models/my_dlt_import_model.py deleted file mode 100644 index e70e106..0000000 --- a/tests/resources/dbttest/models/my_dlt_import_model.py +++ /dev/null @@ -1,11 +0,0 @@ -@dlt.resource( - columns={"event_tstamp": {"data_type": "timestamp", "timezone": False}}, - primary_key="event_id", -) -def events(): - yield [{"event_id": 1, "event_tstamp": "2024-07-30T10:00:00.123+00:00"}] - - -def model(dbt, session): - print(session) - print(type(session)) diff --git a/tests/resources/dbttest/models/my_executedlt_model.py.py b/tests/resources/dbttest/models/my_executedlt_model.py.py new file mode 100644 index 0000000..83ccb77 --- /dev/null +++ 
b/tests/resources/dbttest/models/my_executedlt_model.py.py @@ -0,0 +1,36 @@ +import dlt +from dlt.pipeline import TPipeline + + +@dlt.resource( + columns={"event_tstamp": {"data_type": "timestamp", "precision": 3}}, + primary_key="event_id", +) +def events(): + yield [{"event_id": 1, "event_tstamp": "2024-07-30T10:00:00.123"}, + {"event_id": 2, "event_tstamp": "2025-01-30T10:00:00.321"}] + + +def model(dbt, pipeline: TPipeline): + """ + + :param dbt: + :param pipeline: Pre-configured dlt pipeline. dlt target connection and dataset is pre-set using the model config! + :return: + """ + dbt.config(materialized="executedlt") + print("========================================================") + print(f"INFO: DLT Pipeline pipeline_name:{pipeline.pipeline_name}") + print(f"INFO: DLT Pipeline dataset_name:{pipeline.dataset_name}") + print(f"INFO: DLT Pipeline staging:{pipeline.staging}") + print(f"INFO: DLT Pipeline destination:{pipeline.destination}") + print(f"INFO: DLT Pipeline _pipeline_storage:{pipeline._pipeline_storage}") + print(f"INFO: DLT Pipeline _schema_storage:{pipeline._schema_storage}") + print(f"INFO: DLT Pipeline state:{pipeline.state}") + print("========================================================") + load_info = pipeline.run(events()) + print(load_info) + row_counts = pipeline.last_trace.last_normalize_info + print(row_counts) + print("========================================================") + return None diff --git a/tests/resources/dbttest/models/my_executepython_dlt_model.py b/tests/resources/dbttest/models/my_executepython_dlt_model.py new file mode 100644 index 0000000..547b008 --- /dev/null +++ b/tests/resources/dbttest/models/my_executepython_dlt_model.py @@ -0,0 +1,31 @@ +import dlt + + +@dlt.resource( + columns={"event_tstamp": {"data_type": "timestamp", "precision": 3}}, + primary_key="event_id", +) +def events(): + yield [{"event_id": 1, "event_tstamp": "2024-07-30T10:00:00.123"}, + {"event_id": 2, "event_tstamp": 
"2025-01-30T10:00:00.321"}] + + +def model(dbt, session): + dbt.config(materialized="executepython") + print("========================================================") + print(f"INFO: DLT Version:{dlt.version.__version__}") + print(f"INFO: DBT Duckdb Session:{type(session)}") + print(f"INFO: DBT Duckdb Connection:{type(session._env.conn)}") + print("========================================================") + p = dlt.pipeline( + pipeline_name="dbt_dlt", + destination=dlt.destinations.duckdb(session._env.conn), + dataset_name=dbt.this.schema, + dev_mode=False, + ) + load_info = p.run(events()) + print(load_info) + row_counts = p.last_trace.last_normalize_info + print(row_counts) + print("========================================================") + return None diff --git a/tests/resources/dbttest/models/my_executepython_dbt_model.py b/tests/resources/dbttest/models/my_executepython_model.py similarity index 100% rename from tests/resources/dbttest/models/my_executepython_dbt_model.py rename to tests/resources/dbttest/models/my_executepython_model.py diff --git a/tests/test_executedlt_materialization.py b/tests/test_executedlt_materialization.py new file mode 100644 index 0000000..ce38806 --- /dev/null +++ b/tests/test_executedlt_materialization.py @@ -0,0 +1,14 @@ +from pathlib import Path +from unittest import TestCase + +from opendbt import OpenDbtProject + + +class TestOpenDbtProject(TestCase): + RESOURCES_DIR = Path(__file__).parent.joinpath("resources") + DBTTEST_DIR = RESOURCES_DIR.joinpath("dbttest") + + def test_run_executedlt_materialization(self): + dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR, + args=['--vars', 'dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom']) + dp.run(command="run", args=['--select', 'my_executedlt_model']) diff --git a/tests/test_executepython_materialization.py b/tests/test_executepython_materialization.py index a732724..bb13a11 100644 --- a/tests/test_executepython_materialization.py +++ 
b/tests/test_executepython_materialization.py @@ -11,9 +11,14 @@ class TestOpenDbtProject(TestCase): def test_run_executepython_materialization(self): dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR, args=['--vars', 'dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom']) - dp.run(command="run", args=['--select', 'my_executepython_dbt_model']) + dp.run(command="run", args=['--select', 'my_executepython_model']) + + def test_run_executepython_dlt_pipeline(self): + dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR, + args=['--vars', 'dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom']) + dp.run(command="run", args=['--select', 'my_executepython_dlt_model']) def test_run_executepython_materialization_subprocess(self): dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR, args=['--vars', 'dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom']) - dp.run(command="run", args=['--select', 'my_executepython_dbt_model'], use_subprocess=True) + dp.run(command="run", args=['--select', 'my_executepython_model'], use_subprocess=True)