diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 4d1f884c7cc4d..48a2d12c9f57a 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -740,8 +740,6 @@ def __init__( submit_timeout: Optional[float] = None, buffer_size_bytes: Optional[int] = None, enable_asyncio: bool = False, - asyncio_max_queue_size: Optional[int] = None, - max_buffered_results: Optional[int] = None, max_inflight_executions: Optional[int] = None, overlap_gpu_communication: Optional[bool] = None, ): @@ -759,20 +757,6 @@ def __init__( be running in an event loop and must use `execute_async` to invoke the DAG. Otherwise, the caller should use `execute` to invoke the DAG. - asyncio_max_queue_size: Optional parameter to limit how many DAG - inputs can be queued at a time. The actual number of concurrent - DAG invocations may be higher than this, if there are already - inputs being processed by the DAG executors. If used, the - caller is responsible for preventing deadlock, i.e. if the - input queue is full, another asyncio task is reading from the - DAG output. It is only used when enable_asyncio=True. - max_buffered_results: The maximum number of execution results that - are allowed to be buffered. Setting a higher value allows more - DAGs to be executed before `ray.get()` must be called but also - increases the memory usage. Note that if the number of ongoing - executions is beyond the DAG capacity, the new execution would - be blocked in the first place; therefore, this limit is only - enforced when it is smaller than the DAG capacity. max_inflight_executions: The maximum number of in-flight executions that can be submitted via `execute` or `execute_async` before consuming the output using `ray.get()`. If the caller submits more executions, @@ -792,11 +776,6 @@ def __init__( self._enable_asyncio: bool = enable_asyncio self._fut_queue = asyncio.Queue() - self._asyncio_max_queue_size: Optional[int] = asyncio_max_queue_size - # TODO(rui): consider unify it with asyncio_max_queue_size - self._max_buffered_results: Optional[int] = max_buffered_results - if self._max_buffered_results is None: - self._max_buffered_results = ctx.max_buffered_results self._max_inflight_executions = max_inflight_executions if self._max_inflight_executions is None: self._max_inflight_executions = ctx.max_inflight_executions @@ -886,7 +865,7 @@ def __init__( self._communicator_ids: Set[str] = set() # The index of the current execution. It is incremented each time # the DAG is executed. - self._execution_index: int = 0 + self._execution_index: int = -1 # The maximum index of finished executions. # All results with higher indexes have not been generated yet. self._max_finished_execution_index: int = -1 @@ -925,12 +904,6 @@ def is_teardown(self) -> bool: def communicator_ids(self) -> Set[str]: return self._communicator_ids - def increment_max_finished_execution_index(self) -> None: - """Increment the max finished execution index. It is used to - figure out the max number of in-flight requests to the DAG - """ - self._max_finished_execution_index += 1 - def get_id(self) -> str: """ Get the unique ID of the compiled DAG. @@ -1643,7 +1616,6 @@ def _get_or_compile( self._dag_submitter = AwaitableBackgroundWriter( self.dag_input_channels, input_task.output_idxs, - self._asyncio_max_queue_size, is_input=True, ) self._dag_output_fetcher = AwaitableBackgroundReader( @@ -1938,17 +1910,17 @@ def run(self): monitor.start() return monitor - def raise_if_too_many_inflight_requests(self): - num_in_flight_requests = ( + def _raise_if_too_many_inflight_executions(self): + num_inflight_executions = ( self._execution_index - self._max_finished_execution_index - ) - if num_in_flight_requests > self._max_inflight_executions: + ) + len(self._result_buffer) + if num_inflight_executions >= self._max_inflight_executions: raise ray.exceptions.RayCgraphCapacityExceeded( - f"There are {num_in_flight_requests} in-flight requests which " - "is more than specified _max_inflight_executions of the dag: " - f"{self._max_inflight_executions}. Retrieve the output using " - "ray.get before submitting more requests or increase " - "`max_inflight_executions`. " + "The compiled graph can't have more than " + f"{self._max_inflight_executions} in-flight executions, and you " + f"currently have {num_inflight_executions} in-flight executions. " + "Retrieve an output using ray.get before submitting more requests or " + "increase `_max_inflight_executions`. " "`dag.experimental_compile(_max_inflight_executions=...)`" ) @@ -2032,7 +2004,7 @@ def release_output_channel_buffers(self, execution_index: int): timeout = ctx.get_timeout while self._max_finished_execution_index < execution_index: - self.increment_max_finished_execution_index() + self._max_finished_execution_index += 1 start_time = time.monotonic() self._dag_output_fetcher.release_channel_buffers(timeout) @@ -2076,13 +2048,6 @@ def _execute_until( timeout = ctx.get_timeout while self._max_finished_execution_index < execution_index: - if len(self._result_buffer) >= self._max_buffered_results: - raise ValueError( - "Too many buffered results: the allowed max count for " - f"buffered results is {self._max_buffered_results}; call " - "ray.get() on previous CompiledDAGRefs to free them up " - "from buffer." - ) start_time = time.monotonic() # Fetch results from each output channel up to execution_index and cache @@ -2096,7 +2061,7 @@ def _execute_until( "Otherwise, this may indicate that the execution is hanging." ) from e - self.increment_max_finished_execution_index() + self._max_finished_execution_index += 1 self._cache_execution_results( self._max_finished_execution_index, result, @@ -2144,7 +2109,7 @@ def execute( else: inp = CompiledDAGArgs(args=args, kwargs=kwargs) - self.raise_if_too_many_inflight_requests() + self._raise_if_too_many_inflight_executions() try: self._dag_submitter.write(inp, self._submit_timeout) except RayChannelTimeoutError as e: @@ -2154,6 +2119,8 @@ def execute( "seconds. Otherwise, this may indicate that execution is hanging." ) from e + self._execution_index += 1 + if self._returns_list: ref = [ CompiledDAGRef(self, self._execution_index, channel_index) @@ -2162,7 +2129,6 @@ def execute( else: ref = CompiledDAGRef(self, self._execution_index) - self._execution_index += 1 return ref def _check_inputs(self, args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> None: @@ -2217,12 +2183,14 @@ async def execute_async( else: inp = CompiledDAGArgs(args=args, kwargs=kwargs) - self.raise_if_too_many_inflight_requests() + self._raise_if_too_many_inflight_executions() await self._dag_submitter.write(inp) # Allocate a future that the caller can use to get the result. fut = asyncio.Future() await self._fut_queue.put(fut) + self._execution_index += 1 + if self._returns_list: fut = [ CompiledDAGFuture(self, self._execution_index, fut, channel_index) @@ -2231,7 +2199,6 @@ async def execute_async( else: fut = CompiledDAGFuture(self, self._execution_index, fut) - self._execution_index += 1 return fut def _visualize_ascii(self) -> str: @@ -2834,8 +2801,6 @@ def build_compiled_dag_from_ray_dag( submit_timeout: Optional[float] = None, buffer_size_bytes: Optional[int] = None, enable_asyncio: bool = False, - asyncio_max_queue_size: Optional[int] = None, - max_buffered_results: Optional[int] = None, max_inflight_executions: Optional[int] = None, overlap_gpu_communication: Optional[bool] = None, ) -> "CompiledDAG": @@ -2843,8 +2808,6 @@ def build_compiled_dag_from_ray_dag( submit_timeout, buffer_size_bytes, enable_asyncio, - asyncio_max_queue_size, - max_buffered_results, max_inflight_executions, overlap_gpu_communication, ) diff --git a/python/ray/dag/context.py b/python/ray/dag/context.py index 25fbd8bd173b0..11351cd9a45dd 100644 --- a/python/ray/dag/context.py +++ b/python/ray/dag/context.py @@ -13,15 +13,6 @@ DEFAULT_TEARDOWN_TIMEOUT_S = int(os.environ.get("RAY_CGRAPH_teardown_timeout", 30)) # Default buffer size is 1MB. DEFAULT_BUFFER_SIZE_BYTES = int(os.environ.get("RAY_CGRAPH_buffer_size_bytes", 1e6)) -# Default asyncio_max_queue_size is 0, which means no limit. -DEFAULT_ASYNCIO_MAX_QUEUE_SIZE = int( - os.environ.get("RAY_CGRAPH_asyncio_max_queue_size", 0) -) -# The default max_buffered_results is 1000, and the default buffer size is 1 MB. -# The maximum memory usage for buffered results is 1 GB. -DEFAULT_MAX_BUFFERED_RESULTS = int( - os.environ.get("RAY_CGRAPH_max_buffered_results", 1000) -) # The default number of in-flight executions that can be submitted before consuming the # output. DEFAULT_MAX_INFLIGHT_EXECUTIONS = int( @@ -61,15 +52,6 @@ class DAGContext: that can be passed between tasks in the DAG. The buffers will be automatically resized if larger messages are written to the channel. - asyncio_max_queue_size: The max queue size for the async execution. - It is only used when enable_asyncio=True. - max_buffered_results: The maximum number of execution results that - are allowed to be buffered. Setting a higher value allows more - DAGs to be executed before `ray.get()` must be called but also - increases the memory usage. Note that if the number of ongoing - executions is beyond the DAG capacity, the new execution would - be blocked in the first place; therefore, this limit is only - enforced when it is smaller than the DAG capacity. max_inflight_executions: The maximum number of in-flight executions that can be submitted via `execute` or `execute_async` before consuming the output using `ray.get()`. If the caller submits more executions, @@ -84,8 +66,6 @@ class DAGContext: get_timeout: int = DEFAULT_GET_TIMEOUT_S teardown_timeout: int = DEFAULT_TEARDOWN_TIMEOUT_S buffer_size_bytes: int = DEFAULT_BUFFER_SIZE_BYTES - asyncio_max_queue_size: int = DEFAULT_ASYNCIO_MAX_QUEUE_SIZE - max_buffered_results: int = DEFAULT_MAX_BUFFERED_RESULTS max_inflight_executions: int = DEFAULT_MAX_INFLIGHT_EXECUTIONS overlap_gpu_communication: bool = DEFAULT_OVERLAP_GPU_COMMUNICATION diff --git a/python/ray/dag/dag_node.py b/python/ray/dag/dag_node.py index 9f99e265c08e7..6211c6e85e3c5 100644 --- a/python/ray/dag/dag_node.py +++ b/python/ray/dag/dag_node.py @@ -186,8 +186,6 @@ def experimental_compile( _submit_timeout: Optional[float] = None, _buffer_size_bytes: Optional[int] = None, enable_asyncio: bool = False, - _asyncio_max_queue_size: Optional[int] = None, - _max_buffered_results: Optional[int] = None, _max_inflight_executions: Optional[int] = None, _overlap_gpu_communication: Optional[bool] = None, ) -> "ray.dag.CompiledDAG": @@ -203,15 +201,6 @@ def experimental_compile( be automatically resized if larger messages are written to the channel. enable_asyncio: Whether to enable asyncio for this DAG. - _asyncio_max_queue_size: The max queue size for the async execution. - It is only used when enable_asyncio=True. - _max_buffered_results: The maximum number of execution results that - are allowed to be buffered. Setting a higher value allows more - DAGs to be executed before `ray.get()` must be called but also - increases the memory usage. Note that if the number of ongoing - executions is beyond the DAG capacity, the new execution would - be blocked in the first place; therefore, this limit is only - enforced when it is smaller than the DAG capacity. _max_inflight_executions: The maximum number of in-flight executions that can be submitted via `execute` or `execute_async` before consuming the output using `ray.get()`. If the caller submits more executions, @@ -230,10 +219,6 @@ def experimental_compile( ctx = DAGContext.get_current() if _buffer_size_bytes is None: _buffer_size_bytes = ctx.buffer_size_bytes - if _asyncio_max_queue_size is None: - _asyncio_max_queue_size = ctx.asyncio_max_queue_size - if _max_buffered_results is None: - _max_buffered_results = ctx.max_buffered_results # Validate whether this DAG node has already been compiled. if self.is_cgraph_output_node: @@ -251,8 +236,6 @@ def experimental_compile( _submit_timeout, _buffer_size_bytes, enable_asyncio, - _asyncio_max_queue_size, - _max_buffered_results, _max_inflight_executions, _overlap_gpu_communication, ) diff --git a/python/ray/dag/tests/experimental/test_accelerated_dag.py b/python/ray/dag/tests/experimental/test_accelerated_dag.py index 36e9101606cac..a0283e95cbeac 100644 --- a/python/ray/dag/tests/experimental/test_accelerated_dag.py +++ b/python/ray/dag/tests/experimental/test_accelerated_dag.py @@ -1329,72 +1329,6 @@ def test_compile_twice_with_different_nodes(self, ray_start_regular): branch2.experimental_compile() -def test_exceed_max_buffered_results(ray_start_regular): - a = Actor.remote(0) - with InputNode() as i: - dag = a.inc.bind(i) - - compiled_dag = dag.experimental_compile(_max_buffered_results=1) - - refs = [] - for i in range(2): - ref = compiled_dag.execute(1) - # Hold the refs to avoid get() being called on the ref - # when it goes out of scope - refs.append(ref) - - # ray.get() on the 2nd ref fails because the DAG cannot buffer 2 results. - with pytest.raises( - ValueError, - match=( - "Too many buffered results: the allowed max count for buffered " - "results is 1; call ray.get\(\) on previous CompiledDAGRefs to " - "free them up from buffer" - ), - ): - ray.get(ref) - - del refs - - -@pytest.mark.parametrize("single_fetch", [True, False]) -def test_exceed_max_buffered_results_multi_output(ray_start_regular, single_fetch): - a = Actor.remote(0) - b = Actor.remote(0) - with InputNode() as inp: - dag = MultiOutputNode([a.inc.bind(inp), b.inc.bind(inp)]) - - compiled_dag = dag.experimental_compile(_max_buffered_results=1) - - refs = [] - for i in range(2): - ref = compiled_dag.execute(1) - # Hold the refs to avoid get() being called on the ref - # when it goes out of scope - refs.append(ref) - - if single_fetch: - # If there are results not fetched from an execution, that execution - # still counts towards the number of buffered results. - ray.get(refs[0][0]) - - # ray.get() on the 2nd ref fails because the DAG cannot buffer 2 results. - with pytest.raises( - ValueError, - match=( - "Too many buffered results: the allowed max count for buffered " - "results is 1; call ray.get\(\) on previous CompiledDAGRefs to " - "free them up from buffer" - ), - ): - if single_fetch: - ray.get(ref[0]) - else: - ray.get(ref) - - del refs - - def test_compiled_dag_ref_del(ray_start_regular): a = Actor.remote(0) with InputNode() as inp: @@ -1576,16 +1510,13 @@ def test_dag_teardown_while_running(ray_start_regular): assert result == 0.1 -@pytest.mark.parametrize("max_queue_size", [None, 2]) -def test_asyncio(ray_start_regular, max_queue_size): +def test_asyncio(ray_start_regular): a = Actor.remote(0) with InputNode() as i: dag = a.echo.bind(i) loop = get_or_create_event_loop() - compiled_dag = dag.experimental_compile( - enable_asyncio=True, _asyncio_max_queue_size=max_queue_size - ) + compiled_dag = dag.experimental_compile(enable_asyncio=True) async def main(i): # Use numpy so that the return value will be zero-copy deserialized. If @@ -1599,16 +1530,13 @@ async def main(i): loop.run_until_complete(asyncio.gather(*[main(i) for i in range(10)])) -@pytest.mark.parametrize("max_queue_size", [None, 2]) -def test_asyncio_out_of_order_get(ray_start_regular, max_queue_size): +def test_asyncio_out_of_order_get(ray_start_regular): c = Collector.remote() with InputNode() as i: dag = c.collect.bind(i) loop = get_or_create_event_loop() - compiled_dag = dag.experimental_compile( - enable_asyncio=True, _asyncio_max_queue_size=max_queue_size - ) + compiled_dag = dag.experimental_compile(enable_asyncio=True) async def main(): fut_a = await compiled_dag.execute_async("a") @@ -1622,18 +1550,15 @@ async def main(): loop.run_until_complete(main()) -@pytest.mark.parametrize("max_queue_size", [None, 2]) @pytest.mark.parametrize("gather_futs", [True, False]) -def test_asyncio_multi_output(ray_start_regular, max_queue_size, gather_futs): +def test_asyncio_multi_output(ray_start_regular, gather_futs): a = Actor.remote(0) b = Actor.remote(0) with InputNode() as i: dag = MultiOutputNode([a.echo.bind(i), b.echo.bind(i)]) loop = get_or_create_event_loop() - compiled_dag = dag.experimental_compile( - enable_asyncio=True, _asyncio_max_queue_size=max_queue_size - ) + compiled_dag = dag.experimental_compile(enable_asyncio=True) async def main(i): # Use numpy so that the return value will be zero-copy deserialized. If @@ -1656,16 +1581,13 @@ async def main(i): loop.run_until_complete(asyncio.gather(*[main(i) for i in range(10)])) -@pytest.mark.parametrize("max_queue_size", [None, 2]) -def test_asyncio_exceptions(ray_start_regular, max_queue_size): +def test_asyncio_exceptions(ray_start_regular): a = Actor.remote(0) with InputNode() as i: dag = a.inc.bind(i) loop = get_or_create_event_loop() - compiled_dag = dag.experimental_compile( - enable_asyncio=True, _asyncio_max_queue_size=max_queue_size - ) + compiled_dag = dag.experimental_compile(enable_asyncio=True) async def main(): fut = await compiled_dag.execute_async(1) @@ -2242,6 +2164,97 @@ async def main(): loop.run_until_complete(main()) +def test_inflight_requests_exceed_capacity(ray_start_regular): + expected_error_message = ( + "The compiled graph can't have more than 2 " + "in-flight executions, and you currently have 2 " + "in-flight executions. Retrieve an output using ray.get before " + "submitting more requests or increase `_max_inflight_executions`. " + ) + a = Actor.remote(0) + with InputNode() as inp: + dag = a.sleep.bind(inp) + compiled_dag = dag.experimental_compile(_max_inflight_executions=2) + ref1 = compiled_dag.execute(1) + ref2 = compiled_dag.execute(1) + with pytest.raises( + ray.exceptions.RayCgraphCapacityExceeded, + match=(expected_error_message), + ): + _ = compiled_dag.execute(1) + + # test same with asyncio + async def main(): + a = Actor.remote(0) + with InputNode() as inp: + dag = a.sleep.bind(inp) + async_compiled_dag = dag.experimental_compile( + enable_asyncio=True, _max_inflight_executions=2 + ) + ref1 = await async_compiled_dag.execute_async(1) + ref2 = await async_compiled_dag.execute_async(1) + print(async_compiled_dag._execution_index) + with pytest.raises( + ray.exceptions.RayCgraphCapacityExceeded, + match=(expected_error_message), + ): + _ = await async_compiled_dag.execute_async(1) + (ref1, ref2) + + loop = get_or_create_event_loop() + loop.run_until_complete(main()) + # to show variables are being used and avoid destruction since + # CompiledDagRef __del__ will release buffers and + # increment _max_finished_execution_index + (ref1, ref2) + + +def test_result_buffer_exceeds_capacity(ray_start_regular): + expected_error_message = ( + "The compiled graph can't have more than 2 " + "in-flight executions, and you currently have 2 " + "in-flight executions. Retrieve an output using ray.get before " + "submitting more requests or increase `_max_inflight_executions`. " + ) + a = Actor.remote(0) + with InputNode() as inp: + dag = a.inc.bind(inp) + compiled_dag = dag.experimental_compile(_max_inflight_executions=2) + ref1 = compiled_dag.execute(1) + ref2 = compiled_dag.execute(2) + ray.get(ref2) + ref3 = compiled_dag.execute(3) + with pytest.raises( + ray.exceptions.RayCgraphCapacityExceeded, + match=(expected_error_message), + ): + _ = compiled_dag.execute(4) + + # test same with asyncio + async def main(): + a = Actor.remote(0) + with InputNode() as inp: + dag = a.inc.bind(inp) + async_compiled_dag = dag.experimental_compile( + enable_asyncio=True, _max_inflight_executions=2 + ) + ref1 = await async_compiled_dag.execute_async(1) + ref2 = await async_compiled_dag.execute_async(2) + await ref2 + ref3 = await async_compiled_dag.execute_async(3) + with pytest.raises( + ray.exceptions.RayCgraphCapacityExceeded, + match=(expected_error_message), + ): + _ = await async_compiled_dag.execute_async(4) + (ref1, ref3) + + loop = get_or_create_event_loop() + loop.run_until_complete(main()) + # same reason as comment for test_inflight_requests_exceed_capacity + (ref1, ref3) + + def test_event_profiling(ray_start_regular, monkeypatch): monkeypatch.setattr(ray.dag.constants, "RAY_CGRAPH_ENABLE_PROFILING", True) diff --git a/python/ray/experimental/channel/common.py b/python/ray/experimental/channel/common.py index 65d4c6961cd3d..59de5cf1cf078 100644 --- a/python/ray/experimental/channel/common.py +++ b/python/ray/experimental/channel/common.py @@ -533,16 +533,10 @@ def __init__( self, output_channels: List[ChannelInterface], output_idxs: List[Optional[Union[int, str]]], - max_queue_size: Optional[int] = None, is_input=False, ): super().__init__(output_channels, output_idxs, is_input=is_input) - if max_queue_size is None: - from ray.dag import DAGContext - - ctx = DAGContext.get_current() - max_queue_size = ctx.asyncio_max_queue_size - self._queue = asyncio.Queue(max_queue_size) + self._queue = asyncio.Queue() self._background_task = None self._background_task_executor = concurrent.futures.ThreadPoolExecutor( max_workers=1, thread_name_prefix="channel.AwaitableBackgroundWriter" diff --git a/python/ray/experimental/compiled_dag_ref.py b/python/ray/experimental/compiled_dag_ref.py index 4b683428943ab..0250232a13be7 100644 --- a/python/ray/experimental/compiled_dag_ref.py +++ b/python/ray/experimental/compiled_dag_ref.py @@ -206,8 +206,8 @@ def __await__(self): if not self._dag._has_execution_results(self._execution_index): result = yield from fut.__await__() + self._dag._max_finished_execution_index += 1 self._dag._cache_execution_results(self._execution_index, result) - self._dag.increment_max_finished_execution_index() return_vals = self._dag._get_execution_results( self._execution_index, self._channel_index