Skip to content

Commit

Permalink
Added docs to decorator. Changed node tag key.
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesArruda authored and elijahbenizzy committed Jul 9, 2024
1 parent dfdb11f commit 5437b04
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 5 deletions.
Binary file modified examples/parallelism/graceful_running/dag.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 0 additions & 1 deletion hamilton/function_modifiers/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def my_function(...) -> ...:
"ccpa",
"dag",
"module",
"ERROR_SENTINEL",
RAY_REMOTE_TAG_NAMESPACE,
] # Anything that starts with any of these is banned, the framework reserves the right to manage it

Expand Down
1 change: 1 addition & 0 deletions hamilton/lifecycle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
PDBDebugger,
PrintLn,
SlowDownYouMoveTooFast,
accept_error_sentinels,
)

PrintLnHook = PrintLn # for backwards compatibility -- this will be removed in 2.0
Expand Down
42 changes: 38 additions & 4 deletions hamilton/lifecycle/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,14 +511,48 @@ def run_after_node_execution(


SENTINEL_DEFAULT = None # sentinel value -- lazy for now
INJECTION_ALLOWED = "injection is requested"


def accept_error_sentinels(func: Callable):
"""Tag a function to allow passing in error sentinels.
For use with ``GracefulErrorAdapter``.
For use with ``GracefulErrorAdapter``. The standard adapter behavior is to skip a node
when an error sentinel is one of its inputs. This decorator will cause the node to
run, and place the error sentinel into the appropriate input.
Take care to ensure your sentinels are easily distinguishable if you do this - see the
note in the GracefulErrorAdapater docstring.
A use case is any data or computation aggregation step that still wants partial results,
or considers a failure interesting enough to log or notify.
.. code-block:: python
SENTINEL = object()
...
@accept_error_sentinels
def results_gathering(result_1: float, result_2: float) -> dict[str, Any]:
answer = {}
for name, res in zip(["result 1", "result 2"], [result_1, result_2])
answer[name] = res
if res is SENTINEL:
answer[name] = "Node failure: no result"
# You may want side-effects for a failure.
_send_text_that_your_runs_errored()
return answer
...
adapter = GracefulErrorAdapter(sentinel_value=SENTINEL)
...
"""
_the_tag = tag(ERROR_SENTINEL="True", bypass_reserved_namespaces_=True)
_the_tag = tag(
**{"hamilton.error_sentinel": INJECTION_ALLOWED}, bypass_reserved_namespaces_=True
)
return _the_tag(func)


Expand Down Expand Up @@ -586,7 +620,7 @@ def never_reached(wont_proceed: int) -> int:
Here's an example for parallelizable to demonstrate try_all_parallel:
.. code-block::python
.. code-block:: python
# parallel_module.py
# custom exception
Expand Down Expand Up @@ -661,7 +695,7 @@ def run_to_execute_node(
"""Executes a node. If the node fails, returns the sentinel value."""
default_return = [self.sentinel_value] if is_expand else self.sentinel_value
_node_tags = future_kwargs["node_tags"]
can_inject = _node_tags.get("ERROR_SENTINEL", "false") == "True"
can_inject = _node_tags.get("hamilton.error_sentinel", "") == INJECTION_ALLOWED
can_inject = can_inject and self.allow_injection

if not can_inject:
Expand Down

0 comments on commit 5437b04

Please sign in to comment.