Full spark integration #249

Merged (11 commits) on Aug 22, 2023
12 changes: 10 additions & 2 deletions .circleci/config.yml
@@ -55,8 +55,16 @@ workflows:
           python-version: '3.9'
           task: ray
       - test:
-          name: spark-py38
-          python-version: '3.8'
+          name: spark-py39
+          python-version: '3.9'
           task: pyspark
+      - test:
+          name: spark-py310
+          python-version: '3.10'
+          task: pyspark
+      - test:
+          name: spark-py311
+          python-version: '3.11'
+          task: pyspark
       - test:
           name: integrations-py37
2 changes: 1 addition & 1 deletion docs/how-tos/scale-up.rst
@@ -10,4 +10,4 @@ on larger, distributed datasets (pandas on spark, pyspark map UDFs).
 1. Integrating hamilton with `pandas on spark <https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/spark/pandas_on_spark>`_.
 2. Integrating hamilton with `ray <https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/ray>`_.
 3. Integrating hamilton with `dask <https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/dask>`_.
-4. Integrating hamilton using `pyspark map UDFs <https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/spark/pyspark_udfs>`__.
+4. Integrating hamilton with `pyspark <https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/spark/pyspark>`_.
7 changes: 7 additions & 0 deletions examples/spark/README.md
@@ -1,4 +1,11 @@

 # Scaling Hamilton on Spark
+## Pyspark
+
+If you're using pyspark, Hamilton allows for natural manipulation of pyspark dataframes,
+with some special constructs for managing DAGs of UDFs.
+
+See the example in `pyspark` to learn more.
+
 ## Pandas
 If you're using Pandas, Hamilton scales by using Koalas on Spark.
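To make the "special constructs" mentioned above concrete, here is a condensed, illustrative sketch of the pattern used in the `pyspark` example added by this PR (the full versions are `map_transforms.py` and `dataflow.py` below); it assumes the `h_spark.with_columns` decorator that appears later in this diff.

```python
# Condensed from the example files added in this PR -- illustrative only.
import pyspark.sql as ps

import map_transforms  # module of column-level pandas functions (shown later in this diff)
from hamilton.experimental import h_spark


@h_spark.with_columns(
    map_transforms,                        # the DAG of UDFs to apply
    columns_to_pass=["spend", "signups"],  # dataframe columns exposed to those UDFs
)
def with_mapped_data(base_df: ps.DataFrame) -> ps.DataFrame:
    # The decorator appends each UDF's output as a new column on base_df.
    return base_df
```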
324 changes: 324 additions & 0 deletions examples/spark/pyspark/README.md

Large diffs are not rendered by default.

124 changes: 124 additions & 0 deletions examples/spark/pyspark/dataflow.py
@@ -0,0 +1,124 @@
from typing import Dict

import map_transforms
import pandas as pd
import pyspark.sql as ps
from pyspark.sql.functions import col, mean, stddev

from hamilton.experimental import h_spark
from hamilton.function_modifiers import extract_fields


def spark_session() -> ps.SparkSession:
    """Pyspark session to load up when starting.
    You can also pass it in if you so choose.

    :return: A locally-running spark session.
    """
    return ps.SparkSession.builder.master("local[1]").getOrCreate()


def base_df(spark_session: ps.SparkSession) -> ps.DataFrame:
    """Dummy function showing how to wire through loading data.
    Note you can use @load_from (although our spark data loaders are limited now).

    :return: A dataframe with spend and signups columns.
    """
    pd_df = pd.DataFrame(
        {
            "spend": [
                10, 10, 20, 40, 40, 50, 60, 70, 90, 100,
                70, 80, 90, 100, 110, 120, 130, 140, 150, 160,
            ],
            "signups": [
                1, 10, 50, 100, 200, 400, 600, 800, 1000, 1200,
                1400, 1600, 1800, 2000, 2200, 2400, 2600, 2800, 3000, 3200,
            ],
        }
    )
    return spark_session.createDataFrame(pd_df)


@extract_fields(
    {
        "spend_mean": float,
        "spend_std_dev": float,
    }
)
def spend_statistics(base_df: ps.DataFrame) -> Dict[str, float]:
    """Computes the mean and standard deviation of the spend column.
    Note that this is a blocking (collect) operation,
    but it doesn't have to be if you use an aggregation. In that case
    you'd just add the column to the dataframe and refer to it downstream,
    by expanding `columns_to_pass` in `with_mapped_data`.

    :param base_df: Base dataframe with spend and signups columns.
    :return: A dictionary with the mean and standard deviation of the spend column.
    """
    df_stats = base_df.select(
        mean(col("spend")).alias("mean"), stddev(col("spend")).alias("std")
    ).collect()

    return {
        "spend_mean": df_stats[0]["mean"],
        "spend_std_dev": df_stats[0]["std"],
    }


@h_spark.with_columns(
    map_transforms,
    columns_to_pass=["spend", "signups"],
)
def with_mapped_data(base_df: ps.DataFrame) -> ps.DataFrame:
    """Applies all the transforms in map_transforms to the base dataframe.

    :param base_df: Base dataframe with spend and signups columns.
    :return: The dataframe with the mapped columns appended.
    """
    return base_df


def final_result(with_mapped_data: ps.DataFrame) -> pd.DataFrame:
    """Computes the final result. You could always just output the pyspark
    dataframe, but we'll collect it and make it a pandas dataframe.

    :param with_mapped_data: Dataframe with the mapped columns appended.
    :return: A pandas dataframe with the final result.
    """
    return with_mapped_data.toPandas()
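The docstring of `spend_statistics` above notes that the collect() is avoidable: you can compute the aggregates as columns and widen `columns_to_pass` instead. A hedged sketch of that alternative (the function name is hypothetical, and the downstream map functions would then receive pandas Series rather than floats):

```python
# Hypothetical, non-blocking variant of spend_statistics (not part of this PR):
# attach the aggregates as columns instead of collecting them to the driver.
import pyspark.sql as ps
from pyspark.sql import functions as sf
from pyspark.sql.window import Window


def base_df_with_stats(base_df: ps.DataFrame) -> ps.DataFrame:
    """Adds spend_mean / spend_std_dev columns lazily via a global window."""
    w = Window.partitionBy()  # single global window; fine for toy data, a shuffle bottleneck at scale
    return base_df.withColumn("spend_mean", sf.mean("spend").over(w)).withColumn(
        "spend_std_dev", sf.stddev("spend").over(w)
    )


# Downstream you would then expand columns_to_pass, e.g.:
# @h_spark.with_columns(
#     map_transforms,
#     columns_to_pass=["spend", "signups", "spend_mean", "spend_std_dev"],
# )
```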
Binary file added examples/spark/pyspark/illustration.png
20 changes: 20 additions & 0 deletions examples/spark/pyspark/map_transforms.py
@@ -0,0 +1,20 @@
import pandas as pd

from hamilton.htypes import column


def spend_per_signup(spend: pd.Series, signups: pd.Series) -> column[pd.Series, float]:
    """The cost per signup in relation to spend."""
    return spend / signups


def spend_zero_mean(spend: pd.Series, spend_mean: float) -> column[pd.Series, float]:
    """Shows a function that takes a scalar. In this case we use it to zero-mean spend."""
    return spend - spend_mean


def spend_zero_mean_unit_variance(
    spend_zero_mean: pd.Series, spend_std_dev: float
) -> column[pd.Series, float]:
    """Function showing one way to make spend have zero mean and unit variance."""
    return spend_zero_mean / spend_std_dev
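For intuition, each function above is plain column-level pandas logic that Hamilton applies to the pyspark dataframe. Conceptually this is similar to hand-registering a series-to-series pandas UDF, as in the sketch below (illustrative only; not a claim about Hamilton's internal implementation):

```python
# Hand-rolled pandas-UDF equivalent of spend_per_signup (illustrative only).
import pandas as pd
from pyspark.sql.functions import pandas_udf


@pandas_udf("double")
def spend_per_signup_udf(spend: pd.Series, signups: pd.Series) -> pd.Series:
    return spend / signups


# ps_df = ps_df.withColumn("spend_per_signup", spend_per_signup_udf("spend", "signups"))
```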
155 changes: 155 additions & 0 deletions examples/spark/pyspark/notebook.ipynb
@@ -0,0 +1,155 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 13,
"id": "4c8c7cb7",
"metadata": {},
"outputs": [],
"source": [
"import pyspark.sql as ps\n",
"import pandas as pd\n",
"from pyspark.sql.functions import col, mean, stddev"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "a85bb3cf",
"metadata": {},
"outputs": [],
"source": [
"spark_session = ps.SparkSession.builder.master(\"local[1]\").getOrCreate()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "ac02f09c",
"metadata": {},
"outputs": [],
"source": [
"pd_df = pd.DataFrame(\n",
" {\n",
" \"spend\": [\n",
" 10,\n",
" 10,\n",
" 20,\n",
" 40,\n",
" 40,\n",
" 50,\n",
" 60,\n",
" 70,\n",
" 90,\n",
" 100,\n",
" 70,\n",
" 80,\n",
" 90,\n",
" 100,\n",
" 110,\n",
" 120,\n",
" 130,\n",
" 140,\n",
" 150,\n",
" 160,\n",
" ],\n",
" \"signups\": [\n",
" 1,\n",
" 10,\n",
" 50,\n",
" 100,\n",
" 200,\n",
" 400,\n",
" 600,\n",
" 800,\n",
" 1000,\n",
" 1200,\n",
" 1400,\n",
" 1600,\n",
" 1800,\n",
" 2000,\n",
" 2200,\n",
" 2400,\n",
" 2600,\n",
" 2800,\n",
" 3000,\n",
" 3200,\n",
" ],\n",
" }\n",
" )\n",
"ps_df = spark_session.createDataFrame(pd_df)"
]
},
{
"cell_type": "code",
"execution_count": 17,
"id": "71fd52ed",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[foo: double]"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ps_df.select(mean(col(\"spend\")).alias(\"foo\"))"
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "8986d1ca",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[spend: bigint, signups: bigint, foo: bigint]"
]
},
"execution_count": 25,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ps_df.withColumn(\"foo\", ps_df['signups']*ps_df['spend'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7489e4dd",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.10"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
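Note that the notebook cells above only build lazy expressions (the outputs are `DataFrame[...]` schemas; nothing is computed). A small sketch of materializing them, assuming the `ps_df` defined in the notebook:

```python
# Force execution of the lazy expressions from the cells above (sketch).
ps_df.select(mean(col("spend")).alias("foo")).show()                # prints the single aggregate row
ps_df.withColumn("foo", ps_df["signups"] * ps_df["spend"]).show(5)  # prints the first 5 enriched rows
```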
25 changes: 25 additions & 0 deletions examples/spark/pyspark/out.png
15 changes: 15 additions & 0 deletions examples/spark/pyspark/run.py
@@ -0,0 +1,15 @@
import dataflow
import map_transforms

from hamilton import driver


def main():
    dr = driver.Builder().with_modules(dataflow, map_transforms).build()
    dr.visualize_execution(["final_result"], "./out.png", {"format": "png"})
    final_result = dr.execute(["final_result"])
    print(final_result)


if __name__ == "__main__":
    main()
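`dataflow.py` notes that you can also pass the spark session in yourself rather than letting the `spark_session` node create one. A hedged sketch of one way to do that with this driver setup, using the `overrides` argument to `execute`:

```python
# Hypothetical variant of main(): supply your own SparkSession instead of
# letting the spark_session node build one.
import pyspark.sql as ps

import dataflow
import map_transforms
from hamilton import driver

my_session = ps.SparkSession.builder.master("local[2]").getOrCreate()
dr = driver.Builder().with_modules(dataflow, map_transforms).build()
result = dr.execute(["final_result"], overrides={"spark_session": my_session})
print(result)
```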
Binary file modified examples/spark/pyspark_udfs/my_spark_udf.dot.png
12 changes: 12 additions & 0 deletions examples/spark/tpc-h/README.md
@@ -0,0 +1,12 @@
# TPC-H

We've represented a few TPC-H queries using pyspark + hamilton.

While we have not optimized these for benchmarking, they provide a good set of examples of how to express pyspark logic
and break it into hamilton functions.

## Running

To run, use `run.py`, which lets you run a few of the queries. That said, you'll have to generate the data on your own, which is a bit tricky.

Download dbgen and follow the instructions at https://www.tpc.org/tpch/. You can also reach out to us and we'll help you get set up.
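As a hypothetical starting point once dbgen has produced its pipe-delimited `.tbl` files (the path and table below are assumptions; check `run.py` in this directory for the paths and schema it actually expects):

```python
# Hypothetical sketch for loading one dbgen table into spark.
import pyspark.sql as ps

spark = ps.SparkSession.builder.master("local[1]").getOrCreate()
# dbgen writes pipe-delimited rows; lines typically end with a trailing '|',
# which may show up as an extra empty column.
lineitem = spark.read.csv("data/lineitem.tbl", sep="|", inferSchema=True)
lineitem.show(5)
```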