From fcf911f696df362a4c53f8e40945bb14d53017fc Mon Sep 17 00:00:00 2001 From: jernejfrank Date: Sat, 2 Nov 2024 23:53:29 +0800 Subject: [PATCH] Refactor with_columns into pandas plugin --- docs/reference/decorators/with_columns.rst | 2 +- examples/pandas/with_columns/notebook.ipynb | 616 +++++++++--------- hamilton/function_modifiers/__init__.py | 1 - hamilton/function_modifiers/recursive.py | 278 +------- hamilton/plugins/h_pandas.py | 291 +++++++++ plugin_tests/h_pandas/__init__.py | 0 plugin_tests/h_pandas/conftest.py | 4 + plugin_tests/h_pandas/resources/__init__.py | 0 .../resources/with_columns_end_to_end.py | 66 ++ plugin_tests/h_pandas/test_with_columns.py | 310 +++++++++ tests/function_modifiers/test_recursive.py | 266 +------- tests/resources/with_columns.py | 39 -- 12 files changed, 982 insertions(+), 891 deletions(-) create mode 100644 hamilton/plugins/h_pandas.py create mode 100644 plugin_tests/h_pandas/__init__.py create mode 100644 plugin_tests/h_pandas/conftest.py create mode 100644 plugin_tests/h_pandas/resources/__init__.py create mode 100644 plugin_tests/h_pandas/resources/with_columns_end_to_end.py create mode 100644 plugin_tests/h_pandas/test_with_columns.py delete mode 100644 tests/resources/with_columns.py diff --git a/docs/reference/decorators/with_columns.rst b/docs/reference/decorators/with_columns.rst index 1d0e0ea62..9dbbb7b1f 100644 --- a/docs/reference/decorators/with_columns.rst +++ b/docs/reference/decorators/with_columns.rst @@ -9,7 +9,7 @@ We have a ``with_columns`` option to run operations on columns of a Pandas dataf **Reference Documentation** -.. autoclass:: hamilton.function_modifiers.with_columns +.. autoclass:: hamilton.plugins.h_pandas.with_columns :special-members: __init__ diff --git a/examples/pandas/with_columns/notebook.ipynb b/examples/pandas/with_columns/notebook.ipynb index f2fbcc8f3..13def7566 100644 --- a/examples/pandas/with_columns/notebook.ipynb +++ b/examples/pandas/with_columns/notebook.ipynb @@ -49,234 +49,232 @@ "\n", "\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "\n", "\n", "cluster__legend\n", - "\n", - "Legend\n", + "\n", + "Legend\n", "\n", "\n", "\n", "case\n", - "\n", - "\n", - "\n", - "case\n", - "thousands\n", - "\n", - "\n", - "\n", - "final_df\n", - "\n", - "final_df\n", - "DataFrame\n", + "\n", + "\n", + "\n", + "case\n", + "thousands\n", "\n", "\n", - "\n", + "\n", "initial_df\n", - "\n", - "initial_df\n", - "DataFrame\n", - "\n", - "\n", - "\n", - "final_df.__append\n", - "\n", - "final_df.__append\n", - "DataFrame\n", - "\n", - "\n", - "\n", - "initial_df->final_df.__append\n", - "\n", + "\n", + "initial_df\n", + "DataFrame\n", "\n", "\n", - "\n", + "\n", "final_df.signups\n", - "\n", - "final_df.signups\n", - "Series\n", + "\n", + "final_df.signups\n", + "Series\n", "\n", "\n", - "\n", + "\n", "initial_df->final_df.signups\n", - "\n", - "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.__append\n", + "\n", + "final_df.__append\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "initial_df->final_df.__append\n", + "\n", + "\n", "\n", "\n", "\n", "final_df.spend\n", - "\n", - "final_df.spend\n", - "Series\n", + "\n", + "final_df.spend\n", + "Series\n", "\n", "\n", "\n", "initial_df->final_df.spend\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_std_dev\n", - "\n", - "final_df.spend_std_dev\n", - "float\n", + "\n", + "\n", + "final_df.avg_3wk_spend\n", + "\n", + "final_df.avg_3wk_spend: case\n", + "Series\n", "\n", - "\n", - "\n", - "final_df.spend_zero_mean_unit_variance\n", - "\n", - 
"final_df.spend_zero_mean_unit_variance\n", - "Series\n", + "\n", + "\n", + "final_df.avg_3wk_spend->final_df.__append\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", - "\n", - "\n", + "\n", + "\n", + "final_df.signups->final_df.__append\n", + "\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend_per_signup\n", - "\n", - "final_df.spend_per_signup\n", - "Series\n", + "\n", + "final_df.spend_per_signup\n", + "Series\n", "\n", - "\n", - "\n", - "final_df.spend_per_signup->final_df.__append\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "final_df.signups->final_df.spend_per_signup\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.__append->final_df\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_zero_mean\n", + "\n", + "final_df.spend_zero_mean\n", + "Series\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "Series\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend_zero_mean_unit_variance->final_df.__append\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "final_df.spend_mean\n", - "\n", - "final_df.spend_mean\n", - "float\n", - "\n", - "\n", - "\n", - "final_df.spend_zero_mean\n", - "\n", - "final_df.spend_zero_mean\n", - "Series\n", + "final_df\n", + "\n", + "final_df\n", + "DataFrame\n", "\n", - "\n", - "\n", - "final_df.spend_mean->final_df.spend_zero_mean\n", - "\n", - "\n", + "\n", + "\n", + "final_df.__append->final_df\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "final_df.avg_3wk_spend\n", - "\n", - "final_df.avg_3wk_spend: case\n", - "Series\n", - "\n", - "\n", - "\n", - "final_df.avg_3wk_spend->final_df.__append\n", - "\n", + "final_df.spend_mean\n", + "\n", + "final_df.spend_mean\n", + "float\n", "\n", - "\n", + "\n", "\n", - "final_df.signups->final_df.spend_per_signup\n", - "\n", - "\n", + "final_df.spend_mean->final_df.spend_zero_mean\n", + "\n", + "\n", "\n", - "\n", + "\n", + "\n", + "final_df.spend_std_dev\n", + "\n", + "final_df.spend_std_dev\n", + "float\n", + "\n", + "\n", "\n", - "final_df.signups->final_df.__append\n", - "\n", - "\n", + "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.spend_std_dev\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend->final_df.avg_3wk_spend\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "final_df.spend->final_df.spend_per_signup\n", - "\n", - "\n", + "final_df.spend->final_df.spend_zero_mean\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend->final_df.__append\n", - "\n", - "\n", - "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend->final_df.spend_mean\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.avg_3wk_spend\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_std_dev\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "final_df.spend->final_df.spend_zero_mean\n", - "\n", - "\n", + "final_df.spend->final_df.spend_per_signup\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_zero_mean->final_df.spend_zero_mean_unit_variance\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_per_signup->final_df.__append\n", + "\n", "\n", "\n", "\n", "config\n", - "\n", - "\n", - "\n", - "config\n", + "\n", + "\n", + "\n", + "config\n", "\n", "\n", "\n", "function\n", - "\n", - "function\n", + 
"\n", + "function\n", "\n", "\n", "\n", "output\n", - "\n", - "output\n", + "\n", + "output\n", "\n", "\n", "\n" ], "text/plain": [ - "" + "" ] }, "metadata": {}, @@ -453,234 +451,232 @@ "\n", "\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "\n", "\n", "cluster__legend\n", - "\n", - "Legend\n", + "\n", + "Legend\n", "\n", "\n", "\n", "case\n", - "\n", - "\n", - "\n", - "case\n", - "millions\n", - "\n", - "\n", - "\n", - "final_df\n", - "\n", - "final_df\n", - "DataFrame\n", + "\n", + "\n", + "\n", + "case\n", + "millions\n", "\n", "\n", - "\n", + "\n", "initial_df\n", - "\n", - "initial_df\n", - "DataFrame\n", - "\n", - "\n", - "\n", - "final_df.__append\n", - "\n", - "final_df.__append\n", - "DataFrame\n", - "\n", - "\n", - "\n", - "initial_df->final_df.__append\n", - "\n", + "\n", + "initial_df\n", + "DataFrame\n", "\n", "\n", - "\n", + "\n", "final_df.signups\n", - "\n", - "final_df.signups\n", - "Series\n", + "\n", + "final_df.signups\n", + "Series\n", "\n", "\n", - "\n", + "\n", "initial_df->final_df.signups\n", - "\n", - "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.__append\n", + "\n", + "final_df.__append\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "initial_df->final_df.__append\n", + "\n", + "\n", "\n", "\n", "\n", "final_df.spend\n", - "\n", - "final_df.spend\n", - "Series\n", + "\n", + "final_df.spend\n", + "Series\n", "\n", "\n", "\n", "initial_df->final_df.spend\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_std_dev\n", - "\n", - "final_df.spend_std_dev\n", - "float\n", + "\n", + "\n", + "final_df.avg_3wk_spend\n", + "\n", + "final_df.avg_3wk_spend: case\n", + "Series\n", "\n", - "\n", - "\n", - "final_df.spend_zero_mean_unit_variance\n", - "\n", - "final_df.spend_zero_mean_unit_variance\n", - "Series\n", + "\n", + "\n", + "final_df.avg_3wk_spend->final_df.__append\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", - "\n", - "\n", + "\n", + "\n", + "final_df.signups->final_df.__append\n", + "\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend_per_signup\n", - "\n", - "final_df.spend_per_signup\n", - "Series\n", + "\n", + "final_df.spend_per_signup\n", + "Series\n", "\n", - "\n", - "\n", - "final_df.spend_per_signup->final_df.__append\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "final_df.signups->final_df.spend_per_signup\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.__append->final_df\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_zero_mean\n", + "\n", + "final_df.spend_zero_mean\n", + "Series\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "Series\n", + "\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend_zero_mean_unit_variance->final_df.__append\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "final_df.spend_mean\n", - "\n", - "final_df.spend_mean\n", - "float\n", - "\n", - "\n", - "\n", - "final_df.spend_zero_mean\n", - "\n", - "final_df.spend_zero_mean\n", - "Series\n", + "final_df\n", + "\n", + "final_df\n", + "DataFrame\n", "\n", - "\n", - "\n", - "final_df.spend_mean->final_df.spend_zero_mean\n", - "\n", - "\n", + "\n", + "\n", + "final_df.__append->final_df\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "final_df.avg_3wk_spend\n", - "\n", - "final_df.avg_3wk_spend: case\n", - "Series\n", - "\n", - "\n", - "\n", - 
"final_df.avg_3wk_spend->final_df.__append\n", - "\n", + "final_df.spend_mean\n", + "\n", + "final_df.spend_mean\n", + "float\n", "\n", - "\n", + "\n", "\n", - "final_df.signups->final_df.spend_per_signup\n", - "\n", - "\n", + "final_df.spend_mean->final_df.spend_zero_mean\n", + "\n", + "\n", "\n", - "\n", + "\n", + "\n", + "final_df.spend_std_dev\n", + "\n", + "final_df.spend_std_dev\n", + "float\n", + "\n", + "\n", "\n", - "final_df.signups->final_df.__append\n", - "\n", - "\n", + "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.spend_std_dev\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend->final_df.avg_3wk_spend\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "final_df.spend->final_df.spend_per_signup\n", - "\n", - "\n", + "final_df.spend->final_df.spend_zero_mean\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend->final_df.__append\n", - "\n", - "\n", - "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend->final_df.spend_mean\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.avg_3wk_spend\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_std_dev\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "final_df.spend->final_df.spend_zero_mean\n", - "\n", - "\n", + "final_df.spend->final_df.spend_per_signup\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_zero_mean->final_df.spend_zero_mean_unit_variance\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_per_signup->final_df.__append\n", + "\n", "\n", "\n", "\n", "config\n", - "\n", - "\n", - "\n", - "config\n", + "\n", + "\n", + "\n", + "config\n", "\n", "\n", "\n", "function\n", - "\n", - "function\n", + "\n", + "function\n", "\n", "\n", "\n", "output\n", - "\n", - "output\n", + "\n", + "output\n", "\n", "\n", "\n" ], "text/plain": [ - "" + "" ] }, "execution_count": 3, diff --git a/hamilton/function_modifiers/__init__.py b/hamilton/function_modifiers/__init__.py index cd0161991..958d07540 100644 --- a/hamilton/function_modifiers/__init__.py +++ b/hamilton/function_modifiers/__init__.py @@ -88,7 +88,6 @@ subdag = recursive.subdag parameterized_subdag = recursive.parameterized_subdag -with_columns = recursive.with_columns # resolve/meta stuff -- power user features diff --git a/hamilton/function_modifiers/recursive.py b/hamilton/function_modifiers/recursive.py index be4bd6a70..744d222e2 100644 --- a/hamilton/function_modifiers/recursive.py +++ b/hamilton/function_modifiers/recursive.py @@ -1,8 +1,7 @@ import inspect import sys -from collections import defaultdict from types import ModuleType -from typing import Any, Callable, Collection, Dict, List, Optional, Tuple, Type, Union +from typing import Any, Callable, Collection, Dict, List, Optional, Tuple, Type, TypedDict, Union _sys_version_info = sys.version_info _version_tuple = (_sys_version_info.major, _sys_version_info.minor, _sys_version_info.micro) @@ -13,13 +12,9 @@ from typing import NotRequired -from typing import TypedDict - -import pandas as pd - # Copied this over from function_graph # TODO -- determine the best place to put this code -from hamilton import graph_utils, node, registry +from hamilton import graph_utils, node from hamilton.function_modifiers import base, dependencies from hamilton.function_modifiers.base import InvalidDecoratorException, NodeTransformer from hamilton.function_modifiers.dependencies import ( @@ -631,272 +626,3 @@ def prune_nodes(nodes: List[node.Node], select: 
Optional[List[str]] = None) -> L stack.append(dep_node) seen_nodes.add(dep) return output - - -class with_columns(base.NodeInjector): - """Initializes a with_columns decorator for pandas. This allows you to efficiently run groups of map operations on a dataframe. - - Here's an example of calling it -- if you've seen ``@subdag``, you should be familiar with - the concepts: - - .. code-block:: python - - # my_module.py - def a(a_from_df: pd.Series) -> pd.Series: - return _process(a) - - def b(b_from_df: pd.Series) -> pd.Series: - return _process(b) - - def a_b_average(a_from_df: pd.Series, b_from_df: pd.Series) -> pd.Series: - return (a_from_df + b_from_df) / 2 - - - .. code-block:: python - - # with_columns_module.py - def a_plus_b(a: pd.Series, b: pd.Series) -> pd.Series: - return a + b - - - # the with_columns call - @with_columns( - *[my_module], # Load from any module - *[a_plus_b], # or list operations directly - columns_to_pass=["a_from_df", "b_from_df"], # The columns to pass from the dataframe to - # the subdag - select=["a", "b", "a_plus_b", "a_b_average"], # The columns to select from the dataframe - ) - def final_df(initial_df: pd.DataFrame) -> pd.DataFrame: - # process, or just return unprocessed - ... - - In this instance the ``initial_df`` would get two columns added: ``a_plus_b`` and ``a_b_average``. - - The operations are applied in topological order. This allows you to - express the operations individually, making it easy to unit-test and reuse. - - Note that the operation is "append", meaning that the columns that are selected are appended - onto the dataframe. - - If the function takes multiple dataframes, the dataframe input to process will always be - the first argument. This will be passed to the subdag, transformed, and passed back to the function. - This follows the hamilton rule of reference by parameter name. To demonstarte this, in the code - above, the dataframe that is passed to the subdag is `initial_df`. That is transformed - by the subdag, and then returned as the final dataframe. - - You can read it as: - - "final_df is a function that transforms the upstream dataframe initial_df, running the transformations - from my_module. It starts with the columns a_from_df and b_from_df, and then adds the columns - a, b, and a_plus_b to the dataframe. It then returns the dataframe, and does some processing on it." - - In case you need more flexibility you can alternatively use ``pass_dataframe_as``, for example, - - .. code-block:: python - - # with_columns_module.py - def a_from_df(initial_df: pd.Series) -> pd.Series: - return initial_df["a_from_df"] / 100 - - def b_from_df(initial_df: pd.Series) -> pd.Series: - return initial_df["b_from_df"] / 100 - - - # the with_columns call - @with_columns( - *[my_module], - *[a_from_df], - columns_to_pass=["a_from_df", "b_from_df"], - select=["a_from_df", "b_from_df", "a", "b", "a_plus_b", "a_b_average"], - ) - def final_df(initial_df: pd.DataFrame) -> pd.DataFrame: - # process, or just return unprocessed - ... - - the above would output a dataframe where the two columns ``a_from_df`` and ``b_from_df`` get - overwritten. - """ - - @staticmethod - def _check_for_duplicates(nodes_: List[node.Node]) -> bool: - "Ensures that we don't run into name clashing of columns and group operations." 
- node_counter = defaultdict(int) - for node_ in nodes_: - node_counter[node_.name] += 1 - if node_counter[node_.name] > 1: - return True - return False - - def __init__( - self, - *load_from: Union[Callable, ModuleType], - columns_to_pass: Union[str, List[str]] = None, - pass_dataframe_as: str = None, - select: Union[str, List[str]] = None, - namespace: str = None, - ): - """Instantiates a ``@with_column`` decorator. - - :param load_from: The functions or modules that will be used to generate the group of map operations. - :param columns_to_pass: The initial schema of the dataframe. This is used to determine which - upstream inputs should be taken from the dataframe, and which shouldn't. Note that, if this is - left empty (and external_inputs is as well), we will assume that all dependencies come - from the dataframe. This cannot be used in conjunction with pass_dataframe_as. - :param pass_dataframe_as: The name of the dataframe that we're modifying, as known to the subdag. - If you pass this in, you are responsible for extracting columns out. If not provided, you have - to pass columns_to_pass in, and we will extract the columns out for you. - :param namespace: The namespace of the nodes, so they don't clash with the global namespace - and so this can be reused. If its left out, there will be no namespace (in which case you'll want - to be careful about repeating it/reusing the nodes in other parts of the DAG.) - :param config_required: the list of config keys that are required to resolve any functions. Pass in None\ - if you want the functions/modules to have access to all possible config. - """ - - self.subdag_functions = subdag.collect_functions(load_from) - - if select is None: - raise ValueError("Please specify at least one column to append or update.") - elif isinstance(select, str): - self.select = [select] - else: - self.select = select - - if (pass_dataframe_as is not None and columns_to_pass is not None) or ( - pass_dataframe_as is None and columns_to_pass is None - ): - raise ValueError( - "You must specify only one of columns_to_pass and " - "pass_dataframe_as. " - "This is because specifying pass_dataframe_as injects into " - "the set of columns, allowing you to perform your own extraction" - "from the dataframe. We then execute all columns in the sbudag" - "in order, passing in that initial dataframe. If you want" - "to reference columns in your code, you'll have to specify " - "the set of initial columns, and allow the subdag decorator " - "to inject the dataframe through. The initial columns tell " - "us which parameters to take from that dataframe, so we can" - "feed the right data into the right columns." 
- ) - - # TODO: Setting this up to pass in "*" to select all (same as in Polars) - if isinstance(columns_to_pass, str): - self.initial_schema = [columns_to_pass] - else: - self.initial_schema = columns_to_pass - - self.dataframe_subdag_param = pass_dataframe_as - self.namespace = namespace - # This never gets used within the class, but pyspark had it so keeping it here in case we - # need to access it from somewhere outside - self.upstream_dependency = pd.DataFrame - - # TODO: This is duplicate from extract columns, probably can be combined - # TODO: Add async functionality - def _create_column_nodes( - self, inject_parameter: str, params: Dict[str, Type[Type]] - ) -> List[node.Node]: - """Helper function to extract columns for the user.""" - output_type = params[inject_parameter] - series_type = registry.get_column_type_from_df_type(output_type) - - out = [] - for column in self.initial_schema: - - def extractor_fn( - column_to_extract: str = column, **kwargs - ) -> Any: # avoiding problems with closures - df = kwargs[inject_parameter] - if column_to_extract not in df: - raise base.InvalidDecoratorException( - f"No such column: {column_to_extract} produced by {inject_parameter}. " - f"It only produced {str(df.columns)}" - ) - return registry.get_column(df, column_to_extract) - - out.append( - node.Node( - name=column, - typ=series_type, - callabl=extractor_fn, - input_types={inject_parameter: output_type}, - ) - ) - - return out - - def _get_inital_nodes( - self, fn: Callable, params: Dict[str, Type[Type]] - ) -> Tuple[str, Collection[node.Node]]: - """Selects the correct dataframe and optionally extracts out columns.""" - initial_nodes = [] - if self.dataframe_subdag_param is not None: - inject_parameter = self.dataframe_subdag_param - else: - # If we don't have a specified dataframe we assume it's the first argument - sig = inspect.signature(fn) - inject_parameter = list(sig.parameters.values())[0].name - initial_nodes.extend( - self._create_column_nodes(inject_parameter=inject_parameter, params=params) - ) - - if inject_parameter not in params: - raise base.InvalidDecoratorException( - f"Function: {fn.__name__} has a first parameter that is not a dependency. " - f"@with_columns requires the parameter names to match the function parameters. " - f"Thus it might not be compatible with some other decorators" - ) - - return inject_parameter, initial_nodes - - def _create_merge_node(self, upstream_node: str, node_name: str) -> node.Node: - "Node that adds to / overrides columns for the original dataframe based on selected output." - - def new_callable(**kwargs) -> Any: - df = kwargs[upstream_node] - for column in self.select: - df[column] = kwargs[column] - return df - - input_map = {column: pd.Series for column in self.select} - input_map[upstream_node] = pd.DataFrame - - return node.Node( - name=node_name, - typ=pd.DataFrame, - callabl=new_callable, - input_types=input_map, - ) - - def inject_nodes( - self, params: Dict[str, Type[Type]], config: Dict[str, Any], fn: Callable - ) -> Tuple[List[node.Node], Dict[str, str]]: - namespace = fn.__name__ if self.namespace is None else self.namespace - - inject_parameter, initial_nodes = self._get_inital_nodes(fn=fn, params=params) - - subdag_nodes = subdag.collect_nodes(config, self.subdag_functions) - - if with_columns._check_for_duplicates(initial_nodes + subdag_nodes): - raise ValueError( - "You can only specify columns once. You used `columns_to_pass` and we " - "extract the columns for you. 
In this case they cannot be overwritten -- only new columns get "
-                "appended. If you want to modify in-place columns pass in a dataframe and "
-                "extract + modify the columns and afterwards select them."
-            )
-
-        pruned_nodes = prune_nodes(subdag_nodes, self.select)
-        if len(pruned_nodes) == 0:
-            raise ValueError(
-                f"No nodes found upstream from select columns: {self.select} for function: "
-                f"{fn.__qualname__}"
-            )
-
-        merge_node = self._create_merge_node(inject_parameter, node_name="__append")
-
-        output_nodes = initial_nodes + pruned_nodes + [merge_node]
-        output_nodes = subdag.add_namespace(output_nodes, namespace)
-        return output_nodes, {inject_parameter: assign_namespace(merge_node.name, namespace)}
-
-    def validate(self, fn: Callable):
-        pass
diff --git a/hamilton/plugins/h_pandas.py b/hamilton/plugins/h_pandas.py
new file mode 100644
index 000000000..722896dcf
--- /dev/null
+++ b/hamilton/plugins/h_pandas.py
@@ -0,0 +1,291 @@
+import inspect
+import typing
+from collections import defaultdict
+from types import ModuleType
+from typing import Any, Callable, Collection, Dict, List, Tuple, Type, Union
+
+import pandas as pd
+
+# Copied this over from function_graph
+# TODO -- determine the best place to put this code
+from hamilton import node
+from hamilton.function_modifiers import base
+from hamilton.function_modifiers.expanders import extract_columns
+from hamilton.function_modifiers.recursive import assign_namespace, prune_nodes, subdag
+
+
+class with_columns(base.NodeInjector):
+    """Initializes a with_columns decorator for pandas. This allows you to efficiently run groups of map operations on a dataframe.
+
+    Here's an example of calling it -- if you've seen ``@subdag``, you should be familiar with
+    the concepts:
+
+    .. code-block:: python
+
+        # my_module.py
+        def a(a_from_df: pd.Series) -> pd.Series:
+            return _process(a_from_df)
+
+        def b(b_from_df: pd.Series) -> pd.Series:
+            return _process(b_from_df)
+
+        def a_b_average(a_from_df: pd.Series, b_from_df: pd.Series) -> pd.Series:
+            return (a_from_df + b_from_df) / 2
+
+
+    .. code-block:: python
+
+        # with_columns_module.py
+        def a_plus_b(a: pd.Series, b: pd.Series) -> pd.Series:
+            return a + b
+
+
+        # the with_columns call
+        @with_columns(
+            *[my_module],  # Load from any module
+            *[a_plus_b],  # or list operations directly
+            columns_to_pass=["a_from_df", "b_from_df"],  # The columns to pass from the dataframe to
+            # the subdag
+            select=["a", "b", "a_plus_b", "a_b_average"],  # The columns to append to the dataframe
+        )
+        def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
+            # process, or just return unprocessed
+            ...
+
+    In this instance the ``initial_df`` would get four columns added: ``a``, ``b``, ``a_plus_b``,
+    and ``a_b_average``.
+
+    The operations are applied in topological order. This allows you to
+    express the operations individually, making it easy to unit-test and reuse.
+
+    Note that the operation is "append", meaning that the columns that are selected are appended
+    onto the dataframe.
+
+    If the function takes multiple dataframes, the dataframe input to process will always be
+    the first argument. This will be passed to the subdag, transformed, and passed back to the function.
+    This follows the Hamilton rule of reference by parameter name. To demonstrate this, in the code
+    above, the dataframe that is passed to the subdag is `initial_df`. That is transformed
+    by the subdag, and then returned as the final dataframe.
+
+    You can read it as:
+
+    "final_df is a function that transforms the upstream dataframe initial_df, running the transformations
+    from my_module. It starts with the columns a_from_df and b_from_df, and then adds the columns
+    a, b, a_plus_b, and a_b_average to the dataframe. It then does any remaining processing and
+    returns the dataframe."
+
+    In case you need more flexibility you can alternatively use ``pass_dataframe_as``, for example,
+
+    .. code-block:: python
+
+        # with_columns_module.py
+        def a_from_df(initial_df: pd.DataFrame) -> pd.Series:
+            return initial_df["a_from_df"] / 100
+
+        def b_from_df(initial_df: pd.DataFrame) -> pd.Series:
+            return initial_df["b_from_df"] / 100
+
+
+        # the with_columns call
+        @with_columns(
+            *[my_module],
+            *[a_from_df],
+            pass_dataframe_as="initial_df",
+            select=["a_from_df", "b_from_df", "a", "b", "a_plus_b", "a_b_average"],
+        )
+        def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
+            # process, or just return unprocessed
+            ...
+
+    The above would output a dataframe where the two columns ``a_from_df`` and ``b_from_df`` get
+    overwritten.
+    """
+
+    @staticmethod
+    def _check_for_duplicates(nodes_: List[node.Node]) -> bool:
+        """Ensures that we don't run into name clashing of columns and group operations.
+
+        In the case when we extract columns for the user, because ``columns_to_pass`` was used, we want
+        to safeguard against name clashing with functions that are passed into ``with_columns``, i.e.
+        there are no functions that have the same name as the columns. This effectively means that
+        using ``columns_to_pass`` will only append new columns to the dataframe and for changing
+        existing columns ``pass_dataframe_as`` needs to be used.
+        """
+        node_counter = defaultdict(int)
+        for node_ in nodes_:
+            node_counter[node_.name] += 1
+            if node_counter[node_.name] > 1:
+                return True
+        return False
+
+    def __init__(
+        self,
+        *load_from: Union[Callable, ModuleType],
+        columns_to_pass: List[str] = None,
+        pass_dataframe_as: str = None,
+        select: List[str] = None,
+        namespace: str = None,
+        config_required: List[str] = None,
+    ):
+        """Instantiates a ``@with_columns`` decorator.
+
+        :param load_from: The functions or modules that will be used to generate the group of map operations.
+        :param columns_to_pass: The initial schema of the dataframe. This is used to determine which
+            upstream inputs should be taken from the dataframe, and which shouldn't. This cannot be
+            used in conjunction with pass_dataframe_as -- exactly one of the two must be provided.
+        :param pass_dataframe_as: The name of the dataframe that we're modifying, as known to the subdag.
+            If you pass this in, you are responsible for extracting columns out. If not provided, you have
+            to pass columns_to_pass in, and we will extract the columns out for you.
+        :param select: The nodes to select from the subdag; these become the columns that get
+            appended onto (or, with pass_dataframe_as, overwritten in) the dataframe.
+        :param namespace: The namespace of the nodes, so they don't clash with the global namespace
+            and so this can be reused. If it's left out, there will be no namespace (in which case you'll want
+            to be careful about repeating it/reusing the nodes in other parts of the DAG.)
+        :param config_required: the list of config keys that are required to resolve any functions. Pass in None\
+            if you want the functions/modules to have access to all possible config.
+        """
+
+        self.subdag_functions = subdag.collect_functions(load_from)
+
+        if select is None:
+            raise ValueError("Please specify at least one column to append or update.")
+        else:
+            self.select = select
+
+        if (pass_dataframe_as is not None and columns_to_pass is not None) or (
+            pass_dataframe_as is None and columns_to_pass is None
+        ):
+            raise ValueError(
+                "You must specify only one of columns_to_pass and "
+                "pass_dataframe_as. "
+                "This is because specifying pass_dataframe_as injects into "
+                "the set of columns, allowing you to perform your own extraction "
+                "from the dataframe. We then execute all columns in the subdag "
+                "in order, passing in that initial dataframe. If you want "
+                "to reference columns in your code, you'll have to specify "
+                "the set of initial columns, and allow the subdag decorator "
+                "to inject the dataframe through. The initial columns tell "
+                "us which parameters to take from that dataframe, so we can "
+                "feed the right data into the right columns."
+            )
+
+        self.initial_schema = columns_to_pass
+        self.dataframe_subdag_param = pass_dataframe_as
+        self.namespace = namespace
+        self.config_required = config_required
+
+    def required_config(self) -> List[str]:
+        return self.config_required
+
+    def _create_column_nodes(
+        self, inject_parameter: str, params: Dict[str, Type[Type]]
+    ) -> List[node.Node]:
+        output_type = params[inject_parameter]
+
+        def temp_fn(**kwargs) -> pd.DataFrame:
+            return kwargs[inject_parameter]
+
+        # We recreate the df node to use extract columns
+        temp_node = node.Node(
+            name=inject_parameter,
+            typ=output_type,
+            callabl=temp_fn,
+            input_types={inject_parameter: output_type},
+        )
+
+        extract_columns_decorator = extract_columns(*self.initial_schema)
+
+        out_nodes = extract_columns_decorator.transform_node(temp_node, config={}, fn=temp_fn)
+        return out_nodes[1:]
+
+    def _get_inital_nodes(
+        self, fn: Callable, params: Dict[str, Type[Type]]
+    ) -> Tuple[str, Collection[node.Node]]:
+        """Selects the correct dataframe and optionally extracts out columns."""
+        initial_nodes = []
+        if self.dataframe_subdag_param is not None:
+            inject_parameter = self.dataframe_subdag_param
+        else:
+            # If we don't have a specified dataframe we assume it's the first argument
+            sig = inspect.signature(fn)
+            inject_parameter = list(sig.parameters.values())[0].name
+            input_types = typing.get_type_hints(fn)
+
+            if input_types[inject_parameter] != pd.DataFrame:
+                raise ValueError(
+                    "First argument has to be a pandas DataFrame. If you wish to use a "
+                    "different argument, please use the `pass_dataframe_as` option."
+                )
+
+            initial_nodes.extend(
+                self._create_column_nodes(inject_parameter=inject_parameter, params=params)
+            )
+
+        if inject_parameter not in params:
+            raise base.InvalidDecoratorException(
+                f"Function: {fn.__name__} has a first parameter that is not a dependency. "
+                f"@with_columns requires the parameter names to match the function parameters. "
+                f"Thus it might not be compatible with some other decorators."
+            )
+
+        return inject_parameter, initial_nodes
+
+    def _create_merge_node(self, upstream_node: str, node_name: str) -> node.Node:
+        """Node that adds to / overrides columns for the original dataframe based on selected output."""
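+        # The merge node depends on the original dataframe plus every selected column node;
+        # pandas .assign() returns a copy with the selected columns appended, overwriting
+        # any existing columns of the same name.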
+ + def new_callable(**kwargs) -> Any: + df = kwargs[upstream_node] + columns_to_append = {} + for column in self.select: + columns_to_append[column] = kwargs[column] + + return df.assign(**columns_to_append) + + input_map = {column: pd.Series for column in self.select} + input_map[upstream_node] = pd.DataFrame + + return node.Node( + name=node_name, + typ=pd.DataFrame, + callabl=new_callable, + input_types=input_map, + ) + + def inject_nodes( + self, params: Dict[str, Type[Type]], config: Dict[str, Any], fn: Callable + ) -> Tuple[List[node.Node], Dict[str, str]]: + namespace = fn.__name__ if self.namespace is None else self.namespace + + inject_parameter, initial_nodes = self._get_inital_nodes(fn=fn, params=params) + + subdag_nodes = subdag.collect_nodes(config, self.subdag_functions) + + if with_columns._check_for_duplicates(initial_nodes + subdag_nodes): + raise ValueError( + "You can only specify columns once. You used `columns_to_pass` and we " + "extract the columns for you. In this case they cannot be overwritten -- only new columns get " + "appended. If you want to modify in-place columns pass in a dataframe and " + "extract + modify the columns and afterwards select them." + ) + + pruned_nodes = prune_nodes(subdag_nodes, self.select) + if len(pruned_nodes) == 0: + raise ValueError( + f"No nodes found upstream from select columns: {self.select} for function: " + f"{fn.__qualname__}" + ) + + merge_node = self._create_merge_node(inject_parameter, node_name="__append") + + output_nodes = initial_nodes + pruned_nodes + [merge_node] + output_nodes = subdag.add_namespace(output_nodes, namespace) + return output_nodes, {inject_parameter: assign_namespace(merge_node.name, namespace)} + + def validate(self, fn: Callable): + pass diff --git a/plugin_tests/h_pandas/__init__.py b/plugin_tests/h_pandas/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/plugin_tests/h_pandas/conftest.py b/plugin_tests/h_pandas/conftest.py new file mode 100644 index 000000000..bc5ef5b5a --- /dev/null +++ b/plugin_tests/h_pandas/conftest.py @@ -0,0 +1,4 @@ +from hamilton import telemetry + +# disable telemetry for all tests! 
+telemetry.disable_telemetry()
diff --git a/plugin_tests/h_pandas/resources/__init__.py b/plugin_tests/h_pandas/resources/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/plugin_tests/h_pandas/resources/with_columns_end_to_end.py b/plugin_tests/h_pandas/resources/with_columns_end_to_end.py
new file mode 100644
index 000000000..16d493e46
--- /dev/null
+++ b/plugin_tests/h_pandas/resources/with_columns_end_to_end.py
@@ -0,0 +1,66 @@
+import pandas as pd
+
+from hamilton.function_modifiers import config
+from hamilton.plugins.h_pandas import with_columns
+
+
+def upstream_factor() -> int:
+    return 3
+
+
+def initial_df() -> pd.DataFrame:
+    return pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14], "col_3": [1, 1, 1, 1]})
+
+
+def subtract_1_from_2(col_1: pd.Series, col_2: pd.Series) -> pd.Series:
+    return col_2 - col_1
+
+
+@config.when(factor=5)
+def multiply_3__by_5(col_3: pd.Series) -> pd.Series:
+    return col_3 * 5
+
+
+@config.when(factor=7)
+def multiply_3__by_7(col_3: pd.Series) -> pd.Series:
+    return col_3 * 7
+
+
+def add_1_by_user_adjustment_factor(col_1: pd.Series, user_factor: int) -> pd.Series:
+    return col_1 + user_factor
+
+
+def multiply_2_by_upstream_3(col_2: pd.Series, upstream_factor: int) -> pd.Series:
+    return col_2 * upstream_factor
+
+
+@with_columns(
+    subtract_1_from_2,
+    multiply_3__by_5,
+    multiply_3__by_7,
+    add_1_by_user_adjustment_factor,
+    multiply_2_by_upstream_3,
+    columns_to_pass=["col_1", "col_2", "col_3"],
+    select=[
+        "subtract_1_from_2",
+        "multiply_3",
+        "add_1_by_user_adjustment_factor",
+        "multiply_2_by_upstream_3",
+    ],
+    namespace="some_subdag",
+)
+def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
+    return initial_df
+
+
+def col_3(initial_df: pd.DataFrame) -> pd.Series:
+    return pd.Series([0, 2, 4, 6])
+
+
+@with_columns(
+    col_3,
+    pass_dataframe_as="initial_df",
+    select=["col_3", "multiply_3"],
+)
+def final_df_2(initial_df: pd.DataFrame) -> pd.DataFrame:
+    return initial_df
diff --git a/plugin_tests/h_pandas/test_with_columns.py b/plugin_tests/h_pandas/test_with_columns.py
new file mode 100644
index 000000000..b3a84d666
--- /dev/null
+++ b/plugin_tests/h_pandas/test_with_columns.py
@@ -0,0 +1,310 @@
+import pandas as pd
+import pytest
+
+from hamilton import driver, node
+from hamilton.function_modifiers.base import NodeInjector
+from hamilton.plugins.h_pandas import with_columns
+
+from .resources import with_columns_end_to_end
+
+
+def dummy_fn_with_columns(col_1: pd.Series) -> pd.Series:
+    return col_1 + 100
+
+
+def test_detect_duplicate_nodes():
+    node_a = node.Node.from_fn(dummy_fn_with_columns, name="a")
+    node_b = node.Node.from_fn(dummy_fn_with_columns, name="a")
+    node_c = node.Node.from_fn(dummy_fn_with_columns, name="c")
+
+    if not with_columns._check_for_duplicates([node_a, node_b, node_c]):
+        raise AssertionError
+
+    if with_columns._check_for_duplicates([node_a, node_c]):
+        raise AssertionError
+
+
+def test_select_not_empty():
+    error_message = "Please specify at least one column to append or update."
+
+    with pytest.raises(ValueError) as e:
+        with_columns(dummy_fn_with_columns)
+    assert str(e.value) == error_message
+
+
+def test_columns_to_pass_and_pass_dataframe_as_raises_error():
+    error_message = (
+        "You must specify only one of columns_to_pass and "
+        "pass_dataframe_as. "
+        "This is because specifying pass_dataframe_as injects into "
+        "the set of columns, allowing you to perform your own extraction "
+        "from the dataframe. We then execute all columns in the subdag "
+        "in order, passing in that initial dataframe. If you want "
+        "to reference columns in your code, you'll have to specify "
+        "the set of initial columns, and allow the subdag decorator "
+        "to inject the dataframe through. The initial columns tell "
+        "us which parameters to take from that dataframe, so we can "
+        "feed the right data into the right columns."
+    )
+
+    with pytest.raises(ValueError) as e:
+        with_columns(
+            dummy_fn_with_columns, columns_to_pass=["a"], pass_dataframe_as="a", select=["a"]
+        )
+    assert str(e.value) == error_message
+
+
+def test_first_parameter_is_dataframe():
+    error_message = (
+        "First argument has to be a pandas DataFrame. If you wish to use a "
+        "different argument, please use the `pass_dataframe_as` option."
+    )
+
+    def target_fn(upstream_df: int) -> pd.DataFrame:
+        return upstream_df
+
+    dummy_node = node.Node.from_fn(target_fn)
+
+    decorator = with_columns(
+        dummy_fn_with_columns, columns_to_pass=["col_1"], select=["dummy_fn_with_columns"]
+    )
+    injectable_params = NodeInjector.find_injectable_params([dummy_node])
+
+    with pytest.raises(ValueError) as e:
+        decorator._get_inital_nodes(fn=target_fn, params=injectable_params)
+
+    assert str(e.value) == error_message
+
+
+def test_create_column_nodes_pass_dataframe():
+    def target_fn(some_var: int, upstream_df: pd.DataFrame) -> pd.DataFrame:
+        return upstream_df
+
+    dummy_node = node.Node.from_fn(target_fn)
+
+    decorator = with_columns(
+        dummy_fn_with_columns, pass_dataframe_as="upstream_df", select=["dummy_fn_with_columns"]
+    )
+    injectable_params = NodeInjector.find_injectable_params([dummy_node])
+    inject_parameter, initial_nodes = decorator._get_inital_nodes(
+        fn=target_fn, params=injectable_params
+    )
+
+    assert inject_parameter == "upstream_df"
+    assert len(initial_nodes) == 0
+
+
+def test_create_column_nodes_extract_single_columns():
+    def dummy_df() -> pd.DataFrame:
+        return pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]})
+
+    def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame:
+        return upstream_df
+
+    dummy_node = node.Node.from_fn(target_fn)
+
+    decorator = with_columns(
+        dummy_fn_with_columns, columns_to_pass=["col_1"], select=["dummy_fn_with_columns"]
+    )
+    injectable_params = NodeInjector.find_injectable_params([dummy_node])
+
+    inject_parameter, initial_nodes = decorator._get_inital_nodes(
+        fn=target_fn, params=injectable_params
+    )
+
+    assert inject_parameter == "upstream_df"
+    assert len(initial_nodes) == 1
+    assert initial_nodes[0].name == "col_1"
+    assert initial_nodes[0].type == pd.Series
+    pd.testing.assert_series_equal(
+        initial_nodes[0].callable(upstream_df=dummy_df()),
+        pd.Series([1, 2, 3, 4]),
+        check_names=False,
+    )
+
+
+def test_create_column_nodes_extract_multiple_columns():
+    def dummy_df() -> pd.DataFrame:
+        return pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]})
+
+    def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame:
+        return upstream_df
+
+    dummy_node = node.Node.from_fn(target_fn)
+
+    decorator = with_columns(
+        dummy_fn_with_columns, columns_to_pass=["col_1", "col_2"], select=["dummy_fn_with_columns"]
+    )
+    injectable_params = NodeInjector.find_injectable_params([dummy_node])
+
+    inject_parameter, initial_nodes = decorator._get_inital_nodes(
+        fn=target_fn, params=injectable_params
+    )
+
+    assert inject_parameter == "upstream_df"
+    assert len(initial_nodes) == 2
+    assert initial_nodes[0].name == "col_1"
+    assert initial_nodes[1].name == "col_2"
+    assert initial_nodes[0].type == pd.Series
+    assert initial_nodes[1].type == pd.Series
+    pd.testing.assert_series_equal(
+        initial_nodes[0].callable(upstream_df=dummy_df()),
+        pd.Series([1, 2, 3, 4]),
+        check_names=False,
+    )
+    pd.testing.assert_series_equal(
+        initial_nodes[1].callable(upstream_df=dummy_df()),
+        pd.Series([11, 12, 13, 14]),
+        check_names=False,
+    )
+
+
+def test_no_matching_select_column_error():
+    def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame:
+        return upstream_df
+
+    dummy_node = node.Node.from_fn(target_fn)
+    select = "wrong_column"
+
+    decorator = with_columns(
+        dummy_fn_with_columns, columns_to_pass=["col_1", "col_2"], select=select
+    )
+    injectable_params = NodeInjector.find_injectable_params([dummy_node])
+
+    error_message = (
+        f"No nodes found upstream from select columns: {select} for function: "
+        f"{target_fn.__qualname__}"
+    )
+    with pytest.raises(ValueError) as e:
+        decorator.inject_nodes(injectable_params, {}, fn=target_fn)
+
+    assert str(e.value) == error_message
+
+
+def test_append_into_original_df():
+    def dummy_df() -> pd.DataFrame:
+        return pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]})
+
+    def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame:
+        return upstream_df
+
+    decorator = with_columns(
+        dummy_fn_with_columns, columns_to_pass=["col_1", "col_2"], select=["dummy_fn_with_columns"]
+    )
+    merge_node = decorator._create_merge_node(upstream_node="upstream_df", node_name="merge_node")
+
+    output_df = merge_node.callable(
+        upstream_df=dummy_df(),
+        dummy_fn_with_columns=dummy_fn_with_columns(col_1=pd.Series([1, 2, 3, 4])),
+    )
+    assert merge_node.name == "merge_node"
+    assert merge_node.type == pd.DataFrame
+
+    pd.testing.assert_series_equal(output_df["col_1"], pd.Series([1, 2, 3, 4]), check_names=False)
+    pd.testing.assert_series_equal(
+        output_df["col_2"], pd.Series([11, 12, 13, 14]), check_names=False
+    )
+    pd.testing.assert_series_equal(
+        output_df["dummy_fn_with_columns"], pd.Series([101, 102, 103, 104]), check_names=False
+    )
+
+
+def test_override_original_column_in_df():
+    def dummy_df() -> pd.DataFrame:
+        return pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]})
+
+    def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame:
+        return upstream_df
+
+    def col_1() -> pd.Series:
+        return pd.Series([0, 3, 5, 7])
+
+    decorator = with_columns(col_1, pass_dataframe_as="upstream_df", select=["col_1"])
+    merge_node = decorator._create_merge_node(upstream_node="upstream_df", node_name="merge_node")
+
+    output_df = merge_node.callable(upstream_df=dummy_df(), col_1=col_1())
+    assert merge_node.name == "merge_node"
+    assert merge_node.type == pd.DataFrame
+
+    pd.testing.assert_series_equal(output_df["col_1"], pd.Series([0, 3, 5, 7]), check_names=False)
+    pd.testing.assert_series_equal(
+        output_df["col_2"], pd.Series([11, 12, 13, 14]), check_names=False
+    )
+
+
+def test_assign_custom_namespace_with_columns():
+    def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame:
+        return upstream_df
+
+    dummy_node = node.Node.from_fn(target_fn)
+    decorator = with_columns(
+        dummy_fn_with_columns,
+        columns_to_pass=["col_1", "col_2"],
+        select=["dummy_fn_with_columns"],
+        namespace="dummy_namespace",
+    )
+    nodes_ = decorator.transform_dag([dummy_node], {}, target_fn)
+
+    assert nodes_[0].name == "target_fn"
+    assert nodes_[1].name == "dummy_namespace.col_1"
+    assert nodes_[2].name == "dummy_namespace.col_2"
+    assert nodes_[3].name == "dummy_namespace.dummy_fn_with_columns"
+    assert nodes_[4].name == "dummy_namespace.__append"
+
+
+def 
test_end_to_end_with_columns_automatic_extract(): + config_5 = { + "factor": 5, + } + dr = driver.Builder().with_modules(with_columns_end_to_end).with_config(config_5).build() + result = dr.execute(final_vars=["final_df"], inputs={"user_factor": 1000})["final_df"] + + expected_df = pd.DataFrame( + { + "col_1": [1, 2, 3, 4], + "col_2": [11, 12, 13, 14], + "col_3": [1, 1, 1, 1], + "subtract_1_from_2": [10, 10, 10, 10], + "multiply_3": [5, 5, 5, 5], + "add_1_by_user_adjustment_factor": [1001, 1002, 1003, 1004], + "multiply_2_by_upstream_3": [33, 36, 39, 42], + } + ) + pd.testing.assert_frame_equal(result, expected_df) + + config_7 = { + "factor": 7, + } + dr = driver.Builder().with_modules(with_columns_end_to_end).with_config(config_7).build() + result = dr.execute(final_vars=["final_df"], inputs={"user_factor": 1000})["final_df"] + + expected_df = pd.DataFrame( + { + "col_1": [1, 2, 3, 4], + "col_2": [11, 12, 13, 14], + "col_3": [1, 1, 1, 1], + "subtract_1_from_2": [10, 10, 10, 10], + "multiply_3": [7, 7, 7, 7], + "add_1_by_user_adjustment_factor": [1001, 1002, 1003, 1004], + "multiply_2_by_upstream_3": [33, 36, 39, 42], + } + ) + pd.testing.assert_frame_equal(result, expected_df) + + +def test_end_to_end_with_columns_pass_dataframe(): + config_5 = { + "factor": 5, + } + dr = driver.Builder().with_modules(with_columns_end_to_end).with_config(config_5).build() + + result = dr.execute(final_vars=["final_df_2"])["final_df_2"] + expected_df = pd.DataFrame( + { + "col_1": [1, 2, 3, 4], + "col_2": [11, 12, 13, 14], + "col_3": [0, 2, 4, 6], + "multiply_3": [0, 10, 20, 30], + } + ) + pd.testing.assert_frame_equal(result, expected_df) diff --git a/tests/function_modifiers/test_recursive.py b/tests/function_modifiers/test_recursive.py index d56d921dd..7b3ae7a91 100644 --- a/tests/function_modifiers/test_recursive.py +++ b/tests/function_modifiers/test_recursive.py @@ -3,11 +3,9 @@ import random from typing import Tuple -import pandas as pd import pytest -from hamilton import ad_hoc_utils, driver, graph -from hamilton import node as hamilton_node +from hamilton import ad_hoc_utils, graph from hamilton.function_modifiers import ( InvalidDecoratorException, config, @@ -15,14 +13,12 @@ recursive, subdag, value, - with_columns, ) -from hamilton.function_modifiers.base import NodeInjector, NodeTransformer +from hamilton.function_modifiers.base import NodeTransformer from hamilton.function_modifiers.dependencies import source from hamilton.function_modifiers.recursive import _validate_config_inputs import tests.resources.reuse_subdag -import tests.resources.with_columns def test_collect_function_fns(): @@ -543,261 +539,3 @@ def test_recursive_validate_config_inputs_happy(config, inputs): def test_recursive_validate_config_inputs_sad(config, inputs): with pytest.raises(InvalidDecoratorException): _validate_config_inputs(config, inputs) - - -def dummy_fn_with_columns(col_1: pd.Series) -> pd.Series: - return col_1 + 100 - - -def test_detect_duplicate_nodes(): - node_a = hamilton_node.Node.from_fn(dummy_fn_with_columns, name="a") - node_b = hamilton_node.Node.from_fn(dummy_fn_with_columns, name="a") - node_c = hamilton_node.Node.from_fn(dummy_fn_with_columns, name="c") - - if not with_columns._check_for_duplicates([node_a, node_b, node_c]): - raise (AssertionError) - - if with_columns._check_for_duplicates([node_a, node_c]): - raise (AssertionError) - - -def test_select_not_empty(): - error_message = "Please specify at least one column to append or update." 
- - with pytest.raises(ValueError) as e: - with_columns(dummy_fn_with_columns) - assert str(e.value) == error_message - - -def test_columns_to_pass_and_pass_dataframe_as_raises_error(): - error_message = ( - "You must specify only one of columns_to_pass and " - "pass_dataframe_as. " - "This is because specifying pass_dataframe_as injects into " - "the set of columns, allowing you to perform your own extraction" - "from the dataframe. We then execute all columns in the sbudag" - "in order, passing in that initial dataframe. If you want" - "to reference columns in your code, you'll have to specify " - "the set of initial columns, and allow the subdag decorator " - "to inject the dataframe through. The initial columns tell " - "us which parameters to take from that dataframe, so we can" - "feed the right data into the right columns." - ) - - with pytest.raises(ValueError) as e: - with_columns( - dummy_fn_with_columns, columns_to_pass=["a"], pass_dataframe_as="a", select="a" - ) - assert str(e.value) == error_message - - -def test_create_column_nodes_pass_dataframe(): - def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame: - return upstream_df - - dummy_node = hamilton_node.Node.from_fn(target_fn) - - decorator = with_columns( - dummy_fn_with_columns, pass_dataframe_as="upstream_df", select="dummy_fn_with_columns" - ) - injectable_params = NodeInjector.find_injectable_params([dummy_node]) - inject_parameter, initial_nodes = decorator._get_inital_nodes( - fn=target_fn, params=injectable_params - ) - - assert inject_parameter == "upstream_df" - assert len(initial_nodes) == 0 - - -def test_create_column_nodes_extract_single_columns(): - def dummy_df() -> pd.DataFrame: - return pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]}) - - def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame: - return upstream_df - - dummy_node = hamilton_node.Node.from_fn(target_fn) - - decorator = with_columns( - dummy_fn_with_columns, columns_to_pass="col_1", select="dummy_fn_with_columns" - ) - injectable_params = NodeInjector.find_injectable_params([dummy_node]) - - inject_parameter, initial_nodes = decorator._get_inital_nodes( - fn=target_fn, params=injectable_params - ) - - assert inject_parameter == "upstream_df" - assert len(initial_nodes) == 1 - assert initial_nodes[0].name == "col_1" - assert initial_nodes[0].type == pd.Series - pd.testing.assert_series_equal( - initial_nodes[0].callable(upstream_df=dummy_df()), - pd.Series([1, 2, 3, 4]), - check_names=False, - ) - - -def test_create_column_nodes_extract_multiple_columns(): - def dummy_df() -> pd.DataFrame: - return pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]}) - - def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame: - return upstream_df - - dummy_node = hamilton_node.Node.from_fn(target_fn) - - decorator = with_columns( - dummy_fn_with_columns, columns_to_pass=["col_1", "col_2"], select="dummy_fn_with_columns" - ) - injectable_params = NodeInjector.find_injectable_params([dummy_node]) - - inject_parameter, initial_nodes = decorator._get_inital_nodes( - fn=target_fn, params=injectable_params - ) - - assert inject_parameter == "upstream_df" - assert len(initial_nodes) == 2 - assert initial_nodes[0].name == "col_1" - assert initial_nodes[1].name == "col_2" - assert initial_nodes[0].type == pd.Series - assert initial_nodes[1].type == pd.Series - pd.testing.assert_series_equal( - initial_nodes[0].callable(upstream_df=dummy_df()), - pd.Series([1, 2, 3, 4]), - check_names=False, - ) - pd.testing.assert_series_equal( - 
initial_nodes[1].callable(upstream_df=dummy_df()), - pd.Series([11, 12, 13, 14]), - check_names=False, - ) - - -def test_no_matching_select_column_error(): - def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame: - return upstream_df - - dummy_node = hamilton_node.Node.from_fn(target_fn) - select = "wrong_column" - - decorator = with_columns( - dummy_fn_with_columns, columns_to_pass=["col_1", "col_2"], select=select - ) - injectable_params = NodeInjector.find_injectable_params([dummy_node]) - - error_message = ( - f"No nodes found upstream from select columns: {[select]} for function: " - f"{target_fn.__qualname__}" - ) - with pytest.raises(ValueError) as e: - decorator.inject_nodes(injectable_params, {}, fn=target_fn) - - assert str(e.value) == error_message - - -def test_append_into_original_df(): - def dummy_df() -> pd.DataFrame: - return pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]}) - - def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame: - return upstream_df - - decorator = with_columns( - dummy_fn_with_columns, columns_to_pass=["col_1", "col_2"], select="dummy_fn_with_columns" - ) - merge_node = decorator._create_merge_node(upstream_node="upstream_df", node_name="merge_node") - - output_df = merge_node.callable( - upstream_df=dummy_df(), - dummy_fn_with_columns=dummy_fn_with_columns(col_1=pd.Series([1, 2, 3, 4])), - ) - assert merge_node.name == "merge_node" - assert merge_node.type == pd.DataFrame - - pd.testing.assert_series_equal(output_df["col_1"], pd.Series([1, 2, 3, 4]), check_names=False) - pd.testing.assert_series_equal( - output_df["col_2"], pd.Series([11, 12, 13, 14]), check_names=False - ) - pd.testing.assert_series_equal( - output_df["dummy_fn_with_columns"], pd.Series([101, 102, 103, 104]), check_names=False - ) - - -def test_override_original_column_in_df(): - def dummy_df() -> pd.DataFrame: - return pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]}) - - def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame: - return upstream_df - - def col_1() -> pd.Series: - return pd.Series([0, 3, 5, 7]) - - decorator = with_columns(col_1, pass_dataframe_as="upstream_df", select="col_1") - merge_node = decorator._create_merge_node(upstream_node="upstream_df", node_name="merge_node") - - output_df = merge_node.callable(upstream_df=dummy_df(), col_1=col_1()) - assert merge_node.name == "merge_node" - assert merge_node.type == pd.DataFrame - - pd.testing.assert_series_equal(output_df["col_1"], pd.Series([0, 3, 5, 7]), check_names=False) - pd.testing.assert_series_equal( - output_df["col_2"], pd.Series([11, 12, 13, 14]), check_names=False - ) - - -def test_assign_custom_namespace_with_columns(): - def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame: - return upstream_df - - dummy_node = hamilton_node.Node.from_fn(target_fn) - decorator = with_columns( - dummy_fn_with_columns, - columns_to_pass=["col_1", "col_2"], - select="dummy_fn_with_columns", - namespace="dummy_namespace", - ) - nodes_ = decorator.transform_dag([dummy_node], {}, target_fn) - - assert nodes_[0].name == "target_fn" - assert nodes_[1].name == "dummy_namespace.col_1" - assert nodes_[2].name == "dummy_namespace.col_2" - assert nodes_[3].name == "dummy_namespace.dummy_fn_with_columns" - assert nodes_[4].name == "dummy_namespace.__append" - - -def test_end_to_end_with_columns_automatic_extract(): - dr = driver.Builder().with_modules(tests.resources.with_columns).build() - result = dr.execute(final_vars=["final_df"])["final_df"] - - expected_df = pd.DataFrame( - { - "col_1": 
[1, 2, 3, 4], - "col_2": [11, 12, 13, 14], - "col_3": [1, 1, 1, 1], - "subtract_1_from_2": [10, 10, 10, 10], - "multiply_3_by_5": [5, 5, 5, 5], - } - ) - pd.testing.assert_frame_equal(result, expected_df) - - -def test_end_to_end_with_columns_pass_dataframe(): - dr = ( - driver.Builder() - .with_modules(tests.resources.with_columns) - .with_config({"case": "override_columns"}) - .build() - ) - - result = dr.execute(final_vars=["final_df_2"])["final_df_2"] - expected_df = pd.DataFrame( - { - "col_1": [1, 2, 3, 4], - "col_2": [11, 12, 13, 14], - "col_3": [0, 2, 4, 6], - "multiply_3_by_5": [0, 10, 20, 30], - } - ) - pd.testing.assert_frame_equal(result, expected_df) diff --git a/tests/resources/with_columns.py b/tests/resources/with_columns.py deleted file mode 100644 index 081843009..000000000 --- a/tests/resources/with_columns.py +++ /dev/null @@ -1,39 +0,0 @@ -import pandas as pd - -from hamilton.function_modifiers import with_columns - - -def initial_df() -> pd.DataFrame: - return pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14], "col_3": [1, 1, 1, 1]}) - - -def subtract_1_from_2(col_1: pd.Series, col_2: pd.Series) -> pd.Series: - return col_2 - col_1 - - -def multiply_3_by_5(col_3: pd.Series) -> pd.Series: - return col_3 * 5 - - -@with_columns( - subtract_1_from_2, - multiply_3_by_5, - columns_to_pass=["col_1", "col_2", "col_3"], - select=["subtract_1_from_2", "multiply_3_by_5"], - namespace="some_subdag", -) -def final_df(initial_df: pd.DataFrame) -> pd.DataFrame: - return initial_df - - -def col_3(initial_df: pd.DataFrame) -> pd.Series: - return pd.Series([0, 2, 4, 6]) - - -@with_columns( - col_3, - pass_dataframe_as="initial_df", - select=["col_3", "multiply_3_by_5"], -) -def final_df_2(initial_df: pd.DataFrame) -> pd.DataFrame: - return initial_df
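
Usage sketch (assumed, not part of the diff): after this refactor, the pandas ``with_columns`` is
imported from ``hamilton.plugins.h_pandas`` instead of ``hamilton.function_modifiers``. The function
names below are illustrative, and ``ad_hoc_utils.create_temporary_module`` (imported by the tests
above) is only used to keep the snippet self-contained:

.. code-block:: python

    import pandas as pd

    from hamilton import ad_hoc_utils, driver
    from hamilton.plugins.h_pandas import with_columns  # was: hamilton.function_modifiers


    def initial_df() -> pd.DataFrame:
        # Toy input dataframe, mirroring the test resources above.
        return pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]})


    def col_sum(col_1: pd.Series, col_2: pd.Series) -> pd.Series:
        # A map operation that runs inside the with_columns subdag.
        return col_1 + col_2


    @with_columns(
        col_sum,  # operations can be listed directly or loaded from modules
        columns_to_pass=["col_1", "col_2"],  # columns extracted from the dataframe
        select=["col_sum"],  # columns appended back onto the dataframe
    )
    def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
        return initial_df


    module = ad_hoc_utils.create_temporary_module(initial_df, col_sum, final_df)
    dr = driver.Builder().with_modules(module).build()
    print(dr.execute(final_vars=["final_df"])["final_df"])  # col_sum column == [12, 14, 16, 18]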