Skip to content

Commit

Permalink
[serve] Move default runtime_env configuration into build_app (ray-…
Browse files Browse the repository at this point in the history
…project#49697)

## Why are these changes needed?

State was being leaked across calls to `serve.run` due to in-place
mutation within `get_deploy_args`.

I've moved the logic into `build_app` and added associated unit tests.
Also added an integration test matching the original bug report.

## Related issue number

Closes ray-project#49074

---------

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Signed-off-by: Roshan Kathawate <roshankathawate@gmail.com>
  • Loading branch information
edoakes authored and roshankathawate committed Jan 9, 2025
1 parent 415cabf commit b027afa
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 17 deletions.
6 changes: 5 additions & 1 deletion python/ray/serve/_private/application_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -1159,7 +1159,11 @@ def build_serve_application(
)

deploy_args_list = []
built_app: BuiltApplication = build_app(app, name=name)
built_app: BuiltApplication = build_app(
app,
name=name,
default_runtime_env=ray.get_runtime_context().runtime_env,
)
for deployment in built_app.deployments:
is_ingress = deployment.name == built_app.ingress_deployment_name
deploy_args_list.append(
Expand Down
35 changes: 34 additions & 1 deletion python/ray/serve/_private/build_app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from copy import deepcopy
from dataclasses import dataclass
from typing import Callable, Dict, Generic, List, Optional, TypeVar
from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar

from ray.dag.py_obj_scanner import _PyObjScanner
from ray.serve._private.constants import SERVE_LOGGER_NAME
Expand Down Expand Up @@ -68,6 +69,7 @@ def build_app(
app: Application,
*,
name: str,
default_runtime_env: Optional[Dict[str, Any]] = None,
make_deployment_handle: Optional[
Callable[[Deployment, str], DeploymentHandle]
] = None,
Expand All @@ -92,6 +94,7 @@ def build_app(
app_name=name,
handles=handles,
deployment_names=deployment_names,
default_runtime_env=default_runtime_env,
make_deployment_handle=make_deployment_handle,
)
return BuiltApplication(
Expand All @@ -110,6 +113,7 @@ def _build_app_recursive(
app_name: str,
deployment_names: IDDict[Application, str],
handles: IDDict[Application, DeploymentHandle],
default_runtime_env: Optional[Dict[str, Any]] = None,
make_deployment_handle: Callable[[Deployment, str], DeploymentHandle],
) -> List[Deployment]:
"""Recursively traverses the graph of Application objects.
Expand Down Expand Up @@ -141,6 +145,7 @@ def _build_app_recursive(
handles=handles,
deployment_names=deployment_names,
make_deployment_handle=make_deployment_handle,
default_runtime_env=default_runtime_env,
)
)

Expand All @@ -151,6 +156,9 @@ def _build_app_recursive(
_init_args=new_init_args,
_init_kwargs=new_init_kwargs,
)
final_deployment = _set_default_runtime_env(
final_deployment, default_runtime_env
)

# Create the DeploymentHandle that will be used to replace this application
# in the arguments of its parent(s).
Expand All @@ -164,6 +172,31 @@ def _build_app_recursive(
scanner.clear()


def _set_default_runtime_env(
d: Deployment, default_runtime_env: Optional[Dict[str, Any]]
) -> Deployment:
"""Configures the deployment with the provided default runtime_env.
If the deployment does not have a runtime_env configured, the default will be set.
If it does have a runtime_env configured but that runtime_env does not have a
working_dir, only the working_dir field will be set.
Else the deployment's runtime_env will be left untouched.
"""
if not default_runtime_env:
return d

ray_actor_options = deepcopy(d.ray_actor_options or {})
default_working_dir = default_runtime_env.get("working_dir", None)
if "runtime_env" not in ray_actor_options:
ray_actor_options["runtime_env"] = default_runtime_env
elif default_working_dir is not None:
ray_actor_options["runtime_env"].setdefault("working_dir", default_working_dir)

return d.options(ray_actor_options=ray_actor_options)


def _get_unique_deployment_name_memoized(
app: Application, deployment_names: IDDict[Application, str]
) -> str:
Expand Down
11 changes: 0 additions & 11 deletions python/ray/serve/_private/deploy_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,9 @@ def get_deploy_args(
Takes a deployment's configuration, and returns the arguments needed
for the controller to deploy it.
"""

if deployment_config is None:
deployment_config = {}

curr_job_env = ray.get_runtime_context().runtime_env
if "runtime_env" in replica_config.ray_actor_options:
# It is illegal to set field working_dir to None.
if curr_job_env.get("working_dir") is not None:
replica_config.ray_actor_options["runtime_env"].setdefault(
"working_dir", curr_job_env.get("working_dir")
)
else:
replica_config.ray_actor_options["runtime_env"] = curr_job_env

if isinstance(deployment_config, dict):
deployment_config = DeploymentConfig.parse_obj(deployment_config)
elif not isinstance(deployment_config, DeploymentConfig):
Expand Down
6 changes: 5 additions & 1 deletion python/ray/serve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,11 @@ def _run(
# Record after Ray has been started.
ServeUsageTag.API_VERSION.record("v2")
handle = client.deploy_application(
build_app(target, name=name),
build_app(
target,
name=name,
default_runtime_env=ray.get_runtime_context().runtime_env,
),
blocking=_blocking,
route_prefix=route_prefix,
logging_config=logging_config,
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ def __eq__(self, other):
self._replica_config.init_args == other._replica_config.init_args,
self._replica_config.init_kwargs == other._replica_config.init_kwargs,
self._replica_config.ray_actor_options
== self._replica_config.ray_actor_options,
== other._replica_config.ray_actor_options,
]
)

Expand Down
1 change: 0 additions & 1 deletion python/ray/serve/tests/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ def autoscaling_app():
"health_check_period_s": 10.0,
"health_check_timeout_s": 30.0,
"ray_actor_options": {
"runtime_env": {},
"num_cpus": 1.0,
},
},
Expand Down
24 changes: 24 additions & 0 deletions python/ray/serve/tests/test_standalone_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,5 +567,29 @@ def __call__(self):
assert all_serve_logs.count("Deleting app 'default'") == 1


def test_job_runtime_env_not_leaked(shutdown_ray): # noqa: F811
"""https://github.com/ray-project/ray/issues/49074"""

@serve.deployment
class D:
async def __call__(self) -> str:
return os.environ["KEY"]

app = D.bind()

# Initialize Ray with a runtime_env, should get picked up by the app.
ray.init(runtime_env={"env_vars": {"KEY": "VAL1"}})
h = serve.run(app)
assert h.remote().result() == "VAL1"
serve.shutdown()
ray.shutdown()

# Re-initialize Ray with a different runtime_env, check that the updated one
# is picked up by the app.
ray.init(runtime_env={"env_vars": {"KEY": "VAL2"}})
h = serve.run(app)
assert h.remote().result() == "VAL2"


if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))
161 changes: 160 additions & 1 deletion python/ray/serve/tests/unit/test_build_app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import sys
from typing import Any, List
from typing import Any, Dict, List, Optional

import pytest

Expand Down Expand Up @@ -31,13 +31,15 @@ def _build_and_check(
expected_ingress_name: str,
expected_deployments: List[Deployment],
app_name: str = "default",
default_runtime_env: Optional[Dict[str, Any]] = None,
):
built_app: BuiltApplication = build_app(
app,
name=app_name,
# Each real DeploymentHandle has a unique ID (intentionally), so the below
# equality checks don't work. Use a fake implementation instead.
make_deployment_handle=FakeDeploymentHandle.from_deployment,
default_runtime_env=default_runtime_env,
)
assert built_app.name == app_name
assert built_app.ingress_deployment_name == expected_ingress_name
Expand Down Expand Up @@ -306,5 +308,162 @@ class Outer:
)


