Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[data] Propagate iter stats for streaming_split #36908

Merged
merged 2 commits on Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions python/ray/data/_internal/iterator/stream_split_iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from ray.data._internal.execution.legacy_compat import execute_to_legacy_bundle_iterator
from ray.data._internal.execution.operators.output_splitter import OutputSplitter
from ray.data._internal.execution.streaming_executor import StreamingExecutor
from ray.data._internal.stats import DatasetStats
from ray.data._internal.stats import DatasetStats, DatasetStatsSummary
from ray.data.block import Block, BlockMetadata
from ray.data.iterator import DataIterator
from ray.types import ObjectRef
Expand Down Expand Up @@ -65,6 +65,7 @@ def __init__(
self._coord_actor = coord_actor
self._output_split_idx = output_split_idx
self._world_size = world_size
self._iter_stats = DatasetStats(stages={}, parent=None)

def _to_block_iterator(
self,
Expand Down Expand Up @@ -92,11 +93,15 @@ def gen_blocks() -> Iterator[Tuple[ObjectRef[Block], BlockMetadata]]:
)
yield block_ref

return gen_blocks(), None, False
return gen_blocks(), self._iter_stats, False

def stats(self) -> str:
"""Implements DataIterator."""
return ray.get(self._coord_actor.stats.remote())
# Merge the locally recorded iter stats and the remotely recorded
# stream execution stats.
summary = ray.get(self._coord_actor.stats.remote())
summary.iter_stats = self._iter_stats.to_summary().iter_stats
return summary.to_string()

def schema(self) -> Union[type, "pyarrow.lib.Schema"]:
"""Implements DataIterator."""
Expand Down Expand Up @@ -161,11 +166,11 @@ def add_split_op(dag):
self._next_epoch = gen_epochs()
self._output_iterator = None

def stats(self) -> str:
def stats(self) -> DatasetStatsSummary:
"""Returns stats from the base dataset."""
if self._executor:
return self._executor.get_stats().to_summary().to_string()
return self._base_dataset.stats()
return self._executor.get_stats().to_summary()
return self._base_dataset._get_stats_summary()

def start_epoch(self, split_idx: int) -> str:
"""Called to start an epoch.
Expand Down
13 changes: 13 additions & 0 deletions python/ray/data/tests/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,19 @@ def test_streaming_split_stats(ray_start_regular_shared):
# Workaround to preserve trailing whitespace in the above line without
# causing linter failures.
"* Extra metrics: {'num_output_N': N}\n"
"""
Dataset iterator time breakdown:
* Total time user code is blocked: T
* Total time in user code: T
* Total time overall: T
* Num blocks local: Z
* Num blocks remote: Z
* Num blocks unknown location: N
* Batch iteration time breakdown (summed across prefetch threads):
* In ray.get(): T min, T max, T avg, T total
* In batch creation: T min, T max, T avg, T total
* In batch formatting: T min, T max, T avg, T total
"""
)


Expand Down