Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dlt plugin #820

Merged
merged 7 commits into from
Apr 17, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions docs/integrations/dlt/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ On this page, you'll learn:

- Extract, Transform, Load (ETL)
- Extract, Load, Transform (ELT)
- dlt materializer plugin for Hamilton

``` {note}
See this [blog post](https://blog.dagworks.io/p/slack-summary-pipeline-with-dlt-ibis) for a more detailed discussion about ETL with dlt + Hamilton
```

## Extract, Transform, Load (ETL)
The key consideration for ETL is that the data has to move twice:
Expand Down Expand Up @@ -289,6 +294,114 @@ results = dr.execute(
)
```

## dlt materializer plugin
We added custom Data Loader/Saver to plug dlt with Hamilton. Compared to the previous approach, it allows to include the dlt operations as part of the Hamilton dataflow and improve lineage / visibility.


``` {note}
See [this notebook](https://github.com/DAGWorks-Inc/hamilton/blob/main/examples/dlt/dlt_plugin.ipynb) for a demo.
```

### DataLoader
The `DataLoader` allows to read in-memory data from a `dlt.Resource`. When working with `dlt.Source`, you can access individual `dlt.Resource` with `source.resource["source_name"]`. This removes the need to write utility functions to read data from dlt (with pandas or Ibis). Contrary to the previous ETL and ELT examples, this approach is useful when you don't want to store the dlt Source data. It effectively connects dlt to Hamilton to enable "Extract, Transform" (ET).


```python
# run.py
from hamilton import driver
from hamilton.io.materialization import from_
import slack # NOTE this is dlt code, not an official Slack library
import transform

source = slack.source(selected_channels=["general"], replies=True)

dr = driver.Builder().with_modules(transform).build()

materializers = [
from_.dlt(
target="general_messages", # node name assigned to the data
resource=source.resources["general_messages"]
),
from_.dlt(
target="general_replies_message",
resource=source.resources["general_replies_message"]
),
]

dr.materialize(*materializers, ...)
zilto marked this conversation as resolved.
Show resolved Hide resolved
```

### DataSaver
The `DataSaver` allows to write node results to any `dlt.Destination`. You'll need to define a `dlt.Pipeline` with the desired `dlt.Destination` and you can specify arguments for the `pipeline.run()` behavior (e.g., incremental loading, primary key, load_file_format). This provides a "Transform, Load" (TL) connector from Hamilton to dlt.

```python
# run.py
from hamilton import driver
from hamilton.io.materialization import to
import slack # NOTE this is dlt code, not an official Slack library
import transform

pipeline = dlt.pipeline(
pipeline_name="slack",
destination='duckdb',
dataset_name="slack_community_backup"
)

dr = driver.Builder().with_modules(transform).build()

materializers = [
to.dlt(
id="threads__dlt", # node name
dependencies=["threads"],
table_name="slack_threads",
pipeline=pipeline,
)
]

dr.materialize(*materializers)
```

### Combining both
You can also combine both the `DataLoader` and `DataSaver`. You will see below that it's almost identical to the ELT example, but now all operations are part of the Hamilton dataflow!


```python
# run.py
from hamilton import driver
from hamilton.io.materialization import from_
import slack # NOTE this is dlt code, not an official Slack library
import transform

pipeline = dlt.pipeline(
pipeline_name="slack",
destination='duckdb',
dataset_name="slack_community_backup"
)
source = slack.source(selected_channels=["general"], replies=True)

dr = driver.Builder().with_modules(transform).build()

materializers = [
from_.dlt(
target="general_messages",
resource=source.resources["general_messages"]
),
from_.dlt(
target="general_replies_message",
resource=source.resources["general_replies_message"]
),
to.dlt(
id="threads__dlt",
dependencies=["threads"],
table_name="slack_threads",
pipeline=pipeline,
)
]

dr.materialize(*materializers)
zilto marked this conversation as resolved.
Show resolved Hide resolved
```


## Next steps
- Our full [code example to ingest Slack data and generate thread summaries](https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/dlt) is available on GitHub.
- Another important pattern in data engineering is reverse ETL, which consists of moving data analytics back to your sources (CRM, Hubspot, Zendesk, etc.). See this [dlt blog](https://dlthub.com/docs/blog/reverse-etl-dlt) to get started.