Implements cleaner spec of reuse_functions, moves and renames
This is a lot nicer. See the discussion in #86 for full context.
elijahbenizzy committed Feb 18, 2023
1 parent 484678f commit 79ac476
Showing 8 changed files with 310 additions and 302 deletions.
87 changes: 55 additions & 32 deletions decorators.md
Note that you can also specify custom decorators using the `@check_output_custom` decorator.

See [data_quality](data_quality.md) for more information on available validators and how to build custom ones.

## @subdag

Currently located under `experimental` -- looking for feedback!

The `@subdag` decorator enables you to rerun components of your DAG with varying parameters. Note that this is immensely powerful -- if we
draw analogies from Hamilton to standard procedural programming paradigms, we might have the following correspondence:

- `config.when` + friends -- `if/else` statements
- `parameterize`/`extract_columns` -- `for` loop
- `does` -- effectively macros
And so on. `@subdag` takes this one step further.
- `@subdag` -- subroutine definition
E.g., take a certain set of nodes and run them with specified parameters.
If you're confused as to why you need this decorator, you should probably stop reading (you most likely don't need it). If this solves a pain point you've had, then continue...
Why might you want to use this? Let's take a look at some examples:
1. You have a feature engineering pipeline that you want to run on multiple datasets. If it's exactly the same, this is perfect. If not, this works just as well -- you simply utilize different functions in each, or use `config.when` + the `config` parameter to rerun it.
2. You want to train multiple models in the same DAG that share some logic (in features or training) -- this allows you to reuse and continually add more.
3. You want to combine multiple similar DAGs (e.g. one for each business line) into one so you can build a cross-business line model.
This basically bridges the gap between the flexibility of non-declarative pipelining frameworks and the readability/maintainability of declarative ones.

Let's take a look at a simplified example (in [examples/](examples/reusing_functions/reusable_subdags.py)).

```python
def unique_users(filtered_interactions: pd.DataFrame, grain: str) -> pd.Series:
"""Gives the number of shares traded by the frequency"""
return ...

@subdag(
    unique_users, interactions_filtered,
    inputs={"grain": value("day")},
    config={"region": "US"},
)
def daily_users_US(unique_users: pd.Series) -> pd.Series:
    """Calculates daily user data for just US users.
    Note that this just returns the output. You can modify it if you'd like!
    """
    return unique_users

@subdag(
    unique_users, interactions_filtered,
    inputs={"grain": value("day")},
    config={"region": "CA"},
)
def daily_users_CA(unique_users: pd.Series) -> pd.Series:
    """Calculates daily user data for just Canadian users"""
    return unique_users
```

This example tracks user data on a per-region basis. Specifically, we track the following:
2. Quarterly user data for the US

These each live under a separate namespace -- this exists solely so the two sets of similar nodes can coexist.
Note this set is contrived to demonstrate functionality -- it should be easy to imagine how we could add more variations.

The `subdag` decorator takes a variety of inputs that determine _which_ possible set of functions will constitute the subDAG, and then specifically _what_ the subDAG is and _how_ it is connected.
- _which_ functions constitute the subdag is specified by the `*args` input, which is a collection of modules/functions. These are used to determine the functions (i.e. nodes) that could end up in the produced subDAG.
- _what_ the sub-DAG is, and _how_ it is connected is specified by two parameters. `config` provides configuration to use to generate the subDAG (in this case the region), and `inputs` provides inputs to connect the subdag with the current DAG (similar in spirit to `@parameterize`).
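
For a concrete sense of how this wires up, here is a minimal sketch of executing the two decorated outputs. It assumes the functions above (including `unique_users` and `interactions_filtered`) live in a module named `my_subdags` -- an illustrative name, not part of the example:

```python
import my_subdags  # hypothetical module containing the @subdag example above

from hamilton import driver

# Top-level config is empty here; each subdag's "region" comes from the
# config= argument of its @subdag declaration.
dr = driver.Driver({}, my_subdags)

# Each decorated function is a regular node named after the function itself.
# (Pass inputs={...} as well if the upstream functions need external inputs.)
result = dr.execute(["daily_users_US", "daily_users_CA"])

# The reused nodes live under namespaces derived from the decorated function
# names (e.g. "daily_users_US.unique_users"); you can list them with:
print([var.name for var in dr.list_available_variables()])
```

Because the decorated functions are ordinary nodes, downstream functions can simply declare `daily_users_US: pd.Series` as a parameter to depend on them.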


Note that, if you wanted to achieve this functionality without this decorator, you'd have two options:
1. Rewrite every function for each scenario -- this is repetitive and doesn't scale.
2. Utilize the `driver` within the functions, e.g.:

```python
import hamilton.driver


def daily_users_CA() -> pd.Series:
    """Calculates daily user data for just Canadian users"""
    # Assumes unique_users/interactions_filtered are modules the inner driver can crawl
    dr = hamilton.driver.Driver({"region": "CA"}, unique_users, interactions_filtered)
    return dr.execute(["unique_users"], inputs={"grain": "day"})["unique_users"]
```

While this is a clever approach (and you can use it if you want), there are a few drawbacks:

1. You have no visibility into the subdag -- Hamilton has no knowledge of what's being run.
2. Any execution parameters have to be reproduced for the inner driver -- e.g. running on dask is unknown to the outer driver.
3. DAG compilation is done at runtime -- you would not catch any type errors until the function is running.

That said, this *is* a good mental model for explaining subdag functionality. Think about
it as a driver within a function, but in a way that Hamilton can understand and operate on.
Best of both worlds!
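
To make the visibility point concrete: because Hamilton now compiles the subdag like any other part of the graph, it can also draw it for you. A rough sketch, assuming graphviz is installed and reusing the hypothetical `my_subdags` module from the sketch above (the exact `visualize_execution` arguments shown are a best guess at the API, not taken from this commit):

```python
import my_subdags  # hypothetical module containing the @subdag example above

from hamilton import driver

dr = driver.Driver({}, my_subdags)
# Renders the execution path for the two subdag outputs -- including the
# namespaced nodes inside each subdag -- to ./subdag_example.dot (plus a .png).
dr.visualize_execution(
    ["daily_users_US", "daily_users_CA"],
    "./subdag_example.dot",
    {"format": "png"},
)
```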

## @parameterize_extract_columns

12 changes: 12 additions & 0 deletions examples/reusing_functions/README.md
# subdag operator

This README demonstrates the use of the subdag operator.

The subdag operator allows you to effectively run a driver within a node.
In this case, we are calculating unique website visitors from the following set of parameters:

1. Region = CA (Canada) or US (United States)
2. Granularity of data = (day, week, month)
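
These two parameters span a small grid. As a quick sketch, the six output node names that grid produces (following the naming pattern used in `reusable_subdags.py` and `main.py` below) can be enumerated like so:

```python
import itertools

regions = ["US", "CA"]
grain_prefixes = ["daily", "weekly", "monthly"]  # day / week / month granularities

# One @subdag declaration (and one requested output) per (grain, region) pair.
outputs = [
    f"{prefix}_unique_users_{region}"
    for prefix, region in itertools.product(grain_prefixes, regions)
]
print(outputs)  # the same six node names requested in main.py below
```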

You can find the code in [unique_users.py](unique_users.py) and [reusable_subdags.py](reusable_subdags.py)
and look at how we run it in [main.py](main.py).
41 changes: 23 additions & 18 deletions examples/reusing_functions/main.py
        return pd.DataFrame(resampled_results).bfill()


def main():
    dr = Driver(
        {},
        reusable_subdags,
        adapter=SimplePythonGraphAdapter(
            result_builder=TimeSeriesJoinResultsBuilder(upsample_frequency="D")
        ),
    )

    result = dr.execute(
        [
            "daily_unique_users_US",
            "daily_unique_users_CA",
            "weekly_unique_users_US",
            "weekly_unique_users_CA",
            "monthly_unique_users_US",
            "monthly_unique_users_CA",
        ]
    )

    print(result)


if __name__ == "__main__":
    main()
88 changes: 37 additions & 51 deletions examples/reusing_functions/reusable_subdags.py
import pandas as pd
import unique_users

from hamilton.function_modifiers import subdag, value


def website_interactions() -> pd.DataFrame:
    return df


@subdag(
    unique_users,
    inputs={"grain": value("day")},
    config={"region": "US"},
)
def daily_unique_users_US(unique_users: pd.Series) -> pd.Series:
    return unique_users


@subdag(
    unique_users,
    inputs={"grain": value("week")},
    config={"region": "US"},
)
def weekly_unique_users_US(unique_users: pd.Series) -> pd.Series:
    return unique_users


@subdag(
    unique_users,
    inputs={"grain": value("month")},
    config={"region": "US"},
)
def monthly_unique_users_US(unique_users: pd.Series) -> pd.Series:
    return unique_users


@subdag(
    unique_users,
    inputs={"grain": value("day")},
    config={"region": "CA"},
)
def daily_unique_users_CA(unique_users: pd.Series) -> pd.Series:
    return unique_users


@subdag(
    unique_users,
    inputs={"grain": value("week")},
    config={"region": "CA"},
)
def weekly_unique_users_CA(unique_users: pd.Series) -> pd.Series:
    return unique_users


@subdag(
    unique_users,
    inputs={"grain": value("month")},
    config={"region": "CA"},
)
def monthly_unique_users_CA(unique_users: pd.Series) -> pd.Series:
    return unique_users
6 changes: 5 additions & 1 deletion hamilton/function_modifiers/__init__.py
import logging

from . import base, configuration, dependencies, expanders, macros, metadata, recursive, validation

logger = logging.getLogger(__name__)

check_output_custom = validation.check_output_custom
IS_DATA_VALIDATOR_TAG = validation.IS_DATA_VALIDATOR_TAG
DATA_VALIDATOR_ORIGINAL_OUTPUT_TAG = validation.DATA_VALIDATOR_ORIGINAL_OUTPUT_TAG

# recursive/subdag operators

subdag = recursive.subdag
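
With this re-export in place, the decorator is importable from the public `function_modifiers` package rather than the old experimental path -- this is exactly the import the updated `reusable_subdags.py` above uses:

```python
# New, public import path:
from hamilton.function_modifiers import subdag, value

# Old path that this commit replaces:
# from hamilton.experimental.decorators.reuse import reuse_functions
```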