Skip to content
This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Commit

Permalink
Adds data loader examples
Browse files Browse the repository at this point in the history
We still want built-in adapters, but for now this will work.
  • Loading branch information
elijahbenizzy committed Sep 26, 2022
1 parent a89ad3b commit 3e213a1
Show file tree
Hide file tree
Showing 14 changed files with 3,393 additions and 0 deletions.
44 changes: 44 additions & 0 deletions examples/data_loaders/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Data Loaders

Among multiple uses, Hamilton excels at building maintainble, scalable reprsentations of ETLs.
If you've read through the other guides, it should be pretty clear how hamilton enables transformations (the T in ETL/ELT).
In this example, we'll talk about an approach to _Extracting_ data, and how Hamilton enables you to build out extracts,
in a scalable, pluggable way. For example, being able to switch where data is loaded between development and production
is useful, since you might only want a subsample in development, or even load it from a different source.
Here we'll show you how you can achieve this without cluttering your code with `if else`,
which will make your dataflow easier to maintain in the long run.

The goal is to show you two things:

1. How to load data from various sources
2. How to switch the sources of data you're loading from by swapping out modules, using[polymorphism](https://en.wikipedia.org/wiki/Polymorphism_(computer_science))

As such, we have three data loaders to use:

1. [load_data_mock.py](load_data_mock.py): generates mock data on the fly. Meant to represent a unit-testing/quick iteration scenario.
2. [load_data_csv.py](load_data_csv.py): Uses CSV data. Meant to represent more ad-hoc research.
3. [load_data_duckdb.py](load_data_duckdb.py) Uses a duckdb database (saved locally). Meant to represent more production-ready dataflows,
as well as demonstrate the ease of working with duckdb.

But this is hardly exclusive, or exhaustive. One can easily imagine loading from snowflake, your custom datawarehouse, hdfs, etc...
All by swapping out the data loaders.

# Generating the data

The data comes pregenerated in (test_data)[test_data], but we also included a script (written with hamilton!) to generate new mock data for you.

To run it, run:

- `python generate_test_data.py setup-duck-db --db-path my_data.duckdb` to create a duckdb database from scratch
- 'python generate_test_data.py setup-csv --db-path my_data' to create a set of csv files that will function as our "database"

# Loading/Analyzing the data

To load/analyze the data, you can run the script `run.py`

- `python run.py csv` reads from the `.csv` files and runs all the variables
- `python run.py duckdb` reads from the `duckdb` database and runs all the variables
- `python run.py mock` creates mock data and runs the pipeline

Note that you, as the user, have to manually handle connections/whatnot for duckdb.
We are currently designing the ability to do this natively in hamilton: https://github.com/stitchfix/hamilton/issues/197.
Empty file.
215 changes: 215 additions & 0 deletions examples/data_loaders/generate_test_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
import abc
import os
from types import ModuleType
from typing import Any, Dict, List, Optional

import click
import duckdb as duckdb
import pandas as pd

from examples.data_loaders.utils import generate_random_walk_time_series
from hamilton import ad_hoc_utils, driver, function_modifiers
from hamilton.function_modifiers import tag

"""Simple file to generate test data. This will then be saved to various locations for the rest of the example."""


def index(start_date: str = "20200101", end_date: str = "20220901") -> pd.Series:
return pd.Series(pd.date_range(start_date, end_date))


@tag(**{"materialization.table": "marketing_spend"})
def marketing_spend_by_channel(index: pd.Series) -> pd.DataFrame:
"""Marketing spend by channel. Randomly generated, meant to be increasing to broadcast growth.
In this simple case, marketing spend is not partitioned by business line
:param index: TS index to use to generate data
:return:
"""
data = {
"facebook": generate_random_walk_time_series(
num_datapoints=len(index),
start_value=10000,
step_mean=100,
step_stddev=2000,
min_value=0,
),
"twitter": generate_random_walk_time_series(
num_datapoints=len(index),
start_value=10000,
step_mean=50,
step_stddev=1000,
min_value=0,
),
"tv": generate_random_walk_time_series(
num_datapoints=len(index),
start_value=15000,
step_mean=40,
step_stddev=1400,
min_value=0,
),
"youtube": generate_random_walk_time_series(
num_datapoints=len(index),
start_value=10000,
step_mean=40,
step_stddev=1600,
min_value=0,
),
"radio": generate_random_walk_time_series(
num_datapoints=len(index), start_value=5000, step_mean=20, step_stddev=800, min_value=0
),
"billboards": generate_random_walk_time_series(
num_datapoints=len(index), start_value=1000, step_mean=10, step_stddev=800, min_value=0
),
"date": index,
}
return pd.DataFrame(data=data)


