Skip to content

Commit

Permalink
[core] [3/N] Use uv as package manager (ray-project#48611)
Browse files Browse the repository at this point in the history
This PR integrates `uv` into the existing runtime env system, so that
environments can be set up with `uv` for better performance.

TODO list:
- [ ] Add `uv` to public documentation

---------

Signed-off-by: dentiny <dentinyhao@gmail.com>
Co-authored-by: Jiajun Yao <jeromeyjj@gmail.com>
  • Loading branch information
2 people authored and JP-sDEV committed Nov 14, 2024
1 parent 0ede323 commit 1713260
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 15 deletions.
3 changes: 3 additions & 0 deletions python/ray/_private/runtime_env/BUILD
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# TODO(hjiang): None of the existing Python code uses bazel as its build system, so BUILD files and targets are missing.
# Revisit if we decide to support bazel builds in the future.

load("@rules_python//python:defs.bzl", "py_library", "py_test")

package(default_visibility = ["//visibility:public"])
Expand Down
3 changes: 3 additions & 0 deletions python/ray/_private/runtime_env/agent/runtime_env_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ray._private.runtime_env.java_jars import JavaJarsPlugin
from ray._private.runtime_env.image_uri import ContainerPlugin
from ray._private.runtime_env.pip import PipPlugin
from ray._private.runtime_env.uv import UvPlugin
from ray._private.gcs_utils import GcsAioClient
from ray._private.runtime_env.plugin import (
RuntimeEnvPlugin,
Expand Down Expand Up @@ -206,6 +207,7 @@ def __init__(
)

self._pip_plugin = PipPlugin(self._runtime_env_dir)
self._uv_plugin = UvPlugin(self._runtime_env_dir)
self._conda_plugin = CondaPlugin(self._runtime_env_dir)
self._py_modules_plugin = PyModulesPlugin(
self._runtime_env_dir, self._gcs_aio_client
Expand All @@ -228,6 +230,7 @@ def __init__(
# self._xxx_plugin, we should just iterate through self._plugins.
self._base_plugins: List[RuntimeEnvPlugin] = [
self._working_dir_plugin,
self._uv_plugin,
self._pip_plugin,
self._conda_plugin,
self._py_modules_plugin,
Expand Down
1 change: 1 addition & 0 deletions python/ray/_private/runtime_env/packaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def __new__(cls, value, doc=None):
GCS = "gcs", "For packages dynamically uploaded and managed by the GCS."
CONDA = "conda", "For conda environments installed locally on each node."
PIP = "pip", "For pip environments installed locally on each node."
UV = "uv", "For uv environments install locally on each node."
HTTPS = "https", "Remote https path, assumes everything packed in one zip file."
S3 = "s3", "Remote s3 path, assumes everything packed in one zip file."
GS = "gs", "Remote google storage path, assumes everything packed in one zip file."
Expand Down
160 changes: 157 additions & 3 deletions python/ray/_private/runtime_env/uv.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,50 @@
"""Util class to install packages via uv.
"""

# TODO(hjiang): Implement `UvPlugin`, which is the counterpart for `PipPlugin`.

from typing import Dict, List, Optional
from asyncio import get_running_loop
import os
import hashlib
from ray._private.runtime_env import virtualenv_utils
from ray._private.runtime_env import dependency_utils
from ray._private.runtime_env.utils import check_output_cmd
from ray._private.runtime_env.plugin import RuntimeEnvPlugin
from ray._private.runtime_env.packaging import Protocol, parse_uri
from asyncio import create_task, get_running_loop
import shutil
import logging
import json
import asyncio
import sys
from ray._private.utils import try_to_create_directory, get_directory_size_bytes

default_logger = logging.getLogger(__name__)


def _get_uv_hash(uv_dict: Dict) -> str:
"""Get a deterministic hash value for `uv` related runtime envs."""
serialized_uv_spec = json.dumps(uv_dict, sort_keys=True)
hash_val = hashlib.sha1(serialized_uv_spec.encode("utf-8")).hexdigest()
return hash_val


def get_uri(runtime_env: Dict) -> Optional[str]:
    """Return `"uv://<hashed_dependencies>"`, or None if no GC required.

    A dict-valued `uv` field is hashed as-is; a list-valued one is first
    wrapped as `{"packages": <list>}`. When the `uv` field is absent (or
    None) there is nothing to track, so None is returned.

    Raises:
        TypeError: if the `uv` field is neither a list nor a dict.
    """
    uv = runtime_env.get("uv")
    if uv is None:
        return None
    if isinstance(uv, dict):
        return "uv://" + _get_uv_hash(uv_dict=uv)
    if isinstance(uv, list):
        return "uv://" + _get_uv_hash(uv_dict=dict(packages=uv))
    raise TypeError(
        "uv field received by RuntimeEnvAgent must be "
        f"list or dict, not {type(uv).__name__}."
    )


class UvProcessor:
def __init__(
self,
Expand Down Expand Up @@ -131,3 +160,128 @@ async def _run(self):

def __await__(self):
    """Make the instance awaitable by delegating to the `_run()` coroutine."""
    return self._run().__await__()


class UvPlugin(RuntimeEnvPlugin):
    """Runtime env plugin that installs packages into a virtualenv via `uv`.

    Each `uv://<hash>` URI maps to its own directory under
    `<resources_dir>/uv/<hash>`. The plugin tracks in-flight creation tasks,
    per-URI locks, and the on-disk size of every environment it has created.
    """

    name = "uv"

    def __init__(self, resources_dir: str):
        """Set up bookkeeping and create the `uv` resource directory.

        Args:
            resources_dir: Parent directory under which the `uv`
                sub-directory is created.
        """
        self._uv_resource_dir = os.path.join(resources_dir, "uv")
        # Key: hash of the uv spec. Value: the asyncio task creating that env.
        self._creating_task = {}
        # Maps a URI to a lock that is used to prevent multiple concurrent
        # installs of the same virtualenv, see #24513
        self._create_locks: Dict[str, asyncio.Lock] = {}
        # Key: created hashes. Value: size of the uv dir.
        self._created_hash_bytes: Dict[str, int] = {}
        try_to_create_directory(self._uv_resource_dir)

    def _get_path_from_hash(self, hash_val: str) -> str:
        """Generate a path from the hash of a uv spec.

        Example output:
            /tmp/ray/session_2021-11-03_16-33-59_356303_41018/runtime_resources
            /uv/9a7972c3a75f55e976e620484f58410c920db091
        """
        return os.path.join(self._uv_resource_dir, hash_val)

    def get_uris(self, runtime_env: "RuntimeEnv") -> List[str]:  # noqa: F821
        """Return the uv URI from the RuntimeEnv if it exists, else return []."""
        uv_uri = runtime_env.uv_uri()
        return [uv_uri] if uv_uri else []

    def delete_uri(
        self, uri: str, logger: Optional[logging.Logger] = default_logger
    ) -> int:
        """Delete URI and return the number of bytes deleted.

        Any in-flight creation task for the URI is cancelled first. Returns 0
        when removing the directory fails with an `OSError`.

        Raises:
            ValueError: if `uri` does not use the `uv` protocol.
        """
        logger.info("Got request to delete uv URI %s", uri)
        protocol, hash_val = parse_uri(uri)
        if protocol != Protocol.UV:
            raise ValueError(
                "UvPlugin can only delete URIs with protocol "
                f"uv. Received protocol {protocol}, URI {uri}"
            )

        # Cancel running create task.
        task = self._creating_task.pop(hash_val, None)
        if task is not None:
            task.cancel()

        # The URI may never have finished (or even started) creation, e.g. when
        # the create task was cancelled just above, so these entries may be
        # absent. Use `pop` with a default so the delete still proceeds instead
        # of aborting with a KeyError before the directory is cleaned up.
        self._created_hash_bytes.pop(hash_val, None)
        self._create_locks.pop(uri, None)

        uv_env_path = self._get_path_from_hash(hash_val)
        # Measure before removal; afterwards there is nothing left to measure.
        local_dir_size = get_directory_size_bytes(uv_env_path)
        try:
            shutil.rmtree(uv_env_path)
        except OSError as e:
            logger.warning(f"Error when deleting uv env {uv_env_path}: {str(e)}")
            return 0

        return local_dir_size

    async def create(
        self,
        uri: str,
        runtime_env: "RuntimeEnv",  # noqa: F821
        context: "RuntimeEnvContext",  # noqa: F821
        logger: Optional[logging.Logger] = default_logger,
    ) -> int:
        """Create the uv environment for `uri` and return its size in bytes.

        Returns 0 when `runtime_env` has no `uv` field. If the environment for
        this hash was already created, its recorded size is returned without
        reinstalling. `context` is not used here.
        """
        if not runtime_env.has_uv():
            return 0

        _, hash_val = parse_uri(uri)
        target_dir = self._get_path_from_hash(hash_val)

        async def _create_for_hash():
            await UvProcessor(
                target_dir,
                runtime_env,
                logger,
            )

            # Measure the directory in the default executor rather than
            # directly on the event loop.
            loop = get_running_loop()
            return await loop.run_in_executor(
                None, get_directory_size_bytes, target_dir
            )

        if uri not in self._create_locks:
            # async lock to prevent the same virtualenv being concurrently installed
            self._create_locks[uri] = asyncio.Lock()

        async with self._create_locks[uri]:
            if hash_val in self._created_hash_bytes:
                return self._created_hash_bytes[hash_val]
            # Keep a handle on the task so `delete_uri` can cancel it; the
            # done-callback drops the handle once the task finishes.
            self._creating_task[hash_val] = task = create_task(_create_for_hash())
            task.add_done_callback(lambda _: self._creating_task.pop(hash_val, None))
            uv_dir_bytes = await task
            self._created_hash_bytes[hash_val] = uv_dir_bytes
            return uv_dir_bytes

    def modify_context(
        self,
        uris: List[str],
        runtime_env: "RuntimeEnv",  # noqa: F821
        context: "RuntimeEnvContext",  # noqa: F821
        logger: logging.Logger = default_logger,
    ):
        """Point `context` at the virtualenv created for the uv URI.

        Does nothing when `runtime_env` has no `uv` field. Otherwise sets
        `context.py_executable` and extends `context.command_prefix` with the
        virtualenv activation command.

        Raises:
            ValueError: if the virtualenv's Python path does not exist locally.
        """
        if not runtime_env.has_uv():
            return
        # UvPlugin only uses a single URI.
        uri = uris[0]
        # Update py_executable.
        _, hash_val = parse_uri(uri)
        target_dir = self._get_path_from_hash(hash_val)
        virtualenv_python = virtualenv_utils.get_virtualenv_python(target_dir)

        if not os.path.exists(virtualenv_python):
            raise ValueError(
                f"Local directory {target_dir} for URI {uri} does "
                "not exist on the cluster. Something may have gone wrong while "
                "installing the runtime_env `uv` packages."
            )
        context.py_executable = virtualenv_python
        context.command_prefix += virtualenv_utils.get_virtualenv_activate_command(
            target_dir
        )
3 changes: 1 addition & 2 deletions python/ray/_private/runtime_env/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,14 +337,13 @@ def parse_and_validate_env_vars(env_vars: Dict[str, str]) -> Optional[Dict[str,

# Dictionary mapping runtime_env options with the function to parse and
# validate them.
#
# TODO(hjiang): Expose `uv` related validation after implementation finished.
OPTION_TO_VALIDATION_FN = {
"py_modules": parse_and_validate_py_modules,
"working_dir": parse_and_validate_working_dir,
"excludes": parse_and_validate_excludes,
"conda": parse_and_validate_conda,
"pip": parse_and_validate_pip,
"uv": parse_and_validate_uv,
"env_vars": parse_and_validate_env_vars,
"container": parse_and_validate_container,
}
45 changes: 35 additions & 10 deletions python/ray/runtime_env/runtime_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ray._private.runtime_env.conda import get_uri as get_conda_uri
from ray._private.runtime_env.pip import get_uri as get_pip_uri
from ray._private.runtime_env.plugin_schema_manager import RuntimeEnvPluginSchemaManager
from ray._private.runtime_env.uv import get_uri as get_uv_uri
from ray._private.runtime_env.validation import OPTION_TO_VALIDATION_FN
from ray._private.thirdparty.dacite import from_dict
from ray.core.generated.runtime_env_common_pb2 import (
Expand Down Expand Up @@ -148,7 +149,6 @@ def to_dict(self) -> Dict:
] = RuntimeEnvConfig.parse_and_validate_runtime_env_config


# TODO(hjiang): Expose `uv` related fields after implementation finished.
@PublicAPI
class RuntimeEnv(dict):
"""This class is used to define a runtime environment for a job, task,
Expand Down Expand Up @@ -237,6 +237,8 @@ class MyClass:
the package name "pip" in front of the ``pip_version`` to form the final
requirement string, the syntax of a requirement specifier is defined in
full in PEP 508.
uv: Either a list of pip packages, or a python dictionary that has one field:
1) ``packages`` (required, List[str]).
conda: Either the conda YAML config, the name of a
local conda env (e.g., "pytorch_p36"), or the path to a conda
environment.yaml file.
Expand Down Expand Up @@ -272,6 +274,7 @@ class MyClass:
"working_dir",
"conda",
"pip",
"uv",
"container",
"excludes",
"env_vars",
Expand Down Expand Up @@ -311,6 +314,7 @@ def __init__(
_validate: bool = True,
mpi: Optional[Dict] = None,
image_uri: Optional[str] = None,
uv: Optional[List[str]] = None,
**kwargs,
):
super().__init__()
Expand All @@ -322,6 +326,8 @@ def __init__(
runtime_env["working_dir"] = working_dir
if pip is not None:
runtime_env["pip"] = pip
if uv is not None:
runtime_env["uv"] = uv
if conda is not None:
runtime_env["conda"] = conda
if nsight is not None:
Expand Down Expand Up @@ -349,16 +355,18 @@ def __init__(
if not _validate:
return

if self.get("conda") and self.get("pip"):
if (self.get("conda") is not None) + (self.get("pip") is not None) + (
self.get("uv") is not None
) > 1:
raise ValueError(
"The 'pip' field and 'conda' field of "
"runtime_env cannot both be specified.\n"
f"specified pip field: {self['pip']}\n"
f"specified conda field: {self['conda']}\n"
"To use pip with conda, please only set the 'conda' "
"field, and specify your pip dependencies "
"within the conda YAML config dict: see "
"https://conda.io/projects/conda/en/latest/"
"The 'pip' field, 'uv' field, and 'conda' field of "
"runtime_env cannot be specified at the same time.\n"
f"specified pip field: {self.get('pip')}\n"
f"specified conda field: {self.get('conda')}\n"
f"specified uv field: {self.get('uv')}\n"
"To use pip with conda, please only set the 'conda'"
"field, and specify your pip dependencies within the conda YAML "
"config dict: see https://conda.io/projects/conda/en/latest/"
"user-guide/tasks/manage-environments.html"
"#create-env-file-manually"
)
Expand Down Expand Up @@ -472,6 +480,11 @@ def pip_uri(self) -> Optional[str]:
return get_pip_uri(self)
return None

def uv_uri(self) -> Optional[str]:
    """Return the URI computed by `get_uv_uri`, or None if "uv" is not set."""
    return get_uv_uri(self) if "uv" in self else None

def plugin_uris(self) -> List[str]:
    """Not implemented yet; always returns an empty list."""
    return []
Expand Down Expand Up @@ -518,6 +531,11 @@ def has_pip(self) -> bool:
return True
return False

def has_uv(self) -> bool:
    """Report whether a truthy "uv" field is set on this runtime env."""
    return bool(self.get("uv"))

def virtualenv_name(self) -> Optional[str]:
if not self.has_pip() or not isinstance(self["pip"], str):
return None
Expand All @@ -530,6 +548,13 @@ def pip_config(self) -> Dict:
self["pip"] = self["pip"]
return self["pip"]

def uv_config(self) -> Dict:
    """Return the "uv" field, or {} when it is unset/falsy or a string."""
    if self.has_uv() and not isinstance(self["uv"], str):
        # Re-assign the field so that `__setitem__` parses and validates
        # the `uv` value before it is returned.
        self["uv"] = self["uv"]
        return self["uv"]
    return {}

def get_extension(self, key) -> Optional[str]:
if key not in RuntimeEnv.extensions_fields:
raise ValueError(
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ py_test_module_list(
"test_runtime_env_conda_and_pip_3.py",
"test_runtime_env_conda_and_pip_4.py",
"test_runtime_env_conda_and_pip_5.py",
"test_runtime_env_uv.py",
],
size = "large",
tags = ["exclusive", "post_wheel_build", "team:core"],
Expand Down
Loading

0 comments on commit 1713260

Please sign in to comment.