Add tests and docs, refactor mutate
Unit tests cover the individual pieces, and the examples gain a new smoke
test / abstract example of how to use mutate. That example also covers edge
cases, showing how they are handled or whether they are supported.

Added a more user friendly example by converting the existing
pipe_output example and replaced the pipe_output implementation by
mutate.

Light refactoring / improved naming and added TODOs.
jernejfrank committed Oct 7, 2024
1 parent 57a3512 commit 6f35ab9
Showing 16 changed files with 4,090 additions and 97 deletions.
Binary file added dag_example_module.png
25 changes: 23 additions & 2 deletions docs/reference/decorators/pipe.rst
@@ -1,8 +1,24 @@
=======================
pipe
pipe family
=======================

We have a family of decorators that can help with transforming the input and output of a node in the DAG. For a hands on example have a look at https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/scikit-learn/species_distribution_modeling
We have a family of decorators that represent a chained set of transformations. This specifically solves the "node redefinition"
problem, and is meant to represent a pipeline of chaining/redefinitions. This is similar (and can happily be
used in conjunction with) ``pipe`` in pandas. In Pyspark this is akin to the common operation of redefining a dataframe
with new columns.

For some examples have a look at: https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/scikit-learn/species_distribution_modeling

While it is generally reasonable to contain constructs within a node's function,
you should consider the pipe family for any of the following reasons:

1. You want the transformations to display as nodes in the DAG, with the possibility of storing or visualizing
the result.

2. You want to pull in functions from an external repository, and build the DAG a little more procedurally.

3. You want to use the same function multiple times, but with different parameters -- while ``@does`` / ``@parameterize`` can
do this, this presents an easier way to do this, especially in a chain.

--------------

@@ -24,3 +40,8 @@ pipe_output
----------------
.. autoclass:: hamilton.function_modifiers.macros.pipe_output
   :special-members: __init__

mutate
----------------
.. autoclass:: hamilton.function_modifiers.macros.mutate
   :special-members: __init__
31 changes: 31 additions & 0 deletions examples/mutate/abstract functionality blueprint/README
@@ -0,0 +1,31 @@
# Mutate

We give some application suggestions for mutating the outputs of functions in a distributed manner with `@mutate`.
As you scroll through the notebook, we build examples from the most straightforward applications to more complex logic that showcases the flexibility you get with this decorator.

Mutate gives you the ability to apply the same transformation to each output of multiple functions in the DAG. It can be particularly useful in the following scenarios:

1. Loading data and applying a pre-cleaning step.
2. Feature engineering via joining, filtering, sorting, applying adjustment factors on a per column basis etc.
3. Experimenting with different transformations across nodes by selectively turning transformations on / off.


and effectively:
1. Removes the need for unique names and for rewiring whenever you want to add/remove/replace something.
2. Enables more verb-like names on functions.
3. Potentially simplifies "reuse" of transform functions across DAG paths.

# Modules
The same modules can be viewed and executed in `notebook.ipynb`.

We have six modules that demonstrate the same behavior achieved without Hamilton, with `pipe_output`, or with `mutate`, and that should give you some idea of potential applications:
1. procedural.py: basic example without using Hamilton
2. pipe_output.py: how the above would be implemented using `pipe_output` from Hamilton
3. mutate.py: how the above would be implemented using `mutate`
4. pipe_output_on_output.py: functionality that lets you apply `pipe_output` to user-selected nodes (comes in handy with `extract_columns`/`extract_fields`)
5. mutate_on_output.py: same as above but implemented using `mutate`
6. mutate_twice_the_same: how you would apply the same transformation on a node twice using `mutate`
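
To picture what applying the same transformation twice (item 6) amounts to procedurally, here is a small sketch without Hamilton. It assumes the steps run in the order they are declared, which is how the docstrings in `mutate.py` describe mutate steps; the `steps` list and the use of `reduce` are illustrative choices, not part of the example modules.

```python
from functools import reduce
from typing import Callable, List


def data_1() -> int:
    return 10


def add_something(user_input: int) -> int:
    return user_input + 100


def add_something_more(user_input: int) -> int:
    return user_input + 1000


# Assumed execution order: the order in which the steps are declared.
# add_something appears twice, so it is applied twice.
steps: List[Callable[[int], int]] = [add_something, add_something_more, add_something]

# Thread the value through each step in turn.
result = reduce(lambda acc, step: step(acc), steps, data_1())
print(result)  # 1210
```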

![image info](./DAG.png)
78 changes: 78 additions & 0 deletions examples/mutate/abstract functionality blueprint/mutate.py
@@ -0,0 +1,78 @@
from typing import Any, List

import pandas as pd

from hamilton.function_modifiers import mutate, source, value


def data_1() -> pd.DataFrame:
    df = pd.DataFrame.from_dict({"col_1": [3, 2, pd.NA, 0], "col_2": ["a", "b", pd.NA, "d"]})
    return df


def data_2() -> pd.DataFrame:
    df = pd.DataFrame.from_dict(
        {"col_1": ["a", "b", pd.NA, "d", "e"], "col_2": [150, 155, 145, 200, 5000]}
    )
    return df


def data_3() -> pd.DataFrame:
    df = pd.DataFrame.from_dict({"col_1": [150, 155, 145, 200, 5000], "col_2": [10, 23, 32, 50, 0]})
    return df


# data1 and data2
@mutate(data_1, data_2)
def _filter(some_data: pd.DataFrame) -> pd.DataFrame:
    """Remove NAN values.
    Decorated with mutate this will be applied to both data_1 and data_2.
    """
    return some_data.dropna()


# data 2
# this is for value
@mutate(data_2, missing_row=value(["c", 145]))
def _add_missing_value(some_data: pd.DataFrame, missing_row: List[Any]) -> pd.DataFrame:
    """Add row to dataframe.
    The functions decorated with mutate can be viewed as steps in pipe_output in the order they
    are implemented. This means that data_2 had a row removed with NAN and here we add back a row
    by hand that replaces that row.
    """
    some_data.loc[-1] = missing_row
    return some_data


# data 2
# this is for source
@mutate(data_2, other_data=source("data_3"))
def _join(some_data: pd.DataFrame, other_data: pd.DataFrame) -> pd.DataFrame:
    """Join two dataframes.
    We can use results from other nodes in the DAG by using the `source` functionality. Here we join
    data_2 table with another table - data_3 - that is the output of another node.
    """
    return some_data.set_index("col_2").join(other_data.set_index("col_1"))


# data1 and data2
@mutate(data_1, data_2)
def _sort(some_data: pd.DataFrame) -> pd.DataFrame:
    """Sort dataframes by first column.
    This is the last step of our pipeline(s) and gets again applied to data_1 and data_2. We did some
    light pre-processing on data_1 by removing NANs and sorting and more elaborate pre-processing on
    data_2 where we added values and joined another table.
    """
    columns = some_data.columns
    return some_data.sort_values(by=columns[0])


def feat_A(data_1: pd.DataFrame, data_2: pd.DataFrame) -> pd.DataFrame:
    """Combining two raw dataframes to create a feature."""
    return (
        data_1.set_index("col_2").join(data_2.reset_index(names=["col_3"]).set_index("col_1"))
    ).reset_index(names=["col_0"])
124 changes: 124 additions & 0 deletions examples/mutate/abstract functionality blueprint/mutate_on_output.py
@@ -0,0 +1,124 @@
from typing import Any, Dict, List

import pandas as pd

from hamilton.function_modifiers import (
    apply_to,
    extract_columns,
    extract_fields,
    mutate,
    source,
    value,
)


def data_1() -> pd.DataFrame:
    df = pd.DataFrame.from_dict({"col_1": [3, 2, pd.NA, 0], "col_2": ["a", "b", pd.NA, "d"]})
    return df


def data_2() -> pd.DataFrame:
    df = pd.DataFrame.from_dict(
        {"col_1": ["a", "b", pd.NA, "d", "e"], "col_2": [150, 155, 145, 200, 5000]}
    )
    return df


def data_3() -> pd.DataFrame:
    df = pd.DataFrame.from_dict({"col_1": [150, 155, 145, 200, 5000], "col_2": [10, 23, 32, 50, 0]})
    return df


@extract_fields({"field_1": pd.Series, "field_2": pd.Series})
def feat_A(data_1: pd.DataFrame, data_2: pd.DataFrame) -> Dict[str, pd.Series]:
    df = (
        data_1.set_index("col_2").join(data_2.reset_index(names=["col_3"]).set_index("col_1"))
    ).reset_index(names=["col_0"])
    return {"field_1": df.iloc[:, 1], "field_2": df.iloc[:, 2]}


@extract_columns("col_2", "col_3")
def feat_B(data_1: pd.DataFrame, data_2: pd.DataFrame) -> pd.DataFrame:
    return (
        data_1.set_index("col_2").join(data_2.reset_index(names=["col_3"]).set_index("col_1"))
    ).reset_index(names=["col_0"])


def feat_C(field_1: pd.Series, col_3: pd.Series) -> pd.DataFrame:
    return pd.concat([field_1, col_3], axis=1)


def feat_D(field_2: pd.Series, col_2: pd.Series) -> pd.DataFrame:
    return pd.concat([field_2, col_2], axis=1)


# data1 and data2
@mutate(apply_to(data_1).when_in(a=[1, 2, 3]), apply_to(data_2).when_not_in(a=[1, 2, 3]))
def _filter(some_data: pd.DataFrame) -> pd.DataFrame:
    """Remove NAN values.
    Mutate accepts a `config.*` family conditional where we can choose when the transform will be applied
    onto the target function.
    """
    return some_data.dropna()


# data 2
# this is for value
@mutate(apply_to(data_2), missing_row=value(["c", 145]))
def _add_missing_value(some_data: pd.DataFrame, missing_row: List[Any]) -> pd.DataFrame:
    """Add row to dataframe.
    The functions decorated with mutate can be viewed as steps in pipe_output in the order they
    are implemented. This means that data_2 had a row removed with NAN and here we add back a row
    by hand that replaces that row.
    """
    some_data.loc[-1] = missing_row
    return some_data


# data 2
# this is for source
@mutate(
    apply_to(data_2).named(name="", namespace="some_random_namespace"), other_data=source("data_3")
)
def join(some_data: pd.DataFrame, other_data: pd.DataFrame) -> pd.DataFrame:
    """Join two dataframes.
    We can use results from other nodes in the DAG by using the `source` functionality. Here we join
    data_2 table with another table - data_3 - that is the output of another node.
    In addition, mutate also supports adding custom names to the nodes.
    """
    return some_data.set_index("col_2").join(other_data.set_index("col_1"))


# data1 and data2
@mutate(apply_to(data_1), apply_to(data_2))
def sort(some_data: pd.DataFrame) -> pd.DataFrame:
    """Sort dataframes by first column.
    This is the last step of our pipeline(s) and gets again applied to data_1 and data_2. We did some
    light pre-processing on data_1 by removing NANs and sorting and more elaborate pre-processing on
    data_2 where we added values and joined another table.
    """
    columns = some_data.columns
    return some_data.sort_values(by=columns[0])


# we want to apply some adjustment coefficient to all the columns of feat_B, but only to field_1 of feat_A
@mutate(
    apply_to(feat_A, factor=value(100)).on_output("field_1").named("Europe"),
    apply_to(feat_B, factor=value(10)).named("US"),
)
def _adjustment_factor(some_data: pd.Series, factor: float) -> pd.Series:
    """Adjust the value by some factor.
    You can imagine this step occurring later in time. We first constructed our DAG with features A and
    B only to realize that something is off. We are now experimenting post-hoc to improve and find the
    best possible features.
    We first split the features by columns of interest and then adjust them by a regional factor to
    combine them into improved features we can use further down the pipeline.
    """
    return some_data * factor
@@ -0,0 +1,20 @@
from hamilton.function_modifiers import mutate


def data_1() -> int:
    return 10


@mutate(data_1)
def add_something(user_input: int) -> int:
    return user_input + 100


@mutate(data_1)
def add_something_more(user_input: int) -> int:
    return user_input + 1000


@mutate(data_1)
def add_something(user_input: int) -> int:  # noqa
    return user_input + 100