@tag(**{"materialization.table": "signups"})
def signups_by_business_line(index: pd.Series) -> pd.DataFrame:
data = {
"womens": generate_random_walk_time_series(
num_datapoints=len(index),
start_value=1000,
step_mean=1,
step_stddev=20,
min_value=0,
apply=int,
),
"mens": generate_random_walk_time_series(
num_datapoints=len(index),
start_value=1000,
step_mean=1,
step_stddev=20,
min_value=0,
apply=int,
),
"date": index,
}
return pd.DataFrame(data)


@tag(**{"materialization.table": "churn"})
def churn_by_business_line(index: pd.Series) -> pd.DataFrame:
data = {
"womens": generate_random_walk_time_series(
num_datapoints=len(index),
start_value=100,
step_mean=0.05,
step_stddev=3,
min_value=0,
apply=int,
),
"mens": generate_random_walk_time_series(
num_datapoints=len(index),
start_value=100,
step_mean=0.05,
step_stddev=3,
min_value=0,
apply=int,
),
"date": index,
}
return pd.DataFrame(data)


@click.group()
def main():
pass


class MaterializationDriver(driver.Driver, abc.ABC):
def __init__(self, config: Dict[str, Any], *modules: ModuleType):
super(MaterializationDriver, self).__init__(config, *modules)

@abc.abstractmethod
def materialize(self, df: pd.DataFrame, table: str):
"""Materializes (saves) the specified dataframe to a db/table combo
:param db:
:param table:
:return:
"""
pass

def materialize_to(self, var: driver.Variable) -> Optional[str]:
"""Returns a db, dtable tuple of materialization
:param var: Variable representing the node in the hamilton DAG
:return: None if we want to bypass materialization, else a string representing the "table"
"""
if "materialization.table" in var.tags:
if var.type != pd.DataFrame:
raise ValueError(
f"Node: {var.name} requests materialization but does not produce a pandas dataframe, rather a: {var.type}"
)
return var.tags["materialization.table"]
return None

def execute_and_materialize(
self, overrides: Dict[str, Any] = None, inputs: Dict[str, Any] = None
):
"""Executes and materializes it
:param overrides:
:param inputs:
:return:
"""
nodes_to_materialize = [
var for var in self.list_available_variables() if self.materialize_to(var) is not None
]
raw_execute_results = self.raw_execute(
[var.name for var in nodes_to_materialize], overrides=overrides, inputs=inputs
)
for node in nodes_to_materialize:
self.materialize(raw_execute_results[node.name], self.materialize_to(node))


class DuckDBMaterializationDriver(MaterializationDriver):
def __init__(self, path: str, config: Dict[str, Any], modules: List[ModuleType]):
super(DuckDBMaterializationDriver, self).__init__(config, *modules)
self.con = duckdb.connect(database=path, read_only=False)

def materialize(self, df: pd.DataFrame, table: str):
self.con.execute(f"CREATE TABLE {table} AS SELECT * from df")
self.con.fetchall()

def close(self):
self.con.close()


class CSVMaterializationDriver(MaterializationDriver):
def __init__(self, path: str, config: Dict[str, Any], modules: List[ModuleType]):
super(CSVMaterializationDriver, self).__init__(config, *modules)
self.path = path

def materialize(self, df: pd.DataFrame, table: str):
if not os.path.exists(self.path):
os.makedirs(self.path, exist_ok=True)
df.to_csv(os.path.join(self.path, f"{table}.csv"))


def _get_module() -> ModuleType:
return ad_hoc_utils.create_temporary_module(
index, marketing_spend_by_channel, signups_by_business_line, churn_by_business_line
)


@main.command()
@click.option("--db-path", type=click.Path(exists=False), required=True)
def setup_duck_db(db_path: str):
driver = DuckDBMaterializationDriver(path=db_path, config={}, modules=[_get_module()])
driver.execute_and_materialize()
driver.close()


@main.command()
@click.option("--db-path", type=click.Path(exists=False))
def setup_csv(db_path: str):
driver = CSVMaterializationDriver(path=db_path, config={}, modules=[_get_module()])
driver.execute_and_materialize()


if __name__ == "__main__":
main()
15 changes: 15 additions & 0 deletions examples/data_loaders/load_data_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import os

import pandas as pd


def spend(db_path: str) -> pd.DataFrame:
return pd.read_csv(os.path.join(db_path, "marketing_spend.csv"))


