Skip to content

Commit

Permalink
Fixes bugs in pyspark @with_columns
Browse files Browse the repository at this point in the history
1. Configuration linkage was not exposed. So adding a parameter
for people to specify what configuration is required.
2. There was a bug in the UDF logic, because `all(empty)` evaluates
to True. So fixed that.
3. The `select` in append mode appended all possible columns. It
didn't only append what was in the `select`. This changes that behavior
by dropping "outputs" from the subdag that aren't in the select. We
do it this way because we don't know what's in the original dataframe.
4. Adds test to check for the new logic
  • Loading branch information
skrawcz committed Mar 2, 2024
1 parent 80ad335 commit ddd38df
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 14 deletions.
35 changes: 29 additions & 6 deletions examples/LLM_Workflows/pdf_summarizer/run_on_spark/README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
# PDF Summarizer on Spark

Here we show how you can run the same Hamilton dataflow, that we defined in the backend
folder, on Spark. This is useful if you want to run the same dataflow on a larger dataset,
or have to run it on a cluster. Importantly this means you don't have to rewrite your
code, or have to change where/how you develop!
folder, on Spark (using two approaches). This is useful if you want to run the same dataflow
on a larger dataset, or have to run it on a cluster. Importantly this means you don't have
to rewrite your code, or have to change where/how you develop!

![Summarization dataflow](spark_summarization.png)

# File organization
- `summarization.py` this should be a carbon copy of the one in the backend folder.
- `run.py` this contains the code to create a spark job and run the summarization code.
- `run_with_columns.py` this contains the code to create a spark job and run the summarization code, but
using the `@with_columns` syntax. This is a more ergonomic way to write a full spark job with Hamilton.
- `run.py` this contains the code to create a spark job and run the summarization code as UDFs.

# How this works
# How `run_with_columns.py` works
We take the dataflow defined by `summarization.py` and execute it as a bunch
of row based UDFs on spark. The magic to do this happens in the `@with_columns` decorator.

This approach allows you to put your entire pyspark workflow into Hamilton in an ergonomic way.
You can request any intermediate outputs of summarization.py as columns in the dataframe.
This is great for debugging and understanding your dataflow.

![with_columns](spark_with_columns_summarization.png)

# How `run.py` works
We take the dataflow defined by `summarization.py` and execute it as a bunch
of row based UDFs on spark. The magic to do this happens in the Hamilton PySparkUDFGraphAdapter.

Expand All @@ -20,7 +32,18 @@ that contains a column that maps to the required input. Which in this example
is `pdf_source`. You can request whatever intermediate outputs as columns, which
in this example we do with `["raw_text", "chunked_text", "summarized_text"]`.

## Running the code
![udfs](spark_with_columns_summarization.png)

## Running `run_with_columns.py`

1. Make sure you have the right dependencies installed. You can do this by running
`pip install -r requirements.txt`.
2. You can then run the code with the single PDF that is provided, else add more paths to
PDF files in `spark_pdf_pipeline.py`.
3. Then you can run the code with `python run_with_columns.py`. Be sure to have your OPENAI_API_KEY in the
environment.

## Running `run.py`

