[aDAG] Allow custom NCCL group for aDAG (#47141)
Allow a custom NCCL group for aDAG so that we can reuse an NCCL group the user has already created.

Marking NcclGroupInterface as DeveloperAPI for now. After validating it by using it in vLLM, we can promote it to alpha stability.

vLLM prototype: vllm-project/vllm#7568
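
For context, here is a minimal sketch of the intended usage: reusing an already-created NCCL group by passing it through a TorchTensor type hint, as referenced by the error messages in the diff below. The `TorchTensorType` import path, `with_type_hint`, `experimental_compile`, and the `ExistingNcclGroup`/`Worker` classes are assumptions for illustration, not guaranteed by this commit.

```python
# Sketch only: anything marked "assumed" is illustrative and not verified
# against this commit.
import ray
from ray.dag import InputNode
from ray.experimental.channel.gpu_communicator import GPUCommunicator  # added by this PR
from ray.experimental.channel.torch_tensor_type import TorchTensorType  # assumed path


class ExistingNcclGroup(GPUCommunicator):
    """Hypothetical wrapper over an NCCL communicator the user already
    created (e.g. vLLM's); it must implement GPUCommunicator's abstract
    methods (send/recv/rank bookkeeping), elided here."""
    ...


@ray.remote(num_gpus=1)
class Worker:
    def send(self, t):
        return t

    def recv(self, t):
        return t


sender, receiver = Worker.remote(), Worker.remote()
nccl_group = ExistingNcclGroup()  # instantiable once the abstract methods exist

with InputNode() as inp:
    dag = sender.send.bind(inp)
    # Reuse the custom group instead of the default transport="nccl" group.
    dag = dag.with_type_hint(TorchTensorType(transport=nccl_group))
    dag = receiver.recv.bind(dag)

compiled_dag = dag.experimental_compile()
```

Per the validation added below, every NCCL type hint in a single DAG must then reference this same group object.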
ruisearch42 authored Sep 6, 2024
1 parent 5c70d96 commit bbeee55
Showing 9 changed files with 650 additions and 49 deletions.
python/ray/dag/compiled_dag_node.py (35 changes: 34 additions & 1 deletion)
@@ -10,6 +10,7 @@
 from typing import NamedTuple
 
 from ray.experimental.channel.cached_channel import CachedChannel
+from ray.experimental.channel.gpu_communicator import GPUCommunicator
 import ray
 from ray.exceptions import RayTaskError, RayChannelError
 from ray.util.annotations import PublicAPI
@@ -707,6 +708,11 @@ def __init__(
         # Type hints specified by the user for DAG (intermediate) outputs.
         self._type_hints = []
 
+        # This is set to True when a type hint of `transport="nccl"` is used.
+        self._use_default_nccl_group = False
+        # This is set to the specified custom NCCL group
+        # if there exists a type hint of `transport=nccl_group`.
+        self._custom_nccl_group: Optional[GPUCommunicator] = None
         # Uniquely identifies the NCCL communicator that will be used within
         # this DAG, if any.
         self._nccl_group_id: Optional[str] = None
@@ -873,6 +879,33 @@ def _preprocess(self) -> None:
                 if dag_node.type_hint.requires_nccl():
                     # Add all writers to the NCCL group.
                     nccl_actors.add(actor_handle)
+                    custom_nccl_group = dag_node.type_hint.get_custom_nccl_group()
+                    mixed_nccl_group_error_message = (
+                        "Accelerated DAGs do not support mixed usage of "
+                        "type hints of default NCCL group "
+                        '(i.e., TorchTensor(transport="nccl")) '
+                        "and custom NCCL group "
+                        "(i.e., TorchTensor(transport=nccl_group)). "
+                        "Please check all the TorchTensor type hints and "
+                        "make sure only one type of NCCL transport is specified."
+                    )
+                    if custom_nccl_group is None:
+                        if self._custom_nccl_group is not None:
+                            raise ValueError(mixed_nccl_group_error_message)
+                        self._use_default_nccl_group = True
+                    else:
+                        if self._use_default_nccl_group:
+                            raise ValueError(mixed_nccl_group_error_message)
+                        if self._custom_nccl_group is not None:
+                            if self._custom_nccl_group != custom_nccl_group:
+                                raise ValueError(
+                                    "Accelerated DAGs currently only support "
+                                    "a single custom NCCL group, but multiple "
+                                    "have been specified. Check all the "
+                                    "TorchTensor(transport=nccl_group) type hints "
+                                    "to make sure only one NCCL group is used."
+                                )
+                        self._custom_nccl_group = custom_nccl_group
             elif isinstance(dag_node, InputNode):
                 if dag_node.type_hint.requires_nccl():
                     raise ValueError(
@@ -983,7 +1016,7 @@ def _preprocess(self) -> None:
         if None in nccl_actors:
             raise ValueError("Driver cannot participate in the NCCL group.")
         if nccl_actors and self._nccl_group_id is None:
-            self._nccl_group_id = _init_nccl_group(nccl_actors)
+            self._nccl_group_id = _init_nccl_group(nccl_actors, self._custom_nccl_group)
 
         if direct_input:
             self._input_num_positional_args = 1
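
The checks added to `_preprocess` above enforce two invariants: default (`"nccl"`) and custom transports cannot be mixed within one DAG, and at most one distinct custom group may appear. Below is a self-contained restatement of that state machine, with names local to this sketch rather than Ray's API:

```python
from typing import List, Optional, Union


def resolve_nccl_transport(transports: List[Union[str, object]]) -> Optional[object]:
    """Mirrors the checks in _preprocess: `transports` holds one entry per
    NCCL type hint, either the string "nccl" (default group) or a custom
    group object. Returns the custom group to use, or None for the default."""
    use_default = False
    custom_group: Optional[object] = None
    for transport in transports:
        if transport == "nccl":
            if custom_group is not None:
                raise ValueError("mixed default and custom NCCL transports")
            use_default = True
        else:
            if use_default:
                raise ValueError("mixed default and custom NCCL transports")
            if custom_group is not None and custom_group != transport:
                raise ValueError("multiple custom NCCL groups specified")
            custom_group = transport
    return custom_group


# Example: repeated hints on the same custom group are fine; mixing is not.
group = object()
assert resolve_nccl_transport([group, group]) is group
assert resolve_nccl_transport(["nccl", "nccl"]) is None
```

Note the comparison mirrors the diff's `!=` check, so all custom hints must reference the same group object (or compare equal).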
