Skip to content

Commit

Permalink
[Core] Fix proctitle for generator tasks (#36928)
Browse files Browse the repository at this point in the history
For static and dynamic generator tasks, the actual task execution happens inside store_task_outputs, when we iterate over the generator. The _changeproctitle context manager did not cover that part, which is why the proctitle showed as IDLE. The fix is to widen the scope of _changeproctitle so that it covers the entire task execution, including deserializing args, executing the function, and storing outputs.

Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
  • Loading branch information
jjyao authored Jun 29, 2023
1 parent f9912eb commit 7a85347
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 58 deletions.
115 changes: 57 additions & 58 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import io
import os
import pickle
import random
import setproctitle
import signal
import sys
import threading
Expand Down Expand Up @@ -1356,7 +1355,8 @@ cdef void execute_task(

return function(actor, *arguments, **kwarguments)

with core_worker.profile_event(b"task::" + name, extra_data=extra_data):
with core_worker.profile_event(b"task::" + name, extra_data=extra_data), \
ray._private.worker._changeproctitle(title, next_title):
task_exception = False
try:
with core_worker.profile_event(b"task:deserialize_arguments"):
Expand Down Expand Up @@ -1410,62 +1410,61 @@ cdef void execute_task(
with core_worker.profile_event(b"task:execute"):
task_exception = True
try:
with ray._private.worker._changeproctitle(title, next_title):
if debugger_breakpoint != b"":
ray.util.pdb.set_trace(
breakpoint_uuid=debugger_breakpoint)
outputs = function_executor(*args, **kwargs)

if is_streaming_generator:
# Streaming generator always has a single return value
# which is the generator task return.
assert returns[0].size() == 1

if (not inspect.isgenerator(outputs)
and not inspect.isasyncgen(outputs)):
raise ValueError(
"Functions with "
"@ray.remote(num_returns=\"streaming\" "
"must return a generator")

execute_streaming_generator(
outputs,
returns[0][0].first, # generator object ID.
task_type,
caller_address,
task_id,
serialized_retry_exception_allowlist,
function_name,
function_descriptor,
title,
actor,
actor_id,
name_of_concurrency_group_to_execute,
returns[0].size(),
attempt_number,
streaming_generator_returns,
is_retryable_error,
application_error)
# Streaming generator output is not used, so set it to None.
outputs = None

next_breakpoint = (
ray._private.worker.global_worker.debugger_breakpoint)
if next_breakpoint != b"":
# If this happens, the user typed "remote" and
# there were no more remote calls left in this
# task. In that case we just exit the debugger.
ray.experimental.internal_kv._internal_kv_put(
"RAY_PDB_{}".format(next_breakpoint),
"{\"exit_debugger\": true}",
namespace=ray_constants.KV_NAMESPACE_PDB
)
ray.experimental.internal_kv._internal_kv_del(
"RAY_PDB_CONTINUE_{}".format(next_breakpoint),
namespace=ray_constants.KV_NAMESPACE_PDB
)
(ray._private.worker.global_worker
.debugger_breakpoint) = b""
if debugger_breakpoint != b"":
ray.util.pdb.set_trace(
breakpoint_uuid=debugger_breakpoint)
outputs = function_executor(*args, **kwargs)

if is_streaming_generator:
# Streaming generator always has a single return value
# which is the generator task return.
assert returns[0].size() == 1

if (not inspect.isgenerator(outputs)
and not inspect.isasyncgen(outputs)):
raise ValueError(
"Functions with "
"@ray.remote(num_returns=\"streaming\" "
"must return a generator")

execute_streaming_generator(
outputs,
returns[0][0].first, # generator object ID.
task_type,
caller_address,
task_id,
serialized_retry_exception_allowlist,
function_name,
function_descriptor,
title,
actor,
actor_id,
name_of_concurrency_group_to_execute,
returns[0].size(),
attempt_number,
streaming_generator_returns,
is_retryable_error,
application_error)
# Streaming generator output is not used, so set it to None.
outputs = None

next_breakpoint = (
ray._private.worker.global_worker.debugger_breakpoint)
if next_breakpoint != b"":
# If this happens, the user typed "remote" and
# there were no more remote calls left in this
# task. In that case we just exit the debugger.
ray.experimental.internal_kv._internal_kv_put(
"RAY_PDB_{}".format(next_breakpoint),
"{\"exit_debugger\": true}",
namespace=ray_constants.KV_NAMESPACE_PDB
)
ray.experimental.internal_kv._internal_kv_del(
"RAY_PDB_CONTINUE_{}".format(next_breakpoint),
namespace=ray_constants.KV_NAMESPACE_PDB
)
(ray._private.worker.global_worker
.debugger_breakpoint) = b""
task_exception = False
except AsyncioActorExit as e:
exit_current_actor_if_asyncio()
Expand Down
28 changes: 28 additions & 0 deletions python/ray/tests/test_advanced_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,34 @@ def unique_1():
ray.get(unique_1.options(name=task_name).remote())


def test_ray_task_generator_setproctitle(ray_start_2_cpus):
    """Check that the process title is set while generator tasks are iterated.

    Each generator body asserts, on every iteration, that the worker's
    process title equals the expected ``ray::<name>`` string. The same
    generator is submitted with a static ``num_returns``, with
    ``num_returns="dynamic"`` and with ``num_returns="streaming"``, first as
    a plain remote function and then as an actor method.
    """

    def drive_all_generator_modes(remote_callable):
        # Static number of returns: only the first returned ref is fetched.
        ray.get(remote_callable.options(num_returns=2).remote()[0])
        # Dynamic generator.
        ray.get(remote_callable.options(num_returns="dynamic").remote())
        # Streaming generator: pull and resolve four refs one by one.
        streaming = remote_callable.options(num_returns="streaming").remote()
        for _ in range(4):
            ray.get(next(streaming))

    @ray.remote
    def generator_task():
        for idx in range(4):
            # The title is checked on every step of the iteration.
            assert setproctitle.getproctitle() == "ray::generator_task"
            yield idx

    drive_all_generator_modes(generator_task)

    @ray.remote
    class UniqueName:
        def f(self):
            for idx in range(4):
                # The expected title here is "<class name>.<method name>".
                assert setproctitle.getproctitle() == "ray::UniqueName.f"
                yield idx

    actor = UniqueName.remote()
    drive_all_generator_modes(actor.f)


@pytest.mark.skipif(
os.getenv("TRAVIS") is None, reason="This test should only be run on Travis."
)
Expand Down

0 comments on commit 7a85347

Please sign in to comment.