Mega-commit for James Arruda's fixes on #1017
Initial issue description as a test.

Modifying GracefulErrorAdapter for Parallelizable blocks.

Added docs to adapter. Updated for simpler passthrough.

Added sentinel injection feature and test. Updated gitignore for vscode settings folder.

Fix for tag error

Updating adapter test for proper tags

Fixing types for 3.8

Type fixing for 3.8

Fixing sentinel equality testing to avoid incomparable types.

Adding example of parallelism and GracefulErrorAdapter.

Added decorator for sentinel acceptance as input. Updated tests and example. Added DAG image to example. Fixed a typo.

Simplifying node callable retrieval.

Adding docstring clarification on try_all_parallel.

Parametrizing tests. Moving test module to resources.

Added docs to decorator. Changed node tag key.
JamesArruda authored and elijahbenizzy committed Jul 9, 2024
1 parent d9e8ef1 commit 907b690
Showing 9 changed files with 525 additions and 6 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -140,3 +140,6 @@ examples/dbt/logs/*

# Ignore hamilton-env virtual envs
examples/**/hamilton-env

# vscode
.vscode
34 changes: 34 additions & 0 deletions examples/parallelism/graceful_running/README.md
@@ -0,0 +1,34 @@
# Parallelism with GracefulErrorAdapter Example

## Overview

This is a simple example of using the `GracefulErrorAdapter` with parallelism, where we might expect some component of an analysis to fail but still want to get back as much data as we can.

The example divides a large dataframe into smaller frames and runs the same analysis on each of them, gathering the results at the end into a single frame. Errors raised inside the parallelism block do not halt the overall operation of the driver.

The user can supply custom data-splitting functions, each processed by the same sub-DAG. In some ways, this is an example of how to do `@subdag` with `Parallelizable`.
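
Any splitter passed in via the `funcs` input just needs to yield `(name, DataFrame)` pairs. A minimal sketch of one more splitter — the `split_on_method` name is hypothetical, while the `Method` column comes from the example's fake data:

```python
from typing import Iterable, Tuple

import pandas as pd


def split_on_method(data: pd.DataFrame) -> Iterable[Tuple[str, pd.DataFrame]]:
    # Yield one (name, frame) pair per advertising method.
    for method, grp in data.groupby("Method"):
        yield f"Method:{method}", grp
```

Appending this to the list passed as the `funcs` input adds its groups to the parallel block without touching the DAG definition.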

The DAG, shown below, runs a simple set of operations on a custom set of splits of a dataframe. In this example, failures are triggered on purpose in the `model_fit` step.

![image](dag.png)

## Take home

This example demonstrates two capabilities:

1. Dynamically generating datasets from a larger one and analyzing each the same way, in parallel
2. Skipping downstream nodes when a failure occurs and returning sentinel values in their place
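
Capability 2 hinges on downstream nodes tolerating the sentinel. Here the sentinel is `None` (see `sentinel_value=None` in `run.py`), so a gathering node guards before unpacking. A plain-Python sketch of the idea behind `gather_metrics` — the names here are illustrative, not the library's API:

```python
from typing import Any, Dict, Optional, Tuple


def gather(name: Optional[str], fit: Optional[Tuple[float, float, float]]) -> Dict[str, Any]:
    # A failed upstream node arrives as the sentinel (None) instead of a result,
    # so unpack the fit tuple only when it is actually present.
    return {
        "Name": name,
        "Intercept": fit[0] if fit else None,
        "Spend_Coef": fit[1] if fit else None,
    }
```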

## Running

You can run the basic analysis in the terminal with:

```bash
python run.py
```

Change the `--mode` option (`local`, `multithreading`, or `dask`) to try different ways of running the parallel tasks.

Add the flag `--no-adapt` to see the failure that occurs when not using the adapter.

Modify the example to throw an exception in a function passed in to split the data. Change the order of the functions to see the effect on the results.
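
For that modification, a hypothetical splitter that raises partway through its iteration might look like the sketch below — assuming `try_all_parallel=True` behaves as its name suggests, the groups yielded before the error still run, and the failure becomes a sentinel rather than aborting the run:

```python
from typing import Iterable, Tuple

import pandas as pd


def split_and_fail(data: pd.DataFrame) -> Iterable[Tuple[str, pd.DataFrame]]:
    # Yield one good group, then simulate an error while splitting.
    yield "All Rows", data
    raise RuntimeError("simulated failure while splitting")
```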
Binary file added examples/parallelism/graceful_running/dag.png
81 changes: 81 additions & 0 deletions examples/parallelism/graceful_running/functions.py
@@ -0,0 +1,81 @@
from itertools import cycle
from typing import Any, Callable, Iterable, List, Tuple, Union

import numpy as np
import pandas as pd
from scipy.stats import linregress

from hamilton.function_modifiers import extract_fields
from hamilton.htypes import Collect, Parallelizable
from hamilton.lifecycle.default import accept_error_sentinels

Splitter = Callable[[pd.DataFrame], Iterable[tuple[str, pd.DataFrame]]]


def load_data() -> pd.DataFrame:
    """Make some fake data."""
    n_rows = 200
    views = np.linspace(0, 10_000.0, n_rows)
    spend = views * np.sin(np.arange(n_rows))
    _regions = cycle(["Central", "North", "South"])
    regions = [next(_regions) for _ in range(n_rows)]
    _methods = cycle(["Internet", "TV"])
    method = [next(_methods) for _ in range(n_rows)]
    df = (
        pd.DataFrame()
        .assign(Views=views)
        .assign(Spend=spend)
        .assign(Region=regions)
        .assign(Method=method)
    )
    return df


def split_to_groups(
    load_data: pd.DataFrame, funcs: List[Splitter]
) -> Parallelizable[tuple[str, pd.DataFrame]]:
    """Split data into interesting groups."""
    for func in funcs:
        for grp_name, grp in func(load_data):
            yield (grp_name, grp)


@extract_fields(dict(data=pd.DataFrame, group_name=str))
def expander(split_to_groups: tuple[str, pd.DataFrame]) -> dict[str, Any]:
    return {"data": split_to_groups[1], "group_name": split_to_groups[0]}


def average(data: pd.DataFrame) -> float:
    """Average the views."""
    return data.Views.mean()


def model_fit(data: pd.DataFrame, group_name: str) -> Tuple[float, float, float]:
    """Imagine a model fit that doesn't always work."""
    if "Method:TV" in group_name:
        raise Exception("Fake floating point error, e.g.")
    xs = data.Spend.values
    ys = data.Views.values
    res = linregress(xs, ys)
    return res.intercept, res.slope, res.rvalue


@accept_error_sentinels
def gather_metrics(
    group_name: Union[str, None],
    average: Union[float, None],
    model_fit: Union[Tuple[float, float, float], None],
) -> dict[str, Any]:
    answer = {
        "Name": group_name,
        "Average": average,
        "Intercept": model_fit[0] if model_fit else None,
        "Spend_Coef": model_fit[1] if model_fit else None,
        "Model_Fit": model_fit[2] if model_fit else None,
    }
    return answer


def final_collect(gather_metrics: Collect[dict[str, Any]]) -> pd.DataFrame:
    df = pd.DataFrame.from_records(gather_metrics)
    return df
83 changes: 83 additions & 0 deletions examples/parallelism/graceful_running/run.py
@@ -0,0 +1,83 @@
from typing import Iterable, Tuple

import click
import functions
import pandas as pd
from dask import distributed

from hamilton import driver
from hamilton.execution import executors
from hamilton.lifecycle import GracefulErrorAdapter
from hamilton.plugins import h_dask

# Assume we define some custom methods for splitting the data


def split_on_region(data: pd.DataFrame) -> Iterable[Tuple[str, pd.DataFrame]]:
    for idx, grp in data.groupby("Region"):
        yield f"Region:{idx}", grp


def split_on_attrs(data: pd.DataFrame) -> Iterable[Tuple[str, pd.DataFrame]]:
    for (region, method), grp in data.groupby(["Region", "Method"]):
        yield f"Region:{region} - Method:{method}", grp


def split_on_views(data: pd.DataFrame) -> Iterable[Tuple[str, pd.DataFrame]]:
    yield "Low Views", data[data.Views <= 4000.0]
    yield "High Views", data[data.Views > 4000.0]


@click.command()
@click.option(
    "--mode",
    type=click.Choice(["local", "multithreading", "dask"]),
    help="Where to run remote tasks.",
    default="local",
)
@click.option("--no-adapt", is_flag=True, default=False, help="Disable the graceful adapter.")
def main(mode: str, no_adapt: bool):
    adapter = GracefulErrorAdapter(
        error_to_catch=Exception,
        sentinel_value=None,
        try_all_parallel=True,
    )

    shutdown = None
    if mode == "local":
        remote_executor = executors.SynchronousLocalTaskExecutor()
    elif mode == "multithreading":
        remote_executor = executors.MultiThreadingExecutor(max_tasks=100)
    elif mode == "dask":
        cluster = distributed.LocalCluster()
        client = distributed.Client(cluster)
        remote_executor = h_dask.DaskExecutor(client=client)
        shutdown = cluster.close

    dr = (
        driver.Builder()
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_remote_executor(remote_executor)
        .with_modules(functions)
    )
    if not no_adapt:
        dr = dr.with_adapters(adapter)
    dr = dr.build()

    the_funcs = [split_on_region, split_on_attrs, split_on_views]
    dr.visualize_execution(
        ["final_collect"], "./dag", {}, inputs={"funcs": the_funcs}, show_legend=False
    )

    print(
        dr.execute(
            final_vars=["final_collect"],
            inputs={"funcs": the_funcs},
        )["final_collect"]
    )
    if shutdown:
        shutdown()


if __name__ == "__main__":
    main()
1 change: 1 addition & 0 deletions hamilton/lifecycle/__init__.py
@@ -17,6 +17,7 @@
    PDBDebugger,
    PrintLn,
    SlowDownYouMoveTooFast,
    accept_error_sentinels,
)

PrintLnHook = PrintLn # for backwards compatibility -- this will be removed in 2.0
