Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful errors #926

Merged
merged 1 commit into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/reference/lifecycle-hooks/GracefulErrorAdapter.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
==============================
lifecycle.GracefulErrorAdapter
==============================


.. autoclass:: hamilton.lifecycle.default.GracefulErrorAdapter
:special-members: __init__
1 change: 1 addition & 0 deletions docs/reference/lifecycle-hooks/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,4 @@ Recall to add lifecycle adapters, you just need to call the ``with_adapters`` me
DDOGTracer
FunctionInputOutputTypeChecker
SlackNotifierHook
GracefulErrorAdapter
1 change: 1 addition & 0 deletions hamilton/lifecycle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .base import LifecycleAdapter # noqa: F401
from .default import ( # noqa: F401
FunctionInputOutputTypeChecker,
GracefulErrorAdapter,
PDBDebugger,
PrintLn,
SlowDownYouMoveTooFast,
Expand Down
75 changes: 74 additions & 1 deletion hamilton/lifecycle/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import random
import shelve
import time
from typing import Any, Callable, Dict, List, Optional, Union
from typing import Any, Callable, Dict, List, Optional, Type, Union

from hamilton import graph_types, htypes
from hamilton.graph_types import HamiltonGraph
Expand Down Expand Up @@ -506,3 +506,76 @@ def run_after_node_execution(
raise TypeError(
f"Node {node_name} returned a result of type {type(result)}, expected {node_return_type}"
)


SENTINEL_DEFAULT = None # sentinel value -- lazy for now


class GracefulErrorAdapter(NodeExecutionMethod):
"""Gracefully handles errors in a graph's execution. This allows you to proceed despite failure,
dynamically pruning branches. While it still runs every node, it replaces them with no-ops if any upstream
required dependencies fail (including optional dependencies).
"""

def __init__(self, error_to_catch: Type[Exception], sentinel_value: Any = SENTINEL_DEFAULT):
"""Initializes the adapter. Allows you to customize the error to catch (which exception
your graph will throw to indicate failure), as well as the sentinel value to use in place of
a node's result if it fails (this defaults to ``None``).
Note that this is currently only compatible with the dict-based result builder (use at your
own risk with pandas series, etc...).
Be careful using ``None`` as the default -- feel free to replace it with a sentinel value
of your choice (this could negatively impact your graph's execution if you actually *do* intend
to use ``None`` return values).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you provide python example of usage, e.g. passing in the Exception type to catch, and what it would do.

GracefulErrorAdapter(ValueError) vs GracefulErrorAdapter(Exception) etc

You can use this as follows:
.. code-block:: python
# my_module.py
# custom exception
class DoNotProceed(Exception):
pass
def wont_proceed() -> int:
raise DoNotProceed()
def will_proceed() -> int:
return 1
def never_reached(wont_proceed: int) -> int:
return 1 # this should not be reached
dr = (
driver.Builder()
.with_modules(my_module)
.with_adapters(
default.GracefulErrorAdapter(
error_to_catch=DoNotProceed,
sentinel_value=None
)
)
.build()
)
dr.execute(["will_proceed", "never_reached"]) # will return {'will_proceed': 1, 'never_reached': None}
Note you can customize the error you want it to fail on and the sentinel value to use in place of a node's result if it fails.
:param error_to_catch: The error to catch
:param sentinel_value: The sentinel value to use in place of a node's result if it fails
"""
self.error_to_catch = error_to_catch
self.sentinel_value = sentinel_value

def run_to_execute_node(
self, *, node_callable: Any, node_kwargs: Dict[str, Any], **future_kwargs: Any
) -> Any:
"""Executes a node. If the node fails, returns the sentinel value."""
for key, value in node_kwargs.items():
if value == self.sentinel_value: # == versus is
return self.sentinel_value # cascade it through
try:
return node_callable(**node_kwargs)
except self.error_to_catch:
return self.sentinel_value