Skip to content

Commit

Permalink
[Logging] Switch worker_setup_hook to worker_process_setup_hook (ray-…
Browse files Browse the repository at this point in the history
…project#37247) (ray-project#37463)

Change worker_setup_hook -> worker_process_setup_hook

Signed-off-by: SangBin Cho <rkooo567@gmail.com>
  • Loading branch information
rkooo567 authored Jul 17, 2023
1 parent b6a1a6f commit 4ebe568
Showing 9 changed files with 64 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -384,10 +384,14 @@ ray.get(f.remote("A log message for a task."))

```{admonition} Caution
:class: caution
This is an experimental feature. It doesn't support [Ray Client](ray-client-ref) yet.
This is an experimental feature. The semantic of the API is subject to change.
It doesn't support [Ray Client](ray-client-ref) yet.
Currently, all the runtime environment passed to a driver (`ray.init(runtime_env={...})`) will be ignored if you specify any runtime environment via [Ray Job Submission](jobs-quickstart) API (`ray job submit --working-dir` or `ray job submit --runtime-env`).
```

Use `worker_process_setup_hook` to apply the new logging configuration to all worker processes within a job.

```python
# driver.py
def logging_setup_func():
11 changes: 8 additions & 3 deletions python/ray/_private/function_manager.py
Original file line number Diff line number Diff line change
@@ -27,7 +27,11 @@
format_error_message,
)
from ray._private.serialization import pickle_dumps
from ray._raylet import JobID, PythonFunctionDescriptor, WORKER_SETUP_HOOK_KEY_NAME_GCS
from ray._raylet import (
JobID,
PythonFunctionDescriptor,
WORKER_PROCESS_SETUP_HOOK_KEY_NAME_GCS,
)

FunctionExecutionInfo = namedtuple(
"FunctionExecutionInfo", ["function", "function_name", "max_calls"]
@@ -178,7 +182,8 @@ def export_setup_func(
) -> bytes:
"""Export the setup hook function and return the key."""
pickled_function = pickle_dumps(
setup_func, f"Cannot serialize the worker_setup_hook {setup_func.__name__}"
setup_func,
"Cannot serialize the worker_process_setup_hook " f"{setup_func.__name__}",
)

function_to_run_id = hashlib.shake_128(pickled_function).digest(
@@ -187,7 +192,7 @@ def export_setup_func(
key = make_function_table_key(
# This value should match with gcs_function_manager.h.
# Otherwise, it won't be GC'ed.
WORKER_SETUP_HOOK_KEY_NAME_GCS.encode(),
WORKER_PROCESS_SETUP_HOOK_KEY_NAME_GCS.encode(),
# b"FunctionsToRun",
self._worker.current_job_id.binary(),
function_to_run_id,
8 changes: 6 additions & 2 deletions python/ray/_private/ray_constants.py
Original file line number Diff line number Diff line change
@@ -436,5 +436,9 @@ def gcs_actor_scheduling_enabled():
"RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING", False
)

WORKER_SETUP_HOOK_ENV_VAR = "__RAY_WORKER_SETUP_HOOK_ENV_VAR"
RAY_WORKER_SETUP_HOOK_LOAD_TIMEOUT_ENV_VAR = "RAY_WORKER_SETUP_HOOK_LOAD_TIMEOUT"
WORKER_PROCESS_SETUP_HOOK_ENV_VAR = "__RAY_WORKER_PROCESS_SETUP_HOOK_ENV_VAR"
RAY_WORKER_PROCESS_SETUP_HOOK_LOAD_TIMEOUT_ENV_VAR = (
"RAY_WORKER_PROCESS_SETUP_HOOK_LOAD_TIMEOUT" # noqa
)

RAY_DEFAULT_LABEL_KEYS_PREFIX = "ray.io/"
34 changes: 19 additions & 15 deletions python/ray/_private/runtime_env/setup_hook.py
Original file line number Diff line number Diff line change
@@ -15,7 +15,9 @@

def get_import_export_timeout():
return int(
os.environ.get(ray_constants.RAY_WORKER_SETUP_HOOK_LOAD_TIMEOUT_ENV_VAR, "60")
os.environ.get(
ray_constants.RAY_WORKER_PROCESS_SETUP_HOOK_LOAD_TIMEOUT_ENV_VAR, "60"
)
)


@@ -27,15 +29,15 @@ def _encode_function_key(key: str) -> bytes:
return base64.b64decode(key)


def upload_worker_setup_hook_if_needed(
def upload_worker_process_setup_hook_if_needed(
runtime_env: Union[Dict[str, Any], RuntimeEnv],
worker: "ray.Worker",
) -> Union[Dict[str, Any], RuntimeEnv]:
"""Uploads the worker_setup_hook to GCS with a key.
"""Uploads the worker_process_setup_hook to GCS with a key.
runtime_env["worker_setup_hook"] is converted to a decoded key
runtime_env["worker_process_setup_hook"] is converted to a decoded key
that can load the worker setup hook function from GCS.
I.e., you can use internalKV.Get(runtime_env["worker_setup_hook])
i.e., you can use internalKV.Get(runtime_env["worker_process_setup_hook])
to access the worker setup hook from GCS.
Args:
@@ -48,13 +50,13 @@ def upload_worker_setup_hook_if_needed(
a string. The given decoder is used to decode the function
key.
"""
setup_func = runtime_env.get("worker_setup_hook")
setup_func = runtime_env.get("worker_process_setup_hook")
if setup_func is None:
return runtime_env

if not isinstance(setup_func, Callable):
raise TypeError(
"worker_setup_hook must be a function, " f"got {type(setup_func)}."
"worker_process_setup_hook must be a function, " f"got {type(setup_func)}."
)
# TODO(sang): Support modules.

@@ -67,39 +69,41 @@ def upload_worker_setup_hook_if_needed(
"Failed to export the setup function."
) from e
env_vars = runtime_env.get("env_vars", {})
assert ray_constants.WORKER_SETUP_HOOK_ENV_VAR not in env_vars, (
f"The env var, {ray_constants.WORKER_SETUP_HOOK_ENV_VAR}, "
assert ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR not in env_vars, (
f"The env var, {ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR}, "
"is not permitted because it is reserved for the internal use."
)
env_vars[ray_constants.WORKER_SETUP_HOOK_ENV_VAR] = _decode_function_key(key)
env_vars[ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR] = _decode_function_key(
key
)
runtime_env["env_vars"] = env_vars
# Note: This field is no-op. We don't have a plugin for the setup hook
# because we can implement it simply using an env var.
# This field is just for the observability purpose, so we store
# the name of the method.
runtime_env["worker_setup_hook"] = setup_func.__name__
runtime_env["worker_process_setup_hook"] = setup_func.__name__
return runtime_env


def load_and_execute_setup_hook(
worker_setup_hook_key: str,
worker_process_setup_hook_key: str,
) -> Optional[str]:
"""Load the setup hook from a given key and execute.
Args:
worker_setup_hook_key: The key to import the setup hook
worker_process_setup_hook_key: The key to import the setup hook
from GCS.
Returns:
An error message if it fails. None if it succeeds.
"""
assert worker_setup_hook_key is not None
assert worker_process_setup_hook_key is not None
worker = ray._private.worker.global_worker
assert worker.connected

func_manager = worker.function_actor_manager
try:
worker_setup_func_info = func_manager.fetch_registered_method(
_encode_function_key(worker_setup_hook_key),
_encode_function_key(worker_process_setup_hook_key),
timeout=get_import_export_timeout(),
)
except Exception:
6 changes: 4 additions & 2 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
@@ -73,7 +73,9 @@
from ray._private.runtime_env.constants import RAY_JOB_CONFIG_JSON_ENV_VAR
from ray._private.runtime_env.py_modules import upload_py_modules_if_needed
from ray._private.runtime_env.working_dir import upload_working_dir_if_needed
from ray._private.runtime_env.setup_hook import upload_worker_setup_hook_if_needed
from ray._private.runtime_env.setup_hook import (
upload_worker_process_setup_hook_if_needed,
)
from ray._private.storage import _load_class
from ray._private.utils import get_ray_doc_version
from ray.exceptions import ObjectStoreFullError, RayError, RaySystemError, RayTaskError
@@ -2170,7 +2172,7 @@ def connect(
runtime_env = upload_working_dir_if_needed(
runtime_env, scratch_dir, logger=logger
)
runtime_env = upload_worker_setup_hook_if_needed(
runtime_env = upload_worker_process_setup_hook_if_needed(
runtime_env,
worker,
)
8 changes: 5 additions & 3 deletions python/ray/_private/workers/default_worker.py
Original file line number Diff line number Diff line change
@@ -252,9 +252,11 @@
ray._private.utils.try_import_each_module(module_names_to_import)

# If the worker setup function is configured, run it.
worker_setup_hook_key = os.getenv(ray_constants.WORKER_SETUP_HOOK_ENV_VAR)
if worker_setup_hook_key:
error = load_and_execute_setup_hook(worker_setup_hook_key)
worker_process_setup_hook_key = os.getenv(
ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR
)
if worker_process_setup_hook_key:
error = load_and_execute_setup_hook(worker_process_setup_hook_key)
if error is not None:
worker.core_worker.exit_worker("system", error)

2 changes: 1 addition & 1 deletion python/ray/includes/common.pxi
Original file line number Diff line number Diff line change
@@ -30,6 +30,6 @@ cdef class GcsClientOptions:
return <CGcsClientOptions*>(self.inner.get())


WORKER_SETUP_HOOK_KEY_NAME_GCS = str(kWorkerSetupHookKeyName)
WORKER_PROCESS_SETUP_HOOK_KEY_NAME_GCS = str(kWorkerSetupHookKeyName)
RESOURCE_UNIT_SCALING = kResourceUnitScaling
STREAMING_GENERATOR_RETURN = kStreamingGeneratorReturn
16 changes: 8 additions & 8 deletions python/ray/runtime_env/runtime_env.py
Original file line number Diff line number Diff line change
@@ -235,11 +235,11 @@ class MyClass:
The `run_options` list spec is here:
https://docs.docker.com/engine/reference/run/
env_vars: Environment variables to set.
worker_setup_hook: The setup hook that's called after workers
start and before tasks and actors are scheduled.
The value has to be a callable when passed to the job/task/actor.
worker_process_setup_hook: (Experimental) The setup hook that's
called after workers start and before Tasks and Actors are scheduled.
The value has to be a callable when passed to the Job, Task, or Actor.
The callable is then exported and this value is converted to
the setup hook's function name for the observability purpose.
the setup hook's function name for observability.
config: config for runtime environment. Either
a dict or a RuntimeEnvConfig. Field: (1) setup_timeout_seconds, the
timeout of runtime environment creation, timeout is in seconds.
@@ -263,7 +263,7 @@ class MyClass:
# field which is not supported. We should remove it
# with the test.
"docker",
"worker_setup_hook",
"worker_process_setup_hook",
}

extensions_fields: Set[str] = {
@@ -281,7 +281,7 @@ def __init__(
conda: Optional[Union[Dict[str, str], str]] = None,
container: Optional[Dict[str, str]] = None,
env_vars: Optional[Dict[str, str]] = None,
worker_setup_hook: Optional[Union[Callable, str]] = None,
worker_process_setup_hook: Optional[Union[Callable, str]] = None,
config: Optional[Union[Dict, RuntimeEnvConfig]] = None,
_validate: bool = True,
**kwargs,
@@ -303,8 +303,8 @@ def __init__(
runtime_env["env_vars"] = env_vars
if config is not None:
runtime_env["config"] = config
if worker_setup_hook is not None:
runtime_env["worker_setup_hook"] = worker_setup_hook
if worker_process_setup_hook is not None:
runtime_env["worker_process_setup_hook"] = worker_process_setup_hook

if runtime_env.get("java_jars"):
runtime_env["java_jars"] = runtime_env.get("java_jars")
14 changes: 8 additions & 6 deletions python/ray/tests/test_runtime_env_setup_func.py
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@ def configure_logging(level: int):
ray.init(
num_cpus=1,
runtime_env={
"worker_setup_hook": lambda: configure_logging(logging.DEBUG),
"worker_process_setup_hook": lambda: configure_logging(logging.DEBUG),
"env_vars": {"ABC": "123"},
},
)
@@ -53,10 +53,12 @@ def get_env_var(self, key):
# ray.get(
# f.options(
# runtime_env={
# "worker_setup_hook": lambda: configure_logging(logging.INFO)}
# "worker_process_setup_hook": lambda: configure_logging(logging.INFO)}
# ).remote("INFO"))
# a = Actor.optinos(
# runtime_env={"worker_setup_hook": lambda: configure_logging(logging.INFO)}
# runtime_env={
# "worker_process_setup_hook": lambda: configure_logging(logging.INFO)
# }
# ).remote("INFO")
# assert ray.get(a.__ray_ready__.remote())

@@ -88,7 +90,7 @@ def setup():
ray.init(
num_cpus=1,
runtime_env={
"worker_setup_hook": setup,
"worker_process_setup_hook": setup,
},
)

@@ -114,7 +116,7 @@ class A:
ray.init(
num_cpus=0,
runtime_env={
"worker_setup_hook": lambda: print(lock),
"worker_process_setup_hook": lambda: print(lock),
},
)
assert "Failed to export the setup function." in str(e.value)
@@ -130,7 +132,7 @@ def setup_func():
ray.init(
num_cpus=1,
runtime_env={
"worker_setup_hook": setup_func,
"worker_process_setup_hook": setup_func,
},
)

0 comments on commit 4ebe568

Please sign in to comment.