def test_default_runtime_env():
@serve.deployment
class NoRayActorOptions:
pass

@serve.deployment(ray_actor_options={"num_cpus": 0, "num_gpus": 1})
class NoRuntimeEnv:
pass

@serve.deployment(ray_actor_options={"runtime_env": {"env_vars": {"ENV": "VAR"}}})
class RuntimeEnvNoWorkingDir:
pass

@serve.deployment(
ray_actor_options={"runtime_env": {"working_dir": "s3://test.zip"}}
)
class RuntimeEnvWithWorkingDir:
pass

@serve.deployment
class Outer:
pass

app = Outer.bind(
NoRayActorOptions.bind(),
NoRuntimeEnv.bind(),
RuntimeEnvNoWorkingDir.bind(),
RuntimeEnvWithWorkingDir.bind(),
)

handles = tuple(
FakeDeploymentHandle(name, app_name="default")
for name in [
"NoRayActorOptions",
"NoRuntimeEnv",
"RuntimeEnvNoWorkingDir",
"RuntimeEnvWithWorkingDir",
]
)

# 1) Test behavior when no default_runtime_env is passed.
_build_and_check(
app,
expected_ingress_name="Outer",
expected_deployments=[
Outer.options(name="Outer", _init_args=handles, _init_kwargs={}),
NoRayActorOptions.options(
name="NoRayActorOptions", _init_args=tuple(), _init_kwargs={}
),
NoRuntimeEnv.options(
name="NoRuntimeEnv", _init_args=tuple(), _init_kwargs={}
),
RuntimeEnvNoWorkingDir.options(
name="RuntimeEnvNoWorkingDir", _init_args=tuple(), _init_kwargs={}
),
RuntimeEnvWithWorkingDir.options(
name="RuntimeEnvWithWorkingDir", _init_args=tuple(), _init_kwargs={}
),
],
)

