Add polars with_columns
Support for the polars with_columns API for eager and lazy execution.
jernejfrank committed Nov 18, 2024
1 parent e04df19 commit 801e5ce
Showing 15 changed files with 1,925 additions and 2 deletions.
10 changes: 10 additions & 0 deletions docs/reference/decorators/with_columns.rst
@@ -13,6 +13,16 @@ We have a ``with_columns`` option to run operations on columns of a Pandas dataframe
:special-members: __init__


Polars
--------------

We have a ``with_columns`` decorator to run operations on columns of a Polars ``DataFrame`` or ``LazyFrame`` and append the results as new columns.

**Reference Documentation**

.. autoclass:: hamilton.plugins.h_polars.with_columns
:special-members: __init__
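
A minimal usage sketch (the ``columns_to_pass``/``select`` parameters mirror the pandas ``with_columns`` documented above; module and column names are illustrative):

.. code-block:: python

    import polars as pl

    from hamilton.plugins.h_polars import with_columns

    import my_functions  # defines column-level transforms, e.g. spend_per_signup

    @with_columns(
        my_functions,
        columns_to_pass=["signups", "spend"],
        select=["spend_per_signup"],
    )
    def final_df(initial_df: pl.DataFrame) -> pl.DataFrame:
        # The decorator appends the selected columns to this dataframe.
        return initial_df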

PySpark
--------------

Binary file added examples/polars/with_columns/DAG_DataFrame.png
Binary file added examples/polars/with_columns/DAG_lazy.png
7 changes: 7 additions & 0 deletions examples/polars/with_columns/README
@@ -0,0 +1,7 @@
# Using with_columns with Polars

We show how to use the familiar `with_columns` API from `polars` within Hamilton. Both `pl.DataFrame` and `pl.LazyFrame` are supported.

To see the full example, look at the notebook; a minimal driver sketch is shown below.

![image info](./DAG_DataFrame.png)
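
Below is a minimal driver sketch (assumed wiring, modeled on the pandas `with_columns` example; the notebook is the authoritative walkthrough):

```python
import polars as pl

from hamilton import ad_hoc_utils, driver
from hamilton.plugins.h_polars import with_columns

import my_functions  # the module in this directory


@with_columns(
    my_functions,
    columns_to_pass=["signups", "spend"],
    select=["spend_per_signup", "spend_zero_mean_unit_variance"],
)
def final_df(initial_df: pl.DataFrame) -> pl.DataFrame:
    # with_columns appends the selected columns to this dataframe.
    return initial_df


# Package the decorated function as a module and run it with the driver.
pipeline = ad_hoc_utils.create_temporary_module(final_df)
dr = (
    driver.Builder()
    .with_modules(pipeline)
    .with_config({"case": "millions"})  # resolves avg_3wk_spend__millions
    .build()
)

df = pl.DataFrame(
    {
        "signups": [1, 10, 50, 100, 200, 400],
        "spend": [10, 10, 20, 40, 40, 50],
    }
)
print(dr.execute(["final_df"], inputs={"initial_df": df})["final_df"])
```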
51 changes: 51 additions & 0 deletions examples/polars/with_columns/my_functions.py
@@ -0,0 +1,51 @@
import polars as pl

from hamilton.function_modifiers import config

"""
Notes:
1. This file is used for all the [ray|dask|spark]/hello_world examples.
2. It therefore show cases how you can write something once and not only scale it, but port it
to different frameworks with ease!
"""


@config.when(case="millions")
def avg_3wk_spend__millions(spend: pl.Series) -> pl.Series:
"""Rolling 3 week average spend."""
return (
spend.to_frame("spend").select(pl.col("spend").rolling_mean(window_size=3) / 1e6)
).to_series(0)


@config.when(case="thousands")
def avg_3wk_spend__thousands(spend: pl.Series) -> pl.Series:
"""Rolling 3 week average spend."""
return (
spend.to_frame("spend").select(pl.col("spend").rolling_mean(window_size=3) / 1e3)
).to_series(0)


def spend_per_signup(spend: pl.Series, signups: pl.Series) -> pl.Series:
"""The cost per signup in relation to spend."""
return spend / signups


def spend_mean(spend: pl.Series) -> float:
"""Shows function creating a scalar. In this case it computes the mean of the entire column."""
return spend.mean()


def spend_zero_mean(spend: pl.Series, spend_mean: float) -> pl.Series:
"""Shows function that takes a scalar. In this case to zero mean spend."""
return spend - spend_mean


def spend_std_dev(spend: pl.Series) -> float:
"""Function that computes the standard deviation of the spend column."""
return spend.std()


def spend_zero_mean_unit_variance(spend_zero_mean: pl.Series, spend_std_dev: float) -> pl.Series:
"""Function showing one way to make spend have zero mean and unit variance."""
return spend_zero_mean / spend_std_dev
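
For reference, the plain-polars equivalent of one of these nodes: appending `spend_per_signup` directly with polars' own `with_columns` (a standalone snippet, independent of Hamilton):

```python
import polars as pl

df = pl.DataFrame({"signups": [1, 10, 50], "spend": [10, 10, 20]})

# Compute the ratio and append it as a named column, as the Hamilton node does.
df = df.with_columns((pl.col("spend") / pl.col("signups")).alias("spend_per_signup"))
print(df)
```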
47 changes: 47 additions & 0 deletions examples/polars/with_columns/my_functions_lazy.py
@@ -0,0 +1,47 @@
import polars as pl

from hamilton.function_modifiers import config

"""
Notes:
1. This file is used for all the [ray|dask|spark]/hello_world examples.
2. It therefore show cases how you can write something once and not only scale it, but port it
to different frameworks with ease!
"""


@config.when(case="millions")
def avg_3wk_spend__millions(spend: pl.Expr) -> pl.Expr:
"""Rolling 3 week average spend."""
return spend.rolling_mean(window_size=3) / 1e6


@config.when(case="thousands")
def avg_3wk_spend__thousands(spend: pl.Expr) -> pl.Expr:
"""Rolling 3 week average spend."""
return spend.rolling_mean(window_size=3) / 1e3


def spend_per_signup(spend: pl.Expr, signups: pl.Expr) -> pl.Expr:
"""The cost per signup in relation to spend."""
return spend / signups


def spend_mean(spend: pl.Expr) -> pl.Expr:
    """Shows function creating a scalar aggregation. In lazy mode this stays an expression computing the mean of the entire column."""
    return spend.mean()


def spend_zero_mean(spend: pl.Expr, spend_mean: pl.Expr) -> pl.Expr:
    """Shows function that takes an aggregation. In this case to zero-mean spend; the mean broadcasts across the column."""
    return spend - spend_mean


def spend_std_dev(spend: pl.Expr) -> pl.Expr:
    """Expression that computes the standard deviation of the spend column."""
    return spend.std()


def spend_zero_mean_unit_variance(spend_zero_mean: pl.Expr, spend_std_dev: pl.Expr) -> pl.Expr:
    """Function showing one way to make spend have zero mean and unit variance."""
    return spend_zero_mean / spend_std_dev
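
A hedged sketch of the lazy path (assuming the decorator dispatches on the `pl.LazyFrame` annotations, per the commit message; not verified against the final API):

```python
import polars as pl

from hamilton.plugins.h_polars import with_columns

import my_functions_lazy


@with_columns(
    my_functions_lazy,
    columns_to_pass=["signups", "spend"],
    select=["spend_per_signup"],
)
def final_lazy_df(initial_df: pl.LazyFrame) -> pl.LazyFrame:
    # Still lazy at this point: nothing is computed until .collect() is called.
    return initial_df
```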
1,219 changes: 1,219 additions & 0 deletions examples/polars/with_columns/notebook.ipynb


3 changes: 3 additions & 0 deletions hamilton/plugins/h_pandas.py
@@ -144,6 +144,8 @@ def __init__(
:param pass_dataframe_as: The name of the dataframe that we're modifying, as known to the subdag.
If you pass this in, you are responsible for extracting columns out. If not provided, you have
to pass columns_to_pass in, and we will extract the columns out for you.
:param select: The end nodes that represent columns to be appended to the original dataframe
via with_columns. Existing columns will be overridden.
:param namespace: The namespace of the nodes, so they don't clash with the global namespace
and so this can be reused. If it's left out, there will be no namespace (in which case you'll want
to be careful about repeating it/reusing the nodes in other parts of the DAG.)
@@ -153,6 +155,7 @@

self.subdag_functions = subdag.collect_functions(load_from)

# TODO: select=None should append all nodes, as h_spark does
if select is None:
raise ValueError("Please specify at least one column to append or update.")
else:
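
For context, a hedged sketch of the `select` behavior documented above, using the pandas plugin (module and column names are illustrative):

```python
import pandas as pd

from hamilton.plugins.h_pandas import with_columns

import my_functions  # assumed module defining spend_per_signup, etc.


@with_columns(
    my_functions,
    columns_to_pass=["signups", "spend"],
    # Only the selected nodes are appended; same-named existing columns are overridden.
    select=["spend_per_signup"],
)
def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
    return initial_df
```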
