From b9fe263ebd25bc10e74909d277c37ad9f373acf0 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 28 Jun 2023 10:40:15 -0700 Subject: [PATCH 1/2] fix iter stats --- .../_internal/iterator/stream_split_iterator.py | 15 +++++++++------ python/ray/data/tests/test_stats.py | 13 +++++++++++++ 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/python/ray/data/_internal/iterator/stream_split_iterator.py b/python/ray/data/_internal/iterator/stream_split_iterator.py index c6c27a7a4694..9aedb25ebe76 100644 --- a/python/ray/data/_internal/iterator/stream_split_iterator.py +++ b/python/ray/data/_internal/iterator/stream_split_iterator.py @@ -10,7 +10,7 @@ from ray.data._internal.execution.legacy_compat import execute_to_legacy_bundle_iterator from ray.data._internal.execution.operators.output_splitter import OutputSplitter from ray.data._internal.execution.streaming_executor import StreamingExecutor -from ray.data._internal.stats import DatasetStats +from ray.data._internal.stats import DatasetStats, DatasetStatsSummary from ray.data.block import Block, BlockMetadata from ray.data.iterator import DataIterator from ray.types import ObjectRef @@ -65,6 +65,7 @@ def __init__( self._coord_actor = coord_actor self._output_split_idx = output_split_idx self._world_size = world_size + self._iter_stats = DatasetStats(stages={}, parent=None) def _to_block_iterator( self, @@ -92,11 +93,13 @@ def gen_blocks() -> Iterator[Tuple[ObjectRef[Block], BlockMetadata]]: ) yield block_ref - return gen_blocks(), None, False + return gen_blocks(), self._iter_stats, False def stats(self) -> str: """Implements DataIterator.""" - return ray.get(self._coord_actor.stats.remote()) + summary = ray.get(self._coord_actor.stats.remote()) + summary.iter_stats = self._iter_stats.to_summary().iter_stats + return summary.to_string() def schema(self) -> Union[type, "pyarrow.lib.Schema"]: """Implements DataIterator.""" @@ -161,11 +164,11 @@ def add_split_op(dag): self._next_epoch = gen_epochs() self._output_iterator = None - def stats(self) -> str: + def stats(self) -> DatasetStatsSummary: """Returns stats from the base dataset.""" if self._executor: - return self._executor.get_stats().to_summary().to_string() - return self._base_dataset.stats() + return self._executor.get_stats().to_summary() + return self._base_dataset._get_stats_summary() def start_epoch(self, split_idx: int) -> str: """Called to start an epoch. diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 01c41a24128d..8ddeddb43120 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -64,6 +64,19 @@ def test_streaming_split_stats(ray_start_regular_shared): # Workaround to preserve trailing whitespace in the above line without # causing linter failures. "* Extra metrics: {'num_output_N': N}\n" + """ +Dataset iterator time breakdown: +* Total time user code is blocked: T +* Total time in user code: T +* Total time overall: T +* Num blocks local: Z +* Num blocks remote: Z +* Num blocks unknown location: N +* Batch iteration time breakdown (summed across prefetch threads): + * In ray.get(): T min, T max, T avg, T total + * In batch creation: T min, T max, T avg, T total + * In batch formatting: T min, T max, T avg, T total +""" ) From fa4bde37585dcbc6035cd44f951c439ec7a2e3ad Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 28 Jun 2023 10:43:25 -0700 Subject: [PATCH 2/2] comment --- python/ray/data/_internal/iterator/stream_split_iterator.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/data/_internal/iterator/stream_split_iterator.py b/python/ray/data/_internal/iterator/stream_split_iterator.py index 9aedb25ebe76..d5ab948c06e2 100644 --- a/python/ray/data/_internal/iterator/stream_split_iterator.py +++ b/python/ray/data/_internal/iterator/stream_split_iterator.py @@ -97,6 +97,8 @@ def gen_blocks() -> Iterator[Tuple[ObjectRef[Block], BlockMetadata]]: def stats(self) -> str: """Implements DataIterator.""" + # Merge the locally recorded iter stats and the remotely recorded + # stream execution stats. summary = ray.get(self._coord_actor.stats.remote()) summary.iter_stats = self._iter_stats.to_summary().iter_stats return summary.to_string()