[train] add placement group support #20091

Merged 5 commits on Nov 10, 2021
Changes from 3 commits
71 changes: 69 additions & 2 deletions python/ray/train/backends/backend.py
@@ -11,12 +11,16 @@
from ray.train.checkpoint import CheckpointManager, CheckpointStrategy, \
TuneCheckpointManager
from ray.train.constants import ENABLE_DETAILED_AUTOFILLED_METRICS_ENV, \
TUNE_INSTALLED, ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV
TUNE_INSTALLED, ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV, \
TRAIN_ENABLE_WORKER_SPREAD_ENV, \
TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV
from ray.train.session import TrainingResultType, TrainingResult
from ray.train.session import init_session, get_session, shutdown_session
from ray.train.utils import RayDataset
from ray.train.utils import check_for_failure
from ray.train.worker_group import WorkerGroup
from ray.util.placement_group import get_current_placement_group, \
remove_placement_group

if TUNE_INSTALLED:
from ray import tune
@@ -93,6 +97,7 @@ def __init__(
self._max_failures = float("inf")
self._num_failures = 0
self._initialization_hook = None
self._placement_group = None

if tune is not None and tune.is_session_enabled():
self.checkpoint_manager = TuneCheckpointManager()
@@ -110,6 +115,8 @@ def start(self,
train_cls_args: Optional[Tuple] = None,
train_cls_kwargs: Optional[Dict] = None):
"""Starts the worker group."""
self._create_placement_group()
placement_group = self._placement_group or "default"
self.worker_group = WorkerGroup(
num_workers=self._num_workers,
num_cpus_per_worker=self._num_cpus_per_worker,
@@ -118,7 +125,8 @@ def start(self,
_additional_resources_per_worker,
actor_cls=train_cls,
actor_cls_args=train_cls_args,
actor_cls_kwargs=train_cls_kwargs)
actor_cls_kwargs=train_cls_kwargs,
placement_group=placement_group)
try:
if initialization_hook:
self._initialization_hook = initialization_hook
@@ -137,6 +145,57 @@ def start(self,
self._increment_failures()
self._restart()

def _create_placement_group(self):
"""Creates a placement group if it does not exist.

If a placement group is already detected (Tune) this will be a no-op.

By default the placement group will be created with PACK strategy.
This is optimized for colocating GPUs on a minimal number of nodes.
This behavior can be overridden to use the SPREAD strategy by defining
``TRAIN_ENABLE_WORKER_SPREAD_ENV``

If a placement group is created it will be stored as
self._placement_group.
"""
current_placement_group = get_current_placement_group()
should_capture_child_tasks_in_placement_group = \
Contributor: Man I wish there was a better way to handle this. Do we have a shared doc with Tune on asks for placement group?

Contributor (Author): Created #20196 to track this!

ray.worker.global_worker \
.should_capture_child_tasks_in_placement_group
should_create_placement_group = \
current_placement_group is None or \
not should_capture_child_tasks_in_placement_group

if should_create_placement_group:
additional_resources_per_worker = \
self._additional_resources_per_worker or {}
bundle = {
"CPU": self._num_cpus_per_worker,
"GPU": self._num_gpus_per_worker,
**additional_resources_per_worker
}
bundles = [bundle.copy() for _ in range(self._num_workers)]

use_spread = bool(env_integer(TRAIN_ENABLE_WORKER_SPREAD_ENV, 0))
strategy = "SPREAD" if use_spread else "PACK"

placement_group = ray.util.placement_group(
bundles, strategy=strategy)
logger.debug("Waiting for placement group to start.")
timeout = env_integer(TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV, 100)
ready, _ = ray.wait([placement_group.ready()], timeout=timeout)
if ready:
logger.debug("Placement group has started.")
else:
raise TimeoutError(
"Placement group creation timed out. Make sure "
"your cluster either has enough resources or use "
"an autoscaling cluster. Current resources "
"available: {}, resources requested by the "
"placement group: {}".format(ray.available_resources(),
placement_group.bundle_specs))
self._placement_group = placement_group

def _share_cuda_visible_devices(self):
"""Sets CUDA_VISIBLE_DEVICES on all workers.

@@ -528,6 +587,11 @@ def shutdown(self):
"expected if one of the workers has crashed.")
self.worker_group.shutdown()
self.worker_group = InactiveWorkerGroup()

if self._placement_group:
remove_placement_group(self._placement_group)
self._placement_group = None

self.dataset_shards = None

@property
@@ -567,6 +631,9 @@ def _restart(self):
initialization_hook = self._initialization_hook
else:
initialization_hook = None
if self._placement_group:
remove_placement_group(self._placement_group)
self._placement_group = None
self.start(initialization_hook=initialization_hook)

def _increment_failures(self):
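For reference, the placement group lifecycle added above reduces to the following standalone pattern. This is a minimal sketch, not code from this PR; the worker count, resource shape, and timeout are illustrative assumptions.

# Minimal sketch of the pattern used by _create_placement_group:
# one bundle per training worker, PACK strategy, and a bounded wait.
import ray
from ray.util.placement_group import placement_group, remove_placement_group

ray.init()

num_workers = 4  # hypothetical
bundles = [{"CPU": 1} for _ in range(num_workers)]
pg = placement_group(bundles, strategy="PACK")

# Wait with a timeout so an under-resourced cluster fails fast
# instead of hanging.
ready, _ = ray.wait([pg.ready()], timeout=100)
if not ready:
    raise TimeoutError(
        "Placement group creation timed out. Available resources: "
        f"{ray.available_resources()}, requested: {pg.bundle_specs}")

# ... schedule worker actors into pg via .options(placement_group=pg) ...

remove_placement_group(pg)  # release the reservation on shutdown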
9 changes: 9 additions & 0 deletions python/ray/train/constants.py
@@ -49,3 +49,12 @@
# Backend.share_cuda_visible_devices. 1 for True, 0 for False.
ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV =\
"TRAIN_ENABLE_SHARE_CUDA_VISIBLE_DEVICES"

# Integer value which indicates the number of seconds to wait when creating
# the worker placement group before timing out.
TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV = "TRAIN_PLACEMENT_GROUP_TIMEOUT_S"

# Integer value which if set will change the placement group strategy from
# PACK to SPREAD. 1 for True, 0 for False.
TRAIN_ENABLE_WORKER_SPREAD_ENV =\
Contributor: Looks like I have to set this on all the machines (including HEAD and non-HEAD). Why is that? Isn't the scheduling part handled by the head node (and partially client side)? @matthewdeng

Contributor (Author): Hey @HuangLED, what version of Ray are you using? Also, are you using Ray Client?

This should be addressed in master (via this commit) so that the environment variable only needs to be set on the driver, but even without this change the environment variable should only be needed on the host where the BackendExecutor is.

Contributor: We are using v1.9.0, I think.

Yes, I am using client mode, e.g. two machines M1 and M2, with M1 being the head, then connecting from a third machine using ray.init("ray://M1:port").

Solely setting the env on M1 does not work.

Contributor (Author): Can you verify that the BackendExecutor process is running on M1?

"TRAIN_ENABLE_WORKER_SPREAD"
91 changes: 84 additions & 7 deletions python/ray/train/tests/test_backend.py
@@ -1,20 +1,22 @@
import math
import os
import time
from unittest.mock import patch

import pytest
from unittest.mock import patch

import ray
from ray.cluster_utils import Cluster
import ray.train as train
from ray.cluster_utils import Cluster
from ray.train.backends.backend import Backend, \
InactiveWorkerGroupError, TrainBackendError, TrainingWorkerError
from ray.train.backends.backend import BackendConfig, BackendExecutor
from ray.train.backends.tensorflow import TensorflowConfig
from ray.train.constants import ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV
from ray.train.worker_group import WorkerGroup
from ray.train.backends.torch import TorchConfig

from ray.train.backends.backend import Backend, \
InactiveWorkerGroupError, TrainBackendError, TrainingWorkerError
from ray.train.constants import ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV, \
TRAIN_ENABLE_WORKER_SPREAD_ENV
from ray.train.worker_group import WorkerGroup
from ray.util.placement_group import get_current_placement_group


@pytest.fixture
@@ -25,6 +27,20 @@ def ray_start_2_cpus():
ray.shutdown()


@pytest.fixture
def ray_4_node_4_cpu():
cluster = Cluster()
for _ in range(4):
cluster.add_node(num_cpus=4)

ray.init(address=cluster.address)

yield

ray.shutdown()
cluster.shutdown()


@pytest.fixture
def ray_2_node_2_gpu():
cluster = Cluster()
@@ -393,6 +409,67 @@ def get_resources():
assert results == expected_results


def get_node_id_set():
node_id_set = set()
for actor_info in ray.state.actors().values():
node_id = actor_info["Address"]["NodeID"]
node_id_set.add(node_id)
return node_id_set


@pytest.mark.parametrize("num_workers", [3, 4, 5])
def test_placement_group_pack(ray_4_node_4_cpu, num_workers):
"""Tests that workers are packed on nodes."""
config = TestConfig()
e = BackendExecutor(config, num_workers=num_workers)
e.start()
node_id_set = get_node_id_set()
assert len(node_id_set) == math.ceil(num_workers / 4)


@pytest.mark.parametrize("num_workers", [3, 4, 5])
def test_placement_group_spread(ray_4_node_4_cpu, num_workers):
"""Tests that workers are spread across nodes."""
os.environ[TRAIN_ENABLE_WORKER_SPREAD_ENV] = "1"
config = TestConfig()
e = BackendExecutor(config, num_workers=num_workers)
e.start()
node_id_set = get_node_id_set()
assert len(node_id_set) == min(num_workers, 4)


@pytest.mark.parametrize("placement_group_capture_child_tasks", [True, False])
def test_placement_group_parent(ray_4_node_4_cpu, tmp_path,
placement_group_capture_child_tasks):
"""Tests that parent placement group will be used."""
num_workers = 2
bundle = {"CPU": 1}
bundles = [bundle.copy() for _ in range(num_workers + 1)]
placement_group = ray.util.placement_group(bundles)

def train_func():
return get_current_placement_group().id

@ray.remote
def test():
config = TestConfig()
e = BackendExecutor(config, num_workers=2)
e.start()
e.start_training(train_func, run_dir=tmp_path)
return e.finish_training()

results_future = test.options(
placement_group=placement_group,
placement_group_capture_child_tasks=placement_group_capture_child_tasks
).remote()
results = ray.get(results_future)
for worker_result in results:
if placement_group_capture_child_tasks:
assert worker_result == placement_group.id
else:
assert worker_result != placement_group.id


if __name__ == "__main__":
import pytest
import sys
11 changes: 11 additions & 0 deletions python/ray/train/tests/test_worker_group.py
@@ -107,6 +107,17 @@ def test_bad_resources(ray_start_2_cpus):
WorkerGroup(num_gpus_per_worker=-1)


def test_placement_group(ray_start_2_cpus):
"""Tests that workers can be removed and added to a placement group."""
num_workers = 2
bundle = {"CPU": 1}
bundles = [bundle.copy() for _ in range(num_workers)]
placement_group = ray.util.placement_group(bundles)
wg = WorkerGroup(num_workers=num_workers, placement_group=placement_group)
wg.remove_workers([0])
wg.add_workers(1)


if __name__ == "__main__":
import pytest
import sys
4 changes: 4 additions & 0 deletions python/ray/train/trainer.py
@@ -93,6 +93,10 @@ def __init__(
):

self._backend = backend

if num_workers <= 0:
raise ValueError("`num_workers` must be a positive integer.")

self._num_workers = num_workers
self._use_gpu = use_gpu
self._resources_per_worker = resources_per_worker
21 changes: 16 additions & 5 deletions python/ray/train/worker_group.py
@@ -1,11 +1,12 @@
import logging
import socket
from dataclasses import dataclass
import logging
from typing import Callable, List, TypeVar, Optional, Dict, Type, Tuple
from typing import Callable, List, TypeVar, Optional, Dict, Type, Tuple, Union

import ray
from ray.actor import ActorHandle
from ray.types import ObjectRef
from ray.util.placement_group import PlacementGroup

T = TypeVar("T")

@@ -105,6 +106,9 @@ class WorkerGroup:
remote actors.
remote_cls_args, remote_cls_kwargs: If ``remote_cls`` is provided,
these args will be used for the worker initialization.
placement_group (PlacementGroup|str): The placement group that workers
should be created in. Defaults to "default" which will inherit the
parent placement group (if child tasks should be captured).


Example:
@@ -125,7 +129,8 @@ def __init__(
additional_resources_per_worker: Optional[Dict[str, float]] = None,
actor_cls: Type = None,
actor_cls_args: Optional[Tuple] = None,
actor_cls_kwargs: Optional[Dict] = None):
actor_cls_kwargs: Optional[Dict] = None,
placement_group: Union[PlacementGroup, str] = "default"):

if num_workers <= 0:
raise ValueError("The provided `num_workers` must be greater "
@@ -152,6 +157,8 @@ def __init__(
self._actor_cls_args = actor_cls_args or []
self._actor_cls_kwargs = actor_cls_kwargs or {}

self._placement_group = placement_group

# TODO(matt): Validate resources. Fast-fail if it is impossible to
# handle the request, rather than hang indefinitely.
self._remote_cls = ray.remote(
@@ -279,6 +286,9 @@ def execute_single(self, worker_index: int, func: Callable[..., T], *args,
def remove_workers(self, worker_indexes: List[int]):
Contributor: Now with placement groups the semantics here get a little tricky, since we can't add workers without first removing them if placement groups are enabled.

Do you think we should do either of the following?

  1. Make remove_workers and add_workers private, and only expose a single replace_workers API. This will ensure that remove and add are called atomically (except for the initial creation) and your placement group will always be "full".
  2. Keep track of the free bundles in the placement group and, if add_workers is called without free resources in the placement group, raise an error.

Contributor (Author): Great point, my initial thoughts are:

  1. This is reasonable for the current system, but I don't think it should be a requirement of the WorkerGroup/ActorGroup. With the presented API we don't really make any assumptions on the size of the placement group.
  2. This could get tricky when keeping track of what a "free" bundle is (if worker:bundle is 1:1 this is easier, but I'm not sure that makes sense). I would prefer to leave this up to placement groups and have them raise the error when it happens ([placement groups/autoscaler] unfulfillable requests should raise an error #18018).

I do actually think #18524 is the preferred solution for Ray Train, and placement groups are just a workaround to achieve this functionality, so it doesn't seem right to me to build WorkerGroup around placement group semantics.

Contributor (Author): I added a comment to add_workers to warn any users about this.

  • Even prior to this change, the same undefined behavior would be present if add_workers is called within a parent placement group or if the cluster is full.
  • The only use case in TensorflowBackend actually has some custom logic in between remove_workers and add_workers.

"""Removes the workers with the specified indexes.

The removed workers will go out of scope and their actor processes
will be terminated.

Args:
worker_indexes (List[int]): The indexes of the workers to remove.
"""
@@ -297,8 +307,9 @@ def add_workers(self, num_workers: int):
new_actors = []
new_actor_metadata = []
for _ in range(num_workers):
actor = self._remote_cls.remote(*self._actor_cls_args,
**self._actor_cls_kwargs)
actor = self._remote_cls.options(
placement_group=self._placement_group).remote(
*self._actor_cls_args, **self._actor_cls_kwargs)
new_actors.append(actor)
new_actor_metadata.append(
actor._BaseWorkerMixin__execute.remote(construct_metadata))
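The remove-then-add pattern discussed above looks roughly like this: a sketch against the WorkerGroup API (an internal, developer-facing class), assuming the placement group reserves exactly num_workers single-CPU bundles.

# Sketch only: replace a worker without outgrowing the placement group.
import ray
from ray.train.worker_group import WorkerGroup
from ray.util.placement_group import placement_group

ray.init(num_cpus=2)

num_workers = 2
pg = placement_group([{"CPU": 1} for _ in range(num_workers)])
ray.get(pg.ready())

wg = WorkerGroup(num_workers=num_workers, placement_group=pg)

# Free worker 0's bundle first, then add a replacement actor that is
# scheduled back into the same placement group via .options().
wg.remove_workers([0])
wg.add_workers(1)

wg.shutdown()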