Skip to content

Commit

Permalink
refactor: Merge ccar-0905
Browse files Browse the repository at this point in the history
Signed-off-by: Weixin Deng <weixin@cs.washington.edu>
  • Loading branch information
dengwxn committed Sep 25, 2024
1 parent 6ae63c8 commit 76f57c9
Show file tree
Hide file tree
Showing 42 changed files with 866 additions and 565 deletions.
17 changes: 13 additions & 4 deletions ci/ray_ci/docker_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
]
GPU_PLATFORM = "cu12.1.1-cudnn8"

PYTHON_VERSIONS_RAY = ["3.9", "3.10", "3.11"]
PYTHON_VERSIONS_RAY_ML = ["3.9", "3.10"]
PYTHON_VERSIONS_RAY = ["3.9", "3.10", "3.11", "3.12"]
PYTHON_VERSIONS_RAY_ML = ["3.9", "3.10", "3.11"]
ARCHITECTURES_RAY = ["x86_64", "aarch64"]
ARCHITECTURES_RAY_ML = ["x86_64"]

Expand All @@ -46,6 +46,15 @@ def __init__(
upload: bool = False,
) -> None:
assert "RAYCI_CHECKOUT_DIR" in os.environ, "RAYCI_CHECKOUT_DIR not set"

assert python_version in PYTHON_VERSIONS_RAY
assert platform in PLATFORMS_RAY
assert architecture in ARCHITECTURES_RAY
if image_type == RayType.RAY_ML:
assert python_version in PYTHON_VERSIONS_RAY_ML
assert platform in PLATFORMS_RAY_ML
assert architecture in ARCHITECTURES_RAY_ML

rayci_checkout_dir = os.environ["RAYCI_CHECKOUT_DIR"]
self.python_version = python_version
self.platform = platform
Expand Down Expand Up @@ -122,13 +131,13 @@ def _get_image_tags(self, external: bool = False) -> List[str]:
versions = self._get_image_version_tags(external)

platforms = [self.get_platform_tag()]
if self.platform == "cpu" and self.image_type == "ray":
if self.platform == "cpu" and self.image_type == RayType.RAY:
# no tag is alias to cpu for ray image
platforms.append("")
elif self.platform == GPU_PLATFORM:
# gpu is alias to cu118 for ray image
platforms.append("-gpu")
if self.image_type == "ray-ml":
if self.image_type == RayType.RAY_ML:
# no tag is alias to gpu for ray-ml image
platforms.append("")

Expand Down
5 changes: 3 additions & 2 deletions ci/ray_ci/test_ray_docker_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from ci.ray_ci.builder_container import DEFAULT_PYTHON_VERSION
from ci.ray_ci.container import _DOCKER_ECR_REPO
from ci.ray_ci.docker_container import GPU_PLATFORM
from ci.ray_ci.ray_docker_container import RayDockerContainer
from ci.ray_ci.test_base import RayCITestBase
from ci.ray_ci.utils import RAY_VERSION
Expand Down Expand Up @@ -203,8 +204,8 @@ def test_canonical_tag(self) -> None:
container = RayDockerContainer(v, "cpu", "ray", "aarch64")
assert container._get_canonical_tag() == f"{sha}-{pv}-cpu-aarch64"

container = RayDockerContainer(v, "cu11.8.0-cudnn8", "ray-ml")
assert container._get_canonical_tag() == f"{sha}-{pv}-cu118"
container = RayDockerContainer(v, GPU_PLATFORM, "ray-ml")
assert container._get_canonical_tag() == f"{sha}-{pv}-cu121"

with mock.patch.dict(os.environ, {"BUILDKITE_BRANCH": "releases/1.0.0"}):
container = RayDockerContainer(v, "cpu", "ray")
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
30 changes: 30 additions & 0 deletions doc/source/data/monitoring-your-workload.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,40 @@ Monitoring Your Workload

This section helps you debug and monitor the execution of your :class:`~ray.data.Dataset` by viewing the:

