Skip to content
This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Commit

Permalink
Gets paraemterized_extract_columns ready for production
Browse files Browse the repository at this point in the history
This adds testing for the decorator, and changes it to a simpler naming
scheme. Currently the naming scheme appends __{i} to the node name. This
is a reserved pattern that won't be used later.
  • Loading branch information
elijahbenizzy committed Jan 29, 2023
1 parent e0d8596 commit b3881f4
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 2 deletions.
62 changes: 62 additions & 0 deletions decorators.md
Original file line number Diff line number Diff line change
Expand Up @@ -484,3 +484,65 @@ The inputs to the `reuse_functions` decorator takes in a variety of inputs that
- _where_ the produced subDAG shoud live is specified by two parameters. `namespace` gives a namespace under which these nodes live. All this means is that a nodes name will be `{namespace}.{node_name}`.
`outputs` provides a mapping so you can access these later, without referring to the namespace. E.G. `outputs={"unique_users": "unique_users_daily_US"}` means that the `unique_users` output from this
subDAG will get mapped to the node name `unique_users_daily_US`. This way you can use it as a function parameter later on.

## @parameterize_extract_columns

`@parameterize_extract_columns` gives you the power of both `@extract_columns` and `@parameterize` in one decorator.

It takes in a list of `Parameterized_Extract` objects, each of which is composed of:
1. A list of columns to extract, and
2. A parameterization that gets used

In the following case, we produce four columns, two for each parameterization.

```python
import pandas as pd
from function_modifiers import parameterize_extract_columns, ParameterizedExtract, source, value
@parameterize_extract_columns(
ParameterizedExtract(
("outseries1a", "outseries2a"),
{"input1": source("inseries1a"), "input2": source("inseries1b"), "input3": value(10)},
),
ParameterizedExtract(
("outseries1b", "outseries2b"),
{"input1": source("inseries2a"), "input2": source("inseries2b"), "input3": value(100)},
),
)
def fn(input1: pd.Series, input2: pd.Series, input3: float) -> pd.DataFrame:
return pd.concat([input1 * input2 * input3, input1 + input2 + input3], axis=1)
```

## @parameterize_frame

`@parameterize_frame` enables you to run parameterize_extract_columns with a dataframe specifying the parameterizations
-- allowing for less verbose specification. The above example can be rewritten as:

```python
from plugins.pandas_implementations import parameterized_frame
df = pd.DataFrame(
[
["outseries1a", "outseries2a", "inseries1a", "inseries2a", 10],
["outseries1b", "outseries2b", "inseries1b", "inseries2b", 100],
# ...
],
# Have to switch as indices have to be unique
columns=[
[
"output1",
"output2",
"input1",
"input2",
"input3",
], # configure whether column is source or value and also whether it's input ("source", "value") or output ("out")
["out", "out", "source", "source", "value"],
],
)

@parameterize_frame(df)
def my_func(input1: pd.Series, input2: pd.Series, input3: float) -> pd.DataFrame:
return pd.DataFrame(
[input1 * input2 * input3, input1 + input2 + input3]
)
```

Note that we have a double-index. Note that this is still in experimental.
1 change: 1 addition & 0 deletions hamilton/function_modifiers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
parameterize_sources = expanders.parameterize_sources
parameterize_values = expanders.parameterize_values
parameterize_extract_columns = expanders.parameterize_extract_columns
ParameterizedExtract = expanders.ParameterizedExtract

# The older ones that will be deprecated
parametrized = expanders.parametrized
Expand Down
18 changes: 16 additions & 2 deletions hamilton/function_modifiers/expanders.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,8 +539,19 @@ def __init__(self, *extract_config: ParameterizedExtract, reassign_columns: bool
def expand_node(
self, node_: node.Node, config: Dict[str, Any], fn: Callable
) -> Collection[node.Node]:
"""Expands a node into multiple, given the extract_config passed to
parameterize_extract_columns. Goes through all parameterizations,
creates an extract_columns node for each, then delegates to that.
Note this calls out to `@parameterize` and `@extract_columns` rather
than reimplementing the logic.
:param node_: Node to expand
:param config: Config to use to expand
:param fn: Original function
:return: The nodes produced by this decorator.
"""
output_nodes = []
for parameterization in self.extract_config:
for i, parameterization in enumerate(self.extract_config):

@functools.wraps(fn)
def wrapper_fn(*args, _output_columns=parameterization.outputs, **kwargs):
Expand All @@ -550,8 +561,11 @@ def wrapper_fn(*args, _output_columns=parameterization.outputs, **kwargs):

new_node = node_.copy_with(callabl=wrapper_fn)
fn_to_call = wrapper_fn if self.reassign_columns else fn
# We have to rename the underlying function so that we do not
# get naming collisions. Using __ is cleaner than using a uuid
# as it is easier to read/manage and naturally maeks sense.
parameterization_decorator = parameterize(
**{node_.name: parameterization.input_mapping}
**{node_.name + f"__{i}": parameterization.input_mapping}
)
(parameterized_node,) = parameterization_decorator.expand_node(
new_node, config, fn_to_call
Expand Down
35 changes: 35 additions & 0 deletions tests/function_modifiers/test_expanders.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,3 +463,38 @@ def test_parametrized_full_multiple_replacements():
assert len(nodes) == 4
# test out that documentation is assigned correctly
assert [node_.documentation for node_ in nodes] == [args[node_.name][1] for node_ in nodes]


def test_parameterized_extract_columns():
annotation = function_modifiers.parameterize_extract_columns(
function_modifiers.ParameterizedExtract(
("outseries1a", "outseries2a"),
{"input1": source("inseries1a"), "input2": source("inseries1b"), "input3": value(10)},
),
function_modifiers.ParameterizedExtract(
("outseries1b", "outseries2b"),
{"input1": source("inseries2a"), "input2": source("inseries2b"), "input3": value(100)},
),
)

def fn(input1: pd.Series, input2: pd.Series, input3: float) -> pd.DataFrame:
return pd.concat([input1 * input2 * input3, input1 + input2 + input3], axis=1)

nodes = annotation.expand_node(node.Node.from_fn(fn), {}, fn)
# For each parameterized set, we have two outputs and the dataframe node
assert len(nodes) == 6
nodes_by_name = {node_.name: node_ for node_ in nodes}
# Test that it produces the expected results
pd.testing.assert_frame_equal(
nodes_by_name["fn__0"](inseries1a=pd.Series([1]), inseries1b=pd.Series([1])),
pd.DataFrame.from_dict({"outseries1a": [10], "outseries2a": [12]}),
)
pd.testing.assert_frame_equal(
nodes_by_name["fn__1"](inseries2a=pd.Series([1]), inseries2b=pd.Series([1])),
pd.DataFrame.from_dict({"outseries1b": [100], "outseries2b": [102]}),
)
# test that each of the "extractor" nodes produces exactly what we expect
assert nodes_by_name["outseries1a"](fn__0=pd.DataFrame({"outseries1a": [10]}))[0] == 10
assert nodes_by_name["outseries2a"](fn__0=pd.DataFrame({"outseries2a": [20]}))[0] == 20
assert nodes_by_name["outseries1b"](fn__1=pd.DataFrame({"outseries1b": [30]}))[0] == 30
assert nodes_by_name["outseries2b"](fn__1=pd.DataFrame({"outseries2b": [40]}))[0] == 40

0 comments on commit b3881f4

Please sign in to comment.