Add async support for pipe_family #1223

Merged (1 commit) on Nov 14, 2024

Conversation

jernejfrank
Contributor

Enables running pipe_input, pipe_output and mutate with asyncio. Addresses #1193.

Changes

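  • Changed node.reassign_inputs to check whether the node's callable is a coroutine function and, if it is, to wrap it in an async callable.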

How I tested this

import asyncio

import pandas as pd

from hamilton import async_driver
from hamilton.function_modifiers import pipe_output, pipe_input, mutate, apply_to, step, hamilton_exclude


async def data_input() -> pd.DataFrame:
    await asyncio.sleep(0.0001)
    return pd.DataFrame({
        "a": [1, 2, 3],
        "b": [4, 5, 6]
    })


async def _groupby_a(d: pd.DataFrame) -> pd.DataFrame:
    await asyncio.sleep(0.0001)
    return d.groupby("a").sum().reset_index()


async def _groupby_b(d: pd.DataFrame) -> pd.DataFrame:
    await asyncio.sleep(0.0001)
    return d.groupby("b").sum().reset_index()


@pipe_input(
    step(_groupby_a).when(groupby="a"),
    step(_groupby_b).when_not(groupby="a"),
)
def data_pipe_input(data_input: pd.DataFrame) -> pd.DataFrame:
    return data_input


@pipe_output(
    step(_groupby_a).when(groupby="a"),
    step(_groupby_b).when_not(groupby="a"),
)
def data_pipe_output(data_input: pd.DataFrame) -> pd.DataFrame:
    return data_input


def data_mutate(data_input: pd.DataFrame) -> pd.DataFrame:
    return data_input

@mutate(data_mutate)
async def _groupby_a_mutate(d: pd.DataFrame) -> pd.DataFrame:
    await asyncio.sleep(0.0001)
    return d.groupby("a").sum().reset_index()

@mutate(apply_to(data_mutate))
async def _groupby_b_mutate(d: pd.DataFrame) -> pd.DataFrame:
    await asyncio.sleep(0.0001)
    return d.groupby("b").sum().reset_index()


@hamilton_exclude
async def main():
    import __main__
    dr = (await async_driver.Builder()
          .with_modules(__main__)
          .with_config(dict(groupby="b"))
          .build())
    results = await dr.execute(["data_pipe_input", "data_pipe_output", "data_mutate"])
    print(results)


if __name__ == "__main__":
    asyncio.run(main())

Notes

Follows the pattern used in subdag by creating an async def placeholder callable.

Checklist

  • PR has an informative and human-readable title (this will be pulled into the release notes)
  • Changes are limited to a single goal (no scope creep)
  • Code passed the pre-commit check & code is left cleaner/nicer than when first encountered.
  • Any change in functionality is tested
  • New functions are documented (with a description, list of inputs, and expected output)
  • Placeholder code is flagged / future TODOs are captured in comments
  • Project documentation has been updated if adding/changing functionality.

Contributor

@ellipsis-dev ellipsis-dev bot left a comment

❌ Changes requested. Reviewed everything up to f7c7e04 in 1 minute and 12 seconds

More details
  • Looked at 62 lines of code in 2 files
  • Skipped 0 files when reviewing.
  • Skipped posting 4 drafted comments based on config settings.
1. hamilton/function_modifiers/macros.py:610
  • Draft comment:
    Typo in docstring: 'parameeter' should be 'parameter'.
  • Reason this comment was not posted:
    Confidence changes required: 10%
    The comment is about a typo in the docstring, which is a minor issue but should be corrected for clarity.
2. hamilton/function_modifiers/macros.py:1310
  • Draft comment:
    Consider renaming async_function to something more descriptive like async_identity_wrapper for clarity.
  • Reason this comment was not posted:
    Confidence changes required: 20%
    The code is checking if a function is asynchronous and then creating an async wrapper if needed. This is a good approach, but the naming of the async wrapper function could be improved for clarity.
3. hamilton/node.py:370
  • Draft comment:
    Consider renaming async_function to something more descriptive like async_new_callable_wrapper for clarity.
  • Reason this comment was not posted:
    Confidence changes required: 20%
    The code is checking if a function is asynchronous and then creating an async wrapper if needed. This is a good approach, but the naming of the async wrapper function could be improved for clarity.
4. hamilton/node.py:378
  • Draft comment:
    Remove the commented-out line to keep the code clean and maintainable.
  • Reason this comment was not posted:
    Confidence changes required: 50%
    The function reassign_inputs in node.py has a commented-out line that should be removed to keep the code clean and maintainable.

@@ -1302,10 +1302,17 @@ def transform_node(
# We pick a reserved prefix that ovoids clashes with user defined functions / nodes
original_node = node_.copy_with(name=f"{node_.name}.raw")

is_async = inspect.iscoroutinefunction(fn) # determine if its async

This logic to check if a function is async using inspect.iscoroutinefunction is already present in multiple places in the codebase. Consider reusing existing implementations to avoid redundancy.

  • logic to determine if a function is async (expanders.py)
  • logic to determine if a function is async (recursive.py)
  • logic to determine if a function is async (base.py)
  • logic to determine if a function is async (node.py)

Collaborator

@elijahbenizzy elijahbenizzy left a comment

On the surface this looks good (I don't think ellipsis has a particularly useful comment), but we'll definitely want some tests!

Collaborator

@elijahbenizzy elijahbenizzy left a comment

Looks good!

@elijahbenizzy elijahbenizzy merged commit 8476688 into DAGWorks-Inc:main Nov 14, 2024
24 checks passed
@jernejfrank jernejfrank deleted the feat/async_pipe branch November 17, 2024 11:12