diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py
index 099d6361bd8b..a55141171d33 100644
--- a/python/ray/data/_internal/stats.py
+++ b/python/ray/data/_internal/stats.py
@@ -307,7 +307,6 @@ def to_summary(self) -> "DatasetStatsSummary":
             stages_stats.append(
                 StageStatsSummary.from_block_metadata(
                     metadata,
-                    self.time_total_s,
                     stage_name,
                     is_substage=is_substage,
                 )
@@ -485,7 +484,6 @@ class StageStatsSummary:
     def from_block_metadata(
         cls,
         block_metas: List[BlockMetadata],
-        time_total_s: float,
         stage_name: str,
         is_substage: bool,
     ) -> "StageStatsSummary":
@@ -494,24 +492,32 @@ def from_block_metadata(
 
         Args:
             block_metas: List of `BlockMetadata` to calculate stats of
-            time_total_s: Total execution time of stage
            stage_name: Name of stage associated with `blocks`
             is_substage: Whether this set of blocks belongs to a substage.
 
         Returns:
             A `StageStatsSummary` object initialized with the calculated statistics
         """
         exec_stats = [m.exec_stats for m in block_metas if m.exec_stats is not None]
+        rounded_total = 0
+        time_total_s = 0
         if is_substage:
             exec_summary_str = "{}/{} blocks executed\n".format(
                 len(exec_stats), len(block_metas)
             )
         else:
-            rounded_total = round(time_total_s, 2)
-            if rounded_total <= 0:
-                # Handle -0.0 case.
-                rounded_total = 0
             if exec_stats:
+                # Calculate the total execution time of stage as
+                # the difference between the latest end time and
+                # the earliest start time of all blocks in the stage.
+                earliest_start_time = min(s.start_time_s for s in exec_stats)
+                latest_end_time = max(s.end_time_s for s in exec_stats)
+                time_total_s = latest_end_time - earliest_start_time
+
+                rounded_total = round(time_total_s, 2)
+                if rounded_total <= 0:
+                    # Handle -0.0 case.
+                    rounded_total = 0
                 exec_summary_str = "{}/{} blocks executed in {}s".format(
                     len(exec_stats), len(block_metas), rounded_total
                 )
diff --git a/python/ray/data/block.py b/python/ray/data/block.py
index f91a9b99a3e4..b1e28e249b9d 100644
--- a/python/ray/data/block.py
+++ b/python/ray/data/block.py
@@ -158,6 +158,8 @@ class BlockExecStats:
     """
 
     def __init__(self):
+        self.start_time_s: Optional[float] = None
+        self.end_time_s: Optional[float] = None
         self.wall_time_s: Optional[float] = None
         self.cpu_time_s: Optional[float] = None
         self.node_id = ray.runtime_context.get_runtime_context().get_node_id()
@@ -191,9 +193,14 @@ def __init__(self):
         self.start_cpu = time.process_time()
 
     def build(self) -> "BlockExecStats":
+        self.end_time = time.perf_counter()
+        self.end_cpu = time.process_time()
+
         stats = BlockExecStats()
-        stats.wall_time_s = time.perf_counter() - self.start_time
-        stats.cpu_time_s = time.process_time() - self.start_cpu
+        stats.start_time_s = self.start_time
+        stats.end_time_s = self.end_time
+        stats.wall_time_s = self.end_time - self.start_time
+        stats.cpu_time_s = self.end_cpu - self.start_cpu
         if resource is None:
             # NOTE(swang): resource package is not supported on Windows. This
             # is only the memory usage at the end of the task, not the peak
diff --git a/python/ray/data/tests/conftest.py b/python/ray/data/tests/conftest.py
index f650fac1685c..4032cadb12d5 100644
--- a/python/ray/data/tests/conftest.py
+++ b/python/ray/data/tests/conftest.py
@@ -1,6 +1,7 @@
 import copy
 import os
 import posixpath
+import time
 
 import numpy as np
 import pandas as pd
@@ -465,9 +466,16 @@ def stage_two_block():
         "cpu_time": [1.2, 3.4],
         "node_id": ["a1", "b2"],
     }
+
+    block_delay = 20
     block_meta_list = []
     for i in range(len(block_params["num_rows"])):
         block_exec_stats = BlockExecStats()
+        # The blocks are executing from [0, 5] and [20, 30].
+        block_exec_stats.start_time_s = time.perf_counter() + i * block_delay
+        block_exec_stats.end_time_s = (
+            block_exec_stats.start_time_s + block_params["wall_time"][i]
+        )
         block_exec_stats.wall_time_s = block_params["wall_time"][i]
         block_exec_stats.cpu_time_s = block_params["cpu_time"][i]
         block_exec_stats.node_id = block_params["node_id"][i]
diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py
index 8ddeddb43120..b9addea34b14 100644
--- a/python/ray/data/tests/test_stats.py
+++ b/python/ray/data/tests/test_stats.py
@@ -1,4 +1,5 @@
 import re
+import time
 from collections import Counter
 from unittest.mock import patch
 
@@ -44,6 +45,13 @@ def dummy_map_batches(x):
     return x
 
 
+def map_batches_sleep(x, n):
+    """Dummy function used in calls to map_batches below, which
+    simply sleeps for `n` seconds before returning the input batch."""
+    time.sleep(n)
+    return x
+
+
 def test_streaming_split_stats(ray_start_regular_shared):
     ds = ray.data.range(1000, parallelism=10)
     it = ds.map_batches(dummy_map_batches).streaming_split(1)[0]
@@ -327,6 +335,50 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats):
     )
 
 
