-
Notifications
You must be signed in to change notification settings - Fork 133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Decorator like @subdag
that works with async functions
#903
Comments
@elijahbenizzy yep sorry didn't add context, chatted offline with @kpounder. But yes subdag assumes everything is sync -- haven't scoped what's required. If you have an actual DAG the error is somewhere in subdag because it gets a co-routine result, not what it was actually expecting. |
Using the async fast api example: # async_subdag.py
import async_module
import fastapi
from hamilton.experimental import h_async
from hamilton import base
from hamilton.function_modifiers import subdag, source
@subdag(
async_module,
inputs={"request": source("request")},
)
async def my_subdag_with_decorator(pipeline: dict) -> dict:
return pipeline import async_subdag
dr = h_async.AsyncDriver({}, async_subdag, result_builder=base.DictResult())
@app.post("/execute")
async def call(request: fastapi.Request) -> dict:
"""Handler for pipeline call"""
input_data = {"request": request}
result = await dr.execute(["my_subdag_with_decorator"], inputs=input_data)
return result Stacktrace: ERROR: Exception in ASGI application
Traceback (most recent call last):
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/uvicorn/protocols/http/h11_impl.py", line 404, in run_asgi
result = await app( # type: ignore[func-returns-value]
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
return await self.app(scope, receive, send)
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/fastapi/applications.py", line 1054, in __call__
await super().__call__(scope, receive, send)
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/applications.py", line 123, in __call__
await self.middleware_stack(scope, receive, send)
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/middleware/errors.py", line 186, in __call__
raise exc
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/middleware/errors.py", line 164, in __call__
await self.app(scope, receive, _send)
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
raise exc
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
await app(scope, receive, sender)
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/routing.py", line 762, in __call__
await self.middleware_stack(scope, receive, send)
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/routing.py", line 782, in app
await route.handle(scope, receive, send)
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/routing.py", line 297, in handle
await self.app(scope, receive, send)
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/routing.py", line 77, in app
await wrap_app_handling_exceptions(app, request)(scope, receive, send)
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
raise exc
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
await app(scope, receive, sender)
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/routing.py", line 72, in app
response = await func(request)
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/fastapi/routing.py", line 299, in app
raise e
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/fastapi/routing.py", line 294, in app
raw_response = await run_endpoint_function(
File ".pyenv/versions/my-venv/lib/python3.9/site-packages/fastapi/routing.py", line 191, in run_endpoint_function
return await dependant.call(**values)
File "hamilton/examples/async/fastapi_example.py", line 21, in call
result = await dr.execute(["my_subdag_with_decorator"], inputs=input_data)
File "hamilton/hamilton/experimental/h_async.py", line 173, in execute
raise e
File "hamilton/hamilton/experimental/h_async.py", line 164, in execute
outputs = await self.raw_execute(final_vars, overrides, display_graph, inputs=inputs)
File "hamilton/hamilton/experimental/h_async.py", line 136, in raw_execute
return await await_dict_of_tasks(task_dict)
File "hamilton/hamilton/experimental/h_async.py", line 20, in await_dict_of_tasks
coroutines_gathered = await asyncio.gather(*coroutines)
File "hamilton/hamilton/experimental/h_async.py", line 33, in process_value
return await val
File "hamilton/hamilton/experimental/h_async.py", line 68, in new_fn
fn_kwargs = await await_dict_of_tasks(task_dict)
File "hamilton/hamilton/experimental/h_async.py", line 20, in await_dict_of_tasks
coroutines_gathered = await asyncio.gather(*coroutines)
File "hamilton/hamilton/experimental/h_async.py", line 33, in process_value
return await val
File "hamilton/hamilton/experimental/h_async.py", line 68, in new_fn
fn_kwargs = await await_dict_of_tasks(task_dict)
File "hamilton/hamilton/experimental/h_async.py", line 20, in await_dict_of_tasks
coroutines_gathered = await asyncio.gather(*coroutines)
File "hamilton/hamilton/experimental/h_async.py", line 33, in process_value
return await val
File "hamilton/hamilton/experimental/h_async.py", line 68, in new_fn
fn_kwargs = await await_dict_of_tasks(task_dict)
File "hamilton/hamilton/experimental/h_async.py", line 20, in await_dict_of_tasks
coroutines_gathered = await asyncio.gather(*coroutines)
File "hamilton/hamilton/experimental/h_async.py", line 33, in process_value
return await val
File "hamilton/hamilton/experimental/h_async.py", line 71, in new_fn
return fn(**fn_kwargs)
File "hamilton/hamilton/function_modifiers/recursive.py", line 348, in fn
return _callabl(**new_kwargs)
File "hamilton/examples/async/async_module.py", line 20, in foo
return request_raw.get("foo", "far")
AttributeError: 'coroutine' object has no attribute 'get' |
I'm hoping it should be as simple as adding alternate implementations to:
E.G. switching these to I also want to add a little more tooling to the node calass (E.G. it should know if its async), so we can do this more ergonomically. |
This just adds another path for the two functions we redefine. We need to add tests but this works on a manual test for now. See #903
@kpounder -- this was easy (if I got it right) -- see #905. I've published an RC version, mind giving your code a spin under Note there are a few decorators that might not work with async, let us know if you find more and we'll fix them too! |
This just adds another path for the two functions we redefine. We need to add tests but this works on a manual test for now. See #903
@elijahbenizzy it seems to work! 🎉 will do a bit more playing around with it, but thank you very much |
This just adds another path for the two functions we redefine. We need to add tests but this works on a manual test for now. See #903
This just adds another path for the two functions we redefine. We need to add tests but this works on a manual test for now. See #903
This just adds another path for the two functions we redefine. We need to add tests but this works on a manual test for now. See #903
Closing -- releasing this soon. |
Is your feature request related to a problem? Please describe.
The workflow I'm currently considering migrating to Hamilton relies heavily on async functions and is a good use case for using something like
@subdag
to reuse certain code at different parts in the overall DAG. The problem is that@subdag
does not work with sub-DAGs that contain async functions.Describe the solution you'd like
Maybe a
@subdag_async
that works like@subdag
Describe alternatives you've considered
Trying now to create an async driver within the main driver to execute the sub-DAG async -- but would still prefer
@subdag_async
for developer UX, better overall DAG visualization, etc.Additional context
N/A
The text was updated successfully, but these errors were encountered: