[Data] Update ExecutionPlan.execute_to_iterator() to return RefBundles instead of (Block, BlockMetadata) #46575

Merged Jul 16, 2024 (41 commits, into master from 0710-execiter-bundle)

Changes from 33 commits

Commits (41)
c313a52  add iter_internal_block_refs (Jul 1, 2024)
09b905d  Merge branch 'master' into 0701-count (scottjlee, Jul 2, 2024)
5ef8041  return iterator over refbundles (Jul 3, 2024)
172e423  Merge branch '0701-count' of https://github.com/scottjlee/ray into 07… (Jul 3, 2024)
374898b  fix docs (Jul 3, 2024)
b0ec894  update consumption api usage (Jul 3, 2024)
f0b49f1  fix (Jul 3, 2024)
aa368d4  clean up (Jul 3, 2024)
ab8d6ed  comments (Jul 3, 2024)
128614d  comments (Jul 3, 2024)
65c3f4e  lint (Jul 3, 2024)
7956f35  Merge branch 'master' into 0701-count (Jul 3, 2024)
958d306  fix tests (Jul 4, 2024)
d6a1d79  Merge branch '0701-count' of https://github.com/scottjlee/ray into 07… (Jul 4, 2024)
87151ee  update tests (Jul 5, 2024)
6d15acd  replace get_internal_block_refs usages (Jul 6, 2024)
3e4d9b0  add deprecation warning (Jul 8, 2024)
3ea9759  Merge branch 'master' into 0701-count (Jul 8, 2024)
b6df226  snapshot metadata only (Jul 8, 2024)
70f82de  clean up (Jul 8, 2024)
97d177b  Merge branch '0701-count' into 0705-get_internal_block_Refs (Jul 8, 2024)
f520c62  update parquet test (Jul 8, 2024)
629d6bb  only cache metadata once iteration terminates (Jul 9, 2024)
1abdb63  Merge branch '0701-count' into 0705-get_internal_block_refs (Jul 9, 2024)
7c2710a  log deprecation warning (Jul 9, 2024)
52944e9  Merge branch 'master' into 0705-get_internal_block_refs (Jul 10, 2024)
a6e99c2  clean up (Jul 10, 2024)
789037e  Merge branch '0705-get_internal_block_refs' into 0710-execiter-bundle (Jul 10, 2024)
24c65bd  replace in execution path (Jul 10, 2024)
246a170  update iter path (Jul 11, 2024)
88a8f4d  fix ref hit test (Jul 11, 2024)
595b124  Merge branch 'master' into 0710-execiter-bundle (Jul 11, 2024)
93ca668  fix merge (Jul 11, 2024)
9e3d280  clean up (Jul 11, 2024)
1242a58  Merge branch 'master' into 0710-execiter-bundle (Jul 12, 2024)
a1e2c28  clean up (Jul 12, 2024)
62216d0  fix tests (Jul 12, 2024)
047e381  Merge branch 'master' into 0710-execiter-bundle (Jul 12, 2024)
95593e4  comments (Jul 15, 2024)
a6b554d  Merge branch 'master' into 0710-execiter-bundle (Jul 15, 2024)
c905a1c  remove dead code (Jul 15, 2024)
python/ray/data/BUILD (2 changes: 1 addition & 1 deletion)

@@ -21,7 +21,7 @@ py_library(
 py_test_module_list(
     files = glob(["tests/block_batching/test_*.py"]),
     size = "medium",
-    tags = ["team:ml", "exclusive"],
+    tags = ["team:data", "exclusive"],
     deps = ["//:ray_lib", ":conftest"],
 )

Contributor Author: found that the tests were not being run under data tests, so updated the tag here.
python/ray/data/_internal/block_batching/iter_batches.py (48 changes: 28 additions & 20 deletions)

@@ -1,6 +1,6 @@
 import collections
 from contextlib import nullcontext
-from typing import Any, Callable, Dict, Iterator, Optional, Tuple
+from typing import Any, Callable, Dict, Iterator, Optional
 
 import ray
 from ray.data._internal.block_batching.interfaces import Batch, BlockPrefetcher
@@ -14,16 +14,17 @@
     format_batches,
     resolve_block_refs,
 )
+from ray.data._internal.execution.interfaces.ref_bundle import RefBundle
 from ray.data._internal.memory_tracing import trace_deallocation
 from ray.data._internal.stats import DatasetStats
 from ray.data._internal.util import make_async_gen
-from ray.data.block import Block, BlockMetadata, DataBatch
+from ray.data.block import Block, DataBatch
 from ray.data.context import DataContext
 from ray.types import ObjectRef
 
 
 def iter_batches(
-    block_refs: Iterator[Tuple[ObjectRef[Block], BlockMetadata]],
+    ref_bundles: Iterator[RefBundle],
     *,
     stats: Optional[DatasetStats] = None,
     clear_block_after_read: bool = False,
@@ -71,8 +72,7 @@ def iter_batches(
     6. Fetch outputs from the threadpool, maintaining order of the batches.
 
     Args:
-        block_refs: An iterator over block object references and their corresponding
-            metadata.
+        ref_bundles: An iterator over RefBundles.
         stats: DatasetStats object to record timing and other statistics.
         clear_block_after_read: Whether to clear the block from object store
             manually (i.e. without waiting for Python's automatic GC) after it
@@ -121,19 +121,19 @@ def iter_batches(
     eager_free = clear_block_after_read and DataContext.get_current().eager_free
 
     def _async_iter_batches(
-        block_refs: Iterator[Tuple[ObjectRef[Block], BlockMetadata]],
+        ref_bundles: Iterator[RefBundle],
     ) -> Iterator[DataBatch]:
         # Step 1: Prefetch logical batches locally.
-        block_refs = prefetch_batches_locally(
-            block_ref_iter=block_refs,
+        block_iter = prefetch_batches_locally(
+            ref_bundles=ref_bundles,
             prefetcher=prefetcher,
             num_batches_to_prefetch=prefetch_batches,
             batch_size=batch_size,
             eager_free=eager_free,
         )
 
         # Step 2: Resolve the blocks.
-        block_iter = resolve_block_refs(block_ref_iter=block_refs, stats=stats)
+        block_iter = resolve_block_refs(block_ref_iter=block_iter, stats=stats)
 
         # Step 3: Batch and shuffle the resolved blocks.
         batch_iter = blocks_to_batches(
@@ -168,7 +168,9 @@ def _async_iter_batches(
 
     # Run everything in a separate thread to not block the main thread when waiting
    # for streaming results.
-    async_batch_iter = make_async_gen(block_refs, fn=_async_iter_batches, num_workers=1)
+    async_batch_iter = make_async_gen(
+        ref_bundles, fn=_async_iter_batches, num_workers=1
+    )
 
     while True:
         with stats.iter_total_blocked_s.timer() if stats else nullcontext():
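
The `make_async_gen(ref_bundles, fn=_async_iter_batches, num_workers=1)` call above runs the whole batching pipeline on a background thread so the caller is not blocked while streaming results arrive. A simplified sketch of that producer-thread pattern (a hypothetical stand-in for illustration, not Ray's actual `make_async_gen` implementation):

    import queue
    import threading
    from typing import Callable, Iterator, TypeVar

    T = TypeVar("T")
    U = TypeVar("U")

    def async_gen_sketch(
        base: Iterator[T], fn: Callable[[Iterator[T]], Iterator[U]]
    ) -> Iterator[U]:
        """Run fn(base) on a worker thread; yield its outputs on the caller's thread."""
        out: "queue.Queue[object]" = queue.Queue(maxsize=2)
        sentinel = object()

        def worker() -> None:
            for item in fn(base):
                out.put(item)
            out.put(sentinel)  # signal that the generator is exhausted

        threading.Thread(target=worker, daemon=True).start()
        while (item := out.get()) is not sentinel:
            yield item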
@@ -229,17 +231,18 @@ def threadpool_computations_format_collate(
 
 
 def prefetch_batches_locally(
-    block_ref_iter: Iterator[Tuple[ObjectRef[Block], BlockMetadata]],
+    ref_bundles: Iterator[RefBundle],
     prefetcher: BlockPrefetcher,
     num_batches_to_prefetch: int,
     batch_size: Optional[int],
     eager_free: bool = False,
 ) -> Iterator[ObjectRef[Block]]:
-    """Given an iterator of batched block references, returns an iterator over the same
-    block references while prefetching `num_batches_to_prefetch` batches in advance.
+    """Given an iterator of batched RefBundles, returns an iterator over the
+    corresponding block references while prefetching `num_batches_to_prefetch`
+    batches in advance.
 
     Args:
-        block_ref_iter: An iterator over batched block references.
+        ref_bundles: An iterator over batched RefBundles.
         prefetcher: The prefetcher to use.
         num_batches_to_prefetch: The number of batches to prefetch ahead of the
             current batch during the scan.
@@ -251,8 +254,9 @@ def prefetch_batches_locally(
     current_window_size = 0
 
     if num_batches_to_prefetch <= 0:
-        for block_ref, metadata in block_ref_iter:
-            yield block_ref
+        for ref_bundle in ref_bundles:
+            for block_ref in ref_bundle.block_refs:
+                yield block_ref
         return
 
     if batch_size is not None:
@@ -268,11 +272,12 @@ def prefetch_batches_locally(
         batch_size is None and len(sliding_window) < num_batches_to_prefetch
     ):
         try:
-            next_block_ref_and_metadata = next(block_ref_iter)
+            next_ref_bundle = next(ref_bundles)
+            for block_ref_and_md in next_ref_bundle.blocks:
+                sliding_window.append(block_ref_and_md)
+                current_window_size += block_ref_and_md[1].num_rows
         except StopIteration:
             break
-        sliding_window.append(next_block_ref_and_metadata)
-        current_window_size += next_block_ref_and_metadata[1].num_rows
 
     prefetcher.prefetch_blocks([block_ref for block_ref, _ in list(sliding_window)])

Contributor, suggested change:
-            for block_ref_and_md in next_ref_bundle.blocks:
-                sliding_window.append(block_ref_and_md)
-                current_window_size += block_ref_and_md[1].num_rows
+            sliding_window.extend(next_ref_bundle.blocks)
+            current_window_size += next_ref_bundle.num_rows()
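
The suggestion relies on `RefBundle.blocks` being a sequence of `(block_ref, metadata)` pairs, and assumes `RefBundle.num_rows()` returns the same total as summing `metadata.num_rows` over those pairs. A hedged sketch of the equivalence:

    from typing import List, Tuple

    def grow_window(sliding_window: List[Tuple], current_window_size: int, next_ref_bundle) -> int:
        """Same bookkeeping as the loop in the diff, written per the suggestion."""
        sliding_window.extend(next_ref_bundle.blocks)
        # Assumption: num_rows() == sum(md.num_rows for _, md in next_ref_bundle.blocks)
        return current_window_size + next_ref_bundle.num_rows()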

@@ -281,7 +286,10 @@ def prefetch_batches_locally(
         current_window_size -= metadata.num_rows
         if batch_size is None or current_window_size < num_rows_to_prefetch:
             try:
-                sliding_window.append(next(block_ref_iter))
+                next_ref_bundle = next(ref_bundles)
+                for block_ref_and_md in next_ref_bundle.blocks:
+                    sliding_window.append(block_ref_and_md)
+                    current_window_size += block_ref_and_md[1].num_rows
                 prefetcher.prefetch_blocks(
                     [block_ref for block_ref, _ in list(sliding_window)]
                 )
python/ray/data/_internal/execution/interfaces/ref_bundle.py (10 changes: 10 additions & 0 deletions)

@@ -134,3 +134,13 @@ def _ref_bundles_iterator_to_block_refs_list(
     return [
         block_ref for ref_bundle in ref_bundles for block_ref in ref_bundle.block_refs
     ]
+
+
+def _bundle_to_block_md_iterator(
+    ref_bundles: Iterator[RefBundle],
+) -> Iterator[Tuple[ObjectRef[Block], BlockMetadata]]:
+    """Convert an iterator of RefBundles to an iterator of
+    `(Block object reference, corresponding BlockMetadata)`."""
+    for ref_bundle in ref_bundles:
+        for block_ref, metadata in ref_bundle.blocks:
+            yield block_ref, metadata

Member: This isn't called anywhere. Let's remove it?
Contributor Author: thanks, removed.
python/ray/data/_internal/iterator/iterator_impl.py (15 changes: 5 additions & 10 deletions)

@@ -1,10 +1,9 @@
 from typing import TYPE_CHECKING, Iterator, Optional, Tuple, Union
 
+from ray.data._internal.execution.interfaces.ref_bundle import RefBundle
 from ray.data._internal.stats import DatasetStats
 from ray.data._internal.util import create_dataset_tag
-from ray.data.block import Block, BlockMetadata
 from ray.data.iterator import DataIterator
-from ray.types import ObjectRef
 
 if TYPE_CHECKING:
     import pyarrow
@@ -22,17 +21,13 @@ def __init__(
     def __repr__(self) -> str:
         return f"DataIterator({self._base_dataset})"
 
-    def _to_block_iterator(
+    def _to_ref_bundle_iterator(
         self,
-    ) -> Tuple[
-        Iterator[Tuple[ObjectRef[Block], BlockMetadata]],
-        Optional[DatasetStats],
-        bool,
-    ]:
+    ) -> Tuple[Iterator[RefBundle], Optional[DatasetStats], bool]:
         ds = self._base_dataset
-        block_iterator, stats, executor = ds._plan.execute_to_iterator()
+        ref_bundles_iterator, stats, executor = ds._plan.execute_to_iterator()
         ds._current_executor = executor
-        return block_iterator, stats, False
+        return ref_bundles_iterator, stats, False
 
     def stats(self) -> str:
         return self._base_dataset.stats()
python/ray/data/_internal/iterator/stream_split_iterator.py (20 changes: 8 additions & 12 deletions)

@@ -68,31 +68,27 @@ def __init__(
         self._world_size = world_size
         self._iter_stats = DatasetStats(metadata={}, parent=None)
 
-    def _to_block_iterator(
+    def _to_ref_bundle_iterator(
         self,
-    ) -> Tuple[
-        Iterator[Tuple[ObjectRef[Block], BlockMetadata]],
-        Optional[DatasetStats],
-        bool,
-    ]:
-        def gen_blocks() -> Iterator[Tuple[ObjectRef[Block], BlockMetadata]]:
+    ) -> Tuple[Iterator[RefBundle], Optional[DatasetStats], bool]:
+        def gen_blocks() -> Iterator[RefBundle]:
             cur_epoch = ray.get(
                 self._coord_actor.start_epoch.remote(self._output_split_idx)
             )
             future: ObjectRef[
                 Optional[ObjectRef[Block]]
             ] = self._coord_actor.get.remote(cur_epoch, self._output_split_idx)
             while True:
-                block_ref: Optional[Tuple[ObjectRef[Block], BlockMetadata]] = ray.get(
-                    future
-                )
-                if not block_ref:
+                block_ref_and_md: Optional[
+                    Tuple[ObjectRef[Block], BlockMetadata]
+                ] = ray.get(future)
+                if not block_ref_and_md:
                     break
                 else:
                     future = self._coord_actor.get.remote(
                         cur_epoch, self._output_split_idx
                     )
-                yield block_ref
+                yield RefBundle(blocks=(block_ref_and_md,), owns_blocks=False)
 
         return gen_blocks(), self._iter_stats, False
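
Each `(block_ref, metadata)` pair fetched from the coordinator actor is wrapped into a single-block RefBundle before being yielded. A minimal sketch of that wrapping, using the constructor form shown in the diff (`owns_blocks=False` because the coordinator, not this consumer, owns the blocks):

    from typing import Iterator, Tuple

    from ray.data._internal.execution.interfaces.ref_bundle import RefBundle

    def wrap_pairs_as_bundles(pairs: Iterator[Tuple]) -> Iterator[RefBundle]:
        # Each `pair` is an (ObjectRef[Block], BlockMetadata) tuple.
        for pair in pairs:
            yield RefBundle(blocks=(pair,), owns_blocks=False)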
python/ray/data/_internal/plan.py (60 changes: 33 additions & 27 deletions)

@@ -15,10 +15,9 @@
 from ray.data._internal.logical.operators.read_operator import Read
 from ray.data._internal.stats import DatasetStats, DatasetStatsSummary
 from ray.data._internal.util import create_dataset_tag, unify_block_metadata_schema
-from ray.data.block import Block, BlockMetadata
+from ray.data.block import BlockMetadata
 from ray.data.context import DataContext
 from ray.data.exceptions import omit_traceback_stdout
-from ray.types import ObjectRef
 from ray.util.debug import log_once
 
 if TYPE_CHECKING:
@@ -353,20 +352,30 @@ def schema(
         elif self._logical_plan.dag.schema() is not None:
             schema = self._logical_plan.dag.schema()
         elif fetch_if_missing:
-            blocks_with_metadata, _, _ = self.execute_to_iterator()
-            for _, metadata in blocks_with_metadata:
-                if metadata.schema is not None and (
-                    metadata.num_rows is None or metadata.num_rows > 0
-                ):
-                    schema = metadata.schema
-                    break
+            # blocks_with_metadata, _, _ = self.execute_to_iterator()
+            iter_ref_bundles, _, _ = self.execute_to_iterator()
+            for ref_bundle in iter_ref_bundles:
+                # for _, metadata in blocks_with_metadata:
+                for metadata in ref_bundle.metadata:
+                    if metadata.schema is not None and (
+                        metadata.num_rows is None or metadata.num_rows > 0
+                    ):
+                        schema = metadata.schema
+                        break
         elif self.is_read_only():
             # For consistency with the previous implementation, we fetch the schema if
             # the plan is read-only even if `fetch_if_missing` is False.
-            blocks_with_metadata, _, _ = self.execute_to_iterator()
+            # blocks_with_metadata, _, _ = self.execute_to_iterator()
+            iter_ref_bundles, _, _ = self.execute_to_iterator()
             try:
-                _, metadata = next(iter(blocks_with_metadata))
-                schema = metadata.schema
+                # _, metadata = next(iter(blocks_with_metadata))
+                ref_bundle = next(iter(iter_ref_bundles))
+                for metadata in ref_bundle.metadata:
+                    if metadata.schema is not None:
+                        schema = metadata.schema
+                        break
+                # metadata = ref_bundle.metadata[0]
+                # schema = metadata.schema
             except StopIteration:  # Empty dataset.
                 schema = None

Contributor Author (on the commented-out `# blocks_with_metadata, ...` lines): @scottjlee to remove before merging

@@ -399,17 +408,13 @@ def meta_count(self) -> Optional[int]:
     @omit_traceback_stdout
     def execute_to_iterator(
         self,
-    ) -> Tuple[
-        Iterator[Tuple[ObjectRef[Block], BlockMetadata]],
-        DatasetStats,
-        Optional["Executor"],
-    ]:
+    ) -> Tuple[Iterator[RefBundle], DatasetStats, Optional["Executor"]]:
         """Execute this plan, returning an iterator.
 
         This will use streaming execution to generate outputs.
 
         Returns:
-            Tuple of iterator over output blocks and the executor.
+            Tuple of iterator over output RefBundles, DatasetStats, and the executor.
         """
         self._has_started_execution = True
 
@@ -418,30 +423,31 @@ def execute_to_iterator(
 
         if self.has_computed_output():
             bundle = self.execute()
-            return iter(bundle.blocks), self._snapshot_stats, None
+            return iter([bundle]), self._snapshot_stats, None
 
         from ray.data._internal.execution.legacy_compat import (
-            execute_to_legacy_block_iterator,
+            execute_to_legacy_bundle_iterator,
         )
         from ray.data._internal.execution.streaming_executor import StreamingExecutor
 
         metrics_tag = create_dataset_tag(self._dataset_name, self._dataset_uuid)
         executor = StreamingExecutor(copy.deepcopy(ctx.execution_options), metrics_tag)
-        # TODO(scottjlee): replace with `execute_to_legacy_bundle_iterator` and
-        # update execute_to_iterator usages to handle RefBundles instead of Blocks
-        block_iter = execute_to_legacy_block_iterator(
-            executor,
-            self,
-        )
+        # block_iter = execute_to_legacy_block_iterator(
+        #     executor,
+        #     self,
+        # )
+        bundle_iter = execute_to_legacy_bundle_iterator(executor, self)
         # Since the generator doesn't run any code until we try to fetch the first
         # value, force execution of one bundle before we call get_stats().
-        gen = iter(block_iter)
+        gen = iter(bundle_iter)
         try:
-            block_iter = itertools.chain([next(gen)], gen)
+            bundle_iter = itertools.chain([next(gen)], gen)
         except StopIteration:
             pass
         self._snapshot_stats = executor.get_stats()
-        return block_iter, self._snapshot_stats, executor
+        return bundle_iter, self._snapshot_stats, executor
 
     @omit_traceback_stdout
     def execute(
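
The schema lookup above now scans per-bundle metadata instead of per-block tuples. A condensed sketch of that logic (illustrative; `first_schema` is a hypothetical helper mirroring the diff, not a Ray function):

    from typing import Iterator, Optional

    def first_schema(ref_bundles: Iterator) -> Optional[object]:
        """Return the first schema found in metadata for a non-empty block, else None."""
        for ref_bundle in ref_bundles:
            for metadata in ref_bundle.metadata:
                if metadata.schema is not None and (
                    metadata.num_rows is None or metadata.num_rows > 0
                ):
                    return metadata.schema
        return None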
python/ray/data/dataset.py (16 changes: 8 additions & 8 deletions)

@@ -81,7 +81,6 @@
     VALID_BATCH_FORMATS,
     Block,
     BlockAccessor,
-    BlockMetadata,
     DataBatch,
     T,
     U,
@@ -4682,14 +4681,15 @@ def iter_internal_ref_bundles(self) -> Iterator[RefBundle]:
             An iterator over this Dataset's ``RefBundles``.
         """
 
-        def _build_ref_bundles(
-            iter_blocks: Iterator[Tuple[ObjectRef[Block], BlockMetadata]],
-        ) -> Iterator[RefBundle]:
-            for block in iter_blocks:
-                yield RefBundle((block,), owns_blocks=True)
+        # def _build_ref_bundles(
+        #     iter_blocks: Iterator[Tuple[ObjectRef[Block], BlockMetadata]],
+        # ) -> Iterator[RefBundle]:
+        #     for block in iter_blocks:
+        #         yield RefBundle((block,), owns_blocks=True)
 
-        iter_block_refs_md, _, _ = self._plan.execute_to_iterator()
-        iter_ref_bundles = _build_ref_bundles(iter_block_refs_md)
+        # iter_block_refs_md, _, _ = self._plan.execute_to_iterator()
+        iter_ref_bundles, _, _ = self._plan.execute_to_iterator()
+        # iter_ref_bundles = _build_ref_bundles(iter_block_refs_md)
         self._synchronize_progress_bar()
         return iter_ref_bundles
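
For reference, a short usage sketch of the simplified `iter_internal_ref_bundles()` (behavior as shown in this diff; assumes `metadata.num_rows` is known for a simple range dataset):

    import ray

    ds = ray.data.range(100)
    total_rows = 0
    for bundle in ds.iter_internal_ref_bundles():
        for block_ref, metadata in bundle.blocks:
            total_rows += metadata.num_rows  # per-block row count from metadata
    print(total_rows)  # expect 100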