[Data] Update OperatorStatsSummary related test and docstrings ray-project#42147

Follow-up to ray-project#41873: update the remaining docstrings and failing tests that check the raw string output of OperatorStatsSummary.

Signed-off-by: Scott Lee <sjl@anyscale.com>
scottjlee authored and vickytsang committed Jan 12, 2024
1 parent 05d384b commit 866d8f4
Showing 5 changed files with 15 additions and 15 deletions.
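The 15 swapped line pairs below all follow the same pattern: stats summaries now report "N tasks executed, N blocks produced" instead of the old "N/N blocks executed" fraction. A minimal sketch of the kind of check the updated tests perform, using an illustrative dataset rather than the commit's own fixtures:

    import ray

    # Build a small dataset and force execution so stats are populated.
    ds = ray.data.range(100).map_batches(lambda batch: batch)
    ds.materialize()

    # stats() returns a plain string; the updated assertions look for the
    # new "tasks executed ... blocks produced" phrasing.
    stats = ds.stats()
    assert "tasks executed" in stats, stats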
2 changes: 1 addition & 1 deletion doc/source/data/data-internals.rst
@@ -254,7 +254,7 @@ You can tell if stage fusion is enabled by checking the :ref:`Dataset stats <dat

.. code-block::
-Stage N read->map_batches->shuffle_map: N/N blocks executed in T
+Stage N read->map_batches->shuffle_map: N tasks executed, N blocks produced in T
* Remote wall time: T min, T max, T mean, T total
* Remote cpu time: T min, T max, T mean, T total
* Output num rows: N min, N max, N mean, N total
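A quick way to see this output yourself, sketched here with an arbitrary pipeline (not part of the diff); fused operators show up joined with `->` in the stats string:

    import ray

    # A read followed by a map; if the two fuse, the stats report a single
    # operator such as "ReadRange->MapBatches(<lambda>)".
    ds = ray.data.range(1000).map_batches(lambda batch: batch)
    ds.materialize()
    print(ds.stats())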
2 changes: 1 addition & 1 deletion doc/source/data/inspecting-data.rst
@@ -175,7 +175,7 @@ To view stats about your :class:`Datasets <ray.data.Dataset>`, call :meth:`Datas
.. testoutput::
:options: +MOCK

-Stage 1 ReadCSV->Map(<lambda>)->Map(pause): 1/1 blocks executed in 0.23s
+Operator 1 ReadCSV->SplitBlocks(4): 1 tasks executed, 4 blocks produced in 0.22s
* Remote wall time: 222.1ms min, 222.1ms max, 222.1ms mean, 222.1ms total
* Remote cpu time: 15.6ms min, 15.6ms max, 15.6ms mean, 15.6ms total
* Peak heap memory usage (MiB): 157953.12 min, 157953.12 max, 157953 mean
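For context, a minimal sketch of producing such a stats report, assuming Ray's bundled example CSV (the lambda is a stand-in for real per-row work):

    import ray

    ds = ray.data.read_csv("example://iris.csv").map(lambda row: row)
    ds.materialize()  # stats are only complete once execution finishes
    print(ds.stats())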
20 changes: 10 additions & 10 deletions doc/source/data/performance-tips.rst
@@ -153,10 +153,10 @@ For example, the following code executes :func:`~ray.data.read_csv` with only on
2023-11-20 15:47:02,404 INFO split_read_output_blocks.py:101 -- Using autodetected parallelism=4 for stage ReadCSV to satisfy parallelism at least twice the available number of CPUs (2).
2023-11-20 15:47:02,405 INFO split_read_output_blocks.py:106 -- To satisfy the requested parallelism of 4, each read task output is split into 4 smaller blocks.
...
-Stage 1 ReadCSV->SplitBlocks(4): 4/4 blocks executed in 0.01s
+Operator 1 ReadCSV->SplitBlocks(4): 1 tasks executed, 4 blocks produced in 0.01s
...

-Stage 2 Map(<lambda>): 4/4 blocks executed in 0.03s
+Operator 2 Map(<lambda>): 4 tasks executed, 4 blocks produced in 0.3s
...

To turn off this behavior and allow the read and map stages to be fused, set ``parallelism`` manually.
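A sketch of that manual override, again using the bundled example CSV; with ``parallelism`` pinned, no SplitBlocks operator is inserted and the read can fuse with the map:

    import ray

    # One read task per file, no automatic block splitting.
    ds = ray.data.read_csv("example://iris.csv", parallelism=1)
    ds = ds.map(lambda row: row)
    ds.materialize()
    print(ds.stats())  # expect a fused "ReadCSV->Map(<lambda>)" operator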
@@ -181,7 +181,7 @@ For example, this code sets ``parallelism`` to equal the number of files:
:options: +MOCK

...
-Stage 1 ReadCSV->Map(<lambda>): 1/1 blocks executed in 0.03s
+Operator 1 ReadCSV->Map(<lambda>): 1 tasks executed, 1 blocks produced in 0.01s
...


@@ -270,7 +270,7 @@ Because the default ``batch_size`` for :func:`~ray.data.Dataset.map_batches` is
.. testoutput::
:options: +MOCK

-Stage 1 ReadRange->MapBatches(<lambda>): 7/7 blocks executed in 2.99s
+Operator 1 ReadRange->MapBatches(<lambda>): 1 tasks executed, 7 blocks produced in 1.33s
...
* Peak heap memory usage (MiB): 3302.17 min, 4233.51 max, 4100 mean
* Output num rows: 125 min, 125 max, 125 mean, 1000 total
@@ -298,7 +298,7 @@ Setting a lower batch size produces lower peak heap memory usage:
.. testoutput::
:options: +MOCK

-Stage 1 ReadRange->MapBatches(<lambda>): 7/7 blocks executed in 1.08s
+Operator 1 ReadRange->MapBatches(<lambda>): 1 tasks executed, 7 blocks produced in 0.51s
...
* Peak heap memory usage (MiB): 587.09 min, 1569.57 max, 1207 mean
* Output num rows: 40 min, 160 max, 142 mean, 1000 total
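The two outputs above differ only in ``batch_size``. A hedged sketch of the pattern being measured (the allocation and sizes are illustrative; actual peak memory varies by machine):

    import numpy as np
    import ray

    def memory_hungry(batch):
        # Allocation scales with rows per batch, so a smaller batch_size
        # bounds the peak heap memory of each task.
        batch["data"] = np.ones((len(batch["id"]), 1024))
        return batch

    ds = ray.data.range(1000).map_batches(memory_hungry, batch_size=32)
    ds.materialize()
    print(ds.stats())  # check the "Peak heap memory usage (MiB)" line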
@@ -376,23 +376,23 @@ To illustrate these, the following code uses both strategies to coalesce the 10
:options: +MOCK

# 1. ds.repartition() output.
-Stage 1 ReadRange: 10/10 blocks executed in 0.45s
+Operator 1 ReadRange: 10 tasks executed, 10 blocks produced in 0.33s
...
* Output num rows: 1 min, 1 max, 1 mean, 10 total
...
-Stage 2 Repartition: executed in 0.53s
+Operator 2 Repartition: executed in 0.36s

-Substage 0 RepartitionSplit: 10/10 blocks executed
+Suboperator 0 RepartitionSplit: 10 tasks executed, 10 blocks produced
...

-Substage 1 RepartitionReduce: 1/1 blocks executed
+Suboperator 1 RepartitionReduce: 1 tasks executed, 1 blocks produced
...
* Output num rows: 10 min, 10 max, 10 mean, 10 total
...


# 2. ds.map_batches() output.
-Stage 1 ReadRange->MapBatches(<lambda>): 1/1 blocks executed in 0s
+Operator 1 ReadRange->MapBatches(<lambda>): 1 tasks executed, 1 blocks produced in 0s
...
* Output num rows: 10 min, 10 max, 10 mean, 10 total
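A sketch reproducing both strategies on 10 single-row blocks (illustrative, not the doc's exact snippet):

    import ray

    # Strategy 1: repartition the 10 blocks down to 1 (split/reduce suboperators).
    ds1 = ray.data.range(10, parallelism=10).repartition(1)
    ds1.materialize()
    print(ds1.stats())

    # Strategy 2: a batch size spanning the whole dataset coalesces the
    # blocks inside one fused map task instead.
    ds2 = ray.data.range(10, parallelism=10).map_batches(
        lambda batch: batch, batch_size=10
    )
    ds2.materialize()
    print(ds2.stats())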

2 changes: 1 addition & 1 deletion python/ray/data/dataset.py
@@ -4729,7 +4729,7 @@ def stats(self) -> str:
.. testoutput::
:options: +MOCK
-Stage 0 Read: 20/20 blocks executed in 0.3s
+Operator 0 Read: 1 tasks executed, 5 blocks produced in 0s
* Remote wall time: 16.29us min, 7.29ms max, 1.21ms mean, 24.17ms total
* Remote cpu time: 16.0us min, 2.54ms max, 810.45us mean, 16.21ms total
* Peak heap memory usage (MiB): 137968.75 min, 142734.38 max, 139846 mean
4 changes: 2 additions & 2 deletions python/ray/data/tests/test_streaming_integration.py
@@ -391,15 +391,15 @@ def func(x):
assert num_finished < 20, num_finished
# Check intermediate stats reporting.
stats = ds.stats()
assert "100/100 blocks executed" not in stats, stats
assert "100 tasks executed" not in stats, stats

# Check we can get the rest.
for rest in it:
pass
assert ray.get(counter.get.remote()) == 100
# Check final stats reporting.
stats = ds.stats()
assert "100/100 blocks executed" in stats, stats
assert "100 tasks executed" in stats, stats


def test_e2e_liveness_with_output_backpressure_edge_case(
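The diff shows only the assertions; a self-contained sketch of the same intermediate-versus-final stats check might look like the following. The real test throttles execution with a counter actor to guarantee the intermediate state; this version leans on streaming backpressure, so treat it as illustrative rather than a drop-in test:

    import ray

    ds = ray.data.range(100, parallelism=100).map_batches(
        lambda batch: batch, batch_size=None
    )

    # Consume a single batch, then inspect the in-progress stats; not all
    # 100 tasks should have run yet under backpressure.
    it = iter(ds.iter_batches(batch_size=None))
    next(it)
    assert "100 tasks executed" not in ds.stats()

    # Drain the rest; the final stats now cover every task.
    for _ in it:
        pass
    assert "100 tasks executed" in ds.stats()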
