-
Notifications
You must be signed in to change notification settings - Fork 133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Graceful failure for parallelism #1017
Graceful failure for parallelism #1017
Conversation
…de settings folder.
This is looking great, I'll be able to look in detail tomorrow. If you feel like adding an example (in the examples/ directory), that would be much appreciated! Otherwise no worries, I'm happy to add one later. I also think this would make a nice blog post (we are doing guest authors), so let me know if you're interested!). Going to download and play around a bit to get a feel for it. |
OK, played around, this is really cool. Here's my testing script btw -- you'll have to replace the project/name with your own and run from hamilton.htypes import Parallelizable, Collect
from hamilton import driver
from hamilton.lifecycle import GracefulErrorAdapter
from hamilton_sdk import adapters
import logging
logger = logging.getLogger(__name__)
class SpecificException(Exception):
pass
def first_node(fail_first_node: bool = False) -> int:
# First node that can fail
if fail_first_node:
raise SpecificException()
return 1
def parallelizable_block(
first_node: int,
total_iterations: int,
fail_iterations: list[int] = None) -> Parallelizable[int]:
if fail_iterations is None:
fail_iterations = []
# Parallelizable block that can fail
for i in range(total_iterations):
if i in fail_iterations:
raise SpecificException
yield i
def computed_1(parallelizable_block: int, fail_computed_1: bool = False) -> int:
# Computed node that can fail
if fail_computed_1:
raise SpecificException()
import time
time.sleep(1)
return parallelizable_block + 1
def computed_2(computed_1: int, fail_computed_2: bool = False) -> int:
# Computed node that can fail
if fail_computed_2:
raise SpecificException()
import time
time.sleep(1)
return computed_1 + 2
def computed_3(computed_2: int, fail_computed_3: bool = False) -> int:
# Computed node that can fail
if fail_computed_3:
raise SpecificException()
import time
time.sleep(1)
return computed_2 + 3
def collected(computed_3: Collect[int], fail_collected: bool = False) -> int:
filtered=[item for item in computed_3 if item is not None]
if fail_collected:
raise SpecificException()
# fail in advance
return sum(filtered)
def final_node(collected: int, fail_final: bool = False) -> int:
if fail_final:
raise SpecificException()
return collected
if __name__ == "__main__":
import __main__
TOTAL_ITERATIONS = 10
for params in [
{"fail_first_node": True},
{"fail_iterations": [3, 5]},
{"fail_computed_1": True},
{"fail_computed_2": True},
{"fail_computed_3": True},
{"fail_collected": True},
{"fail_final": True},
]:
try:
fail_all_parallel = True
dag_name = str(params.items()).replace("[", "").replace("]", "").replace(", ", "_").replace("'", "")
tags = {
**{key: str(value) for key, value in params.items()},
**{"test_run": "2", "with_adapter": "true", "fail_all_parallel": str(fail_all_parallel).lower()}
}
print(f"Running with params: {params}")
params["total_iterations"] = TOTAL_ITERATIONS
tracker = adapters.HamiltonTracker(
project_id=10, # modify this as needed
username="elijah@dagworks.io",
dag_name="initial_test",
tags=tags
)
dr = (
driver
.Builder()
.enable_dynamic_execution(allow_experimental_mode=True)
.with_adapters(
tracker,
GracefulErrorAdapter(
error_to_catch=SpecificException
)
)
.with_modules(__main__)
.build()
)
dr.execute(["final_node"], inputs=params)
except Exception as e:
logger.exception(e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks really great, thank you. Made quite a few comments on ideas/clarifications. You've already done a ton, so let me know if there's anything you want to hand off!
Otherwise I'd love to co-author a blog post on this, this is very cool and a really fun technical story.
hamilton/lifecycle/default.py
Outdated
@@ -563,11 +569,19 @@ def never_reached(wont_proceed: int) -> int: | |||
|
|||
Note you can customize the error you want it to fail on and the sentinel value to use in place of a node's result if it fails. | |||
|
|||
For Parallelizable nodes, this adapter will attempt to iterate over the node outputs. If an error occurs, the sentinel value is | |||
returned and no more iterations over the node will occur. If you set ``fail_all_parallel`` to be True, it only sends on sentinel |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
on -> one
returned and no more iterations over the node will occur. If you set ``fail_all_parallel`` to be True, it only sends on sentinel | |
returned and no more iterations over the node will occur. If you set ``fail_all_parallel`` to be True, it only sends one sentinel |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hamilton/lifecycle/default.py
Outdated
:param error_to_catch: The error to catch | ||
:param sentinel_value: The sentinel value to use in place of a node's result if it fails | ||
:param fail_all_parallel: Treat a Parallelizable as 1 failure (True) or allow the successful ones to go through (False). | ||
:param sentinel_injection_tags: Node tag key:value pairs that allow sentinel injection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A little unclear what these tags do to me, mind expanding a bit on why they're necessary/how to use them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, is there any reason you would want to set specific ones? Why not just have a fixed tag? Then you could create a simple decorator that calls out to it:
@drop_errors(...)
(still not 100% sure how to use these so this might be wrong...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Attempted to make a decorator that creates a specific tag to allow sentinel injection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome, thanks! Would you mind adding a little documentation on that decorator with an example? I think its a little confusing what it does.
hamilton/lifecycle/default.py
Outdated
|
||
# Grab the partial-ized function that is a parallelizable. | ||
# Be very specific... | ||
if len(node_callable.keywords) == 1 and "_callable" in node_callable.keywords: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is specifically coupling to this, right?
hamilton/hamilton/execution/executors.py
Line 63 in 87c5de0
def new_callable(*args, _callable=None, **kwargs): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JamesArruda nice work, almost there! Added some comments. Do let me know if you want me to take over getting it the last little bit (don't want to bog you down with nitpicks...). |
hamilton/lifecycle/default.py
Outdated
def accept_error_sentinels(func: Callable): | ||
"""Tag a function to allow passing in error sentinels. | ||
|
||
For use with ``GracefulErrorAdapter``. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit -- it's nice to add this to the docs -- here (
). I'm happy to add it in afterwards cause it takes a bit of dev work to test out. Plus side is we can also cross-link the reference.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you don't mind adding that, I would appreciate the help. You'll get it done much faster, I think!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You got it!
@@ -57,6 +57,7 @@ def my_function(...) -> ...: | |||
"ccpa", | |||
"dag", | |||
"module", | |||
"ERROR_SENTINEL", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is a namespace? I'd set it as hamilton.error_sentinel=True)
then keep the ypassing of reserved namespaces. That said, we can always change it later as its an internal contract...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hamilton/lifecycle/default.py
Outdated
:param error_to_catch: The error to catch | ||
:param sentinel_value: The sentinel value to use in place of a node's result if it fails | ||
:param fail_all_parallel: Treat a Parallelizable as 1 failure (True) or allow the successful ones to go through (False). | ||
:param sentinel_injection_tags: Node tag key:value pairs that allow sentinel injection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome, thanks! Would you mind adding a little documentation on that decorator with an example? I think its a little confusing what it does.
@JamesArruda -- turns out the test failure is coming from the inclusion of I think this is due to a circular dependency, but I haven't been able to dig in. So, the cleanest way to do this is to move the decorator to somewhere in the function_modifiers package, and it'll be loosely coupled. Could also do a dynamic (inline) import. I think that might be cleaner with a TODO + reference to this conversation. That said, I think this is looking pretty much ready, so I'm happy to merge this and then I'll take this the last mile. Sounds good? |
@elijahbenizzy Sounds great, thanks! |
There's an issue that we have not solved yet -- we'll need to audit circular references. For now an inline import is OK, but see PR + comment here for more details #1017. We'll need to fix this later.
683bc3a
into
DAGWorks-Inc:graceful-failure-parallelism
There's an issue that we have not solved yet -- we'll need to audit circular references. For now an inline import is OK, but see PR + comment here for more details #1017. We'll need to fix this later.
There's an issue that we have not solved yet -- we'll need to audit circular references. For now an inline import is OK, but see PR + comment here for more details #1017. We'll need to fix this later.
There's an issue that we have not solved yet -- we'll need to audit circular references. For now an inline import is OK, but see PR + comment here for more details #1017. We'll need to fix this later.
There's an issue that we have not solved yet -- we'll need to audit circular references. For now an inline import is OK, but see PR + comment here for more details #1017. We'll need to fix this later.
Initial issue description as a test.t Modifying GracefulErrorAdpater for Parallelizable blocks. Added docs to adapter. Updated for simpler passthrough. Added sentinel injection feature and test. Updated gitignore for vscode settings folder. Fix for tag error Updating adapter test for proper tags Fixing types for 3.8 Type fixing for 3.8 Fixing sentinel equality testing to avoid incomparible types. Adding example of paralellism and GracefulErrorAdapter Added decorator for sentinel acceptance as input. Updated tests and example. Added DAG image to example. Typo. Simplifying node callable retrieval. Adding docstring clarification on try_all_parallel. Parametrizing tests. Moving test module to resources. Added docs to decorator. Changed node tag key.
There's an issue that we have not solved yet -- we'll need to audit circular references. For now an inline import is OK, but see PR + comment here for more details #1017. We'll need to fix this later.
Initial issue description as a test.t Modifying GracefulErrorAdpater for Parallelizable blocks. Added docs to adapter. Updated for simpler passthrough. Added sentinel injection feature and test. Updated gitignore for vscode settings folder. Fix for tag error Updating adapter test for proper tags Fixing types for 3.8 Type fixing for 3.8 Fixing sentinel equality testing to avoid incomparible types. Adding example of paralellism and GracefulErrorAdapter Added decorator for sentinel acceptance as input. Updated tests and example. Added DAG image to example. Typo. Simplifying node callable retrieval. Adding docstring clarification on try_all_parallel. Parametrizing tests. Moving test module to resources. Added docs to decorator. Changed node tag key.
There's an issue that we have not solved yet -- we'll need to audit circular references. For now an inline import is OK, but see PR + comment here for more details #1017. We'll need to fix this later.
This PR updates the
GracefulErrorAdapter
to handle several situations forParallelizable
nodes, as described in #1009.It also includes a wish-list item (sentinel input) for a broader use case, which is motivated here: #742
This PR allows for sentinel injection into tagged functions, which will support pre-
Collect
blocks that usehamilton
'sresolve
to allow runtime-definable DAGs that still aggregate as expected.Changes
Using the newly available
is_expand
input, modify the adapter's run logic to:How I tested this
The
test_parallel_graceful.py
test file checks for:Parallelizable
node (early failure and failure with successful yields)fail_all_parallel
even with successful yieldsParallelizable
failure, but checks that failures within the sub-dag are collected.Notes
The
Parallelizable
node gets a pre-treatment from_modify_callable
that handles the list creation from the generator. This makes it hard to specifically handle per-iteration success/fail in the adapter. I opted to pull the original node callable from the partial object out to avoid touching the base implementation. This seems reasonable since it's an optional adapter, but if_modify_callable
gets more advanced for anEXPAND
node, this may break the expectations of the adapter.It also only pulls out the callable if the user wants to get each successful output, otherwise it goes back to the original behavior.
If "inject" is a loaded term (such as in the
resolve
feature), please suggest a better one!My intent is that, once this is accepted, to make a more advanced version that has a custom class for the sentinel that stores error traceback information and is easily type-checked for when the sentinel is injected.
Checklist