Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add e2e tests for async pipe_family #1231

Merged
merged 1 commit into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion tests/function_modifiers/test_macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest

import hamilton.function_modifiers
from hamilton import base, driver, function_modifiers, models, node
from hamilton import async_driver, base, driver, function_modifiers, models, node
from hamilton.function_modifiers import does
from hamilton.function_modifiers.dependencies import source, value
from hamilton.function_modifiers.macros import (
Expand All @@ -20,6 +20,8 @@
from hamilton.node import DependencyType

import tests.resources.mutate
import tests.resources.mutate_async
import tests.resources.pipe_async
import tests.resources.pipe_input
import tests.resources.pipe_output

Expand Down Expand Up @@ -1150,3 +1152,61 @@ def test_mutate_end_to_end_1(import_mutate_module):
)
assert result["chain_1_using_mutate"] == result["chain_1_not_using_mutate"]
assert result["chain_2_using_mutate"] == result["chain_2_not_using_mutate"]


@pytest.mark.asyncio
async def test_async_pipe_input_and_output_end_to_end():
    """Run the async ``pipe_input`` / ``pipe_output`` resources through the async driver.

    For each ``groupby`` config value the driver is rebuilt from
    ``tests.resources.pipe_async`` and both final variables must equal the
    frame obtained by grouping the input on that column, summing, and
    resetting the index.
    """
    inputs = {"data_input": pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})}

    # Expected frames are computed before any driver runs, from the same input.
    expected_by_key = {
        key: inputs["data_input"].groupby(key).sum().reset_index() for key in ("a", "b")
    }

    for key, expected in expected_by_key.items():
        builder = (
            async_driver.Builder()
            .with_modules(tests.resources.pipe_async)
            .with_config({"groupby": key})
        )
        dr = await builder.build()
        results = await dr.execute(
            final_vars=["data_pipe_input", "data_pipe_output"], inputs=inputs
        )

        pd.testing.assert_frame_equal(expected, results["data_pipe_output"])
        pd.testing.assert_frame_equal(expected, results["data_pipe_input"])


@pytest.mark.asyncio
async def test_async_mutate_end_to_end():
    """Run the async ``mutate`` resources through the async driver.

    For each ``groupby`` config value the driver is rebuilt from
    ``tests.resources.mutate_async`` and ``data_mutate`` must equal the frame
    obtained by grouping the input on that column, summing, and resetting the
    index.
    """
    inputs = {"data_input": pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})}

    # Expected frames are computed before any driver runs, from the same input.
    expected_by_key = {
        key: inputs["data_input"].groupby(key).sum().reset_index() for key in ("a", "b")
    }

    for key, expected in expected_by_key.items():
        builder = (
            async_driver.Builder()
            .with_modules(tests.resources.mutate_async)
            .with_config({"groupby": key})
        )
        dr = await builder.build()
        results = await dr.execute(final_vars=["data_mutate"], inputs=inputs)

        pd.testing.assert_frame_equal(expected, results["data_mutate"])
21 changes: 21 additions & 0 deletions tests/resources/mutate_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import asyncio

import pandas as pd

from hamilton.function_modifiers import apply_to, mutate


def data_mutate(data_input: pd.DataFrame) -> pd.DataFrame:
    """Return ``data_input`` unchanged.

    The function body is a pass-through; it is the target that the
    ``@mutate(apply_to(data_mutate)...)`` functions in this module attach to.

    :param data_input: frame supplied by the caller.
    :return: the very same object that was passed in.
    """
    return data_input


@mutate(apply_to(data_mutate).when(groupby="a"))
async def _groupby_a_mutate(d: pd.DataFrame) -> pd.DataFrame:
    """Group ``d`` on column ``"a"``, sum the groups, and reset the index.

    Registered against ``data_mutate`` only when the config has ``groupby="a"``.
    """
    # Tiny await so the coroutine actually suspends before doing the work.
    await asyncio.sleep(0.0001)
    summed = d.groupby("a").sum()
    return summed.reset_index()


@mutate(apply_to(data_mutate).when_not(groupby="a"))
async def _groupby_b_mutate(d: pd.DataFrame) -> pd.DataFrame:
    """Group ``d`` on column ``"b"``, sum the groups, and reset the index.

    Registered against ``data_mutate`` whenever the config's ``groupby`` is
    not ``"a"``.
    """
    # Tiny await so the coroutine actually suspends before doing the work.
    await asyncio.sleep(0.0001)
    summed = d.groupby("b").sum()
    return summed.reset_index()
35 changes: 35 additions & 0 deletions tests/resources/pipe_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import asyncio

import pandas as pd

from hamilton.function_modifiers import pipe_input, pipe_output, step

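# NOTE: `data_input` is intentionally not defined as a node in this module;
# the stub below is left commented out and is not executed.
# async def data_input() -> pd.DataFrame:
#     await asyncio.sleep(0.0001)
#     return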


async def _groupby_a(d: pd.DataFrame) -> pd.DataFrame:
    """Group ``d`` on column ``"a"``, sum the groups, and reset the index.

    :param d: frame containing an ``"a"`` column.
    :return: one row per distinct ``"a"`` value, with ``"a"`` as a regular column.
    """
    # Tiny await so the coroutine actually suspends before doing the work.
    await asyncio.sleep(0.0001)
    summed = d.groupby("a").sum()
    return summed.reset_index()


async def _groupby_b(d: pd.DataFrame) -> pd.DataFrame:
    """Group ``d`` on column ``"b"``, sum the groups, and reset the index.

    :param d: frame containing a ``"b"`` column.
    :return: one row per distinct ``"b"`` value, with ``"b"`` as a regular column.
    """
    # Tiny await so the coroutine actually suspends before doing the work.
    await asyncio.sleep(0.0001)
    summed = d.groupby("b").sum()
    return summed.reset_index()


@pipe_input(
    step(_groupby_a).when(groupby="a"),
    step(_groupby_b).when_not(groupby="a"),
)
def data_pipe_input(data_input: pd.DataFrame) -> pd.DataFrame:
    """Return ``data_input`` as received; the body itself is a pass-through.

    The ``pipe_input`` decorator attaches one of two async steps depending on
    config: ``_groupby_a`` when ``groupby="a"``, otherwise ``_groupby_b``.
    """
    return data_input


@pipe_output(
    step(_groupby_a).when(groupby="a"),
    step(_groupby_b).when_not(groupby="a"),
)
def data_pipe_output(data_input: pd.DataFrame) -> pd.DataFrame:
    """Return ``data_input`` as received; the body itself is a pass-through.

    The ``pipe_output`` decorator attaches one of two async steps depending on
    config: ``_groupby_a`` when ``groupby="a"``, otherwise ``_groupby_b``.
    """
    return data_input