def churn(db_path: str) -> pd.DataFrame:
return pd.read_csv(os.path.join(db_path, "marketing_spend.csv"))


def signups(db_path: str) -> pd.DataFrame:
return pd.read_csv(os.path.join(db_path, "marketing_spend.csv"))
18 changes: 18 additions & 0 deletions examples/data_loaders/load_data_duckdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import duckdb
import pandas as pd


def connection(db_path: str) -> duckdb.DuckDBPyConnection:
return duckdb.connect(database=db_path)


def spend(connection: duckdb.DuckDBPyConnection) -> pd.DataFrame:
return connection.execute("select * from marketing_spend").fetchdf()


def churn(connection: duckdb.DuckDBPyConnection) -> pd.DataFrame:
return connection.execute("select * from churn").fetchdf()


def signups(connection: duckdb.DuckDBPyConnection) -> pd.DataFrame:
return connection.execute("select * from signups").fetchdf()
20 changes: 20 additions & 0 deletions examples/data_loaders/load_data_mock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import os

import generate_test_data
import pandas as pd


def index(start_date: str = "20220801", end_date: str = "20220801") -> pd.Series:
return pd.Series(pd.date_range(start=start_date, end=end_date))


def spend(index: pd.Series) -> pd.DataFrame:
return generate_test_data.marketing_spend_by_channel(index)


def churn(index: pd.Series) -> pd.DataFrame:
return generate_test_data.churn_by_business_line(index)


def signups(index: pd.Series) -> pd.DataFrame:
return generate_test_data.signups_by_business_line(index)
78 changes: 78 additions & 0 deletions examples/data_loaders/prep_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import pandas as pd

from hamilton.function_modifiers import does, extract_columns, parameterize, source, value


def _sum_series(**series):
return sum(series.values())


@extract_columns(
"facebook_spend",
"twitter_spend",
"tv_spend",
"youtube_spend",
"radio_spend",
"billboards_spend",
"womens_churn",
"mens_churn",
"womens_signups",
"mens_signups",
)
def joined_data(spend: pd.DataFrame, signups: pd.DataFrame, churn: pd.DataFrame) -> pd.DataFrame:
spend = spend.set_index("date").rename(columns=lambda col: col + "_spend")
churn = churn.set_index("date").rename(columns=lambda col: col + "_churn")
signups = signups.set_index("date").rename(columns=lambda col: col + "_signups")
return pd.concat([spend, churn, signups], axis=1)


@does(_sum_series)
def total_marketing_spend(
facebook_spend: pd.Series,
twitter_spend: pd.Series,
tv_spend: pd.Series,
youtube_spend: pd.Series,
radio_spend: pd.Series,
billboards_spend: pd.Series,
) -> pd.Series:
pass


@does(_sum_series)
def total_signups(mens_signups: pd.Series, womens_signups: pd.Series) -> pd.Series:
pass


@does(_sum_series)
def total_churn(mens_churn: pd.Series, womens_churn: pd.Series) -> pd.Series:
pass


def total_customers(total_signups: pd.Series, total_churn: pd.Series) -> pd.Series:
customer_deltas = total_signups + total_churn
return customer_deltas.cumsum()


def acquisition_cost(total_marketing_spend: pd.Series, total_signups: pd.Series) -> pd.Series:
return total_marketing_spend / total_signups


@parameterize(
twitter_spend_smoothed={"lookback_days": value(7), "spend": source("twitter_spend")},
facebook_spend_smoothed={"lookback_days": value(7), "spend": source("facebook_spend")},
radio_spend_smoothed={"lookback_days": value(21), "spend": source("radio_spend")},
tv_spend_smoothed={"lookback_days": value(21), "spend": source("tv_spend")},
billboards_spend_smoothed={"lookback_days": value(7), "spend": source("billboards_spend")},
youtube_spend_smoothed={"lookback_days": value(7), "spend": source("twitter_spend")},
)
def spend_smoothed(lookback_days: int, spend: pd.Series) -> pd.Series:
"""{spend} smoothed by {lookback_days}. Might want to smooth different ad spends differently,
figuring that it takes different amounts of time to get to the customer. A cheap hack at determining
auto-correlation of a series -- this should be a parameter in a model,
but this is to demonstrate the framework
:param lookback_days: Days to smooth over
:param spend: Spend source
:return:
"""
return spend.rolling(window=lookback_days).mean()
2 changes: 2 additions & 0 deletions examples/data_loaders/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
click
duckdb==0.5.0
Loading

0 comments on commit 3e213a1

Please sign in to comment.