diff --git a/doc/source/data/data-internals.rst b/doc/source/data/data-internals.rst
index 75501a1207bf..2ad6c444b08b 100644
--- a/doc/source/data/data-internals.rst
+++ b/doc/source/data/data-internals.rst
@@ -254,7 +254,7 @@ You can tell if stage fusion is enabled by checking the :ref:`Dataset stats
-    Stage N read->map_batches->shuffle_map: N/N blocks executed in T
+    Stage N read->map_batches->shuffle_map: N tasks executed, N blocks produced in T
     * Remote wall time: T min, T max, T mean, T total
     * Remote cpu time: T min, T max, T mean, T total
     * Output num rows: N min, N max, N mean, N total
diff --git a/doc/source/data/inspecting-data.rst b/doc/source/data/inspecting-data.rst
index 5f45dc38103d..7e6b4731bdf3 100644
--- a/doc/source/data/inspecting-data.rst
+++ b/doc/source/data/inspecting-data.rst
@@ -175,7 +175,7 @@ To view stats about your :class:`Datasets <ray.data.Dataset>`, call :meth:`Datas
     .. testoutput::
         :options: +MOCK

-    Stage 1 ReadCSV->Map(<lambda>)->Map(pause): 1/1 blocks executed in 0.23s
+    Operator 1 ReadCSV->SplitBlocks(4): 1 tasks executed, 4 blocks produced in 0.22s
     * Remote wall time: 222.1ms min, 222.1ms max, 222.1ms mean, 222.1ms total
     * Remote cpu time: 15.6ms min, 15.6ms max, 15.6ms mean, 15.6ms total
     * Peak heap memory usage (MiB): 157953.12 min, 157953.12 max, 157953 mean
diff --git a/doc/source/data/performance-tips.rst b/doc/source/data/performance-tips.rst
index b75ac7c2065f..32acffe2f16e 100644
--- a/doc/source/data/performance-tips.rst
+++ b/doc/source/data/performance-tips.rst
@@ -153,10 +153,10 @@ For example, the following code executes :func:`~ray.data.read_csv` with only on
     2023-11-20 15:47:02,404 INFO split_read_output_blocks.py:101 -- Using autodetected parallelism=4 for stage ReadCSV to satisfy parallelism at least twice the available number of CPUs (2).
     2023-11-20 15:47:02,405 INFO split_read_output_blocks.py:106 -- To satisfy the requested parallelism of 4, each read task output is split into 4 smaller blocks.
     ...
-    Stage 1 ReadCSV->SplitBlocks(4): 4/4 blocks executed in 0.01s
+    Operator 1 ReadCSV->SplitBlocks(4): 1 tasks executed, 4 blocks produced in 0.01s
     ...
-    Stage 2 Map(<lambda>): 4/4 blocks executed in 0.03s
+    Operator 2 Map(<lambda>): 4 tasks executed, 4 blocks produced in 0.3s
     ...

 To turn off this behavior and allow the read and map stages to be fused, set ``parallelism`` manually.
@@ -181,7 +181,7 @@ For example, this code sets ``parallelism`` to equal the number of files:
         :options: +MOCK

     ...
-    Stage 1 ReadCSV->Map(<lambda>): 1/1 blocks executed in 0.03s
+    Operator 1 ReadCSV->Map(<lambda>): 1 tasks executed, 1 blocks produced in 0.01s
     ...
@@ -270,7 +270,7 @@ Because the default ``batch_size`` for :func:`~ray.data.Dataset.map_batches` is
     .. testoutput::
         :options: +MOCK

-    Stage 1 ReadRange->MapBatches(<lambda>): 7/7 blocks executed in 2.99s
+    Operator 1 ReadRange->MapBatches(<lambda>): 1 tasks executed, 7 blocks produced in 1.33s
     ...
     * Peak heap memory usage (MiB): 3302.17 min, 4233.51 max, 4100 mean
     * Output num rows: 125 min, 125 max, 125 mean, 1000 total
@@ -298,7 +298,7 @@ Setting a lower batch size produces lower peak heap memory usage:
     .. testoutput::
         :options: +MOCK

-    Stage 1 ReadRange->MapBatches(<lambda>): 7/7 blocks executed in 1.08s
+    Operator 1 ReadRange->MapBatches(<lambda>): 1 tasks executed, 7 blocks produced in 0.51s
     ...
     * Peak heap memory usage (MiB): 587.09 min, 1569.57 max, 1207 mean
     * Output num rows: 40 min, 160 max, 142 mean, 1000 total
@@ -376,23 +376,23 @@ To illustrate these, the following code uses both strategies to coalesce the 10
         :options: +MOCK

     # 1. ds.repartition() output.
-    Stage 1 ReadRange: 10/10 blocks executed in 0.45s
+    Operator 1 ReadRange: 10 tasks executed, 10 blocks produced in 0.33s
     ...
     * Output num rows: 1 min, 1 max, 1 mean, 10 total
     ...
-    Stage 2 Repartition: executed in 0.53s
+    Operator 2 Repartition: executed in 0.36s
-        Substage 0 RepartitionSplit: 10/10 blocks executed
+        Suboperator 0 RepartitionSplit: 10 tasks executed, 10 blocks produced
         ...
-        Substage 1 RepartitionReduce: 1/1 blocks executed
+        Suboperator 1 RepartitionReduce: 1 tasks executed, 1 blocks produced
         ...
         * Output num rows: 10 min, 10 max, 10 mean, 10 total
         ...

     # 2. ds.map_batches() output.
-    Stage 1 ReadRange->MapBatches(<lambda>): 1/1 blocks executed in 0s
+    Operator 1 ReadRange->MapBatches(<lambda>): 1 tasks executed, 1 blocks produced in 0s
     ...
     * Output num rows: 10 min, 10 max, 10 mean, 10 total
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index 4ac30f252aad..2160e70c7f79 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -4729,7 +4729,7 @@ def stats(self) -> str:
-            Stage 0 Read: 20/20 blocks executed in 0.3s
+            Operator 0 Read: 1 tasks executed, 5 blocks produced in 0s
             * Remote wall time: 16.29us min, 7.29ms max, 1.21ms mean, 24.17ms total
             * Remote cpu time: 16.0us min, 2.54ms max, 810.45us mean, 16.21ms total
             * Peak heap memory usage (MiB): 137968.75 min, 142734.38 max, 139846 mean
diff --git a/python/ray/data/tests/test_streaming_integration.py b/python/ray/data/tests/test_streaming_integration.py
index 4598944b8fd6..0aeee0992526 100644
--- a/python/ray/data/tests/test_streaming_integration.py
+++ b/python/ray/data/tests/test_streaming_integration.py
@@ -391,7 +391,7 @@ def func(x):
     assert num_finished < 20, num_finished
     # Check intermediate stats reporting.
     stats = ds.stats()
-    assert "100/100 blocks executed" not in stats, stats
+    assert "100 tasks executed" not in stats, stats

     # Check we can get the rest.
     for rest in it:
@@ -399,7 +399,7 @@ def func(x):
     assert ray.get(counter.get.remote()) == 100
     # Check final stats reporting.
     stats = ds.stats()
-    assert "100/100 blocks executed" in stats, stats
+    assert "100 tasks executed" in stats, stats


 def test_e2e_liveness_with_output_backpressure_edge_case(