Pyspark udf support #83

Merged: 4 commits, merged on Mar 2, 2023
7 changes: 7 additions & 0 deletions docs/reference/api-reference/graph-adapters.rst
@@ -53,6 +53,13 @@ Pandas on Spark (Koalas)
:inherited-members:


PySpark UDFs
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: hamilton.experimental.h_spark.PySparkUDFGraphAdapter
:special-members: __init__
:members:


Async Python
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: hamilton.experimental.h_async.AsyncGraphAdapter
14 changes: 11 additions & 3 deletions examples/spark/README.md
@@ -1,5 +1,13 @@
# Scaling Hamilton on Spark

Currently, Hamilton scales by using Koalas on Spark.
Koalas became part of Spark officially in Spark 3.2.
The example in `hello_world` here assumes that.
## Pandas
If you're using Pandas, Hamilton scales by using Koalas on Spark.
Koalas became part of Spark officially in Spark 3.2, and was renamed Pandas on Spark.
The example in `pandas_on_spark` here assumes that.

## Pyspark UDFs
If you're not using Pandas, then you can use Hamilton to manage and organize your pyspark UDFs.
See the example in `pyspark_udfs`.
skrawcz marked this conversation as resolved.

Note: we're looking to expand coverage and support for more Spark use cases. Please come find us, or open an issue,
if you have a use case that you'd like to see supported!
101 changes: 101 additions & 0 deletions examples/spark/pyspark_udfs/README.md
@@ -0,0 +1,101 @@
# Hamilton to help manage your pyspark UDFs

Here we have a version of the Hamilton hello world example showing how you can take some Hamilton functions and then
easily run them as pyspark UDFs.

## Dependencies

`pip install "sf-hamilton[pyspark, visualization]"` for the right dependencies to run this example.

## File organization:

* `pandas_udfs.py` contains UDF definitions that Hamilton will know how to run as `pandas_udfs` on pyspark.
* `vanilla_udfs.py` contains UDF definitions that Hamilton will know how to run as row-based UDFs on pyspark.
* `run.py` contains the Spark driver and Hamilton driver code to run everything.

## To run

```bash
python run.py pandas # to run using pandas_udfs
python run.py vanilla # to run using vanilla_udfs
```
You should get the following as output:

```
+-----+-------+------------------+----------------+-----------------------------+---+---+--------------+
|spend|signups| avg_3wk_spend|spend_per_signup|spend_zero_mean_unit_variance|foo|bar|augmented_mean|
+-----+-------+------------------+----------------+-----------------------------+---+---+--------------+
| 10| 1| 10.0| 10.0| -1.0644054|1.0|2.0| 3.0|
| 10| 10| 10.0| 1.0| -1.0644054|1.0|2.0| 3.0|
| 20| 50|13.333333333333334| 0.4| -0.48382062|1.0|2.0| 3.0|
| 40| 100|23.333333333333332| 0.4| 0.6773489|1.0|2.0| 3.0|
| 40| 200|33.333333333333336| 0.2| 0.6773489|1.0|2.0| 3.0|
| 50| 400|43.333333333333336| 0.125| 1.2579336|1.0|2.0| 3.0|
+-----+-------+------------------+----------------+-----------------------------+---+---+--------------+
```
with Hamilton showing you the following execution visualization:

![udf hamilton execution](my_spark_udf.dot.png)

Note: `foo`, `bar`, and `augmented_mean` do not depend on any columns in the dataframe; they come from the inputs dictionary.

# What Hamilton does under the hood

You need to use the `PySparkUDFGraphAdapter` for this to work, because Hamilton needs to know how to
translate your functions into operations on the pyspark dataframe. With this adapter, Hamilton will
(see the sketch after this list):

1. Take care of the Spark boilerplate of creating a UDF for you.
2. Intelligently decide, based on column names, whether a function's input should be pulled from the dataframe
or from the passed-in dictionary of inputs for that UDF.
3. **Append** the requested outputs to the dataframe.
4. Operate on the premise that the `INPUTS` dictionary passed to `dr.execute(..., inputs=INPUTS)` maps each
column name to the pyspark dataframe. Hamilton then knows to use that **single** dataframe to pull columns
from and append columns to when coordinating UDF execution.
5. Assume all functions are `map`-based. If they're not, you'll need to manually manage that outside of Hamilton.
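
Here is a condensed sketch of that flow, mirroring what `run.py` in this example does (module and column names are
taken from this example):

```python
import pandas as pd
from pyspark.sql import SparkSession

import vanilla_udfs  # the map-based Hamilton functions from this example

from hamilton import driver
from hamilton.experimental import h_spark

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(pd.DataFrame({"spend": [10, 20, 40], "signups": [1, 50, 100]}))

# the adapter tells Hamilton to turn matching functions into pyspark UDFs
dr = driver.Driver({}, vanilla_udfs, adapter=h_spark.PySparkUDFGraphAdapter())

# every column name maps to the *single* dataframe Hamilton pulls from and appends to
execute_inputs = {col: df for col in df.columns}
df = dr.execute(["spend_per_signup"], inputs=execute_inputs)  # returns df with the column appended
df.show()
```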

# Why is Hamilton useful here?

1. It makes it easy to write UDFs that can be run in a distributed setting.
2. Those UDFs can also be run in other contexts that don't use Spark (see the sketch after this list)!
3. Hamilton forces you to curate UDFs, so they can be versioned, tested, documented, and reused
in other Spark jobs easily. Think reusable feature definitions, think reusable metric definitions! The one
assumption is that they are `map`-based functions.
4. Anyone who wants to use these UDFs MUST ALIGN ON COLUMN NAMES! This is a good thing: it means you can
standardize on column names and reuse UDFs across different Spark jobs, which lowers your maintenance burden.
5. If you don't care about intermediate outputs, Hamilton won't add them. It'll only append the outputs you want
to the passed-in dataframe.
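
For point 2, here is a sketch (not part of this PR) of driving the very same `vanilla_udfs` module without Spark at
all, using the standard Hamilton driver with a dict result builder:

```python
import vanilla_udfs  # the same module used in this example

from hamilton import base, driver

# no Spark required: run the same functions over plain Python values
adapter = base.SimplePythonGraphAdapter(base.DictResult())
dr = driver.Driver({}, vanilla_udfs, adapter=adapter)
result = dr.execute(
    ["spend_per_signup", "spend_zero_mean_unit_variance"],
    inputs={"spend": 40.0, "signups": 100.0, "spend_mean": 28.3, "spend_std_dev": 17.2},
)
print(result)  # a dict with the two requested outputs
```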

# Limitations

1. The `PySparkUDFGraphAdapter` currently only supports `map`-based functions.
2. The only `pandas_udfs` signature currently supported is functions that take `pd.Series` inputs and return a
`pd.Series` output (see the sketch after this list).
3. `@check_output` annotations are not yet supported for pyspark UDFs. But we're working on it - ping
us in slack (or via issues) if you need this feature!
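
For reference, this is the signature shape the adapter currently recognizes as a pandas UDF (taken from
`pandas_udfs.py` in this example):

```python
import pandas as pd

from hamilton.htypes import column


# supported: every input is a pd.Series, the output is column[pd.Series, <primitive type>]
def spend_per_signup(spend: pd.Series, signups: pd.Series) -> column[pd.Series, float]:
    """The cost per signup in relation to spend."""
    return spend / signups
```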

# Future work

## Auto vectorize UDFs to be pandas_udfs
Under the hood, we could translate basic vanilla python UDF functions to use the pandas_udf route. This could be
enabled via a flag passed to the `PySparkUDFGraphAdapter`, via an annotation on the function, or both.
Let us know if this would be useful to you!

## All the Pandas UDF signatures

(1) Let us know what you need.
(2) Implementation is a matter of (a) getting the API right, and (b) making sure it fits with the Hamilton way of thinking.

## Aggregation UDFs

We just need to determine what a good API for this would be. We're open to suggestions!

## Other dataframe operations

We could support other dataframe operations, like joins, etc. We're open to suggestions! The main issue is creating
a good API for this.

# Other questions

## Can't I just use pyspark dataframes directly with Hamilton functions?

Yes, with Hamilton you could write functions that define a named flow operating entirely over pyspark dataframes
(a sketch of what that looks like is below). However, you lose a lot of Hamilton's flexibility doing things that
way, e.g. fine-grained, column-level functions that can be reused outside of Spark. We're open to suggestions,
or API changes to make this approach easier.
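
A sketch of what that alternative looks like: plain Hamilton functions passing whole pyspark dataframes around, with
no `PySparkUDFGraphAdapter` involved (the function names here are hypothetical, not part of this PR):

```python
import pyspark.sql as ps
from pyspark.sql import functions as F


def base_df(spark_session: ps.SparkSession) -> ps.DataFrame:
    """Creates (or loads) the base dataframe."""
    return spark_session.createDataFrame([(10, 1), (20, 50)], ["spend", "signups"])


def with_spend_per_signup(base_df: ps.DataFrame) -> ps.DataFrame:
    """Whole-dataframe transform: each node passes a full pyspark DataFrame along."""
    return base_df.withColumn("spend_per_signup", F.col("spend") / F.col("signups"))
```

You would then request `with_spend_per_signup` from a regular `driver.Driver` (with a result builder that returns the
object as-is, e.g. `base.SimplePythonGraphAdapter(base.DictResult())`), passing `spark_session` as an input. Note how
each function operates on an entire dataframe rather than on named columns, which is why column-level lineage is lost.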
52 changes: 52 additions & 0 deletions examples/spark/pyspark_udfs/pandas_udfs.py
@@ -0,0 +1,52 @@
"""Pandas UDFs.

Has to only contain map operations! Aggregations, filters, etc. are not supported.
[Review comment - Collaborator]: What will happen if it's not just map operations?

[Reply - Collaborator (Author)]: pandas udfs only have certain shapes -- so it'll break in pyspark.


Notes:

1. Functions annotated with pd.Series for all inputs and the output will be converted to pandas UDFs.
Note - we could broaden support for more function type signatures, but this is a good start. Please make an
issue/discussion if this is something you'd like to see.
2. You need to use the `htypes.column` annotation for the output type. This is because we need to know what
underlying primitive type the pandas series holds; this is a limitation of pandas UDFs on pyspark.
3. If a function is deemed to be a pandas_udf one, Hamilton will try to satisfy its inputs with columns from
the dataframe ONLY. This is different from how the vanilla UDFs behave, and is partially a limitation of pandas UDFs.
4. Pandas UDFs operate over chunks of data and can thus operate in a vectorized manner. This is a big performance
gain over vanilla UDFs.
5. You can have non-pandas_udf functions in the same file, and they will be run as row-based UDFs.

"""
import pandas as pd

from hamilton.htypes import column


def spend_per_signup(spend: pd.Series, signups: pd.Series) -> column[pd.Series, float]:
"""The cost per signup in relation to spend."""
return spend / signups


def augmented_mean(foo: float, bar: float) -> float:
"""Shows you can include functions that don't depend on columns in the dataframe if you want to do
other things with Hamilton at the same time as computing. If Hamilton does not find a match in the
dataframe it'll look for a match in the inputs dictionary.
"""
return foo + bar


def spend_zero_mean(spend: pd.Series, spend_mean: pd.Series) -> column[pd.Series, float]:
"""Computes zero mean spend.
Note:
`spend_mean` here HAS TO come from the dataframe or the input dictionary.
"""
return spend - spend_mean


def spend_zero_mean_unit_variance(
spend_zero_mean: pd.Series, spend_std_dev: pd.Series
) -> column[pd.Series, float]:
"""Function showing one way to make spend have zero mean and unit variance.
Note:
`spend_std_dev` here HAS TO come from the pyspark dataframe.
"""
return spend_zero_mean / spend_std_dev
135 changes: 135 additions & 0 deletions examples/spark/pyspark_udfs/run.py
@@ -0,0 +1,135 @@
"""Spark driver and Hamilton driver code."""

import pandas as pd
from pyspark.sql import DataFrame, SparkSession, Window
from pyspark.sql import functions as F

from hamilton import driver, log_setup
from hamilton.experimental import h_spark


def create_hamilton_driver(config: dict, modules: list) -> driver.Driver:
"""Helper function to create a Hamilton driver.

:param config: any configuration required to instantiate a DAG.
:param modules: the modules to crawl to get functions from to build the DAG.
:return: an instantiated Hamilton driver.
"""
adapter = h_spark.PySparkUDFGraphAdapter()
dr = driver.Driver(config, *modules, adapter=adapter) # can pass in multiple modules
return dr


def get_hamilton_modules(use_pandas_udfs: bool) -> list:
"""We have two implementations of the same UDFs - this chooses which one to use.

vanilla_udfs.py uses row-based UDFs; pandas_udfs.py uses pandas UDFs, which do their compute
in a vectorized fashion.

:param use_pandas_udfs: boolean, True to use pandas UDFs, False to use vanilla UDFs.
:return: list of modules to pass to the Hamilton driver.
"""
if use_pandas_udfs:
import pandas_udfs

modules = [pandas_udfs]
else:
import vanilla_udfs

modules = [vanilla_udfs]
return modules


def my_spark_job(spark: SparkSession, use_pandas_udfs: bool = False):
"""Template for a Spark job that uses Hamilton for their featuring engineering, i.e. any map, operations.

:param spark: the SparkSession
:param use_pandas_udfs: whether to use pandas UDFs or vanilla UDFs -- see code for details.
"""
# replace this with SQL or however you'd get the data you need in.
pandas_df = pd.DataFrame(
{"spend": [10, 10, 20, 40, 40, 50], "signups": [1, 10, 50, 100, 200, 400]}
)
df = spark.createDataFrame(pandas_df)
# add some extra values to the DF, e.g. aggregates, etc.
df = add_values_to_dataframe(df)
# get the modules that contain the UDFs
modules = get_hamilton_modules(use_pandas_udfs)
# create the Hamilton driver
dr = create_hamilton_driver({}, modules)
# create inputs to the UDFs - this needs to be column_name -> spark dataframe.
execute_inputs = {col: df for col in df.columns}
# add in any other scalar inputs/values/objects needed by the UDFs
execute_inputs.update(
{
# "spend_mean": agg_values.first()["spend_mean"],
# "spend_std_dev": agg_values.first()["spend_std_dev"],
"foo": 1.0,
"bar": 2.0,
}
)
# tell Hamilton what columns need to be appended to the dataframe.
cols_to_append = [
"spend_per_signup",
"spend_zero_mean_unit_variance",
"foo",
"bar",
"augmented_mean", # these can be function references too
]
# visualize execution of what is going to be appended
dr.visualize_execution(
cols_to_append, "./my_spark_udf.dot", {"format": "png"}, inputs=execute_inputs
)
# tell Hamilton to tell Spark what to do
df = dr.execute(cols_to_append, inputs=execute_inputs)
# do other stuff -- you could filter, groupby, etc.
w = Window.rowsBetween(-2, 0)
df = df.withColumn("avg_3wk_spend", F.mean("spend").over(w))
df = df.select(["spend", "signups", "avg_3wk_spend"] + cols_to_append)
df.explain()
df.show()
# and you can reuse the same driver to execute UDFs on new dataframes:
# df2 = spark.createDataFrame(pandas_df)
# add some extra values to the DF, e.g. aggregates, etc.
# df2 = add_values_to_dataframe(df2)
# execute_inputs = {col: df2 for col in df2.columns}
# df2 = dr.execute([
# "spend_per_signup",
# "spend_zero_mean_unit_variance",
# ], inputs=execute_inputs)
# df2.show()


def add_values_to_dataframe(df: DataFrame) -> DataFrame:
"""Helper function to add some extra columns to the dataframe.

These are required to compute some of the UDFs. See comments for details.
"""
# compute these values for some of the UDFs -- skip if you don't need them.
agg_values = df.agg(F.mean("spend"), F.stddev("spend"))
agg_values = agg_values.withColumnRenamed("avg(spend)", "spend_mean")
agg_values = agg_values.withColumnRenamed("stddev_samp(spend)", "spend_std_dev")
# we need a way to pass in the mean and std_dev to the UDFs - so add as columns.
df = df.withColumn("spend_mean", F.lit(agg_values.first()["spend_mean"]))
# df = df.withColumn("spend_mean", F.lit(10.0)) # or hard code the value
df = df.withColumn("spend_std_dev", F.lit(agg_values.first()["spend_std_dev"]))
# df = df.withColumn("spend_std_dev", F.lit(5.0)) # or hard code the value
return df


if __name__ == "__main__":
import sys

if len(sys.argv) > 1:
use_pandas_udfs = sys.argv[1].lower() == "pandas"
else:
use_pandas_udfs = False
log_setup.setup_logging(log_level=log_setup.LOG_LEVELS["INFO"])
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("info")

my_spark_job(spark, use_pandas_udfs=use_pandas_udfs)
# my_spark_job(spark, use_pandas_udfs=True) # use pandas UDF functions
# my_spark_job(spark, use_pandas_udfs=False) # use vanilla UDF functions

spark.stop()
39 changes: 39 additions & 0 deletions examples/spark/pyspark_udfs/vanilla_udfs.py
@@ -0,0 +1,39 @@
"""Pyspark UDFs

Has to only contain map operations!

Notes:

1. Hamilton will first try to satisfy these UDFs' inputs with columns from the dataframe; any inputs that are not
columns of the pyspark dataframe will be taken from the input dictionary.
2. UDFs defined this way operate in a row-by-row fashion, so they are not vectorized.

"""


def spend_per_signup(spend: float, signups: float) -> float:
"""The cost per signup in relation to spend."""
return spend / signups


def augmented_mean(foo: float, bar: float) -> float:
"""Shows you can include functions that don't depend on columns in the dataframe if you want to do
other things with Hamilton at the same time as computing. If Hamilton does not find a match in the
dataframe it'll look for a match in the inputs dictionary."""
return foo + bar


def spend_zero_mean(spend: float, spend_mean: float) -> float:
"""Computes zero mean spend.
Note:
`spend_mean` here COULD come from the dataframe OR the input dictionary.
"""
return spend - spend_mean


def spend_zero_mean_unit_variance(spend_zero_mean: float, spend_std_dev: float) -> float:
"""Function showing one way to make spend have zero mean and unit variance.
Note:
`spend_std_dev` here COULD come from the pyspark dataframe OR the input dictionary.
"""
return spend_zero_mean / spend_std_dev
2 changes: 1 addition & 1 deletion examples/spark/requirements.txt
@@ -1 +1 @@
sf-hamilton[pyspark]
sf-hamilton[pyspark, visualization]