Skip to content
This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Commit

Permalink
Adds data loader examples
Browse files Browse the repository at this point in the history
We still want built-in adapters, but for now this will work.
  • Loading branch information
elijahbenizzy committed Sep 27, 2022
1 parent a89ad3b commit 5e14b48
Show file tree
Hide file tree
Showing 11 changed files with 3,289 additions and 0 deletions.
39 changes: 39 additions & 0 deletions examples/data_loaders/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Data Loaders

Among multiple uses, Hamilton excels at building maintainable, scalable representations of ETLs.
If you've read through the other guides, it should be pretty clear how hamilton enables transformations (the T in ETL/ELT).
In this example, we'll talk about an approach to _Extracting_ data, and how Hamilton enables you to build out extracts,
in a scalable, pluggable way. For example, being able to switch where data is loaded between development and production
is useful, since you might only want a subsample in development, or even load it from a different source.
Here we'll show you how you can achieve this without cluttering your code with `if else`,
which will make your dataflow easier to maintain in the long run.

The goal is to show you two things:

1. How to load data from various sources
2. How to switch the sources of data you're loading from by swapping out modules, using[polymorphism](https://en.wikipedia.org/wiki/Polymorphism_(computer_science))

As such, we have three data loaders to use:

1. [load_data_mock.py](load_data_mock.py): generates mock data on the fly. Meant to represent a unit-testing/quick iteration scenario.
2. [load_data_csv.py](load_data_csv.py): Uses CSV data. Meant to represent more ad-hoc research.
3. [load_data_duckdb.py](load_data_duckdb.py) Uses a duckdb database (saved locally). Meant to represent more production-ready dataflows,
as well as demonstrate the ease of working with duckdb.

But this is hardly exclusive, or exhaustive. One can easily imagine loading from snowflake, your custom datawarehouse, hdfs, etc...
All by swapping out the data loaders.

# The data

The data comes pregenerated in (test_data)[test_data], in both `.csv` and `.duckdb` format.

# Loading/Analyzing the data

To load/analyze the data, you can run the script `run.py`

- `python run.py csv` reads from the `.csv` files and runs all the variables
- `python run.py duckdb` reads from the `duckdb` database and runs all the variables
- `python run.py mock` creates mock data and runs the pipeline

Note that you, as the user, have to manually handle connections/whatnot for duckdb.
We are currently designing the ability to do this natively in hamilton: https://github.com/stitchfix/hamilton/issues/197.
15 changes: 15 additions & 0 deletions examples/data_loaders/load_data_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import os

import pandas as pd


def spend(db_path: str) -> pd.DataFrame:
return pd.read_csv(os.path.join(db_path, "marketing_spend.csv"))


def churn(db_path: str) -> pd.DataFrame:
return pd.read_csv(os.path.join(db_path, "marketing_spend.csv"))


def signups(db_path: str) -> pd.DataFrame:
return pd.read_csv(os.path.join(db_path, "marketing_spend.csv"))
18 changes: 18 additions & 0 deletions examples/data_loaders/load_data_duckdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import duckdb
import pandas as pd


def connection(db_path: str) -> duckdb.DuckDBPyConnection:
return duckdb.connect(database=db_path)


def spend(connection: duckdb.DuckDBPyConnection) -> pd.DataFrame:
return connection.execute("select * from marketing_spend").fetchdf()


def churn(connection: duckdb.DuckDBPyConnection) -> pd.DataFrame:
return connection.execute("select * from churn").fetchdf()


def signups(connection: duckdb.DuckDBPyConnection) -> pd.DataFrame:
return connection.execute("select * from signups").fetchdf()
160 changes: 160 additions & 0 deletions examples/data_loaders/load_data_mock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import os

import numpy as np
import pandas as pd