+def test_dataset_stats_stage_execution_time(ray_start_regular_shared):
+    # Disable stage/operator fusion in order to test the stats
+    # of two different map_batches operators without fusing them together,
+    # so that we can observe different execution times for each.
+    ctx = ray.data.DataContext.get_current()
+    curr_optimizer_enabled = ctx.optimizer_enabled
+    curr_optimize_fuse_stages = ctx.optimize_fuse_stages
+    ctx.optimize_fuse_stages = False
+    ctx.optimizer_enabled = False
+
+    sleep_1 = 1
+    sleep_2 = 3
+    ds = (
+        ray.data.range(100, parallelism=1)
+        .map_batches(lambda batch: map_batches_sleep(batch, sleep_1))
+        .map_batches(lambda batch: map_batches_sleep(batch, sleep_2))
+        .materialize()
+    )
+
+    # Check that each map_batches operator has the corresponding execution time.
+    map_batches_1_stats = ds._get_stats_summary().parents[0].stages_stats[0]
+    map_batches_2_stats = ds._get_stats_summary().stages_stats[0]
+    assert sleep_1 <= map_batches_1_stats.time_total_s
+    assert sleep_2 <= map_batches_2_stats.time_total_s
+
+    ctx.optimize_fuse_stages = curr_optimize_fuse_stages
+    ctx.optimizer_enabled = curr_optimizer_enabled
+
+    # The following case runs 2 tasks with 1 CPU, with each task sleeping for
+    # `sleep_2` seconds. We expect the overall reported stage time to be
+    # at least `2 * sleep_2` seconds, and less than the total elapsed time.
+    num_tasks = 2
+    ds = ray.data.range(100, parallelism=num_tasks).map_batches(
+        lambda batch: map_batches_sleep(batch, sleep_2)
+    )
+    start_time = time.time()
+    ds.take_all()
+    end_time = time.time()
+
+    stage_stats = ds._get_stats_summary().stages_stats[0]
+    stage_time = stage_stats.time_total_s
+    assert num_tasks * sleep_2 <= stage_time <= end_time - start_time
+
+
 def test_dataset__repr__(ray_start_regular_shared):
     n = 100
     ds = ray.data.range(n)
@@ -1062,9 +1114,11 @@ def test_summarize_blocks(ray_start_regular_shared, stage_two_block):
     calculated_stats = stats.to_summary()
     summarized_lines = calculated_stats.to_string().split("\n")
 
+    latest_end_time = max(m.exec_stats.end_time_s for m in block_meta_list)
+    earliest_start_time = min(m.exec_stats.start_time_s for m in block_meta_list)
     assert (
         "Stage 0 Read: 2/2 blocks executed in {}s".format(
-            max(round(stats.time_total_s, 2), 0)
+            max(round(latest_end_time - earliest_start_time, 2), 0)
         )
         == summarized_lines[0]
     )