diff --git a/python/ray/_private/runtime_env/BUILD b/python/ray/_private/runtime_env/BUILD index 4ba4f80d5b9e..34edfab1f290 100644 --- a/python/ray/_private/runtime_env/BUILD +++ b/python/ray/_private/runtime_env/BUILD @@ -1,3 +1,6 @@ +# TODO(hjiang): All existing pythons are not using bazel as build system, which leads to missing BUILD file and targets. +# Revisit if we decide to support bazel build in the future. + load("@rules_python//python:defs.bzl", "py_library", "py_test") package(default_visibility = ["//visibility:public"]) diff --git a/python/ray/_private/runtime_env/agent/runtime_env_agent.py b/python/ray/_private/runtime_env/agent/runtime_env_agent.py index 58c332fb3077..9118aa33d266 100644 --- a/python/ray/_private/runtime_env/agent/runtime_env_agent.py +++ b/python/ray/_private/runtime_env/agent/runtime_env_agent.py @@ -18,6 +18,7 @@ from ray._private.runtime_env.java_jars import JavaJarsPlugin from ray._private.runtime_env.image_uri import ContainerPlugin from ray._private.runtime_env.pip import PipPlugin +from ray._private.runtime_env.uv import UvPlugin from ray._private.gcs_utils import GcsAioClient from ray._private.runtime_env.plugin import ( RuntimeEnvPlugin, @@ -206,6 +207,7 @@ def __init__( ) self._pip_plugin = PipPlugin(self._runtime_env_dir) + self._uv_plugin = UvPlugin(self._runtime_env_dir) self._conda_plugin = CondaPlugin(self._runtime_env_dir) self._py_modules_plugin = PyModulesPlugin( self._runtime_env_dir, self._gcs_aio_client @@ -228,6 +230,7 @@ def __init__( # self._xxx_plugin, we should just iterate through self._plugins. self._base_plugins: List[RuntimeEnvPlugin] = [ self._working_dir_plugin, + self._uv_plugin, self._pip_plugin, self._conda_plugin, self._py_modules_plugin, diff --git a/python/ray/_private/runtime_env/packaging.py b/python/ray/_private/runtime_env/packaging.py index dd9212556c47..279c40139484 100644 --- a/python/ray/_private/runtime_env/packaging.py +++ b/python/ray/_private/runtime_env/packaging.py @@ -87,6 +87,7 @@ def __new__(cls, value, doc=None): GCS = "gcs", "For packages dynamically uploaded and managed by the GCS." CONDA = "conda", "For conda environments installed locally on each node." PIP = "pip", "For pip environments installed locally on each node." + UV = "uv", "For uv environments install locally on each node." HTTPS = "https", "Remote https path, assumes everything packed in one zip file." S3 = "s3", "Remote s3 path, assumes everything packed in one zip file." GS = "gs", "Remote google storage path, assumes everything packed in one zip file." diff --git a/python/ray/_private/runtime_env/uv.py b/python/ray/_private/runtime_env/uv.py index 5d0c5db980c9..3a934e3d69ad 100644 --- a/python/ray/_private/runtime_env/uv.py +++ b/python/ray/_private/runtime_env/uv.py @@ -1,21 +1,50 @@ """Util class to install packages via uv. """ -# TODO(hjiang): Implement `UvPlugin`, which is the counterpart for `PipPlugin`. - from typing import Dict, List, Optional -from asyncio import get_running_loop import os +import hashlib from ray._private.runtime_env import virtualenv_utils from ray._private.runtime_env import dependency_utils from ray._private.runtime_env.utils import check_output_cmd +from ray._private.runtime_env.plugin import RuntimeEnvPlugin +from ray._private.runtime_env.packaging import Protocol, parse_uri +from asyncio import create_task, get_running_loop import shutil import logging +import json +import asyncio import sys +from ray._private.utils import try_to_create_directory, get_directory_size_bytes default_logger = logging.getLogger(__name__) +def _get_uv_hash(uv_dict: Dict) -> str: + """Get a deterministic hash value for `uv` related runtime envs.""" + serialized_uv_spec = json.dumps(uv_dict, sort_keys=True) + hash_val = hashlib.sha1(serialized_uv_spec.encode("utf-8")).hexdigest() + return hash_val + + +def get_uri(runtime_env: Dict) -> Optional[str]: + """Return `"uv://"`, or None if no GC required.""" + uv = runtime_env.get("uv") + if uv is not None: + if isinstance(uv, dict): + uri = "uv://" + _get_uv_hash(uv_dict=uv) + elif isinstance(uv, list): + uri = "uv://" + _get_uv_hash(uv_dict=dict(packages=uv)) + else: + raise TypeError( + "uv field received by RuntimeEnvAgent must be " + f"list or dict, not {type(uv).__name__}." + ) + else: + uri = None + return uri + + class UvProcessor: def __init__( self, @@ -131,3 +160,128 @@ async def _run(self): def __await__(self): return self._run().__await__() + + +class UvPlugin(RuntimeEnvPlugin): + name = "uv" + + def __init__(self, resources_dir: str): + self._uv_resource_dir = os.path.join(resources_dir, "uv") + self._creating_task = {} + # Maps a URI to a lock that is used to prevent multiple concurrent + # installs of the same virtualenv, see #24513 + self._create_locks: Dict[str, asyncio.Lock] = {} + # Key: created hashes. Value: size of the uv dir. + self._created_hash_bytes: Dict[str, int] = {} + try_to_create_directory(self._uv_resource_dir) + + def _get_path_from_hash(self, hash_val: str) -> str: + """Generate a path from the hash of a uv spec. + + Example output: + /tmp/ray/session_2021-11-03_16-33-59_356303_41018/runtime_resources + /uv/ray-9a7972c3a75f55e976e620484f58410c920db091 + """ + return os.path.join(self._uv_resource_dir, hash_val) + + def get_uris(self, runtime_env: "RuntimeEnv") -> List[str]: # noqa: F821 + """Return the uv URI from the RuntimeEnv if it exists, else return [].""" + uv_uri = runtime_env.uv_uri() + if uv_uri: + return [uv_uri] + return [] + + def delete_uri( + self, uri: str, logger: Optional[logging.Logger] = default_logger + ) -> int: + """Delete URI and return the number of bytes deleted.""" + logger.info("Got request to delete uv URI %s", uri) + protocol, hash_val = parse_uri(uri) + if protocol != Protocol.UV: + raise ValueError( + "UvPlugin can only delete URIs with protocol " + f"uv. Received protocol {protocol}, URI {uri}" + ) + + # Cancel running create task. + task = self._creating_task.pop(hash_val, None) + if task is not None: + task.cancel() + + del self._created_hash_bytes[hash_val] + + uv_env_path = self._get_path_from_hash(hash_val) + local_dir_size = get_directory_size_bytes(uv_env_path) + del self._create_locks[uri] + try: + shutil.rmtree(uv_env_path) + except OSError as e: + logger.warning(f"Error when deleting uv env {uv_env_path}: {str(e)}") + return 0 + + return local_dir_size + + async def create( + self, + uri: str, + runtime_env: "RuntimeEnv", # noqa: F821 + context: "RuntimeEnvContext", # noqa: F821 + logger: Optional[logging.Logger] = default_logger, + ) -> int: + if not runtime_env.has_uv(): + return 0 + + protocol, hash_val = parse_uri(uri) + target_dir = self._get_path_from_hash(hash_val) + + async def _create_for_hash(): + await UvProcessor( + target_dir, + runtime_env, + logger, + ) + + loop = get_running_loop() + return await loop.run_in_executor( + None, get_directory_size_bytes, target_dir + ) + + if uri not in self._create_locks: + # async lock to prevent the same virtualenv being concurrently installed + self._create_locks[uri] = asyncio.Lock() + + async with self._create_locks[uri]: + if hash_val in self._created_hash_bytes: + return self._created_hash_bytes[hash_val] + self._creating_task[hash_val] = task = create_task(_create_for_hash()) + task.add_done_callback(lambda _: self._creating_task.pop(hash_val, None)) + uv_dir_bytes = await task + self._created_hash_bytes[hash_val] = uv_dir_bytes + return uv_dir_bytes + + def modify_context( + self, + uris: List[str], + runtime_env: "RuntimeEnv", # noqa: F821 + context: "RuntimeEnvContext", # noqa: F821 + logger: logging.Logger = default_logger, + ): + if not runtime_env.has_uv(): + return + # UvPlugin only uses a single URI. + uri = uris[0] + # Update py_executable. + protocol, hash_val = parse_uri(uri) + target_dir = self._get_path_from_hash(hash_val) + virtualenv_python = virtualenv_utils.get_virtualenv_python(target_dir) + + if not os.path.exists(virtualenv_python): + raise ValueError( + f"Local directory {target_dir} for URI {uri} does " + "not exist on the cluster. Something may have gone wrong while " + "installing the runtime_env `uv` packages." + ) + context.py_executable = virtualenv_python + context.command_prefix += virtualenv_utils.get_virtualenv_activate_command( + target_dir + ) diff --git a/python/ray/_private/runtime_env/validation.py b/python/ray/_private/runtime_env/validation.py index ac478df59203..86e4eee3d2e1 100644 --- a/python/ray/_private/runtime_env/validation.py +++ b/python/ray/_private/runtime_env/validation.py @@ -337,14 +337,13 @@ def parse_and_validate_env_vars(env_vars: Dict[str, str]) -> Optional[Dict[str, # Dictionary mapping runtime_env options with the function to parse and # validate them. -# -# TODO(hjiang): Expose `uv` related validation after implementation finished. OPTION_TO_VALIDATION_FN = { "py_modules": parse_and_validate_py_modules, "working_dir": parse_and_validate_working_dir, "excludes": parse_and_validate_excludes, "conda": parse_and_validate_conda, "pip": parse_and_validate_pip, + "uv": parse_and_validate_uv, "env_vars": parse_and_validate_env_vars, "container": parse_and_validate_container, } diff --git a/python/ray/runtime_env/runtime_env.py b/python/ray/runtime_env/runtime_env.py index ab91e04ef8cd..20ebc1793c4e 100644 --- a/python/ray/runtime_env/runtime_env.py +++ b/python/ray/runtime_env/runtime_env.py @@ -10,6 +10,7 @@ from ray._private.runtime_env.conda import get_uri as get_conda_uri from ray._private.runtime_env.pip import get_uri as get_pip_uri from ray._private.runtime_env.plugin_schema_manager import RuntimeEnvPluginSchemaManager +from ray._private.runtime_env.uv import get_uri as get_uv_uri from ray._private.runtime_env.validation import OPTION_TO_VALIDATION_FN from ray._private.thirdparty.dacite import from_dict from ray.core.generated.runtime_env_common_pb2 import ( @@ -148,7 +149,6 @@ def to_dict(self) -> Dict: ] = RuntimeEnvConfig.parse_and_validate_runtime_env_config -# TODO(hjiang): Expose `uv` related fields after implementation finished. @PublicAPI class RuntimeEnv(dict): """This class is used to define a runtime environment for a job, task, @@ -237,6 +237,8 @@ class MyClass: the package name "pip" in front of the ``pip_version`` to form the final requirement string, the syntax of a requirement specifier is defined in full in PEP 508. + uv: Either a list of pip packages, or a python dictionary that has one field: + 1) ``packages`` (required, List[str]). conda: Either the conda YAML config, the name of a local conda env (e.g., "pytorch_p36"), or the path to a conda environment.yaml file. @@ -272,6 +274,7 @@ class MyClass: "working_dir", "conda", "pip", + "uv", "container", "excludes", "env_vars", @@ -311,6 +314,7 @@ def __init__( _validate: bool = True, mpi: Optional[Dict] = None, image_uri: Optional[str] = None, + uv: Optional[List[str]] = None, **kwargs, ): super().__init__() @@ -322,6 +326,8 @@ def __init__( runtime_env["working_dir"] = working_dir if pip is not None: runtime_env["pip"] = pip + if uv is not None: + runtime_env["uv"] = uv if conda is not None: runtime_env["conda"] = conda if nsight is not None: @@ -349,16 +355,19 @@ def __init__( if not _validate: return - if self.get("conda") and self.get("pip"): + use_conda_install = int(self.get("conda") is not None) + use_pip_install = int(self.get("pip") is not None) + use_uv_install = int(self.get("uv") is not None) + if use_conda_install + use_pip_install + use_uv_install > 1: raise ValueError( - "The 'pip' field and 'conda' field of " - "runtime_env cannot both be specified.\n" - f"specified pip field: {self['pip']}\n" - f"specified conda field: {self['conda']}\n" - "To use pip with conda, please only set the 'conda' " - "field, and specify your pip dependencies " - "within the conda YAML config dict: see " - "https://conda.io/projects/conda/en/latest/" + "The 'pip' field, 'uv' field, and 'conda' field of " + "runtime_env cannot be specified at the same time.\n" + f"specified pip field: {self.get('pip')}\n" + f"specified conda field: {self.get('conda')}\n" + f"specified conda field: {self.get('uv')}\n" + "To use pip with conda or uv, please only set the 'conda' or 'uv'" + "field, and specify your pip dependencies within the conda YAML " + "config dict: see https://conda.io/projects/conda/en/latest/" "user-guide/tasks/manage-environments.html" "#create-env-file-manually" ) @@ -472,6 +481,11 @@ def pip_uri(self) -> Optional[str]: return get_pip_uri(self) return None + def uv_uri(self) -> Optional[str]: + if "uv" in self: + return get_uv_uri(self) + return None + def plugin_uris(self) -> List[str]: """Not implemented yet, always return a empty list""" return [] @@ -518,6 +532,11 @@ def has_pip(self) -> bool: return True return False + def has_uv(self) -> bool: + if self.get("uv"): + return True + return False + def virtualenv_name(self) -> Optional[str]: if not self.has_pip() or not isinstance(self["pip"], str): return None @@ -530,6 +549,13 @@ def pip_config(self) -> Dict: self["pip"] = self["pip"] return self["pip"] + def uv_config(self) -> Dict: + if not self.has_uv() or isinstance(self["uv"], str): + return {} + # Parse and validate field pip on method `__setitem__` + self["uv"] = self["uv"] + return self["uv"] + def get_extension(self, key) -> Optional[str]: if key not in RuntimeEnv.extensions_fields: raise ValueError( diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 300279a883b8..016827389604 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -572,6 +572,7 @@ py_test_module_list( "test_runtime_env_conda_and_pip_3.py", "test_runtime_env_conda_and_pip_4.py", "test_runtime_env_conda_and_pip_5.py", + "test_runtime_uv.py", ], size = "large", tags = ["exclusive", "post_wheel_build", "team:core"], diff --git a/python/ray/tests/test_runtime_uv.py b/python/ray/tests/test_runtime_uv.py new file mode 100644 index 000000000000..780d952f0caf --- /dev/null +++ b/python/ray/tests/test_runtime_uv.py @@ -0,0 +1,64 @@ +# TODO(hjiang): A few unit tests to add after full functionality implemented. +# 1. Install specialized version of `uv`. +# 2. Options for `uv install`. +# 3. Use requirement files for packages. + +import os +import pytest +import sys + +from ray._private.runtime_env import virtualenv_utils +import ray + + +def test_uv_install_in_virtualenv(start_cluster): + assert ( + virtualenv_utils.is_in_virtualenv() is False + and "IN_VIRTUALENV" not in os.environ + ) or (virtualenv_utils.is_in_virtualenv() is True and "IN_VIRTUALENV" in os.environ) + cluster, address = start_cluster + runtime_env = {"pip": ["pip-install-test==0.5"]} + + ray.init(address, runtime_env=runtime_env) + + @ray.remote + def f(): + import pip_install_test # noqa: F401 + + return virtualenv_utils.is_in_virtualenv() + + # Ensure that the runtime env has been installed and virtualenv is activated. + assert ray.get(f.remote()) + + +# Package installation succeeds. +def test_package_install_with_uv(): + @ray.remote(runtime_env={"uv": {"packages": ["pip-install-test==0.5"]}}) + def f(): + import pip + + return pip.__version__ + + assert ray.get(f.remote()) == "24.1.2" + + +# Package installation fails due to conflict versions. +def test_package_install_has_conflict_with_uv(): + # moto require requests>=2.5 + conflict_packages = ["moto==3.0.5", "requests==2.4.0"] + + @ray.remote(runtime_env={"uv": {"packages": conflict_packages}}) + def f(): + import pip + + return pip.__version__ + + with pytest.raises(ray.exceptions.RuntimeEnvSetupError) as _: + ray.get(f.remote()) + + +if __name__ == "__main__": + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__]))