* :ref:`Ray Data progress bars <ray-data-progress-bars>`
* :ref:`Ray Data dashboard <ray-data-dashboard>`
* :ref:`Ray Data logs <ray-data-logs>`
* :ref:`Ray Data stats <ray-data-stats>`

.. _ray-data-progress-bars:

Ray Data progress bars
----------------------

When you execute a :class:`~ray.data.Dataset`, Ray Data displays a set of progress bars in the console. These progress bars show various execution and progress-related metrics, including the number of rows completed/remaining, resource usage, and task/actor status. See the annotated image for a breakdown of how to interpret the progress bar outputs:

.. image:: images/dataset-progress-bar.png
:align: center


Some additional notes on progress bars:

* The progress bars are updated every second; resource usage, metrics, and task/actor status may take up to 5 seconds to update.
* When the tasks section contains the label `[backpressure]`, it indicates that the operator is *backpressured*, meaning that the operator won't submit more tasks until the downstream operator is ready to accept more data.
* The global resource usage is the sum of resources used by all operators, active and requested (includes pending scheduling and pending node assignment).

Configuring the progress bar
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Depending on your use case, you may not be interested in the full progress bar output, or wish to turn them off altogether. Ray Data provides several ways to accomplish this:

* Disabling operator-level progress bars: Set `DataContext.get_current().enable_operator_progress_bars = False`. This only shows the global progress bar, and omits operator-level progress bars.
* Disabling all progress bars: Set `DataContext.get_current().enable_progress_bars = False`. This disables all progress bars from Ray Data related to dataset execution.
* Disabling `ray_tqdm`: Set `DataContext.get_current().use_ray_tqdm = False`. This configures Ray Data to use the base `tqdm` library instead of the custom distributed `tqdm` implementation, which could be useful when debugging logging issues in a distributed setting.

For operator names longer than a threshold of 100 characters, Ray Data truncates the names by default, to prevent the case when the operator names are long and the progress bar is too wide to fit on the screen.

* To turn off this behavior and show the full operator name, set `DataContext.get_current().enable_progress_bar_name_truncation = False`.
* To change the threshold of truncating the name, update the constant `ray.data._internal.progress_bar.ProgressBar.MAX_NAME_LENGTH = 42`.

.. _ray-data-dashboard:

Expand Down
2 changes: 1 addition & 1 deletion python/ray/_private/event/export_event_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def _build_export_event_file_logger(
) -> logging.Logger:
logger = logging.getLogger("_ray_export_event_logger_" + source)
logger.setLevel(logging.INFO)
dir_path = pathlib.Path(sink_dir) / "events"
dir_path = pathlib.Path(sink_dir) / "export_events"
filepath = dir_path / f"event_{source}.log"
dir_path.mkdir(exist_ok=True)
filepath.touch(exist_ok=True)
Expand Down
8 changes: 6 additions & 2 deletions python/ray/_private/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,13 @@ def env_set_by_user(key):

# The default maximum number of bytes to allocate to the object store unless
# overridden by the user.
DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES = 200 * 10**9
DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES = env_integer(
"RAY_DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES", 200 * 10**9 # 200 GB
)
# The default proportion of available memory allocated to the object store
DEFAULT_OBJECT_STORE_MEMORY_PROPORTION = 0.3
DEFAULT_OBJECT_STORE_MEMORY_PROPORTION = env_float(
"RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION", 0.3
)
# The smallest cap on the memory used by the object store that we allow.
# This must be greater than MEMORY_RESOURCE_UNIT_BYTES
OBJECT_STORE_MINIMUM_MEMORY_BYTES = 75 * 1024 * 1024
Expand Down
21 changes: 14 additions & 7 deletions python/ray/_private/resource_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,26 +244,33 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
object_store_memory, ray_constants.MAC_DEGRADED_PERF_MMAP_SIZE_LIMIT
)

max_cap = ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES
object_store_memory_cap = (
ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES
)

