Skip to content

Commit

Permalink
update flow task
Browse files Browse the repository at this point in the history
  • Loading branch information
goFrendiAsgard committed Jan 6, 2024
1 parent 004dafc commit e0cc675
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
5 changes: 5 additions & 0 deletions docs/concepts/task/flow-task.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ __Returns:__
No documentation available.


### `FlowTask._get_root_upstreams`

No documentation available.


### `FlowTask._get_task_pid`

No documentation available.
Expand Down
22 changes: 21 additions & 1 deletion src/zrb/task/flow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,29 @@ def _get_embeded_tasks(
embeded_tasks: List[AnyTask] = []
for task in tasks:
embeded_task = task.copy()
embeded_task.add_upstream(*upstreams)
embeded_task_root_upstreams = self._get_root_upstreams(
tasks=[embeded_task]
)
for embeded_task_root_upstream in embeded_task_root_upstreams:
embeded_task_root_upstream.add_upstream(*upstreams)
# embeded_task.add_upstream(*upstreams)
embeded_task.add_env(*envs)
embeded_task.add_env_file(*env_files)
embeded_task.add_input(*inputs)
embeded_tasks.append(embeded_task)
return embeded_tasks

def _get_root_upstreams(self, tasks: List[AnyTask]):
root_upstreams = []
for task in tasks:
upstreams = task._get_upstreams()
if len(upstreams) == 0:
root_upstreams.append(task)
continue
for upstream in upstreams:
if len(upstream._get_upstreams()) == 0:
root_upstreams.append(upstream)
continue
root_upstreams += self._get_root_upstreams([upstream])
return root_upstreams

0 comments on commit e0cc675

Please sign in to comment.