From 50b62a71779dcb21c7274dded514730135157f67 Mon Sep 17 00:00:00 2001 From: elijahbenizzy Date: Wed, 29 May 2024 09:26:03 -0700 Subject: [PATCH] Adds graceful fail-over This is still a bit of a WIP, but the API will stay backwards compatible so I'm OK putting it in the stdlib. This efectively cascades through null (customizable sentinel value) results, not running a node if any of its dependencies are null. --- .../lifecycle-hooks/GracefulErrorAdapter.rst | 7 ++ docs/reference/lifecycle-hooks/index.rst | 1 + hamilton/lifecycle/__init__.py | 1 + hamilton/lifecycle/default.py | 75 ++++++++++++++++++- 4 files changed, 83 insertions(+), 1 deletion(-) create mode 100644 docs/reference/lifecycle-hooks/GracefulErrorAdapter.rst diff --git a/docs/reference/lifecycle-hooks/GracefulErrorAdapter.rst b/docs/reference/lifecycle-hooks/GracefulErrorAdapter.rst new file mode 100644 index 000000000..6b00d54bf --- /dev/null +++ b/docs/reference/lifecycle-hooks/GracefulErrorAdapter.rst @@ -0,0 +1,7 @@ +============================== +lifecycle.GracefulErrorAdapter +============================== + + +.. autoclass:: hamilton.lifecycle.default.GracefulErrorAdapter + :special-members: __init__ diff --git a/docs/reference/lifecycle-hooks/index.rst b/docs/reference/lifecycle-hooks/index.rst index 33548839f..298a1934c 100644 --- a/docs/reference/lifecycle-hooks/index.rst +++ b/docs/reference/lifecycle-hooks/index.rst @@ -54,3 +54,4 @@ Recall to add lifecycle adapters, you just need to call the ``with_adapters`` me DDOGTracer FunctionInputOutputTypeChecker SlackNotifierHook + GracefulErrorAdapter diff --git a/hamilton/lifecycle/__init__.py b/hamilton/lifecycle/__init__.py index dfec3b41d..74f152454 100644 --- a/hamilton/lifecycle/__init__.py +++ b/hamilton/lifecycle/__init__.py @@ -13,6 +13,7 @@ from .base import LifecycleAdapter # noqa: F401 from .default import ( # noqa: F401 FunctionInputOutputTypeChecker, + GracefulErrorAdapter, PDBDebugger, PrintLn, SlowDownYouMoveTooFast, diff --git a/hamilton/lifecycle/default.py b/hamilton/lifecycle/default.py index a6320d618..9984e3c68 100644 --- a/hamilton/lifecycle/default.py +++ b/hamilton/lifecycle/default.py @@ -8,7 +8,7 @@ import random import shelve import time -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Type, Union from hamilton import graph_types, htypes from hamilton.graph_types import HamiltonGraph @@ -506,3 +506,76 @@ def run_after_node_execution( raise TypeError( f"Node {node_name} returned a result of type {type(result)}, expected {node_return_type}" ) + + +SENTINEL_DEFAULT = None # sentinel value -- lazy for now + + +class GracefulErrorAdapter(NodeExecutionMethod): + """Gracefully handles errors in a graph's execution. This allows you to proceed despite failure, + dynamically pruning branches. While it still runs every node, it replaces them with no-ops if any upstream + required dependencies fail (including optional dependencies). + """ + + def __init__(self, error_to_catch: Type[Exception], sentinel_value: Any = SENTINEL_DEFAULT): + """Initializes the adapter. Allows you to customize the error to catch (which exception + your graph will throw to indicate failure), as well as the sentinel value to use in place of + a node's result if it fails (this defaults to ``None``). + + Note that this is currently only compatible with the dict-based result builder (use at your + own risk with pandas series, etc...). + + Be careful using ``None`` as the default -- feel free to replace it with a sentinel value + of your choice (this could negatively impact your graph's execution if you actually *do* intend + to use ``None`` return values). + + You can use this as follows: + + .. code-block:: python + + # my_module.py + # custom exception + class DoNotProceed(Exception): + pass + + def wont_proceed() -> int: + raise DoNotProceed() + + def will_proceed() -> int: + return 1 + + def never_reached(wont_proceed: int) -> int: + return 1 # this should not be reached + + dr = ( + driver.Builder() + .with_modules(my_module) + .with_adapters( + default.GracefulErrorAdapter( + error_to_catch=DoNotProceed, + sentinel_value=None + ) + ) + .build() + ) + dr.execute(["will_proceed", "never_reached"]) # will return {'will_proceed': 1, 'never_reached': None} + + Note you can customize the error you want it to fail on and the sentinel value to use in place of a node's result if it fails. + + :param error_to_catch: The error to catch + :param sentinel_value: The sentinel value to use in place of a node's result if it fails + """ + self.error_to_catch = error_to_catch + self.sentinel_value = sentinel_value + + def run_to_execute_node( + self, *, node_callable: Any, node_kwargs: Dict[str, Any], **future_kwargs: Any + ) -> Any: + """Executes a node. If the node fails, returns the sentinel value.""" + for key, value in node_kwargs.items(): + if value == self.sentinel_value: # == versus is + return self.sentinel_value # cascade it through + try: + return node_callable(**node_kwargs) + except self.error_to_catch: + return self.sentinel_value