
Graceful failure for parallelism #1017

Merged
Changes from 10 commits
3 changes: 3 additions & 0 deletions .gitignore
@@ -140,3 +140,6 @@ examples/dbt/logs/*

# Ignore hamilton-env virtual envs
examples/**/hamilton-env

# vscode
.vscode
31 changes: 31 additions & 0 deletions examples/parallelism/graceful_running/README.md
@@ -0,0 +1,31 @@
# Parallelism with GracefulErrorAdapter Example

## Overview

This is a simple example of using the `GracefulErrorAdapter` with parallelism, where we might expect some component of an analysis to fail but still want to get back as much data as we can.

This example divides a large dataframe into smaller frames and runs the same analysis on each of those frames. It then gathers the results into a single frame at the end. Errors inside the parallelism block do not halt the overall operation of the driver.

The user can define custom data-splitting functions to process in the same sub-dag. In some ways, this is an example of how to do `@subdag` with `Parallelizable`.
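A custom splitter is just a callable matching the `Splitter` signature in `functions.py`: it takes a DataFrame and yields `(group_name, sub_frame)` pairs. As a minimal sketch (the `split_on_method` name and toy data here are illustrative, not part of the example):

```python
from typing import Iterable, Tuple

import pandas as pd


def split_on_method(data: pd.DataFrame) -> Iterable[Tuple[str, pd.DataFrame]]:
    # Yield one (group_name, sub_frame) pair per advertising method.
    for method, grp in data.groupby("Method"):
        yield f"Method:{method}", grp


df = pd.DataFrame({"Method": ["TV", "Internet", "TV"], "Views": [10, 20, 30]})
splits = dict(split_on_method(df))
```

Any number of such callables can be passed in the `funcs` input; each group they yield becomes one parallel branch of the analysis.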

## Take home

This demonstrates these capabilities:

1. Dynamically generating datasets from a larger one and analyzing them the same way - in parallel
2. Skipping over nodes when a failure occurs and returning sentinel values on failure


## Running

You can run the basic analysis in the terminal with:

```bash
python run.py
```

Change the `mode` input to demonstrate multiple methods of running in parallel.

Add the flag `--no-adapt` to see the failure that occurs when not using the adapter.

Modify the example to throw an exception in a function passed in to split the data. Change the order of the functions to see the effect on the results.
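One way to provoke such a failure is a splitter that raises partway through iteration (the `split_badly` name is hypothetical, for illustration only):

```python
from typing import Iterable, Tuple

import pandas as pd


def split_badly(data: pd.DataFrame) -> Iterable[Tuple[str, pd.DataFrame]]:
    # Yields one valid group, then fails mid-iteration, as a real
    # splitter might when it hits bad data partway through.
    yield "First Half", data.iloc[: len(data) // 2]
    raise RuntimeError("simulated failure while splitting")


gen = split_badly(pd.DataFrame({"Views": [1.0, 2.0, 3.0, 4.0]}))
first = next(gen)  # the group yielded before the failure still comes through
```

With the adapter enabled, groups yielded before the error still flow through the sub-dag; without it, the whole run fails.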
80 changes: 80 additions & 0 deletions examples/parallelism/graceful_running/functions.py
@@ -0,0 +1,80 @@
from itertools import cycle
from typing import Any, Callable, Iterable, List, Tuple, Union

import numpy as np
import pandas as pd
from scipy.stats import linregress

from hamilton.function_modifiers import extract_fields, tag
from hamilton.htypes import Collect, Parallelizable

Splitter = Callable[[pd.DataFrame], Iterable[tuple[str, pd.DataFrame]]]


def load_data() -> pd.DataFrame:
    """Make some fake data."""
    n_rows = 200
    views = np.linspace(0, 10_000.0, n_rows)
    spend = views * np.sin(np.arange(n_rows))
    _regions = cycle(["Central", "North", "South"])
    regions = [next(_regions) for _ in range(n_rows)]
    _methods = cycle(["Internet", "TV"])
    method = [next(_methods) for _ in range(n_rows)]
    df = (
        pd.DataFrame()
        .assign(Views=views)
        .assign(Spend=spend)
        .assign(Region=regions)
        .assign(Method=method)
    )
    return df


def split_to_groups(
    load_data: pd.DataFrame, funcs: List[Splitter]
) -> Parallelizable[tuple[str, pd.DataFrame]]:
    """Split data into interesting groups."""
    for func in funcs:
        for grp_name, grp in func(load_data):
            yield (grp_name, grp)


@extract_fields(dict(data=pd.DataFrame, group_name=str))
def expander(split_to_groups: tuple[str, pd.DataFrame]) -> dict[str, Any]:
    """Unpack a (name, frame) pair into separate nodes."""
    return {"data": split_to_groups[1], "group_name": split_to_groups[0]}


def average(data: pd.DataFrame) -> float:
    """Average the views."""
    return data.Views.mean()


def model_fit(data: pd.DataFrame, group_name: str) -> Tuple[float, float, float]:
    """Imagine a model fit that doesn't always work."""
    if "Method:TV" in group_name:
        raise Exception("Fake floating point error, e.g.")
    xs = data.Spend.values
    ys = data.Views.values
    res = linregress(xs, ys)
    return res.intercept, res.slope, res.rvalue


@tag(keep_sentinels="true")
def gather_metrics(
    group_name: Union[str, None],
    average: Union[float, None],
    model_fit: Union[Tuple[float, float, float], None],
) -> dict[str, Any]:
    """Assemble one result row; tagged so the adapter may inject sentinels."""
    answer = {
        "Name": group_name,
        "Average": average,
        "Intercept": model_fit[0] if model_fit else None,
        "Spend_Coef": model_fit[1] if model_fit else None,
        "Model_Fit": model_fit[2] if model_fit else None,
    }
    return answer


def final_collect(gather_metrics: Collect[dict[str, Any]]) -> pd.DataFrame:
    """Gather per-group metric rows into a single frame."""
    df = pd.DataFrame.from_records(gather_metrics)
    return df
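To see why `gather_metrics` guards every `model_fit` access, here is a pure-Python restatement (`gather_metrics_sketch` is a stand-in defined here, not part of the module) showing a sentinel (`None`) flowing through:

```python
from typing import Any, Dict, Optional, Tuple


def gather_metrics_sketch(
    group_name: Optional[str],
    average: Optional[float],
    model_fit: Optional[Tuple[float, float, float]],
) -> Dict[str, Any]:
    # Mirrors gather_metrics above: when model_fit failed upstream it
    # arrives as the sentinel (None), and each derived field guards on that.
    return {
        "Name": group_name,
        "Average": average,
        "Intercept": model_fit[0] if model_fit else None,
        "Spend_Coef": model_fit[1] if model_fit else None,
        "Model_Fit": model_fit[2] if model_fit else None,
    }


row = gather_metrics_sketch("Region:North - Method:TV", 512.5, None)
```

The row keeps whatever succeeded (the name and average) and records `None` for the failed fit, which is what lets the final frame include partially failed groups.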
80 changes: 80 additions & 0 deletions examples/parallelism/graceful_running/run.py
@@ -0,0 +1,80 @@
from typing import Iterable, Tuple

import click
import functions
import pandas as pd
from dask import distributed

from hamilton import driver
from hamilton.execution import executors
from hamilton.lifecycle import GracefulErrorAdapter
from hamilton.plugins import h_dask

# Assume we define some custom methods for splitting


def split_on_region(data: pd.DataFrame) -> Iterable[Tuple[str, pd.DataFrame]]:
    for idx, grp in data.groupby("Region"):
        yield f"Region:{idx}", grp


def split_on_attrs(data: pd.DataFrame) -> Iterable[Tuple[str, pd.DataFrame]]:
    for (region, method), grp in data.groupby(["Region", "Method"]):
        yield f"Region:{region} - Method:{method}", grp


def split_on_views(data: pd.DataFrame) -> Iterable[Tuple[str, pd.DataFrame]]:
    yield "Low Views", data[data.Views <= 4000.0]
    yield "High Views", data[data.Views > 4000.0]


@click.command()
@click.option(
    "--mode",
    type=click.Choice(["local", "multithreading", "dask"]),
    help="Where to run remote tasks.",
    default="local",
)
@click.option("--no-adapt", is_flag=True, default=False, help="Disable the graceful adapter.")
def main(mode: str, no_adapt: bool):
    adapter = GracefulErrorAdapter(
        error_to_catch=Exception,
        sentinel_value=None,
        fail_all_parallel=False,
        sentinel_injection_tags={
            "keep_sentinels": "true",
        },
    )

    shutdown = None
    if mode == "local":
        remote_executor = executors.SynchronousLocalTaskExecutor()
    elif mode == "multithreading":
        remote_executor = executors.MultiThreadingExecutor(max_tasks=100)
    elif mode == "dask":
        cluster = distributed.LocalCluster()
        client = distributed.Client(cluster)
        remote_executor = h_dask.DaskExecutor(client=client)
        shutdown = cluster.close

    dr = (
        driver.Builder()
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_remote_executor(remote_executor)
        .with_modules(functions)
    )
    if not no_adapt:
        dr = dr.with_adapters(adapter)
    dr = dr.build()
    print(
        dr.execute(
            final_vars=["final_collect"],
            inputs={"funcs": [split_on_region, split_on_attrs, split_on_views]},
        )["final_collect"]
    )
    if shutdown:
        shutdown()


if __name__ == "__main__":
    main()
59 changes: 53 additions & 6 deletions hamilton/lifecycle/default.py
@@ -518,7 +518,13 @@ class GracefulErrorAdapter(NodeExecutionMethod):
required dependencies fail (including optional dependencies).
"""

    def __init__(
        self,
        error_to_catch: Type[Exception],
        sentinel_value: Any = SENTINEL_DEFAULT,
        fail_all_parallel: bool = False,
        sentinel_injection_tags: Optional[Dict[str, str]] = None,
    ):
        """Initializes the adapter. Allows you to customize the error to catch (which exception
        your graph will throw to indicate failure), as well as the sentinel value to use in place of
        a node's result if it fails (this defaults to ``None``).
@@ -563,11 +569,19 @@ def never_reached(wont_proceed: int) -> int:

        Note you can customize the error you want it to fail on and the sentinel value to use in place of a node's result if it fails.

        For Parallelizable nodes, this adapter will attempt to iterate over the node outputs. If an error occurs, the sentinel value is
        returned and no more iterations over the node will occur. If you set ``fail_all_parallel`` to be True, it only sends one sentinel
        value into the parallelize sub-dag.

        :param error_to_catch: The error to catch
        :param sentinel_value: The sentinel value to use in place of a node's result if it fails
        :param fail_all_parallel: Treat a Parallelizable as 1 failure (True) or allow the successful ones to go through (False).
        :param sentinel_injection_tags: Node tag key:value pairs that allow sentinel injection into a node's inputs.
        """
        self.error_to_catch = error_to_catch
        self.sentinel_value = sentinel_value
        self.fail_all_parallel = fail_all_parallel
        self.sentinel_injection_tags = sentinel_injection_tags or {}

Review discussion (condensed): a collaborator asked how ``sentinel_injection_tags`` are meant to be used and suggested a fixed tag plus a simple decorator (e.g. ``@drop_errors(...)``) instead of arbitrary pairs; the author added a decorator that creates a specific tag to allow sentinel injection (4308d0a), and the collaborator asked for documentation of that decorator with an example.
    def run_to_execute_node(
        self,

@@ -584,10 +598,43 @@ def run_to_execute_node(
        # and truncate it/provide sentinels for every failure)
        # TODO -- decide what to do with collect
        """Executes a node. If the node fails, returns the sentinel value."""
        default_return = [self.sentinel_value] if is_expand else self.sentinel_value
        _node_tags = future_kwargs["node_tags"]
        can_inject = any(
            _node_tags.get(k, "") == v for k, v in self.sentinel_injection_tags.items()
        )
        if not can_inject:
            for key, value in node_kwargs.items():
                if type(self.sentinel_value) is type(value):
                    if self.sentinel_value == value:  # == versus is
                        return default_return
        if not is_expand:
            try:
                return node_callable(**node_kwargs)
            except self.error_to_catch:
                return self.sentinel_value

        # Grab the partial-ized function that is a parallelizable.
        # Be very specific...
        if len(node_callable.keywords) == 1 and "_callable" in node_callable.keywords:
            gen_func = node_callable.keywords["_callable"]
        elif self.fail_all_parallel:
            gen_func = node_callable
        else:
            raise ValueError(
                "Unexpected configuration for expandable node and GracefulErrorAdapter."
            )
        try:
            gen = gen_func(**node_kwargs)
        except self.error_to_catch:
            return [self.sentinel_value]
        results: list[Any] = []
        try:
            for _res in gen:
                results.append(_res)
        except self.error_to_catch:
            if self.fail_all_parallel:
                results = [self.sentinel_value]
            else:
                results.append(self.sentinel_value)
        return results

Review discussion (condensed): a collaborator noted the ``"_callable"`` check couples to the executor's ``def new_callable(*args, _callable=None, **kwargs)`` wrapper, and suggested simply assuming that shape whenever the node is an expand to make the condition clearer.
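The tag check the adapter performs can be sketched in isolation (the function name here is illustrative, not the library's API): a node is eligible for sentinel injection when any of its tags matches a configured key:value pair.

```python
from typing import Dict


def can_inject_sentinels(node_tags: Dict[str, str], injection_tags: Dict[str, str]) -> bool:
    # A node may receive sentinel inputs when ANY of its tags matches a
    # configured key:value pair -- mirrors the adapter's `can_inject` check.
    return any(node_tags.get(k, "") == v for k, v in injection_tags.items())


injection_tags = {"keep_sentinels": "true"}
```

This is why `gather_metrics` in the example is tagged `keep_sentinels="true"`: it opts in to receiving `None` inputs rather than being skipped when an upstream node fails.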