-
Notifications
You must be signed in to change notification settings - Fork 133
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Tested with unit tests of the individual pieces; the examples also include a new smoke test / abstract example of how to use mutate. That example also covers edge cases and shows how they are handled or whether they are supported. Added a more user-friendly example by converting the existing pipe_output example and replacing the pipe_output implementation with mutate.
- Loading branch information
1 parent
57a3512
commit 325e613
Showing
15 changed files
with
3,844 additions
and
11 deletions.
There are no files selected for viewing
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
# Mutate | ||
|
||
We give some application suggestions for mutating the outputs of functions in a distributed manner with `@mutate`. | ||
As you scroll through the notebook, we build examples from the most straightforward applications to more complex logic that showcases the amount of flexibility you get with this decorator. | ||
|
||
Mutate gives the ability to apply the same transformation to each output of multiple functions in the DAG. It can be particularly useful in the following scenarios: | ||
|
||
1. Loading data and applying a pre-cleaning step. | ||
2. Feature engineering via joining, filtering, sorting, applying adjustment factors on a per column basis etc. | ||
3. Experimenting with different transformations across nodes by selectively turning transformations on / off. | ||
|
||
|
||
and effectively replaces: | ||
1. Having to have unique names and then changing wiring if you want to add/remove/replace something. | ||
2. Enabling more verb like names on functions. | ||
3. Potentially simpler "reuse" of transform functions across DAG paths... | ||
|
||
# Modules | ||
The same modules can be viewed and executed in `notebook.ipynb`. | ||
|
||
We have five modules: | ||
1. procedural.py | ||
2. pipe_output.py | ||
3. pipe_output_on_output.py | ||
4. mutate.py | ||
5. mutate_on_output.py | ||
|
||
that demonstrate the same behavior achieved without Hamilton, with `pipe_output`, or with `mutate`, and that should give you some idea of potential applications. | ||
|
||
![image info](./DAG.png) |
56 changes: 56 additions & 0 deletions
56
examples/mutate/abstract functionality blueprint/mutate.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
from typing import Any, List | ||
|
||
import pandas as pd | ||
|
||
from hamilton.function_modifiers import mutate, source, value | ||
|
||
|
||
def data_1() -> pd.DataFrame:
    """Build the first sample frame: numeric ``col_1`` and string ``col_2``.

    Row 2 holds ``pd.NA`` in both columns.
    """
    raw_columns = {"col_1": [3, 2, pd.NA, 0], "col_2": ["a", "b", pd.NA, "d"]}
    return pd.DataFrame.from_dict(raw_columns)
|
||
|
||
def data_2() -> pd.DataFrame:
    """Build the second sample frame: string ``col_1`` and numeric ``col_2``.

    ``col_1`` holds a single ``pd.NA`` at row 2.
    """
    raw_columns = {
        "col_1": ["a", "b", pd.NA, "d", "e"],
        "col_2": [150, 155, 145, 200, 5000],
    }
    return pd.DataFrame.from_dict(raw_columns)
|
||
|
||
def data_3() -> pd.DataFrame:
    """Build the third sample frame with two fully populated numeric columns."""
    raw_columns = {
        "col_1": [150, 155, 145, 200, 5000],
        "col_2": [10, 23, 32, 50, 0],
    }
    return pd.DataFrame.from_dict(raw_columns)
|
||
|
||
# Applied to both data_1 and data_2.
@mutate(data_1, data_2)
def _filter(some_data: pd.DataFrame) -> pd.DataFrame:
    """Return ``some_data`` without the rows that contain missing values."""
    without_missing = some_data.dropna()
    return without_missing
|
||
|
||
# Applied to data_2 only.
# Demonstrates binding an extra argument to a literal via `value`.
@mutate(data_2, missing_row=value(["c", 145]))
def _add_missing_value(some_data: pd.DataFrame, missing_row: List[Any]) -> pd.DataFrame:
    """Return a copy of ``some_data`` with ``missing_row`` added under index label ``-1``.

    :param some_data: frame to extend; it is left untouched.
    :param missing_row: one value per column of ``some_data``, in column order.
    :return: a new frame containing the extra row.
    """
    # Work on a copy: assigning through ``.loc`` on the argument itself would
    # modify the caller's frame in place, which leaks into the upstream result.
    extended = some_data.copy()
    extended.loc[-1] = missing_row
    return extended
