[Data] Implement Operators for union() (#36242)
Implement the `LogicalOperator` and `PhysicalOperator` for `Dataset.union()`, and make `union()` lazy. This PR also introduces the `Nary`/`NAryOperator` logical/physical operators as a shared abstraction for the `Union` and `Zip` operators.
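For illustration, a minimal sketch of the user-facing behavior this enables (the datasets and the `take_all()` call are illustrative, not part of this PR):

```python
import ray

# Optional: make execution preserve input order (off by default for union).
ray.data.DataContext.get_current().execution_options.preserve_order = True

ds1 = ray.data.range(3)
ds2 = ray.data.range(2)

# union() now only builds a Union logical operator; nothing executes yet.
unioned = ds1.union(ds2)

# Execution is triggered lazily when the dataset is consumed.
print(unioned.take_all())
```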

---------

Signed-off-by: Scott Lee <sjl@anyscale.com>
scottjlee authored Jun 19, 2023
1 parent 31ab8a7 commit d207361
Showing 25 changed files with 461 additions and 83 deletions.
2 changes: 1 addition & 1 deletion python/ray/data/_internal/execution/bulk_executor.py
@@ -62,7 +62,7 @@ def execute_recursive(op: PhysicalOperator) -> List[RefBundle]:
         for i, ref_bundles in enumerate(inputs):
             for r in ref_bundles:
                 op.add_input(r, input_index=i)
-        op.inputs_done()
+        op.all_inputs_done()
         output = _naive_run_until_complete(op)
     finally:
         op.shutdown()
10 changes: 9 additions & 1 deletion python/ray/data/_internal/execution/interfaces.py
@@ -396,7 +396,15 @@ def add_input(self, refs: RefBundle, input_index: int) -> None:
         """
         raise NotImplementedError

-    def inputs_done(self) -> None:
+    def input_done(self, input_index: int) -> None:
+        """Called when the upstream operator at index `input_index` has completed().
+
+        After this is called, the executor guarantees that no more inputs will be added
+        via `add_input` for the given input index.
+        """
+        pass
+
+    def all_inputs_done(self) -> None:
         """Called when all upstream operators have completed().

         After this is called, the executor guarantees that no more inputs will be added
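The contract for these hooks, as exercised by the executors, can be sketched with a toy driver (`drive` and its arguments are hypothetical stand-ins, not executor code):

```python
# Toy sketch of the executor contract: for each input index i, zero or more
# add_input(bundle, i) calls, then one input_done(i); once every index is
# done, exactly one all_inputs_done(). Output is then drained.
def drive(op, inputs_per_index):
    for i, bundles in enumerate(inputs_per_index):
        for bundle in bundles:
            op.add_input(bundle, input_index=i)
        op.input_done(i)
    op.all_inputs_done()
    output = []
    while op.has_next():
        output.append(op.get_next())
    return output
```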
6 changes: 4 additions & 2 deletions python/ray/data/_internal/execution/legacy_compat.py
@@ -14,10 +14,12 @@
     RefBundle,
     TaskContext,
 )
-from ray.data._internal.execution.operators.all_to_all_operator import AllToAllOperator
+from ray.data._internal.execution.operators.base_physical_operator import (
+    AllToAllOperator,
+)
 from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
+from ray.data._internal.execution.operators.limit_operator import LimitOperator
 from ray.data._internal.execution.operators.map_operator import MapOperator
-from ray.data._internal.execution.operators.one_to_one_operator import LimitOperator
 from ray.data._internal.execution.util import make_callable_class_concurrent
 from ray.data._internal.lazy_block_list import LazyBlockList
 from ray.data._internal.logical.optimizers import get_execution_plan
python/ray/data/_internal/execution/operators/actor_pool_map_operator.py
@@ -230,10 +230,10 @@ def notify_work_completed(
         # For either a completed task or ready worker, we try to dispatch queued tasks.
         self._dispatch_tasks()

-    def inputs_done(self):
+    def all_inputs_done(self):
         # Call base implementation to handle any leftover bundles. This may or may not
         # trigger task dispatch.
-        super().inputs_done()
+        super().all_inputs_done()

         # Mark inputs as done so future task dispatch will kill all inactive workers
         # once the bundle queue is exhausted.
python/ray/data/_internal/execution/operators/base_physical_operator.py
@@ -6,10 +6,34 @@
     RefBundle,
     TaskContext,
 )
+from ray.data._internal.logical.interfaces import LogicalOperator
 from ray.data._internal.progress_bar import ProgressBar
 from ray.data._internal.stats import StatsDict


+class OneToOneOperator(PhysicalOperator):
+    """An operator that has one input and one output dependency.
+
+    This operator serves as the base for map, filter, limit, etc.
+    """
+
+    def __init__(
+        self,
+        name: str,
+        input_op: PhysicalOperator,
+    ):
+        """Create a OneToOneOperator.
+
+        Args:
+            input_op: Operator generating input data for this op.
+            name: The name of this operator.
+        """
+        super().__init__(name, [input_op])
+
+    @property
+    def input_dependency(self) -> PhysicalOperator:
+        return self.input_dependencies[0]
+
+
 class AllToAllOperator(PhysicalOperator):
     """A blocking operator that executes once its inputs are complete.
@@ -25,7 +49,6 @@ def __init__(
         name: str = "AllToAll",
     ):
         """Create an AllToAllOperator.
         Args:
             bulk_fn: The blocking transformation function to run. The inputs are the
                 list of input ref bundles, and the outputs are the output ref bundles
@@ -57,15 +80,15 @@ def add_input(self, refs: RefBundle, input_index: int) -> None:
         assert input_index == 0, input_index
         self._input_buffer.append(refs)

-    def inputs_done(self) -> None:
+    def all_inputs_done(self) -> None:
         ctx = TaskContext(
             task_idx=self._next_task_index,
             sub_progress_bar_dict=self._sub_progress_bar_dict,
         )
         self._output_buffer, self._stats = self._bulk_fn(self._input_buffer, ctx)
         self._next_task_index += 1
         self._input_buffer.clear()
-        super().inputs_done()
+        super().all_inputs_done()

     def has_next(self) -> bool:
         return len(self._output_buffer) > 0
Expand Down Expand Up @@ -102,3 +125,23 @@ def close_sub_progress_bars(self):
if self._sub_progress_bar_dict is not None:
for sub_bar in self._sub_progress_bar_dict.values():
sub_bar.close()


class NAryOperator(PhysicalOperator):
"""An operator that has multiple input dependencies and one output.
This operator serves as the base for union, zip, etc.
"""

def __init__(
self,
*input_ops: LogicalOperator,
):
"""Create a OneToOneOperator.
Args:
input_op: Operator generating input data for this op.
name: The name of this operator.
"""
input_names = ", ".join([op._name for op in input_ops])
op_name = f"{self.__class__.__name__}({input_names})"
super().__init__(op_name, list(input_ops))
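As a quick illustration of the derived operator name (toy stand-in class, not Ray code):

```python
# NAryOperator names itself after its class and its inputs' names.
class Stub:
    def __init__(self, name: str):
        self._name = name

input_names = ", ".join(op._name for op in [Stub("ReadRange"), Stub("ReadCSV")])
print(f"UnionOperator({input_names})")  # -> UnionOperator(ReadRange, ReadCSV)
```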
python/ray/data/_internal/execution/operators/limit_operator.py
@@ -4,35 +4,15 @@

 import ray
 from ray.data._internal.execution.interfaces import PhysicalOperator, RefBundle
+from ray.data._internal.execution.operators.base_physical_operator import (
+    OneToOneOperator,
+)
 from ray.data._internal.remote_fn import cached_remote_fn
 from ray.data._internal.stats import StatsDict
 from ray.data.block import Block, BlockAccessor, BlockMetadata
 from ray.types import ObjectRef


-class OneToOneOperator(PhysicalOperator):
-    """An operator that has one input and one output dependency.
-
-    This operator serves as the base for map, filter, limit, etc.
-    """
-
-    def __init__(
-        self,
-        name: str,
-        input_op: PhysicalOperator,
-    ):
-        """Create a OneToOneOperator.
-
-        Args:
-            input_op: Operator generating input data for this op.
-            name: The name of this operator.
-        """
-        super().__init__(name, [input_op])
-
-    @property
-    def input_dependency(self) -> PhysicalOperator:
-        return self.input_dependencies[0]
-
-
 class LimitOperator(OneToOneOperator):
     """Physical operator for limit."""

@@ -49,7 +29,7 @@ def __init__(
         self._cur_output_bundles = 0
         super().__init__(self._name, input_op)
         if self._limit <= 0:
-            self.inputs_done()
+            self.all_inputs_done()

     def _limit_reached(self) -> bool:
         return self._consumed_rows >= self._limit
@@ -99,7 +79,7 @@ def slice_fn(block, metadata, num_rows) -> Tuple[Block, BlockMetadata]:
             )
             self._buffer.append(out_refs)
             if self._limit_reached():
-                self.inputs_done()
+                self.all_inputs_done()

     def has_next(self) -> bool:
         return len(self._buffer) > 0
8 changes: 5 additions & 3 deletions python/ray/data/_internal/execution/operators/map_operator.py
@@ -19,7 +19,9 @@
     RefBundle,
     TaskContext,
 )
-from ray.data._internal.execution.operators.one_to_one_operator import OneToOneOperator
+from ray.data._internal.execution.operators.base_physical_operator import (
+    OneToOneOperator,
+)
 from ray.data._internal.memory_tracing import trace_allocation
 from ray.data._internal.stats import StatsDict
 from ray.data.block import Block, BlockAccessor, BlockExecStats, BlockMetadata
@@ -280,13 +282,13 @@ def _handle_task_done(self, task: "_TaskState"):
         if self._metrics.cur > self._metrics.peak:
             self._metrics.peak = self._metrics.cur

-    def inputs_done(self):
+    def all_inputs_done(self):
         self._block_ref_bundler.done_adding_bundles()
         if self._block_ref_bundler.has_bundle():
             # Handle any leftover bundles in the bundler.
             bundle = self._block_ref_bundler.get_next_bundle()
             self._add_bundled_input(bundle)
-        super().inputs_done()
+        super().all_inputs_done()

     def has_next(self) -> bool:
         assert self._started
python/ray/data/_internal/execution/operators/output_splitter.py
@@ -101,8 +101,8 @@ def add_input(self, bundle, input_index) -> None:
         self._buffer.append(bundle)
         self._dispatch_bundles()

-    def inputs_done(self) -> None:
-        super().inputs_done()
+    def all_inputs_done(self) -> None:
+        super().all_inputs_done()
         if not self._equal:
             self._dispatch_bundles(dispatch_all=True)
         assert not self._buffer, "Should have dispatched all bundles."
107 changes: 107 additions & 0 deletions python/ray/data/_internal/execution/operators/union_operator.py
@@ -0,0 +1,107 @@
from typing import List, Optional

from ray.data._internal.execution.interfaces import (
    ExecutionOptions,
    PhysicalOperator,
    RefBundle,
)
from ray.data._internal.execution.operators.base_physical_operator import NAryOperator
from ray.data._internal.stats import StatsDict


class UnionOperator(NAryOperator):
    """An operator that combines output blocks from
    two or more input operators into a single output."""

    def __init__(
        self,
        *input_ops: PhysicalOperator,
    ):
        """Create a UnionOperator.

        Args:
            input_ops: Operators generating input data for this operator to union.
        """

        # By default, union does not preserve the order of output blocks.
        # To preserve the order, configure ExecutionOptions accordingly.
        self._preserve_order = False

        # Intermediate buffers used to store blocks from each input dependency.
        # Only used when `self._preserve_order` is True.
        self._input_buffers: List[List[RefBundle]] = [[] for _ in range(len(input_ops))]

        # The index of the input dependency that is currently the source of
        # the output buffer. New inputs from this input dependency will be added
        # directly to the output buffer. Only used when `self._preserve_order` is True.
        self._input_idx_to_output = 0

        self._output_buffer: List[RefBundle] = []
        self._stats: StatsDict = {}
        super().__init__(*input_ops)

    def start(self, options: ExecutionOptions):
        # Whether to preserve the order of the input data (both the
        # order of the input operators and the order of the blocks within).
        self._preserve_order = options.preserve_order
        super().start(options)

    def num_outputs_total(self) -> Optional[int]:
        num_outputs = 0
        for input_op in self.input_dependencies:
            op_num_outputs = input_op.num_outputs_total()
            # If at least one of the input ops has an unknown number of outputs,
            # the number of outputs of the union operator is unknown.
            if op_num_outputs is None:
                return None
            num_outputs += op_num_outputs
        return num_outputs

    def add_input(self, refs: RefBundle, input_index: int) -> None:
        assert not self.completed()
        assert 0 <= input_index < len(self._input_dependencies), input_index

        if not self._preserve_order:
            self._output_buffer.append(refs)
        else:
            if input_index == self._input_idx_to_output:
                self._output_buffer.append(refs)
            else:
                self._input_buffers[input_index].append(refs)

    def input_done(self, input_index: int) -> None:
        """When `self._preserve_order` is True, switch the output buffer's
        source to the next input dependency once `input_done()` is called
        for the current input dependency."""
        if not self._preserve_order:
            return
        if input_index != self._input_idx_to_output:
            return
        next_input_idx = self._input_idx_to_output + 1
        if next_input_idx < len(self._input_buffers):
            self._output_buffer.extend(self._input_buffers[next_input_idx])
            self._input_buffers[next_input_idx].clear()
            self._input_idx_to_output = next_input_idx
        super().input_done(input_index)

    def all_inputs_done(self) -> None:
        # Note that in the case where order is not preserved, all inputs
        # are directly added to the output buffer as soon as they are received,
        # so there is no need to check any intermediate buffers.
        if self._preserve_order:
            for idx, input_buffer in enumerate(self._input_buffers):
                assert len(input_buffer) == 0, (
                    f"Input at index {idx} still has "
                    f"{len(input_buffer)} blocks remaining."
                )
        super().all_inputs_done()

    def has_next(self) -> bool:
        # Check if the output buffer still contains at least one block.
        return len(self._output_buffer) > 0

    def get_next(self) -> RefBundle:
        return self._output_buffer.pop(0)

    def get_stats(self) -> StatsDict:
        return self._stats
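To make the `preserve_order` buffering concrete, here is a standalone toy model of the logic above (plain strings in place of `RefBundle`s; not the actual operator):

```python
# Toy model of UnionOperator's preserve_order path: one input index at a time
# feeds the output buffer directly; other inputs are buffered until their turn.
class ToyUnion:
    def __init__(self, num_inputs: int):
        self.input_buffers = [[] for _ in range(num_inputs)]
        self.output = []
        self.idx_to_output = 0

    def add_input(self, item, i: int):
        if i == self.idx_to_output:
            self.output.append(item)
        else:
            self.input_buffers[i].append(item)

    def input_done(self, i: int):
        if i != self.idx_to_output:
            return
        nxt = self.idx_to_output + 1
        if nxt < len(self.input_buffers):
            self.output.extend(self.input_buffers[nxt])
            self.input_buffers[nxt].clear()
            self.idx_to_output = nxt

u = ToyUnion(2)
u.add_input("b0", 1)  # buffered: input 1 is not yet the output source
u.add_input("a0", 0)  # emitted directly
u.input_done(0)       # flush input 1's buffer and make it the source
u.add_input("b1", 1)  # now emitted directly
print(u.output)       # ['a0', 'b0', 'b1']
```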
4 changes: 2 additions & 2 deletions python/ray/data/_internal/execution/operators/zip_operator.py
@@ -59,13 +59,13 @@ def add_input(self, refs: RefBundle, input_index: int) -> None:
         else:
             self._right_buffer.append(refs)

-    def inputs_done(self) -> None:
+    def all_inputs_done(self) -> None:
         self._output_buffer, self._stats = self._zip(
             self._left_buffer, self._right_buffer
         )
         self._left_buffer.clear()
         self._right_buffer.clear()
-        super().inputs_done()
+        super().all_inputs_done()

     def has_next(self) -> bool:
         return len(self._output_buffer) > 0
3 changes: 3 additions & 0 deletions python/ray/data/_internal/execution/streaming_executor.py
@@ -27,6 +27,7 @@
     build_streaming_topology,
     process_completed_tasks,
     select_operator_to_run,
+    update_operator_states,
 )
 from ray.data._internal.progress_bar import ProgressBar
 from ray.data._internal.stats import DatasetStats
@@ -257,6 +258,8 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
                 autoscaling_state=self._autoscaling_state,
             )

+        update_operator_states(topology)
+
         # Update the progress bar to reflect scheduling decisions.
         for op_state in topology.values():
             op_state.refresh_progress_bar()
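For orientation, the loop-step ordering implied by this change, sketched with the helpers imported above (the loop body and `all_done` are simplified stand-ins, not the real `_scheduling_loop_step`):

```python
# Simplified sketch: the new update_operator_states() pass runs after task
# dispatch, which is where per-input completion (input_done) and overall
# completion (all_inputs_done) can propagate to operators like UnionOperator.
def scheduling_loop_step(topology) -> bool:
    process_completed_tasks(topology)
    op = select_operator_to_run(topology)
    while op is not None:
        topology[op].dispatch_next_task()
        op = select_operator_to_run(topology)
    update_operator_states(topology)
    return not all_done(topology)
```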