-
Notifications
You must be signed in to change notification settings - Fork 6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[data] fix a bug that may cause async map tasks to hang #48861
Conversation
Signed-off-by: Hao Chen <chenh1024@gmail.com>
dab4793
to
5af4576
Compare
loop = ray.data._map_actor_context.udf_map_asyncio_loop | ||
tasks = [loop.create_task(process_batch(x)) for x in input_iterable] | ||
try: | ||
loop = ray.data._map_actor_context.udf_map_asyncio_loop |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of scope for this PR, but do you know why we need to use a global variable for the loop? Seems like the actor context is specific to a UDF?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it's because we already have a global var _MapActorContext
to cache other stuff.
technically the loop can also be bound to the actor object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@raulchen please hold on merging
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@@ -389,6 +394,12 @@ async def process_all_batches(): | |||
# from the async generator, corresponding to a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's simplify the conditional as well to just
while True:
# Blocking
batch = q.get()
if sentinel:
break
@@ -352,6 +352,8 @@ def transform_fn( | |||
# generators, and in the main event loop, yield them from | |||
# the queue as they become available. | |||
output_batch_queue = queue.Queue() | |||
# Use a special object to signal the end of the queue. | |||
end_of_queue = object() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Sentinel is a pretty common term for it
…48861) ## Why are these changes needed? Fix a bug that may cause async map tasks to hang. See code comments for details. This issue can be reproduced with an existing test `test_map_batches_async_generator` on slow machines. --------- Signed-off-by: Hao Chen <chenh1024@gmail.com> Signed-off-by: Connor Sanders <connor@elastiflow.com>
…48861) ## Why are these changes needed? Fix a bug that may cause async map tasks to hang. See code comments for details. This issue can be reproduced with an existing test `test_map_batches_async_generator` on slow machines. --------- Signed-off-by: Hao Chen <chenh1024@gmail.com> Signed-off-by: hjiang <dentinyhao@gmail.com>
Why are these changes needed?
Fix a bug that may cause async map tasks to hang. See code comments for details.
This issue can be reproduced with an existing test
test_map_batches_async_generator
on slow machines.Related issue number
Checks
git commit -s
) in this PR.scripts/format.sh
to lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/
under thecorresponding
.rst
file.