|
||
|
||
# Applied to data_2 only.
# Demonstrates binding an extra argument to another node via `source`.
@mutate(data_2, other_data=source("data_3"))
def _join(some_data: pd.DataFrame, other_data: pd.DataFrame) -> pd.DataFrame:
    """Join ``other_data`` onto ``some_data``, matching ``col_2`` against ``other_data``'s ``col_1``."""
    left = some_data.set_index("col_2")
    right = other_data.set_index("col_1")
    return left.join(right)
|
||
|
||
# Applied to both data_1 and data_2.
@mutate(data_1, data_2)
def _sort(some_data: pd.DataFrame) -> pd.DataFrame:
    """Return ``some_data`` sorted by its first column."""
    first_column = some_data.columns[0]
    return some_data.sort_values(by=first_column)
|
||
|
||
def feat_A(data_1: pd.DataFrame, data_2: pd.DataFrame) -> pd.DataFrame:
    """Join ``data_2`` onto ``data_1`` and return the result with a flat index.

    ``data_1`` is keyed by its ``col_2`` and ``data_2`` by its ``col_1``.
    The join key ends up in the output column ``col_0``.
    """
    left = data_1.set_index("col_2")
    # Preserve data_2's row index as "col_3" before re-keying it by "col_1".
    right = data_2.reset_index(names=["col_3"]).set_index("col_1")
    joined = left.join(right)
    return joined.reset_index(names=["col_0"])
91 changes: 91 additions & 0 deletions
91
examples/mutate/abstract functionality blueprint/mutate_on_output.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
from typing import Any, Dict, List | ||
|
||
import pandas as pd | ||
|
||
from hamilton.function_modifiers import ( | ||
apply_to, | ||
extract_columns, | ||
extract_fields, | ||
mutate, | ||
source, | ||
value, | ||
) | ||
|
||
|
||
def data_1() -> pd.DataFrame:
    """Build the first sample frame: numeric ``col_1`` and string ``col_2``.

    Row 2 holds ``pd.NA`` in both columns.
    """
    raw_columns = {"col_1": [3, 2, pd.NA, 0], "col_2": ["a", "b", pd.NA, "d"]}
    return pd.DataFrame.from_dict(raw_columns)
|
||
|
||
def data_2() -> pd.DataFrame:
    """Build the second sample frame: string ``col_1`` and numeric ``col_2``.

    ``col_1`` holds a single ``pd.NA`` at row 2.
    """
    raw_columns = {
        "col_1": ["a", "b", pd.NA, "d", "e"],
        "col_2": [150, 155, 145, 200, 5000],
    }
    return pd.DataFrame.from_dict(raw_columns)
|
||
|
||
def data_3() -> pd.DataFrame:
    """Build the third sample frame with two fully populated numeric columns."""
    raw_columns = {
        "col_1": [150, 155, 145, 200, 5000],
        "col_2": [10, 23, 32, 50, 0],
    }
    return pd.DataFrame.from_dict(raw_columns)
|
||
|
||
@extract_fields({"field_1": pd.Series, "field_2": pd.Series})
def feat_A(data_1: pd.DataFrame, data_2: pd.DataFrame) -> Dict[str, pd.Series]:
    """Join ``data_2`` onto ``data_1`` and expose two columns of the result as fields.

    ``field_1`` is the second column of the joined frame and ``field_2`` the
    third, both selected by position.
    """
    left = data_1.set_index("col_2")
    # Preserve data_2's row index as "col_3" before re-keying it by "col_1".
    right = data_2.reset_index(names=["col_3"]).set_index("col_1")
    joined = left.join(right).reset_index(names=["col_0"])
    second_column = joined.iloc[:, 1]
    third_column = joined.iloc[:, 2]
    return {"field_1": second_column, "field_2": third_column}
|
||
|
||
@extract_columns("col_2", "col_3")
def feat_B(data_1: pd.DataFrame, data_2: pd.DataFrame) -> pd.DataFrame:
    """Join ``data_2`` onto ``data_1`` and return the result with a flat index.

    ``data_1`` is keyed by its ``col_2`` and ``data_2`` by its ``col_1``.
    The join key ends up in the output column ``col_0``.
    """
    left = data_1.set_index("col_2")
    # Preserve data_2's row index as "col_3" before re-keying it by "col_1".
    right = data_2.reset_index(names=["col_3"]).set_index("col_1")
    joined = left.join(right)
    return joined.reset_index(names=["col_0"])
