dlt plugin (#820)
* dlt DataLoader and DataSaver with docs and notebook example

materializers = [
    to.dlt(
        id="saver_node",
        dependencies=["table"],
        table_name="my_table",
        pipeline=saver_pipeline,
    ),
    from_.dlt(
        target="external",
        resource=my_mock_source.resources["mock_resource"],
    ),
]

---------

Co-authored-by: zilto <tjean@DESKTOP-V6JDCS2>
zilto and zilto authored Apr 17, 2024
1 parent 431cc21 commit d89b03e
Showing 9 changed files with 652 additions and 6 deletions.
129 changes: 124 additions & 5 deletions docs/integrations/dlt/index.md
@@ -8,6 +8,11 @@ On this page, you'll learn:

- Extract, Transform, Load (ETL)
- Extract, Load, Transform (ELT)
- dlt materializer plugin for Hamilton

``` {note}
See this [blog post](https://blog.dagworks.io/p/slack-summary-pipeline-with-dlt-ibis) for a more detailed discussion about ETL with dlt + Hamilton.
```

## Extract, Transform, Load (ETL)
The key consideration for ETL is that the data has to move twice:
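The enumeration under this heading is elided by the diff. To make the two moves concrete, here is a minimal sketch using the Slack example's names; the `slack` dlt source module and the driver inputs are assumptions inferred from the snippets below, not the example's exact `run.py`.

```python
import dlt
from hamilton import driver

import slack  # the example's dlt source module (assumed), not an official Slack library
import transform  # the pandas Hamilton module shown below

# Move 1: extract from the Slack API and load raw tables into a staging area.
pipeline = dlt.pipeline(
    pipeline_name="slack",
    destination="duckdb",
    dataset_name="slack_community_backup",
)
pipeline.run(slack.source(selected_channels=["general"], replies=True))

# Transform in memory with Hamilton: read the staged tables back out.
# The `client` input is an assumption based on `_table_to_df(client, ...)`.
dr = driver.Builder().with_modules(transform).build()
with pipeline.sql_client() as client:
    results = dr.execute(["threads"], inputs={"client": client})

# Move 2: load the transformed result into its final destination.
pipeline.run(results["threads"], table_name="slack_threads")
```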
@@ -76,17 +81,19 @@ The key consideration for ETL is that the data has to move twice:
    return _table_to_df(client, "general_replies_message")

def threads(
-    general_messages: pd.DataFrame,
+    general_message: pd.DataFrame,
    general_replies_message: pd.DataFrame,
) -> pd.DataFrame:
    """Reassemble from the union of parent messages and replies"""
    columns = ["thread_ts", "ts", "user", "text"]
    return pd.concat(
-        [general_messages[columns], general_replies_message[columns]],
+        [general_message[columns], general_replies_message[columns]],
        axis=0
    )
```
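The snippet above calls `_table_to_df`, whose definition falls outside this hunk. A minimal sketch of such a helper, assuming `client` comes from `pipeline.sql_client()` and exposes dlt's documented query-cursor `.df()` accessor:

```python
import pandas as pd

def _table_to_df(client, table_name: str) -> pd.DataFrame:
    """Read a table that dlt loaded into the destination into a DataFrame."""
    # Assumption: `client` is a dlt SQL client whose cursors expose `.df()`.
    with client.execute_query(f"SELECT * FROM {table_name}") as cursor:
        return cursor.df()
```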

![](transform.png)

3. Add the Hamilton dataflow execution code to `run.py`

```python
@@ -209,7 +216,7 @@ Transformations happen within the data destination, typically a data warehouse.
    ibis.set_backend(backend)
    return backend

-def general_messages(db_con: ibis.BaseBackend, pipeline: dlt.Pipeline) -> ir.Table:
+def general_message(db_con: ibis.BaseBackend, pipeline: dlt.Pipeline) -> ir.Table:
    """Load table `general_message` from dlt data"""
    return db_con.table(
        "general_message",
@@ -229,13 +236,13 @@ Transformations happen within the data destination, typically a data warehouse.
    )

def threads(
-    general_messages: ir.Table,
+    general_message: ir.Table,
    general_replies_message: ir.Table,
) -> ir.Table:
    """Create the union of `general_message` and `general_replies_message`"""
    columns = ["thread_ts", "ts", "user", "text"]
    return ibis.union(
-        general_messages.select(columns),
+        general_message.select(columns),
        general_replies_message.select(columns),
    )

@@ -289,6 +296,118 @@ results = dr.execute(
)
```
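The execution call above is truncated by the diff. A sketch of what the ELT `run.py` presumably looks like, with the module name and driver inputs assumed from the Ibis node signatures:

```python
import dlt
from hamilton import driver

import transform  # the Ibis module shown above (name assumed)

pipeline = dlt.pipeline(
    pipeline_name="slack",
    destination="duckdb",
    dataset_name="slack_community_backup",
)

dr = driver.Builder().with_modules(transform).build()
results = dr.execute(
    ["threads"],
    inputs={"pipeline": pipeline},  # assumption: nodes receive the dlt pipeline
)
```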

## dlt materializer plugin
We added a custom DataLoader and DataSaver to plug dlt into Hamilton. Compared to the previous approaches, they let you include the dlt operations as part of the Hamilton dataflow and improve lineage and visibility.


``` {note}
See [this notebook](https://github.com/DAGWorks-Inc/hamilton/blob/main/examples/dlt/dlt_plugin.ipynb) for a demo.
```

### DataLoader
The `DataLoader` lets you read in-memory data from a `dlt.Resource`. When working with a `dlt.Source`, you can access an individual `dlt.Resource` with `source.resources["resource_name"]`. This removes the need to write utility functions to read data from dlt (with pandas or Ibis). Unlike the previous ETL and ELT examples, this approach is useful when you don't want to store the dlt Source data. It effectively connects dlt to Hamilton to enable "Extract, Transform" (ET).


```python
# run.py
from hamilton import driver
from hamilton.io.materialization import from_
import slack # NOTE this is dlt code, not an official Slack library
import transform

source = slack.source(selected_channels=["general"], replies=True)

dr = driver.Builder().with_modules(transform).build()

materializers = [
    from_.dlt(
        target="general_message",  # node name assigned to the data
        resource=source.resources["general_message"],
    ),
    from_.dlt(
        target="general_replies_message",
        resource=source.resources["general_replies_message"],
    ),
]
# when using only loaders (i.e., `from_`), you need to specify
# `additional_vars` to compute, like you would in `.execute(final_vars=["threads"])`
dr.materialize(*materializers, additional_vars=["threads"])
```
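The loader example above imports a `transform` module. For completeness, a minimal sketch of what it could contain, reusing the pandas `threads` node from the ETL section; the loader materializers supply `general_message` and `general_replies_message` (assumed to arrive as DataFrames):

```python
# transform.py (minimal sketch for the loader example)
import pandas as pd

def threads(
    general_message: pd.DataFrame,
    general_replies_message: pd.DataFrame,
) -> pd.DataFrame:
    """Reassemble threads from the union of parent messages and replies."""
    columns = ["thread_ts", "ts", "user", "text"]
    return pd.concat(
        [general_message[columns], general_replies_message[columns]],
        axis=0,
    )
```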

### DataSaver
The `DataSaver` lets you write node results to any `dlt.Destination`. You'll need to define a `dlt.Pipeline` with the desired `dlt.Destination`, and you can specify arguments that control the `pipeline.run()` behavior (e.g., incremental loading, primary key, `loader_file_format`). This provides a "Transform, Load" (TL) connector from Hamilton to dlt.

```python
# run.py
import dlt
from hamilton import driver
from hamilton.io.materialization import to
import slack # NOTE this is dlt code, not an official Slack library
import transform

pipeline = dlt.pipeline(
    pipeline_name="slack",
    destination="duckdb",
    dataset_name="slack_community_backup",
)

dr = driver.Builder().with_modules(transform).build()

materializers = [
    to.dlt(
        id="threads__dlt",  # node name
        dependencies=["threads"],
        table_name="slack_threads",
        pipeline=pipeline,
    ),
]

dr.materialize(*materializers)
```
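Conceptually, the saver hands the `threads` result to `pipeline.run()`. The plain-dlt sketch below shows roughly what that amounts to; it is not the plugin's exact internals, and `threads_df` is a hypothetical stand-in for the node result:

```python
import dlt
import pandas as pd

pipeline = dlt.pipeline(
    pipeline_name="slack",
    destination="duckdb",
    dataset_name="slack_community_backup",
)

# Hypothetical stand-in for the `threads` node result.
threads_df = pd.DataFrame(
    [{"thread_ts": "1713304800.0", "ts": "1713304801.0", "user": "U123", "text": "hello"}]
)

# Roughly what the DataSaver does: pass the node result to `pipeline.run()`.
load_info = pipeline.run(threads_df, table_name="slack_threads")
print(load_info)
```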

### Combining both
You can also combine the `DataLoader` and `DataSaver`. As you will see below, it's almost identical to the ELT example, but now all operations are part of the Hamilton dataflow!


```python
# run.py
import dlt
from hamilton import driver
from hamilton.io.materialization import from_, to
import slack # NOTE this is dlt code, not an official Slack library
import transform

pipeline = dlt.pipeline(
    pipeline_name="slack",
    destination="duckdb",
    dataset_name="slack_community_backup",
)
source = slack.source(selected_channels=["general"], replies=True)

dr = driver.Builder().with_modules(transform).build()

materializers = [
    from_.dlt(
        target="general_message",
        resource=source.resources["general_message"],
    ),
    from_.dlt(
        target="general_replies_message",
        resource=source.resources["general_replies_message"],
    ),
    to.dlt(
        id="threads__dlt",
        dependencies=["threads"],
        table_name="slack_threads",
        pipeline=pipeline,
    ),
]

dr.materialize(*materializers)
```
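The figure below shows the resulting dataflow, with the dlt loaders and saver as part of the graph. To render it yourself, Hamilton's driver can visualize materializers alongside the computed nodes; a sketch, assuming the `visualize_materialization` API and an illustrative output path:

```python
# Render the dataflow including loader/saver nodes (output path illustrative).
dr.visualize_materialization(
    *materializers,
    output_file_path="./materialization.png",
)
```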

![](./materialization.png)

## Next steps
- Our full [code example to ingest Slack data and generate thread summaries](https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/dlt) is available on GitHub.
- Another important pattern in data engineering is reverse ETL, which consists of moving data analytics back to your sources (CRM, Hubspot, Zendesk, etc.). See this [dlt blog](https://dlthub.com/docs/blog/reverse-etl-dlt) to get started.
Binary file added docs/integrations/dlt/materialization.png
Binary file added docs/integrations/dlt/transform.png
