Guard rails around collect/parallelize #301

Open
elijahbenizzy opened this issue Aug 25, 2023 · 5 comments
Labels
documentation Improvements or additions to documentation Dynamic DAGs parallelism

Comments

@elijahbenizzy
Collaborator

elijahbenizzy commented Aug 25, 2023

Is your feature request related to a problem? Please describe.
There are a few edge cases that aren't handled in the code:

  1. Querying for a node inside a block between Parallelizable[] and Collect[]. See "Parallel Execution: KeyError: 'key df not found in cache'" (#1029).
  2. Passing inputs or overrides for nodes inside such a block.
  3. A Parallelizable directly preceding a Collect.
  4. A Parallelizable with no Collect coming afterwards.
  5. A Parallelizable with multiple Collects -- this should be allowed eventually, but it's not feasible now. See "Parallelizable cannot aggregate or return multiple Collects" (#742).
  6. Multiple Collect inputs in a single node.
  7. No nodes in between Parallelizable and Collect. See "Parallel Execution: Collect node returns list[list[pd.DataFrame]] instead of list[pd.DataFrame]" (#1030).

And probably a few more.
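
To make a few of these cases concrete, here is a minimal sketch of the kind of up-front check that could catch cases 3/7, 4, and 5 before execution. It runs on a toy graph representation: `ToyNode` and `validate_parallel_blocks` are hypothetical names made up for illustration and are not part of Hamilton's API or internals.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class ToyNode:
    """Hypothetical stand-in for a DAG node; NOT Hamilton's node class."""
    name: str
    kind: str = "regular"  # one of: "regular", "parallelizable", "collect"
    depends_on: List[str] = field(default_factory=list)


def _downstream_collects(start: str, children: Dict[str, List[str]],
                         by_name: Dict[str, ToyNode]) -> Set[str]:
    """Walk downstream from `start`, stopping at the first Collect on each path."""
    found: Set[str] = set()
    seen: Set[str] = set()
    stack = list(children[start])
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        if by_name[current].kind == "collect":
            found.add(current)  # do not walk past a Collect
            continue
        stack.extend(children[current])
    return found


def validate_parallel_blocks(nodes: List[ToyNode]) -> List[str]:
    """Return a list of problems; an empty list means these checks passed."""
    by_name = {n.name: n for n in nodes}
    children: Dict[str, List[str]] = {n.name: [] for n in nodes}
    for n in nodes:
        for dep in n.depends_on:
            if dep in children:  # ignore external inputs not in the graph
                children[dep].append(n.name)

    problems: List[str] = []
    for n in nodes:
        if n.kind != "parallelizable":
            continue
        # Case 3/7: a Collect consumes the Parallelizable directly.
        for child in children[n.name]:
            if by_name[child].kind == "collect":
                problems.append(
                    f"Parallelizable '{n.name}' is directly followed by "
                    f"Collect '{child}' with no nodes in between"
                )
        collects = sorted(_downstream_collects(n.name, children, by_name))
        if not collects:  # case 4
            problems.append(f"Parallelizable '{n.name}' has no downstream Collect")
        elif len(collects) > 1:  # case 5
            problems.append(
                f"Parallelizable '{n.name}' feeds multiple Collects: {collects}"
            )
    return problems


example = [
    ToyNode("url", "parallelizable"),
    ToyNode("url_loaded", "regular", ["url"]),
    ToyNode("counts", "regular", ["url_loaded"]),
    ToyNode("total_words", "collect", ["counts"]),
]
print(validate_parallel_blocks(example))  # prints []
```

Returning a list of messages instead of raising on the first problem is just one design choice; it lets all violations be reported at once at graph-build time.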

Describe the solution you'd like
Clean errors, raised as early as possible. Currently there is no validation for these cases.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context

Follow up from this: #299.

@elijahbenizzy
Collaborator Author

Another one -- if you request a node inside a Parallelizable block as an output, it is treated as a list rather than as individual items. Repro:

import hamilton.ad_hoc_utils
from hamilton.htypes import Parallelizable, Collect


def url() -> Parallelizable[str]:
    for url_ in ['url_a', 'url_b']:
        print(url_)
        yield url_


def url_loaded(url: str) -> str:
    print(url)
    return url


def counts(url_loaded: str) -> int:
    print(url_loaded)
    print(type(url_loaded)) # url_loaded seems to be a list not a string???
    return len(url_loaded.split("_"))


def total_words(counts: Collect[int]) -> int:
    return sum(counts)

my_hamilton_nodes = hamilton.ad_hoc_utils.create_temporary_module(
    url, url_loaded, counts, total_words
)

if __name__ == '__main__':
    from hamilton import driver, telemetry
    from hamilton.execution import executors

    telemetry.disable_telemetry()

    config = {}

    dr = (
        driver.Builder()
        .with_modules(my_hamilton_nodes)
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_local_executor(executors.SynchronousLocalTaskExecutor())
        .with_config(config)
        .build()
    )

    output_columns = [
        'counts'
    ]
    out = dr.execute(output_columns)
    print(out)

@skrawcz
Collaborator

skrawcz commented Nov 20, 2023

One other thing to get a better error for: the regular driver being used to try to execute a graph that contains Parallelizable/Collect.
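
A rough sketch of what such a check could look like, assuming the functions' type annotations are inspected before execution. The `Parallelizable` and `Collect` classes below are local stand-ins so the snippet runs on its own (they are not imported from `hamilton.htypes`), and `uses_dynamic_types` / `check_regular_driver` are hypothetical helper names, not existing Hamilton functions.

```python
import typing
from typing import Callable, Generic, List, Sequence, TypeVar

T = TypeVar("T")


class Parallelizable(Generic[T]):
    """Local stand-in for illustration only; not hamilton.htypes.Parallelizable."""


class Collect(Generic[T]):
    """Local stand-in for illustration only; not hamilton.htypes.Collect."""


def uses_dynamic_types(functions: Sequence[Callable]) -> List[str]:
    """Return names of functions whose annotations mention Parallelizable/Collect."""
    flagged = []
    for fn in functions:
        hints = typing.get_type_hints(fn)  # includes the 'return' annotation
        if any(typing.get_origin(h) in (Parallelizable, Collect)
               for h in hints.values()):
            flagged.append(fn.__name__)
    return flagged


def check_regular_driver(functions: Sequence[Callable],
                         dynamic_execution_enabled: bool) -> None:
    """Hypothetical guard: fail early with a readable message."""
    flagged = uses_dynamic_types(functions)
    if flagged and not dynamic_execution_enabled:
        raise ValueError(
            "These functions use Parallelizable/Collect but dynamic execution "
            f"is not enabled: {flagged}"
        )


# Signatures mirror the repro earlier in this thread; bodies are irrelevant here.
def url() -> Parallelizable[str]: ...
def url_loaded(url: str) -> str: ...
def total_words(counts: Collect[int]) -> int: ...


print(uses_dynamic_types([url, url_loaded, total_words]))
```

With a guard like this, the failure would surface when the driver is built rather than partway through execution.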

@elijahbenizzy
Collaborator Author

Documented here: #745

@skrawcz
Collaborator

skrawcz commented Jul 18, 2024

Users are filing issues for this: #1030 and #1029.

@Dev-iL
Contributor

Dev-iL commented Aug 21, 2024

@elijahbenizzy I ran into #301 (comment) too, in the context of a dataloader under parallelizable. This is quite a blocker for me.