|
||
|
||
def feat_C(field_1: pd.Series, col_3: pd.Series) -> pd.DataFrame:
    """Place ``field_1`` and ``col_3`` side by side as the columns of one frame."""
    series_to_combine = [field_1, col_3]
    return pd.concat(series_to_combine, axis=1)
|
||
|
||
def feat_D(field_2: pd.Series, col_2: pd.Series) -> pd.DataFrame:
    """Place ``field_2`` and ``col_2`` side by side as the columns of one frame."""
    series_to_combine = [field_2, col_2]
    return pd.concat(series_to_combine, axis=1)
|
||
|
||
# Targets data_1 and data_2, each under its own condition on `a`
# (presumably a config key -- confirm against the Hamilton docs).
@mutate(apply_to(data_1).when_in(a=[1, 2, 3]), apply_to(data_2).when_not_in(a=[1, 2, 3]))
def _filter(some_data: pd.DataFrame) -> pd.DataFrame:
    """Return ``some_data`` without the rows that contain missing values."""
    without_missing = some_data.dropna()
    return without_missing
|
||
|
||
# Applied to data_2 only.
# Demonstrates binding an extra argument to a literal via `value`.
@mutate(apply_to(data_2), missing_row=value(["c", 145]))
def _add_missing_value(some_data: pd.DataFrame, missing_row: List[Any]) -> pd.DataFrame:
    """Return a copy of ``some_data`` with ``missing_row`` added under index label ``-1``.

    :param some_data: frame to extend; it is left untouched.
    :param missing_row: one value per column of ``some_data``, in column order.
    :return: a new frame containing the extra row.
    """
    # Work on a copy: assigning through ``.loc`` on the argument itself would
    # modify the caller's frame in place, which leaks into the upstream result.
    extended = some_data.copy()
    extended.loc[-1] = missing_row
    return extended
|
||
|
||
# Applied to data_2 only.
# Demonstrates binding an extra argument to another node via `source`,
# combined with an empty name and a custom namespace.
@mutate(
    apply_to(data_2).named(name="", namespace="some_random_namespace"), other_data=source("data_3")
)
def join(some_data: pd.DataFrame, other_data: pd.DataFrame) -> pd.DataFrame:
    """Join ``other_data`` onto ``some_data``, matching ``col_2`` against ``other_data``'s ``col_1``."""
    left = some_data.set_index("col_2")
    right = other_data.set_index("col_1")
    return left.join(right)
|
||
|
||
# Targets data_1 and data_2, each under its own condition on `a`
# (presumably a config key -- confirm against the Hamilton docs).
@mutate(apply_to(data_1).when_not(a=0), apply_to(data_2).when(a=0))
def sort(some_data: pd.DataFrame) -> pd.DataFrame:
    """Return ``some_data`` sorted by its first column."""
    first_column = some_data.columns[0]
    return some_data.sort_values(by=first_column)
|
||
|
||
# We want to apply an adjustment coefficient to all the columns of feat_B,
# but only to field_1 of feat_A.
@mutate(
    apply_to(feat_A, factor=value(100)).on_output("field_1").named("Europe"),
    apply_to(feat_B, factor=value(10)).named("US"),
)
def _adjustment_factor(some_data: pd.Series, factor: float) -> pd.Series:
    """Scale ``some_data`` by ``factor``."""
    scaled = some_data * factor
    return scaled
20 changes: 20 additions & 0 deletions
20
examples/mutate/abstract functionality blueprint/mutate_twice_the_same.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
from hamilton.function_modifiers import mutate | ||
|
||
|
||
def data_1() -> int:
    """Return the constant starting value, 10."""
    starting_value = 10
    return starting_value
|
||
|
||
@mutate(data_1)
def add_something(user_input: int) -> int:
    """Return ``user_input`` increased by 100."""
    increment = 100
    return user_input + increment
|
||
|
||
@mutate(data_1)
def add_something_more(user_input: int) -> int:
    """Return ``user_input`` increased by 1000."""
    increment = 1000
    return user_input + increment
|
||
|
||
# NOTE(review): this repeats the earlier `add_something` definition, presumably
# on purpose to exercise applying the same mutation twice -- confirm.
@mutate(data_1)
def add_something(user_input: int) -> int:  # noqa
    """Return ``user_input`` increased by 100."""
    increment = 100
    return user_input + increment
Oops, something went wrong.