diff --git a/dag_example_module.png b/dag_example_module.png
new file mode 100644
index 000000000..5351fb719
Binary files /dev/null and b/dag_example_module.png differ
diff --git a/docs/reference/decorators/pipe.rst b/docs/reference/decorators/pipe.rst
index 18d5bbad8..a6cef8751 100644
--- a/docs/reference/decorators/pipe.rst
+++ b/docs/reference/decorators/pipe.rst
@@ -1,8 +1,24 @@
=======================
-pipe
+pipe family
=======================
-We have a family of decorators that can help with transforming the input and output of a node in the DAG. For a hands on example have a look at https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/scikit-learn/species_distribution_modeling
+We have a family of decorators that represent a chained set of transformations. This specifically solves the "node redefinition"
+problem, and is meant to represent a pipeline of chained redefinitions. This is similar to (and can happily be
+used in conjunction with) ``pipe`` in pandas. In PySpark this is akin to the common operation of redefining a dataframe
+with new columns.
+
+For some examples have a look at: https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/scikit-learn/species_distribution_modeling
+
+While it is generally reasonable to contain constructs within a node's function,
+you should consider the pipe family for any of the following reasons:
+
+1. You want the transformations to display as nodes in the DAG, with the possibility of storing or visualizing
+   the result.
+
+2. You want to pull in functions from an external repository, and build the DAG a little more procedurally.
+
+3. You want to use the same function multiple times, but with different parameters -- while ``@does`` / ``@parameterize`` can
+   do this, the pipe family presents an easier way to do it, especially in a chain.
--------------
@@ -24,3 +40,8 @@ pipe_output
----------------
.. autoclass:: hamilton.function_modifiers.macros.pipe_output
:special-members: __init__
+
+mutate
+----------------
+.. autoclass:: hamilton.function_modifiers.macros.mutate
+ :special-members: __init__
diff --git a/examples/mutate/abstract functionality blueprint/DAG.png b/examples/mutate/abstract functionality blueprint/DAG.png
new file mode 100644
index 000000000..8ace57862
Binary files /dev/null and b/examples/mutate/abstract functionality blueprint/DAG.png differ
diff --git a/examples/mutate/abstract functionality blueprint/README b/examples/mutate/abstract functionality blueprint/README
new file mode 100644
index 000000000..342fc6033
--- /dev/null
+++ b/examples/mutate/abstract functionality blueprint/README
@@ -0,0 +1,31 @@
+# Mutate
+
+We give some application suggestions for mutating the outputs of functions in a distributed manner with `@mutate`.
+As you scroll through the notebook, we build examples from the most straightforward applications to more complex logic that showcases the amount of flexibility you get with this decorator.
+
+Mutate gives the ability to apply the same transformation to each output of multiple functions in the DAG. It can be particularly useful in the following scenarios:
+
+1. Loading data and applying pre-cleaning step.
+2. Feature engineering via joining, filtering, sorting, applying adjustment factors on a per column basis etc.
+3. Experimenting with different transformations across nodes by selectively turning transformations on / off.
+
+
+and effectively helps with the following:
+1. Avoiding having to invent unique names and then change the wiring if you want to add/remove/replace something.
+2. Enabling more verb-like names on functions.
+3. Potentially simpler "reuse" of transform functions across DAG paths...
+
+# Modules
+The same modules can be viewed and executed in `notebook.ipynb`.
+
+We have six modules:
+1. procedural.py: basic example without using Hamilton
+2. pipe_output.py: how the above would be implemented using `pipe_output` from Hamilton
+3. mutate.py: how the above would be implemented using `mutate`
+4. pipe_output_on_output.py: functionality that allows applying `pipe_output` to user-selected nodes (comes in handy with `extract_columns`/`extract_fields`)
+5. mutate_on_output.py: same as above but implemented using `mutate`
+6. mutate_twice_the_same.py: how you would apply the same transformation on a node twice using `mutate`
+
+Together they demonstrate the same behavior achieved either without Hamilton, using `pipe_output`, or using `mutate`, and should give you some idea of potential applications.
+
+![image info](./DAG.png)
diff --git a/examples/mutate/abstract functionality blueprint/mutate.py b/examples/mutate/abstract functionality blueprint/mutate.py
new file mode 100644
index 000000000..cbf85de83
--- /dev/null
+++ b/examples/mutate/abstract functionality blueprint/mutate.py
@@ -0,0 +1,78 @@
+from typing import Any, List
+
+import pandas as pd
+
+from hamilton.function_modifiers import mutate, source, value
+
+
+def data_1() -> pd.DataFrame:  # Toy input: four rows; the third row is pd.NA in both columns.
+ df = pd.DataFrame.from_dict({"col_1": [3, 2, pd.NA, 0], "col_2": ["a", "b", pd.NA, "d"]})
+ return df
+
+
+def data_2() -> pd.DataFrame:  # Toy input: five rows; "col_1" has a pd.NA in the third row.
+ df = pd.DataFrame.from_dict(
+ {"col_1": ["a", "b", pd.NA, "d", "e"], "col_2": [150, 155, 145, 200, 5000]}
+ )
+ return df
+
+
+def data_3() -> pd.DataFrame:  # Toy lookup table: its "col_1" holds the same values as data_2's "col_2".
+ df = pd.DataFrame.from_dict({"col_1": [150, 155, 145, 200, 5000], "col_2": [10, 23, 32, 50, 0]})
+ return df
+
+
+# Applied to: data_1 and data_2
+@mutate(data_1, data_2)
+def _filter(some_data: pd.DataFrame) -> pd.DataFrame:
+ """Drop every row that contains a missing value (``DataFrame.dropna``).
+
+ Decorated with mutate, this step is applied to the outputs of both data_1 and data_2.
+ """
+ return some_data.dropna()
+
+
+# Applied to: data_2
+# Demonstrates binding a literal argument with `value`.
+@mutate(data_2, missing_row=value(["c", 145]))
+def _add_missing_value(some_data: pd.DataFrame, missing_row: List[Any]) -> pd.DataFrame:
+ """Add a row (index label -1) to the dataframe.
+
+ Functions decorated with mutate can be viewed as steps in pipe_output, in the order they are
+ implemented: data_2 first had its NA row removed by _filter, and here we add a replacement row by
+ hand. NOTE: the row is assigned into the received frame, and that same object is returned.
+ """
+ some_data.loc[-1] = missing_row
+ return some_data
+
+
+# Applied to: data_2
+# Demonstrates pulling in another node's output with `source`.
+@mutate(data_2, other_data=source("data_3"))
+def _join(some_data: pd.DataFrame, other_data: pd.DataFrame) -> pd.DataFrame:
+ """Join data_2 with data_3.
+
+ The `source` functionality lets a step use the result of another node in the DAG. Here
+ `some_data` is indexed by its "col_2" and joined with `other_data` (data_3) indexed by its "col_1".
+ """
+ return some_data.set_index("col_2").join(other_data.set_index("col_1"))
+
+
+# Applied to: data_1 and data_2
+@mutate(data_1, data_2)
+def _sort(some_data: pd.DataFrame) -> pd.DataFrame:
+ """Sort the dataframe by its first column.
+
+ This is the last step of our pipeline(s) and is applied to both data_1 and data_2. data_1 gets
+ light pre-processing (drop NA rows, then sort); data_2 gets the more elaborate variant, where a
+ row is added and another table is joined in before sorting.
+ """
+ columns = some_data.columns
+ return some_data.sort_values(by=columns[0])
+
+
+def feat_A(data_1: pd.DataFrame, data_2: pd.DataFrame) -> pd.DataFrame:
+ """Combine data_1 and data_2 (the nodes targeted by the mutate steps above) into a feature."""
+ return (
+ data_1.set_index("col_2").join(data_2.reset_index(names=["col_3"]).set_index("col_1"))
+ ).reset_index(names=["col_0"])
diff --git a/examples/mutate/abstract functionality blueprint/mutate_on_output.py b/examples/mutate/abstract functionality blueprint/mutate_on_output.py
new file mode 100644
index 000000000..ef0ca0f71
--- /dev/null
+++ b/examples/mutate/abstract functionality blueprint/mutate_on_output.py
@@ -0,0 +1,124 @@
+from typing import Any, Dict, List
+
+import pandas as pd
+
+from hamilton.function_modifiers import (
+ apply_to,
+ extract_columns,
+ extract_fields,
+ mutate,
+ source,
+ value,
+)
+
+
+def data_1() -> pd.DataFrame:  # Toy input: four rows; the third row is pd.NA in both columns.
+ df = pd.DataFrame.from_dict({"col_1": [3, 2, pd.NA, 0], "col_2": ["a", "b", pd.NA, "d"]})
+ return df
+
+
+def data_2() -> pd.DataFrame:  # Toy input: five rows; "col_1" has a pd.NA in the third row.
+ df = pd.DataFrame.from_dict(
+ {"col_1": ["a", "b", pd.NA, "d", "e"], "col_2": [150, 155, 145, 200, 5000]}
+ )
+ return df
+
+
+def data_3() -> pd.DataFrame:  # Toy lookup table: its "col_1" holds the same values as data_2's "col_2".
+ df = pd.DataFrame.from_dict({"col_1": [150, 155, 145, 200, 5000], "col_2": [10, 23, 32, 50, 0]})
+ return df
+
+
+@extract_fields({"field_1": pd.Series, "field_2": pd.Series})  # dict entries consumed below by name
+def feat_A(data_1: pd.DataFrame, data_2: pd.DataFrame) -> Dict[str, pd.Series]:
+ df = (
+ data_1.set_index("col_2").join(data_2.reset_index(names=["col_3"]).set_index("col_1"))
+ ).reset_index(names=["col_0"])
+ return {"field_1": df.iloc[:, 1], "field_2": df.iloc[:, 2]}  # 2nd and 3rd column, by position
+
+
+@extract_columns("col_2", "col_3")  # columns consumed below by name (feat_C uses col_3, feat_D col_2)
+def feat_B(data_1: pd.DataFrame, data_2: pd.DataFrame) -> pd.DataFrame:
+ return (
+ data_1.set_index("col_2").join(data_2.reset_index(names=["col_3"]).set_index("col_1"))
+ ).reset_index(names=["col_0"])
+
+
+def feat_C(field_1: pd.Series, col_3: pd.Series) -> pd.DataFrame:  # field_1 and col_3 side by side
+ return pd.concat([field_1, col_3], axis=1)
+
+
+def feat_D(field_2: pd.Series, col_2: pd.Series) -> pd.DataFrame:  # field_2 and col_2 side by side
+ return pd.concat([field_2, col_2], axis=1)
+
+
+# Targets: data_1 (when_in a=[1, 2, 3]) and data_2 (when_not_in a=[1, 2, 3])
+@mutate(apply_to(data_1).when_in(a=[1, 2, 3]), apply_to(data_2).when_not_in(a=[1, 2, 3]))
+def _filter(some_data: pd.DataFrame) -> pd.DataFrame:
+ """Drop every row that contains a missing value (``DataFrame.dropna``).
+
+ Mutate accepts `config.*`-family conditionals (here `when_in` / `when_not_in`) with which we can
+ choose when the transform is applied to each target function.
+ """
+ return some_data.dropna()
+
+
+# Applied to: data_2
+# Demonstrates binding a literal argument with `value`.
+@mutate(apply_to(data_2), missing_row=value(["c", 145]))
+def _add_missing_value(some_data: pd.DataFrame, missing_row: List[Any]) -> pd.DataFrame:
+ """Add a row (index label -1) to the dataframe.
+
+ Functions decorated with mutate can be viewed as steps in pipe_output, in the order they are
+ implemented. _filter above is conditional for data_2, so its NA row may or may not be gone here.
+ NOTE: the row is assigned into the received frame, and that same object is returned.
+ """
+ some_data.loc[-1] = missing_row
+ return some_data
+
+
+# Applied to: data_2
+# Demonstrates pulling in another node's output with `source`.
+@mutate(
+ apply_to(data_2).named(name="", namespace="some_random_namespace"), other_data=source("data_3")
+)
+def join(some_data: pd.DataFrame, other_data: pd.DataFrame) -> pd.DataFrame:
+ """Join data_2 with data_3.
+
+ The `source` functionality lets a step use the result of another node in the DAG. Here
+ `some_data` is indexed by its "col_2" and joined with `other_data` (data_3) indexed by its "col_1".
+
+ Mutate also supports custom node names via `.named(...)`. NOTE(review): name is "" here - confirm.
+ """
+ return some_data.set_index("col_2").join(other_data.set_index("col_1"))
+
+
+# Applied to: data_1 and data_2
+@mutate(apply_to(data_1), apply_to(data_2))
+def sort(some_data: pd.DataFrame) -> pd.DataFrame:
+ """Sort the dataframe by its first column.
+
+ This is the last step of our pipeline(s) and is applied to both data_1 and data_2. data_1 gets
+ light pre-processing (conditional NA filtering, then sort); data_2 gets the more elaborate
+ variant, where a row is added and another table is joined in before sorting.
+ """
+ columns = some_data.columns
+ return some_data.sort_values(by=columns[0])
+
+
+# Apply an adjustment coefficient to all the columns of feat_B, but only to field_1 of feat_A.
+@mutate(
+ apply_to(feat_A, factor=value(100)).on_output("field_1").named("Europe"),
+ apply_to(feat_B, factor=value(10)).named("US"),
+)
+def _adjustment_factor(some_data: pd.Series, factor: float) -> pd.Series:
+ """Multiply the values by a per-target factor.
+
+ You can imagine this step being added later in time: we first built the DAG with features A and
+ B, only to realize that something is off, and are now experimenting post hoc to find the best
+ possible features. The factor is bound per target above: 100 for feat_A, 10 for feat_B.
+
+ We first split the features by columns of interest and then adjust them by a regional factor to
+ combine them into improved features we can use further down the pipeline.
+ """
+ return some_data * factor
diff --git a/examples/mutate/abstract functionality blueprint/mutate_twice_the_same.py b/examples/mutate/abstract functionality blueprint/mutate_twice_the_same.py
new file mode 100644
index 000000000..b67fa26a3
--- /dev/null
+++ b/examples/mutate/abstract functionality blueprint/mutate_twice_the_same.py
@@ -0,0 +1,20 @@
+from hamilton.function_modifiers import mutate
+
+
+def data_1() -> int:  # Starting value that the mutate steps below are applied to.
+ return 10
+
+
+@mutate(data_1)  # first mutate step declared on data_1: adds 100
+def add_something(user_input: int) -> int:
+ return user_input + 100
+
+
+@mutate(data_1)  # second mutate step declared on data_1: adds 1000
+def add_something_more(user_input: int) -> int:
+ return user_input + 1000
+
+
+@mutate(data_1)  # same function name as above on purpose: declares the +100 step on data_1 again
+def add_something(user_input: int) -> int: # noqa
+ return user_input + 100
diff --git a/examples/mutate/abstract functionality blueprint/notebook.ipynb b/examples/mutate/abstract functionality blueprint/notebook.ipynb
new file mode 100644
index 000000000..f5d27fca5
--- /dev/null
+++ b/examples/mutate/abstract functionality blueprint/notebook.ipynb
@@ -0,0 +1,2386 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Using `mutate` for pre-processing and feature engineering \n",
+ "\n",
+ "We give some application suggestions for mutating the outputs of functions in a distributed manner with `@mutate`.\n",
+ "As you scroll through the notebook, we build examples from the most straightforward applications to more complex logic that showcases the amount of flexibility you get with this decorator.\n",
+ "\n",
+ "Mutate gives the ability to apply the same transformation to each output of multiple functions in the DAG. It can be particularly useful in the following scenarios:\n",
+ "\n",
+ "1. Loading data and applying pre-cleaning step.\n",
+ "2. Feature engineering via joining, filtering, sorting, applying adjustment factors on a per column basis etc.\n",
+ "3. Experimenting with different transformations across nodes by selectively turning transformations on / off.\n",
+ "\n",
+ "\n",
+ "and effectively helps with the following:\n",
+ "1. Avoiding having to invent unique names and then change the wiring if you want to add/remove/replace something.\n",
+ "2. Enabling more verb-like names on functions.\n",
+ "3. Potentially simpler \"reuse\" of transform functions across DAG paths..."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# from hamilton import registry\n",
+ "# registry.disable_autoload()\n",
+ "\n",
+ "%load_ext hamilton.plugins.jupyter_magic\n",
+ "from hamilton import driver"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Final data 1\n",
+ " col_1 col_2\n",
+ "3 0 d\n",
+ "1 2 b\n",
+ "0 3 a\n",
+ "Final data 2\n",
+ " col_1 col_2\n",
+ "col_2 \n",
+ "150 a 10\n",
+ "155 b 23\n",
+ "145 c 32\n",
+ "200 d 50\n",
+ "5000 e 0\n"
+ ]
+ },
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ " \n",
+ "\n",
+ "cluster__legend \n",
+ " \n",
+ "Legend \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2 \n",
+ " \n",
+ "data_2 \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_3 \n",
+ " \n",
+ "data_3 \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1 \n",
+ " \n",
+ "data_1 \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "function \n",
+ " \n",
+ "function \n",
+ " \n",
+ " \n",
+ " \n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "%%cell_to_module -m procedural --display\n",
+ "\n",
+ "from typing import Any, List\n",
+ "import pandas as pd\n",
+ "\n",
+ "\n",
+ "def data_1()->pd.DataFrame:\n",
+ " df = pd.DataFrame.from_dict({'col_1': [3, 2, pd.NA, 0], 'col_2': ['a', 'b', pd.NA, 'd']})\n",
+ " return df\n",
+ "\n",
+ "def data_2()->pd.DataFrame:\n",
+ " df = pd.DataFrame.from_dict({'col_1': ['a', 'b', pd.NA, 'd', 'e'], 'col_2': [150, 155, 145, 200, 5000]})\n",
+ " return df\n",
+ "\n",
+ "def data_3()->pd.DataFrame:\n",
+ " df = pd.DataFrame.from_dict({'col_1': [150, 155, 145, 200, 5000], 'col_2': [10,23, 32, 50, 0]})\n",
+ " return df\n",
+ "\n",
+ "# data1 and data2\n",
+ "def _filter(some_data:pd.DataFrame)->pd.DataFrame:\n",
+ " return some_data.dropna()\n",
+ "\n",
+ "# data 2\n",
+ "# this is for value\n",
+ "def _add_missing_value(some_data:pd.DataFrame, missing_row:List[Any])->pd.DataFrame:\n",
+ " some_data.loc[-1] = missing_row\n",
+ " return some_data\n",
+ "\n",
+ "# data 2\n",
+ "# this is for source\n",
+ "def _join(some_data:pd.DataFrame, other_data:pd.DataFrame)->pd.DataFrame:\n",
+ " return some_data.set_index('col_2').join(other_data.set_index('col_1'))\n",
+ "\n",
+ "# data1 and data2\n",
+ "def _sort(some_data:pd.DataFrame)->pd.DataFrame:\n",
+ " columns = some_data.columns\n",
+ " return some_data.sort_values(by=columns[0])\n",
+ "\n",
+ "if __name__ == \"__main__\":\n",
+ " # print(\"Filter data 1\")\n",
+ " # print(_filter(data_1()))\n",
+ " # print(\"Sort data 1\")\n",
+ " print(\"Final data 1\")\n",
+ " print(_sort(_filter(data_1())))\n",
+ " # print(\"Filter data 2\")\n",
+ " # print(_filter(data_2()))\n",
+ " # print(\"Add missing value data 2\")\n",
+ " # print(_add_missing_value(_filter(data_2()),missing_row=['c', 145]))\n",
+ " # print(\"Join data 2 and data 3\")\n",
+ " # print(_join(_add_missing_value(_filter(data_2()),missing_row=['c', 145]),other_data=data_3()))\n",
+ " # print(\"Sort joined dataframe\")\n",
+ " print(\"Final data 2\")\n",
+ " print(_sort(_join(_add_missing_value(_filter(data_2()),missing_row=['c', 145]),other_data=data_3())))"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "node_to_execute = [\"data_1\", \"data_2\", \"feat_A\"]"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ " \n",
+ "\n",
+ "cluster__legend \n",
+ " \n",
+ "Legend \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_add_missing_value \n",
+ " \n",
+ "data_2.with_add_missing_value \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_join \n",
+ " \n",
+ "data_2.with_join \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_add_missing_value->data_2.with_join \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1_raw \n",
+ " \n",
+ "data_1_raw \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1.with_filter \n",
+ " \n",
+ "data_1.with_filter \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1_raw->data_1.with_filter \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1 \n",
+ " \n",
+ "data_1 \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "feat_A \n",
+ " \n",
+ "feat_A \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1->feat_A \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_filter \n",
+ " \n",
+ "data_2.with_filter \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_filter->data_2.with_add_missing_value \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1.with_sort \n",
+ " \n",
+ "data_1.with_sort \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1.with_sort->data_1 \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2_raw \n",
+ " \n",
+ "data_2_raw \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2_raw->data_2.with_filter \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1.with_filter->data_1.with_sort \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_sort \n",
+ " \n",
+ "data_2.with_sort \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_join->data_2.with_sort \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2 \n",
+ " \n",
+ "data_2 \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_sort->data_2 \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2->feat_A \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_3 \n",
+ " \n",
+ "data_3 \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_3->data_2.with_join \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "function \n",
+ " \n",
+ "function \n",
+ " \n",
+ "\n",
+ "\n",
+ "output \n",
+ " \n",
+ "output \n",
+ " \n",
+ " \n",
+ " \n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " \n",
+ " col_1 \n",
+ " col_2 \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ " 3 \n",
+ " 0 \n",
+ " d \n",
+ " \n",
+ " \n",
+ " 1 \n",
+ " 2 \n",
+ " b \n",
+ " \n",
+ " \n",
+ " 0 \n",
+ " 3 \n",
+ " a \n",
+ " \n",
+ " \n",
+ "
\n",
+ "
"
+ ],
+ "text/plain": [
+ " col_1 col_2\n",
+ "3 0 d\n",
+ "1 2 b\n",
+ "0 3 a"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " \n",
+ " col_1 \n",
+ " col_2 \n",
+ " \n",
+ " \n",
+ " col_2 \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ " 150 \n",
+ " a \n",
+ " 10 \n",
+ " \n",
+ " \n",
+ " 155 \n",
+ " b \n",
+ " 23 \n",
+ " \n",
+ " \n",
+ " 145 \n",
+ " c \n",
+ " 32 \n",
+ " \n",
+ " \n",
+ " 200 \n",
+ " d \n",
+ " 50 \n",
+ " \n",
+ " \n",
+ " 5000 \n",
+ " e \n",
+ " 0 \n",
+ " \n",
+ " \n",
+ "
\n",
+ "
"
+ ],
+ "text/plain": [
+ " col_1 col_2\n",
+ "col_2 \n",
+ "150 a 10\n",
+ "155 b 23\n",
+ "145 c 32\n",
+ "200 d 50\n",
+ "5000 e 0"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " \n",
+ " col_0 \n",
+ " col_1 \n",
+ " col_3 \n",
+ " col_2 \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ " 0 \n",
+ " d \n",
+ " 0 \n",
+ " 200 \n",
+ " 50 \n",
+ " \n",
+ " \n",
+ " 1 \n",
+ " b \n",
+ " 2 \n",
+ " 155 \n",
+ " 23 \n",
+ " \n",
+ " \n",
+ " 2 \n",
+ " a \n",
+ " 3 \n",
+ " 150 \n",
+ " 10 \n",
+ " \n",
+ " \n",
+ "
\n",
+ "
"
+ ],
+ "text/plain": [
+ " col_0 col_1 col_3 col_2\n",
+ "0 d 0 200 50\n",
+ "1 b 2 155 23\n",
+ "2 a 3 150 10"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "%%cell_to_module -m pipe_output --display --execute node_to_execute\n",
+ "\n",
+ "from typing import Any, List\n",
+ "import pandas as pd\n",
+ "from hamilton.function_modifiers import pipe_output, step, source, value\n",
+ "\n",
+ "# data1 and data2\n",
+ "def _filter(some_data:pd.DataFrame)->pd.DataFrame:\n",
+ " return some_data.dropna()\n",
+ "\n",
+ "# data 2\n",
+ "# this is for value\n",
+ "def _add_missing_value(some_data:pd.DataFrame, missing_row:List[Any])->pd.DataFrame:\n",
+ " some_data.loc[-1] = missing_row\n",
+ " return some_data\n",
+ "\n",
+ "# data 2\n",
+ "# this is for source\n",
+ "def _join(some_data:pd.DataFrame, other_data:pd.DataFrame)->pd.DataFrame:\n",
+ " return some_data.set_index('col_2').join(other_data.set_index('col_1'))\n",
+ "\n",
+ "# data1 and data2\n",
+ "def _sort(some_data:pd.DataFrame)->pd.DataFrame:\n",
+ " columns = some_data.columns\n",
+ " return some_data.sort_values(by=columns[0])\n",
+ "\n",
+ "@pipe_output(\n",
+ " step(_filter),\n",
+ " step(_sort),\n",
+ ")\n",
+ "def data_1()->pd.DataFrame:\n",
+ " df = pd.DataFrame.from_dict({'col_1': [3, 2, pd.NA, 0], 'col_2': ['a', 'b', pd.NA, 'd']})\n",
+ " return df\n",
+ "\n",
+ "@pipe_output(\n",
+ " step(_filter),\n",
+ " step(_add_missing_value,missing_row=value(['c', 145])),\n",
+ " step(_join, other_data=source('data_3')),\n",
+ " step(_sort),\n",
+ ")\n",
+ "def data_2()->pd.DataFrame:\n",
+ " df = pd.DataFrame.from_dict({'col_1': ['a', 'b', pd.NA, 'd', 'e'], 'col_2': [150, 155, 145, 200, 5000]})\n",
+ " return df\n",
+ "\n",
+ "def data_3()->pd.DataFrame:\n",
+ " df = pd.DataFrame.from_dict({'col_1': [150, 155, 145, 200, 5000], 'col_2': [10,23, 32, 50, 0]})\n",
+ " return df\n",
+ "\n",
+ "def feat_A(data_1:pd.DataFrame, data_2:pd.DataFrame)->pd.DataFrame:\n",
+ " return (data_1.set_index('col_2').join(data_2.reset_index(names=['col_3']).set_index('col_1'))).reset_index(names=[\"col_0\"])\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ " \n",
+ "\n",
+ "cluster__legend \n",
+ " \n",
+ "Legend \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_add_missing_value \n",
+ " \n",
+ "data_2.with_add_missing_value \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_join \n",
+ " \n",
+ "data_2.with_join \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_add_missing_value->data_2.with_join \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1_raw \n",
+ " \n",
+ "data_1_raw \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1.with_filter \n",
+ " \n",
+ "data_1.with_filter \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1_raw->data_1.with_filter \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1 \n",
+ " \n",
+ "data_1 \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "feat_A \n",
+ " \n",
+ "feat_A \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1->feat_A \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_filter \n",
+ " \n",
+ "data_2.with_filter \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_filter->data_2.with_add_missing_value \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1.with_sort \n",
+ " \n",
+ "data_1.with_sort \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1.with_sort->data_1 \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2_raw \n",
+ " \n",
+ "data_2_raw \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2_raw->data_2.with_filter \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1.with_filter->data_1.with_sort \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_sort \n",
+ " \n",
+ "data_2.with_sort \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_join->data_2.with_sort \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2 \n",
+ " \n",
+ "data_2 \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_sort->data_2 \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2->feat_A \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_3 \n",
+ " \n",
+ "data_3 \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_3->data_2.with_join \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "function \n",
+ " \n",
+ "function \n",
+ " \n",
+ "\n",
+ "\n",
+ "output \n",
+ " \n",
+ "output \n",
+ " \n",
+ " \n",
+ " \n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " \n",
+ " col_1 \n",
+ " col_2 \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ " 3 \n",
+ " 0 \n",
+ " d \n",
+ " \n",
+ " \n",
+ " 1 \n",
+ " 2 \n",
+ " b \n",
+ " \n",
+ " \n",
+ " 0 \n",
+ " 3 \n",
+ " a \n",
+ " \n",
+ " \n",
+ "
\n",
+ "
"
+ ],
+ "text/plain": [
+ " col_1 col_2\n",
+ "3 0 d\n",
+ "1 2 b\n",
+ "0 3 a"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " \n",
+ " col_1 \n",
+ " col_2 \n",
+ " \n",
+ " \n",
+ " col_2 \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ " 150 \n",
+ " a \n",
+ " 10 \n",
+ " \n",
+ " \n",
+ " 155 \n",
+ " b \n",
+ " 23 \n",
+ " \n",
+ " \n",
+ " 145 \n",
+ " c \n",
+ " 32 \n",
+ " \n",
+ " \n",
+ " 200 \n",
+ " d \n",
+ " 50 \n",
+ " \n",
+ " \n",
+ " 5000 \n",
+ " e \n",
+ " 0 \n",
+ " \n",
+ " \n",
+ "
\n",
+ "
"
+ ],
+ "text/plain": [
+ " col_1 col_2\n",
+ "col_2 \n",
+ "150 a 10\n",
+ "155 b 23\n",
+ "145 c 32\n",
+ "200 d 50\n",
+ "5000 e 0"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " \n",
+ " col_0 \n",
+ " col_1 \n",
+ " col_3 \n",
+ " col_2 \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ " 0 \n",
+ " d \n",
+ " 0 \n",
+ " 200 \n",
+ " 50 \n",
+ " \n",
+ " \n",
+ " 1 \n",
+ " b \n",
+ " 2 \n",
+ " 155 \n",
+ " 23 \n",
+ " \n",
+ " \n",
+ " 2 \n",
+ " a \n",
+ " 3 \n",
+ " 150 \n",
+ " 10 \n",
+ " \n",
+ " \n",
+ "
\n",
+ "
"
+ ],
+ "text/plain": [
+ " col_0 col_1 col_3 col_2\n",
+ "0 d 0 200 50\n",
+ "1 b 2 155 23\n",
+ "2 a 3 150 10"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "%%cell_to_module -m mutate --display --execute node_to_execute\n",
+ "\n",
+ "from typing import Any, List\n",
+ "import pandas as pd\n",
+ "from hamilton.function_modifiers import mutate, apply_to, source, value\n",
+ "\n",
+ "def data_1()->pd.DataFrame:\n",
+ " df = pd.DataFrame.from_dict({'col_1': [3, 2, pd.NA, 0], 'col_2': ['a', 'b', pd.NA, 'd']})\n",
+ " return df\n",
+ "\n",
+ "def data_2()->pd.DataFrame:\n",
+ " df = pd.DataFrame.from_dict({'col_1': ['a', 'b', pd.NA, 'd', 'e'], 'col_2': [150, 155, 145, 200, 5000]})\n",
+ " return df\n",
+ "\n",
+ "def data_3()->pd.DataFrame:\n",
+ " df = pd.DataFrame.from_dict({'col_1': [150, 155, 145, 200, 5000], 'col_2': [10,23, 32, 50, 0]})\n",
+ " return df\n",
+ "\n",
+ "# data1 and data2\n",
+ "@mutate(\n",
+ " data_1, data_2\n",
+ " )\n",
+ "def _filter(some_data:pd.DataFrame)->pd.DataFrame:\n",
+ " \"\"\"Remove NAN values.\n",
+ " \n",
+ " Decorated with mutate this will be applied to both data_1 and data_2.\n",
+ " \"\"\"\n",
+ " return some_data.dropna()\n",
+ "\n",
+ "# data 2\n",
+ "# this is for value\n",
+ "@mutate(\n",
+ " data_2, \n",
+ " missing_row=value(['c', 145])\n",
+ " )\n",
+ "def _add_missing_value(some_data:pd.DataFrame, missing_row:List[Any])->pd.DataFrame:\n",
+ " \"\"\"Add row to dataframe.\n",
+ " \n",
+ " The functions decorated with mutate can be viewed as steps in pipe_output in the order they\n",
+ " are implemented. This means that data_2 had a row removed with NAN and here we add back a row\n",
+ " by hand that replaces that row.\n",
+ " \"\"\"\n",
+ " some_data.loc[-1] = missing_row\n",
+ " return some_data\n",
+ "\n",
+ "# data 2\n",
+ "# this is for source\n",
+ "@mutate(\n",
+ " data_2, \n",
+ " other_data=source('data_3')\n",
+ " )\n",
+ "def _join(some_data:pd.DataFrame, other_data:pd.DataFrame)->pd.DataFrame:\n",
+ " \"\"\"Join two dataframes.\n",
+ " \n",
+ " We can use results from other nodes in the DAG by using the `source` functionality. Here we join\n",
+ " data_2 table with another table - data_3 - that is the output of another node.\n",
+ " \"\"\"\n",
+ " return some_data.set_index('col_2').join(other_data.set_index('col_1'))\n",
+ "\n",
+ "# data1 and data2\n",
+ "@mutate(\n",
+ " data_1, data_2\n",
+ ")\n",
+ "def _sort(some_data:pd.DataFrame)->pd.DataFrame:\n",
+ " \"\"\"Sort dataframes by first column.\n",
+ " \n",
+ " This is the last step of our pipeline(s) and gets again applied to data_1 and data_2. We did some \n",
+ " light pre-processing on data_1 by removing NANs and sorting and more elaborate pre-processing on\n",
+ " data_2 where we added values and joined another table.\n",
+ " \"\"\"\n",
+ " columns = some_data.columns\n",
+ " return some_data.sort_values(by=columns[0])\n",
+ "\n",
+ "def feat_A(data_1:pd.DataFrame, data_2:pd.DataFrame)->pd.DataFrame:\n",
+ " \"\"\"Combining two raw dataframes to create a feature.\"\"\"\n",
+ " return (data_1.set_index('col_2').join(data_2.reset_index(names=['col_3']).set_index('col_1'))).reset_index(names=[\"col_0\"])\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# We can also run such pipelines remotely and track them in the UI"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from hamilton import base\n",
+ "from hamilton_sdk import adapters\n",
+ "from hamilton.plugins.h_ray import RayGraphAdapter,RayTaskExecutor\n",
+ "import ray\n",
+ "\n",
+ "import mutate\n",
+ "\n",
+ "remote_executor = RayTaskExecutor(num_cpus=4)\n",
+ "shutdown = ray.shutdown\n",
+ "\n",
+ "project_id = 3\n",
+ "username = \"jf\""
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "tracker_ray = adapters.HamiltonTracker(\n",
+ " project_id=project_id,\n",
+ " username=username,\n",
+ " dag_name=\"mutate ray graph adapter\",\n",
+ " )\n",
+ "ray.init()\n",
+ "rga = RayGraphAdapter(result_builder=base.DictResult(), shutdown_ray_on_completion=True)\n",
+ "dr = driver.Builder().with_modules(mutate).with_adapters(rga, tracker_ray).build()\n",
+ "result = dr.execute(final_vars=[\"data_1\", \"data_2\", \"feat_A\"])\n",
+ "print(result)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "tracker_ray = adapters.HamiltonTracker(\n",
+ " project_id=project_id,\n",
+ " username=username,\n",
+ " dag_name=\"mutate ray task executor\",\n",
+ " )\n",
+ "\n",
+ "dr = (\n",
+ " driver.Builder()\n",
+ " .enable_dynamic_execution(allow_experimental_mode=True)\n",
+ " .with_modules(mutate)\n",
+ " .with_remote_executor(remote_executor)\n",
+ " .with_adapters(tracker_ray)\n",
+ " .build()\n",
+ " )\n",
+ "\n",
+ "print(dr.execute(final_vars=[\"data_1\", \"data_2\", \"feat_A\"]))\n",
+ "if shutdown is not None:\n",
+ " shutdown()\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# pipe_output allows for targeting specific nodes\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ " \n",
+ "\n",
+ "cluster__legend \n",
+ " \n",
+ "Legend \n",
+ " \n",
+ "\n",
+ "\n",
+ "foo \n",
+ " \n",
+ "foo \n",
+ "Dict \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_1_raw \n",
+ " \n",
+ "field_1_raw \n",
+ "int \n",
+ " \n",
+ "\n",
+ "\n",
+ "foo->field_1_raw \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_2_raw \n",
+ " \n",
+ "field_2_raw \n",
+ "int \n",
+ " \n",
+ "\n",
+ "\n",
+ "foo->field_2_raw \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_3_raw \n",
+ " \n",
+ "field_3_raw \n",
+ "int \n",
+ " \n",
+ "\n",
+ "\n",
+ "foo->field_3_raw \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_1 \n",
+ " \n",
+ "field_1 \n",
+ "int \n",
+ " \n",
+ "\n",
+ "\n",
+ "a \n",
+ " \n",
+ "a \n",
+ "int \n",
+ " \n",
+ "\n",
+ "\n",
+ "a->foo \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_2 \n",
+ " \n",
+ "field_2 \n",
+ "int \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_3.with_something_else \n",
+ " \n",
+ "field_3.with_something_else \n",
+ "int \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_3.transform_2 \n",
+ " \n",
+ "field_3.transform_2 \n",
+ "int \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_3.with_something_else->field_3.transform_2 \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_3 \n",
+ " \n",
+ "field_3 \n",
+ "int \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_1.with_something_else \n",
+ " \n",
+ "field_1.with_something_else \n",
+ "int \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_1_raw->field_1.with_something_else \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_3.transform_2->field_3 \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_1.transform_1 \n",
+ " \n",
+ "field_1.transform_1 \n",
+ "int \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_1.transform_2 \n",
+ " \n",
+ "field_1.transform_2 \n",
+ "int \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_1.transform_1->field_1.transform_2 \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_2.with_something_else \n",
+ " \n",
+ "field_2.with_something_else \n",
+ "int \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_2.with_something_else->field_2 \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_1.transform_2->field_1 \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_2_raw->field_2.with_something_else \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_1.with_something_else->field_1.transform_1 \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_3_raw->field_3.with_something_else \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "function \n",
+ " \n",
+ "function \n",
+ " \n",
+ " \n",
+ " \n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "%%cell_to_module -m pipe_output_on_output --display\n",
+ "from typing import Dict\n",
+ "from hamilton.function_modifiers import extract_fields, pipe_input, pipe_output, step\n",
+ "\n",
+ "def _pre_step(something:int)->int:\n",
+ " return something + 10\n",
+ "\n",
+ "def _post_step(something:int)->int:\n",
+ " return something + 100\n",
+ "\n",
+ "def _something_else(something:int)->int:\n",
+ " return something + 1000\n",
+ "\n",
+ "def a()->int:\n",
+ " return 10\n",
+ "\n",
+ "@pipe_output(\n",
+ " step(_something_else), # gets applied to all sink nodes\n",
+ " step(_pre_step).named(name=\"transform_1\").on_output(\"field_1\"), # only applied to field_1\n",
+ " step(_post_step).named(name=\"transform_2\").on_output([\"field_1\", \"field_3\"]), # applied to field_1 and field_3\n",
+ ")\n",
+ "@extract_fields(\n",
+ " {\"field_1\":int, \"field_2\":int, \"field_3\":int}\n",
+ ")\n",
+ "def foo(a:int)->Dict[str,int]:\n",
+ " return {\"field_1\":1, \"field_2\":2, \"field_3\":3}"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+    "# Similarly, mutate lets you specify which nodes it gets applied to\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "node_to_execute = [\"data_1\", \"data_2\", \"feat_C\", \"feat_D\"]"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ " \n",
+ "\n",
+ "cluster__legend \n",
+ " \n",
+ "Legend \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2. \n",
+ " \n",
+ "data_2. \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_sort \n",
+ " \n",
+ "data_2.with_sort \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.->data_2.with_sort \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_add_missing_value \n",
+ " \n",
+ "data_2.with_add_missing_value \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_add_missing_value->data_2. \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_2 \n",
+ " \n",
+ "field_2 \n",
+ "Series \n",
+ " \n",
+ "\n",
+ "\n",
+ "feat_D \n",
+ " \n",
+ "feat_D \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_2->feat_D \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1_raw \n",
+ " \n",
+ "data_1_raw \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1.with_sort \n",
+ " \n",
+ "data_1.with_sort \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1_raw->data_1.with_sort \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_1_raw \n",
+ " \n",
+ "field_1_raw \n",
+ "Series \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_1.Europe \n",
+ " \n",
+ "field_1.Europe \n",
+ "Series \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_1_raw->field_1.Europe \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1 \n",
+ " \n",
+ "data_1 \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1.with_sort->data_1 \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2_raw \n",
+ " \n",
+ "data_2_raw \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_filter \n",
+ " \n",
+ "data_2.with_filter \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2_raw->data_2.with_filter \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_2.Europe \n",
+ " \n",
+ "field_2.Europe \n",
+ "Series \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_2.Europe->field_2 \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "feat_B \n",
+ " \n",
+ "feat_B \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "col_3_raw \n",
+ " \n",
+ "col_3_raw \n",
+ "Series \n",
+ " \n",
+ "\n",
+ "\n",
+ "feat_B->col_3_raw \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "col_2_raw \n",
+ " \n",
+ "col_2_raw \n",
+ "Series \n",
+ " \n",
+ "\n",
+ "\n",
+ "feat_B->col_2_raw \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "feat_A \n",
+ " \n",
+ "feat_A \n",
+ "Dict \n",
+ " \n",
+ "\n",
+ "\n",
+ "feat_A->field_1_raw \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_2_raw \n",
+ " \n",
+ "field_2_raw \n",
+ "Series \n",
+ " \n",
+ "\n",
+ "\n",
+ "feat_A->field_2_raw \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2 \n",
+ " \n",
+ "data_2 \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_sort->data_2 \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "feat_C \n",
+ " \n",
+ "feat_C \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_1 \n",
+ " \n",
+ "field_1 \n",
+ "Series \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_1.Europe->field_1 \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_1->feat_C \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "col_2 \n",
+ " \n",
+ "col_2 \n",
+ "Series \n",
+ " \n",
+ "\n",
+ "\n",
+ "col_2->feat_D \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "col_3 \n",
+ " \n",
+ "col_3 \n",
+ "Series \n",
+ " \n",
+ "\n",
+ "\n",
+ "col_3->feat_C \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1->feat_B \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1->feat_A \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "col_2.US \n",
+ " \n",
+ "col_2.US \n",
+ "Series \n",
+ " \n",
+ "\n",
+ "\n",
+ "col_2.US->col_2 \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2.with_filter->data_2.with_add_missing_value \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "col_3.US \n",
+ " \n",
+ "col_3.US \n",
+ "Series \n",
+ " \n",
+ "\n",
+ "\n",
+ "col_3.US->col_3 \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "col_3_raw->col_3.US \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2->feat_B \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_2->feat_A \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_3 \n",
+ " \n",
+ "data_3 \n",
+ "DataFrame \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_3->data_2. \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "field_2_raw->field_2.Europe \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "col_2_raw->col_2.US \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "function \n",
+ " \n",
+ "function \n",
+ " \n",
+ "\n",
+ "\n",
+ "output \n",
+ " \n",
+ "output \n",
+ " \n",
+ " \n",
+ " \n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " \n",
+ " col_1 \n",
+ " col_2 \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ " 3 \n",
+ " 0 \n",
+ " d \n",
+ " \n",
+ " \n",
+ " 1 \n",
+ " 2 \n",
+ " b \n",
+ " \n",
+ " \n",
+ " 0 \n",
+ " 3 \n",
+ " a \n",
+ " \n",
+ " \n",
+ " 2 \n",
+ " <NA> \n",
+ " <NA> \n",
+ " \n",
+ " \n",
+ "
\n",
+ "
"
+ ],
+ "text/plain": [
+ " col_1 col_2\n",
+ "3 0 d\n",
+ "1 2 b\n",
+ "0 3 a\n",
+ "2 "
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " \n",
+ " col_1 \n",
+ " col_2 \n",
+ " \n",
+ " \n",
+ " col_2 \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ " 150 \n",
+ " a \n",
+ " 10 \n",
+ " \n",
+ " \n",
+ " 155 \n",
+ " b \n",
+ " 23 \n",
+ " \n",
+ " \n",
+ " 145 \n",
+ " c \n",
+ " 32 \n",
+ " \n",
+ " \n",
+ " 200 \n",
+ " d \n",
+ " 50 \n",
+ " \n",
+ " \n",
+ " 5000 \n",
+ " e \n",
+ " 0 \n",
+ " \n",
+ " \n",
+ "
\n",
+ "
"
+ ],
+ "text/plain": [
+ " col_1 col_2\n",
+ "col_2 \n",
+ "150 a 10\n",
+ "155 b 23\n",
+ "145 c 32\n",
+ "200 d 50\n",
+ "5000 e 0"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " \n",
+ " col_1 \n",
+ " col_3 \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ " 0 \n",
+ " 0 \n",
+ " 2000.0 \n",
+ " \n",
+ " \n",
+ " 1 \n",
+ " 200 \n",
+ " 1550.0 \n",
+ " \n",
+ " \n",
+ " 2 \n",
+ " 300 \n",
+ " 1500.0 \n",
+ " \n",
+ " \n",
+ " 3 \n",
+ " <NA> \n",
+ " NaN \n",
+ " \n",
+ " \n",
+ "
\n",
+ "
"
+ ],
+ "text/plain": [
+ " col_1 col_3\n",
+ "0 0 2000.0\n",
+ "1 200 1550.0\n",
+ "2 300 1500.0\n",
+ "3 NaN"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " \n",
+ " col_3 \n",
+ " col_2 \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ " 0 \n",
+ " 20000.0 \n",
+ " 500.0 \n",
+ " \n",
+ " \n",
+ " 1 \n",
+ " 15500.0 \n",
+ " 230.0 \n",
+ " \n",
+ " \n",
+ " 2 \n",
+ " 15000.0 \n",
+ " 100.0 \n",
+ " \n",
+ " \n",
+ " 3 \n",
+ " NaN \n",
+ " NaN \n",
+ " \n",
+ " \n",
+ "
\n",
+ "
"
+ ],
+ "text/plain": [
+ " col_3 col_2\n",
+ "0 20000.0 500.0\n",
+ "1 15500.0 230.0\n",
+ "2 15000.0 100.0\n",
+ "3 NaN NaN"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "%%cell_to_module -m mutate_on_output --display --execute node_to_execute\n",
+ "from typing import Any,Dict, List\n",
+ "import pandas as pd\n",
+ "from hamilton.function_modifiers import extract_fields, extract_columns, apply_to, mutate, value, source\n",
+ "\n",
+ "\n",
+ "def data_1() -> pd.DataFrame:\n",
+ " df = pd.DataFrame.from_dict({\"col_1\": [3, 2, pd.NA, 0], \"col_2\": [\"a\", \"b\", pd.NA, \"d\"]})\n",
+ " return df\n",
+ "\n",
+ "\n",
+ "def data_2() -> pd.DataFrame:\n",
+ " df = pd.DataFrame.from_dict(\n",
+ " {\"col_1\": [\"a\", \"b\", pd.NA, \"d\", \"e\"], \"col_2\": [150, 155, 145, 200, 5000]}\n",
+ " )\n",
+ " return df\n",
+ "\n",
+ "\n",
+ "def data_3() -> pd.DataFrame:\n",
+ " df = pd.DataFrame.from_dict({\"col_1\": [150, 155, 145, 200, 5000], \"col_2\": [10, 23, 32, 50, 0]})\n",
+ " return df\n",
+ "\n",
+ "\n",
+ "@extract_fields(\n",
+ " {'field_1':pd.Series,\n",
+ " 'field_2':pd.Series})\n",
+ "def feat_A(data_1:pd.DataFrame, data_2:pd.DataFrame)->Dict[str, pd.Series]:\n",
+ " df = (data_1.set_index('col_2').join(data_2.reset_index(names=['col_3']).set_index('col_1'))).reset_index(names=[\"col_0\"])\n",
+ " return {\n",
+ " \"field_1\": df.iloc[:, 1],\n",
+ " \"field_2\": df.iloc[:, 2]\n",
+ " }\n",
+ "\n",
+ "\n",
+ "@extract_columns('col_2','col_3')\n",
+ "def feat_B(data_1:pd.DataFrame, data_2:pd.DataFrame)->pd.DataFrame:\n",
+ " return (data_1.set_index('col_2').join(data_2.reset_index(names=['col_3']).set_index('col_1'))).reset_index(names=[\"col_0\"])\n",
+ "\n",
+ "\n",
+ "def feat_C(field_1:pd.Series, col_3:pd.Series)->pd.DataFrame:\n",
+ " return pd.concat([field_1, col_3], axis=1)\n",
+ "\n",
+ "def feat_D(field_2:pd.Series, col_2:pd.Series)->pd.DataFrame:\n",
+ " return pd.concat([field_2, col_2], axis=1)\n",
+ "\n",
+ "# data1 and data2\n",
+ "@mutate(\n",
+ " apply_to(data_1).when_in(a=[1,2,3]),\n",
+ " apply_to(data_2).when_not_in(a=[1,2,3])\n",
+ ")\n",
+ "def _filter(some_data:pd.DataFrame)->pd.DataFrame:\n",
+ " \"\"\"Remove NAN values.\n",
+ " \n",
+ " Mutate accepts a `config.*` family conditional where we can choose when the transform will be applied\n",
+ " onto the target function.\n",
+ " \"\"\"\n",
+ " return some_data.dropna()\n",
+ "\n",
+ "# data 2\n",
+ "# this is for value\n",
+ "@mutate(apply_to(data_2), missing_row=value(['c', 145]))\n",
+ "def _add_missing_value(some_data:pd.DataFrame, missing_row:List[Any])->pd.DataFrame:\n",
+ " \"\"\"Add row to dataframe.\n",
+ " \n",
+ " The functions decorated with mutate can be viewed as steps in pipe_output in the order they\n",
+ " are implemented. This means that data_2 had a row removed with NAN and here we add back a row\n",
+ " by hand that replaces that row.\n",
+ " \"\"\"\n",
+ " some_data.loc[-1] = missing_row\n",
+ " return some_data\n",
+ "\n",
+ "# data 2\n",
+ "# this is for source\n",
+ "@mutate(apply_to(data_2).named(name=\"\",namespace=\"some_random_namespace\"), other_data=source('data_3'))\n",
+ "def join(some_data:pd.DataFrame, other_data:pd.DataFrame)->pd.DataFrame:\n",
+ " \"\"\"Join two dataframes.\n",
+ " \n",
+ " We can use results from other nodes in the DAG by using the `source` functionality. Here we join\n",
+ " data_2 table with another table - data_3 - that is the output of another node.\n",
+ "\n",
+    "    In addition, mutate also supports adding custom names to the nodes.\n",
+ " \"\"\"\n",
+ " return some_data.set_index('col_2').join(other_data.set_index('col_1'))\n",
+ "\n",
+ "# data1 and data2\n",
+ "@mutate(\n",
+ " apply_to(data_1),\n",
+ " apply_to(data_2)\n",
+ ")\n",
+ "def sort(some_data:pd.DataFrame)->pd.DataFrame:\n",
+ " \"\"\"Sort dataframes by first column.\n",
+ " \n",
+ " This is the last step of our pipeline(s) and gets again applied to data_1 and data_2. We did some \n",
+ " light pre-processing on data_1 by removing NANs and sorting and more elaborate pre-processing on\n",
+ " data_2 where we added values and joined another table.\n",
+ " \"\"\"\n",
+ " columns = some_data.columns\n",
+ " return some_data.sort_values(by=columns[0])\n",
+ "\n",
+ "\n",
+ "# we want to apply some adjustment coefficient to all the columns of feat_B, but only to field_1 of feat_A\n",
+ "@mutate(\n",
+ " apply_to(feat_A, factor=value(100)).on_output('field_1').named(\"Europe\"),\n",
+ " apply_to(feat_B, factor=value(10)).named(\"US\")\n",
+ ")\n",
+ "def _adjustment_factor(some_data:pd.Series, factor:float)->pd.Series:\n",
+ " \"\"\"Adjust the value by some factor.\n",
+ " \n",
+ " You can imagine this step occurring later in time. We first constructed our DAG with features A and\n",
+ " B only to realize that something is off. We are now experimenting post-hoc to improve and find the \n",
+ " best possible features.\n",
+ "\n",
+ " We first split the features by columns of interest and then adjust them by a regional factor to \n",
+ " combine them into improved features we can use further down the pipeline.\n",
+ " \"\"\"\n",
+ " return some_data * factor\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Apply mutate twice by copying the function"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 15,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "node_to_execute = [\"data_1\"]"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 16,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ " \n",
+ "\n",
+ "cluster__legend \n",
+ " \n",
+ "Legend \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1_raw \n",
+ " \n",
+ "data_1_raw \n",
+ "int \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1.with_add_something \n",
+ " \n",
+ "data_1.with_add_something \n",
+ "int \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1_raw->data_1.with_add_something \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1.with_add_something_more \n",
+ " \n",
+ "data_1.with_add_something_more \n",
+ "int \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1.with_add_something_1 \n",
+ " \n",
+ "data_1.with_add_something_1 \n",
+ "int \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1.with_add_something_more->data_1.with_add_something_1 \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1.with_add_something->data_1.with_add_something_more \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1 \n",
+ " \n",
+ "data_1 \n",
+ "int \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_1.with_add_something_1->data_1 \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "function \n",
+ " \n",
+ "function \n",
+ " \n",
+ "\n",
+ "\n",
+ "output \n",
+ " \n",
+ "output \n",
+ " \n",
+ " \n",
+ " \n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/plain": [
+ "1210"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "%%cell_to_module -m mutate_twice_the_same --display --execute node_to_execute\n",
+ "\n",
+ "from typing import Any, List\n",
+ "import pandas as pd\n",
+ "from hamilton.function_modifiers import mutate, apply_to, source, value\n",
+ "\n",
+ "\n",
+ "def data_1()->int:\n",
+ " return 10\n",
+ "\n",
+ "\n",
+ "@mutate(data_1)\n",
+ "def add_something(user_input:int)->int:\n",
+ " return user_input + 100\n",
+ "\n",
+ "@mutate(data_1)\n",
+ "def add_something_more(user_input:int)->int:\n",
+ " return user_input + 1000\n",
+ "\n",
+ "@mutate(data_1)\n",
+ "def add_something(user_input:int)->int: # noqa\n",
+ " return user_input + 100\n",
+ "\n",
+ "\n",
+ "\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": []
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "ham_scikit",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.10.14"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/examples/mutate/abstract functionality blueprint/pipe_output.py b/examples/mutate/abstract functionality blueprint/pipe_output.py
new file mode 100644
index 000000000..b5cec9c22
--- /dev/null
+++ b/examples/mutate/abstract functionality blueprint/pipe_output.py
@@ -0,0 +1,62 @@
+from typing import Any, List
+
+import pandas as pd
+
+from hamilton.function_modifiers import pipe_output, source, step, value
+
+
+# Applied to both data_1 and data_2 (first step of each pipe_output chain below).
+def _filter(some_data: pd.DataFrame) -> pd.DataFrame:
+ return some_data.dropna()  # drops every row that contains a missing value
+
+
+# Applied to data_2 only.
+# Shows a literal argument bound with value(...): missing_row is supplied in the decorator.
+def _add_missing_value(some_data: pd.DataFrame, missing_row: List[Any]) -> pd.DataFrame:
+ some_data.loc[-1] = missing_row  # NOTE(review): writes row label -1 into the passed-in frame in place
+ return some_data
+
+
+# Applied to data_2 only.
+# Shows a node dependency bound with source(...): other_data is the output of data_3.
+def _join(some_data: pd.DataFrame, other_data: pd.DataFrame) -> pd.DataFrame:
+ return some_data.set_index("col_2").join(other_data.set_index("col_1"))  # match col_2 against other_data's col_1
+
+
+# Applied to both data_1 and data_2 (last step of each chain below).
+def _sort(some_data: pd.DataFrame) -> pd.DataFrame:
+ columns = some_data.columns
+ return some_data.sort_values(by=columns[0])  # order rows by the first column
+
+
+@pipe_output(
+ step(_filter),  # removes the row that holds pd.NA
+ step(_sort),  # then orders the remaining rows by col_1 (the first column)
+)
+def data_1() -> pd.DataFrame:
+ df = pd.DataFrame.from_dict({"col_1": [3, 2, pd.NA, 0], "col_2": ["a", "b", pd.NA, "d"]})
+ return df
+
+
+@pipe_output(
+ step(_filter),  # drops the row whose col_1 is pd.NA
+ step(_add_missing_value, missing_row=value(["c", 145])),  # re-adds that row with col_1 filled in
+ step(_join, other_data=source("data_3")),  # joins in the output of the data_3 node
+ step(_sort),
+)
+def data_2() -> pd.DataFrame:
+ df = pd.DataFrame.from_dict(
+ {"col_1": ["a", "b", pd.NA, "d", "e"], "col_2": [150, 155, 145, 200, 5000]}
+ )
+ return df
+
+
+def data_3() -> pd.DataFrame:  # consumed by data_2's _join step via source("data_3")
+ df = pd.DataFrame.from_dict({"col_1": [150, 155, 145, 200, 5000], "col_2": [10, 23, 32, 50, 0]})
+ return df
+
+
+def feat_A(data_1: pd.DataFrame, data_2: pd.DataFrame) -> pd.DataFrame:  # joins the two processed frames
+ return (
+ data_1.set_index("col_2").join(data_2.reset_index(names=["col_3"]).set_index("col_1"))
+ ).reset_index(names=["col_0"])  # the join key comes back as column col_0; `names=` needs pandas >= 1.5
diff --git a/examples/mutate/abstract functionality blueprint/pipe_output_on_output.py b/examples/mutate/abstract functionality blueprint/pipe_output_on_output.py
new file mode 100644
index 000000000..198836593
--- /dev/null
+++ b/examples/mutate/abstract functionality blueprint/pipe_output_on_output.py
@@ -0,0 +1,31 @@
+from typing import Dict
+
+from hamilton.function_modifiers import extract_fields, pipe_output, step
+
+
+def _pre_step(something: int) -> int:  # used below as step "transform_1", targeted at field_1 only
+ return something + 10
+
+
+def _post_step(something: int) -> int:  # used below as step "transform_2", targeted at field_1 and field_3
+ return something + 100
+
+
+def _something_else(something: int) -> int:  # used below as the step with no on_output target
+ return something + 1000
+
+
+def a() -> int:  # constant upstream node; foo declares it as a parameter
+ return 10
+
+
+@pipe_output(
+ step(_something_else), # no on_output target: gets applied to all sink nodes
+ step(_pre_step).named(name="transform_1").on_output("field_1"), # only applied to field_1
+ step(_post_step)
+ .named(name="transform_2")
+ .on_output(["field_1", "field_3"]), # applied to field_1 and field_3
+)
+@extract_fields({"field_1": int, "field_2": int, "field_3": int})
+def foo(a: int) -> Dict[str, int]:  # `a` is accepted but not read; the returned dict is constant
+ return {"field_1": 1, "field_2": 2, "field_3": 3}
diff --git a/examples/mutate/abstract functionality blueprint/procedural.py b/examples/mutate/abstract functionality blueprint/procedural.py
new file mode 100644
index 000000000..f95d5a0a0
--- /dev/null
+++ b/examples/mutate/abstract functionality blueprint/procedural.py
@@ -0,0 +1,67 @@
+from typing import Any, List
+
+import pandas as pd
+
+
+def data_1() -> pd.DataFrame:  # raw frame; the third row is pd.NA in both columns
+ df = pd.DataFrame.from_dict({"col_1": [3, 2, pd.NA, 0], "col_2": ["a", "b", pd.NA, "d"]})
+ return df
+
+
+def data_2() -> pd.DataFrame:  # raw frame; col_1 is pd.NA in the third row (col_2 == 145)
+ df = pd.DataFrame.from_dict(
+ {"col_1": ["a", "b", pd.NA, "d", "e"], "col_2": [150, 155, 145, 200, 5000]}
+ )
+ return df
+
+
+def data_3() -> pd.DataFrame:  # fully populated frame; passed to _join as other_data in __main__
+ df = pd.DataFrame.from_dict({"col_1": [150, 155, 145, 200, 5000], "col_2": [10, 23, 32, 50, 0]})
+ return df
+
+
+# Applied to both data_1 and data_2 (innermost call of each chain in __main__).
+def _filter(some_data: pd.DataFrame) -> pd.DataFrame:
+ return some_data.dropna()  # drops every row that contains a missing value
+
+
+# Applied to data_2 only.
+# The row to add is passed as a plain argument (a literal list in the __main__ call).
+def _add_missing_value(some_data: pd.DataFrame, missing_row: List[Any]) -> pd.DataFrame:
+ some_data.loc[-1] = missing_row  # NOTE(review): writes row label -1 into the passed-in frame in place
+ return some_data
+
+
+# Applied to data_2 only.
+# The second frame is passed explicitly (data_3() in the __main__ call).
+def _join(some_data: pd.DataFrame, other_data: pd.DataFrame) -> pd.DataFrame:
+ return some_data.set_index("col_2").join(other_data.set_index("col_1"))  # match col_2 against other_data's col_1
+
+
+# Applied to both data_1 and data_2 (outermost call of each chain in __main__).
+def _sort(some_data: pd.DataFrame) -> pd.DataFrame:
+ columns = some_data.columns
+ return some_data.sort_values(by=columns[0])  # order rows by the first column
+
+
+if __name__ == "__main__":  # runs both pipelines as plain nested function calls and prints the results
+ # print("Filter data 1")
+ # print(_filter(data_1()))
+ # print("Sort data 1")
+ print("Final data 1")
+ print(_sort(_filter(data_1())))  # data_1: filter, then sort
+ # print("Filter data 2")
+ # print(_filter(data_2()))
+ # print("Add missing value data 2")
+ # print(_add_missing_value(_filter(data_2()),missing_row=['c', 145]))
+ # print("Join data 2 and data 3")
+ # print(_join(_add_missing_value(_filter(data_2()),missing_row=['c', 145]),other_data=data_3()))
+ # print("Sort joined dataframe")
+ print("Final data 2")
+ print(
+ _sort(
+ _join(
+ _add_missing_value(_filter(data_2()), missing_row=["c", 145]), other_data=data_3()
+ )
+ )
+ )
diff --git a/examples/scikit-learn/species_distribution_modeling/hamilton_notebook.ipynb b/examples/scikit-learn/species_distribution_modeling/hamilton_notebook.ipynb
index 1694f2f82..a627be54c 100644
--- a/examples/scikit-learn/species_distribution_modeling/hamilton_notebook.ipynb
+++ b/examples/scikit-learn/species_distribution_modeling/hamilton_notebook.ipynb
@@ -11,7 +11,7 @@
},
{
"cell_type": "code",
- "execution_count": 3,
+ "execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
@@ -938,6 +938,926 @@
"plt.show()\n"
]
},
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+    "# New feature: we can use `@mutate` for distributed function output transforms"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ " \n",
+ "\n",
+ "cluster__legend \n",
+ " \n",
+ "Legend \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_test.with_decision_function \n",
+ " \n",
+ "prediction_test.with_decision_function \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_test \n",
+ " \n",
+ "prediction_test \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_test.with_decision_function->prediction_test \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train.with_prediction_step \n",
+ " \n",
+ "prediction_train.with_prediction_step \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train \n",
+ " \n",
+ "prediction_train \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train.with_prediction_step->prediction_train \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train.with_OneClassSVM_model \n",
+ " \n",
+ "prediction_train.with_OneClassSVM_model \n",
+ "OneClassSVM \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train.with_decision_function \n",
+ " \n",
+ "prediction_train.with_decision_function \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train.with_OneClassSVM_model->prediction_train.with_decision_function \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_test.with_OneClassSVM_model \n",
+ " \n",
+ "prediction_test.with_OneClassSVM_model \n",
+ "OneClassSVM \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_test.with_OneClassSVM_model->prediction_test.with_decision_function \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_test_raw \n",
+ " \n",
+ "prediction_test_raw \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_test_raw->prediction_test.with_OneClassSVM_model \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train_raw \n",
+ " \n",
+ "prediction_train_raw \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train_raw->prediction_train.with_OneClassSVM_model \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train.with_decision_function->prediction_train.with_prediction_step \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "_prediction_test.with_decision_function_inputs \n",
+ " \n",
+ "std \n",
+ "ndarray \n",
+ "mean \n",
+ "ndarray \n",
+ "test_cover_std \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "_prediction_test.with_decision_function_inputs->prediction_test.with_decision_function \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "_prediction_train.with_prediction_step_inputs \n",
+ " \n",
+ "data \n",
+ "Bunch \n",
+ "idx \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "_prediction_train.with_prediction_step_inputs->prediction_train.with_prediction_step \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "_prediction_test_raw_inputs \n",
+ " \n",
+ "train_cover_std \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "_prediction_test_raw_inputs->prediction_test_raw \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "_prediction_train_raw_inputs \n",
+ " \n",
+ "train_cover_std \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "_prediction_train_raw_inputs->prediction_train_raw \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "_prediction_train.with_decision_function_inputs \n",
+ " \n",
+ "coverages_land \n",
+ "ndarray \n",
+ "std \n",
+ "ndarray \n",
+ "mean \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "_prediction_train.with_decision_function_inputs->prediction_train.with_decision_function \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "input \n",
+ " \n",
+ "input \n",
+ " \n",
+ "\n",
+ "\n",
+ "function \n",
+ " \n",
+ "function \n",
+ " \n",
+ " \n",
+ " \n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "%%cell_to_module train_and_predict_using_mutate\n",
+ "import numpy as np\n",
+ "import numpy.typing as npt\n",
+ "from sklearn import svm\n",
+ "from sklearn.utils._bunch import Bunch\n",
+ "\n",
+ "from hamilton.function_modifiers import mutate, apply_to, source, value\n",
+ "\n",
+ "\n",
+ "def prediction_train(train_cover_std: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:\n",
+ " return train_cover_std\n",
+ "\n",
+ "\n",
+ "def prediction_test(train_cover_std: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:\n",
+ " return train_cover_std\n",
+ "\n",
+ "\n",
+ "@mutate(\n",
+ " prediction_train, \n",
+ " prediction_test, \n",
+ " nu=value(0.1), kernel=value(\"rbf\"), gamma=value(0.5)\n",
+ ")\n",
+ "def _OneClassSVM_model(\n",
+ " training_set: npt.NDArray[np.float64], nu: float, kernel: str, gamma: float\n",
+ ") -> svm.OneClassSVM:\n",
+ " clf = svm.OneClassSVM(nu=nu, kernel=kernel, gamma=gamma)\n",
+ " clf.fit(training_set)\n",
+ " return clf\n",
+ "\n",
+ "@mutate(\n",
+ " apply_to(prediction_train,underlying_data=source(\"coverages_land\"),mean=source(\"mean\"),std=source(\"std\")),\n",
+ " apply_to(prediction_test,underlying_data=source(\"test_cover_std\"),mean=source(\"mean\"),std=source(\"std\")),\n",
+ ")\n",
+ "def _decision_function(\n",
+ " model: svm.OneClassSVM,\n",
+ " underlying_data: npt.NDArray[np.float64],\n",
+ " mean: npt.NDArray[np.float64],\n",
+ " std: npt.NDArray[np.float64],\n",
+ ") -> npt.NDArray[np.float64]:\n",
+ " return model.decision_function((underlying_data - mean) / std)\n",
+ "\n",
+ "\n",
+ "@mutate(\n",
+ " prediction_train,\n",
+ " idx=source(\"idx\"), data=source(\"data\")\n",
+ ")\n",
+ "def _prediction_step(\n",
+ " decision: npt.NDArray[np.float64], idx: npt.NDArray[np.float64], data: Bunch\n",
+ ") -> npt.NDArray[np.float64]:\n",
+ " Z = decision.min() * np.ones((data.Ny, data.Nx), dtype=np.float64)\n",
+ " Z[idx[0], idx[1]] = decision\n",
+ " return Z\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 20,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "(,) is from module train_and_predict_using_mutate\n",
+ "________________________________________________________________________________\n",
+ "Modeling distribution of species 'bradypus_variegatus_0'\n",
+ "\n",
+ " Area under the ROC curve : 0.868443\n",
+ "________________________________________________________________________________\n",
+ "Modeling distribution of species 'microryzomys_minutus_0'\n",
+ "\n",
+ " Area under the ROC curve : 0.993919\n"
+ ]
+ },
+ {
+ "data": {
+ "image/png": "",
+ "text/plain": [
+ ""
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "from hamilton import driver\n",
+ "import grids, load_data, postprocessing_results, preprocessing, train_and_predict, train_and_predict_using_mutate\n",
+ "\n",
+ "dr = (\n",
+ " driver.Builder()\n",
+ " .with_modules(\n",
+ " grids, \n",
+ " load_data, \n",
+ " postprocessing_results, \n",
+ " preprocessing, \n",
+ " train_and_predict, \n",
+ " train_and_predict_using_mutate\n",
+ " )\n",
+ " .allow_module_overrides()\n",
+ " .build()\n",
+ " )\n",
+ "\n",
+ "print(f\"{dr.list_available_variables()[-3].originating_functions} is from module {dr.list_available_variables()[-3].originating_functions[0].__module__}\")\n",
+ "# dr.visualize_execution(inputs={\"chosen_species\": \"aaa\"}, final_vars = [\"plot_species_distribution\"])\n",
+ "\n",
+ "import matplotlib.pyplot as plt\n",
+ "from run import plot_helper\n",
+ "\n",
+ "species=(\"bradypus_variegatus_0\", \"microryzomys_minutus_0\")\n",
+ "for i, name in enumerate(species):\n",
+ " print(\"_\" * 80)\n",
+ " print(\"Modeling distribution of species '%s'\" % name)\n",
+ " inputs = {\"chosen_species\": name}\n",
+ " final_vars = [\"plot_species_distribution\"]\n",
+ " results = dr.execute(inputs=inputs,final_vars=final_vars)[final_vars[0]]\n",
+ " plot_helper(i=i,**results)\n",
+ " \n",
+ "plt.show()\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ " \n",
+ "\n",
+ "cluster__legend \n",
+ " \n",
+ "Legend \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_grid_.with_construct_grids \n",
+ " \n",
+ "data_grid_.with_construct_grids \n",
+ "Tuple \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_grid_ \n",
+ " \n",
+ "data_grid_ \n",
+ "Tuple \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_grid_.with_construct_grids->data_grid_ \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_test.with_decision_function \n",
+ " \n",
+ "prediction_test.with_decision_function \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_test \n",
+ " \n",
+ "prediction_test \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_test.with_decision_function->prediction_test \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train.with_decision_function \n",
+ " \n",
+ "prediction_train.with_decision_function \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train.with_prediction_step \n",
+ " \n",
+ "prediction_train.with_prediction_step \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train.with_decision_function->prediction_train.with_prediction_step \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train \n",
+ " \n",
+ "prediction_train \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train.with_prediction_step->prediction_train \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "std \n",
+ " \n",
+ "std \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "std->prediction_test.with_decision_function \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "std->prediction_train.with_decision_function \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "coverages_land \n",
+ " \n",
+ "coverages_land \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "coverages_land->prediction_train.with_decision_function \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "species \n",
+ " \n",
+ "species \n",
+ "Dict \n",
+ " \n",
+ "\n",
+ "\n",
+ "species->std \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "bunch \n",
+ " \n",
+ "bunch \n",
+ "Bunch \n",
+ " \n",
+ "\n",
+ "\n",
+ "species->bunch \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "train_cover_std \n",
+ " \n",
+ "train_cover_std \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "species->train_cover_std \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "test_cover_std \n",
+ " \n",
+ "test_cover_std \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "species->test_cover_std \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "mean \n",
+ " \n",
+ "mean \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "species->mean \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "background_points \n",
+ " \n",
+ "background_points \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_background \n",
+ " \n",
+ "prediction_background \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "background_points->prediction_background \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "plot_species_distribution \n",
+ " \n",
+ "plot_species_distribution \n",
+ "Dict \n",
+ " \n",
+ "\n",
+ "\n",
+ "bunch->plot_species_distribution \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "area_under_curve \n",
+ " \n",
+ "area_under_curve \n",
+ "float \n",
+ " \n",
+ "\n",
+ "\n",
+ "area_under_curve->plot_species_distribution \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "species.with_create_species_bunch \n",
+ " \n",
+ "species.with_create_species_bunch \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_grid_->species.with_create_species_bunch \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "meshgrid \n",
+ " \n",
+ "meshgrid \n",
+ "Tuple \n",
+ " \n",
+ "\n",
+ "\n",
+ "data_grid_->meshgrid \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "levels \n",
+ " \n",
+ "levels \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train->levels \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train->plot_species_distribution \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_background.with_normalize \n",
+ " \n",
+ "prediction_background.with_normalize \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train->prediction_background.with_normalize \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "species.with_standardize_features \n",
+ " \n",
+ "species.with_standardize_features \n",
+ "Tuple \n",
+ " \n",
+ "\n",
+ "\n",
+ "species.with_standardize_features->species \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_test->area_under_curve \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_background->area_under_curve \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "land_reference \n",
+ " \n",
+ "land_reference \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "land_reference->plot_species_distribution \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "idx \n",
+ " \n",
+ "idx \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "land_reference->idx \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "land_reference->prediction_background.with_normalize \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "levels->plot_species_distribution \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_test_raw \n",
+ " \n",
+ "prediction_test_raw \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "train_cover_std->prediction_test_raw \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train_raw \n",
+ " \n",
+ "prediction_train_raw \n",
+ "ndarray \n",
+ " \n",
+ "\n",
+ "\n",
+ "train_cover_std->prediction_train_raw \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train.with_OneClassSVM_model \n",
+ " \n",
+ "prediction_train.with_OneClassSVM_model \n",
+ "OneClassSVM \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train.with_OneClassSVM_model->prediction_train.with_decision_function \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "species.with_create_species_bunch->species.with_standardize_features \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_test.with_OneClassSVM_model \n",
+ " \n",
+ "prediction_test.with_OneClassSVM_model \n",
+ "OneClassSVM \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_test.with_OneClassSVM_model->prediction_test.with_decision_function \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "test_cover_std->prediction_test.with_decision_function \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "meshgrid->plot_species_distribution \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_test_raw->prediction_test.with_OneClassSVM_model \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_train_raw->prediction_train.with_OneClassSVM_model \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "idx->prediction_train.with_prediction_step \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "idx->coverages_land \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "prediction_background.with_normalize->prediction_background \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "mean->prediction_test.with_decision_function \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "mean->prediction_train.with_decision_function \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data \n",
+ " \n",
+ "data \n",
+ "Bunch \n",
+ " \n",
+ "\n",
+ "\n",
+ "data->data_grid_.with_construct_grids \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data->prediction_train.with_prediction_step \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data->coverages_land \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data->background_points \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data->land_reference \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "data->species.with_create_species_bunch \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "_species.with_create_species_bunch_inputs \n",
+ " \n",
+ "chosen_species \n",
+ "str \n",
+ " \n",
+ "\n",
+ "\n",
+ "_species.with_create_species_bunch_inputs->species.with_create_species_bunch \n",
+ " \n",
+ " \n",
+ " \n",
+ "\n",
+ "\n",
+ "input \n",
+ " \n",
+ "input \n",
+ " \n",
+ "\n",
+ "\n",
+ "function \n",
+ " \n",
+ "function \n",
+ " \n",
+ "\n",
+ "\n",
+ "output \n",
+ " \n",
+ "output \n",
+ " \n",
+ " \n",
+ " \n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "execution_count": 2,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "dr.visualize_execution(inputs=inputs,final_vars=final_vars)"
+ ]
+ },
{
"cell_type": "code",
"execution_count": null,
diff --git a/examples/scikit-learn/species_distribution_modeling/train_and_predict_using_mutate.py b/examples/scikit-learn/species_distribution_modeling/train_and_predict_using_mutate.py
new file mode 100644
index 000000000..8e823c025
--- /dev/null
+++ b/examples/scikit-learn/species_distribution_modeling/train_and_predict_using_mutate.py
@@ -0,0 +1,55 @@
+import numpy as np
+import numpy.typing as npt
+from sklearn import svm
+from sklearn.utils._bunch import Bunch
+
+from hamilton.function_modifiers import apply_to, mutate, source, value
+
+
+def prediction_train(train_cover_std: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
+ return train_cover_std
+
+
+def prediction_test(train_cover_std: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
+ return train_cover_std
+
+
+@mutate(prediction_train, prediction_test, nu=value(0.1), kernel=value("rbf"), gamma=value(0.5))
+def _OneClassSVM_model(
+ training_set: npt.NDArray[np.float64], nu: float, kernel: str, gamma: float
+) -> svm.OneClassSVM:
+ clf = svm.OneClassSVM(nu=nu, kernel=kernel, gamma=gamma)
+ clf.fit(training_set)
+ return clf
+
+
+@mutate(
+ apply_to(
+ prediction_train,
+ underlying_data=source("coverages_land"),
+ mean=source("mean"),
+ std=source("std"),
+ ),
+ apply_to(
+ prediction_test,
+ underlying_data=source("test_cover_std"),
+ mean=source("mean"),
+ std=source("std"),
+ ),
+)
+def _decision_function(
+ model: svm.OneClassSVM,
+ underlying_data: npt.NDArray[np.float64],
+ mean: npt.NDArray[np.float64],
+ std: npt.NDArray[np.float64],
+) -> npt.NDArray[np.float64]:
+ return model.decision_function((underlying_data - mean) / std)
+
+
+@mutate(prediction_train, idx=source("idx"), data=source("data"))
+def _prediction_step(
+ decision: npt.NDArray[np.float64], idx: npt.NDArray[np.float64], data: Bunch
+) -> npt.NDArray[np.float64]:
+ Z = decision.min() * np.ones((data.Ny, data.Nx), dtype=np.float64)
+ Z[idx[0], idx[1]] = decision
+ return Z
diff --git a/hamilton/function_modifiers/__init__.py b/hamilton/function_modifiers/__init__.py
index 333f103f4..52b7906eb 100644
--- a/hamilton/function_modifiers/__init__.py
+++ b/hamilton/function_modifiers/__init__.py
@@ -63,7 +63,9 @@
pipe = macros.pipe
pipe_input = macros.pipe_input
pipe_output = macros.pipe_output
+mutate = macros.mutate
step = macros.step
+apply_to = macros.apply_to
# resolve transform/model decorator
dynamic_transform = macros.dynamic_transform
diff --git a/hamilton/function_modifiers/base.py b/hamilton/function_modifiers/base.py
index 92a8b763a..131ee0383 100644
--- a/hamilton/function_modifiers/base.py
+++ b/hamilton/function_modifiers/base.py
@@ -221,26 +221,6 @@ def transform_dag(
pass
-# TODO -- delete this/replace with the version that will be added by
-# https://github.com/DAGWorks-Inc/hamilton/pull/249/ as part of the Node class
-def _reassign_inputs(node_: node.Node, input_names: Dict[str, Any]) -> node.Node:
- """Reassigns the input names of a node. Useful for applying
- a node to a separate input if needed. Note that things can get a
- little strange if you have multiple inputs with the same name, so
- be careful about how you use this.
- :param input_names: Input name map to reassign
- :return: A node with the input names reassigned
- """
-
- def new_callable(**kwargs) -> Any:
- reverse_input_names = {v: k for k, v in input_names.items()}
- return node_.callable(**{reverse_input_names.get(k, k): v for k, v in kwargs.items()})
-
- new_input_types = {input_names.get(k, k): v for k, v in node_.input_types.items()}
- out = node_.copy_with(callabl=new_callable, input_types=new_input_types)
- return out
-
-
class NodeInjector(SubDAGModifier, abc.ABC):
"""Injects a value as a source node in the DAG. This is a special case of the SubDAGModifier,
which gets all the upstream (required) nodes from the subdag and gives the decorator a chance
@@ -293,7 +273,7 @@ def transform_dag(
for node_ in nodes:
# if there's an intersection then we want to rename the input
if set(node_.input_types.keys()) & set(rename_map.keys()):
- out.append(_reassign_inputs(node_, rename_map))
+ out.append(node_.reassign_inputs(input_names=rename_map))
else:
out.append(node_)
out.extend(nodes_to_inject)
diff --git a/hamilton/function_modifiers/macros.py b/hamilton/function_modifiers/macros.py
index 00bf05689..b3ddd40d6 100644
--- a/hamilton/function_modifiers/macros.py
+++ b/hamilton/function_modifiers/macros.py
@@ -323,23 +323,38 @@ class Applicable:
def __init__(
self,
- fn: Callable,
+ fn: Union[Callable, str, None],
args: Tuple[Union[Any, SingleDependency], ...],
kwargs: Dict[str, Union[Any, SingleDependency]],
+ target_fn: Union[Callable, str, None] = None,
_resolvers: List[ConfigResolver] = None,
_name: Optional[str] = None,
_namespace: Union[str, None, EllipsisType] = ...,
+ _target: base.TargetType = None,
):
"""Instantiates an Applicable.
- :param fn: Function it takes in
+ We allow fn=None for the use-cases where we want to store the Applicable config (i.e. .when* family, namespace, target, etc.)
+ but do not yet have access to the actual function we are turning into the Applicable. In addition, in case the target nodes come
+ from a function (using extract_columns/extract_fields) we can pass target_fn to have access to its pointer that we can decorate
+ programmatically. See `apply_to` and `mutate` for an example.
+
:param args: Args (*args) to pass to the function
- :param kwargs: Kwargs (**kwargs) to pass to the function
+ :param fn: Function it takes in. Can be None to create an Applicable placeholder with delayed choice of function.
+ :param target_fn: Function the applicable will be applied to
:param _resolvers: Resolvers to use for the function
:param _name: Name of the node to be created
:param _namespace: Namespace of the node to be created -- currently only single-level namespaces are supported
+ :param _target: Selects which target nodes it will be appended onto. By default all.
+ :param kwargs: Kwargs (**kwargs) to pass to the function
"""
+
+ if isinstance(fn, str) or isinstance(target_fn, str):
+ raise TypeError("Strings are not supported currently. Please provide function pointer.")
+
self.fn = fn
+ self.target_fn = target_fn
+
if "_name" in kwargs:
raise ValueError("Cannot pass in _name as a kwarg")
@@ -349,6 +364,7 @@ def __init__(
self.resolvers = _resolvers if _resolvers is not None else []
self.name = _name
self.namespace = _namespace
+ self.target = _target
def _with_resolvers(self, *additional_resolvers: ConfigResolver) -> "Applicable":
"""Helper function for the .when* group"""
@@ -359,6 +375,7 @@ def _with_resolvers(self, *additional_resolvers: ConfigResolver) -> "Applicable"
_namespace=self.namespace,
args=self.args,
kwargs=self.kwargs,
+ target_fn=self.target_fn,
)
def when(self, **key_value_pairs) -> "Applicable":
@@ -411,6 +428,7 @@ def namespaced(self, namespace: NamespaceType) -> "Applicable":
_namespace=namespace,
args=self.args,
kwargs=self.kwargs,
+ target_fn=self.target_fn,
)
def resolves(self, config: Dict[str, Any]) -> bool:
@@ -448,6 +466,27 @@ def named(self, name: str, namespace: NamespaceType = ...) -> "Applicable":
),
args=self.args,
kwargs=self.kwargs,
+ target_fn=self.target_fn,
+ )
+
+ def on_output(self, target: base.TargetType) -> "Applicable":
+ """Add Target on a single function level.
+
+ This determines which node(s) it will be applied to. Should match the same naming convention
+ as the NodeTransformLifecycle child class (for example NodeTransformer).
+
+ :param target: Which node(s) to apply on top of
+ :return: The Applicable with specified target
+ """
+ return Applicable(
+ fn=self.fn,
+ _resolvers=self.resolvers,
+ _name=self.name,
+ _namespace=self.namespace,
+ _target=target if target is not None else self.target,
+ args=self.args,
+ kwargs=self.kwargs,
+ target_fn=self.target_fn,
)
def get_config_elements(self) -> List[str]:
@@ -585,17 +624,7 @@ def step(
class pipe_input(base.NodeInjector):
- """Decorator to represent a chained set of transformations. This specifically solves the "node redefinition"
- problem, and is meant to represent a pipeline of chaining/redefinitions. This is similar (and can happily be
- used in conjunction with) `pipe` in pandas. In Pyspark this is akin to the common operation of redefining a dataframe
- with new columns. While it is generally reasonable to contain these constructs within a node's function,
- you should consider `pipe` for any of the following reasons:
-
- 1. You want the transformations to display as nodes in the DAG, with the possibility of storing or visualizing
- the result
- 2. You want to pull in functions from an external repository, and build the DAG a little more procedurally
- 3. You want to use the same function multiple times, but with different parameters -- while `@does`/`@parameterize` can
- do this, this presents an easier way to do this, especially in a chain.
+ """Running a series of transformations on the input of the function.
To demonstrate the rules for chaining nodes, we'll be using the following example. This is
using primitives to demonstrate, but as hamilton is just functions of any python objects, this works perfectly with
@@ -662,9 +691,9 @@ def final_result(upstream_int: int) -> int:
Note that functions must have no position-only arguments (this is rare in python, but hamilton does not handle these).
- This basically means that the functions must be defined similarly to `def fn(x, y, z=10)` and not `def fn(x, y, /, z=10)`.
- In fact, all arguments must be named and "kwarg-friendly", meaning that the function can happily be called with `**kwargs`,
- where kwargs are some set of resolved upstream values. So, no `*args` are allowed, and `**kwargs` (variable keyword-only) are not
+ This basically means that the functions must be defined similarly to ``def fn(x, y, z=10)`` and not ``def fn(x, y, /, z=10)``.
+ In fact, all arguments must be named and "kwarg-friendly", meaning that the function can happily be called with ``**kwargs``,
+ where kwargs are some set of resolved upstream values. So, no ``*args`` are allowed, and ``**kwargs`` (variable keyword-only) are not
permitted. Note that this is not a design limitation, rather an implementation detail -- if you feel like you need this, please
reach out.
@@ -672,9 +701,9 @@ def final_result(upstream_int: int) -> int:
One has two ways to tune the shape/implementation of the subsequent nodes:
- 1. `when`/`when_not`/`when_in`/`when_not_in` -- these are used to filter the application of the function. This is valuable to reflect
+ 1. ``when``/``when_not``/``when_in``/``when_not_in`` -- these are used to filter the application of the function. This is valuable to reflect
if/else conditions in the structure of the DAG, pulling it out of functions, rather than buried within the logic itself. It is functionally
- equivalent to `@config.when`.
+ equivalent to ``@config.when``.
For instance, if you want to include a function in the chain only when a config parameter is set to a certain value, you can do:
@@ -687,14 +716,14 @@ def final_result(upstream_int: int) -> int:
def final_result(upstream_int: int) -> int:
return upstream_int
- This will only apply the first function when the config parameter `foo` is set to `bar`, and the second when it is set to `baz`.
+ This will only apply the first function when the config parameter ``foo`` is set to ``bar``, and the second when it is set to ``baz``.
- 2. `named` -- this is used to name the node. This is useful if you want to refer to intermediate results. If this is left out,
+ 2. ``named`` -- this is used to name the node. This is useful if you want to refer to intermediate results. If this is left out,
hamilton will automatically name the functions in a globally unique manner. The names of
- these functions will not necessarily be stable/guaranteed by the API, so if you want to refer to them, you should use `named`.
+ these functions will not necessarily be stable/guaranteed by the API, so if you want to refer to them, you should use ``named``.
The default namespace will always be the name of the decorated function (which will be the last node in the chain).
- `named` takes in two parameters -- required is the `name` -- this will assign the nodes with a single name and *no* global namespace.
+ ``named`` takes in two parameters -- required is the ``name`` -- this will assign the nodes with a single name and *no* global namespace.
For instance:
.. code-block:: python
@@ -706,15 +735,15 @@ def final_result(upstream_int: int) -> int:
def final_result(upstream_int: int) -> int:
return upstream_int
- The above will create two nodes, `a` and `b`. `a` will be the result of `_add_one`, and `b` will be the result of `_add_two`.
- `final_result` will then be called with the output of `b`. Note that, if these are part of a namespaced operation (a subdag, in particular),
+ The above will create two nodes, ``a`` and ``b``. ``a`` will be the result of ``_add_one``, and ``b`` will be the result of ``_add_two``.
+ ``final_result`` will then be called with the output of ``b``. Note that, if these are part of a namespaced operation (a subdag, in particular),
they *will* get the same namespace as the subdag.
- The second parameter is `namespace`. This is used to specify a namespace for the node. This is useful if you want
+ The second parameter is ``namespace``. This is used to specify a namespace for the node. This is useful if you want
to either (a) ensure that the nodes are namespaced but share a common one to avoid name clashes (usual case), or (b)
if you want a custom namespace (unusual case). To indicate a custom namespace, one need simply pass in a string.
- To indicate that a node should share a namespace with the rest of the step(...) operations in a pipe, one can pass in `...` (the ellipsis).
+ To indicate that a node should share a namespace with the rest of the step(...) operations in a pipe, one can pass in ``...`` (the ellipsis).
.. code-block:: python
:name: Namespaced step
@@ -727,7 +756,7 @@ def final_result(upstream_int: int) -> int:
def final_result(upstream_int: int) -> int:
return upstream_int
- Note that if you pass a namespace argument to the `pipe` function, it will set the namespace on each step operation.
+ Note that if you pass a namespace argument to the ``pipe`` function, it will set the namespace on each step operation.
This is useful if you want to ensure that all the nodes in a pipe have a common namespace, but you want to rename them.
.. code-block:: python
@@ -744,7 +773,7 @@ def final_result(upstream_int: int) -> int:
return upstream_int
In all likelihood, you should not be using this, and this is only here in case you want to expose a node for
- consumption/output later. Setting the namespace in individual nodes as well as in `pipe` is not yet supported.
+ consumption/output later. Setting the namespace in individual nodes as well as in ``pipe`` is not yet supported.
"""
def __init__(
@@ -754,12 +783,12 @@ def __init__(
collapse=False,
_chain=False,
):
- """Instantiates a `@pipe` decorator.
+ """Instantiates a ``@pipe_input`` decorator.
:param transforms: step transformations to be applied, in order
- :param namespace: namespace to apply to all nodes in the pipe. This can be "..." (the default), which resolves to the name of the decorated function, None (which means no namespace), or a string (which means that all nodes will be namespaced with that string). Note that you can either use this *or* namespaces inside pipe()...
+ :param namespace: namespace to apply to all nodes in the pipe. This can be "..." (the default), which resolves to the name of the decorated function, None (which means no namespace), or a string (which means that all nodes will be namespaced with that string). Note that you can either use this *or* namespaces inside ``pipe_input()``...
:param collapse: Whether to collapse this into a single node. This is not currently supported.
- :param _chain: Whether to chain the first parameter. This is the only mode that is supported. Furthermore, this is not externally exposed. @flow will make use of this.
+ :param _chain: Whether to chain the first parameter. This is the only mode that is supported. Furthermore, this is not externally exposed. ``@flow`` will make use of this.
"""
self.transforms = transforms
self.collapse = collapse
@@ -883,13 +912,29 @@ def __init__(
# super(flow, self).__init__(*transforms, collapse=collapse, _chain=False)
-class pipe_output(base.SingleNodeNodeTransformer):
+class SingleTargetError(Exception):
+ """We prohibit the target from being set both globally and locally.
+
+ Decorators that transform the output of a node can be set to transform only
+ a certain output node (useful with extract_columns / extract_fields). Some decorators
+ can group multiple transforms and we can set that certain output node either for all of them
+ or for each individually.
+
+ This is a safeguard, because when you set the global target it creates a subset of those nodes and
+ if the local target is outside of that subset it gets ignored (as opposed to the logical assumption that
+ it can override the global one). So we disable that case.
+ """
+
+ pass
+
+
+class pipe_output(base.NodeTransformer):
"""Running a series of transformation on the output of the function.
The decorated function declares the dependency, the body of the function gets executed, and then
- we run a series of transformations on the result of the function specified by `pipe_output`.
+ we run a series of transformations on the result of the function specified by ``pipe_output``.
- If we have nodes **A --> B --> C** in the DAG and decorate `B` with `pipe_output` like
+ If we have nodes **A --> B --> C** in the DAG and decorate **B** with ``pipe_output`` like
.. code-block:: python
:name: Simple @pipe_output example
@@ -901,36 +946,91 @@ class pipe_output(base.SingleNodeNodeTransformer):
def B(...):
return ...
- we obtain the new DAG **A --> B_raw --> B1 --> B2 --> B --> C**, where we can think of the **B_raw --> B1 --> B2 --> B** as a "pipe" that takes the raw output of B, applies to it
- B1, takes the output of B1 applies to it B2 and then gets renamed to B to re-connect to the rest of the DAG.
+ we obtain the new DAG **A --> B_raw --> B1 --> B2 --> B --> C**, where we can think of the **B_raw --> B1 --> B2 --> B** as a "pipe" that takes the raw output of **B**, applies to it
+ **B1**, takes the output of **B1** applies to it **B2** and then gets renamed to **B** to re-connect to the rest of the DAG.
+
+ The rules for chaining nodes are the same as for ``pipe_input``.
+
+ For extra control in case of multiple output nodes, for example after ``extract_fields``/``extract_columns``, we can also specify the output node that we wish to mutate.
+ The following applies *A* to all fields while *B* is applied only to ``field_1``:
+
+ .. code-block:: python
+ :name: Simple @pipe_output example targeting specific nodes
+
+ @extract_columns("col_1", "col_2")
+ def A(...):
+ return ...
+
+ def B(...):
+ return ...
+
- While it is generally reasonable to contain these constructs within a node's function,
- you should consider `pipe_output` for similar reasons as `pipe_input`, namely, for any of the following reasons:
+ @pipe_output(
+ step(A),
+ step(B).on_output("field_1"),
+ )
+ @extract_fields(
+ {"field_1":int, "field_2":int, "field_3":int}
+ )
+ def foo(a:int)->Dict[str,int]:
+ return {"field_1":1, "field_2":2, "field_3":3}
- 1. You want the transformations to display as nodes in the DAG, with the possibility of storing or visualizing
- the result
- 2. You want to pull in functions from an external repository, and build the DAG a little more procedurally
- 3. You want to use the same function multiple times, but with different parameters -- while `@does`/`@parameterize` can
- do this, this presents an easier way to do this, especially in a chain.
+ We can also do this on the global level (but cannot do on both levels at the same time). The following would apply function *A* and function *B* to only ``field_1`` and ``field_2``
- The rules for chaining nodes as the same as for pipe.
+ .. code-block:: python
+ :name: Simple @pipe_output targeting specific nodes global
+
+ @pipe_output(
+ step(A),
+ step(B),
+ on_output = ["field_1","field_2"]
+ )
+ @extract_fields(
+ {"field_1":int, "field_2":int, "field_3":int}
+ )
+ def foo(a:int)->Dict[str,int]:
+ return {"field_1":1, "field_2":2, "field_3":3}
"""
+ @classmethod
+ def _validate_single_target_level(cls, target: base.TargetType, transforms: Tuple[Applicable]):
+ """We want to make sure that target gets applied on a single level.
+ Either choose for each step individually what it targets or set it on the global level where
+ all steps will target the same node(s).
+ """
+ if target is not None:
+ for transform in transforms:
+ if transform.target is not None:
+ raise SingleTargetError("Cannot have target set on pipe_output and step level.")
+
def __init__(
self,
*transforms: Applicable,
namespace: NamespaceType = ...,
+ on_output: base.TargetType = None,
collapse=False,
_chain=False,
):
- """Instantiates a `@pipe_output` decorator.
+ """Instantiates a ``@pipe_output`` decorator.
+
+ Warning: if there is a global pipe_output target, the individual ``step(...).target`` would only choose
+ from the subset pre-selected from the global pipe_output target. We have disabled this for now to avoid
+ confusion. Leave global pipe_output target empty if you want to choose between all the nodes on the individual step level.
:param transforms: step transformations to be applied, in order
- :param namespace: namespace to apply to all nodes in the pipe. This can be "..." (the default), which resolves to the name of the decorated function, None (which means no namespace), or a string (which means that all nodes will be namespaced with that string). Note that you can either use this *or* namespaces inside pipe()...
+ :param namespace: namespace to apply to all nodes in the pipe. This can be "..." (the default), which resolves to the name of the decorated function, None (which means no namespace), or a string (which means that all nodes will be namespaced with that string). Note that you can either use this *or* namespaces inside ``pipe_output()``...
+ :param on_output: setting the target node for all steps in the pipe. Leave empty to select all the output nodes.
:param collapse: Whether to collapse this into a single node. This is not currently supported.
- :param _chain: Whether to chain the first parameter. This is the only mode that is supported. Furthermore, this is not externally exposed. @flow will make use of this.
+ :param _chain: Whether to chain the first parameter. This is the only mode that is supported. Furthermore, this is not externally exposed. ``@flow`` will make use of this.
"""
- super(pipe_output, self).__init__()
+ pipe_output._validate_single_target_level(target=on_output, transforms=transforms)
+
+ if on_output == ...:
+ raise ValueError(
+ "Cannot apply Elipsis(...) to on_output. Use None, single string or list of strings."
+ )
+
+ super(pipe_output, self).__init__(target=on_output)
self.transforms = transforms
self.collapse = collapse
self.chain = _chain
@@ -944,6 +1044,27 @@ def __init__(
if self.chain:
raise NotImplementedError("@flow() is not yet supported -- this is ")
+ def _filter_individual_target(self, node_):
+ """Resolves target option on the transform level.
+ Adds option that we can decide for each applicable which output node it will target.
+
+ :param node_: The current output node.
+ :return: A tuple of the transforms that target this node
+ """
+ selected_transforms = []
+ for transform in self.transforms:
+ target = transform.target
+ if isinstance(target, str): # user selects single target via string
+ if node_.name == target:
+ selected_transforms.append(transform)
+ elif isinstance(target, Collection): # user inputs a list of targets
+ if node_.name in target:
+ selected_transforms.append(transform)
+ else: # for target=None (default) we include all sink nodes
+ selected_transforms.append(transform)
+
+ return tuple(selected_transforms)
+
def transform_node(
self, node_: node.Node, config: Dict[str, Any], fn: Callable
) -> Collection[node.Node]:
@@ -954,22 +1075,28 @@ def transform_node(
The last node is an identity to the previous one with the original name `function_name` to
represent an exit point of `pipe_output`.
"""
-
- if len(self.transforms) < 1:
+ transforms = self._filter_individual_target(node_)
+ if len(transforms) < 1:
# in case no functions in pipeline we short-circuit and return the original node
return [node_]
+ if self.namespace is None:
+ _namespace = None
+ elif self.namespace is ...:
+ _namespace = node_.name
+ else:
+ _namespace = self.namespace
+
original_node = node_.copy_with(name=f"{node_.name}_raw")
def __identity(foo: Any) -> Any:
return foo
- transforms = self.transforms + (step(__identity).named(fn.__name__),)
-
+ transforms = transforms + (step(__identity).named(fn.__name__),)
nodes, _ = chain_transforms(
first_arg=original_node.name,
transforms=transforms,
- namespace=self.namespace,
+ namespace=_namespace, # self.namespace,
config=config,
fn=fn,
)
@@ -1055,3 +1182,217 @@ def chain_transforms(
)
first_arg = raw_node.name
return nodes, first_arg
+
+
+def apply_to(fn_: Union[Callable, str], **mutating_fn_kwargs: Union[SingleDependency, Any]):
+ """Creates an applicable placeholder with potential kwargs that will be applied to a node (or a subcomponent of a node).
+ See documentation for ``mutate`` to see how this is used. It de facto allows a postponed ``step``.
+
+ We pass fn=None here as this will be the function we are decorating and need to delay passing it in. The target
+ function is the one we wish to mutate and we store it for later access.
+
+ :param fn_: Function the applicable will be applied to
+ :param mutating_fn_kwargs: Kwargs (**kwargs) to pass to the mutator function. Must be validly called as f(**kwargs), and have a 1:1 mapping of kwargs to parameters.
+ :return: an applicable placeholder with the target function
+ """
+ return Applicable(fn=None, args=(), kwargs=mutating_fn_kwargs, target_fn=fn_, _resolvers=[])
+
+
+class NotSameModuleError(Exception):
+ """Limit the use of a decorator on functions from the same module.
+
+ Some decorators can also transform functions other than the one they are decorating (for example ``mutate``).
+ This ensures that all the functions are located within the same module.
+ """
+
+ def __init__(self, fn: Callable, target_fn: Callable):
+ super().__init__(
+ f"The functions have to be in the same module... "
+ f"The target function {target_fn.__name__} is in module {target_fn.__module__} and "
+ f"the mutator function {fn.__name__} is in module {fn.__module__}.\n"
+ "Use power user setting to disable this restriction."
+ )
+
+
+class mutate:
+ """Running a transformation on the outputs of a series of functions.
+
+ This is closely related to ``pipe_output`` as it effectively allows you to run transformations on the output of a node without touching that node.
+ We choose which target functions we wish to mutate by the transformation we are decorating. For now, the target functions, that will be mutated,
+ have to be in the same module (come speak to us if you need this capability over multiple modules).
+
+ We suggest you define the mutating functions with a leading underscore so that they are only displayed in the `transform pipeline` of the target node.
+
+ If we wish to apply ``_transform1`` to the output of **A** and **B** and ``_transform2`` only to the output
+ of node **B**, we can do this like
+
+ .. code-block:: python
+ :name: Simple @mutate example
+
+ def A(...):
+ return ...
+
+ def B(...):
+ return ...
+
+ @mutate(A, B)
+ def _transform1(...):
+ return ...
+
+ @mutate(B)
+ def _transform2(...):
+ return ...
+
+ we obtain the new pipe-like subDAGs **A_raw --> _transform1 --> A** and **B_raw --> _transform1 --> _transform2 --> B**,
+ where the behavior is the same as ``pipe_output``.
+
+ While it is generally reasonable to use ``pipe_output``, you should consider ``mutate`` in the following scenarios:
+
+ 1. Loading data and applying pre-cleaning step.
+ 2. Feature engineering via joining, filtering, sorting, etc.
+ 3. Experimenting with different transformations across nodes by selectively turning transformations on / off.
+
+ We assume the first argument of the decorated function to be the output of the function we are targeting.
+ For transformations with multiple arguments you can use key word arguments coupled with ``source`` or ``value``
+ the same as with other ``pipe``-family decorators:
+
+ .. code-block:: python
+ :name: Simple @mutate example with multiple arguments
+
+ @mutate(A, B, arg2=source('upstream_node'), arg3=value(some_literal), ...)
+ def _transform1(output_from_target:correct_type, arg2:arg2_type, arg3:arg3_type,...):
+ return ...
+
+ You can also select individual args that will be applied to each target node by adding ``apply_to(...)``
+
+ .. code-block:: python
+ :name: Simple @mutate example with multiple arguments allowing individual actions
+
+ @mutate(
+ apply_to(A, arg2=source('upstream_node_1'), arg3=value(some_literal_1)),
+ apply_to(B, arg2=source('upstream_node_2'), arg3=value(some_literal_2)),
+ )
+ def _transform1(output_from_target:correct_type, arg2:arg2_type, arg3:arg3_type, ...):
+ return ...
+
+ In case of multiple output nodes, for example after ``extract_fields`` / ``extract_columns``, we can also specify the output node that we wish to mutate.
+ The following would mutate all columns of *A* individually while in the case of function *B* only ``field_1``:
+
+ .. code-block:: python
+ :name: @mutate example targeting specific nodes local
+
+ @extract_columns("col_1", "col_2")
+ def A(...):
+ return ...
+
+ @extract_fields(
+ {"field_1":int, "field_2":int, "field_3":int}
+ )
+ def B(...):
+ return ...
+
+ @mutate(
+ apply_to(A),
+ apply_to(B).on_output("field_1"),
+ )
+
+ def foo(a:int)->Dict[str,int]:
+ return {"field_1":1, "field_2":2, "field_3":3}
+ """
+
+ def __init__(
+ self,
+ *target_functions: Union[Applicable, Callable],
+ collapse: bool = False,
+ _chain: bool = False,
+ **mutating_function_kwargs: Union[SingleDependency, Any],
+ ):
+ """Instantiates a ``mutate`` decorator.
+
+ We assume the first argument of the decorated function to be the output of the function we are targeting.
+
+ :param target_functions: functions we wish to mutate the output of
+ :param collapse: Whether to collapse this into a single node. This is not currently supported.
+ :param _chain: Whether to chain the first parameter. This is the only mode that is supported. Furthermore, this is not externally exposed. ``@flow`` will make use of this.
+ :param \\*\\*mutating_function_kwargs: other kwargs that the decorated function has. Must be validly called as ``f(**kwargs)``, and have a 1-to-1 mapping of kwargs to parameters. This will be applied for all ``target_functions``, unless ``apply_to`` already has the mutator function kwargs, in which case it takes those.
+ """
+ self.collapse = collapse
+ self.chain = _chain
+ # Kept as explicit options so the interface is already in place once these features are implemented.
+ if self.collapse:
+ raise NotImplementedError(
+ "Collapsing functions as one node is not yet implemented for mutate(). Please reach out if you want this feature."
+ )
+ if self.chain:
+ raise NotImplementedError("@flow() is not yet supported -- this is ")
+
+ self.remote_applicables = tuple(
+ [apply_to(fn) if isinstance(fn, Callable) else fn for fn in target_functions]
+ )
+ self.mutating_function_kwargs = mutating_function_kwargs
+
+ # Cross module will require some thought so we are restricting mutate to single module for now
+ self.restrict_to_single_module = True
+
+ def validate_same_module(self, mutating_fn: Callable):
+ """Validates target functions are in the same module as the mutator function.
+
+ :param mutating_fn: Function to validate against
+ :return: Nothing, raises exception if not valid.
+ """
+ local_module = mutating_fn.__module__
+ for remote_applicable in self.remote_applicables:
+ if remote_applicable.target_fn.__module__ != local_module:
+ raise NotSameModuleError(fn=mutating_fn, target_fn=remote_applicable.target_fn)
+
+ def _create_step(self, mutating_fn: Callable, remote_applicable_builder: Applicable):
+ """Adds the correct function for the applicable and resolves kwargs"""
+
+ if not remote_applicable_builder.kwargs:
+ remote_applicable_builder.kwargs = self.mutating_function_kwargs
+
+ remote_applicable_builder.fn = mutating_fn
+
+ return remote_applicable_builder
+
+ def __call__(self, mutating_fn: Callable):
+ """Adds to an existing pipe_output or creates a new pipe_output.
+
+ This is a new type of decorator that builds ``pipe_output`` for multiple nodes in the DAG. It does
+ not fit in the current decorator framework since it does not decorate the node function in the DAG
+ but allows us to "remotely decorate" multiple nodes at once, which needs to happen before the
+ NodeTransformLifecycle gets applied / resolved.
+
+ :param mutating_fn: function that will be used in pipe_output to transform target function
+ :return: mutating_fn, to guarantee function works even when Hamilton driver is not used
+ """
+
+ # TODO: We want to hide such helper function from the DAG by default, since we are manually
+ # adding them to the DAG in a different place
+ # Suggestion: ignore decorator - https://github.com/DAGWorks-Inc/hamilton/issues/1168
+ # if not mutating_fn.__name__.startswith("_"):
+ # mutating_fn.__name__ = "".join(("_", mutating_fn.__name__))
+
+ if self.restrict_to_single_module:
+ self.validate_same_module(mutating_fn=mutating_fn)
+
+ # TODO: Applying @mutate once works as intended.
+ # If it runs again on the same target, the step might be added twice.
+ # In the Jupyter notebook/cross-module case we'll want to guard against it.
+ for remote_applicable in self.remote_applicables:
+ new_pipe_step = self._create_step(
+ mutating_fn=mutating_fn, remote_applicable_builder=remote_applicable
+ )
+ found_pipe_output = False
+ if hasattr(remote_applicable.target_fn, base.NodeTransformer.get_lifecycle_name()):
+ for decorator in remote_applicable.target_fn.transform:
+ if isinstance(decorator, pipe_output):
+ decorator.transforms = decorator.transforms + (new_pipe_step,)
+ found_pipe_output = True
+
+ if not found_pipe_output:
+ remote_applicable.target_fn = pipe_output(
+ new_pipe_step, collapse=self.collapse, _chain=self.chain
+ )(remote_applicable.target_fn)
+
+ return mutating_fn
diff --git a/tests/function_modifiers/test_macros.py b/tests/function_modifiers/test_macros.py
index 6695fd24b..bc51bc44a 100644
--- a/tests/function_modifiers/test_macros.py
+++ b/tests/function_modifiers/test_macros.py
@@ -10,13 +10,16 @@
from hamilton.function_modifiers.dependencies import source, value
from hamilton.function_modifiers.macros import (
Applicable,
+ apply_to,
ensure_function_empty,
+ mutate,
pipe_input,
pipe_output,
step,
)
from hamilton.node import DependencyType
+import tests.resources.mutate
import tests.resources.pipe_input
import tests.resources.pipe_output
@@ -237,10 +240,6 @@ def _test_apply_function(foo: int, bar: int, baz: int = 100) -> int:
return foo + bar + baz
-def _test_apply_function_2(foo: int) -> int:
- return foo + 1
-
-
@pytest.mark.parametrize(
"args,kwargs,chain_first_param",
[
@@ -463,6 +462,16 @@ def result_from_downstream_function() -> int:
return 2
+def test_pipe_output_single_target_level_error():
+ with pytest.raises(hamilton.function_modifiers.macros.SingleTargetError):
+ pipe_output(
+ step(_test_apply_function, source("bar_upstream"), baz=value(100)).on_output(
+ "some_node"
+ ),
+ on_output="some_other_node",
+ )
+
+
def test_pipe_output_shortcircuit():
n = node.Node.from_fn(result_from_downstream_function)
decorator = pipe_output()
@@ -531,6 +540,97 @@ def test_pipe_output_inherits_null_namespace():
assert "result_from_downstream_function" in {item.name for item in nodes}
+def test_pipe_output_global_on_output_all():
+ n1 = node.Node.from_fn(result_from_downstream_function, name="node_1")
+ n2 = node.Node.from_fn(result_from_downstream_function, name="node_2")
+
+ decorator = pipe_output(
+ step(_test_apply_function, source("bar_upstream"), baz=value(100)),
+ )
+ nodes = decorator.select_nodes(decorator.target, [n1, n2])
+ assert len(nodes) == 2
+ assert [node_.name for node_ in nodes] == ["node_1", "node_2"]
+
+
+def test_pipe_output_global_on_output_string():
+ n1 = node.Node.from_fn(result_from_downstream_function, name="node_1")
+ n2 = node.Node.from_fn(result_from_downstream_function, name="node_2")
+
+ decorator = pipe_output(
+ step(_test_apply_function, source("bar_upstream"), baz=value(100)), on_output="node_2"
+ )
+ nodes = decorator.select_nodes(decorator.target, [n1, n2])
+ assert len(nodes) == 1
+ assert nodes[0].name == "node_2"
+
+
+def test_pipe_output_global_on_output_list_strings():
+ n1 = node.Node.from_fn(result_from_downstream_function, name="node_1")
+ n2 = node.Node.from_fn(result_from_downstream_function, name="node_2")
+ n3 = node.Node.from_fn(result_from_downstream_function, name="node_3")
+
+ decorator = pipe_output(
+ step(_test_apply_function, source("bar_upstream"), baz=value(100)),
+ on_output=["node_1", "node_2"],
+ )
+ nodes = decorator.select_nodes(decorator.target, [n1, n2, n3])
+ assert len(nodes) == 2
+ assert [node_.name for node_ in nodes] == ["node_1", "node_2"]
+
+
+def test_pipe_output_elipsis_error():
+ with pytest.raises(ValueError):
+ pipe_output(
+ step(_test_apply_function, source("bar_upstream"), baz=value(100)), on_output=...
+ )
+
+
+def test_pipe_output_local_on_output_string():
+ n1 = node.Node.from_fn(result_from_downstream_function, name="node_1")
+ n2 = node.Node.from_fn(result_from_downstream_function, name="node_2")
+
+ decorator = pipe_output(
+ step(_test_apply_function, source("bar_upstream"), baz=value(100))
+ .named("correct_transform")
+ .on_output("node_2"),
+ step(_test_apply_function, source("bar_upstream"), baz=value(100))
+ .named("wrong_transform")
+ .on_output("node_3"),
+ )
+ steps = decorator._filter_individual_target(n1)
+ assert len(steps) == 0
+ steps = decorator._filter_individual_target(n2)
+ assert len(steps) == 1
+ assert steps[0].name == "correct_transform"
+
+
+def test_pipe_output_local_on_output_list_string():
+ n1 = node.Node.from_fn(result_from_downstream_function, name="node_1")
+ n2 = node.Node.from_fn(result_from_downstream_function, name="node_2")
+ n3 = node.Node.from_fn(result_from_downstream_function, name="node_3")
+
+ decorator = pipe_output(
+ step(_test_apply_function, source("bar_upstream"), baz=value(100))
+ .named("correct_transform_list")
+ .on_output(["node_2", "node_3"]),
+ step(_test_apply_function, source("bar_upstream"), baz=value(100))
+ .named("correct_transform_string")
+ .on_output("node_2"),
+ step(_test_apply_function, source("bar_upstream"), baz=value(100))
+ .named("wrong_transform")
+ .on_output("node_5"),
+ )
+ steps = decorator._filter_individual_target(n1)
+ assert len(steps) == 0
+ steps = decorator._filter_individual_target(n2)
+ assert len(steps) == 2
+ assert steps[0].name == "correct_transform_list"
+ assert steps[1].name == "correct_transform_string"
+ steps = decorator._filter_individual_target(n3)
+ assert len(steps) == 1
+ assert steps[0].name == "correct_transform_list"
+
+
def test_pipe_output_end_to_end_simple():
dr = driver.Builder().with_config({"calc_c": True}).build()
@@ -552,7 +652,7 @@ def test_pipe_output_end_to_end_simple():
assert result["downstream_f"] == result["chain_not_using_pipe_output"]
-def test_pipe_output_end_to_end_1():
+def test_pipe_output_end_to_end():
dr = (
driver.Builder()
.with_modules(tests.resources.pipe_output)
@@ -579,14 +679,99 @@ def test_pipe_output_end_to_end_1():
assert result["chain_2_using_pipe_output"] == result["chain_2_not_using_pipe_output"]
-def test_pipe_output_end_to_end_2():
+# Mutate will mark the modules (and leave a mark).
+# Thus calling it a second time (for instance through multiple tests) might mess it up slightly...
+# Using fixtures just to be sure.
+
+
+@pytest.fixture(scope="function")
+def _downstream_result_to_mutate():
+ def downstream_result_to_mutate() -> int:
+ return 2
+
+ yield downstream_result_to_mutate
+
+
+@pytest.fixture(scope="function")
+def import_mutate_module():
+ import importlib
+
+ mod = importlib.import_module("tests.resources.mutate")
+ yield mod
+
+
+# This doesn't change so no need to have it as fixture
+def mutator_function(input_1: int, input_2: int) -> int:
+ return input_1 + input_2
+
+
+def test_mutate_convert_callable_to_applicable(_downstream_result_to_mutate):
+ decorator = mutate(_downstream_result_to_mutate)
+
+ assert len(decorator.remote_applicables) == 1
+ remote_applicable = decorator.remote_applicables[0]
+ assert isinstance(remote_applicable, Applicable)
+ assert remote_applicable.fn is None
+ assert remote_applicable.target_fn == _downstream_result_to_mutate
+
+
+def test_mutate_restricted_to_same_module():
+ decorator = mutate(tests.resources.mutate.f_of_interest)
+
+ with pytest.raises(hamilton.function_modifiers.macros.NotSameModuleError):
+ decorator.validate_same_module(mutator_function)
+
+
+def test_mutate_global_kwargs(_downstream_result_to_mutate):
+ decorator = mutate(apply_to(_downstream_result_to_mutate), input_2=17)
+ remote_applicable = decorator.remote_applicables[0]
+
+ pipe_step = decorator._create_step(
+ mutating_fn=mutator_function, remote_applicable_builder=remote_applicable
+ )
+ assert pipe_step.kwargs["input_2"] == 17
+
+
+def test_mutate_local_kwargs_override_global_ones(_downstream_result_to_mutate):
+ decorator = mutate(apply_to(_downstream_result_to_mutate, input_2=13), input_2=17)
+ remote_applicable = decorator.remote_applicables[0]
+
+ pipe_step = decorator._create_step(
+ mutating_fn=mutator_function, remote_applicable_builder=remote_applicable
+ )
+ assert pipe_step.kwargs["input_2"] == 13
+
+
+def test_mutate_end_to_end_simple(import_mutate_module):
+ dr = driver.Builder().with_config({"calc_c": True}).build()
+
dr = (
driver.Builder()
- .with_modules(tests.resources.pipe_output)
+ .with_modules(import_mutate_module)
.with_adapter(base.DefaultAdapter())
.build()
)
+ inputs = {}
+ result = dr.execute(
+ [
+ "downstream_f",
+ "chain_not_using_mutate",
+ ],
+ inputs=inputs,
+ )
+ assert result["downstream_f"] == result["chain_not_using_mutate"]
+
+
+def test_mutate_end_to_end_1(import_mutate_module):
+ dr = (
+ driver.Builder()
+ .with_modules(import_mutate_module)
+ .with_adapter(base.DefaultAdapter())
+ .with_config({"calc_c": True})
+ .build()
+ )
+
inputs = {
"input_1": 10,
"input_2": 20,
@@ -594,12 +779,12 @@ def test_pipe_output_end_to_end_2():
}
result = dr.execute(
[
- "chain_1_using_pipe_output",
- "chain_2_using_pipe_output",
- "chain_1_not_using_pipe_output",
- "chain_2_not_using_pipe_output",
+ "chain_1_using_mutate",
+ "chain_2_using_mutate",
+ "chain_1_not_using_mutate",
+ "chain_2_not_using_mutate",
],
inputs=inputs,
)
- assert result["chain_1_using_pipe_output"] == result["chain_1_not_using_pipe_output"]
- assert result["chain_2_using_pipe_output"] == result["chain_2_not_using_pipe_output"]
+ assert result["chain_1_using_mutate"] == result["chain_1_not_using_mutate"]
+ assert result["chain_2_using_mutate"] == result["chain_2_not_using_mutate"]
diff --git a/tests/resources/mutate.py b/tests/resources/mutate.py
new file mode 100644
index 000000000..0aa212a96
--- /dev/null
+++ b/tests/resources/mutate.py
@@ -0,0 +1,104 @@
+from hamilton.function_modifiers import apply_to, mutate, source, value
+
+
+def upstream() -> str:
+ return "-upstream-"
+
+
+def user_input() -> str:
+ return "-user input-"
+
+
+def f_of_interest(user_input: str) -> str:
+ return user_input + "-raw function-"
+
+
+def downstream_f(f_of_interest: str) -> str:
+ return f_of_interest + "-downstream."
+
+
+@mutate(f_of_interest)
+def _mutate0(x: str) -> str:
+ return x + "-post pipe 0-"
+
+
+@mutate(apply_to(f_of_interest, upstream=source("upstream")).named("random"))
+def _mutate1(x: str, upstream: str) -> str:
+ return x + f"-post pipe 1 with {upstream}-"
+
+
+@mutate(apply_to(f_of_interest, some_val=value(1000)))
+def _mutate2(x: str, some_val: int) -> str:
+ return x + f"-post pipe 2 with value {some_val}-"
+
+
+def chain_not_using_mutate() -> str:
+ t = downstream_f(_mutate2(_mutate1(_mutate0(f_of_interest(user_input())), upstream()), 1000))
+ return t
+
+
+def v() -> int:
+ return 10
+
+
+def chain_1_using_mutate(v: int) -> int:
+ return v * 10
+
+
+def chain_1_not_using_mutate(v: int, input_1: int, input_2: int, calc_c: bool = False) -> int:
+ start = v * 10
+ a = _add_one(start)
+ b = _add_two(a)
+ c = _add_n(b, n=3) if calc_c else b
+ d = _add_n(c, n=input_1)
+ e = _multiply_n(d, n=input_2)
+ return e
+
+
+def chain_2_using_mutate(v: int) -> int:
+ return v + 10
+
+
+def chain_2_not_using_mutate(v: int, input_3: int, calc_c: bool = False) -> int:
+ start = v + 10
+ a = _square(start)
+ b = _multiply_n(a, n=2)
+ return b
+
+
+@mutate(
+ apply_to(chain_2_using_mutate).named("a"),
+)
+def _square(v: int) -> int:
+ return v * v
+
+
+@mutate(
+ apply_to(chain_1_using_mutate).named("a"),
+)
+def _add_one(v: int) -> int:
+ return v + 1
+
+
+@mutate(
+ apply_to(chain_1_using_mutate).named("b"),
+)
+def _add_two(v: int) -> int:
+ return v + 2
+
+
+@mutate(
+ apply_to(chain_1_using_mutate, n=3).named("c").when(calc_c=True),
+ apply_to(chain_1_using_mutate, n=source("input_1")).named("d"),
+)
+def _add_n(v: int, n: int) -> int:
+ return v + n
+
+
+@mutate(
+ apply_to(chain_1_using_mutate).named("e"),
+ apply_to(chain_2_using_mutate, n=value(2)).named("b"),
+ n=source("input_2"),
+)
+def _multiply_n(v: int, *, n: int) -> int:
+ return v * n