Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data] Calculate stage execution time in StageStatsSummary from BlockMetadata #37119

Merged
merged 8 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions python/ray/data/_internal/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ def to_summary(self) -> "DatasetStatsSummary":
stages_stats.append(
StageStatsSummary.from_block_metadata(
metadata,
self.time_total_s,
stage_name,
is_substage=is_substage,
)
Expand Down Expand Up @@ -487,7 +486,6 @@ class StageStatsSummary:
def from_block_metadata(
cls,
block_metas: List[BlockMetadata],
time_total_s: float,
stage_name: str,
is_substage: bool,
) -> "StageStatsSummary":
Expand All @@ -496,24 +494,32 @@ def from_block_metadata(

Args:
block_metas: List of `BlockMetadata` to calculate stats of
time_total_s: Total execution time of stage
stage_name: Name of stage associated with `blocks`
is_substage: Whether this set of blocks belongs to a substage.
Returns:
A `StageStatsSummary` object initialized with the calculated statistics
"""
exec_stats = [m.exec_stats for m in block_metas if m.exec_stats is not None]
rounded_total = 0
time_total_s = 0

if is_substage:
exec_summary_str = "{}/{} blocks executed\n".format(
len(exec_stats), len(block_metas)
)
else:
rounded_total = round(time_total_s, 2)
if rounded_total <= 0:
# Handle -0.0 case.
rounded_total = 0
if exec_stats:
# Calculate the total execution time of stage as
# the difference between the latest end time and
# the earliest start time of all blocks in the stage.
earliest_start_time = min(s.start_time_s for s in exec_stats)
latest_end_time = max(s.end_time_s for s in exec_stats)
time_total_s = latest_end_time - earliest_start_time

rounded_total = round(time_total_s, 2)
if rounded_total <= 0:
# Handle -0.0 case.
rounded_total = 0
exec_summary_str = "{}/{} blocks executed in {}s".format(
len(exec_stats), len(block_metas), rounded_total
)
Expand Down
11 changes: 9 additions & 2 deletions python/ray/data/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ class BlockExecStats:
"""

def __init__(self):
self.start_time_s: Optional[float] = None
self.end_time_s: Optional[float] = None
self.wall_time_s: Optional[float] = None
self.cpu_time_s: Optional[float] = None
self.node_id = ray.runtime_context.get_runtime_context().get_node_id()
Expand Down Expand Up @@ -191,9 +193,14 @@ def __init__(self):
self.start_cpu = time.process_time()

def build(self) -> "BlockExecStats":
self.end_time = time.perf_counter()
self.end_cpu = time.process_time()

stats = BlockExecStats()
stats.wall_time_s = time.perf_counter() - self.start_time
stats.cpu_time_s = time.process_time() - self.start_cpu
stats.start_time_s = self.start_time
stats.end_time_s = self.end_time
stats.wall_time_s = self.end_time - self.start_time
stats.cpu_time_s = self.end_cpu - self.start_cpu
if resource is None:
# NOTE(swang): resource package is not supported on Windows. This
# is only the memory usage at the end of the task, not the peak
Expand Down
8 changes: 8 additions & 0 deletions python/ray/data/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import copy
import os
import posixpath
import time

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -465,9 +466,16 @@ def stage_two_block():
"cpu_time": [1.2, 3.4],
"node_id": ["a1", "b2"],
}

block_delay = 20
block_meta_list = []
for i in range(len(block_params["num_rows"])):
block_exec_stats = BlockExecStats()
# The blocks are executing from [0, 5] and [20, 30].
block_exec_stats.start_time_s = time.perf_counter() + i * block_delay
block_exec_stats.end_time_s = (
block_exec_stats.start_time_s + block_params["wall_time"][i]
)
block_exec_stats.wall_time_s = block_params["wall_time"][i]
block_exec_stats.cpu_time_s = block_params["cpu_time"][i]
block_exec_stats.node_id = block_params["node_id"][i]
Expand Down
56 changes: 55 additions & 1 deletion python/ray/data/tests/test_stats.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import re
import time
from collections import Counter
from unittest.mock import patch

Expand Down Expand Up @@ -44,6 +45,13 @@ def dummy_map_batches(x):
return x


def map_batches_sleep(x, n):
    """Sleep for ``n`` seconds, then return the batch unchanged.

    Helper passed to ``map_batches`` in the tests below to simulate a
    stage whose execution takes a known, controllable amount of time.
    """
    time.sleep(n)
    return x


def test_streaming_split_stats(ray_start_regular_shared):
ds = ray.data.range(1000, parallelism=10)
it = ds.map_batches(dummy_map_batches).streaming_split(1)[0]
Expand Down Expand Up @@ -327,6 +335,50 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats):
)


