Added unit tests for the individual pieces, and the examples have a new smoke test: an abstract example of how to use mutate. That example also covers edge cases, showing how they are handled and whether they are supported. Added a more user-friendly example by converting the existing pipe_output example, and replaced the pipe_output implementation with mutate. Light refactoring, improved naming, and added TODOs.
1 parent: b51d75d; commit: 69c94cc
Showing 16 changed files with 4,090 additions and 97 deletions.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
# Mutate | ||
|
||
We give some application suggestions for mutating the outputs of functions in a distributed manner with `@mutate`. | ||
As you scroll through the notebook, the examples build from the most straightforward applications to more complex logic that showcases the flexibility you get with this decorator. | ||
|
||
Mutate gives you the ability to apply the same transformation to the outputs of multiple functions in the DAG. It can be particularly useful in the following scenarios: | ||
|
||
1. Loading data and applying a pre-cleaning step. | ||
2. Feature engineering via joining, filtering, sorting, applying adjustment factors on a per-column basis, etc. | ||
3. Experimenting with different transformations across nodes by selectively turning transformations on or off. | ||
|
||
|
||
It also effectively: | ||
1. Removes the need to come up with unique names and then change the wiring whenever you want to add, remove, or replace something. | ||
2. Enables more verb-like names on functions. | ||
3. Potentially makes it simpler to reuse transform functions across DAG paths. | ||
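To make the idea concrete, here is a minimal plain-Python sketch of what applying one transformation to the outputs of several functions amounts to. It is an illustration only, not Hamilton's implementation: `mutate_sketch`, `run`, and the registry `_TRANSFORMS` are hypothetical names invented for this sketch, and plain lists stand in for DataFrames.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

# Hypothetical registry: target function name -> transforms, in definition order.
_TRANSFORMS: Dict[str, List[Callable[[Any], Any]]] = defaultdict(list)


def mutate_sketch(*targets: Callable[..., Any]) -> Callable:
    """Toy stand-in for the idea behind `@mutate` (not Hamilton's implementation)."""

    def decorator(transform: Callable[[Any], Any]) -> Callable[[Any], Any]:
        # Attach the transform to every target it names.
        for target in targets:
            _TRANSFORMS[target.__name__].append(transform)
        return transform

    return decorator


def run(target: Callable[[], Any]) -> Any:
    """Compute the raw output, then apply the registered transforms in order."""
    result = target()
    for transform in _TRANSFORMS[target.__name__]:
        result = transform(result)
    return result


def data_1() -> List[Any]:
    return [3, 2, None, 0]


def data_2() -> List[Any]:
    return ["a", "b", None, "d", "e"]


@mutate_sketch(data_1, data_2)
def _filter(values: List[Any]) -> List[Any]:
    # Drop missing entries; None plays the role of a missing value here.
    return [v for v in values if v is not None]


@mutate_sketch(data_1)
def _sort(values: List[Any]) -> List[Any]:
    return sorted(values)
```

In this sketch `run(data_1)` is both filtered and sorted, while `run(data_2)` is only filtered, because `_sort` names `data_1` alone.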
|
||
# Modules | ||
The same modules can be viewed and executed in `notebook.ipynb`. | ||
|
||
We have six modules: | ||
1. procedural.py: a basic example without using Hamilton | ||
2. pipe_output.py: how the above would be implemented using `pipe_output` from Hamilton | ||
3. mutate.py: how the above would be implemented using `mutate` | ||
4. pipe_output_on_output.py: functionality that lets you apply `pipe_output` to user-selected nodes (comes in handy with `extract_columns`/`extract_fields`) | ||
5. mutate_on_output.py: same as above but implemented using `mutate` | ||
6. mutate_twice_the_same.py: how you would apply the same transformation to a node twice using `mutate` | ||
|
||
Together, these modules demonstrate the same behavior achieved without Hamilton, with `pipe_output`, or with `mutate`, and should give you some idea of potential applications. | ||
|
||
![image info](./DAG.png) |
78 changes: 78 additions & 0 deletions
examples/mutate/abstract functionality blueprint/mutate.py
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
from typing import Any, List | ||
|
||
import pandas as pd | ||
|
||
from hamilton.function_modifiers import mutate, source, value | ||
|
||
|
||
def data_1() -> pd.DataFrame: | ||
    df = pd.DataFrame.from_dict({"col_1": [3, 2, pd.NA, 0], "col_2": ["a", "b", pd.NA, "d"]}) | ||
    return df | ||
|
||
|
||
def data_2() -> pd.DataFrame: | ||
    df = pd.DataFrame.from_dict( | ||
        {"col_1": ["a", "b", pd.NA, "d", "e"], "col_2": [150, 155, 145, 200, 5000]} | ||
    ) | ||
    return df | ||
|
||
|
||
def data_3() -> pd.DataFrame: | ||
    df = pd.DataFrame.from_dict({"col_1": [150, 155, 145, 200, 5000], "col_2": [10, 23, 32, 50, 0]}) | ||
    return df | ||
|
||
|
||
# Applied to data_1 and data_2. | ||
@mutate(data_1, data_2) | ||
def _filter(some_data: pd.DataFrame) -> pd.DataFrame: | ||
    """Remove rows that contain missing (NA) values. | ||
    Decorated with mutate, this is applied to both data_1 and data_2. | ||
    """ | ||
    return some_data.dropna() | ||
|
||
|
||
# Applied to data_2 only. | ||
# Demonstrates passing a literal with `value`. | ||
@mutate(data_2, missing_row=value(["c", 145])) | ||
def _add_missing_value(some_data: pd.DataFrame, missing_row: List[Any]) -> pd.DataFrame: | ||
    """Add a row to the dataframe. | ||
    Functions decorated with mutate can be viewed as steps in pipe_output, applied in the order they | ||
    are defined. This means that data_2 first had its NA row removed, and here we add a row back | ||
    by hand to replace it. | ||
    """ | ||
    some_data.loc[-1] = missing_row | ||
    return some_data | ||
|
||
|
||
# Applied to data_2 only. | ||
# Demonstrates pulling in another node with `source`. | ||
@mutate(data_2, other_data=source("data_3")) | ||
def _join(some_data: pd.DataFrame, other_data: pd.DataFrame) -> pd.DataFrame: | ||
    """Join two dataframes. | ||
    We can use the results of other nodes in the DAG via `source`. Here we join the data_2 table | ||
    with another table, data_3, which is the output of another node. | ||
    """ | ||
    return some_data.set_index("col_2").join(other_data.set_index("col_1")) | ||
|
||
|
||
# Applied to data_1 and data_2. | ||
@mutate(data_1, data_2) | ||
def _sort(some_data: pd.DataFrame) -> pd.DataFrame: | ||
    """Sort dataframes by their first column. | ||
    This is the last step of our pipeline(s) and is again applied to both data_1 and data_2. We did | ||
    some light pre-processing on data_1 (removing NA rows and sorting) and more elaborate | ||
    pre-processing on data_2, where we also added a row and joined another table. | ||
    """ | ||
    columns = some_data.columns | ||
    return some_data.sort_values(by=columns[0]) | ||
|
||
|
||
def feat_A(data_1: pd.DataFrame, data_2: pd.DataFrame) -> pd.DataFrame: | ||
    """Combining two raw dataframes to create a feature.""" | ||
    return ( | ||
        data_1.set_index("col_2").join(data_2.reset_index(names=["col_3"]).set_index("col_1")) | ||
    ).reset_index(names=["col_0"]) |
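The order of the steps applied to `data_2` in the file above (filter, add a row, join, sort) can be written out procedurally. The following is a hedged sketch that uses plain tuples instead of DataFrames so that it runs without pandas or Hamilton. All helper names are hypothetical, and the tuple layout only approximates what the pandas join produces (there, the join key becomes the index).

```python
from typing import Any, List, Sequence, Tuple

Row = Tuple[Any, ...]

# Same values as data_2 and data_3, as (col_1, col_2) tuples.
# None plays the role of pd.NA in this sketch.
DATA_2: List[Row] = [("a", 150), ("b", 155), (None, 145), ("d", 200), ("e", 5000)]
DATA_3: List[Row] = [(150, 10), (155, 23), (145, 32), (200, 50), (5000, 0)]


def drop_missing(rows: List[Row]) -> List[Row]:
    """Step 1 (_filter): drop rows that contain a missing value."""
    return [row for row in rows if None not in row]


def add_row(rows: List[Row], row: Sequence[Any]) -> List[Row]:
    """Step 2 (_add_missing_value): append a hand-written row."""
    return rows + [tuple(row)]


def left_join(rows: List[Row], other: List[Row]) -> List[Row]:
    """Step 3 (_join): look up each row's second field among the keys of `other`."""
    lookup = dict(other)
    return [(label, key, lookup.get(key)) for label, key in rows]


def sort_by_first(rows: List[Row]) -> List[Row]:
    """Step 4 (_sort): sort by the first field."""
    return sorted(rows, key=lambda row: row[0])


def preprocess_data_2() -> List[Row]:
    rows = drop_missing(DATA_2)
    rows = add_row(rows, ["c", 145])
    rows = left_join(rows, DATA_3)
    return sort_by_first(rows)
```

Calling `preprocess_data_2()` returns five rows ordered `a` through `e`, with the hand-added `c` row joined to the value `32` from `DATA_3`.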
124 changes: 124 additions & 0 deletions
examples/mutate/abstract functionality blueprint/mutate_on_output.py
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
from typing import Any, Dict, List | ||
|
||
import pandas as pd | ||
|
||
from hamilton.function_modifiers import ( | ||
    apply_to, | ||
    extract_columns, | ||
    extract_fields, | ||
    mutate, | ||
    source, | ||
    value, | ||
) | ||
|
||
|
||
def data_1() -> pd.DataFrame: | ||
    df = pd.DataFrame.from_dict({"col_1": [3, 2, pd.NA, 0], "col_2": ["a", "b", pd.NA, "d"]}) | ||
    return df | ||
|
||
|
||
def data_2() -> pd.DataFrame: | ||
    df = pd.DataFrame.from_dict( | ||
        {"col_1": ["a", "b", pd.NA, "d", "e"], "col_2": [150, 155, 145, 200, 5000]} | ||
    ) | ||
    return df | ||
|
||
|
||
def data_3() -> pd.DataFrame: | ||
    df = pd.DataFrame.from_dict({"col_1": [150, 155, 145, 200, 5000], "col_2": [10, 23, 32, 50, 0]}) | ||
    return df | ||
|
||
|
||
@extract_fields({"field_1": pd.Series, "field_2": pd.Series}) | ||
def feat_A(data_1: pd.DataFrame, data_2: pd.DataFrame) -> Dict[str, pd.Series]: | ||
    df = ( | ||
        data_1.set_index("col_2").join(data_2.reset_index(names=["col_3"]).set_index("col_1")) | ||
    ).reset_index(names=["col_0"]) | ||
    return {"field_1": df.iloc[:, 1], "field_2": df.iloc[:, 2]} | ||
|
||
|
||
@extract_columns("col_2", "col_3") | ||
def feat_B(data_1: pd.DataFrame, data_2: pd.DataFrame) -> pd.DataFrame: | ||
    return ( | ||
        data_1.set_index("col_2").join(data_2.reset_index(names=["col_3"]).set_index("col_1")) | ||
    ).reset_index(names=["col_0"]) | ||
|
||
|
||
def feat_C(field_1: pd.Series, col_3: pd.Series) -> pd.DataFrame: | ||
    return pd.concat([field_1, col_3], axis=1) | ||
|
||
|
||
def feat_D(field_2: pd.Series, col_2: pd.Series) -> pd.DataFrame: | ||
    return pd.concat([field_2, col_2], axis=1) | ||
|
||
|
||
# Applied to data_1 and data_2, depending on the config value `a`. | ||
@mutate(apply_to(data_1).when_in(a=[1, 2, 3]), apply_to(data_2).when_not_in(a=[1, 2, 3])) | ||
def _filter(some_data: pd.DataFrame) -> pd.DataFrame: | ||
    """Remove rows that contain missing (NA) values. | ||
    Mutate accepts conditionals from the `config.*` family, which let us choose when the transform | ||
    is applied to the target function. | ||
    """ | ||
    return some_data.dropna() | ||
|
||
|
||
# Applied to data_2 only. | ||
# Demonstrates passing a literal with `value`. | ||
@mutate(apply_to(data_2), missing_row=value(["c", 145])) | ||
def _add_missing_value(some_data: pd.DataFrame, missing_row: List[Any]) -> pd.DataFrame: | ||
    """Add a row to the dataframe. | ||
    Functions decorated with mutate can be viewed as steps in pipe_output, applied in the order they | ||
    are defined. This means that data_2 first had its NA row removed, and here we add a row back | ||
    by hand to replace it. | ||
    """ | ||
    some_data.loc[-1] = missing_row | ||
    return some_data | ||
|
||
|
||
# Applied to data_2 only. | ||
# Demonstrates pulling in another node with `source`. | ||
@mutate( | ||
    apply_to(data_2).named(name="", namespace="some_random_namespace"), other_data=source("data_3") | ||
) | ||
def join(some_data: pd.DataFrame, other_data: pd.DataFrame) -> pd.DataFrame: | ||
    """Join two dataframes. | ||
    We can use the results of other nodes in the DAG via `source`. Here we join the data_2 table | ||
    with another table, data_3, which is the output of another node. | ||
    In addition, mutate also supports adding custom names to the nodes. | ||
    """ | ||
    return some_data.set_index("col_2").join(other_data.set_index("col_1")) | ||
|
||
|
||
# Applied to data_1 and data_2. | ||
@mutate(apply_to(data_1), apply_to(data_2)) | ||
def sort(some_data: pd.DataFrame) -> pd.DataFrame: | ||
    """Sort dataframes by their first column. | ||
    This is the last step of our pipeline(s) and is again applied to both data_1 and data_2. We did | ||
    some light pre-processing on data_1 (removing NA rows and sorting) and more elaborate | ||
    pre-processing on data_2, where we also added a row and joined another table. | ||
    """ | ||
    columns = some_data.columns | ||
    return some_data.sort_values(by=columns[0]) | ||
|
||
|
||
# We want to apply an adjustment coefficient to all the columns of feat_B, but only to field_1 of feat_A. | ||
@mutate( | ||
    apply_to(feat_A, factor=value(100)).on_output("field_1").named("Europe"), | ||
    apply_to(feat_B, factor=value(10)).named("US"), | ||
) | ||
def _adjustment_factor(some_data: pd.Series, factor: float) -> pd.Series: | ||
    """Adjust the value by some factor. | ||
    You can imagine this step occurring later in time. We first constructed our DAG with features A and | ||
    B, only to realize that something is off. We are now experimenting post hoc to improve them and | ||
    find the best possible features. | ||
    We first split the features by columns of interest and then adjust them by a regional factor to | ||
    combine them into improved features we can use further down the pipeline. | ||
    """ | ||
    return some_data * factor |
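The `when_in` / `when_not_in` conditions on `_filter` above switch a transform on or off based on a configuration value. The sketch below illustrates that toggle in plain Python. It is a simplified reading, not Hamilton's resolution logic: the helper names are hypothetical, plain lists stand in for DataFrames, and how a missing config key is treated here (as `None`) is an assumption of this sketch.

```python
from typing import Any, Callable, Dict, List, Optional

Config = Dict[str, Any]
Condition = Callable[[Config], bool]


def when_in(**allowed: List[Any]) -> Condition:
    """True if every named config value is one of the listed values."""
    return lambda config: all(config.get(key) in values for key, values in allowed.items())


def when_not_in(**excluded: List[Any]) -> Condition:
    """True if none of the named config values is one of the listed values."""
    return lambda config: all(
        config.get(key) not in values for key, values in excluded.items()
    )


def drop_missing(values: List[Optional[int]]) -> List[Optional[int]]:
    """The transform being toggled: remove missing entries."""
    return [v for v in values if v is not None]


def maybe_drop_missing(
    values: List[Optional[int]], condition: Condition, config: Config
) -> List[Optional[int]]:
    """Apply the transform only if the condition holds for this config."""
    return drop_missing(values) if condition(config) else values


RAW = [3, 2, None, 0]
filter_data_1 = when_in(a=[1, 2, 3])      # mirrors the condition on data_1
filter_data_2 = when_not_in(a=[1, 2, 3])  # mirrors the condition on data_2
```

With `{"a": 1}` only the `data_1`-style condition fires; with `{"a": 5}` only the `data_2`-style condition does, so in this sketch exactly one of the two targets is filtered for any given value of `a`.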
20 changes: 20 additions & 0 deletions
examples/mutate/abstract functionality blueprint/mutate_twice_the_same.py
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
from hamilton.function_modifiers import mutate | ||
|
||
|
||
def data_1() -> int: | ||
    return 10 | ||
|
||
|
||
@mutate(data_1) | ||
def add_something(user_input: int) -> int: | ||
    return user_input + 100 | ||
|
||
|
||
@mutate(data_1) | ||
def add_something_more(user_input: int) -> int: | ||
    return user_input + 1000 | ||
|
||
|
||
@mutate(data_1) | ||
def add_something(user_input: int) -> int:  # noqa | ||
    return user_input + 100 |
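The file above reuses the name `add_something` for its third step. A minimal plain-Python sketch, assuming (as the `# noqa` redefinition suggests) that each transform is registered when its decorator runs, shows why all three steps can still be applied in order. `register` and `steps` are hypothetical names for this sketch and are not part of Hamilton.

```python
from typing import Callable, List

# Hypothetical list of registered steps, in decoration order.
steps: List[Callable[[int], int]] = []


def register(step: Callable[[int], int]) -> Callable[[int], int]:
    """Record the function object itself, so later rebinding of its name does not matter."""
    steps.append(step)
    return step


def data_1() -> int:
    return 10


@register
def add_something(user_input: int) -> int:
    return user_input + 100


@register
def add_something_more(user_input: int) -> int:
    return user_input + 1000


@register
def add_something(user_input: int) -> int:  # noqa: F811
    return user_input + 100


# Apply every registered step to the raw output, in order.
result = data_1()
for step in steps:
    result = step(result)
```

Because the registry holds function objects rather than names, the first `add_something` is still applied even though the name is rebound later; in this sketch `result` ends up as 10 + 100 + 1000 + 100.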