from __future__ import annotations breaks Parallelizable<>Collect #1111

Closed
jernejfrank opened this issue Aug 30, 2024 · 4 comments · Fixed by #1113
Labels
triage label for issues that need to be triaged.

Comments

@jernejfrank
Contributor

jernejfrank commented Aug 30, 2024

Me again,

I am parallelizing some stuff and ran into some issues with from __future__ import annotations. There was already an issue about this back in 2023 (#91) that got fixed, but I found that it still breaks the Parallelizable<>Collect block.

Current behavior

Parallelizable[Dict[str,Any]] outputs the whole list instead of a single entry.
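
A toy model of how this symptom can arise is sketched below. It is an assumption about the mechanism, not Hamilton's actual implementation: FakeParallelizable, naive_is_parallel, resolving_is_parallel and downstream_inputs are hypothetical names, and no Hamilton code is involved. The string annotation mimics what from __future__ import annotations does to every annotation in a module.

```python
import typing
from typing import Any, Callable, Generic, List, TypeVar

T = TypeVar("T")


class FakeParallelizable(Generic[T]):
    """Hypothetical stand-in for hamilton.htypes.Parallelizable."""


def eager() -> FakeParallelizable[int]:
    yield from (1, 2, 3)


# A string annotation mimics a module that uses
# `from __future__ import annotations`.
def postponed() -> "FakeParallelizable[int]":
    yield from (1, 2, 3)


def naive_is_parallel(fn: Callable[..., Any]) -> bool:
    # Inspects the raw annotation, which may be a plain string.
    raw = fn.__annotations__["return"]
    return typing.get_origin(raw) is FakeParallelizable


def resolving_is_parallel(fn: Callable[..., Any]) -> bool:
    # Resolves string annotations to real types before inspecting them.
    hint = typing.get_type_hints(fn)["return"]
    return typing.get_origin(hint) is FakeParallelizable


def downstream_inputs(fn: Callable[..., Any], is_parallel: Callable[..., bool]) -> List[Any]:
    # One entry per downstream call. If the node is not recognised as
    # parallel, the downstream node receives the whole list at once.
    items = list(fn())
    return items if is_parallel(fn) else [items]


print(downstream_inputs(eager, naive_is_parallel))          # [1, 2, 3]
print(downstream_inputs(postponed, naive_is_parallel))      # [[1, 2, 3]]
print(downstream_inputs(postponed, resolving_is_parallel))  # [1, 2, 3]
```

In this model, the second call reproduces the reported symptom: the downstream node gets the whole list instead of a single entry.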

Stack Traces

Using pre-1.0.0 Polars integration -- we will stop supporting this in Hamilton 2.0, so please upgrade your version of polars! Current version: 0.20.0, minimum required version: 1.0.0.

********************************************************************************
> timestamp_2 [__main__.clip_from_clips__PARALLEL()] encountered an error      <
> Node inputs:
{'clip_from_clips': "[{'clip_id': 1000, 'timestamp_1': Timestamp('2024-..."}
********************************************************************************
Traceback (most recent call last):
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/graph_functions.py", line 230, in dfs_traverse
    result = node_(**kwargs)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/node.py", line 249, in __call__
    return self.callable(*args, **kwargs)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/function_modifiers/expanders.py", line 836, in extractor_fn
    f"It only produced {list(dt.keys())}"
AttributeError: 'list' object has no attribute 'keys'
timestamp_2
Traceback (most recent call last):
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/executors.py", line 116, in base_execute_task
    results = execute_subdag(
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/graph_functions.py", line 284, in execute_subdag
    dfs_traverse(final_var_node, dep_type)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/graph_functions.py", line 230, in dfs_traverse
    result = node_(**kwargs)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/node.py", line 249, in __call__
    return self.callable(*args, **kwargs)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/function_modifiers/expanders.py", line 836, in extractor_fn
    f"It only produced {list(dt.keys())}"
AttributeError: 'list' object has no attribute 'keys'
Exception executing task timestamp_2, with nodes: ['timestamp_2']
Traceback (most recent call last):
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/executors.py", line 116, in base_execute_task
    results = execute_subdag(
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/graph_functions.py", line 284, in execute_subdag
    dfs_traverse(final_var_node, dep_type)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/graph_functions.py", line 230, in dfs_traverse
    result = node_(**kwargs)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/node.py", line 249, in __call__
    return self.callable(*args, **kwargs)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/function_modifiers/expanders.py", line 836, in extractor_fn
    f"It only produced {list(dt.keys())}"
AttributeError: 'list' object has no attribute 'keys'
Exception submitting task timestamp_2, with nodes: ['timestamp_2']
Traceback (most recent call last):
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/executors.py", line 381, in run_graph_to_completion
    submitted = task_executor.submit_task(next_task)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/executors.py", line 165, in submit_task
    result = base_execute_task(task)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/executors.py", line 131, in base_execute_task
    raise e
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/executors.py", line 116, in base_execute_task
    results = execute_subdag(
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/graph_functions.py", line 284, in execute_subdag
    dfs_traverse(final_var_node, dep_type)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/graph_functions.py", line 230, in dfs_traverse
    result = node_(**kwargs)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/node.py", line 249, in __call__
    return self.callable(*args, **kwargs)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/function_modifiers/expanders.py", line 836, in extractor_fn
    f"It only produced {list(dt.keys())}"
AttributeError: 'list' object has no attribute 'keys'
-------------------------------------------------------------------
Oh no an error! Need help with Hamilton?
Join our slack and ask for help! https://join.slack.com/t/hamilton-opensource/shared_invite/zt-2niepkra8-DGKGf_tTYhXuJWBTXtIs4g
-------------------------------------------------------------------

Traceback (most recent call last):
  File "/home/jfrank/code/oxedl_data_ingester/TODO/bug_future_anntoations.py", line 88, in <module>
    result = dr_serial.execute(inputs=inputs, final_vars=outputs)["collect_parallel"]
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/driver.py", line 596, in execute
    raise e
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/driver.py", line 586, in execute
    outputs = self.raw_execute(_final_vars, overrides, display_graph, inputs=inputs)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/driver.py", line 715, in raw_execute
    raise e
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/driver.py", line 704, in raw_execute
    results = self.graph_executor.execute(
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/driver.py", line 234, in execute
    executors.run_graph_to_completion(execution_state, self.execution_manager)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/executors.py", line 387, in run_graph_to_completion
    raise e
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/executors.py", line 381, in run_graph_to_completion
    submitted = task_executor.submit_task(next_task)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/executors.py", line 165, in submit_task
    result = base_execute_task(task)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/executors.py", line 131, in base_execute_task
    raise e
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/executors.py", line 116, in base_execute_task
    results = execute_subdag(
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/graph_functions.py", line 284, in execute_subdag
    dfs_traverse(final_var_node, dep_type)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/execution/graph_functions.py", line 230, in dfs_traverse
    result = node_(**kwargs)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/node.py", line 249, in __call__
    return self.callable(*args, **kwargs)
  File "/home/jfrank/miniconda3/envs/oxedl_data_ingester/lib/python3.10/site-packages/hamilton/function_modifiers/expanders.py", line 836, in extractor_fn
    f"It only produced {list(dt.keys())}"
AttributeError: 'list' object has no attribute 'keys'

Steps to replicate behavior

from __future__ import annotations

from beartype.typing import Any, Dict, List
from hamilton.htypes import Parallelizable, Collect
from hamilton.function_modifiers import config, extract_fields
import pandas as pd


@config.when(get_annotations="parallel")
def parallelize_clips__PARALLEL(clips: pd.DataFrame) -> Parallelizable[Dict[str,Any]]:
    _clips_stack = []
    for _,row in clips.iterrows():
        _clips_stack.append(row.to_dict())

    for _clip in _clips_stack:
        yield _clip


@extract_fields(
    {
        "timestamp_1": pd.Timestamp,
        "timestamp_2": pd.Timestamp,
    }
)
@config.when(get_annotations="parallel")
def clip_from_clips__PARALLEL(
     parallelize_clips: Dict[str,Any]
) -> Dict[str, Any]:
    return parallelize_clips

# Subtracting two Timestamps yields a Timedelta, so annotate accordingly.
def something(timestamp_1: pd.Timestamp, timestamp_2: pd.Timestamp) -> pd.Timedelta:
    return timestamp_2 - timestamp_1


@config.when(get_annotations="parallel")
def combine_parallel__PARALLEL(clip_from_clips: Dict[str, Any], something: pd.Timedelta) -> Dict[str, Any]:
    return {
        "clip": clip_from_clips,
        "time_delta": something,
    }

@config.when(get_annotations="parallel")
def collect_parallel__PARALLEL(combine_parallel:Collect[Dict[str,Any]])->List[Dict[str,Any]]:
    return combine_parallel


if __name__ == "__main__":
    import pandas as pd
    from hamilton import driver
    from hamilton.execution import executors
    import __main__

    df = pd.DataFrame([{'clip_id': 1000,
    'timestamp_1': pd.Timestamp('2024-08-30 07:49:14.962772+0000', tz='UTC'),
    'timestamp_2': pd.Timestamp('2024-08-30 07:49:14.962859+0000', tz='UTC'),
    'path': 'some_path_1',
    'other_data': 'other_data_1'},
    {'clip_id': 2000,
    'timestamp_1': pd.Timestamp('2024-08-30 07:49:14.962864+0000', tz='UTC'),
    'timestamp_2': pd.Timestamp('2024-08-30 07:49:14.962866+0000', tz='UTC'),
    'path': 'some_path_2',
    'other_data': 'other_data_2'},
    {'clip_id': 3000,
    'timestamp_1': pd.Timestamp('2024-08-30 07:49:14.962868+0000', tz='UTC'),
    'timestamp_2': pd.Timestamp('2024-08-30 07:49:14.962869+0000', tz='UTC'),
    'path': 'some_path_3',
    'other_data': 'other_data_3'},])

    inputs = {
        "clips": df,
    }

    dr_serial = (
        driver.Builder()
        .with_modules(__main__)
        .with_config({"get_annotations":"parallel"})
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_local_executor(executors.SynchronousLocalTaskExecutor())
        # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))
        .build()
    )


    outputs = [
    "collect_parallel"
    ]

    result = dr_serial.execute(inputs=inputs, final_vars=outputs)["collect_parallel"]
    print(len(result))
    print(result[0])

Library & System Information

  • Linux Ubuntu 20.04
  • Python 3.10.14
  • sf-hamilton 1.74.0
  • sf-hamilton-sdk 0.6.0

Expected behavior

It should be possible to use from __future__ import annotations and Parallelizable<>Collect in the same module.

Additional context

Also, a nice-to-have (but really low-priority) feature would be something like:

@extract_fields({"key1": type_1, "key2": type_2})
def f() -> Parallelizable[Dict[str, Any]]:
    yield from _list_of_parallel_things_dictionary

jernejfrank added the triage label Aug 30, 2024
jernejfrank changed the title from "Bug Report" to "from __future__ import annotations breaks Parallelizable<>Collect" Aug 30, 2024

@Dev-iL
Contributor

Dev-iL commented Aug 30, 2024

It might be worth filling out the "Library & System Information" section, in particular your Python version.

The important part, to me, is ensuring there's a unit test that covers this behavior.
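
A regression test along these lines could take roughly the following shape. This is a standard-library-only sketch, not a test from the Hamilton suite: Marker is a hypothetical stand-in for Parallelizable or Collect, and the string annotations mimic a module that uses from __future__ import annotations.

```python
import typing
from typing import Generic, TypeVar

T = TypeVar("T")


class Marker(Generic[T]):
    """Hypothetical stand-in for Parallelizable or Collect."""


def eager(upstream: Marker[int]) -> Marker[int]:
    return upstream


# String annotations mimic a module that uses
# `from __future__ import annotations`.
def postponed(upstream: "Marker[int]") -> "Marker[int]":
    return upstream


def test_postponed_annotations_resolve_like_eager_ones() -> None:
    eager_hints = typing.get_type_hints(eager)
    postponed_hints = typing.get_type_hints(postponed)
    # Both annotation styles should resolve to the same types.
    assert postponed_hints == eager_hints
    for name in ("upstream", "return"):
        assert typing.get_origin(postponed_hints[name]) is Marker
        assert typing.get_args(postponed_hints[name]) == (int,)


test_postponed_annotations_resolve_like_eager_ones()
```

A real test would presumably build a driver from a module that starts with the future import and assert on the executed result; the sketch only checks the annotation-resolution step in isolation.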

@jernejfrank
Contributor Author

Sorry, added that in there.

@elijahbenizzy
Collaborator

Simpler repro, also on Python 3.10:

from __future__ import annotations

from hamilton.htypes import Parallelizable, Collect


def foo() -> Parallelizable[int]:
    yield 1
    yield 2
    yield 3


def bar(foo: int) -> int:
    return foo + 1


def baz(bar: Collect[int]) -> int:
    return sum(bar)


if __name__ == "__main__":
    from hamilton import driver
    from hamilton.execution import executors
    import __main__

    dr_serial = (
        driver.Builder()
        .with_modules(__main__)
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_local_executor(executors.SynchronousLocalTaskExecutor())
        .build()
    )

    outputs = [
        "baz"
    ]

    dr_serial.display_all_functions("./future_annotations.png")
    result = dr_serial.execute(final_vars=outputs)[outputs[0]]
    print(result)

will dig in soon
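
For reference, the intended semantics of the simpler repro can be written out in plain Python with no Hamilton involved. This assumes the usual reading of Parallelizable<>Collect: bar runs once per item yielded by foo, and baz receives the collected results.

```python
from typing import Iterator, List


def foo() -> Iterator[int]:
    yield 1
    yield 2
    yield 3


def bar(foo: int) -> int:
    return foo + 1


def baz(bar: List[int]) -> int:
    return sum(bar)


# Fan out over foo's items, apply bar to each, then collect into baz.
result = baz([bar(item) for item in foo()])
print(result)  # 9
```

Under that reading, the repro should print 9 once the bug is fixed.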

@elijahbenizzy
Collaborator

Solved this in #1113!

skrawcz linked a pull request Aug 30, 2024 that will close this issue