
Merge branch 'master' into add-default-gradient-logging
simonsays1980 committed Sep 9, 2024
2 parents db926b7 + 1dd8d60 commit e18204e
Showing 36 changed files with 1,479 additions and 1,104 deletions.
2 changes: 1 addition & 1 deletion doc/source/_templates/index.html
@@ -222,7 +222,7 @@ <h3>Scale with Ray</h3>
''') }}
<div class="tab-pane-links">
  <a href="{{ pathto('serve/index') }}" target="_blank">Learn more about Ray Serve</a>
  <a href="{{ pathto('serve/examples') }}" target="_blank">Examples</a>
  <a href="https://console.anyscale.com/register/ha?utm_source=ray_docs&utm_medium=docs&utm_campaign=scale_with_ray&redirectTo=/v2/template-preview/serve-stable-diffusion-v2" target="_blank">Quickstart</a>
</div>
</div>
<!-- prettier-ignore -->
2 changes: 1 addition & 1 deletion doc/source/ray-overview/installation.rst
@@ -6,7 +6,7 @@ Installing Ray
.. raw:: html

    <a id="try-anyscale-quickstart-install-ray" target="_blank" href="https://console.anyscale.com/register/ha?utm_source=ray_docs&utm_medium=docs&utm_campaign=installing_ray&redirectTo=/v2/template-preview/workspace-intro">
        <img src="../../_static/img/quickstart-with-ray.svg" alt="Run Quickstart on Anyscale" />
        <img src="../_static/img/quickstart-with-ray.svg" alt="Run Quickstart on Anyscale" />
        <br/><br/>
    </a>

3 changes: 3 additions & 0 deletions doc/source/serve/tutorials/stable-diffusion.md
@@ -5,6 +5,9 @@ orphan: true
(serve-stable-diffusion-tutorial)=

# Serve a Stable Diffusion Model

[![try-anyscale-quickstart-ray-serve-stable-diffusion-quickstart](../../_static/img/run-quickstart-anyscale.svg)](https://console.anyscale.com/register/ha?utm_source=ray_docs&utm_medium=docs&utm_campaign=ray-serve-stable-diffusion-quickstart&redirectTo=/v2/template-preview/serve-stable-diffusion-v2)

This example runs a Stable Diffusion application with Ray Serve.

To run this example, install the following:
2 changes: 0 additions & 2 deletions python/ray/_private/internal_api.py
@@ -5,7 +5,6 @@
import ray._private.services as services
import ray._private.utils as utils
import ray._private.worker
from ray._private import ray_constants
from ray._private.state import GlobalState
from ray._raylet import GcsClientOptions
from ray.core.generated import common_pb2
@@ -34,7 +33,6 @@ def get_state_from_address(address=None):

def memory_summary(
    address=None,
    redis_password=ray_constants.REDIS_DEFAULT_PASSWORD,
    group_by="NODE_ADDRESS",
    sort_by="OBJECT_SIZE",
    units="B",
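
For reference, a hedged sketch of calling this private helper after the change; ray._private APIs are internal, and the call below is illustrative rather than part of this diff:

# Illustrative only: memory_summary no longer accepts redis_password,
# and ray._private.internal_api is an internal module that may change.
import ray
from ray._private.internal_api import memory_summary

ray.init()
print(memory_summary(group_by="NODE_ADDRESS", sort_by="OBJECT_SIZE", units="B"))
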
42 changes: 0 additions & 42 deletions python/ray/_raylet.pyx
@@ -134,7 +134,6 @@ from ray.includes.common cimport (
    kResourceUnitScaling,
    kImplicitResourcePrefix,
    kWorkerSetupHookKeyName,
    PythonCheckGcsHealth,
    PythonGetNodeLabels,
    PythonGetResourcesTotal,
)
@@ -3263,47 +3262,6 @@ cdef class _TestOnly_GcsActorSubscriber(_GcsSubscriber):
        return [(key_id, info)]


def check_health(address: str, timeout=2, skip_version_check=False):
    """Checks Ray cluster health, before / without actually connecting to the
    cluster via ray.init().

    Args:
        address: Ray cluster / GCS address string, e.g. ip:port.
        timeout: request timeout.
        skip_version_check: If True, will skip comparison of GCS Ray version
            with local Ray version. If False (default), will raise exception
            on mismatch.

    Returns:
        Returns True if the cluster is running and has matching Ray version.
        Returns False if no service is running.
        Raises an exception otherwise.
    """

    tokens = address.rsplit(":", 1)
    if len(tokens) != 2:
        raise ValueError("Invalid address: {}. Expect 'ip:port'".format(address))
    gcs_address, gcs_port = tokens

    cdef:
        c_string c_gcs_address = gcs_address
        int c_gcs_port = int(gcs_port)
        int64_t timeout_ms = round(1000 * timeout) if timeout else -1
        c_string c_ray_version = ray.__version__
        c_bool c_skip_version_check = skip_version_check
        c_bool c_is_healthy = True

    try:
        with nogil:
            check_status(PythonCheckGcsHealth(
                c_gcs_address, c_gcs_port, timeout_ms, c_ray_version,
                c_skip_version_check, c_is_healthy))
    except RpcError:
        traceback.print_exc()
    except RaySystemError as e:
        raise RuntimeError(str(e))

    return c_is_healthy
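
For context, a hedged sketch of how this now-removed helper was typically invoked before this commit; the address below is a placeholder:

# Illustrative usage of the helper removed above; the address is a placeholder.
from ray._raylet import check_health

# Probe the GCS before ray.init(): True means it responded with a matching
# Ray version, False means nothing is listening at the address.
if check_health("127.0.0.1:6379", timeout=5):
    print("Cluster is reachable.")
else:
    print("No GCS service at that address.")
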


cdef class CoreWorker:

def __cinit__(self, worker_type, store_socket, raylet_socket,
35 changes: 34 additions & 1 deletion python/ray/dag/compiled_dag_node.py
@@ -10,6 +10,7 @@
from typing import NamedTuple

from ray.experimental.channel.cached_channel import CachedChannel
from ray.experimental.channel.gpu_communicator import GPUCommunicator
import ray
from ray.exceptions import RayTaskError, RayChannelError
from ray.util.annotations import PublicAPI
@@ -707,6 +708,11 @@ def __init__(
        # Type hints specified by the user for DAG (intermediate) outputs.
        self._type_hints = []

        # This is set to True when a type hint of `transport="nccl"` is used.
        self._use_default_nccl_group = False
        # This is set to the specified custom NCCL group
        # if a type hint of `transport=nccl_group` exists.
        self._custom_nccl_group: Optional[GPUCommunicator] = None
        # Uniquely identifies the NCCL communicator that will be used within
        # this DAG, if any.
        self._nccl_group_id: Optional[str] = None
@@ -873,6 +879,33 @@ def _preprocess(self) -> None:
                if dag_node.type_hint.requires_nccl():
                    # Add all writers to the NCCL group.
                    nccl_actors.add(actor_handle)
                    custom_nccl_group = dag_node.type_hint.get_custom_nccl_group()
                    mixed_nccl_group_error_message = (
                        "Accelerated DAGs do not support mixed usage of "
                        "type hints of default NCCL group "
                        '(i.e., TorchTensor(transport="nccl")) '
                        "and custom NCCL group "
                        "(i.e., TorchTensor(transport=nccl_group)). "
                        "Please check all the TorchTensor type hints and "
                        "make sure only one type of NCCL transport is specified."
                    )
                    if custom_nccl_group is None:
                        if self._custom_nccl_group is not None:
                            raise ValueError(mixed_nccl_group_error_message)
                        self._use_default_nccl_group = True
                    else:
                        if self._use_default_nccl_group:
                            raise ValueError(mixed_nccl_group_error_message)
                        if self._custom_nccl_group is not None:
                            if self._custom_nccl_group != custom_nccl_group:
                                raise ValueError(
                                    "Accelerated DAGs currently only support "
                                    "a single custom NCCL group, but multiple "
                                    "have been specified. Check all the "
                                    "TorchTensor(transport=nccl_group) type hints "
                                    "to make sure only one NCCL group is used."
                                )
                        self._custom_nccl_group = custom_nccl_group
            elif isinstance(dag_node, InputNode):
                if dag_node.type_hint.requires_nccl():
                    raise ValueError(
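
To illustrate the validation added above, a hedged sketch of the mixed usage it rejects; EchoActor and my_communicator are hypothetical stand-ins, while TorchTensorType, with_type_hint, and experimental_compile are assumed from Ray's experimental accelerated-DAG API:

# Hedged sketch: mixing the default NCCL transport with a custom
# GPUCommunicator in one compiled DAG raises the ValueError above.
# EchoActor and my_communicator are hypothetical stand-ins.
from ray.dag import InputNode
from ray.experimental.channel.torch_tensor_type import TorchTensorType

sender = EchoActor.remote()
relay = EchoActor.remote()

with InputNode() as inp:
    x = sender.echo.bind(inp)
    x = x.with_type_hint(TorchTensorType(transport="nccl"))  # default group
    y = relay.echo.bind(x)
    y = y.with_type_hint(TorchTensorType(transport=my_communicator))  # custom group

y.experimental_compile()  # raises ValueError: mixed NCCL transports
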
@@ -983,7 +1016,7 @@ def _preprocess(self) -> None:
        if None in nccl_actors:
            raise ValueError("Driver cannot participate in the NCCL group.")
        if nccl_actors and self._nccl_group_id is None:
            self._nccl_group_id = _init_nccl_group(nccl_actors)
            self._nccl_group_id = _init_nccl_group(nccl_actors, self._custom_nccl_group)

        if direct_input:
            self._input_num_positional_args = 1
2 changes: 2 additions & 0 deletions python/ray/dag/tests/experimental/test_accelerated_dag.py
@@ -1849,6 +1849,8 @@ def test_event_profiling(ray_start_regular, monkeypatch):
        assert event.method_name == "inc"
        assert event.operation in ["READ", "COMPUTE", "WRITE"]

    adag.teardown()


@ray.remote
class TestWorker:
Expand Down