[tune] Remove deprecated Resources class #32490

Merged (7 commits) on Feb 16, 2023
@@ -1,4 +1,4 @@
-from typing import Union, Dict, Any, Optional
+from typing import Dict, Any, Optional
 import sklearn.datasets
 import sklearn.metrics
 import os
@@ -11,7 +11,6 @@
 from ray import air, tune
 from ray.tune.schedulers import ResourceChangingScheduler, ASHAScheduler
 from ray.tune import Trainable
-from ray.tune.resources import Resources
 from ray.tune.execution.placement_groups import PlacementGroupFactory
 from ray.tune.experiment import Trial
 from ray.tune.execution import trial_runner
@@ -167,7 +166,7 @@ def example_resources_allocation_function(
     trial: Trial,
     result: Dict[str, Any],
     scheduler: "ResourceChangingScheduler",
-) -> Optional[Union[PlacementGroupFactory, Resources]]:
+) -> Optional[PlacementGroupFactory]:
     """This is a basic example of a resource allocating function.
 
     The function naively balances available CPUs over live trials.
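For context on the hunk above: after this change, a resources_allocation_function may only return a PlacementGroupFactory or None. A minimal sketch of a conforming function under that assumption; the function name, warm-up threshold, and bundle sizes are illustrative and not from this PR, and the first parameter follows the trial_runner import visible in the previous hunk:

from typing import Any, Dict, Optional

from ray.tune.execution import trial_runner
from ray.tune.execution.placement_groups import PlacementGroupFactory
from ray.tune.experiment import Trial
from ray.tune.schedulers import ResourceChangingScheduler


def upgrade_after_warmup(
    runner: "trial_runner.TrialRunner",
    trial: Trial,
    result: Dict[str, Any],
    scheduler: "ResourceChangingScheduler",
) -> Optional[PlacementGroupFactory]:
    # Returning None keeps the trial's current resource request.
    if result.get("training_iteration", 0) < 4:
        return None
    # Returning Resources is no longer supported; request placement
    # group bundles instead.
    return PlacementGroupFactory([{"CPU": 2, "GPU": 0}])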
11 changes: 6 additions & 5 deletions python/ray/tune/execution/placement_groups.py
@@ -1,7 +1,6 @@
 import warnings
 from typing import Dict, Optional
 from ray.air.execution.resources.request import ResourceRequest
-from ray.tune.resources import Resources
 from ray.util.annotations import DeveloperAPI, PublicAPI
 from ray.util.placement_group import placement_group
 
@@ -103,21 +102,23 @@ def __call__(self, *args, **kwargs):
 
 
 @DeveloperAPI
-def resource_dict_to_pg_factory(spec: Optional[Dict[str, float]]):
+def resource_dict_to_pg_factory(spec: Optional[Dict[str, float]] = None):
     """Translates resource dict into PlacementGroupFactory."""
-    if isinstance(spec, Resources):
-        spec = spec._asdict()
+    spec = spec or {"cpu": 1}
 
     spec = spec.copy()
 
     cpus = spec.pop("cpu", spec.pop("CPU", 0.0))
     gpus = spec.pop("gpu", spec.pop("GPU", 0.0))
     memory = spec.pop("memory", 0.0)
 
+    # If there is a custom_resources key, use as base for bundle
     bundle = {k: v for k, v in spec.pop("custom_resources", {}).items()}
 
+    # Otherwise, consider all other keys as custom resources
+    if not bundle:
+        bundle = spec
+
     bundle.update(
         {
             "CPU": cpus,
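Taken together, the edits above give resource_dict_to_pg_factory a usable zero-argument form and fold leftover keys into custom resources. A small usage sketch under those assumptions (the tail of the function is truncated above, so the exact bundle contents are not asserted here):

from ray.tune.execution.placement_groups import resource_dict_to_pg_factory

# New default path: a missing spec becomes {"cpu": 1}, i.e. a
# single-CPU placement group factory.
pgf_default = resource_dict_to_pg_factory()

# Keys other than cpu/gpu/memory now count as custom resources when no
# explicit "custom_resources" dict is supplied.
pgf_custom = resource_dict_to_pg_factory({"cpu": 4, "gpu": 1, "accelerator": 2})

print(pgf_default.required_resources)
print(pgf_custom.required_resources)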
11 changes: 1 addition & 10 deletions python/ray/tune/experiment/config_parser.py
@@ -10,7 +10,6 @@
 from ray.tune.experiment import Trial
 from ray.tune.resources import json_to_resources
 from ray.tune.syncer import SyncConfig, Syncer
-from ray.tune.execution.placement_groups import PlacementGroupFactory
 from ray.tune.utils.util import SafeFallbackEncoder
 
 
@@ -198,15 +197,7 @@ def _create_trial_from_spec(
         raise TuneError("Error parsing args, see above message", spec)
 
     if resources:
-        if isinstance(resources, PlacementGroupFactory):
-            trial_kwargs["placement_group_factory"] = resources
-        else:
-            # This will be converted to a placement group factory in the
-            # Trial object constructor
-            try:
-                trial_kwargs["resources"] = json_to_resources(resources)
-            except (TuneError, ValueError) as exc:
-                raise TuneError("Error parsing resources_per_trial", resources) from exc
+        trial_kwargs["placement_group_factory"] = resources
 
     experiment_dir_name = spec.get("experiment_dir_name")
 
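With the isinstance branching gone, _create_trial_from_spec forwards resources_per_trial directly as the placement_group_factory keyword; normalization of plain dicts now happens in Trial.__init__ (see the next file). From the user's side, both input forms should keep working. A hedged sketch with a throwaway trainable:

from ray import tune
from ray.tune.execution.placement_groups import PlacementGroupFactory


def trainable(config):
    ...  # training body omitted

# Dict form: converted to a PlacementGroupFactory in Trial.__init__.
tune.run(trainable, resources_per_trial={"cpu": 2, "gpu": 0})

# PlacementGroupFactory form: passed through unchanged.
tune.run(trainable, resources_per_trial=PlacementGroupFactory([{"CPU": 2, "GPU": 0}]))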
92 changes: 36 additions & 56 deletions python/ray/tune/experiment/trial.py
@@ -34,7 +34,6 @@
     TRIAL_ID,
     DEBUG_METRICS,
 )
-from ray.tune.resources import Resources
 from ray.tune.syncer import SyncConfig, Syncer
 from ray.tune.execution.placement_groups import (
     PlacementGroupFactory,
@@ -166,11 +165,11 @@ def trial_id(self):
         return self._trial_id
 
     @property
-    def trial_resources(self) -> Union[Resources, PlacementGroupFactory]:
+    def trial_resources(self) -> PlacementGroupFactory:
         return self._trial_resources
 
     @trial_resources.setter
-    def trial_resources(self, new_resources: Union[Resources, PlacementGroupFactory]):
+    def trial_resources(self, new_resources: PlacementGroupFactory):
         self._trial_resources = new_resources
 
 
@@ -186,23 +185,6 @@ def _create_unique_logdir_name(root: str, relative_logdir: str) -> str:
     return relative_logdir
 
 
-def _to_pg_factory(
-    resources: Optional[Resources],
-    placement_group_factory: Optional[PlacementGroupFactory],
-) -> PlacementGroupFactory:
-    """Outputs resources requirement in the form of PGF.
-
-    In case that `placement_group_factory` is None, `resources` will be
-    converted to PGF. If this is unsuccessful, an error will be raised.
-
-    """
-    if not placement_group_factory:
-        if not resources:
-            resources = Resources(cpu=1, gpu=0)
-        placement_group_factory = resource_dict_to_pg_factory(resources)
-    return placement_group_factory
-
-
 @DeveloperAPI
 class Trial:
     """A trial object holds the state for one model training run.
@@ -258,7 +240,6 @@ def __init__(
         local_dir: Optional[str] = DEFAULT_RESULTS_DIR,
         evaluated_params: Optional[Dict] = None,
         experiment_tag: str = "",
-        resources: Optional[Resources] = None,
         placement_group_factory: Optional[PlacementGroupFactory] = None,
         stopping_criterion: Optional[Dict[str, float]] = None,
         experiment_dir_name: Optional[str] = None,
@@ -308,7 +289,14 @@ def __init__(
         self.stopping_criterion = stopping_criterion or {}
 
         self._setup_default_resource = _setup_default_resource
-        self._resources = resources
+
+        if placement_group_factory and not isinstance(
+            placement_group_factory, PlacementGroupFactory
+        ):
+            placement_group_factory = resource_dict_to_pg_factory(
+                placement_group_factory
+            )
+
         self._default_placement_group_factory = placement_group_factory
         # Will be created in create_placement_group_factory().
         self.placement_group_factory = None
@@ -411,37 +399,36 @@ def create_placement_group_factory(self):
         trainable_cls = self.get_trainable_cls()
         if not trainable_cls or not self._setup_default_resource:
             # Create placement group factory using default resources.
-            self.placement_group_factory = _to_pg_factory(
-                self._resources, self._default_placement_group_factory
+            self.placement_group_factory = (
+                self._default_placement_group_factory or resource_dict_to_pg_factory()
             )
             return
 
         default_resources = trainable_cls.default_resource_request(self.config)
 
         # If Trainable returns resources, do not allow manual override via
         # `resources_per_trial` by the user.
-        if default_resources:
-            if self._resources or self._default_placement_group_factory:
-                raise ValueError(
-                    "Resources for {} have been automatically set to {} "
-                    "by its `default_resource_request()` method. Please "
-                    "clear the `resources_per_trial` option.".format(
-                        trainable_cls, default_resources
-                    )
-                )
-
-            if isinstance(default_resources, PlacementGroupFactory):
-                default_placement_group_factory = default_resources
-                resources = None
-            else:
-                default_placement_group_factory = None
-                resources = default_resources
-        else:
-            default_placement_group_factory = self._default_placement_group_factory
-            resources = self._resources
-
-        self.placement_group_factory = _to_pg_factory(
-            resources, default_placement_group_factory
+        if default_resources and self._default_placement_group_factory:
+            raise TuneError(
+                "Resources for {} have been automatically set to {} "
+                "by its `default_resource_request()` method. Please "
+                "clear the `resources_per_trial` option.".format(
+                    trainable_cls, default_resources
+                )
+            )
+
+        if default_resources and not isinstance(
+            default_resources, PlacementGroupFactory
+        ):
+            default_resources = resource_dict_to_pg_factory(default_resources)
+
+        self.placement_group_factory = (
+            # default_resource_request
+            default_resources
+            # resources_per_trial
+            or self._default_placement_group_factory
+            # cpu=1
+            or resource_dict_to_pg_factory()
        )
 
     def _get_default_result_or_future(self) -> Optional[dict]:
@@ -638,7 +625,6 @@ def reset(self):
             local_dir=self.local_dir,
             evaluated_params=self.evaluated_params,
             experiment_tag=self.experiment_tag,
-            resources=None,
             placement_group_factory=placement_group_factory,
             stopping_criterion=self.stopping_criterion,
             sync_config=self.sync_config,
@@ -663,9 +649,7 @@ def init_logdir(self):
 
         self.invalidate_json_state()
 
-    def update_resources(
-        self, resources: Union[Dict, Resources, PlacementGroupFactory]
-    ):
+    def update_resources(self, resources: Union[dict, PlacementGroupFactory]):
         """EXPERIMENTAL: Updates the resource requirements.
 
         Should only be called when the trial is not running.
@@ -676,15 +660,11 @@ def update_resources(
         if self.status is Trial.RUNNING:
             raise ValueError("Cannot update resources while Trial is running.")
 
-        placement_group_factory = None
-        if isinstance(resources, PlacementGroupFactory):
-            placement_group_factory = resources
-        elif isinstance(resources, dict):
-            resources = Resources(**resources)
+        placement_group_factory = resources
+        if isinstance(resources, dict):
+            placement_group_factory = resource_dict_to_pg_factory(resources)
 
-        self.placement_group_factory = _to_pg_factory(
-            resources, placement_group_factory
-        )
+        self.placement_group_factory = placement_group_factory
 
         self.invalidate_json_state()
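To round out the change, a hypothetical call pattern for the slimmed-down update_resources; here trial stands in for an existing, non-running ray.tune.experiment.Trial:

from ray.tune.execution.placement_groups import PlacementGroupFactory

# Dict form: converted with resource_dict_to_pg_factory.
trial.update_resources({"cpu": 2, "gpu": 0})

# PlacementGroupFactory form: stored as-is.
trial.update_resources(PlacementGroupFactory([{"CPU": 2}]))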