# 2) Test behavior when a default_runtime_env is passed without a working_dir.
default_runtime_env = {"env_vars": {"DEFAULT": "ENV"}}
_build_and_check(
app,
expected_ingress_name="Outer",
expected_deployments=[
Outer.options(
name="Outer",
_init_args=handles,
_init_kwargs={},
ray_actor_options={"num_cpus": 1, "runtime_env": default_runtime_env},
),
NoRayActorOptions.options(
name="NoRayActorOptions",
_init_args=tuple(),
_init_kwargs={},
ray_actor_options={"num_cpus": 1, "runtime_env": default_runtime_env},
),
NoRuntimeEnv.options(
name="NoRuntimeEnv",
_init_args=tuple(),
_init_kwargs={},
ray_actor_options={
"num_cpus": 0,
"num_gpus": 1,
"runtime_env": default_runtime_env,
},
),
# ray_actor_options shouldn't be affected.
RuntimeEnvNoWorkingDir.options(
name="RuntimeEnvNoWorkingDir", _init_args=tuple(), _init_kwargs={}
),
# ray_actor_options shouldn't be affected.
RuntimeEnvWithWorkingDir.options(
name="RuntimeEnvWithWorkingDir", _init_args=tuple(), _init_kwargs={}
),
],
default_runtime_env=default_runtime_env,
)

# 3) Test behavior when a default_runtime_env is passed with a working_dir.
default_runtime_env = {
"working_dir": "s3://default.zip",
"env_vars": {"DEFAULT": "ENV"},
}
_build_and_check(
app,
expected_ingress_name="Outer",
expected_deployments=[
Outer.options(
name="Outer",
_init_args=handles,
_init_kwargs={},
ray_actor_options={"num_cpus": 1, "runtime_env": default_runtime_env},
),
NoRayActorOptions.options(
name="NoRayActorOptions",
_init_args=tuple(),
_init_kwargs={},
ray_actor_options={
"num_cpus": 1,
"runtime_env": default_runtime_env,
},
),
NoRuntimeEnv.options(
name="NoRuntimeEnv",
_init_args=tuple(),
_init_kwargs={},
ray_actor_options={
"num_cpus": 0,
"num_gpus": 1,
"runtime_env": default_runtime_env,
},
),
# Only the working_dir field should be overridden.
RuntimeEnvNoWorkingDir.options(
name="RuntimeEnvNoWorkingDir",
_init_args=tuple(),
_init_kwargs={},
ray_actor_options={
"num_cpus": 1,
"runtime_env": {
"working_dir": "s3://default.zip",
**RuntimeEnvNoWorkingDir.ray_actor_options["runtime_env"],
},
},
),
# ray_actor_options shouldn't be affected.
RuntimeEnvWithWorkingDir.options(
name="RuntimeEnvWithWorkingDir", _init_args=tuple(), _init_kwargs={}
),
],
default_runtime_env=default_runtime_env,
)


if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))

0 comments on commit b027afa

Please sign in to comment.