
[Data] Incorrect StageSummaryStats execution time calculated #37105

Closed
scottjlee opened this issue Jul 5, 2023 · 0 comments · Fixed by #37119
Assignees: scottjlee
Labels: bug (Something that is supposed to be working; but isn't), data (Ray Data-related issues), P0 (Issues that should be fixed in short order), Ray 2.6, release-blocker (P0 Issue that blocks the release)

Comments

scottjlee (Contributor) commented Jul 5, 2023

What happened + What you expected to happen

The stats for a Dataset generated from Read->SplitBlocks(n)->MapBatches contain an incorrectly duplicated execution time in the per-stage summaries; see the example below.

The initial hypothesis is that this is caused by inheriting incorrect stats information during operator fusion / stats generation.
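The symptom would be consistent with the fused operator's total wall time being copied wholesale into each stage's summary. A minimal sketch of that failure mode (hypothetical names, not Ray's actual stats code):

```python
from dataclasses import dataclass

@dataclass
class StageSummary:
    name: str
    time_total_s: float

def split_fused_stats_buggy(stage_names, fused_time_s):
    # Buggy behavior being hypothesized: every stage split out of a
    # fused operator inherits the entire fused wall time, so all
    # stages report the same duration.
    return [StageSummary(name, fused_time_s) for name in stage_names]

summaries = split_fused_stats_buggy(
    ["ReadTFRecord->SplitBlocks(4)", "MapBatches(fn)"], 1.92
)
# Both summaries report 1.92s, matching the symptom reported below.
```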

Versions / Dependencies

ray master

Reproduction script

import ray
import random
import pyarrow as pa
import tempfile
import time

def generate_random_tfrecords(
    num_rows: int,
    num_int: int,
) -> str:
    def generate_features(batch):
        batch_size = len(batch["id"])
        features = {"int_features": []}
        lower_bound = -(2**32)
        upper_bound = 2**32
        for _ in range(batch_size):
            if num_int > 0:
                int_features = [
                    random.randint(lower_bound, upper_bound) for _ in range(num_int)
                ]
                features["int_features"].append(int_features)
        features = {k: v for (k, v) in features.items() if len(v) > 0}
        return pa.table(features)

    ds = ray.data.range(num_rows).map_batches(generate_features)
    assert ds.count() == num_rows, ds.count()

    tfrecords_dir = tempfile.mkdtemp()
    ds.write_tfrecords(tfrecords_dir)
    return tfrecords_dir


file_path = generate_random_tfrecords(
    num_rows=10000,
    num_int=1000,
)
print("===> created data file at", file_path)
records = ray.data.read_tfrecords(paths=file_path)

def fn(batch):
    time.sleep(5)
    return batch

processed = records.map_batches(fn)
num_batches = 0
for batch in processed.iter_batches():
    num_batches += 1
print(f"===> Iterated over {num_batches} batches")
stats = processed.stats()
print(stats)

The block execution summary string (the first line of each stage) incorrectly reports the same execution time for both stages:

Stage 1 ReadTFRecord->SplitBlocks(4): 80/80 blocks executed in 1.92s
* Remote wall time: 160.21us min, 1.88s max, 413.09ms mean, 33.05s total
* Remote cpu time: 160.0us min, 1.18s max, 285.51ms mean, 22.84s total
* Peak heap memory usage (MiB): 467359.38 min, 485484.38 max, 472517 mean
* Output num rows: 125 min, 125 max, 125 mean, 10000 total
* Output size bytes: 1000500 min, 1000500 max, 1000500 mean, 80040000 total
* Tasks per node: 80 min, 80 max, 80 mean; 1 nodes used
* Extra metrics: {'obj_store_mem_alloc': 40020000, 'obj_store_mem_freed': 37572238, 'obj_store_mem_peak': 38790839, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}

Stage 2 MapBatches(fn): 80/80 blocks executed in 1.92s
* Remote wall time: 798.54us min, 4.46ms max, 1.23ms mean, 98.51ms total
* Remote cpu time: 798.0us min, 2.37ms max, 1.14ms mean, 90.86ms total
* Peak heap memory usage (MiB): 467359.38 min, 487828.12 max, 474389 mean
* Output num rows: 125 min, 125 max, 125 mean, 10000 total
* Output size bytes: 1002000 min, 1002000 max, 1002000 mean, 80160000 total
* Tasks per node: 80 min, 80 max, 80 mean; 1 nodes used
* Extra metrics: {'obj_store_mem_alloc': 5010000, 'obj_store_mem_freed': 5002500, 'obj_store_mem_peak': 10005000, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}

Dataset iterator time breakdown:
* Total time user code is blocked: 2.32s
* Total time in user code: 3.48ms
* Total time overall: 4.24s
* Num blocks local: 80
* Num blocks remote: 0
* Num blocks unknown location: 0
* Batch iteration time breakdown (summed across prefetch threads):
    * In ray.get(): 248.0us min, 3.57ms max, 633.74us avg, 50.7ms total
    * In batch creation: 103.5us min, 2.87ms max, 974.33us avg, 38.97ms total
    * In batch formatting: 977.21us min, 19.83ms max, 11.47ms avg, 458.71ms total

We should expect Stage 2's execution time to be close to 5 seconds or more, due to the time.sleep(5) in the mapped function. Instead, it reports exactly the same 1.92s as Stage 1.
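For reference, timing each stage independently shows why identical durations are implausible here. A plain-Python sketch (unrelated to Ray's internals, with the sleep shortened to 0.05s per batch):

```python
import time

def run_stage(fn, batches):
    # Measure the wall time of applying fn to every batch.
    start = time.perf_counter()
    out = [fn(b) for b in batches]
    return out, time.perf_counter() - start

def slow_fn(batch):
    time.sleep(0.05)  # stands in for the time.sleep(5) above
    return batch

_, t_read = run_stage(lambda b: b, [[i] for i in range(3)])
_, t_map = run_stage(slow_fn, [[i] for i in range(3)])
# t_map includes the per-batch sleep, so the two stages should not
# report identical durations.
```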

Issue Severity

None

scottjlee added the bug, release-blocker, P0, data, and Ray 2.6 labels on Jul 5, 2023
scottjlee self-assigned this on Jul 5, 2023
scottjlee changed the title from "[Data] Incorrect Dataset iterator stats with fused stages" to "[Data] Incorrect StageSummaryStats execution time calculated" on Jul 5, 2023
c21 added this to the Ray Data Benchmarks milestone on Jul 17, 2023