# Cap by shm size by default to avoid low performance, but don't
# go lower than REQUIRE_SHM_SIZE_THRESHOLD.
if sys.platform == "linux" or sys.platform == "linux2":
# Multiple by 0.95 to give a bit of wiggle-room.
# https://github.com/ray-project/ray/pull/23034/files
shm_avail = ray._private.utils.get_shared_memory_bytes() * 0.95
max_cap = min(
max(ray_constants.REQUIRE_SHM_SIZE_THRESHOLD, shm_avail), max_cap
)
shm_cap = max(ray_constants.REQUIRE_SHM_SIZE_THRESHOLD, shm_avail)

object_store_memory_cap = min(object_store_memory_cap, shm_cap)

# Cap memory to avoid memory waste and perf issues on large nodes
if object_store_memory > max_cap:
if (
object_store_memory_cap
and object_store_memory > object_store_memory_cap
):
logger.debug(
"Warning: Capping object memory store to {}GB. ".format(
max_cap // 1e9
object_store_memory_cap // 1e9
)
+ "To increase this further, specify `object_store_memory` "
"when calling ray.init() or ray start."
)
object_store_memory = max_cap
object_store_memory = object_store_memory_cap

redis_max_memory = self.redis_max_memory
if redis_max_memory is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@
}
},
"priority": "[parameters('priority')]",
"evictionPolicy": "[parameters('evictionPolicy')]",
"evictionPolicy": "[if(equals(parameters('priority'), 'Spot'), parameters('evictionPolicy'), '')]",
"billingProfile": "[parameters('billingProfile')]"
},
"identity": {
Expand Down
2 changes: 1 addition & 1 deletion python/ray/autoscaler/_private/command_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@

import click

from ray._private.ray_constants import DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES
from ray.autoscaler._private.cli_logger import cf, cli_logger
from ray.autoscaler._private.constants import (
AUTOSCALER_NODE_SSH_INTERVAL_S,
AUTOSCALER_NODE_START_WAIT_S,
DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES,
DEFAULT_OBJECT_STORE_MEMORY_PROPORTION,
)
from ray.autoscaler._private.docker import (
Expand Down
1 change: 0 additions & 1 deletion python/ray/autoscaler/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from ray._private.ray_constants import ( # noqa F401
AUTOSCALER_RESOURCE_REQUEST_CHANNEL,
DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES,
DEFAULT_OBJECT_STORE_MEMORY_PROPORTION,
LABELS_ENVIRONMENT_VARIABLE,
LOGGER_FORMAT,
Expand Down
5 changes: 4 additions & 1 deletion python/ray/cluster_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ def add_node(self, wait: bool = True, **node_args):
"object_store_memory": 150 * 1024 * 1024, # 150 MiB
"min_worker_port": 0,
"max_worker_port": 0,
"dashboard_port": None,
}
ray_params = ray._private.parameter.RayParams(**node_args)
ray_params.update_if_absent(**default_kwargs)
Expand Down Expand Up @@ -257,6 +256,10 @@ def add_node(self, wait: bool = True, **node_args):
ray_params.update_if_absent(include_log_monitor=False)
# Let grpc pick a port.
ray_params.update_if_absent(node_manager_port=0)
if "dashboard_agent_listen_port" not in node_args:
# Pick a random one to not conflict
# with the head node dashboard agent
ray_params.dashboard_agent_listen_port = None

node = ray._private.node.Node(
ray_params,
Expand Down
24 changes: 15 additions & 9 deletions python/ray/dag/collective_node.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from weakref import ReferenceType
from typing import Any, Dict, List, Union, Tuple, Optional, TYPE_CHECKING
from typing import Any, Dict, List, Union, Tuple, Optional
import torch

import ray
from ray.dag import (
Expand All @@ -13,22 +14,19 @@
)
from ray.dag.format_utils import get_dag_node_str
from ray.util.annotations import DeveloperAPI
from ray.util.collective.nccl_types import ReduceOp
from ray.util.collective.nccl_types import CollectiveOp, ReduceOp
from ray.experimental.channel import ChannelContext
from ray.experimental.channel.torch_tensor_nccl_channel import _init_nccl_group
from ray.experimental.channel.torch_tensor_type import GPUCommunicator, TorchTensorType

if TYPE_CHECKING:
import torch


class CollectiveGroup:
"""Represent metadata for a NCCL collective method."""

def __init__(
self,
input_nodes: List[DAGNode],
op: ReduceOp, # [TODO] General collective ops.
op: CollectiveOp,
transport: Union[str, GPUCommunicator] = TorchTensorType.NCCL,
):
self._input_nodes: List[DAGNode] = input_nodes
Expand All @@ -46,6 +44,9 @@ def __init__(
raise ValueError("Expected unique actor handles for a collective group")

self._op = op
assert isinstance(
self._op, ReduceOp
), "Other collective ops are not implemented"
self._type_hint = TorchTensorType(transport=transport, _direct_return=True)
if isinstance(transport, GPUCommunicator):
assert set(transport.get_actor_handles()) == set(
Expand Down Expand Up @@ -90,10 +91,15 @@ def get_nccl_group(self) -> GPUCommunicator:
raise ValueError("Expected a NCCL group")
return nccl_group

def method(self, tensor: "torch.Tensor"):
def method(self, tensor: torch.Tensor):
assert isinstance(tensor, torch.Tensor), "Expected a torch tensor"
nccl_group = self.get_nccl_group()
nccl_group.allreduce(tensor, self._op)
return tensor
assert isinstance(
self._op, ReduceOp
), "Other collective ops are not yet implemented"
tensor_copy = tensor.clone()
nccl_group.allreduce(tensor_copy, self._op)
return tensor_copy


@DeveloperAPI
Expand Down
25 changes: 14 additions & 11 deletions python/ray/dag/compiled_dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -968,31 +968,33 @@ def _preprocess(self) -> None:
# Add all readers to the NCCL group.
nccl_actors.add(downstream_actor_handle)

# [TODO] Comments.
actors_to_nccl_group_id: Dict[FrozenSet["ray.actor.ActorHandle"], str] = {}

# If there were type hints indicating transport via NCCL, initialize
# the NCCL group on the participating actors.
nccl_actors = list(nccl_actors)
if None in nccl_actors:
raise ValueError("Driver cannot participate in the NCCL group.")

# [TODO] Comments.
if nccl_actors and self._custom_nccl_group is not None:
# Initialize a NCCL group for each set of actors. A set of actors can be
# calling P2P send/recv or collective methods.
actors_to_nccl_group_id: Dict[FrozenSet["ray.actor.ActorHandle"], str] = {}

# If a custom NCCL group is specified for P2P actors, initialize and cache
# the NCCL group ID.
if nccl_actors and self._custom_nccl_group:
self._nccl_group_id = _init_nccl_group(nccl_actors, self._custom_nccl_group)
actors = frozenset(nccl_actors)
actors_to_nccl_group_id[actors] = self._nccl_group_id

# [TODO] Comments.
# If a custom NCCL group is specified for collective actors, initialize and
# cache the NCCL group ID.
for collective_group in nccl_collective_groups:
type_hint = collective_group.type_hint
if type_hint.get_custom_nccl_group() is not None:
if type_hint.get_custom_nccl_group():
nccl_group_id = collective_group.init_nccl_group()
actors = frozenset(collective_group.actor_handles)
if actors not in actors_to_nccl_group_id:
actors_to_nccl_group_id[actors] = nccl_group_id

# [TODO] Comments.
# If a NCCL group for P2P actors is not initialized, initialize and cache
# the NCCL group ID.
if nccl_actors and self._nccl_group_id is None:
actors = frozenset(nccl_actors)
if actors in actors_to_nccl_group_id:
Expand All @@ -1003,7 +1005,8 @@ def _preprocess(self) -> None:
)
actors_to_nccl_group_id[actors] = self._nccl_group_id

# [TODO] Comments.
# If a NCCL group for collective actors is not initialized, initialize and
# cache the NCCL group ID.
for collective_group in nccl_collective_groups:
type_hint = collective_group.type_hint
if type_hint.nccl_group_id is None:
Expand Down
Loading

0 comments on commit 76f57c9

Please sign in to comment.