1. Make sure you have the right dependencies installed. You can do this by running
`pip install -r requirements.txt`.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Spark driver and Hamilton driver code."""

import spark_pdf_pipeline
from pyspark.sql import SparkSession

from hamilton import base, driver, log_setup


def my_spark_job(spark: SparkSession, openai_gpt_model: str, content_type: str, user_query: str):
"""Template for a Spark job that uses Hamilton for their featuring engineering, i.e. any map, operations.
:param spark: the SparkSession
:param openai_gpt_model: the model to use for summarization
:param content_type: the content type of the document to summarize
:param user_query: the user query to use for summarization
"""
dr = (
driver.Builder()
.with_config({"file_type": "pdf"})
.with_modules(spark_pdf_pipeline)
.with_adapter(base.DefaultAdapter())
.build()
)
# create inputs to the UDFs - this needs to be column_name -> spark dataframe.
execute_inputs = {
"spark_session": spark,
"save_path": "summarized_pdf_df.parquet",
"openai_gpt_model": openai_gpt_model,
"content_type": content_type,
"user_query": user_query,
}
output = ["saved_summarized_pdf_df"]
# visualize execution of what is going to be appended
dr.visualize_execution(
output,
"./spark_with_columns_summarization.png",
inputs=execute_inputs,
deduplicate_inputs=True,
)
# tell Hamilton to tell Spark what to do
dict_result = dr.execute(output, inputs=execute_inputs)
return dict_result["saved_summarized_pdf_df"]


if __name__ == "__main__":
import os

openai_api_key = os.environ.get("OPENAI_API_KEY")
log_setup.setup_logging(log_level=log_setup.LOG_LEVELS["INFO"])
# create the SparkSession -- note in real life, you'd adjust the number of executors to control parallelism.
spark = SparkSession.builder.config(
"spark.executorEnv.OPENAI_API_KEY", openai_api_key
).getOrCreate()
spark.sparkContext.setLogLevel("info")
# run the job
_df = my_spark_job(spark, "gpt-3.5-turbo-0613", "Scientific article", "Can you ELI5 the paper?")
# show the dataframe & thus make spark compute something
_df.show()
spark.stop()
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import pandas as pd
import pyspark.sql as ps
import summarization

from hamilton.plugins.h_spark import with_columns


def pdf_df(spark_session: ps.SparkSession) -> ps.DataFrame:
pandas_df = pd.DataFrame(
# TODO: update this to point to a PDF or two.
{"pdf_source": ["CDMS_HAMILTON_PAPER.pdf"]}
)
df = spark_session.createDataFrame(pandas_df)
return df


@with_columns(
summarization,
select=["summarized_chunks", "summarized_text"],
columns_to_pass=["pdf_source"],
config_required=["file_type"],
)
def summarized_pdf_df(pdf_df: ps.DataFrame) -> ps.DataFrame:
return pdf_df


def saved_summarized_pdf_df(
summarized_pdf_df: ps.DataFrame, save_path: str, persist_before_save: bool = True
) -> ps.DataFrame:
"""Save the summarized PDF dataframe to a parquet file."""
if persist_before_save:
summarized_pdf_df.persist()
summarized_pdf_df.write.parquet(save_path, mode="overwrite")
return summarized_pdf_df
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
61 changes: 53 additions & 8 deletions hamilton/plugins/h_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ def _lambda_udf(df: DataFrame, node_: node.Node, actual_kwargs: Dict[str, Any])
raise ValueError(
f"Currently unsupported function for {node_.name} with function signature:\n{node_.input_types}."
)
elif all(pandas_annotation.values()):
elif all(pandas_annotation.values()) and len(pandas_annotation.values()) > 0:
hamilton_udf = _fabricate_spark_function(node_, params_to_bind, params_from_df, True)
# pull from annotation here instead of tag.
base_type, type_args = htypes.get_type_information(node_.type)
Expand Down Expand Up @@ -944,6 +944,7 @@ def __init__(
select: List[str] = None,
namespace: str = None,
mode: str = "append",
config_required: List[str] = None,
):
"""Initializes a with_columns decorator for spark. This allows you to efficiently run
groups of map operations on a dataframe, represented as pandas/primitives UDFs. This
Expand Down Expand Up @@ -1002,7 +1003,7 @@ def final_df(initial_df: ps.DataFrame) -> ps.DataFrame:
:param load_from: The functions that will be used to generate the group of map operations.
:param select: Columns to select from the transformation. If this is left blank it will
keep all columns in the subdag.
add all possible columns from the subdag to the dataframe.
:param columns_to_pass: The initial schema of the dataframe. This is used to determine which
upstream inputs should be taken from the dataframe, and which shouldn't. Note that, if this is
left empty (and external_inputs is as well), we will assume that all dependencies come
Expand All @@ -1014,11 +1015,12 @@ def final_df(initial_df: ps.DataFrame) -> ps.DataFrame:
and so this can be reused. If its left out, there will be no namespace (in which case you'll want
to be careful about repeating it/reusing the nodes in other parts of the DAG.)
:param mode: The mode of the operation. This can be either "append" or "select".
If it is "append", it will keep all columns in the dataframe. If it is "select",
If it is "append", it will keep all original columns in the dataframe. If it is "select",
it will only keep the columns in the dataframe from the `select` parameter. Note that,
if the `select` parameter is left blank, it will keep all columns in the dataframe
that are in the subdag (as that is the behavior of the `select` parameter. This
defaults to `append`
if the `select` parameter is left blank, it will add all columns in the dataframe
that are in the subdag. This defaults to `append`.
:param config_required: the list of config keys that are required to resolve any functions. Pass in None\
if you want the functions/modules to have access to all possible config.
"""
self.subdag_functions = subdag.collect_functions(load_from)
self.select = select
Expand All @@ -1043,6 +1045,7 @@ def final_df(initial_df: ps.DataFrame) -> ps.DataFrame:
self.namespace = namespace
self.upstream_dependency = dataframe
self.mode = mode
self.config_required = config_required

@staticmethod
def _prep_nodes(initial_nodes: List[node.Node]) -> List[node.Node]:
Expand All @@ -1068,11 +1071,10 @@ def create_selector_node(
upstream_name: str, columns: List[str], node_name: str = "select"
) -> node.Node:
"""Creates a selector node. The sole job of this is to select just the specified columns.
Note this is a utility function that's only called
Note this is a utility function that's only called here.
:param upstream_name: Name of the upstream dataframe node
:param columns: Columns to select
:param node_namespace: Namespace of the node
:param node_name: Name of the node to create
:return:
"""
Expand All @@ -1087,6 +1089,29 @@ def new_callable(**kwargs) -> DataFrame:
input_types={upstream_name: DataFrame},
)

@staticmethod
def create_drop_node(
upstream_name: str, columns: List[str], node_name: str = "select"
) -> node.Node:
"""Creates a drop node. The sole job of this is to drop just the specified columns.
Note this is a utility function that's only called here.
:param upstream_name: Name of the upstream dataframe node
:param columns: Columns to drop
:param node_name: Name of the node to create
:return:
"""

def new_callable(**kwargs) -> DataFrame:
return kwargs[upstream_name].drop(*columns)

return node.Node(
name=node_name,
typ=DataFrame,
callabl=new_callable,
input_types={upstream_name: DataFrame},
)

def _validate_dataframe_subdag_parameter(self, nodes: List[node.Node], fn_name: str):
all_upstream_dataframe_nodes = _identify_upstream_dataframe_nodes(nodes)
initial_schema = set(self.initial_schema) if self.initial_schema is not None else set()
Expand Down Expand Up @@ -1121,6 +1146,9 @@ def _validate_dataframe_subdag_parameter(self, nodes: List[node.Node], fn_name:
f"Instead, we found: {upstream_dependency}."
)

def required_config(self) -> List[str]:
return self.config_required

def generate_nodes(self, fn: Callable, config: Dict[str, Any]) -> List[node.Node]:
"""Generates nodes in the with_columns groups. This does the following:
Expand Down Expand Up @@ -1154,6 +1182,7 @@ def generate_nodes(self, fn: Callable, config: Dict[str, Any]) -> List[node.Node
columns_passed_in_from_dataframe = (
set(self.initial_schema) if self.initial_schema is not None else []
)
drop_list = []
# Or from the dataframe passed in...
for node_ in sorted_initial_nodes:
# dependent columns are broken into two sets:
Expand Down Expand Up @@ -1185,11 +1214,16 @@ def generate_nodes(self, fn: Callable, config: Dict[str, Any]) -> List[node.Node
dependent_columns_in_mapgroup,
dependent_columns_in_dataframe,
)
if self.select is not None and sparkified.name not in self.select:
# we need to create a drop list because we don't want to drop
# original columns from the DF by accident.
drop_list.append(sparkified.name)
output_nodes.append(sparkified)
current_dataframe_node = sparkified.name
# We get the final node, which is the function we're using
# and reassign inputs to be the dataframe
if self.mode == "select":
# this selects over the original DF and the additions
select_columns = (
self.select if self.select is not None else [item.name for item in output_nodes]
)
Expand All @@ -1200,6 +1234,17 @@ def generate_nodes(self, fn: Callable, config: Dict[str, Any]) -> List[node.Node
)
output_nodes.append(select_node)
current_dataframe_node = select_node.name
elif self.select is not None and len(drop_list) > 0:
# since it's in append mode, we only want to append what's in the select
# but we don't know what the original schema is, so we instead drop
# things from the DF to achieve the same result
select_node = with_columns.create_drop_node(
upstream_name=current_dataframe_node,
columns=drop_list,
node_name="_select",
)
output_nodes.append(select_node)
current_dataframe_node = select_node.name
output_nodes = subdag.add_namespace(output_nodes, namespace)
final_node = node.Node.from_fn(fn).reassign_inputs(
{inject_parameter: assign_namespace(current_dataframe_node, namespace)}
Expand Down
22 changes: 22 additions & 0 deletions plugin_tests/h_spark/test_h_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
spark_dag_multiple_with_columns,
spark_dag_pyspark_udfs,
)
from tests.resources.spark import a_dag


@pytest.fixture(scope="module")
Expand Down Expand Up @@ -607,6 +608,27 @@ def df_as_pandas(df: DataFrame) -> pd.DataFrame:
assert set(nodes_by_names.keys()) == {"df_as_pandas.c", "df_as_pandas"}


def test_with_columns_generate_nodes_select_append_mode():
dec = h_spark.with_columns(
a_dag,
columns_to_pass=["input"],
select=["c"],
)

def df_as_pandas(df: DataFrame) -> pd.DataFrame:
return df.toPandas()

nodes = dec.generate_nodes(df_as_pandas, {})
nodes_by_names = {n.name: n for n in nodes}
assert set(nodes_by_names.keys()) == {
"df_as_pandas",
"df_as_pandas._select",
"df_as_pandas.a",
"df_as_pandas.b",
"df_as_pandas.c",
}


def test_with_columns_generate_nodes_select_mode_select():
dec = h_spark.with_columns(
basic_spark_dag.a,
Expand Down

0 comments on commit ddd38df

Please sign in to comment.