def spend() -> pd.DataFrame:
data = np.array(
[
(
"2022-08-03T00:00:00.000000000",
104052.98074001,
115300.21226012,
69384.46649019,
49474.45580366,
12851.6540992,
1498.5114764,
"2022-08-03T00:00:00.000000000",
),
(
"2022-08-04T00:00:00.000000000",
103234.15793884,
115326.0151612,
71113.31018247,
52513.19734904,
12344.42778548,
1033.79398268,
"2022-08-04T00:00:00.000000000",
),
(
"2022-08-05T00:00:00.000000000",
101816.40188563,
115194.04661767,
71367.20874633,
51795.51413309,
11536.41253561,
2101.46146166,
"2022-08-05T00:00:00.000000000",
),
(
"2022-08-06T00:00:00.000000000",
102263.53043232,
115601.2888751,
71474.76280964,
52861.22158421,
11652.28867968,
1046.83170946,
"2022-08-06T00:00:00.000000000",
),
(
"2022-08-07T00:00:00.000000000",
103271.09660695,
115306.96341012,
71888.99025677,
50742.70043588,
11160.23631976,
2521.31311947,
"2022-08-07T00:00:00.000000000",
),
(
"2022-08-08T00:00:00.000000000",
100775.86701231,
116634.88666304,
71603.50462531,
52361.08798097,
12869.33161266,
3269.57027156,
"2022-08-08T00:00:00.000000000",
),
(
"2022-08-09T00:00:00.000000000",
101527.74726883,
114868.8422755,
70260.81680881,
49647.9754876,
13187.07115589,
2134.71274923,
"2022-08-09T00:00:00.000000000",
),
(
"2022-08-10T00:00:00.000000000",
101150.73295175,
114941.32547639,
68802.02668922,
49590.55466274,
13129.31334755,
3328.0293293,
"2022-08-10T00:00:00.000000000",
),
(
"2022-08-11T00:00:00.000000000",
100317.64365959,
115682.20050942,
67735.95105252,
50621.23723767,
14019.11780391,
2360.4382216,
"2022-08-11T00:00:00.000000000",
),
(
"2022-08-12T00:00:00.000000000",
102024.067597,
116770.81592363,
66244.22984364,
49503.73825509,
14533.2726457,
1868.18205207,
"2022-08-12T00:00:00.000000000",
),
],
dtype=[
("index", "<M8[ns]"),
("facebook", "<f8"),
("twitter", "<f8"),
("tv", "<f8"),
("youtube", "<f8"),
("radio", "<f8"),
("billboards", "<f8"),
("date", "<M8[ns]"),
],
)
return pd.DataFrame.from_records(data)


def churn() -> pd.DataFrame:
data = np.array(
[
("2022-08-03T00:00:00.000000000", 160, 53, "2022-08-03T00:00:00.000000000"),
("2022-08-04T00:00:00.000000000", 162, 54, "2022-08-04T00:00:00.000000000"),
("2022-08-05T00:00:00.000000000", 162, 50, "2022-08-05T00:00:00.000000000"),
("2022-08-06T00:00:00.000000000", 161, 53, "2022-08-06T00:00:00.000000000"),
("2022-08-07T00:00:00.000000000", 160, 49, "2022-08-07T00:00:00.000000000"),
("2022-08-08T00:00:00.000000000", 160, 52, "2022-08-08T00:00:00.000000000"),
("2022-08-09T00:00:00.000000000", 161, 53, "2022-08-09T00:00:00.000000000"),
("2022-08-10T00:00:00.000000000", 160, 57, "2022-08-10T00:00:00.000000000"),
("2022-08-11T00:00:00.000000000", 156, 56, "2022-08-11T00:00:00.000000000"),
("2022-08-12T00:00:00.000000000", 148, 58, "2022-08-12T00:00:00.000000000"),
],
dtype=[("index", "<M8[ns]"), ("womens", "<i8"), ("mens", "<i8"), ("date", "<M8[ns]")],
)
return pd.DataFrame.from_records(data)


def signups() -> pd.DataFrame:
data = np.array(
[
("2022-08-03T00:00:00.000000000", 2184, 429, "2022-08-03T00:00:00.000000000"),
("2022-08-04T00:00:00.000000000", 2164, 461, "2022-08-04T00:00:00.000000000"),
("2022-08-05T00:00:00.000000000", 2159, 454, "2022-08-05T00:00:00.000000000"),
("2022-08-06T00:00:00.000000000", 2157, 449, "2022-08-06T00:00:00.000000000"),
("2022-08-07T00:00:00.000000000", 2121, 478, "2022-08-07T00:00:00.000000000"),
("2022-08-08T00:00:00.000000000", 2151, 517, "2022-08-08T00:00:00.000000000"),
("2022-08-09T00:00:00.000000000", 2133, 541, "2022-08-09T00:00:00.000000000"),
("2022-08-10T00:00:00.000000000", 2160, 565, "2022-08-10T00:00:00.000000000"),
("2022-08-11T00:00:00.000000000", 2135, 609, "2022-08-11T00:00:00.000000000"),
("2022-08-12T00:00:00.000000000", 2116, 633, "2022-08-12T00:00:00.000000000"),
],
dtype=[("index", "<M8[ns]"), ("womens", "<i8"), ("mens", "<i8"), ("date", "<M8[ns]")],
)
return pd.DataFrame.from_records(data)
78 changes: 78 additions & 0 deletions examples/data_loaders/prep_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import pandas as pd