def test_dataset_stats_stage_execution_time(ray_start_regular_shared):
    """Check that per-stage execution time is derived from block metadata.

    Two scenarios are covered:
    1. Two un-fused ``map_batches`` stages with different sleep durations
       should each report a stage time at least as large as their sleep.
    2. Two tasks executed serially (1 CPU) should report a stage time of at
       least ``num_tasks * sleep`` but no more than the observed wall time.
    """
    # Disable stage/operator fusion in order to test the stats
    # of two different map_batches operators without fusing them together,
    # so that we can observe different execution times for each.
    ctx = ray.data.DataContext.get_current()
    curr_optimizer_enabled = ctx.optimizer_enabled
    curr_optimize_fuse_stages = ctx.optimize_fuse_stages
    ctx.optimize_fuse_stages = False
    ctx.optimizer_enabled = False

    sleep_1 = 1
    sleep_2 = 3
    try:
        ds = (
            ray.data.range(100, parallelism=1)
            .map_batches(lambda batch: map_batches_sleep(batch, sleep_1))
            .map_batches(lambda batch: map_batches_sleep(batch, sleep_2))
            .materialize()
        )

        # Check that each map_batches operator has the corresponding execution time.
        map_batches_1_stats = ds._get_stats_summary().parents[0].stages_stats[0]
        map_batches_2_stats = ds._get_stats_summary().stages_stats[0]
        assert sleep_1 <= map_batches_1_stats.time_total_s
        assert sleep_2 <= map_batches_2_stats.time_total_s
    finally:
        # Restore the DataContext even if an assertion above fails, so the
        # disabled-fusion settings do not leak into other tests.
        ctx.optimize_fuse_stages = curr_optimize_fuse_stages
        ctx.optimizer_enabled = curr_optimizer_enabled

    # The following case runs 2 tasks with 1 CPU, with each task sleeping for
    # `sleep_2` seconds. We expect the overall reported stage time to be
    # at least `2 * sleep_2` seconds, and less than the total elapsed time.
    num_tasks = 2
    ds = ray.data.range(100, parallelism=num_tasks).map_batches(
        lambda batch: map_batches_sleep(batch, sleep_2)
    )
    start_time = time.time()
    ds.take_all()
    end_time = time.time()

    stage_stats = ds._get_stats_summary().stages_stats[0]
    stage_time = stage_stats.time_total_s
    assert num_tasks * sleep_2 <= stage_time <= end_time - start_time


def test_dataset__repr__(ray_start_regular_shared):
n = 100
ds = ray.data.range(n)
Expand Down Expand Up @@ -1062,9 +1114,11 @@ def test_summarize_blocks(ray_start_regular_shared, stage_two_block):
calculated_stats = stats.to_summary()
summarized_lines = calculated_stats.to_string().split("\n")

latest_end_time = max(m.exec_stats.end_time_s for m in block_meta_list)
earliest_start_time = min(m.exec_stats.start_time_s for m in block_meta_list)
assert (
"Stage 0 Read: 2/2 blocks executed in {}s".format(
max(round(stats.time_total_s, 2), 0)
max(round(latest_end_time - earliest_start_time, 2), 0)
)
== summarized_lines[0]
)
Expand Down