From 97443106180e53dc63905c47e238d5c10309cf9e Mon Sep 17 00:00:00 2001
From: Kai Fricke
Date: Thu, 16 Feb 2023 09:27:25 -0800
Subject: [PATCH] [tune] Remove deprecated Resources class (#32490)

The `tune.resources.Resources` class has been deprecated in favor of
PlacementGroupFactory for a long time. This PR removes all internal uses of
the old Resources class and makes the class itself raise an error when
instantiated. We keep a private copy for the resource updater for now (to use
the add/subtract semantics).

Signed-off-by: Kai Fricke
---
 .../xgboost_dynamic_resources_example.py      |   5 +-
 python/ray/tune/execution/placement_groups.py |  11 +-
 python/ray/tune/experiment/config_parser.py   |  11 +-
 python/ray/tune/experiment/trial.py           |  92 +++----
 python/ray/tune/resources.py                  | 226 ++----------------
 .../schedulers/resource_changing_scheduler.py |  11 +-
 python/ray/tune/tests/test_api.py             |   5 +-
 .../ray/tune/tests/test_ray_trial_executor.py |   3 +-
 .../ray/tune/tests/test_resource_updater.py   |  51 +++-
 python/ray/tune/tests/test_trial_runner.py    |  18 +-
 python/ray/tune/tests/test_trial_runner_2.py  |  25 +-
 python/ray/tune/tests/test_trial_runner_3.py  |  56 +----
 python/ray/tune/tests/test_trial_scheduler.py |   5 +-
 python/ray/tune/tests/test_tune_server.py     |   5 +-
 .../ray/tune/trainable/function_trainable.py  |   7 +-
 python/ray/tune/trainable/session.py          |   3 +-
 python/ray/tune/trainable/trainable.py        |  11 +-
 python/ray/tune/trainable/util.py             |   3 +-
 python/ray/tune/utils/resource_updater.py     | 215 ++++++++++++++++-
 19 files changed, 373 insertions(+), 390 deletions(-)

diff --git a/python/ray/tune/examples/xgboost_dynamic_resources_example.py b/python/ray/tune/examples/xgboost_dynamic_resources_example.py
index 8e8cda72f93f..e08be5cf400d 100644
--- a/python/ray/tune/examples/xgboost_dynamic_resources_example.py
+++ b/python/ray/tune/examples/xgboost_dynamic_resources_example.py
@@ -1,4 +1,4 @@
-from typing import Union, Dict, Any, Optional
+from typing import Dict, Any, Optional
 import sklearn.datasets
 import sklearn.metrics
 import os
@@ -11,7 +11,6 @@
 from ray import air, tune
 from ray.tune.schedulers import ResourceChangingScheduler, ASHAScheduler
 from ray.tune import Trainable
-from ray.tune.resources import Resources
 from ray.tune.execution.placement_groups import PlacementGroupFactory
 from ray.tune.experiment import Trial
 from ray.tune.execution import trial_runner
@@ -167,7 +166,7 @@ def example_resources_allocation_function(
     trial: Trial,
     result: Dict[str, Any],
     scheduler: "ResourceChangingScheduler",
-) -> Optional[Union[PlacementGroupFactory, Resources]]:
+) -> Optional[PlacementGroupFactory]:
    """This is a basic example of a resource allocating function.

    The function naively balances available CPUs over live trials.
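For orientation, the rest of the diff repeatedly replaces `Resources(...)` arguments with `PlacementGroupFactory` bundles. A minimal sketch of the mapping, assuming the common split of one head bundle for the trainable plus extra bundles for child actors (the concrete numbers are illustrative, not taken from this diff):

```python
from ray.tune.execution.placement_groups import PlacementGroupFactory

# Before (deprecated, now raises): 1 CPU for the trial itself, plus
# 2 extra CPUs and 1 extra GPU reserved for child actors it may launch:
#   Resources(cpu=1, gpu=0, extra_cpu=2, extra_gpu=1)

# After: the same request as placement group bundles. The first bundle is
# used by the trainable itself; the remaining bundles cover child actors.
pgf = PlacementGroupFactory([{"CPU": 1}, {"CPU": 2, "GPU": 1}])

# Plain dicts such as {"cpu": 2, "gpu": 1} are still accepted in a few
# places and are converted through resource_dict_to_pg_factory (see the
# placement_groups.py change below).
```

Such a factory can be passed directly as `resources_per_trial`, e.g. `tune.run(trainable, resources_per_trial=pgf)`.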
diff --git a/python/ray/tune/execution/placement_groups.py b/python/ray/tune/execution/placement_groups.py index 12f820e5df6b..1b9b1f93503b 100644 --- a/python/ray/tune/execution/placement_groups.py +++ b/python/ray/tune/execution/placement_groups.py @@ -1,7 +1,6 @@ import warnings from typing import Dict, Optional from ray.air.execution.resources.request import ResourceRequest -from ray.tune.resources import Resources from ray.util.annotations import DeveloperAPI, PublicAPI from ray.util.placement_group import placement_group @@ -103,21 +102,23 @@ def __call__(self, *args, **kwargs): @DeveloperAPI -def resource_dict_to_pg_factory(spec: Optional[Dict[str, float]]): +def resource_dict_to_pg_factory(spec: Optional[Dict[str, float]] = None): """Translates resource dict into PlacementGroupFactory.""" spec = spec or {"cpu": 1} - if isinstance(spec, Resources): - spec = spec._asdict() - spec = spec.copy() cpus = spec.pop("cpu", spec.pop("CPU", 0.0)) gpus = spec.pop("gpu", spec.pop("GPU", 0.0)) memory = spec.pop("memory", 0.0) + # If there is a custom_resources key, use as base for bundle bundle = {k: v for k, v in spec.pop("custom_resources", {}).items()} + # Otherwise, consider all other keys as custom resources + if not bundle: + bundle = spec + bundle.update( { "CPU": cpus, diff --git a/python/ray/tune/experiment/config_parser.py b/python/ray/tune/experiment/config_parser.py index 8dfd8e770555..c163a125131c 100644 --- a/python/ray/tune/experiment/config_parser.py +++ b/python/ray/tune/experiment/config_parser.py @@ -10,7 +10,6 @@ from ray.tune.experiment import Trial from ray.tune.resources import json_to_resources from ray.tune.syncer import SyncConfig, Syncer -from ray.tune.execution.placement_groups import PlacementGroupFactory from ray.tune.utils.util import SafeFallbackEncoder @@ -198,15 +197,7 @@ def _create_trial_from_spec( raise TuneError("Error parsing args, see above message", spec) if resources: - if isinstance(resources, PlacementGroupFactory): - trial_kwargs["placement_group_factory"] = resources - else: - # This will be converted to a placement group factory in the - # Trial object constructor - try: - trial_kwargs["resources"] = json_to_resources(resources) - except (TuneError, ValueError) as exc: - raise TuneError("Error parsing resources_per_trial", resources) from exc + trial_kwargs["placement_group_factory"] = resources experiment_dir_name = spec.get("experiment_dir_name") diff --git a/python/ray/tune/experiment/trial.py b/python/ray/tune/experiment/trial.py index 2b4edcf8c4c3..1f5b74af2e49 100644 --- a/python/ray/tune/experiment/trial.py +++ b/python/ray/tune/experiment/trial.py @@ -35,7 +35,6 @@ TRIAL_ID, DEBUG_METRICS, ) -from ray.tune.resources import Resources from ray.tune.syncer import SyncConfig, Syncer from ray.tune.execution.placement_groups import ( PlacementGroupFactory, @@ -167,11 +166,11 @@ def trial_id(self): return self._trial_id @property - def trial_resources(self) -> Union[Resources, PlacementGroupFactory]: + def trial_resources(self) -> PlacementGroupFactory: return self._trial_resources @trial_resources.setter - def trial_resources(self, new_resources: Union[Resources, PlacementGroupFactory]): + def trial_resources(self, new_resources: PlacementGroupFactory): self._trial_resources = new_resources @@ -187,23 +186,6 @@ def _create_unique_logdir_name(root: str, relative_logdir: str) -> str: return relative_logdir -def _to_pg_factory( - resources: Optional[Resources], - placement_group_factory: Optional[PlacementGroupFactory], -) -> PlacementGroupFactory: - 
"""Outputs resources requirement in the form of PGF. - - In case that `placement_group_factory` is None, `resources` will be - converted to PGF. If this is unsuccessful, an error will be raised. - - """ - if not placement_group_factory: - if not resources: - resources = Resources(cpu=1, gpu=0) - placement_group_factory = resource_dict_to_pg_factory(resources) - return placement_group_factory - - @DeveloperAPI class Trial: """A trial object holds the state for one model training run. @@ -259,7 +241,6 @@ def __init__( local_dir: Optional[str] = DEFAULT_RESULTS_DIR, evaluated_params: Optional[Dict] = None, experiment_tag: str = "", - resources: Optional[Resources] = None, placement_group_factory: Optional[PlacementGroupFactory] = None, stopping_criterion: Optional[Dict[str, float]] = None, experiment_dir_name: Optional[str] = None, @@ -309,7 +290,14 @@ def __init__( self.stopping_criterion = stopping_criterion or {} self._setup_default_resource = _setup_default_resource - self._resources = resources + + if placement_group_factory and not isinstance( + placement_group_factory, PlacementGroupFactory + ): + placement_group_factory = resource_dict_to_pg_factory( + placement_group_factory + ) + self._default_placement_group_factory = placement_group_factory # Will be created in create_placement_group_factory(). self.placement_group_factory = None @@ -412,8 +400,8 @@ def create_placement_group_factory(self): trainable_cls = self.get_trainable_cls() if not trainable_cls or not self._setup_default_resource: # Create placement group factory using default resources. - self.placement_group_factory = _to_pg_factory( - self._resources, self._default_placement_group_factory + self.placement_group_factory = ( + self._default_placement_group_factory or resource_dict_to_pg_factory() ) return @@ -421,28 +409,27 @@ def create_placement_group_factory(self): # If Trainable returns resources, do not allow manual override via # `resources_per_trial` by the user. - if default_resources: - if self._resources or self._default_placement_group_factory: - raise ValueError( - "Resources for {} have been automatically set to {} " - "by its `default_resource_request()` method. Please " - "clear the `resources_per_trial` option.".format( - trainable_cls, default_resources - ) + if default_resources and self._default_placement_group_factory: + raise TuneError( + "Resources for {} have been automatically set to {} " + "by its `default_resource_request()` method. 
Please " + "clear the `resources_per_trial` option.".format( + trainable_cls, default_resources ) + ) - if isinstance(default_resources, PlacementGroupFactory): - default_placement_group_factory = default_resources - resources = None - else: - default_placement_group_factory = None - resources = default_resources - else: - default_placement_group_factory = self._default_placement_group_factory - resources = self._resources - - self.placement_group_factory = _to_pg_factory( - resources, default_placement_group_factory + if default_resources and not isinstance( + default_resources, PlacementGroupFactory + ): + default_resources = resource_dict_to_pg_factory(default_resources) + + self.placement_group_factory = ( + # default_resource_request + default_resources + # resources_per_trial + or self._default_placement_group_factory + # cpu=1 + or resource_dict_to_pg_factory() ) def _get_default_result_or_future(self) -> Optional[dict]: @@ -638,7 +625,6 @@ def reset(self): local_dir=self.local_dir, evaluated_params=self.evaluated_params, experiment_tag=self.experiment_tag, - resources=None, placement_group_factory=placement_group_factory, stopping_criterion=self.stopping_criterion, sync_config=self.sync_config, @@ -663,9 +649,7 @@ def init_logdir(self): self.invalidate_json_state() - def update_resources( - self, resources: Union[Dict, Resources, PlacementGroupFactory] - ): + def update_resources(self, resources: Union[dict, PlacementGroupFactory]): """EXPERIMENTAL: Updates the resource requirements. Should only be called when the trial is not running. @@ -676,15 +660,11 @@ def update_resources( if self.status is Trial.RUNNING: raise ValueError("Cannot update resources while Trial is running.") - placement_group_factory = None - if isinstance(resources, PlacementGroupFactory): - placement_group_factory = resources - elif isinstance(resources, dict): - resources = Resources(**resources) + placement_group_factory = resources + if isinstance(resources, dict): + placement_group_factory = resource_dict_to_pg_factory(resources) - self.placement_group_factory = _to_pg_factory( - resources, placement_group_factory - ) + self.placement_group_factory = placement_group_factory self.invalidate_json_state() diff --git a/python/ray/tune/resources.py b/python/ray/tune/resources.py index 0ad2bf1d3b1f..c45a5ce3d50c 100644 --- a/python/ray/tune/resources.py +++ b/python/ray/tune/resources.py @@ -1,14 +1,17 @@ from collections import namedtuple import logging import json -from numbers import Number # For compatibility under py2 to consider unicode as str from typing import Optional +from ray.tune.execution.placement_groups import ( + resource_dict_to_pg_factory, + PlacementGroupFactory, +) +from ray.tune.utils.resource_updater import _Resources from ray.util.annotations import Deprecated, DeveloperAPI -from ray._private.resource_spec import NODE_ID_PREFIX from ray.tune import TuneError logger = logging.getLogger(__name__) @@ -33,32 +36,6 @@ class Resources( ], ) ): - """Ray resources required to schedule a trial. - - Parameters: - cpu: Number of CPUs to allocate to the trial. - gpu: Number of GPUs to allocate to the trial. - memory: Memory to reserve for the trial. - object_store_memory: Object store memory to reserve. - extra_cpu: Extra CPUs to reserve in case the trial needs to - launch additional Ray actors that use CPUs. - extra_gpu: Extra GPUs to reserve in case the trial needs to - launch additional Ray actors that use GPUs. 
- extra_memory: Memory to reserve for the trial launching - additional Ray actors that use memory. - extra_object_store_memory: Object store memory to reserve for - the trial launching additional Ray actors that use object store - memory. - custom_resources: Mapping of resource to quantity to allocate - to the trial. - extra_custom_resources: Extra custom resources to reserve in - case the trial needs to launch additional Ray actors that use - any of these custom resources. - has_placement_group: Bool indicating if the trial also - has an associated placement group. - - """ - __slots__ = () def __new__( @@ -75,159 +52,13 @@ def __new__( extra_custom_resources: Optional[dict] = None, has_placement_group: bool = False, ): - custom_resources = custom_resources or {} - extra_custom_resources = extra_custom_resources or {} - leftovers = set(custom_resources) ^ set(extra_custom_resources) - - for value in leftovers: - custom_resources.setdefault(value, 0) - extra_custom_resources.setdefault(value, 0) - - cpu = round(cpu, 2) - gpu = round(gpu, 2) - memory = round(memory, 2) - object_store_memory = round(object_store_memory, 2) - extra_cpu = round(extra_cpu, 2) - extra_gpu = round(extra_gpu, 2) - extra_memory = round(extra_memory, 2) - extra_object_store_memory = round(extra_object_store_memory, 2) - custom_resources = { - resource: round(value, 2) for resource, value in custom_resources.items() - } - extra_custom_resources = { - resource: round(value, 2) - for resource, value in extra_custom_resources.items() - } - - all_values = [ - cpu, - gpu, - memory, - object_store_memory, - extra_cpu, - extra_gpu, - extra_memory, - extra_object_store_memory, - ] - all_values += list(custom_resources.values()) - all_values += list(extra_custom_resources.values()) - assert len(custom_resources) == len(extra_custom_resources) - for entry in all_values: - assert isinstance(entry, Number), ("Improper resource value.", entry) - return super(Resources, cls).__new__( - cls, - cpu, - gpu, - memory, - object_store_memory, - extra_cpu, - extra_gpu, - extra_memory, - extra_object_store_memory, - custom_resources, - extra_custom_resources, - has_placement_group, - ) - - def summary_string(self): - summary = "{} CPUs, {} GPUs".format( - self.cpu + self.extra_cpu, self.gpu + self.extra_gpu + raise DeprecationWarning( + "tune.Resources is depracted. Use tune.PlacementGroupFactory instead." 
) - if self.memory or self.extra_memory: - summary += ", {} GiB heap".format( - round((self.memory + self.extra_memory) / (1024**3), 2) - ) - if self.object_store_memory or self.extra_object_store_memory: - summary += ", {} GiB objects".format( - round( - (self.object_store_memory + self.extra_object_store_memory) - / (1024**3), - 2, - ) - ) - custom_summary = ", ".join( - [ - "{} {}".format(self.get_res_total(res), res) - for res in self.custom_resources - if not res.startswith(NODE_ID_PREFIX) - ] - ) - if custom_summary: - summary += " ({})".format(custom_summary) - return summary - - def cpu_total(self): - return self.cpu + self.extra_cpu - - def gpu_total(self): - return self.gpu + self.extra_gpu - - def memory_total(self): - return self.memory + self.extra_memory - - def object_store_memory_total(self): - return self.object_store_memory + self.extra_object_store_memory - - def get_res_total(self, key): - return self.custom_resources.get(key, 0) + self.extra_custom_resources.get( - key, 0 - ) - - def get(self, key): - return self.custom_resources.get(key, 0) - - def is_nonnegative(self): - all_values = [self.cpu, self.gpu, self.extra_cpu, self.extra_gpu] - all_values += list(self.custom_resources.values()) - all_values += list(self.extra_custom_resources.values()) - return all(v >= 0 for v in all_values) - - @classmethod - def subtract(cls, original, to_remove): - cpu = original.cpu - to_remove.cpu - gpu = original.gpu - to_remove.gpu - memory = original.memory - to_remove.memory - object_store_memory = ( - original.object_store_memory - to_remove.object_store_memory - ) - extra_cpu = original.extra_cpu - to_remove.extra_cpu - extra_gpu = original.extra_gpu - to_remove.extra_gpu - extra_memory = original.extra_memory - to_remove.extra_memory - extra_object_store_memory = ( - original.extra_object_store_memory - to_remove.extra_object_store_memory - ) - all_resources = set(original.custom_resources).union( - set(to_remove.custom_resources) - ) - new_custom_res = { - k: original.custom_resources.get(k, 0) - - to_remove.custom_resources.get(k, 0) - for k in all_resources - } - extra_custom_res = { - k: original.extra_custom_resources.get(k, 0) - - to_remove.extra_custom_resources.get(k, 0) - for k in all_resources - } - return Resources( - cpu, - gpu, - memory, - object_store_memory, - extra_cpu, - extra_gpu, - extra_memory, - extra_object_store_memory, - new_custom_res, - extra_custom_res, - ) - - def to_json(self): - return resources_to_json(self) @DeveloperAPI -def json_to_resources(data: Optional[str]): +def json_to_resources(data: Optional[str]) -> Optional[PlacementGroupFactory]: if data is None or data == "null": return None if isinstance(data, str): @@ -239,39 +70,24 @@ def json_to_resources(data: Optional[str]): "The field `{}` is no longer supported. 
Use `extra_cpu` " "or `extra_gpu` instead.".format(k) ) - if k not in Resources._fields: + if k not in _Resources._fields: raise ValueError( "Unknown resource field {}, must be one of {}".format( k, Resources._fields ) ) - return Resources( - data.get("cpu", 1), - data.get("gpu", 0), - data.get("memory", 0), - data.get("object_store_memory", 0), - data.get("extra_cpu", 0), - data.get("extra_gpu", 0), - data.get("extra_memory", 0), - data.get("extra_object_store_memory", 0), - data.get("custom_resources"), - data.get("extra_custom_resources"), + resource_dict_to_pg_factory( + dict( + cpu=data.get("cpu", 1), + gpu=data.get("gpu", 0), + memory=data.get("memory", 0), + custom_resources=data.get("custom_resources"), + ) ) -@DeveloperAPI -def resources_to_json(resources: Optional[Resources]): - if resources is None: - return None - return { - "cpu": resources.cpu, - "gpu": resources.gpu, - "memory": resources.memory, - "object_store_memory": resources.object_store_memory, - "extra_cpu": resources.extra_cpu, - "extra_gpu": resources.extra_gpu, - "extra_memory": resources.extra_memory, - "extra_object_store_memory": resources.extra_object_store_memory, - "custom_resources": resources.custom_resources.copy(), - "extra_custom_resources": resources.extra_custom_resources.copy(), - } +@Deprecated +def resources_to_json(*args, **kwargs): + raise DeprecationWarning( + "tune.Resources is depracted. Use tune.PlacementGroupFactory instead." + ) diff --git a/python/ray/tune/schedulers/resource_changing_scheduler.py b/python/ray/tune/schedulers/resource_changing_scheduler.py index ad54af1e1afa..2004de5ffbd4 100644 --- a/python/ray/tune/schedulers/resource_changing_scheduler.py +++ b/python/ray/tune/schedulers/resource_changing_scheduler.py @@ -9,7 +9,6 @@ from ray.air.execution.resources.request import _sum_bundles from ray.util.annotations import PublicAPI from ray.tune.execution import trial_runner -from ray.tune.resources import Resources from ray.tune.schedulers.trial_scheduler import FIFOScheduler, TrialScheduler from ray.tune.experiment import Trial from ray.tune.execution.placement_groups import PlacementGroupFactory @@ -625,7 +624,7 @@ class ResourceChangingScheduler(TrialScheduler): The callable must take four arguments: ``TrialRunner``, current ``Trial``, current result :class:`dict` and the ``ResourceChangingScheduler`` calling it. The callable must - return a ``PlacementGroupFactory``, ``Resources``, :class:`dict` + return a ``PlacementGroupFactory`` or None (signifying no need for an update). If ``resources_allocation_function`` is None, no resource requirements will be changed at any time. 
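A minimal sketch of a callable that satisfies the narrowed contract described above; the name and the warm-up policy are made up for illustration, only the four-argument signature and the `Optional[PlacementGroupFactory]` return type come from this diff:

```python
from typing import Any, Dict, Optional

from ray.tune.execution.placement_groups import PlacementGroupFactory
from ray.tune.experiment import Trial


def example_allocation_fn(
    trial_runner,  # ray.tune.execution.trial_runner.TrialRunner
    trial: Trial,
    result: Dict[str, Any],
    scheduler,  # the ResourceChangingScheduler calling this function
) -> Optional[PlacementGroupFactory]:
    # Returning None tells the scheduler to keep the trial's current resources.
    if result.get("training_iteration", 0) < 10:
        return None
    # Otherwise request a single 1-CPU bundle for the remainder of the trial.
    return PlacementGroupFactory([{"CPU": 1}])
```

It would be passed as `ResourceChangingScheduler(resources_allocation_function=example_allocation_fn)`; the default remains `_DistributeResourcesDefault`.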
@@ -673,7 +672,7 @@ def __init__( Dict[str, Any], "ResourceChangingScheduler", ], - Optional[Union[PlacementGroupFactory, Resources]], + Optional[PlacementGroupFactory], ] ] = _DistributeResourcesDefault, ) -> None: @@ -686,9 +685,7 @@ def __init__( ) self._resources_allocation_function = resources_allocation_function self._base_scheduler = base_scheduler or FIFOScheduler() - self._base_trial_resources: Optional[ - Union[Resources, PlacementGroupFactory] - ] = None + self._base_trial_resources: Optional[PlacementGroupFactory] = None self._trials_to_reallocate: Dict[ Trial, Optional[Union[dict, PlacementGroupFactory]] ] = {} @@ -701,7 +698,7 @@ def metric(self): return self._base_scheduler._metric @property - def base_trial_resources(self) -> Optional[Union[Resources, PlacementGroupFactory]]: + def base_trial_resources(self) -> Optional[PlacementGroupFactory]: return self._base_trial_resources def set_search_properties( diff --git a/python/ray/tune/tests/test_api.py b/python/ray/tune/tests/test_api.py index 3e75ebfad7da..2b2c3cd263b3 100644 --- a/python/ray/tune/tests/test_api.py +++ b/python/ray/tune/tests/test_api.py @@ -32,7 +32,6 @@ from ray.tune.trainable import wrap_function from ray.tune.logger import Logger, LegacyLoggerCallback from ray.tune.execution.ray_trial_executor import _noop_logger_creator -from ray.tune.resources import Resources from ray.tune.result import ( TIMESTEPS_TOTAL, DONE, @@ -260,7 +259,9 @@ def testBuiltInTrainableResources(self): class B(Trainable): @classmethod def default_resource_request(cls, config): - return Resources(cpu=config["cpu"], gpu=config["gpu"]) + return PlacementGroupFactory( + [{"CPU": config["cpu"], "GPU": config["gpu"]}] + ) def step(self): return {"timesteps_this_iter": 1, "done": True} diff --git a/python/ray/tune/tests/test_ray_trial_executor.py b/python/ray/tune/tests/test_ray_trial_executor.py index 457b23df8133..66e9152f8406 100644 --- a/python/ray/tune/tests/test_ray_trial_executor.py +++ b/python/ray/tune/tests/test_ray_trial_executor.py @@ -21,7 +21,6 @@ from ray.tune.result import PID, TRAINING_ITERATION, TRIAL_ID from ray.tune.search import BasicVariantGenerator from ray.tune.experiment import Trial -from ray.tune.resources import Resources from ray.cluster_utils import Cluster from ray.tune.execution.placement_groups import PlacementGroupFactory @@ -233,7 +232,7 @@ def testSavePauseResumeErrorRestore(self): def testStartFailure(self): _global_registry.register(TRAINABLE_CLASS, "asdf", None) - trial = _make_trial("asdf", resources=Resources(1, 0)) + trial = _make_trial("asdf") self.trial_executor.start_trial(trial) self.assertEqual(Trial.ERROR, trial.status) diff --git a/python/ray/tune/tests/test_resource_updater.py b/python/ray/tune/tests/test_resource_updater.py index 3396ddfa8775..7497ae072e4f 100644 --- a/python/ray/tune/tests/test_resource_updater.py +++ b/python/ray/tune/tests/test_resource_updater.py @@ -1,6 +1,55 @@ import ray from ray.tests.conftest import * # noqa -from ray.tune.utils.resource_updater import _ResourceUpdater +from ray.tune.utils.resource_updater import _ResourceUpdater, _Resources + + +def test_resources_numerical_error(): + resource = _Resources(cpu=0.99, gpu=0.99, custom_resources={"a": 0.99}) + small_resource = _Resources(cpu=0.33, gpu=0.33, custom_resources={"a": 0.33}) + for i in range(3): + resource = _Resources.subtract(resource, small_resource) + assert resource.is_nonnegative() + + +def test_resources_subtraction(): + resource_1 = _Resources( + 1, + 0, + 0, + 1, + custom_resources={"a": 1, "b": 
2}, + extra_custom_resources={"a": 1, "b": 1}, + ) + resource_2 = _Resources( + 1, + 0, + 0, + 1, + custom_resources={"a": 1, "b": 2}, + extra_custom_resources={"a": 1, "b": 1}, + ) + new_res = _Resources.subtract(resource_1, resource_2) + assert new_res.cpu == 0 + assert new_res.gpu == 0 + assert new_res.extra_cpu == 0 + assert new_res.extra_gpu == 0 + + assert all(k == 0 for k in new_res.custom_resources.values()) + assert all(k == 0 for k in new_res.extra_custom_resources.values()) + + +def test_resources_different(): + resource_1 = _Resources(1, 0, 0, 1, custom_resources={"a": 1, "b": 2}) + resource_2 = _Resources(1, 0, 0, 1, custom_resources={"a": 1, "c": 2}) + new_res = _Resources.subtract(resource_1, resource_2) + assert "c" in new_res.custom_resources + assert "b" in new_res.custom_resources + + assert new_res.cpu == 0 + assert new_res.gpu == 0 + assert new_res.extra_cpu == 0 + assert new_res.extra_gpu == 0 + assert new_res.get("a") == 0 def test_resource_updater(ray_start_cluster): diff --git a/python/ray/tune/tests/test_trial_runner.py b/python/ray/tune/tests/test_trial_runner.py index 55417c4013e8..4d9b41103b3a 100644 --- a/python/ray/tune/tests/test_trial_runner.py +++ b/python/ray/tune/tests/test_trial_runner.py @@ -9,7 +9,6 @@ from ray import tune from ray.tune import TuneError, register_trainable from ray.tune.execution.ray_trial_executor import RayTrialExecutor -from ray.tune.resources import Resources from ray.tune.schedulers import TrialScheduler, FIFOScheduler from ray.tune.search import BasicVariantGenerator from ray.tune.experiment import Trial @@ -133,7 +132,7 @@ def testFractionalGpus(self): trial_executor=RayTrialExecutor(resource_manager=self._resourceManager()) ) kwargs = { - "resources": Resources(cpu=1, gpu=0.5), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 0.5}]), } trials = [ Trial("__fake", **kwargs), @@ -152,18 +151,11 @@ def testFractionalGpus(self): self.assertEqual(trials[2].status, Trial.PENDING) self.assertEqual(trials[3].status, Trial.PENDING) - def testResourceNumericalError(self): - resource = Resources(cpu=0.99, gpu=0.99, custom_resources={"a": 0.99}) - small_resource = Resources(cpu=0.33, gpu=0.33, custom_resources={"a": 0.33}) - for i in range(3): - resource = Resources.subtract(resource, small_resource) - self.assertTrue(resource.is_nonnegative()) - def testResourceScheduler(self): ray.init(num_cpus=4, num_gpus=1) kwargs = { "stopping_criterion": {"training_iteration": 1}, - "resources": Resources(cpu=1, gpu=1), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 1}]), } trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)] @@ -185,7 +177,7 @@ def testMultiStepRun(self): ray.init(num_cpus=4, num_gpus=2) kwargs = { "stopping_criterion": {"training_iteration": 5}, - "resources": Resources(cpu=1, gpu=1), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 1}]), } trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)] snapshot = TrialStatusSnapshot() @@ -209,7 +201,7 @@ def testMultiStepRun2(self): ) kwargs = { "stopping_criterion": {"training_iteration": 2}, - "resources": Resources(cpu=1, gpu=0), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 0}]), } trials = [Trial("__fake", **kwargs)] for t in trials: @@ -247,7 +239,7 @@ def on_trial_result(self, trial_runner, trial, result): ) kwargs = { "stopping_criterion": {"training_iteration": 2}, - "resources": Resources(cpu=1, gpu=0), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 
0}]), } trials = [Trial("__fake", **kwargs)] for t in trials: diff --git a/python/ray/tune/tests/test_trial_runner_2.py b/python/ray/tune/tests/test_trial_runner_2.py index 8363c0a813d3..484d27085d36 100644 --- a/python/ray/tune/tests/test_trial_runner_2.py +++ b/python/ray/tune/tests/test_trial_runner_2.py @@ -11,14 +11,13 @@ from ray.air.execution import PlacementGroupResourceManager, FixedResourceManager from ray.rllib import _register_all -from ray.tune import TuneError +from ray.tune import TuneError, PlacementGroupFactory from ray.tune.execution.ray_trial_executor import RayTrialExecutor from ray.tune.schedulers import FIFOScheduler from ray.tune.result import DONE from ray.tune.registry import _global_registry, TRAINABLE_CLASS from ray.tune.experiment import Trial from ray.tune.execution.trial_runner import TrialRunner -from ray.tune.resources import Resources from ray.tune.search import BasicVariantGenerator from ray.tune.tests.tune_test_util import TrialResultObserver from ray.tune.trainable.util import TrainableUtil @@ -61,7 +60,7 @@ def testErrorHandling(self): ) kwargs = { "stopping_criterion": {"training_iteration": 1}, - "resources": Resources(cpu=1, gpu=1), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 1}]), } _global_registry.register(TRAINABLE_CLASS, "asdf", None) trials = [Trial("asdf", **kwargs), Trial("__fake", **kwargs)] @@ -94,7 +93,7 @@ def testFailureRecoveryDisabled(self): trial_executor=RayTrialExecutor(resource_manager=self._resourceManager()), ) kwargs = { - "resources": Resources(cpu=1, gpu=1), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 1}]), "checkpoint_config": CheckpointConfig(checkpoint_frequency=1), "max_failures": 0, "config": { @@ -124,7 +123,7 @@ def testFailureRecoveryEnabled(self): kwargs = { "stopping_criterion": {"training_iteration": 2}, - "resources": Resources(cpu=1, gpu=1), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 1}]), "checkpoint_config": CheckpointConfig(checkpoint_frequency=1), "max_failures": 1, "config": { @@ -150,7 +149,7 @@ def testFailureRecoveryMaxFailures(self): trial_executor=RayTrialExecutor(resource_manager=self._resourceManager()) ) kwargs = { - "resources": Resources(cpu=1, gpu=1), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 1}]), "checkpoint_config": CheckpointConfig(checkpoint_frequency=1), "max_failures": 2, "config": { @@ -173,7 +172,7 @@ def testFailFast(self): trial_executor=RayTrialExecutor(resource_manager=self._resourceManager()), ) kwargs = { - "resources": Resources(cpu=1, gpu=1), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 1}]), "checkpoint_config": CheckpointConfig(checkpoint_frequency=1), "max_failures": 0, "config": { @@ -200,7 +199,7 @@ def testFailFastRaise(self): trial_executor=RayTrialExecutor(resource_manager=self._resourceManager()), ) kwargs = { - "resources": Resources(cpu=1, gpu=1), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 1}]), "checkpoint_config": CheckpointConfig(checkpoint_frequency=1), "max_failures": 0, "config": { @@ -228,7 +227,7 @@ def testCheckpointing(self): ) kwargs = { "stopping_criterion": {"training_iteration": 1}, - "resources": Resources(cpu=1, gpu=1), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 1}]), "checkpoint_config": CheckpointConfig(checkpoint_frequency=1), } runner.add_trial(Trial("__fake", **kwargs)) @@ -264,7 +263,7 @@ def testRestoreMetricsAfterCheckpointing(self): ) kwargs = { 
"stopping_criterion": {"training_iteration": 2}, - "resources": Resources(cpu=1, gpu=1), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 1}]), "checkpoint_config": CheckpointConfig(checkpoint_frequency=1), } runner.add_trial(Trial("__fake", **kwargs)) @@ -304,7 +303,7 @@ def testCheckpointingAtEnd(self): kwargs = { "stopping_criterion": {"training_iteration": 2}, "checkpoint_config": CheckpointConfig(checkpoint_at_end=True), - "resources": Resources(cpu=1, gpu=1), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 1}]), } runner.add_trial(Trial("__fake", **kwargs)) trials = runner.get_trials() @@ -322,7 +321,7 @@ def testResultDone(self): ) kwargs = { "stopping_criterion": {"training_iteration": 2}, - "resources": Resources(cpu=1, gpu=1), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 1}]), } runner.add_trial(Trial("__fake", **kwargs)) trials = runner.get_trials() @@ -338,7 +337,7 @@ def testPauseThenResume(self): ) kwargs = { "stopping_criterion": {"training_iteration": 2}, - "resources": Resources(cpu=1, gpu=1), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 1}]), } runner.add_trial(Trial("__fake", **kwargs)) trials = runner.get_trials() diff --git a/python/ray/tune/tests/test_trial_runner_3.py b/python/ray/tune/tests/test_trial_runner_3.py index 954f18a429d9..02f95591a45f 100644 --- a/python/ray/tune/tests/test_trial_runner_3.py +++ b/python/ray/tune/tests/test_trial_runner_3.py @@ -17,7 +17,7 @@ from ray.rllib import _register_all from ray.rllib.algorithms.callbacks import DefaultCallbacks -from ray.tune import TuneError +from ray.tune import TuneError, PlacementGroupFactory from ray.tune.execution.ray_trial_executor import RayTrialExecutor from ray.tune.impl.placeholder import create_resolvers_map, inject_placeholders from ray.tune.result import TRAINING_ITERATION @@ -28,7 +28,6 @@ from ray.tune.search.variant_generator import grid_search from ray.tune.experiment import Trial from ray.tune.execution.trial_runner import TrialRunner -from ray.tune.resources import Resources, json_to_resources, resources_to_json from ray.tune.search.repeater import Repeater from ray.tune.search._mock import _MockSuggestionAlgorithm from ray.tune.search import Searcher, ConcurrencyLimiter @@ -94,7 +93,7 @@ def on_step_end(self, search_ended: bool = False): kwargs = { "stopping_criterion": {"training_iteration": 5}, - "resources": Resources(cpu=1, gpu=1), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 1}]), } runner.add_trial(Trial("__fake", **kwargs)) runner.step() @@ -108,7 +107,7 @@ def testStopTrial(self): ) kwargs = { "stopping_criterion": {"training_iteration": 5}, - "resources": Resources(cpu=1, gpu=1), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 1}]), } trials = [ Trial("__fake", **kwargs), @@ -500,7 +499,7 @@ def testTrialErrorResumeFalse(self): ) kwargs = { "stopping_criterion": {"training_iteration": 4}, - "resources": Resources(cpu=1, gpu=0), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 0}]), } trials = [ Trial("__fake", config={"mock_error": True}, **kwargs), @@ -534,7 +533,7 @@ def testTrialErrorResumeTrue(self): ) kwargs = { "stopping_criterion": {"training_iteration": 4}, - "resources": Resources(cpu=1, gpu=0), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 0}]), } trials = [ Trial("__fake", config={"mock_error": True}, **kwargs), @@ -1455,51 +1454,6 @@ def set_max_concurrency(self, max_concurrent: int) -> 
bool: assert limiter.suggest("test_3")["score"] == 3 -class ResourcesTest(unittest.TestCase): - def testSubtraction(self): - resource_1 = Resources( - 1, - 0, - 0, - 1, - custom_resources={"a": 1, "b": 2}, - extra_custom_resources={"a": 1, "b": 1}, - ) - resource_2 = Resources( - 1, - 0, - 0, - 1, - custom_resources={"a": 1, "b": 2}, - extra_custom_resources={"a": 1, "b": 1}, - ) - new_res = Resources.subtract(resource_1, resource_2) - self.assertTrue(new_res.cpu == 0) - self.assertTrue(new_res.gpu == 0) - self.assertTrue(new_res.extra_cpu == 0) - self.assertTrue(new_res.extra_gpu == 0) - self.assertTrue(all(k == 0 for k in new_res.custom_resources.values())) - self.assertTrue(all(k == 0 for k in new_res.extra_custom_resources.values())) - - def testDifferentResources(self): - resource_1 = Resources(1, 0, 0, 1, custom_resources={"a": 1, "b": 2}) - resource_2 = Resources(1, 0, 0, 1, custom_resources={"a": 1, "c": 2}) - new_res = Resources.subtract(resource_1, resource_2) - assert "c" in new_res.custom_resources - assert "b" in new_res.custom_resources - self.assertTrue(new_res.cpu == 0) - self.assertTrue(new_res.gpu == 0) - self.assertTrue(new_res.extra_cpu == 0) - self.assertTrue(new_res.extra_gpu == 0) - self.assertTrue(new_res.get("a") == 0) - - def testSerialization(self): - original = Resources(1, 0, 0, 1, custom_resources={"a": 1, "b": 2}) - jsoned = resources_to_json(original) - new_resource = json_to_resources(jsoned) - self.assertEqual(original, new_resource) - - if __name__ == "__main__": import pytest diff --git a/python/ray/tune/tests/test_trial_scheduler.py b/python/ray/tune/tests/test_trial_scheduler.py index e77585c78313..2be12ecbbdfc 100644 --- a/python/ray/tune/tests/test_trial_scheduler.py +++ b/python/ray/tune/tests/test_trial_scheduler.py @@ -15,7 +15,7 @@ from ray import tune from ray.air import CheckpointConfig from ray.air._internal.checkpoint_manager import _TrackedCheckpoint, CheckpointStorage -from ray.tune import Trainable +from ray.tune import Trainable, PlacementGroupFactory from ray.tune.execution.checkpoint_manager import _CheckpointManager from ray.tune.execution.ray_trial_executor import RayTrialExecutor from ray.tune.result import TRAINING_ITERATION @@ -33,7 +33,6 @@ from ray.tune.search._mock import _MockSearcher from ray.tune.search import ConcurrencyLimiter from ray.tune.experiment import Trial -from ray.tune.resources import Resources from ray.rllib import _register_all @@ -852,7 +851,7 @@ def __init__(self, i, config): self.trial_name_creator = None self.logger_running = False self.restored_checkpoint = None - self.resources = Resources(1, 0) + self.placement_group_factory = PlacementGroupFactory([{"CPU": 1}]) self.custom_trial_name = None self.custom_dirname = None self._local_dir = None diff --git a/python/ray/tune/tests/test_tune_server.py b/python/ray/tune/tests/test_tune_server.py index 87615924e8bc..67a938a40b54 100644 --- a/python/ray/tune/tests/test_tune_server.py +++ b/python/ray/tune/tests/test_tune_server.py @@ -6,7 +6,8 @@ import ray from ray.rllib import _register_all -from ray.tune.experiment.trial import Trial, Resources +from ray.tune import PlacementGroupFactory +from ray.tune.experiment.trial import Trial from ray.tune.web_server import TuneClient from ray.tune.execution.trial_runner import TrialRunner @@ -34,7 +35,7 @@ def basicSetup(self): runner = self.runner kwargs = { "stopping_criterion": {"training_iteration": 3}, - "resources": Resources(cpu=1, gpu=1), + "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 1}]), 
} trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)] for t in trials: diff --git a/python/ray/tune/trainable/function_trainable.py b/python/ray/tune/trainable/function_trainable.py index 068589d396a3..bfea1d9b508c 100644 --- a/python/ray/tune/trainable/function_trainable.py +++ b/python/ray/tune/trainable/function_trainable.py @@ -9,10 +9,9 @@ import warnings from functools import partial from numbers import Number -from typing import Any, Callable, Dict, Optional, Type, Union +from typing import Any, Callable, Dict, Optional, Type from ray.air._internal.util import StartTraceback, RunnerThread -from ray.tune.resources import Resources import queue from ray.air.checkpoint import Checkpoint @@ -135,7 +134,7 @@ def __init__( trial_name: Optional[str] = None, trial_id: Optional[str] = None, logdir: Optional[str] = None, - trial_resources: Optional[Union[Resources, PlacementGroupFactory]] = None, + trial_resources: Optional[PlacementGroupFactory] = None, ): self._queue = result_queue self._last_report_time = None @@ -663,7 +662,7 @@ def handle_output(output): @classmethod def default_resource_request( cls, config: Dict[str, Any] - ) -> Optional[Union[Resources, PlacementGroupFactory]]: + ) -> Optional[PlacementGroupFactory]: if not isinstance(resources, PlacementGroupFactory) and callable(resources): return resources(config) return resources diff --git a/python/ray/tune/trainable/session.py b/python/ray/tune/trainable/session.py index 130259dc3db2..dba247601710 100644 --- a/python/ray/tune/trainable/session.py +++ b/python/ray/tune/trainable/session.py @@ -379,8 +379,7 @@ def get_trial_id(): def get_trial_resources(): """Trial resources for the corresponding trial. - Will be a PlacementGroupFactory if trial uses those, - otherwise a Resources instance. + Will be a PlacementGroupFactory. For function API use only. """ diff --git a/python/ray/tune/trainable/trainable.py b/python/ray/tune/trainable/trainable.py index f364a0ccb812..14668c98f6c6 100644 --- a/python/ray/tune/trainable/trainable.py +++ b/python/ray/tune/trainable/trainable.py @@ -19,7 +19,6 @@ Checkpoint, _DICT_CHECKPOINT_ADDITIONAL_FILE_KEY, ) -from ray.tune.resources import Resources from ray.tune.result import ( DEBUG_METRICS, DEFAULT_RESULTS_DIR, @@ -200,7 +199,7 @@ def _storage_path(self, local_path): @classmethod def default_resource_request( cls, config: Dict[str, Any] - ) -> Optional[Union[Resources, PlacementGroupFactory]]: + ) -> Optional[PlacementGroupFactory]: """Provides a static resource requirement for the given configuration. This can be overridden by sub-classes to set the correct trial resource @@ -217,8 +216,8 @@ def default_resource_request(cls, config): config[Dict[str, Any]]: The Trainable's config dict. Returns: - Union[Resources, PlacementGroupFactory]: A Resources object or - PlacementGroupFactory consumed by Tune for queueing. + PlacementGroupFactory: A PlacementGroupFactory consumed by Tune + for queueing. """ return None @@ -1060,7 +1059,7 @@ def trial_id(self): return "default" @property - def trial_resources(self) -> Union[Resources, PlacementGroupFactory]: + def trial_resources(self) -> Optional[PlacementGroupFactory]: """Resources currently assigned to the trial of this Trainable. This is not set if not using Tune. 
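The updated `default_resource_request` contract in `trainable.py` is easiest to see with a small override, mirroring the pattern the updated `test_api.py` uses above (the class name and config keys are illustrative):

```python
from typing import Any, Dict, Optional

from ray.tune import Trainable
from ray.tune.execution.placement_groups import PlacementGroupFactory


class ConfigSizedTrainable(Trainable):
    """Illustrative Trainable that sizes its bundle from the config."""

    @classmethod
    def default_resource_request(
        cls, config: Dict[str, Any]
    ) -> Optional[PlacementGroupFactory]:
        # Request one bundle whose size is taken from the trial config.
        return PlacementGroupFactory(
            [{"CPU": config.get("cpu", 1), "GPU": config.get("gpu", 0)}]
        )

    def step(self):
        return {"done": True}
```

As enforced in the `trial.py` change above, a trainable that returns a non-None default resource request may not also be given `resources_per_trial`; combining the two raises a TuneError.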
@@ -1072,7 +1071,7 @@ def trial_resources(self) -> Union[Resources, PlacementGroupFactory]: if self._trial_info: return self._trial_info.trial_resources else: - return "default" + return None @property def iteration(self): diff --git a/python/ray/tune/trainable/util.py b/python/ray/tune/trainable/util.py index 94d6e0261c1d..2c62278571ad 100644 --- a/python/ray/tune/trainable/util.py +++ b/python/ray/tune/trainable/util.py @@ -17,7 +17,6 @@ from ray.air._internal.uri_utils import URI from ray.air.config import ScalingConfig from ray.tune.registry import _ParameterRegistry -from ray.tune.resources import Resources from ray.tune.utils import _detect_checkpoint_function from ray.util import placement_group from ray.util.annotations import DeveloperAPI, PublicAPI @@ -525,7 +524,7 @@ class ResourceTrainable(trainable): @classmethod def default_resource_request( cls, config: Dict[str, Any] - ) -> Optional[Union[Resources, PlacementGroupFactory]]: + ) -> Optional[PlacementGroupFactory]: if not isinstance(pgf, PlacementGroupFactory) and callable(pgf): return pgf(config) return pgf diff --git a/python/ray/tune/utils/resource_updater.py b/python/ray/tune/utils/resource_updater.py index d2c3b0f9b38f..38db9471d4b5 100644 --- a/python/ray/tune/utils/resource_updater.py +++ b/python/ray/tune/utils/resource_updater.py @@ -1,11 +1,12 @@ import logging import os import time +from collections import namedtuple +from numbers import Number from typing import Any, Dict, Optional import ray from ray._private.resource_spec import NODE_ID_PREFIX -from ray.tune.resources import Resources logger = logging.getLogger(__name__) @@ -16,6 +17,214 @@ def _to_gb(n_bytes): return round(n_bytes / (1024**3), 2) +class _Resources( + namedtuple( + "_Resources", + [ + "cpu", + "gpu", + "memory", + "object_store_memory", + "extra_cpu", + "extra_gpu", + "extra_memory", + "extra_object_store_memory", + "custom_resources", + "extra_custom_resources", + "has_placement_group", + ], + ) +): + """Ray resources required to schedule a trial. + + Parameters: + cpu: Number of CPUs to allocate to the trial. + gpu: Number of GPUs to allocate to the trial. + memory: Memory to reserve for the trial. + object_store_memory: Object store memory to reserve. + extra_cpu: Extra CPUs to reserve in case the trial needs to + launch additional Ray actors that use CPUs. + extra_gpu: Extra GPUs to reserve in case the trial needs to + launch additional Ray actors that use GPUs. + extra_memory: Memory to reserve for the trial launching + additional Ray actors that use memory. + extra_object_store_memory: Object store memory to reserve for + the trial launching additional Ray actors that use object store + memory. + custom_resources: Mapping of resource to quantity to allocate + to the trial. + extra_custom_resources: Extra custom resources to reserve in + case the trial needs to launch additional Ray actors that use + any of these custom resources. + has_placement_group: Bool indicating if the trial also + has an associated placement group. 
+ + """ + + __slots__ = () + + def __new__( + cls, + cpu: float, + gpu: float, + memory: float = 0, + object_store_memory: float = 0.0, + extra_cpu: float = 0.0, + extra_gpu: float = 0.0, + extra_memory: float = 0.0, + extra_object_store_memory: float = 0.0, + custom_resources: Optional[dict] = None, + extra_custom_resources: Optional[dict] = None, + has_placement_group: bool = False, + ): + custom_resources = custom_resources or {} + extra_custom_resources = extra_custom_resources or {} + leftovers = set(custom_resources) ^ set(extra_custom_resources) + + for value in leftovers: + custom_resources.setdefault(value, 0) + extra_custom_resources.setdefault(value, 0) + + cpu = round(cpu, 2) + gpu = round(gpu, 2) + memory = round(memory, 2) + object_store_memory = round(object_store_memory, 2) + extra_cpu = round(extra_cpu, 2) + extra_gpu = round(extra_gpu, 2) + extra_memory = round(extra_memory, 2) + extra_object_store_memory = round(extra_object_store_memory, 2) + custom_resources = { + resource: round(value, 2) for resource, value in custom_resources.items() + } + extra_custom_resources = { + resource: round(value, 2) + for resource, value in extra_custom_resources.items() + } + + all_values = [ + cpu, + gpu, + memory, + object_store_memory, + extra_cpu, + extra_gpu, + extra_memory, + extra_object_store_memory, + ] + all_values += list(custom_resources.values()) + all_values += list(extra_custom_resources.values()) + assert len(custom_resources) == len(extra_custom_resources) + for entry in all_values: + assert isinstance(entry, Number), ("Improper resource value.", entry) + return super(_Resources, cls).__new__( + cls, + cpu, + gpu, + memory, + object_store_memory, + extra_cpu, + extra_gpu, + extra_memory, + extra_object_store_memory, + custom_resources, + extra_custom_resources, + has_placement_group, + ) + + def summary_string(self): + summary = "{} CPUs, {} GPUs".format( + self.cpu + self.extra_cpu, self.gpu + self.extra_gpu + ) + if self.memory or self.extra_memory: + summary += ", {} GiB heap".format( + round((self.memory + self.extra_memory) / (1024**3), 2) + ) + if self.object_store_memory or self.extra_object_store_memory: + summary += ", {} GiB objects".format( + round( + (self.object_store_memory + self.extra_object_store_memory) + / (1024**3), + 2, + ) + ) + custom_summary = ", ".join( + [ + "{} {}".format(self.get_res_total(res), res) + for res in self.custom_resources + if not res.startswith(NODE_ID_PREFIX) + ] + ) + if custom_summary: + summary += " ({})".format(custom_summary) + return summary + + def cpu_total(self): + return self.cpu + self.extra_cpu + + def gpu_total(self): + return self.gpu + self.extra_gpu + + def memory_total(self): + return self.memory + self.extra_memory + + def object_store_memory_total(self): + return self.object_store_memory + self.extra_object_store_memory + + def get_res_total(self, key): + return self.custom_resources.get(key, 0) + self.extra_custom_resources.get( + key, 0 + ) + + def get(self, key): + return self.custom_resources.get(key, 0) + + def is_nonnegative(self): + all_values = [self.cpu, self.gpu, self.extra_cpu, self.extra_gpu] + all_values += list(self.custom_resources.values()) + all_values += list(self.extra_custom_resources.values()) + return all(v >= 0 for v in all_values) + + @classmethod + def subtract(cls, original, to_remove): + cpu = original.cpu - to_remove.cpu + gpu = original.gpu - to_remove.gpu + memory = original.memory - to_remove.memory + object_store_memory = ( + original.object_store_memory - 
to_remove.object_store_memory + ) + extra_cpu = original.extra_cpu - to_remove.extra_cpu + extra_gpu = original.extra_gpu - to_remove.extra_gpu + extra_memory = original.extra_memory - to_remove.extra_memory + extra_object_store_memory = ( + original.extra_object_store_memory - to_remove.extra_object_store_memory + ) + all_resources = set(original.custom_resources).union( + set(to_remove.custom_resources) + ) + new_custom_res = { + k: original.custom_resources.get(k, 0) + - to_remove.custom_resources.get(k, 0) + for k in all_resources + } + extra_custom_res = { + k: original.extra_custom_resources.get(k, 0) + - to_remove.extra_custom_resources.get(k, 0) + for k in all_resources + } + return _Resources( + cpu, + gpu, + memory, + object_store_memory, + extra_cpu, + extra_gpu, + extra_memory, + extra_object_store_memory, + new_custom_res, + extra_custom_res, + ) + + class _ResourceUpdater: """Periodic Resource updater for Tune. @@ -28,7 +237,7 @@ class _ResourceUpdater: """ def __init__(self, refresh_period: Optional[float] = None): - self._avail_resources = Resources(cpu=0, gpu=0) + self._avail_resources = _Resources(cpu=0, gpu=0) if refresh_period is None: refresh_period = float( @@ -72,7 +281,7 @@ def update_avail_resources(self, num_retries=5): object_store_memory = resources.pop("object_store_memory", 0) custom_resources = resources - self._avail_resources = Resources( + self._avail_resources = _Resources( int(num_cpus), int(num_gpus), memory=int(memory),
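The retained private class keeps the add/subtract arithmetic the resource updater relies on. A condensed restatement of the relocated `test_resource_updater.py` case, assuming the `_Resources` API shown in this diff:

```python
from ray.tune.utils.resource_updater import _Resources

total = _Resources(cpu=0.99, gpu=0.99, custom_resources={"a": 0.99})
used = _Resources(cpu=0.33, gpu=0.33, custom_resources={"a": 0.33})

# Each subtraction result goes back through _Resources.__new__, which rounds
# every value to two decimals, so repeated subtraction does not drift
# negative from floating point error.
for _ in range(3):
    total = _Resources.subtract(total, used)

assert total.is_nonnegative()
```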