From 7a85347c5a25141d55600f34d26d6788da9d88a8 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 28 Jun 2023 20:26:42 -0700 Subject: [PATCH] [Core] Fix proctitle for generator tasks (#36928) For static and dynamic generator tasks, the actual task execution is inside store_task_outputs when we iterator over the generator but _changeproctitle context manager doesn't cover that part and that's why the proctitle shows as IDLE. The fix is to increase the scope of _changeproctitle so that now it covers the entire task execution including deserializing args, execution, storing outputs. Signed-off-by: Jiajun Yao --- python/ray/_raylet.pyx | 115 ++++++++++++++-------------- python/ray/tests/test_advanced_3.py | 28 +++++++ 2 files changed, 85 insertions(+), 58 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 5f5ec5e328b02..38547f4e6831e 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -16,7 +16,6 @@ import io import os import pickle import random -import setproctitle import signal import sys import threading @@ -1356,7 +1355,8 @@ cdef void execute_task( return function(actor, *arguments, **kwarguments) - with core_worker.profile_event(b"task::" + name, extra_data=extra_data): + with core_worker.profile_event(b"task::" + name, extra_data=extra_data), \ + ray._private.worker._changeproctitle(title, next_title): task_exception = False try: with core_worker.profile_event(b"task:deserialize_arguments"): @@ -1410,62 +1410,61 @@ cdef void execute_task( with core_worker.profile_event(b"task:execute"): task_exception = True try: - with ray._private.worker._changeproctitle(title, next_title): - if debugger_breakpoint != b"": - ray.util.pdb.set_trace( - breakpoint_uuid=debugger_breakpoint) - outputs = function_executor(*args, **kwargs) - - if is_streaming_generator: - # Streaming generator always has a single return value - # which is the generator task return. - assert returns[0].size() == 1 - - if (not inspect.isgenerator(outputs) - and not inspect.isasyncgen(outputs)): - raise ValueError( - "Functions with " - "@ray.remote(num_returns=\"streaming\" " - "must return a generator") - - execute_streaming_generator( - outputs, - returns[0][0].first, # generator object ID. - task_type, - caller_address, - task_id, - serialized_retry_exception_allowlist, - function_name, - function_descriptor, - title, - actor, - actor_id, - name_of_concurrency_group_to_execute, - returns[0].size(), - attempt_number, - streaming_generator_returns, - is_retryable_error, - application_error) - # Streaming generator output is not used, so set it to None. - outputs = None - - next_breakpoint = ( - ray._private.worker.global_worker.debugger_breakpoint) - if next_breakpoint != b"": - # If this happens, the user typed "remote" and - # there were no more remote calls left in this - # task. In that case we just exit the debugger. - ray.experimental.internal_kv._internal_kv_put( - "RAY_PDB_{}".format(next_breakpoint), - "{\"exit_debugger\": true}", - namespace=ray_constants.KV_NAMESPACE_PDB - ) - ray.experimental.internal_kv._internal_kv_del( - "RAY_PDB_CONTINUE_{}".format(next_breakpoint), - namespace=ray_constants.KV_NAMESPACE_PDB - ) - (ray._private.worker.global_worker - .debugger_breakpoint) = b"" + if debugger_breakpoint != b"": + ray.util.pdb.set_trace( + breakpoint_uuid=debugger_breakpoint) + outputs = function_executor(*args, **kwargs) + + if is_streaming_generator: + # Streaming generator always has a single return value + # which is the generator task return. + assert returns[0].size() == 1 + + if (not inspect.isgenerator(outputs) + and not inspect.isasyncgen(outputs)): + raise ValueError( + "Functions with " + "@ray.remote(num_returns=\"streaming\" " + "must return a generator") + + execute_streaming_generator( + outputs, + returns[0][0].first, # generator object ID. + task_type, + caller_address, + task_id, + serialized_retry_exception_allowlist, + function_name, + function_descriptor, + title, + actor, + actor_id, + name_of_concurrency_group_to_execute, + returns[0].size(), + attempt_number, + streaming_generator_returns, + is_retryable_error, + application_error) + # Streaming generator output is not used, so set it to None. + outputs = None + + next_breakpoint = ( + ray._private.worker.global_worker.debugger_breakpoint) + if next_breakpoint != b"": + # If this happens, the user typed "remote" and + # there were no more remote calls left in this + # task. In that case we just exit the debugger. + ray.experimental.internal_kv._internal_kv_put( + "RAY_PDB_{}".format(next_breakpoint), + "{\"exit_debugger\": true}", + namespace=ray_constants.KV_NAMESPACE_PDB + ) + ray.experimental.internal_kv._internal_kv_del( + "RAY_PDB_CONTINUE_{}".format(next_breakpoint), + namespace=ray_constants.KV_NAMESPACE_PDB + ) + (ray._private.worker.global_worker + .debugger_breakpoint) = b"" task_exception = False except AsyncioActorExit as e: exit_current_actor_if_asyncio() diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 56bea6b9bf39c..e2fcc88468754 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -236,6 +236,34 @@ def unique_1(): ray.get(unique_1.options(name=task_name).remote()) +def test_ray_task_generator_setproctitle(ray_start_2_cpus): + @ray.remote + def generator_task(): + for i in range(4): + assert setproctitle.getproctitle() == "ray::generator_task" + yield i + + ray.get(generator_task.options(num_returns=2).remote()[0]) + ray.get(generator_task.options(num_returns="dynamic").remote()) + generator = generator_task.options(num_returns="streaming").remote() + for _ in range(4): + ray.get(next(generator)) + + @ray.remote + class UniqueName: + def f(self): + for i in range(4): + assert setproctitle.getproctitle() == "ray::UniqueName.f" + yield i + + actor = UniqueName.remote() + ray.get(actor.f.options(num_returns=2).remote()[0]) + ray.get(actor.f.options(num_returns="dynamic").remote()) + generator = actor.f.options(num_returns="streaming").remote() + for _ in range(4): + ray.get(next(generator)) + + @pytest.mark.skipif( os.getenv("TRAVIS") is None, reason="This test should only be run on Travis." )