diff --git a/docs/integrations/dlt/index.md b/docs/integrations/dlt/index.md index 67e57cfd2..9f139d4d1 100644 --- a/docs/integrations/dlt/index.md +++ b/docs/integrations/dlt/index.md @@ -8,6 +8,11 @@ On this page, you'll learn: - Extract, Transform, Load (ETL) - Extract, Load, Transform (ELT) +- dlt materializer plugin for Hamilton + +``` {note} +See this [blog post](https://blog.dagworks.io/p/slack-summary-pipeline-with-dlt-ibis) for a more detailed discussion about ETL with dlt + Hamilton +``` ## Extract, Transform, Load (ETL) The key consideration for ETL is that the data has to move twice: @@ -76,17 +81,19 @@ The key consideration for ETL is that the data has to move twice: return _table_to_df(client, "general_replies_message") def threads( - general_messages: pd.DataFrame, + general_message: pd.DataFrame, general_replies_message: pd.DataFrame, ) -> pd.DataFrame: """Reassemble from the union of parent messages and replies""" columns = ["thread_ts", "ts", "user", "text"] return pd.concat( - [general_messages[columns], general_replies_message[columns]], + [general_message[columns], general_replies_message[columns]], axis=0 ) ``` +![](transform.png) + 3. Add the Hamilton dataflow execution code to `run.py` ```python @@ -209,7 +216,7 @@ Transformations happen within the data destination, typically a data warehouse. ibis.set_backend(backend) return backend - def general_messages(db_con: ibis.BaseBackend, pipeline: dlt.Pipeline) -> ir.Table: + def general_message(db_con: ibis.BaseBackend, pipeline: dlt.Pipeline) -> ir.Table: """Load table `general_message` from dlt data""" return db_con.table( "general_message", @@ -229,13 +236,13 @@ Transformations happen within the data destination, typically a data warehouse. ) def threads( - general_messages: ir.Table, + general_message: ir.Table, general_replies_message: ir.Table, ) -> ir.Table: """Create the union of `general_message` and `general_replies_message`""" columns = ["thread_ts", "ts", "user", "text"] return ibis.union( - general_messages.select(columns), + general_message.select(columns), general_replies_message.select(columns), ) @@ -289,6 +296,118 @@ results = dr.execute( ) ``` +## dlt materializer plugin +We added custom Data Loader/Saver to plug dlt with Hamilton. Compared to the previous approach, it allows to include the dlt operations as part of the Hamilton dataflow and improve lineage / visibility. + + +``` {note} +See [this notebook](https://github.com/DAGWorks-Inc/hamilton/blob/main/examples/dlt/dlt_plugin.ipynb) for a demo. +``` + +### DataLoader +The `DataLoader` allows to read in-memory data from a `dlt.Resource`. When working with `dlt.Source`, you can access individual `dlt.Resource` with `source.resource["source_name"]`. This removes the need to write utility functions to read data from dlt (with pandas or Ibis). Contrary to the previous ETL and ELT examples, this approach is useful when you don't want to store the dlt Source data. It effectively connects dlt to Hamilton to enable "Extract, Transform" (ET). + + +```python +# run.py +from hamilton import driver +from hamilton.io.materialization import from_ +import slack # NOTE this is dlt code, not an official Slack library +import transform + +source = slack.source(selected_channels=["general"], replies=True) + +dr = driver.Builder().with_modules(transform).build() + +materializers = [ + from_.dlt( + target="general_message", # node name assigned to the data + resource=source.resources["general_message"] + ), + from_.dlt( + target="general_replies_message", + resource=source.resources["general_replies_message"] + ), +] +# when using only loaders (i.e., `from_`), you need to specify +# `additional_vars` to compute, like you would in `.execute(final_vars=["threads"])` +dr.materialize(*materializers, additional_vars=["threads"]) +``` + +### DataSaver +The `DataSaver` allows to write node results to any `dlt.Destination`. You'll need to define a `dlt.Pipeline` with the desired `dlt.Destination` and you can specify arguments for the `pipeline.run()` behavior (e.g., incremental loading, primary key, load_file_format). This provides a "Transform, Load" (TL) connector from Hamilton to dlt. + +```python +# run.py +import dlt +from hamilton import driver +from hamilton.io.materialization import to +import slack # NOTE this is dlt code, not an official Slack library +import transform + +pipeline = dlt.pipeline( + pipeline_name="slack", + destination='duckdb', + dataset_name="slack_community_backup" +) + +dr = driver.Builder().with_modules(transform).build() + +materializers = [ + to.dlt( + id="threads__dlt", # node name + dependencies=["threads"], + table_name="slack_threads", + pipeline=pipeline, + ) +] + +dr.materialize(*materializers) +``` + +### Combining both +You can also combine both the `DataLoader` and `DataSaver`. You will see below that it's almost identical to the ELT example, but now all operations are part of the Hamilton dataflow! + + +```python +# run.py +import dlt +from hamilton import driver +from hamilton.io.materialization import from_, to +import slack # NOTE this is dlt code, not an official Slack library +import transform + +pipeline = dlt.pipeline( + pipeline_name="slack", + destination='duckdb', + dataset_name="slack_community_backup" +) +source = slack.source(selected_channels=["general"], replies=True) + +dr = driver.Builder().with_modules(transform).build() + +materializers = [ + from_.dlt( + target="general_message", + resource=source.resources["general_message"] + ), + from_.dlt( + target="general_replies_message", + resource=source.resources["general_replies_message"] + ), + to.dlt( + id="threads__dlt", + dependencies=["threads"], + table_name="slack_threads", + pipeline=pipeline, + ) +] + +dr.materialize(*materializers) +``` + +![](./materialization.png) + ## Next steps - Our full [code example to ingest Slack data and generate thread summaries](https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/dlt) is available on GitHub. - Another important pattern in data engineering is reverse ETL, which consists of moving data analytics back to your sources (CRM, Hubspot, Zendesk, etc.). See this [dlt blog](https://dlthub.com/docs/blog/reverse-etl-dlt) to get started. diff --git a/docs/integrations/dlt/materialization.png b/docs/integrations/dlt/materialization.png new file mode 100644 index 000000000..988461ac4 Binary files /dev/null and b/docs/integrations/dlt/materialization.png differ diff --git a/docs/integrations/dlt/transform.png b/docs/integrations/dlt/transform.png new file mode 100644 index 000000000..4b0e8df87 Binary files /dev/null and b/docs/integrations/dlt/transform.png differ diff --git a/examples/dlt/dlt_plugin.ipynb b/examples/dlt/dlt_plugin.ipynb new file mode 100644 index 000000000..032bff17c --- /dev/null +++ b/examples/dlt/dlt_plugin.ipynb @@ -0,0 +1,345 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# dlt plugin for Hamilton\n", + "This notebook shows how to use Hamilton [materializers](https://hamilton.dagworks.io/en/latest/concepts/materialization/) to move data between Hamilton and dlt.\n", + "\n", + "Content:\n", + "1. Defining an illustrative Hamilton dataflow\n", + "2. `DataSaver`: save Hamilton results to a [dlt Destination](https://dlthub.com/docs/dlt-ecosystem/destinations/)\n", + "3. `DataLoader`: load data from a [dlt Resource](https://dlthub.com/docs/dlt-ecosystem/verified-sources/) (a single table from a Source) into a Hamilton node" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext hamilton.plugins.jupyter_magic\n", + "\n", + "import dlt\n", + "from hamilton import driver\n", + "from hamilton.io.materialization import to, from_\n", + "from hamilton.plugins import dlt_extensions" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "%3\n", + "\n", + "\n", + "cluster__legend\n", + "\n", + "Legend\n", + "\n", + "\n", + "\n", + "table\n", + "\n", + "table\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "print_df_head\n", + "\n", + "print_df_head\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "_print_df_head_inputs\n", + "\n", + "external\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "_print_df_head_inputs->print_df_head\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "input\n", + "\n", + "input\n", + "\n", + "\n", + "\n", + "function\n", + "\n", + "function\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "%%cell_to_module -m my_module -d\n", + "import pandas as pd\n", + "\n", + "def table() -> pd.DataFrame:\n", + " return pd.DataFrame([{\"C\": 1}, {\"C\": 2}])\n", + "\n", + "def print_df_head(external: pd.DataFrame) -> pd.DataFrame:\n", + " print(\"from print_df_head:\\n\", external.head())" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "dr = driver.Builder().with_modules(my_module).build()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## DataSaver\n", + "With \"Extract, Transform, Load\" (ETL) as frame of reference, here, the Hamilton dataflow is responsible for Transform, and `DltDestination` for Load.\n", + "\n", + "\n", + "Start by defining a dlt `Pipeline` that uses your chosen dlt Destination. This is regular dlt code that you will pass to Hamilton." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "saver_pipeline = dlt.pipeline(pipeline_name=\"saver_pipe\", destination=\"duckdb\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Single dependency\n", + "Define the materializer with `to.dlt()` the example below shows required arguments. You specify an `id` for the materializer and `dependencies` includes the name of a single Hamilton node. Then, specify a `table_name` for the destination and pass the `pipeline`. \n", + "\n", + "The [other keyword arguments](https://dlthub.com/docs/api_reference/pipeline/__init__#run) for `dlt.pipeline.run()` are accepted and allow specifying incremental loading, table schema annotation, and more." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'dlt_metadata': {'pipeline': {'pipeline_name': 'saver_pipe'}, 'metrics': [{'started_at': DateTime(2024, 4, 15, 20, 18, 40, 423767, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2024, 4, 15, 20, 18, 40, 587988, tzinfo=Timezone('UTC')), 'load_id': '1713212320.0496533'}], 'destination_type': 'dlt.destinations.duckdb', 'destination_displayable_credentials': 'duckdb:////home/tjean/projects/dagworks/hamilton/examples/dlt/saver_pipe.duckdb', 'destination_name': 'duckdb', 'environment': None, 'staging_type': None, 'staging_name': None, 'staging_displayable_credentials': None, 'destination_fingerprint': '', 'dataset_name': 'saver_pipe_dataset', 'loads_ids': ['1713212320.0496533'], 'load_packages': [{'load_id': '1713212320.0496533', 'package_path': '/home/tjean/.dlt/pipelines/saver_pipe/load/loaded/1713212320.0496533', 'state': 'loaded', 'completed_at': DateTime(2024, 4, 15, 20, 18, 40, 570607, tzinfo=Timezone('UTC')), 'jobs': [{'state': 'completed_jobs', 'file_path': '/home/tjean/.dlt/pipelines/saver_pipe/load/loaded/1713212320.0496533/completed_jobs/my_table.7352fcd48a.0.parquet', 'file_size': 574, 'created_at': DateTime(2024, 4, 15, 20, 18, 40, 60607, tzinfo=Timezone('UTC')), 'elapsed': 0.5100002288818359, 'failed_message': None, 'table_name': 'my_table', 'file_id': '7352fcd48a', 'retry_count': 0, 'file_format': 'parquet'}], 'schema_hash': 'UE8l1iVz3xnHM+zYpjm8Bqd+3m6rDG++zNubWIUyecg=', 'schema_name': 'saver_pipe', 'tables': []}], 'first_run': False, 'started_at': DateTime(2024, 4, 15, 20, 18, 40, 423767, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2024, 4, 15, 20, 18, 40, 587988, tzinfo=Timezone('UTC'))}}\n" + ] + }, + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "%3\n", + "\n", + "\n", + "cluster__legend\n", + "\n", + "Legend\n", + "\n", + "\n", + "\n", + "table\n", + "\n", + "table\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "saver_node\n", + "\n", + "\n", + "saver_node\n", + "DltDestinationSaver\n", + "\n", + "\n", + "\n", + "table->saver_node\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "function\n", + "\n", + "function\n", + "\n", + "\n", + "\n", + "output\n", + "\n", + "output\n", + "\n", + "\n", + "\n", + "materializer\n", + "\n", + "\n", + "materializer\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "materializers = [\n", + " to.dlt(\n", + " id=\"saver_node\",\n", + " dependencies=[\"table\"],\n", + " table_name=\"my_table\",\n", + " pipeline=saver_pipeline,\n", + " )\n", + "]\n", + "results, _ = dr.materialize(*materializers)\n", + "print(results[\"saver_node\"])\n", + "dr.visualize_materialization(*materializers)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## DataLoader\n", + "With ETL as a frame of reference, the `DataLoader` uses dlt to run the \"Extract\" step for the passed dlt `Resource`. \n", + "\n", + "Internally, it creates a temporary dlt Pipeline to run the extract and normalize steps then reads the files in-memory. The dlt Pipeline is then deleted. " + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "# this is a mock dlt Source for demo purposes\n", + "@dlt.source\n", + "def mock_source():\n", + " iterable_data = [{\"col\": 1}, {\"col\": 2}, {\"col\": 3}] * 100\n", + " \n", + " @dlt.resource\n", + " def mock_resource():\n", + " yield from iterable_data\n", + " \n", + " yield mock_resource\n", + " \n", + "my_mock_source = mock_source()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Single resource\n", + "To define the materializer, give it a `target` Hamilton node and pass a dlt Resource to `resource`. When working with a dlt Source, you can access individual resources via the dictionary `Source.resource[RESOURCE_NAME]`" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "from print_df_head:\n", + " col _dlt_load_id _dlt_id\n", + "0 1 1713212320.641402 vG/cmM5Ty/F/WQ\n", + "1 2 1713212320.641402 xZ/tUsoBiWTneQ\n", + "2 3 1713212320.641402 d+8Ah9hx2214Vw\n", + "3 1 1713212320.641402 XxYKANM6PlMl2A\n", + "4 2 1713212320.641402 jnnYhXp5KA2NsQ\n" + ] + } + ], + "source": [ + "materializers = [\n", + " from_.dlt(\n", + " target=\"external\",\n", + " resource=my_mock_source.resources[\"mock_resource\"],\n", + " ),\n", + "]\n", + "\n", + "metadata, _ = dr.materialize(\n", + " *materializers,\n", + " additional_vars=[\"print_df_head\"]\n", + ")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.9" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/hamilton/function_modifiers/base.py b/hamilton/function_modifiers/base.py index be3038443..2e8a7068f 100644 --- a/hamilton/function_modifiers/base.py +++ b/hamilton/function_modifiers/base.py @@ -36,6 +36,7 @@ "sklearn_plot", "vaex", "ibis", + "dlt", ] for plugin_module in plugins_modules: try: diff --git a/hamilton/plugins/dlt_extensions.py b/hamilton/plugins/dlt_extensions.py new file mode 100644 index 000000000..66525ba43 --- /dev/null +++ b/hamilton/plugins/dlt_extensions.py @@ -0,0 +1,134 @@ +import dataclasses +from typing import Any, Collection, Dict, Iterable, Literal, Optional, Sequence, Tuple, Type + +import dlt +import pandas as pd +from dlt.common.destination.capabilities import TLoaderFileFormat +from dlt.common.schema import Schema, TColumnSchema + +# importing TDestinationReferenceArg fails if Destination isn't imported +from dlt.extract.resource import DltResource + +from hamilton import registry +from hamilton.io import utils +from hamilton.io.data_adapters import DataLoader, DataSaver + +DATAFRAME_TYPES = [Iterable, pd.DataFrame] + +# TODO add types for other Dataframe libraries +try: + import pyarrow as pa + + DATAFRAME_TYPES.extend([pa.Table, pa.RecordBatch]) +except ModuleNotFoundError: + pass + +# convert to tuple to dynamically define type `Union[DATAFRAME_TYPES]` +DATAFRAME_TYPES = tuple(DATAFRAME_TYPES) +COLUMN_FRIENDLY_DF_TYPE = False + + +@dataclasses.dataclass +class DltResourceLoader(DataLoader): + resource: DltResource + + @classmethod + def name(cls) -> str: + return "dlt" + + @classmethod + def applicable_types(cls) -> Collection[Type]: + return [pd.DataFrame] + + def load_data(self, type_: Type) -> Tuple[pd.DataFrame, Dict[str, Any]]: + """Creates a pipeline and conduct `extract` and `normalize` steps. + Then, "load packages" are read with pandas + """ + pipeline = dlt.pipeline( + pipeline_name="Hamilton-DltResourceLoader", destination="filesystem" + ) + pipeline.extract(self.resource) + normalize_info = pipeline.normalize(loader_file_format="parquet") + + partition_file_paths = [] + package = normalize_info.load_packages[0] + for job in package.jobs["new_jobs"]: + if job.job_file_info.table_name == self.resource.name: + partition_file_paths.append(job.file_path) + + # TODO use pyarrow directly to support different dataframe libraries + # ref: https://github.com/dlt-hub/verified-sources/blob/master/sources/filesystem/readers.py + # ref: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html#pyarrow.parquet.ParquetDataset + df = pd.concat([pd.read_parquet(f) for f in partition_file_paths], ignore_index=True) + + # delete the pipeline + pipeline.drop() + + metadata = utils.get_dataframe_metadata(df) + return df, metadata + + +# TODO handle behavior with `combine=`, currently only supports materializing a single node +@dataclasses.dataclass +class DltDestinationSaver(DataSaver): + """Materialize results using a dlt pipeline with the specified destination. + + In reference to an Extract, Transform, Load (ETL) pipeline, here, the Hamilton + dataflow is responsible for Transform, and `DltDestination` for Load. + """ + + pipeline: dlt.Pipeline + table_name: str + primary_key: Optional[str] = None + write_disposition: Optional[Literal["skip", "append", "replace", "merge"]] = None + columns: Optional[Sequence[TColumnSchema]] = None + schema: Optional[Schema] = None + loader_file_format: Optional[TLoaderFileFormat] = None + + @classmethod + def name(cls) -> str: + return "dlt" + + @classmethod + def applicable_types(cls) -> Collection[Type]: + return DATAFRAME_TYPES + + def _get_kwargs(self) -> dict: + kwargs = {} + fields_to_skip = ["pipeline"] + for field in dataclasses.fields(self): + field_value = getattr(self, field.name) + if field.name in fields_to_skip: + continue + + if field_value != field.default: + kwargs[field.name] = field_value + + return kwargs + + # TODO get pyarrow table from polars, dask, etc. + def save_data(self, data) -> Dict[str, Any]: + """ + ref: https://dlthub.com/docs/dlt-ecosystem/verified-sources/arrow-pandas + """ + if isinstance(data, dict): + raise NotImplementedError( + "DltDestinationSaver received data of type `dict`." + "Currently, it doesn't support specifying `combine=base.DictResult()`" + ) + + load_info = self.pipeline.run(data, **self._get_kwargs()) + # follows the pattern of metadata output found in hamilton.io.utils + return {"dlt_metadata": load_info.asdict()} + + +def register_data_loaders(): + """Function to register the data loaders for this extension.""" + for loader in [ + DltDestinationSaver, + DltResourceLoader, + ]: + registry.register_adapter(loader) + + +register_data_loaders() diff --git a/requirements-docs.txt b/requirements-docs.txt index ffbc66d31..1cc78c496 100644 --- a/requirements-docs.txt +++ b/requirements-docs.txt @@ -5,6 +5,8 @@ dask-expr dask[distributed] ddtrace diskcache +# required for all the plugins +dlt # furo -- install from main for now until the next release is out: git+https://github.com/pradyunsg/furo@main gitpython # Required for parsing git info for generation of data-adapter docs @@ -16,7 +18,6 @@ mock==1.0.1 # read the docs pins myst-parser==2.0.0 # latest version of myst at this time pandera pillow -# required for all the plugins polars pyarrow >= 1.0.0 pyspark diff --git a/requirements-test.txt b/requirements-test.txt index 881815c32..da714f6bf 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -2,6 +2,7 @@ connectorx dask dask-expr; python_version >= '3.9' diskcache +dlt fsspec graphviz kaleido diff --git a/tests/plugins/test_dlt_extensions.py b/tests/plugins/test_dlt_extensions.py new file mode 100644 index 000000000..cc320b584 --- /dev/null +++ b/tests/plugins/test_dlt_extensions.py @@ -0,0 +1,45 @@ +from pathlib import Path + +import dlt +import pandas as pd +import pyarrow as pa +import pytest +from dlt.destinations import filesystem + +from hamilton.plugins.dlt_extensions import DltDestinationSaver, DltResourceLoader + + +def pandas_df(): + return pd.DataFrame({"a": [1, 2], "b": [1, 2]}) + + +def iterable(): + return [{"a": 1, "b": 3}, {"a": 2, "b": 4}] + + +def pyarrow_table(): + col_a = pa.array([1, 2]) + col_b = pa.array([3, 4]) + return pa.Table.from_arrays([col_a, col_b], names=["a", "b"]) + + +@pytest.mark.parametrize("data", [iterable(), pandas_df(), pyarrow_table()]) +def test_dlt_destination_saver(data, tmp_path): + save_pipe = dlt.pipeline(destination=filesystem(bucket_url=tmp_path.as_uri())) + saver = DltDestinationSaver(pipeline=save_pipe, table_name="test_table") + + metadata = saver.save_data(data) + + assert len(metadata["dlt_metadata"]["load_packages"]) == 1 + assert metadata["dlt_metadata"]["load_packages"][0]["state"] == "loaded" + assert Path(metadata["dlt_metadata"]["load_packages"][0]["jobs"][0]["file_path"]).exists() + + +def test_dlt_source_loader(): + resource = dlt.resource([{"a": 1, "b": 3}, {"a": 2, "b": 4}], name="mock_resource") + loader = DltResourceLoader(resource=resource) + + loaded_data, metadata = loader.load_data(pd.DataFrame) + + assert len(loaded_data) == len([row for row in resource]) + assert "_dlt_load_id" in loaded_data.columns