-
Notifications
You must be signed in to change notification settings - Fork 133
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
b78374b
commit 9c58a46
Showing
10 changed files
with
1,274 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
# Mutate | ||
|
||
We demonstrate the ability to mutate the outputs of functions in a distributed manner with `@mutate.` | ||
|
||
Mutate gives the ability to apply the same transformation to the each output of multiple functions in the DAG. It can be particularly useful in the following scenarios: | ||
|
||
1. Loading data and applying pre-cleaning step. | ||
2. Feature engineering via joining, filtering, sorting, etc. | ||
3. Experimenting with different transformations across nodes by selectively turning transformations on / off. | ||
|
||
|
||
and effectively replaces: | ||
1. Having to have unique names and then changing wiring if you want to add/remove/replace something. | ||
2. Enabling more verb like names on functions. | ||
3. Potentially simpler "reuse" of transform functions across DAG paths... | ||
|
||
# Modules | ||
The same modules can be viewed and executed in `notebook.ipynb`. | ||
|
||
We have three modules: | ||
1. procedural.py | ||
2. pipe_output.py | ||
3. mutate.py | ||
|
||
that demonstrate the same behavior achieved either without Hamilton, using `pipe_output` or `mutate` and that should give you some idea of a potential application. | ||
|
||
![image info](./dag.png) | ||
|
||
# Description | ||
We have two complimentary decorators that can help with transforming input / output of a node in the DAG: `pipe_input` and `pipe_output`. |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
from typing import Any, List | ||
|
||
import pandas as pd | ||
|
||
from hamilton.function_modifiers import mutate, source, value | ||
|
||
|
||
def data_1() -> pd.DataFrame: | ||
df = pd.DataFrame.from_dict({"col_1": [3, 2, pd.NA, 0], "col_2": ["a", "b", pd.NA, "d"]}) | ||
return df | ||
|
||
|
||
def data_2() -> pd.DataFrame: | ||
df = pd.DataFrame.from_dict( | ||
{"col_1": ["a", "b", pd.NA, "d", "e"], "col_2": [150, 155, 145, 200, 5000]} | ||
) | ||
return df | ||
|
||
|
||
def data_3() -> pd.DataFrame: | ||
df = pd.DataFrame.from_dict({"col_1": [150, 155, 145, 200, 5000], "col_2": [10, 23, 32, 50, 0]}) | ||
return df | ||
|
||
|
||
# data1 and data2 | ||
@mutate(data_1, data_2) | ||
def _filter(some_data: pd.DataFrame) -> pd.DataFrame: | ||
return some_data.dropna() | ||
|
||
|
||
# data 2 | ||
# this is for value | ||
@mutate(data_2, missing_row=value(["c", 145])) | ||
def _add_missing_value(some_data: pd.DataFrame, missing_row: List[Any]) -> pd.DataFrame: | ||
some_data.loc[-1] = missing_row | ||
return some_data | ||
|
||
|
||
# data 2 | ||
# this is for source | ||
@mutate(data_2, other_data=source("data_3")) | ||
def _join(some_data: pd.DataFrame, other_data: pd.DataFrame) -> pd.DataFrame: | ||
return some_data.set_index("col_2").join(other_data.set_index("col_1")) | ||
|
||
|
||
# data1 and data2 | ||
@mutate(data_1, data_2) | ||
def _sort(some_data: pd.DataFrame) -> pd.DataFrame: | ||
columns = some_data.columns | ||
return some_data.sort_values(by=columns[0]) |
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
from typing import Any, List | ||
|
||
import pandas as pd | ||
|
||
from hamilton.function_modifiers import pipe_output, source, step, value | ||
|
||
|
||
# data1 and data2 | ||
def _filter(some_data: pd.DataFrame) -> pd.DataFrame: | ||
return some_data.dropna() | ||
|
||
|
||
# data 2 | ||
# this is for value | ||
def _add_missing_value(some_data: pd.DataFrame, missing_row: List[Any]) -> pd.DataFrame: | ||
some_data.loc[-1] = missing_row | ||
return some_data | ||
|
||
|
||
# data 2 | ||
# this is for source | ||
def _join(some_data: pd.DataFrame, other_data: pd.DataFrame) -> pd.DataFrame: | ||
return some_data.set_index("col_2").join(other_data.set_index("col_1")) | ||
|
||
|
||
# data1 and data2 | ||
def _sort(some_data: pd.DataFrame) -> pd.DataFrame: | ||
columns = some_data.columns | ||
return some_data.sort_values(by=columns[0]) | ||
|
||
|
||
@pipe_output( | ||
step(_filter), | ||
step(_sort), | ||
) | ||
def data_1() -> pd.DataFrame: | ||
df = pd.DataFrame.from_dict({"col_1": [3, 2, pd.NA, 0], "col_2": ["a", "b", pd.NA, "d"]}) | ||
return df | ||
|
||
|
||
@pipe_output( | ||
step(_filter), | ||
step(_add_missing_value, missing_row=value(["c", 145])), | ||
step(_join, other_data=source("data_3")), | ||
step(_sort), | ||
) | ||
def data_2() -> pd.DataFrame: | ||
df = pd.DataFrame.from_dict( | ||
{"col_1": ["a", "b", pd.NA, "d", "e"], "col_2": [150, 155, 145, 200, 5000]} | ||
) | ||
return df | ||
|
||
|
||
def data_3() -> pd.DataFrame: | ||
df = pd.DataFrame.from_dict({"col_1": [150, 155, 145, 200, 5000], "col_2": [10, 23, 32, 50, 0]}) | ||
return df |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
from typing import Any, List | ||
|
||
import pandas as pd | ||
|
||
|
||
def data_1() -> pd.DataFrame: | ||
df = pd.DataFrame.from_dict({"col_1": [3, 2, pd.NA, 0], "col_2": ["a", "b", pd.NA, "d"]}) | ||
return df | ||
|
||
|
||
def data_2() -> pd.DataFrame: | ||
df = pd.DataFrame.from_dict( | ||
{"col_1": ["a", "b", pd.NA, "d", "e"], "col_2": [150, 155, 145, 200, 5000]} | ||
) | ||
return df | ||
|
||
|
||
def data_3() -> pd.DataFrame: | ||
df = pd.DataFrame.from_dict({"col_1": [150, 155, 145, 200, 5000], "col_2": [10, 23, 32, 50, 0]}) | ||
return df | ||
|
||
|
||
# data1 and data2 | ||
def _filter(some_data: pd.DataFrame) -> pd.DataFrame: | ||
return some_data.dropna() | ||
|
||
|
||
# data 2 | ||
# this is for value | ||
def _add_missing_value(some_data: pd.DataFrame, missing_row: List[Any]) -> pd.DataFrame: | ||
some_data.loc[-1] = missing_row | ||
return some_data | ||
|
||
|
||
# data 2 | ||
# this is for source | ||
def _join(some_data: pd.DataFrame, other_data: pd.DataFrame) -> pd.DataFrame: | ||
return some_data.set_index("col_2").join(other_data.set_index("col_1")) | ||
|
||
|
||
# data1 and data2 | ||
def _sort(some_data: pd.DataFrame) -> pd.DataFrame: | ||
columns = some_data.columns | ||
return some_data.sort_values(by=columns[0]) | ||
|
||
|
||
if __name__ == "__main__": | ||
# print("Filter data 1") | ||
# print(_filter(data_1())) | ||
# print("Sort data 1") | ||
print("Final data 1") | ||
print(_sort(_filter(data_1()))) | ||
# print("Filter data 2") | ||
# print(_filter(data_2())) | ||
# print("Add missing value data 2") | ||
# print(_add_missing_value(_filter(data_2()),missing_row=['c', 145])) | ||
# print("Join data 2 and data 3") | ||
# print(_join(_add_missing_value(_filter(data_2()),missing_row=['c', 145]),other_data=data_3())) | ||
# print("Sort joined dataframe") | ||
print("Final data 2") | ||
print( | ||
_sort( | ||
_join( | ||
_add_missing_value(_filter(data_2()), missing_row=["c", 145]), other_data=data_3() | ||
) | ||
) | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
from __future__ import annotations | ||
|
||
from typing import Any, List | ||
|
||
import pandas as pd | ||
|
||
|
||
def data_1() -> pd.DataFrame: | ||
df = pd.DataFrame.from_dict({"col_1": [3, 2, pd.NA, 0], "col_2": ["a", "b", pd.NA, "d"]}) | ||
return df | ||
|
||
|
||
def data_2() -> pd.DataFrame: | ||
df = pd.DataFrame.from_dict( | ||
{"col_1": ["a", "b", pd.NA, "d", "e"], "col_2": [150, 155, 145, 200, 5000]} | ||
) | ||
return df | ||
|
||
|
||
def data_3() -> pd.DataFrame: | ||
df = pd.DataFrame.from_dict({"col_1": [150, 155, 145, 200, 5000], "col_2": [10, 23, 32, 50, 0]}) | ||
return df | ||
|
||
|
||
# data1 and data2 | ||
def _filter(some_data: pd.DataFrame) -> pd.DataFrame: | ||
return some_data.dropna() | ||
|
||
|
||
# data 2 | ||
# this is for value | ||
def _add_missing_value(some_data: pd.DataFrame, missing_row: List[Any]) -> pd.DataFrame: | ||
some_data.loc[-1] = missing_row | ||
return some_data | ||
|
||
|
||
# data 2 | ||
# this is for source | ||
def _join(some_data: pd.DataFrame, other_data: pd.DataFrame) -> pd.DataFrame: | ||
return some_data.set_index("col_2").join(other_data.set_index("col_1")) | ||
|
||
|
||
# data1 and data2 | ||
def _sort(some_data: pd.DataFrame) -> pd.DataFrame: | ||
columns = some_data.columns | ||
return some_data.sort_values(by=columns[0]) | ||
|
||
|
||
if __name__ == "__main__": | ||
print("Filter data 1") | ||
print(_filter(data_1())) | ||
print("Sort data 1") | ||
print(_sort(_filter(data_1()))) | ||
print("Filter data 2") | ||
print(_filter(data_2())) | ||
print("Add missing value data 2") | ||
print(_add_missing_value(_filter(data_2()), missing_row=["c", 145])) | ||
print("Join data 2 and data 3") | ||
print(_join(_add_missing_value(_filter(data_2()), missing_row=["c", 145]), other_data=data_3())) | ||
print("Sort joined dataframe") | ||
print( | ||
_sort( | ||
_join( | ||
_add_missing_value(_filter(data_2()), missing_row=["c", 145]), other_data=data_3() | ||
) | ||
) | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.