fix(sdk): unblock valid topology. #8416
Conversation
This PR is getting very close. Thanks, @JOCSTAA!
```
# ParralelFor Check
for parent in task_name_to_parent_groups[upstream_task.name]:
    parent = group_name_to_group.get(parent, None)
    if isinstance(
```
I think we need to keep checking deeper into the DAG. Currently, this throws an exception for valid pipelines where the downstream task is not a child of the parent ParallelFor, but instead a grandchild. This should be permitted.
For example, I believe the following is valid:
```python
@dsl.pipeline
def my_pipeline(string: str = 'string'):
    with dsl.ParallelFor([1, 2, 3]):
        op1 = comp()
        with dsl.Condition(string == 'string'):
            grandchild = comp(inp=op1.output)
```
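A rough sketch of what "checking deeper" could look like, reusing the names from this diff (`task_name_to_parent_groups`, `group_name_to_group`, `tasks_group`). This is only an illustration of the idea, under the assumption that a `ParallelFor` ancestor shared by both tasks should be allowed; it is not the actual implementation:

```python
# Sketch only: raise when the upstream task sits under a ParallelFor that is
# NOT also an ancestor of the downstream task; a shared (common) ParallelFor
# ancestor, as in the grandchild example above, is allowed.
downstream_ancestors = set(task_name_to_parent_groups[task.name])
for parent_name in task_name_to_parent_groups[upstream_task.name]:
    parent = group_name_to_group.get(parent_name, None)
    if (isinstance(parent, tasks_group.ParallelFor) and
            parent_name not in downstream_ancestors):
        raise RuntimeError(
            f'Task {task.name} cannot depend on task {upstream_task.name} '
            f'from the ParallelFor group: {parent_name}.')
```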
```
raise RuntimeError(
    f'Task {task.name} cannot dependent on any task inside'
    f' the group: {upstream_groups[0]}.')
f'Task {task.name} cannot dependent on any task inside a Exithandler that is not a common ancestor of both tasks'
```
I'm not sure this is true. For example, I believe the following pipeline is valid:
```python
from kfp import compiler
from kfp import dsl
from kfp.dsl import component


@component
def identity(message: str) -> str:
    return message


@component
def fail_op(message: str):
    import sys
    print(message)
    sys.exit(1)


@dsl.pipeline
def my_pipeline(message: str = 'Hello World!'):
    exit_task = identity(message='Exit handler has worked!')
    with dsl.ExitHandler(exit_task):
        inner_task = identity(message=message)
        fail_op(message='Task failed.')
    task = identity(message=inner_task.output)
```
`task` depends on `inner_task`, which would be invalid according to the rule in this error.
For the purposes of this PR, I don't think we need to give `ExitHandler`s any special consideration. As far as topology is concerned, `ExitHandler`s can be thought of as having no control flow, but with special behavior (an exit task executed) after some subset of tasks run.
Apologies -- just remembered we discussed offline and this preserves existing behavior. Thinking more, maybe it's okay to unblock this.
Hey Connor, so would it be okay to mark this as resolved, since no action is required?
(Sorry for my slow response)
What if `inner_task` failed? Suppose the exit task should handle it gracefully and allow the remaining tasks to continue executing; then `task` would reference a non-existent output. Isn't that just like a conditional case?
We can translate the above example to Python code like:
```python
try:
    inner_task = identity(message=message)
    fail_op(message='Task failed.')
except:
    pass
finally:
    exit_task = identity(message='Exit handler has worked!')
task = identity(message=inner_task.output)
```
Would this make sense in the Python programming context? I understand Python doesn't block such usage, but it feels to me the last line, `task = ...`, could, and probably should, be moved into the `try` block instead. WDYT?
Thanks for mapping this to normal Python -- that's very helpful.
I think the reason for not moving `task` into the `try` block is the same as for normal Python: it might catch exceptions unrelated to the one you're trying to capture. KFP's `ExitHandler` is analogous to a "bare" `except` in normal Python, so all the more reason to have only the minimum required tasks in the `ExitHandler`.
My personal preference is to permit the above syntax. It's valid from a control-flow standpoint and I don't see much ambiguity for the author.
Either way, I think we should leave the behavior as it currently stands for this PR and address later if we want to :)
After some testing, I see that there's actually more to enabling this behavior than just unblocking it.
The reference to `inner_task` in `identity(message=inner_task.output)` fails to compile, since we don't have logic to register the outputs from `inner_task` to the parent exit handler sub-DAG.
All the more reason to leave as is for this PR.
/retest
Thanks, @JOCSTAA! The test cases are very comprehensive and the logic is good. Just a few refactoring suggestions and nitpicks.
```
# Condition check
dependent_group = group_name_to_group.get(upstream_groups[0], None)
if isinstance(dependent_group, tasks_group.Condition):
```
This check is good in the current state of the codebase. Just want to note that I think it is coupled to our checks for `ExitHandler` and `ParallelFor`. I anticipate this check will need to be updated when we implement support for `ParallelFor` fan-in (on the roadmap for this year).
The reason this is coupled to the `ParallelFor` check is that the condition check only goes up one parent from the upstream task.
To demonstrate with examples...
The following pipeline causes the `Condition` exception to be raised:
```python
@dsl.pipeline
def my_pipeline(string: str = 'string'):
    with dsl.Condition(string == 'x'):
        op1 = producer(string=string)
    op2 = consumer(string=op1.output)
```
because `upstream_groups == ['condition-1', 'producer']` and `upstream_groups[0]` is a `Condition` group, so a Condition exception is raised.
Furthermore, the following pipeline also currently causes an exception to be raised, but it's the `ParallelFor` exception:
```python
@dsl.pipeline
def my_pipeline(string: str = 'string'):
    with dsl.ParallelFor([1]):
        with dsl.Condition(string == 'x'):
            op1 = producer(string=string)
    op2 = consumer(string=op1.output)
```
because `upstream_groups == ['for-loop-2', 'condition-3', 'producer']` and `upstream_groups[0]` is a `ParallelFor` group, so a ParallelFor exception is raised.
Say the `ParallelFor` check were removed to support fan-in. What would happen in the second example? `upstream_groups[0]` would still be a `ParallelFor` group and a condition exception would not be raised, despite this being an invalid topology.
To handle this, I think we should either:
(a) Add a comment and tests to make this clear for future developers.
(b) Add a more robust Condition check that traverses `upstream_groups` and performs the same check on each element (a sketch of this option follows below).
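For what option (b) might look like, here is a rough sketch that reuses the names from this diff (`upstream_groups`, `group_name_to_group`, `tasks_group`); it is an illustration of the idea, not the actual implementation:

```python
# Sketch only: apply the Condition check to every uncommon ancestor group of
# the upstream task instead of just upstream_groups[0], so it no longer
# depends on the ParallelFor check catching nested cases first.
for group_name in upstream_groups:
    dependent_group = group_name_to_group.get(group_name, None)
    if isinstance(dependent_group, tasks_group.Condition):
        raise RuntimeError(
            f'Task {task.name} cannot depend on task {upstream_task.name} '
            f'inside a Condition group: {group_name}.')
```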
```
@@ -427,14 +427,43 @@ def get_dependencies(
        task2=task,
    )

    # a task cannot depend on a task created in a for loop group since individual PipelineTask variables are reassigned after each loop iteration
    # ParralelFor Check
    for parent in task_name_to_parent_groups[upstream_task.name]:
```
I think this logic is mostly implemented by `_get_uncommon_ancestors` already (except for the case of nested `ParallelFor`). `upstream_groups` contains the result -- it's all the ancestors of the upstream task that are not common to the ancestors of the downstream task. I think this is what we're looking for in order to assert that the topology is valid (the upstream cannot be nested under a sub-DAG that the downstream is not nested under, except for in the case of nested `ParallelFor`).
This means that we can know if the upstream is nested in a `ParallelFor`, `Condition`, or `ExitHandler` just by looking at the value of `upstream_groups`:
```python
import copy

uncommon_upstream_groups = copy.deepcopy(upstream_groups)
uncommon_upstream_groups.remove(upstream_task.name)  # because a task's `upstream_groups` contains the task's name
if uncommon_upstream_groups:
    raise ...
```
As far as I can tell nested pipelines are the only case that this would not cover, which would require some special handling:
```python
@dsl.pipeline
def my_pipeline(string: str = 'string'):
    with dsl.ParallelFor([1, 2]):
        one = producer(string='text')
    with dsl.ParallelFor([1, 2]):
        two = consumer(string=one.output)
```
Nice work on this, @JOCSTAA!
Final nit, now that we know we're going to keep this final check.
Excited to merge this one! Thanks for this, @JOCSTAA. This unlocks new pipeline functionality for users.
Can you add a PR description and a release note (`./sdk/RELEASE.md`), describing what sort of functionality this unlocks? Code snippets could be helpful for the description. Then, let's merge!
```
else:
    task_group_type = 'a ' + tasks_group.ParallelFor.__name__

raise RuntimeError(
```
No action required / not your code: I want to note here that I'm not sure if this should use a `RuntimeError`. This is a runtime error in the sense that it results in an ambiguous runtime topology, but it's not a true "runtime" error at the time it's raised.
This file used `RuntimeError` before this PR and, since this PR actually reduces the set of topologies for which this error would be raised, we don't necessarily need to reconsider this in this PR.
Furthermore, an `Exception` is usually used when the error is attributed to user code, whereas an `Error` is usually used when the error is attributed to something else, such as the environment. In this case, this is user code. For this reason, I think it would make sense for this to be a custom `InvalidTopologyException` or something similar.
Relatedly, some of the `ValueError`s from `pipeline_task.py` now become `RuntimeError`s in this PR, so perhaps that is a reason to consider this in the short term.
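For illustration, a minimal sketch of what such a custom exception could look like; the class name follows the suggestion above and the `_check_dependency` helper is hypothetical, not existing KFP code:

```python
class InvalidTopologyException(Exception):
    """Raised when task dependencies form a topology the compiler cannot support."""


# Hypothetical usage, replacing the RuntimeError currently raised in this file:
def _check_dependency(task_name: str, group_name: str) -> None:
    raise InvalidTopologyException(
        f'Task {task_name} cannot depend on any task inside the group: {group_name}.')
```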
/lgtm Thank you, @JOCSTAA!
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: chensun
The full list of commands accepted by this bot can be found here. The pull request process is described here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing
* unblock valid topology
* add more tests
* handle git fork
* sample_test_cases
* main
* restore to master
* resolve comments on PR
* resolve conflicts
* resolve conflict 2
* revert conflict fix
* fix changes
* address comments
* review
* docformatter presubmit
* revert docformatter
* update release.md
Description of your changes:
Enables the creation of more complex topologies of task group types that were previously blocked.
Sample topology that is now valid:
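One concrete example, adapted from the review discussion above (with `comp` as a placeholder component): a task nested deeper under a shared `ParallelFor` can now consume an output produced directly under that `ParallelFor`.

```python
from kfp import dsl

# Adapted from the review discussion above; `comp` is a placeholder component.
# `grandchild` sits under a Condition inside the ParallelFor and consumes the
# output of `op1`, which is a direct child of the same ParallelFor.
@dsl.pipeline
def my_pipeline(string: str = 'string'):
    with dsl.ParallelFor([1, 2, 3]):
        op1 = comp()
        with dsl.Condition(string == 'string'):
            grandchild = comp(inp=op1.output)
```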
Checklist: