Pyspark udf support #83

Merged
merged 4 commits into from
Mar 2, 2023

Conversation

skrawcz
Collaborator

@skrawcz skrawcz commented Feb 28, 2023

This introduces a new experimental graph adapter - PySparkUDFGraphAdapter.

The point of this adapter is to allow someone to build and manage a DAG of map functions that result in columns
on a pyspark dataframe.

# specifies a way to do pandas_udfs - for vectorized computation
import pandas as pd

from hamilton.htypes import column  # the "column" annotation added in this PR

def spend_per_signup(spend: pd.Series, signups: pd.Series) -> column[pd.Series, float]:
    """The cost per signup in relation to spend."""
    return spend / signups

# plain spark UDF that is row based
def spend_per_signup(spend: float, signups: float) -> float:
    """The cost per signup in relation to spend."""
    return spend / signups
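
For context, a minimal sketch of roughly what happens under the hood for the row-based UDF above -- it gets registered as a plain spark UDF and appended via with_column (the dataframe contents and column names here are illustrative):

from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.getOrCreate()
# illustrative input dataframe with the two columns the function needs
df = spark.createDataFrame([(10.0, 2.0), (20.0, 4.0)], ["spend", "signups"])

def spend_per_signup(spend: float, signups: float) -> float:
    """The cost per signup in relation to spend."""
    return spend / signups

# wrap the plain python function as a row-based spark UDF and append the column
spend_per_signup_udf = F.udf(spend_per_signup, T.DoubleType())
df = df.withColumn("spend_per_signup", spend_per_signup_udf(F.col("spend"), F.col("signups")))
df.show()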

Limitations:

  1. Only works for "map" operations.
  2. Does not implement the full spectrum of pandas_udfs available. Not all are map based.
  3. There are potential performance impacts to using with_column for a large DAG. We should look to use expressions, and then do one final select statement in the result_builder.

Changes

  • adds graph adapter to h_spark
  • adds example under examples/spark
  • adds reference documentation on graph adapters
  • renames type_utils to htypes and adds a "column" annotation

How I tested this

  • created tests
  • ran the example locally

Notes

  • this is our first attempt

Checklist

  • PR has an informative and human-readable title (this will be pulled into the release notes)
  • Changes are limited to a single goal (no scope creep)
  • Code passed the pre-commit check & code is left cleaner/nicer than when first encountered.
  • Any change in functionality is tested
  • New functions are documented (with a description, list of inputs, and expected output)
  • Placeholder code is flagged / future TODOs are captured in comments
  • Project documentation has been updated if adding/changing functionality.

Collaborator

@elijahbenizzy elijahbenizzy left a comment

More comments/general structure thoughts but this is in draft mode so it works.

Some broader thoughts:

  1. What happens when you have missing data/missing indices? Things could get difficult really quick...
  2. How does one use this if there's a join at some point? Might be nice to have a recipe/example...
  3. Could we use subdag or something similar to represent a series of map operations? Then do a join outside of that?

examples/spark/README.md
examples/spark/pyspark_udfs/pandas_udfs.py
@skrawcz skrawcz force-pushed the pyspark_udf_support branch 2 times, most recently from 17ae5bd to 0d95ad1 Compare February 28, 2023 23:17
Comment on lines +414 to +433
"""
df: DataFrame = self.df_object
output_schema = self.original_schema
# what's in the dataframe:
for output_name, output_value in outputs.items():
if output_name not in output_schema:
output_schema.append(output_name)
if output_name in df.columns:
continue
else:
df = df.withColumn(output_name, lit(output_value))
# original schema + new columns should be the order.
# if someone requests a column that is in the original schema we won't duplicate it.
result = df.select(*[column(col_name) for col_name in output_schema])


nit here, but you can build select expr here rather than doing df.withColumn. This will affect performance in wide dataframes.

Collaborator Author


code snippet please? :)


Sure,

from pyspark.sql.functions import column, lit

def some_fun(df):
    outputs = {'col1': 23.0, 'col2': 45.0, 'col3': 67.0}
    df_columns = set(df.columns)
    select_exprs = []
    for output_name, output_value in outputs.items():
        if output_name in df_columns:
            # keep existing columns as-is
            select_exprs.append(column(output_name))
        else:
            # add new columns as literal values
            select_exprs.append(lit(output_value).alias(output_name))
    # a single select builds one projection instead of N withColumn calls
    return df.select(*select_exprs)

Collaborator


Ahh I remember last time you showed us this. IIRC the issue isn't the actual computation, it's the fact that the plan gets big and it takes longer to optimize? https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#withColumn(colName:String,col:org.apache.spark.sql.Column):org.apache.spark.sql.DataFrame

Collaborator


Unfortunately python doesn't have withColumns :/ Just java/scala.


from the docs... (since 2.0.0)

Note: this method introduces a projection internally. Therefore, calling it multiple times, for instance, via loops in order to add multiple columns can generate big plans which can cause performance issues and even StackOverflowException. To avoid this, use select with the multiple columns at once.

Collaborator


Yeah, the problem is the plan right? Not the actual computation that gets run? E.g. executing the plan gets ugly (it's kind of an n^2 set of parameters...)

Collaborator Author

@skrawcz skrawcz Mar 2, 2023


Hold up. The with_column here is only called for a minority case. So that's likely never going to be a bottleneck.

What this does bring up, though, is whether the code that calls the UDFs (_lambda_udf) will be a bottleneck, since it does with_column for each UDF... 🤔

Collaborator Author


Yeah I'm not going to refactor this now -- but yeah it would be a bit of surgery to rewrite things to return udfs, and then only in build_result do the final select expression. I am currently using the dataframe object to tell me what columns should exist in it -- so would need to replace that with an object, etc...

Collaborator


This will get slow if there are 1000 columns -- but I think it's not the end of the world right now, and not worth blocking.

@skrawcz skrawcz marked this pull request as ready for review March 2, 2023 05:23
skrawcz added a commit that referenced this pull request Mar 2, 2023
This implements #83 and enables one to pass driver.Variable objects
as expected outputs to driver.execute(). This reduces the boilerplate
needed to massage variables for use with .execute().

Adds to unit test.
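
A minimal sketch of what that commit enables (the module name is illustrative; list_available_variables() returns driver.Variable objects):

from hamilton import driver

import my_functions  # hypothetical module of Hamilton transform functions

dr = driver.Driver({}, my_functions)
# driver.Variable objects can now be passed straight to execute(),
# instead of first extracting each variable's name.
float_vars = [v for v in dr.list_available_variables() if v.type == float]
result = dr.execute(float_vars)
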
@skrawcz skrawcz requested review from ramannanda9 and elijahbenizzy and removed request for ramannanda9 March 2, 2023 05:36
Collaborator

@elijahbenizzy elijahbenizzy left a comment


LGTM -- happy to talk it over today but I'm pretty happy with this.

@@ -0,0 +1,52 @@
"""Pandas UDFs.

Has to only contain map operations! Aggregations, filters, etc. are not supported.
Collaborator


What will happen if it's not just map operations?

Collaborator Author


pandas udfs only have certain shapes -- so it'll break in pyspark.
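
For reference, a sketch of the series-to-series ("map") shape that pyspark's pandas_udf supports, which is the shape this adapter targets (names here are illustrative):

import pandas as pd
from pyspark.sql import functions as F, types as T

# series-to-series ("map") pandas UDF: one output row per input row.
@F.pandas_udf(T.DoubleType())
def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series:
    return spend / signups

# an aggregation-shaped UDF (pd.Series -> scalar) would require a different
# pandas_udf variant; it is not a "map" operation, so it isn't supported here.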

@@ -149,3 +149,289 @@ def build_result(
            return df.to_pandas()
        else:
            return df


def numpy_to_spark_type(numpy_type: Type) -> types.DataType:
Collaborator


No utility functions in spark to do this right?

Collaborator Author


I looked -- didn't find anything obvious.
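
A minimal sketch of the kind of mapping such a helper performs (the exact set of numpy types handled in the PR may differ):

from typing import Type

import numpy as np
from pyspark.sql import types

# illustrative mapping; the actual implementation may cover more types.
_NUMPY_TO_SPARK = {
    np.int32: types.IntegerType,
    np.int64: types.LongType,
    np.float32: types.FloatType,
    np.float64: types.DoubleType,
    np.bool_: types.BooleanType,
}

def numpy_to_spark_type(numpy_type: Type) -> types.DataType:
    """Returns the Spark SQL type corresponding to a numpy type."""
    if numpy_type in _NUMPY_TO_SPARK:
        return _NUMPY_TO_SPARK[numpy_type]()
    raise ValueError(f"Unsupported numpy type: {numpy_type}")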

elijahbenizzy pushed a commit that referenced this pull request Mar 2, 2023
This implements #83 and enables one to pass driver.Variable objects
as expected outputs to driver.execute(). This reduces the boilerplate
needed to massage variables for use with .execute().

Adds to unit test.
To make it clear what the example is.
@skrawcz skrawcz force-pushed the pyspark_udf_support branch from 58f341c to ccb5395 Compare March 2, 2023 22:48
elijahbenizzy and others added 3 commits March 2, 2023 15:33
Currently we're using an alias "htype" to shorten the syntax. We
are also hardcoded to pandas series, but we might want to extend
this/put it in extensions.

Next step -- we need this to work for custom_subclass_check.
This enables a cleaner import. Furthermore, it opens up the ability to
add more annotations later on.
This is a large commit from a couple of squashed ones, original commits below.

This introduces a new experimental graph adapter - PySparkUDFGraphAdapter.

The point of this adapter is to allow someone to build and manage a DAG of map functions that would result in columns
on a pyspark dataframe.

def spend_per_signup(spend: pd.Series, signups: pd.Series) -> column[pd.Series, float]:
    """The cost per signup in relation to spend."""
    return spend / signups

def spend_per_signup(spend: float, signups: float) -> float:
    """The cost per signup in relation to spend."""
    return spend / signups

Limitations:
 - Only works for "map" operations.
 - Does not implement the full spectrum of pandas_udfs available. Not all are map based.
 - There potentially are performance impacts to using with_column for a large DAG. We should look to use expressions, and then do one final select statement in the result_builder.

——
Pulls out static functions from PySparkUDFGraphAdapter

They don't need to be in the graph adapter itself. Pulling them
out makes them easier to unit test, as well as simplifies the
class itself.

Adds unit tests for the basic functionality of each static
function, as well as an integration test for the overall
adapter. (+5 squashed commits)
Squashed commits:
[857b91c] Adds to PysparkUDFGraphAdapter docs and functions

1. h_spark, so that they're more understandable and chunkable.
This should make it simpler to unit test them.
2. sphinx docs, so that this class shows up there for reference.
[ff60fb1] Adds future work section to pyspark udf readme

So that it's clear we have more work to do here.
[7558cfa] Fixes UDF driver reuse bug

We need to depend on type information from the node, not the actual callable itself, since we
will mutate the annotations on it for running spark. This just impacts behavior of rerunning things.
[a08c392] Adds PySparkUDFGraphAdapter

TODOs are to fill in docs and add tests.

Otherwise this is functionally ready to go!

Regular UDFs are pretty straightforward.
Pandas UDFs involve a bunch more code, and we only support one type of them.
So the bulk of the complexity is in supporting them and massaging
the function because pyspark expects no extra annotations etc.
[b76bbd5] Adds WIP code to run pyspark UDFs through Hamilton

The current behavior assumes:

1. a single DF being operated on.
2. the execute function says what should additionally be added to the current dataframe.
3. you need to do joins, filters, aggregations, outside of UDFs.
4. the pandas_udf stuff works, but assumes no ability to pass in scalar values to the function.
So you have to instead have them be in the columns.

TODOs:
 - move files
 - more testing
 - unit testing
 - adding usage documentation and limitations.
@skrawcz skrawcz force-pushed the pyspark_udf_support branch from ccb5395 to 85737c0 Compare March 2, 2023 23:33
@skrawcz skrawcz merged commit 44a36ee into main Mar 2, 2023
@skrawcz skrawcz deleted the pyspark_udf_support branch March 2, 2023 23:58