Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][cgraph] Collapse other params into max_inflight_executions and adjust execution_index counting #49565

Merged
merged 16 commits into from
Jan 13, 2025
Merged
73 changes: 18 additions & 55 deletions python/ray/dag/compiled_dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -740,8 +740,6 @@ def __init__(
submit_timeout: Optional[float] = None,
buffer_size_bytes: Optional[int] = None,
enable_asyncio: bool = False,
asyncio_max_queue_size: Optional[int] = None,
max_buffered_results: Optional[int] = None,
max_inflight_executions: Optional[int] = None,
overlap_gpu_communication: Optional[bool] = None,
):
Expand All @@ -759,20 +757,6 @@ def __init__(
be running in an event loop and must use `execute_async` to
invoke the DAG. Otherwise, the caller should use `execute` to
invoke the DAG.
asyncio_max_queue_size: Optional parameter to limit how many DAG
inputs can be queued at a time. The actual number of concurrent
DAG invocations may be higher than this, if there are already
inputs being processed by the DAG executors. If used, the
caller is responsible for preventing deadlock, i.e. if the
input queue is full, another asyncio task is reading from the
DAG output. It is only used when enable_asyncio=True.
max_buffered_results: The maximum number of execution results that
are allowed to be buffered. Setting a higher value allows more
DAGs to be executed before `ray.get()` must be called but also
increases the memory usage. Note that if the number of ongoing
executions is beyond the DAG capacity, the new execution would
be blocked in the first place; therefore, this limit is only
enforced when it is smaller than the DAG capacity.
max_inflight_executions: The maximum number of in-flight executions that
can be submitted via `execute` or `execute_async` before consuming
the output using `ray.get()`. If the caller submits more executions,
Expand All @@ -792,11 +776,6 @@ def __init__(

self._enable_asyncio: bool = enable_asyncio
self._fut_queue = asyncio.Queue()
self._asyncio_max_queue_size: Optional[int] = asyncio_max_queue_size
# TODO(rui): consider unify it with asyncio_max_queue_size
self._max_buffered_results: Optional[int] = max_buffered_results
if self._max_buffered_results is None:
self._max_buffered_results = ctx.max_buffered_results
self._max_inflight_executions = max_inflight_executions
if self._max_inflight_executions is None:
self._max_inflight_executions = ctx.max_inflight_executions
Expand Down Expand Up @@ -886,7 +865,7 @@ def __init__(
self._communicator_ids: Set[str] = set()
# The index of the current execution. It is incremented each time
# the DAG is executed.
self._execution_index: int = 0
self._execution_index: int = -1
# The maximum index of finished executions.
# All results with higher indexes have not been generated yet.
self._max_finished_execution_index: int = -1
Expand Down Expand Up @@ -925,12 +904,6 @@ def is_teardown(self) -> bool:
def communicator_ids(self) -> Set[str]:
return self._communicator_ids

def increment_max_finished_execution_index(self) -> None:
"""Increment the max finished execution index. It is used to
figure out the max number of in-flight requests to the DAG
"""
self._max_finished_execution_index += 1

def get_id(self) -> str:
"""
Get the unique ID of the compiled DAG.
Expand Down Expand Up @@ -1643,7 +1616,6 @@ def _get_or_compile(
self._dag_submitter = AwaitableBackgroundWriter(
self.dag_input_channels,
input_task.output_idxs,
self._asyncio_max_queue_size,
is_input=True,
)
self._dag_output_fetcher = AwaitableBackgroundReader(
Expand Down Expand Up @@ -1938,17 +1910,17 @@ def run(self):
monitor.start()
return monitor

def raise_if_too_many_inflight_requests(self):
num_in_flight_requests = (
def _raise_if_too_many_inflight_executions(self):
num_inflight_executions = (
self._execution_index - self._max_finished_execution_index
)
if num_in_flight_requests > self._max_inflight_executions:
) + len(self._result_buffer)
if num_inflight_executions >= self._max_inflight_executions:
raise ray.exceptions.RayCgraphCapacityExceeded(
f"There are {num_in_flight_requests} in-flight requests which "
"is more than specified _max_inflight_executions of the dag: "
f"{self._max_inflight_executions}. Retrieve the output using "
"ray.get before submitting more requests or increase "
"`max_inflight_executions`. "
"The compiled graph can't have more than "
f"{self._max_inflight_executions} in-flight executions, and you "
f"currently have {num_inflight_executions} in-flight executions. "
"Retrieve an output using ray.get before submitting more requests or "
"increase `_max_inflight_executions`. "
"`dag.experimental_compile(_max_inflight_executions=...)`"
)

Expand Down Expand Up @@ -2032,7 +2004,7 @@ def release_output_channel_buffers(self, execution_index: int):
timeout = ctx.get_timeout

while self._max_finished_execution_index < execution_index:
self.increment_max_finished_execution_index()
self._max_finished_execution_index += 1
start_time = time.monotonic()
self._dag_output_fetcher.release_channel_buffers(timeout)

Expand Down Expand Up @@ -2076,13 +2048,6 @@ def _execute_until(
timeout = ctx.get_timeout

while self._max_finished_execution_index < execution_index:
if len(self._result_buffer) >= self._max_buffered_results:
raise ValueError(
"Too many buffered results: the allowed max count for "
f"buffered results is {self._max_buffered_results}; call "
"ray.get() on previous CompiledDAGRefs to free them up "
"from buffer."
)
dayshah marked this conversation as resolved.
Show resolved Hide resolved
start_time = time.monotonic()

# Fetch results from each output channel up to execution_index and cache
Expand All @@ -2096,7 +2061,7 @@ def _execute_until(
"Otherwise, this may indicate that the execution is hanging."
) from e

self.increment_max_finished_execution_index()
self._max_finished_execution_index += 1
self._cache_execution_results(
self._max_finished_execution_index,
result,
Expand Down Expand Up @@ -2144,7 +2109,7 @@ def execute(
else:
inp = CompiledDAGArgs(args=args, kwargs=kwargs)

self.raise_if_too_many_inflight_requests()
self._raise_if_too_many_inflight_executions()
try:
self._dag_submitter.write(inp, self._submit_timeout)
except RayChannelTimeoutError as e:
Expand All @@ -2154,6 +2119,8 @@ def execute(
"seconds. Otherwise, this may indicate that execution is hanging."
) from e

self._execution_index += 1

if self._returns_list:
ref = [
CompiledDAGRef(self, self._execution_index, channel_index)
Expand All @@ -2162,7 +2129,6 @@ def execute(
else:
ref = CompiledDAGRef(self, self._execution_index)

self._execution_index += 1
return ref

def _check_inputs(self, args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> None:
Expand Down Expand Up @@ -2217,12 +2183,14 @@ async def execute_async(
else:
inp = CompiledDAGArgs(args=args, kwargs=kwargs)

self.raise_if_too_many_inflight_requests()
self._raise_if_too_many_inflight_executions()
await self._dag_submitter.write(inp)
# Allocate a future that the caller can use to get the result.
fut = asyncio.Future()
await self._fut_queue.put(fut)

self._execution_index += 1

if self._returns_list:
fut = [
CompiledDAGFuture(self, self._execution_index, fut, channel_index)
Expand All @@ -2231,7 +2199,6 @@ async def execute_async(
else:
fut = CompiledDAGFuture(self, self._execution_index, fut)

self._execution_index += 1
return fut

def _visualize_ascii(self) -> str:
Expand Down Expand Up @@ -2834,17 +2801,13 @@ def build_compiled_dag_from_ray_dag(
submit_timeout: Optional[float] = None,
buffer_size_bytes: Optional[int] = None,
enable_asyncio: bool = False,
asyncio_max_queue_size: Optional[int] = None,
max_buffered_results: Optional[int] = None,
max_inflight_executions: Optional[int] = None,
overlap_gpu_communication: Optional[bool] = None,
) -> "CompiledDAG":
compiled_dag = CompiledDAG(
submit_timeout,
buffer_size_bytes,
enable_asyncio,
asyncio_max_queue_size,
max_buffered_results,
max_inflight_executions,
overlap_gpu_communication,
)
Expand Down
20 changes: 0 additions & 20 deletions python/ray/dag/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,6 @@
DEFAULT_TEARDOWN_TIMEOUT_S = int(os.environ.get("RAY_CGRAPH_teardown_timeout", 30))
# Default buffer size is 1MB.
DEFAULT_BUFFER_SIZE_BYTES = int(os.environ.get("RAY_CGRAPH_buffer_size_bytes", 1e6))
# Default asyncio_max_queue_size is 0, which means no limit.
DEFAULT_ASYNCIO_MAX_QUEUE_SIZE = int(
os.environ.get("RAY_CGRAPH_asyncio_max_queue_size", 0)
)
# The default max_buffered_results is 1000, and the default buffer size is 1 MB.
# The maximum memory usage for buffered results is 1 GB.
DEFAULT_MAX_BUFFERED_RESULTS = int(
os.environ.get("RAY_CGRAPH_max_buffered_results", 1000)
)
# The default number of in-flight executions that can be submitted before consuming the
# output.
DEFAULT_MAX_INFLIGHT_EXECUTIONS = int(
Expand Down Expand Up @@ -61,15 +52,6 @@ class DAGContext:
that can be passed between tasks in the DAG. The buffers will
be automatically resized if larger messages are written to the
channel.
asyncio_max_queue_size: The max queue size for the async execution.
It is only used when enable_asyncio=True.
max_buffered_results: The maximum number of execution results that
are allowed to be buffered. Setting a higher value allows more
DAGs to be executed before `ray.get()` must be called but also
increases the memory usage. Note that if the number of ongoing
executions is beyond the DAG capacity, the new execution would
be blocked in the first place; therefore, this limit is only
enforced when it is smaller than the DAG capacity.
max_inflight_executions: The maximum number of in-flight executions that
can be submitted via `execute` or `execute_async` before consuming
the output using `ray.get()`. If the caller submits more executions,
Expand All @@ -84,8 +66,6 @@ class DAGContext:
get_timeout: int = DEFAULT_GET_TIMEOUT_S
teardown_timeout: int = DEFAULT_TEARDOWN_TIMEOUT_S
buffer_size_bytes: int = DEFAULT_BUFFER_SIZE_BYTES
asyncio_max_queue_size: int = DEFAULT_ASYNCIO_MAX_QUEUE_SIZE
max_buffered_results: int = DEFAULT_MAX_BUFFERED_RESULTS
max_inflight_executions: int = DEFAULT_MAX_INFLIGHT_EXECUTIONS
overlap_gpu_communication: bool = DEFAULT_OVERLAP_GPU_COMMUNICATION

Expand Down
17 changes: 0 additions & 17 deletions python/ray/dag/dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@ def experimental_compile(
_submit_timeout: Optional[float] = None,
_buffer_size_bytes: Optional[int] = None,
enable_asyncio: bool = False,
_asyncio_max_queue_size: Optional[int] = None,
_max_buffered_results: Optional[int] = None,
_max_inflight_executions: Optional[int] = None,
_overlap_gpu_communication: Optional[bool] = None,
) -> "ray.dag.CompiledDAG":
Expand All @@ -203,15 +201,6 @@ def experimental_compile(
be automatically resized if larger messages are written to the
channel.
enable_asyncio: Whether to enable asyncio for this DAG.
_asyncio_max_queue_size: The max queue size for the async execution.
It is only used when enable_asyncio=True.
_max_buffered_results: The maximum number of execution results that
are allowed to be buffered. Setting a higher value allows more
DAGs to be executed before `ray.get()` must be called but also
increases the memory usage. Note that if the number of ongoing
executions is beyond the DAG capacity, the new execution would
be blocked in the first place; therefore, this limit is only
enforced when it is smaller than the DAG capacity.
_max_inflight_executions: The maximum number of in-flight executions that
can be submitted via `execute` or `execute_async` before consuming
the output using `ray.get()`. If the caller submits more executions,
Expand All @@ -230,10 +219,6 @@ def experimental_compile(
ctx = DAGContext.get_current()
if _buffer_size_bytes is None:
_buffer_size_bytes = ctx.buffer_size_bytes
if _asyncio_max_queue_size is None:
_asyncio_max_queue_size = ctx.asyncio_max_queue_size
if _max_buffered_results is None:
_max_buffered_results = ctx.max_buffered_results

# Validate whether this DAG node has already been compiled.
if self.is_cgraph_output_node:
Expand All @@ -251,8 +236,6 @@ def experimental_compile(
_submit_timeout,
_buffer_size_bytes,
enable_asyncio,
_asyncio_max_queue_size,
_max_buffered_results,
_max_inflight_executions,
_overlap_gpu_communication,
)
Expand Down
Loading
Loading