Workflow stuck with parallelizable node #948

Closed
legout opened this issue Jun 12, 2024 · 2 comments
Labels: bug (Something isn't working), parallelism

Comments

@legout

legout commented Jun 12, 2024


Current behavior

Hamilton workflows seem to get stuck in the following scenario:

There is a non-parallelized node A, a parallelized node B, and two downstream nodes C and D. When A is an input argument for C and D, the Hamilton workflow never starts; it hangs without raising any error.

Screenshots

(screenshot omitted)

Steps to replicate behavior

Here is a simple example:

from hamilton.htypes import Parallelizable, Collect
from random import randint


def random_int() -> int:
    random_int = randint(0, 100)
    print("randint:", random_int)
    return random_int


def numbers(random_int: int) -> Parallelizable[int]:
    for i in [1, 2, 3]:
        yield i + random_int


def add1(numbers: int, random_int: int) -> int:
    print(numbers + random_int)
    return numbers + random_int


def add2(add1: int, random_int: int) -> int:
    print(add1 + random_int)
    return add1 + random_int


def collect_numbers(add2: Collect[int]) -> list[int]:
    return add2


if __name__ == "__main__":
    import __main__ as test
    from hamilton import driver

    dr = (
        driver.Builder()
        .with_modules(test)
        .enable_dynamic_execution(allow_experimental_mode=True)
        .build()
    )

    final_vars = ["collect_numbers"]
    r = dr.execute(final_vars=final_vars)

Library & System Information

python: 3.12.2
hamilton: 1.65.0

legout added the triage label on Jun 12, 2024
@elijahbenizzy
Collaborator

elijahbenizzy commented Jun 12, 2024

Thanks @legout! You've identified what appears to be a bug. I'm digging in -- first clue (+ context) is the following:

  1. The executor first breaks the graph up into "tasks".
  2. It then executes those tasks, walking through the graph.
  3. Usually this means: single-node tasks for each node outside the parallel block, then multi-node tasks for everything inside a "parallelizable block".
  4. In this case, it's not creating a task for the first node (random_int), so that node never gets started; it gets folded into the middle (block) task instead. Execution thus never reaches the end, so it just spins. (See the sketch below.)
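
Roughly, the grouping step 3 should produce for this example, versus what it actually produces. This is an illustration only, using plain dicts rather than Hamilton's internal data structures, and the task ids for the "expected" case are made up:

# Expected grouping (illustrative): random_int gets its own task before the block.
expected_tasks = {
    "random_int": [],                                   # plain node task, no dependencies
    "expand-numbers": ["random_int"],                   # the Parallelizable node
    "block-numbers": ["expand-numbers", "random_int"],  # add1 + add2, run once per spawned item
    "collect-numbers": ["block-numbers"],               # the Collect node
}

# Actual grouping (see the TaskSpec dump below): random_int is folded into
# block-numbers, so expand-numbers and block-numbers depend on each other
# and neither can ever be scheduled.
actual_tasks = {
    "expand-numbers": ["block-numbers"],
    "block-numbers": ["expand-numbers"],
    "collect-numbers": ["block-numbers"],
}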

Looking into it more, hoping I can find a quick solution

Debugging information:

Tasks:

[TaskSpec(base_id='expand-numbers',
          spawning_task_base_id=None,
          nodes=[<numbers {'module': 'temporary_module_b26db111_76fe_4d83_8d44_4fc58b9542f6'}>],
          purpose=<NodeGroupPurpose.EXPAND_UNORDERED: 'expand_unordered'>,
          available_nodes={'numbers'},
          outputs_to_compute=['numbers'],
          overrides={},
          adapter=<hamilton.lifecycle.base.LifecycleAdapterSet object at 0x2a27fb810>,
          base_dependencies=['block-numbers']),
 TaskSpec(base_id='block-numbers',
          spawning_task_base_id='expand-numbers',
          nodes=[<add1 {'module': 'temporary_module_b26db111_76fe_4d83_8d44_4fc58b9542f6'}>,
                 <random_int {'module': 'temporary_module_b26db111_76fe_4d83_8d44_4fc58b9542f6'}>,
                 <add2 {'module': 'temporary_module_b26db111_76fe_4d83_8d44_4fc58b9542f6'}>],
          purpose=<NodeGroupPurpose.EXECUTE_BLOCK: 'execute_block'>,
          available_nodes={'add1', 'random_int', 'add2'},
          outputs_to_compute=['random_int', 'add2'],
          overrides={},
          adapter=<hamilton.lifecycle.base.LifecycleAdapterSet object at 0x2a27fb810>,
          base_dependencies=['expand-numbers']),
 TaskSpec(base_id='collect-numbers',
          spawning_task_base_id='expand-numbers',
          nodes=[<collect_numbers {'module': 'temporary_module_b26db111_76fe_4d83_8d44_4fc58b9542f6'}>],
          purpose=<NodeGroupPurpose.GATHER: 'gather'>,
          available_nodes={'collect_numbers'},
          outputs_to_compute=['collect_numbers'],
          overrides={},
          adapter=<hamilton.lifecycle.base.LifecycleAdapterSet object at 0x2a27fb810>,
          base_dependencies=['block-numbers'])]
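
Note the circular base_dependencies in the dump: expand-numbers depends on block-numbers, and block-numbers depends on expand-numbers. A throwaway check that surfaces the cycle (plain Python over the dependencies copied from the dump, not part of Hamilton):

# Dependencies copied from the TaskSpec dump above.
deps = {
    "expand-numbers": ["block-numbers"],
    "block-numbers": ["expand-numbers"],
    "collect-numbers": ["block-numbers"],
}

def find_cycle(graph):
    """Return one dependency cycle as a list of task ids, or None (simple DFS)."""
    def visit(node, stack):
        if node in stack:
            return stack[stack.index(node):] + [node]
        for dep in graph.get(node, []):
            cycle = visit(dep, stack + [node])
            if cycle:
                return cycle
        return None
    for start in graph:
        cycle = visit(start, [])
        if cycle:
            return cycle
    return None

print(find_cycle(deps))  # ['expand-numbers', 'block-numbers', 'expand-numbers']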

elijahbenizzy added the bug and parallelism labels and removed the triage label on Jun 12, 2024
elijahbenizzy self-assigned this on Jun 12, 2024
elijahbenizzy added a commit that referenced this issue on Jun 12, 2024:
Our algorithm was bunk. This fixes it.
@elijahbenizzy
Collaborator

Great, got a fix! Will release soon. See #949.

elijahbenizzy added a commit that referenced this issue on Jun 13, 2024:

Before, we had difficulty with the traversal when determining a parallel
block. The algorithm was broken and would often traverse to the end,
including source nodes. This just rewrites it -- I didn't bother
debugging because the approach was not great from the start. This does a
simple DFS + uses a nonlocal variable to track the origin (and update
it), so we can return it.

See #948 for context.
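
The commit message above only describes the new approach at a high level; a minimal sketch of the "DFS plus a nonlocal origin variable" pattern it refers to might look like the following. This is an illustration of the pattern, not the actual Hamilton code, and the helper name, signature, and graph representation are made up:

# Illustrative sketch only -- NOT the actual Hamilton implementation.
def find_expander_origin(node, dependencies, expanders):
    """Walk upstream from `node` via DFS and return the Parallelizable node
    that spawned it (the "origin"), or None if it is not inside a parallel block."""
    origin = None

    def dfs(current, visited):
        nonlocal origin
        if current in visited:
            return
        visited.add(current)
        if current in expanders:
            origin = current  # track (and update) the origin so we can return it
            return            # stop here instead of traversing past the expander to source nodes
        for upstream in dependencies.get(current, []):
            dfs(upstream, visited)

    dfs(node, set())
    return origin


# Using the node names from the reproduction above:
deps = {
    "add2": ["add1", "random_int"],
    "add1": ["numbers", "random_int"],
    "numbers": ["random_int"],
}
print(find_expander_origin("add2", deps, expanders={"numbers"}))  # -> numbers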
skrawcz pushed a commit that referenced this issue Jun 13, 2024
skrawcz closed this as completed on Jul 18, 2024