[Data] Calculate stage execution time in StageStatsSummary from `BlockMetadata` (ray-project#37119)

Currently, the stage execution time used in `StageStatsSummary` is the Dataset's total execution time: https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/stats.py#L292

Instead, we should calculate each stage's execution time from its blocks' `BlockMetadata`, taking the span from the earliest block start time to the latest block end time, so that the reported time is correct when a dataset has multiple stages.

Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: NripeshN <nn2012@hw.ac.uk>
scottjlee authored and NripeshN committed Aug 15, 2023
1 parent 29acc1e commit 6040ec7
Showing 4 changed files with 85 additions and 10 deletions.
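Before the per-file diff, here is the core of the change as a minimal standalone sketch (simplified names, not the exact Ray implementation): a stage's execution time is now derived from the time span covered by its blocks' exec stats rather than from the dataset's total runtime.

from dataclasses import dataclass
from typing import List

@dataclass
class ExecStats:
    start_time_s: float
    end_time_s: float

def stage_time_total_s(exec_stats: List[ExecStats]) -> float:
    # A stage's execution time is the span from the earliest block start
    # to the latest block end, not the dataset's total runtime.
    if not exec_stats:
        return 0.0
    earliest_start = min(s.start_time_s for s in exec_stats)
    latest_end = max(s.end_time_s for s in exec_stats)
    return latest_end - earliest_start

# Two blocks running at [0, 5] and [20, 30] give a 30s stage time.
print(stage_time_total_s([ExecStats(0.0, 5.0), ExecStats(20.0, 30.0)]))  # 30.0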
20 changes: 13 additions & 7 deletions python/ray/data/_internal/stats.py
@@ -308,7 +308,6 @@ def to_summary(self) -> "DatasetStatsSummary":
stages_stats.append(
StageStatsSummary.from_block_metadata(
metadata,
self.time_total_s,
stage_name,
is_substage=is_substage,
)
@@ -487,7 +486,6 @@ class StageStatsSummary:
def from_block_metadata(
cls,
block_metas: List[BlockMetadata],
time_total_s: float,
stage_name: str,
is_substage: bool,
) -> "StageStatsSummary":
@@ -496,24 +494,32 @@ def from_block_metadata(
Args:
block_metas: List of `BlockMetadata` to calculate stats of
time_total_s: Total execution time of stage
stage_name: Name of stage associated with `blocks`
is_substage: Whether this set of blocks belongs to a substage.
Returns:
A `StageStatsSummary` object initialized with the calculated statistics
"""
exec_stats = [m.exec_stats for m in block_metas if m.exec_stats is not None]
rounded_total = 0
time_total_s = 0

if is_substage:
exec_summary_str = "{}/{} blocks executed\n".format(
len(exec_stats), len(block_metas)
)
else:
rounded_total = round(time_total_s, 2)
if rounded_total <= 0:
# Handle -0.0 case.
rounded_total = 0
if exec_stats:
# Calculate the total execution time of stage as
# the difference between the latest end time and
# the earliest start time of all blocks in the stage.
earliest_start_time = min(s.start_time_s for s in exec_stats)
latest_end_time = max(s.end_time_s for s in exec_stats)
time_total_s = latest_end_time - earliest_start_time

rounded_total = round(time_total_s, 2)
if rounded_total <= 0:
# Handle -0.0 case.
rounded_total = 0
exec_summary_str = "{}/{} blocks executed in {}s".format(
len(exec_stats), len(block_metas), rounded_total
)
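To make what this new number measures concrete, a small standalone example with illustrative values only: the reported stage time is elapsed wall-clock time for the stage as a whole, which differs both from the sum of per-block wall times (when blocks overlap on different workers) and from the dataset's total runtime (which includes other stages).

# Illustrative values only: two blocks that overlap in time on different workers.
blocks = [(0.0, 10.0), (0.5, 9.5)]  # (start_time_s, end_time_s) pairs

total_block_wall_time = sum(end - start for start, end in blocks)
stage_span = max(end for _, end in blocks) - min(start for start, _ in blocks)

print(total_block_wall_time)  # 19.0 -> seconds of per-block work
print(stage_span)             # 10.0 -> seconds the stage actually took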
11 changes: 9 additions & 2 deletions python/ray/data/block.py
@@ -158,6 +158,8 @@ class BlockExecStats:
"""

def __init__(self):
self.start_time_s: Optional[float] = None
self.end_time_s: Optional[float] = None
self.wall_time_s: Optional[float] = None
self.cpu_time_s: Optional[float] = None
self.node_id = ray.runtime_context.get_runtime_context().get_node_id()
@@ -191,9 +193,14 @@ def __init__(self):
self.start_cpu = time.process_time()

def build(self) -> "BlockExecStats":
self.end_time = time.perf_counter()
self.end_cpu = time.process_time()

stats = BlockExecStats()
stats.wall_time_s = time.perf_counter() - self.start_time
stats.cpu_time_s = time.process_time() - self.start_cpu
stats.start_time_s = self.start_time
stats.end_time_s = self.end_time
stats.wall_time_s = self.end_time - self.start_time
stats.cpu_time_s = self.end_cpu - self.start_cpu
if resource is None:
# NOTE(swang): resource package is not supported on Windows. This
# is only the memory usage at the end of the task, not the peak
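A hedged usage sketch of the builder flow modified above; the `BlockExecStats.builder()` entry point is assumed from the surrounding Ray code, which this diff does not show. With this change, `build()` snapshots the end time once, so `wall_time_s` equals `end_time_s - start_time_s` by construction.

# Assumes `BlockExecStats.builder()` exists in the surrounding Ray code
# (not shown in this diff) and returns the builder whose __init__/build()
# are modified above.
from ray.data.block import BlockExecStats

stats_builder = BlockExecStats.builder()  # records start_time / start_cpu
# ... run the block's work here ...
stats = stats_builder.build()             # snapshots end_time / end_cpu once

# The absolute execution window is now preserved alongside the duration.
print(stats.start_time_s, stats.end_time_s, stats.wall_time_s)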
8 changes: 8 additions & 0 deletions python/ray/data/tests/conftest.py
@@ -1,6 +1,7 @@
import copy
import os
import posixpath
import time

import numpy as np
import pandas as pd
@@ -465,9 +466,16 @@ def stage_two_block():
"cpu_time": [1.2, 3.4],
"node_id": ["a1", "b2"],
}

block_delay = 20
block_meta_list = []
for i in range(len(block_params["num_rows"])):
block_exec_stats = BlockExecStats()
# The blocks execute during the (relative) windows [0, 5] and [20, 30].
block_exec_stats.start_time_s = time.perf_counter() + i * block_delay
block_exec_stats.end_time_s = (
block_exec_stats.start_time_s + block_params["wall_time"][i]
)
block_exec_stats.wall_time_s = block_params["wall_time"][i]
block_exec_stats.cpu_time_s = block_params["cpu_time"][i]
block_exec_stats.node_id = block_params["node_id"][i]
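As a sanity check on the fixture timing above, assuming the wall times implied by the comment are 5s and 10s (the `block_params["wall_time"]` values are outside the hunk shown): the expected stage span is 30s, not the 15s obtained by summing block wall times, and the `time.perf_counter()` base offset cancels out.

# Assumes wall times of 5s and 10s, as the [0, 5] / [20, 30] comment implies;
# block_params["wall_time"] itself is outside the hunk shown above.
block_delay = 20
wall_times = [5, 10]

starts = [i * block_delay for i in range(len(wall_times))]        # [0, 20]
ends = [start + wall for start, wall in zip(starts, wall_times)]  # [5, 30]

print(max(ends) - min(starts))  # 30 -> stage span the summary should report
print(sum(wall_times))          # 15 -> naive sum of block wall times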
56 changes: 55 additions & 1 deletion python/ray/data/tests/test_stats.py
@@ -1,4 +1,5 @@
import re
import time
from collections import Counter
from unittest.mock import patch

@@ -44,6 +45,13 @@ def dummy_map_batches(x):
return x


def map_batches_sleep(x, n):
"""Dummy function used in calls to map_batches below, which
simply sleeps for `n` seconds before returning the input batch."""
time.sleep(n)
return x


def test_streaming_split_stats(ray_start_regular_shared):
ds = ray.data.range(1000, parallelism=10)
it = ds.map_batches(dummy_map_batches).streaming_split(1)[0]
@@ -327,6 +335,50 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats):
)


def test_dataset_stats_stage_execution_time(ray_start_regular_shared):
# Disable stage/operator fusion in order to test the stats
# of two different map_batches operators without fusing them together,
# so that we can observe different execution times for each.
ctx = ray.data.DataContext.get_current()
curr_optimizer_enabled = ctx.optimizer_enabled
curr_optimize_fuse_stages = ctx.optimize_fuse_stages
ctx.optimize_fuse_stages = False
ctx.optimizer_enabled = False

sleep_1 = 1
sleep_2 = 3
ds = (
ray.data.range(100, parallelism=1)
.map_batches(lambda batch: map_batches_sleep(batch, sleep_1))
.map_batches(lambda batch: map_batches_sleep(batch, sleep_2))
.materialize()
)

# Check that each map_batches operator has the corresponding execution time.
map_batches_1_stats = ds._get_stats_summary().parents[0].stages_stats[0]
map_batches_2_stats = ds._get_stats_summary().stages_stats[0]
assert sleep_1 <= map_batches_1_stats.time_total_s
assert sleep_2 <= map_batches_2_stats.time_total_s

ctx.optimize_fuse_stages = curr_optimize_fuse_stages
ctx.optimizer_enabled = curr_optimizer_enabled

# The following case runs 2 tasks with 1 CPU, with each task sleeping for
# `sleep_2` seconds. We expect the overall reported stage time to be
# at least `2 * sleep_2` seconds, and less than the total elapsed time.
num_tasks = 2
ds = ray.data.range(100, parallelism=num_tasks).map_batches(
lambda batch: map_batches_sleep(batch, sleep_2)
)
start_time = time.time()
ds.take_all()
end_time = time.time()

stage_stats = ds._get_stats_summary().stages_stats[0]
stage_time = stage_stats.time_total_s
assert num_tasks * sleep_2 <= stage_time <= end_time - start_time


def test_dataset__repr__(ray_start_regular_shared):
n = 100
ds = ray.data.range(n)
@@ -1062,9 +1114,11 @@ def test_summarize_blocks(ray_start_regular_shared, stage_two_block):
calculated_stats = stats.to_summary()
summarized_lines = calculated_stats.to_string().split("\n")

latest_end_time = max(m.exec_stats.end_time_s for m in block_meta_list)
earliest_start_time = min(m.exec_stats.start_time_s for m in block_meta_list)
assert (
"Stage 0 Read: 2/2 blocks executed in {}s".format(
max(round(stats.time_total_s, 2), 0)
max(round(latest_end_time - earliest_start_time, 2), 0)
)
== summarized_lines[0]
)
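For anyone reproducing the new behavior locally, a hedged snippet that reuses the private stats API exercised by the tests above; `_get_stats_summary()` and `stages_stats` are internal and may change between Ray versions, and the `stage_name` field on each stage summary is an assumption here.

# Hedged reproduction sketch using the private stats API from the tests above.
import ray

ds = (
    ray.data.range(100, parallelism=2)
    .map_batches(lambda batch: batch)
    .materialize()
)

summary = ds._get_stats_summary()
for stage in summary.stages_stats:
    # time_total_s is now the earliest-start-to-latest-end span of the
    # stage's blocks, not the dataset's total runtime.
    print(stage.stage_name, stage.time_total_s)  # stage_name is assumed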
