[data] [streaming] Add streaming_split() API (ray-project#32991)
Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
ericl authored and edoakes committed Mar 22, 2023
1 parent 644e13f commit 54240b5
Showing 10 changed files with 485 additions and 28 deletions.
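For orientation, here is a minimal sketch of the API this commit adds, pieced together from the commit title and the operator wiring below; the exact public signature is an assumption, not shown in this diff. Each returned iterator is meant to be consumed in parallel by its own worker:

    import ray

    ds = ray.data.range(100)

    # Hypothetical usage: two coordinated streaming iterators over one dataset.
    # `equal=True` maps to the OutputSplitter's exact-split mode below.
    it0, it1 = ds.streaming_split(2, equal=True)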
9 changes: 9 additions & 0 deletions python/ray/data/_internal/execution/interfaces.py
@@ -364,6 +364,15 @@ def get_work_refs(self) -> List[ray.ObjectRef]:
"""
return []

def throttling_disabled(self) -> bool:
"""Whether to disable resource throttling for this operator.
This should return True for operators that only manipulate bundle metadata
(e.g., the OutputSplitter operator). This hints to the execution engine that
these operators should not be throttled based on resource usage.
"""
return False

def num_active_work_refs(self) -> int:
"""Return the number of active work refs.
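The new `throttling_disabled()` hook defaults to False above; a minimal sketch of an operator opting out, with a stub base class standing in for Ray's `PhysicalOperator`:

    class PhysicalOperatorStub:
        """Stub for illustration; Ray's real base class exposes the same hook."""

        def throttling_disabled(self) -> bool:
            return False  # default: subject to resource-based throttling


    class MetadataOnlySplitter(PhysicalOperatorStub):
        """Hypothetical operator that only re-tags bundle metadata."""

        def throttling_disabled(self) -> bool:
            # No real compute or memory pressure here, so backpressure
            # would only starve downstream consumers.
            return True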
31 changes: 25 additions & 6 deletions python/ray/data/_internal/execution/legacy_compat.py
@@ -41,21 +41,43 @@ def execute_to_legacy_block_iterator(
allow_clear_input_blocks: bool,
dataset_uuid: str,
) -> Iterator[ObjectRef[Block]]:
"""Execute a plan with the new executor and return a block iterator.
"""Same as execute_to_legacy_bundle_iterator but returning blocks."""
bundle_iter = execute_to_legacy_bundle_iterator(
executor, plan, allow_clear_input_blocks, dataset_uuid
)
for bundle in bundle_iter:
for block, _ in bundle.blocks:
yield block


def execute_to_legacy_bundle_iterator(
executor: Executor,
plan: ExecutionPlan,
allow_clear_input_blocks: bool,
dataset_uuid: str,
dag_rewrite=None,
) -> Iterator[RefBundle]:
"""Execute a plan with the new executor and return a bundle iterator.
Args:
executor: The executor to use.
plan: The legacy plan to execute.
allow_clear_input_blocks: Whether the executor may consider clearing blocks.
dataset_uuid: UUID of the dataset for this execution.
dag_rewrite: Callback that can be used to mutate the DAG prior to execution.
This is currently used as a legacy hack to inject the OutputSplitter operator
for `Dataset.streaming_split()`.
Returns:
- The output as a block iterator.
+ The output as a bundle iterator.
"""

if DatasetContext.get_current().optimizer_enabled:
dag, stats = get_execution_plan(plan._logical_plan).dag, None
else:
dag, stats = _to_operator_dag(plan, allow_clear_input_blocks)
if dag_rewrite:
dag = dag_rewrite(dag)

# Preserve ordering if the plan has stages that require it, such as
# Zip and Sort.
@@ -64,10 +64,7 @@ def execute_to_legacy_block_iterator(
executor._options.preserve_order = True

bundle_iter = executor.execute(dag, initial_stats=stats)

- for bundle in bundle_iter:
- for block, _ in bundle.blocks:
- yield block
+ return bundle_iter


def execute_to_legacy_block_list(
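A hedged sketch of the new `dag_rewrite` hook in action; `executor`, `plan`, and `dataset_uuid` are assumed to already be in scope, and the splitter arguments mirror the OutputSplitter changes below:

    def inject_splitter(dag):
        # Append an OutputSplitter so every emitted bundle carries an
        # output_split_idx in [0, 2), as streaming_split() requires.
        return OutputSplitter(dag, n=2, equal=True, locality_hints=None)

    bundle_iter = execute_to_legacy_bundle_iterator(
        executor,
        plan,
        allow_clear_input_blocks=True,
        dataset_uuid=dataset_uuid,
        dag_rewrite=inject_splitter,
    )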
90 changes: 77 additions & 13 deletions python/ray/data/_internal/execution/operators/output_splitter.py
@@ -1,13 +1,15 @@
import math
- from typing import List, Dict
+ from typing import List, Dict, Optional

from ray.data.block import Block, BlockMetadata, BlockAccessor
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data._internal.stats import StatsDict
from ray.data._internal.execution.interfaces import (
RefBundle,
PhysicalOperator,
ExecutionOptions,
ExecutionResources,
NodeIdStr,
)
from ray.types import ObjectRef

@@ -17,10 +19,12 @@ class OutputSplitter(PhysicalOperator):
The output bundles of this operator will have a `bundle.output_split_idx` attr
set to an integer from [0..n-1]. This operator tries to divide the rows evenly
- across output splits. If the `equal` option is set, the operator will furthermore
- guarantee an exact split of rows across outputs, truncating the Dataset as needed.
+ across output splits.
+ If the `equal` option is set, the operator will furthermore guarantee an exact
+ split of rows across outputs, truncating the Dataset as needed.
+ Implementation-wise, this operator keeps an internal buffer of bundles. The buffer
+ has a minimum size calculated to enable a good locality hit rate, as well as ensure
+ we can satisfy the `equal` requirement.
OutputSplitter does not provide any ordering guarantees.
"""
@@ -30,6 +34,7 @@ def __init__(
input_op: PhysicalOperator,
n: int,
equal: bool,
locality_hints: Optional[List[NodeIdStr]] = None,
):
super().__init__(f"split({n}, equal={equal})", [input_op])
self._equal = equal
@@ -40,6 +45,40 @@ def __init__(
# The number of rows output to each output split so far.
self._num_output: List[int] = [0 for _ in range(n)]

if locality_hints is not None:
if n != len(locality_hints):
raise ValueError(
"Locality hints list must have length `n`: "
f"len({locality_hints}) != {n}"
)
self._locality_hints = locality_hints
if locality_hints:
# To optimize locality, we should buffer a certain number of elements
# internally before dispatch to allow the locality algorithm a good chance
# of selecting a preferred location. We use a small multiple of `n` since
# it's reasonable to buffer a couple blocks per consumer.
self._min_buffer_size = 2 * n
else:
self._min_buffer_size = 0
self._locality_hits = 0
self._locality_misses = 0

def start(self, options: ExecutionOptions) -> None:
super().start(options)
# Force disable locality optimization.
if not options.actor_locality_enabled:
self._locality_hints = None
self._min_buffer_size = 0

def throttling_disabled(self) -> bool:
"""Disables resource-based throttling.
It doesn't make sense to throttle the inputs to this operator, since all that
would do is lower the buffer size, prevent us from emitting outputs, and
reduce the locality hit rate.
"""
return True

def has_next(self) -> bool:
return len(self._output_queue) > 0
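The buffer floor chosen in `__init__` above, restated as a tiny sketch (node ids are hypothetical):

    n = 4
    locality_hints = ["node-a", "node-b", "node-c", "node-d"]
    # Buffer ~2 bundles per consumer so the dispatch loop can usually find
    # one already located on the preferred node.
    min_buffer_size = 2 * n if locality_hints else 0  # -> 8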

@@ -64,8 +103,8 @@ def add_input(self, bundle, input_index) -> None:
def inputs_done(self) -> None:
super().inputs_done()
if not self._equal:
- # There shouldn't be any buffered data if we're not in equal split mode.
- assert not self._buffer
+ self._dispatch_bundles(dispatch_all=True)
+ assert not self._buffer, "Should have dispatched all bundles."
return

# Otherwise:
@@ -101,21 +140,31 @@ def current_resource_usage(self) -> ExecutionResources:
)

def progress_str(self) -> str:
- if self._equal:
- return f"{len(self._buffer)} buffered"
- assert not self._buffer
- return ""
+ if self._locality_hints:
+ return (
+ f"[{self._locality_hits} locality hits, {self._locality_misses} misses]"
+ )
+ else:
+ return "[locality disabled]"

- def _dispatch_bundles(self) -> None:
+ def _dispatch_bundles(self, dispatch_all: bool = False) -> None:
# Dispatch all dispatchable bundles from the internal buffer.
# This may not dispatch all bundles when equal=True.
- while self._buffer:
+ while self._buffer and (
+ dispatch_all or len(self._buffer) >= self._min_buffer_size
+ ):
target_index = self._select_output_index()
target_bundle = self._pop_bundle_to_dispatch(target_index)
if self._can_safely_dispatch(target_index, target_bundle.num_rows()):
target_bundle.output_split_idx = target_index
self._num_output[target_index] += target_bundle.num_rows()
self._output_queue.append(target_bundle)
if self._locality_hints:
preferred_loc = self._locality_hints[target_index]
if self._get_location(target_bundle) == preferred_loc:
self._locality_hits += 1
else:
self._locality_misses += 1
else:
# Put it back and abort.
self._buffer.insert(0, target_bundle)
@@ -127,7 +176,12 @@ def _select_output_index(self) -> int:
return i

def _pop_bundle_to_dispatch(self, target_index: int) -> RefBundle:
- # TODO implement locality aware bundle selection.
+ if self._locality_hints:
+ preferred_loc = self._locality_hints[target_index]
+ for bundle in self._buffer:
+ if self._get_location(bundle) == preferred_loc:
+ self._buffer.remove(bundle)
+ return bundle
return self._buffer.pop(0)
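A toy walk-through of the locality-aware selection above, with hypothetical node ids:

    buffer = [{"id": "b0", "loc": "node-2"}, {"id": "b1", "loc": "node-1"}]
    hints = ["node-1", "node-2"]  # preferred node per output split
    target = 0                    # consumer 0 prefers node-1
    match = next((b for b in buffer if b["loc"] == hints[target]), None)
    chosen = match if match is not None else buffer[0]
    print(chosen["id"])  # "b1": a locality hit; plain FIFO would pick b0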

def _can_safely_dispatch(self, target_index: int, nrow: int) -> bool:
@@ -164,6 +218,16 @@ def _split_from_buffer(self, nrow: int) -> List[RefBundle]:
assert sum(b.num_rows() for b in output) == nrow, (acc, nrow)
return output

def _get_location(self, bundle: RefBundle) -> Optional[NodeIdStr]:
"""Ask Ray for the node id of the given bundle.
This method may be overridden for testing.
Returns:
A node id associated with the bundle, or None if unknown.
"""
return bundle.get_cached_location()


def _split(bundle: RefBundle, left_size: int) -> (RefBundle, RefBundle):
left_blocks, left_meta = [], []
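`_split` is truncated above; a hedged sketch of the row-boundary arithmetic it needs, with block slicing elided and per-block row counts assumed known from metadata:

    def split_rows(block_sizes, left_size):
        # Keep whole blocks on the left until `left_size` rows are covered,
        # cutting at most one block at the boundary.
        left, right, acc = [], [], 0
        for rows in block_sizes:
            if acc + rows <= left_size:
                left.append(rows)
                acc += rows
            elif acc < left_size:
                cut = left_size - acc
                left.append(cut)
                right.append(rows - cut)
                acc = left_size
            else:
                right.append(rows)
        return left, right

    print(split_rows([4, 4, 4], 6))  # ([4, 2], [2, 4])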
17 changes: 13 additions & 4 deletions python/ray/data/_internal/execution/streaming_executor_state.py
@@ -159,11 +159,11 @@ def get_output_blocking(self, output_split_idx: Optional[int]) -> MaybeRefBundle
# Scan the queue and look for outputs tagged for the given index.
for i in range(len(self.outqueue)):
bundle = self.outqueue[i]
- if bundle is None:
+ if bundle is None or isinstance(bundle, Exception):
# End of stream for this index! Note that we
# do not remove the None, so that it can act
# as the termination signal for all indices.
- return None
+ return bundle
elif bundle.output_split_idx == output_split_idx:
self.outqueue.remove(bundle)
return bundle
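A self-contained sketch of the outqueue protocol this hunk extends: `None` still terminates every split, and an `Exception` instance now propagates to consumers the same way (both stay in the queue so all splits observe them):

    from collections import deque
    from dataclasses import dataclass

    @dataclass
    class FakeBundle:  # stand-in for Ray's RefBundle
        output_split_idx: int

    def get_output(outqueue, split_idx):
        for item in list(outqueue):
            if item is None or isinstance(item, Exception):
                return item  # left in place: shared termination/error signal
            if item.output_split_idx == split_idx:
                outqueue.remove(item)
                return item

    q = deque([FakeBundle(1), RuntimeError("worker died")])
    print(get_output(q, 0))  # the RuntimeError, though no bundle matched split 0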
@@ -324,9 +324,14 @@ def select_operator_to_run(
if not ops:
return None

- # Equally penalize outqueue length and num bundles processing for backpressure.
+ # Run metadata-only operators first. After that, equally penalize outqueue length
+ # and num bundles processing for backpressure.
return min(
- ops, key=lambda op: len(topology[op].outqueue) + topology[op].num_processing()
+ ops,
+ key=lambda op: (
+ not op.throttling_disabled(),
+ len(topology[op].outqueue) + topology[op].num_processing(),
+ ),
)
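The tuple key above leans on `False < True`, so throttling-disabled (metadata-only) operators always sort ahead of throttled ones; a toy demonstration:

    ops = [
        {"name": "map",   "throttling_disabled": False, "backlog": 1},
        {"name": "split", "throttling_disabled": True,  "backlog": 5},
    ]
    best = min(ops, key=lambda op: (not op["throttling_disabled"], op["backlog"]))
    print(best["name"])  # "split" wins despite the larger backlog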


@@ -353,6 +358,10 @@ def _execution_allowed(
Returns:
Whether the op is allowed to run.
"""

if op.throttling_disabled():
return True

assert isinstance(global_usage, TopologyResourceUsage), global_usage
# To avoid starvation problems when dealing with fractional resource types,
# convert all quantities to integer (0 or 1) for deciding admissibility. This
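A minimal sketch of the early return added to `_execution_allowed`, with the real integer-quantized resource check reduced to a stub:

    from dataclasses import dataclass

    @dataclass
    class Usage:
        cpu: float
        object_store_memory: float

    def execution_allowed(op, usage: Usage, limits: Usage) -> bool:
        if op.throttling_disabled():
            return True  # metadata-only ops bypass resource-based admission
        # Stub for the real CPU/GPU/object-store admissibility checks.
        return (usage.cpu < limits.cpu
                and usage.object_store_memory < limits.object_store_memory)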