from hamilton.function_modifiers import does, extract_columns, parameterize, source, value


def _sum_series(**series):
return sum(series.values())


@extract_columns(
"facebook_spend",
"twitter_spend",
"tv_spend",
"youtube_spend",
"radio_spend",
"billboards_spend",
"womens_churn",
"mens_churn",
"womens_signups",
"mens_signups",
)
def joined_data(spend: pd.DataFrame, signups: pd.DataFrame, churn: pd.DataFrame) -> pd.DataFrame:
spend = spend.set_index("date").rename(columns=lambda col: col + "_spend")
churn = churn.set_index("date").rename(columns=lambda col: col + "_churn")
signups = signups.set_index("date").rename(columns=lambda col: col + "_signups")
return pd.concat([spend, churn, signups], axis=1)


@does(_sum_series)
def total_marketing_spend(
facebook_spend: pd.Series,
twitter_spend: pd.Series,
tv_spend: pd.Series,
youtube_spend: pd.Series,
radio_spend: pd.Series,
billboards_spend: pd.Series,
) -> pd.Series:
pass


@does(_sum_series)
def total_signups(mens_signups: pd.Series, womens_signups: pd.Series) -> pd.Series:
pass


@does(_sum_series)
def total_churn(mens_churn: pd.Series, womens_churn: pd.Series) -> pd.Series:
pass


def total_customers(total_signups: pd.Series, total_churn: pd.Series) -> pd.Series:
customer_deltas = total_signups + total_churn
return customer_deltas.cumsum()


def acquisition_cost(total_marketing_spend: pd.Series, total_signups: pd.Series) -> pd.Series:
return total_marketing_spend / total_signups


@parameterize(
twitter_spend_smoothed={"lookback_days": value(7), "spend": source("twitter_spend")},
facebook_spend_smoothed={"lookback_days": value(7), "spend": source("facebook_spend")},
radio_spend_smoothed={"lookback_days": value(21), "spend": source("radio_spend")},
tv_spend_smoothed={"lookback_days": value(21), "spend": source("tv_spend")},
billboards_spend_smoothed={"lookback_days": value(7), "spend": source("billboards_spend")},
youtube_spend_smoothed={"lookback_days": value(7), "spend": source("twitter_spend")},
)
def spend_smoothed(lookback_days: int, spend: pd.Series) -> pd.Series:
"""{spend} smoothed by {lookback_days}. Might want to smooth different ad spends differently,
figuring that it takes different amounts of time to get to the customer. A cheap hack at determining
auto-correlation of a series -- this should be a parameter in a model,
but this is to demonstrate the framework
:param lookback_days: Days to smooth over
:param spend: Spend source
:return:
"""
return spend.rolling(window=lookback_days).mean().fillna(0)
2 changes: 2 additions & 0 deletions examples/data_loaders/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
click
duckdb==0.5.0
49 changes: 49 additions & 0 deletions examples/data_loaders/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import click
import load_data_mock
import prep_data

import hamilton.driver
from examples.data_loaders import load_data_csv, load_data_duckdb


@click.group()
def main():
pass


VARS = [
"total_signups",
"total_churn",
"total_marketing_spend",
"acquisition_cost",
"twitter_spend_smoothed",
"facebook_spend_smoothed",
"radio_spend_smoothed",
"tv_spend_smoothed",
"billboards_spend_smoothed",
"youtube_spend_smoothed",
]


@main.command()
def duckdb():
driver = hamilton.driver.Driver(
{"db_path": "./test_data/database.duckdb"}, load_data_duckdb, prep_data
)
print(driver.execute(VARS))


@main.command()
def csv():
driver = hamilton.driver.Driver({"db_path": "test_data"}, load_data_csv, prep_data)
print(driver.execute(VARS))


@main.command()
def mock():
driver = hamilton.driver.Driver({}, load_data_mock, prep_data)
print(driver.execute(VARS))


if __name__ == "__main__":
main()
Loading

0 comments on commit 5e14b48

Please sign in to comment.