Skip to content

Commit

Permalink
Adds graceful fail-over
Browse files Browse the repository at this point in the history
This is still a bit of a WIP, but the API will stay backwards compatible
so I'm OK putting it in the stdlib.

This efectively cascades through null (customizable sentinel value)
results, not running a node if any of its dependencies are null.
  • Loading branch information
elijahbenizzy committed Jun 5, 2024
1 parent 47e7fa3 commit 1b26a1c
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 1 deletion.
7 changes: 7 additions & 0 deletions docs/reference/lifecycle-hooks/GracefulErrorAdapter.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
==============================
lifecycle.GracefulErrorAdapter
==============================


.. autoclass:: hamilton.lifecycle.default.GracefulErrorAdapter
:special-members: __init__
1 change: 1 addition & 0 deletions docs/reference/lifecycle-hooks/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,4 @@ Recall to add lifecycle adapters, you just need to call the ``with_adapters`` me
DDOGTracer
FunctionInputOutputTypeChecker
SlackNotifierHook
GracefulErrorAdapter
1 change: 1 addition & 0 deletions hamilton/lifecycle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .base import LifecycleAdapter # noqa: F401
from .default import ( # noqa: F401
FunctionInputOutputTypeChecker,
GracefulErrorAdapter,
PDBDebugger,
PrintLn,
SlowDownYouMoveTooFast,
Expand Down
75 changes: 74 additions & 1 deletion hamilton/lifecycle/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import random
import shelve
import time
from typing import Any, Callable, Dict, List, Optional, Union
from typing import Any, Callable, Dict, List, Optional, Type, Union

from hamilton import graph_types, htypes
from hamilton.graph_types import HamiltonGraph
Expand Down Expand Up @@ -506,3 +506,76 @@ def run_after_node_execution(
raise TypeError(
f"Node {node_name} returned a result of type {type(result)}, expected {node_return_type}"
)


SENTINEL_DEFAULT = None # sentinel value -- lazy for now


class GracefulErrorAdapter(NodeExecutionMethod):
"""Gracefully handles errors in a graph's execution. This allows you to proceed despite failure,
dynamically pruning branches. While it still runs every node, it replaces them with no-ops if any upstream
required dependencies fail (including optional dependencies).
"""

def __init__(self, error_to_catch: Type[Exception], sentinel_value: Any = SENTINEL_DEFAULT):
"""Initializes the adapter. Allows you to customize the error to catch (which exception
your graph will throw to indicate failure), as well as the sentinel value to use in place of
a node's result if it fails (this defaults to ``None``).
Note that this is currently only compatible with the dict-based result builder (use at your
own risk with pandas series, etc...).
Be careful using ``None`` as the default -- feel free to replace it with a sentinel value
of your choice (this could negatively impact your graph's execution if you actually *do* intend
to use ``None`` return values).
You can use this as follows:
.. code-block:: python
# my_module.py
# custom exception
class DoNotProceed(Exception):
pass
def wont_proceed() -> int:
raise DoNotProceed()
def will_proceed() -> int:
return 1
def never_reached(wont_proceed: int) -> int:
return 1 # this should not be reached
dr = (
driver.Builder()
.with_modules(my_module)
.with_adapters(
default.GracefulErrorAdapter(
error_to_catch=DoNotProceed,
sentinel_value=None
)
)
.build()
)
dr.execute(["will_proceed", "never_reached"]) # will return {'will_proceed': 1, 'never_reached': None}
Note you can customize the error you want it to fail on and the sentinel value to use in place of a node's result if it fails.
:param error_to_catch: The error to catch
:param sentinel_value: The sentinel value to use in place of a node's result if it fails
"""
self.error_to_catch = error_to_catch
self.sentinel_value = sentinel_value

def run_to_execute_node(
self, *, node_callable: Any, node_kwargs: Dict[str, Any], **future_kwargs: Any
) -> Any:
"""Executes a node. If the node fails, returns the sentinel value."""
for key, value in node_kwargs.items():
if value == self.sentinel_value: # == versus is
return self.sentinel_value # cascade it through
try:
return node_callable(**node_kwargs)
except self.error_to_catch:
return self.sentinel_value

0 comments on commit 1b26a1c

Please sign in to comment.