Skip to content

Commit

Permalink
Revert "[Data] Implement Operators for union() (#36242)" (#36583)
Browse files Browse the repository at this point in the history
This reverts commit d207361.
  • Loading branch information
rickyyx authored Jun 20, 2023
1 parent 9efe17e commit fb9faa3
Show file tree
Hide file tree
Showing 25 changed files with 83 additions and 461 deletions.
2 changes: 1 addition & 1 deletion python/ray/data/_internal/execution/bulk_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def execute_recursive(op: PhysicalOperator) -> List[RefBundle]:
for i, ref_bundles in enumerate(inputs):
for r in ref_bundles:
op.add_input(r, input_index=i)
op.all_inputs_done()
op.inputs_done()
output = _naive_run_until_complete(op)
finally:
op.shutdown()
Expand Down
10 changes: 1 addition & 9 deletions python/ray/data/_internal/execution/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,15 +396,7 @@ def add_input(self, refs: RefBundle, input_index: int) -> None:
"""
raise NotImplementedError

def input_done(self, input_index: int) -> None:
"""Called when the upstream operator at index `input_index` has completed().
After this is called, the executor guarantees that no more inputs will be added
via `add_input` for the given input index.
"""
pass

def all_inputs_done(self) -> None:
def inputs_done(self) -> None:
"""Called when all upstream operators have completed().
After this is called, the executor guarantees that no more inputs will be added
Expand Down
6 changes: 2 additions & 4 deletions python/ray/data/_internal/execution/legacy_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@
RefBundle,
TaskContext,
)
from ray.data._internal.execution.operators.base_physical_operator import (
AllToAllOperator,
)
from ray.data._internal.execution.operators.all_to_all_operator import AllToAllOperator
from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
from ray.data._internal.execution.operators.limit_operator import LimitOperator
from ray.data._internal.execution.operators.map_operator import MapOperator
from ray.data._internal.execution.operators.one_to_one_operator import LimitOperator
from ray.data._internal.execution.util import make_callable_class_concurrent
from ray.data._internal.lazy_block_list import LazyBlockList
from ray.data._internal.logical.optimizers import get_execution_plan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,10 @@ def notify_work_completed(
# For either a completed task or ready worker, we try to dispatch queued tasks.
self._dispatch_tasks()

def all_inputs_done(self):
def inputs_done(self):
# Call base implementation to handle any leftover bundles. This may or may not
# trigger task dispatch.
super().all_inputs_done()
super().inputs_done()

# Mark inputs as done so future task dispatch will kill all inactive workers
# once the bundle queue is exhausted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,10 @@
RefBundle,
TaskContext,
)
from ray.data._internal.logical.interfaces import LogicalOperator
from ray.data._internal.progress_bar import ProgressBar
from ray.data._internal.stats import StatsDict


class OneToOneOperator(PhysicalOperator):
    """Physical operator fed by a single upstream operator.

    Acts as the shared base class for map, filter, limit, etc.
    """

    def __init__(self, name: str, input_op: PhysicalOperator):
        """Set up the operator with its name and its one upstream operator.

        Args:
            name: The name of this operator.
            input_op: Operator generating input data for this op.
        """
        # The base class takes a list of inputs; this operator always has one.
        upstream_ops = [input_op]
        super().__init__(name, upstream_ops)

    @property
    def input_dependency(self) -> PhysicalOperator:
        """Return the first entry of ``self.input_dependencies``."""
        dependencies = self.input_dependencies
        return dependencies[0]


class AllToAllOperator(PhysicalOperator):
"""A blocking operator that executes once its inputs are complete.
Expand All @@ -49,6 +25,7 @@ def __init__(
name: str = "AllToAll",
):
"""Create an AllToAllOperator.
Args:
bulk_fn: The blocking transformation function to run. The inputs are the
list of input ref bundles, and the outputs are the output ref bundles
Expand Down Expand Up @@ -80,15 +57,15 @@ def add_input(self, refs: RefBundle, input_index: int) -> None:
assert input_index == 0, input_index
self._input_buffer.append(refs)

def all_inputs_done(self) -> None:
def inputs_done(self) -> None:
ctx = TaskContext(
task_idx=self._next_task_index,
sub_progress_bar_dict=self._sub_progress_bar_dict,
)
self._output_buffer, self._stats = self._bulk_fn(self._input_buffer, ctx)
self._next_task_index += 1
self._input_buffer.clear()
super().all_inputs_done()
super().inputs_done()

def has_next(self) -> bool:
return len(self._output_buffer) > 0
Expand Down Expand Up @@ -125,23 +102,3 @@ def close_sub_progress_bars(self):
if self._sub_progress_bar_dict is not None:
for sub_bar in self._sub_progress_bar_dict.values():
sub_bar.close()


class NAryOperator(PhysicalOperator):
    """An operator that has multiple input dependencies and one output.

    This operator serves as the base for union, zip, etc.
    """

    def __init__(
        self,
        *input_ops: PhysicalOperator,
    ):
        """Create an NAryOperator.

        The operator name is derived automatically as
        ``"<ConcreteClassName>(<input name>, <input name>, ...)"``, so
        subclasses do not pass a name explicitly.

        Args:
            input_ops: Operators generating input data for this op, in input
                index order. Each one must expose a ``_name`` attribute, which
                is read to build this operator's name.
        """
        # Use the runtime class so subclasses get their own name in the label.
        input_names = ", ".join(op._name for op in input_ops)
        op_name = f"{self.__class__.__name__}({input_names})"
        # The base class takes the dependencies as a list, not a tuple.
        super().__init__(op_name, list(input_ops))
8 changes: 3 additions & 5 deletions python/ray/data/_internal/execution/operators/map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
RefBundle,
TaskContext,
)
from ray.data._internal.execution.operators.base_physical_operator import (
OneToOneOperator,
)
from ray.data._internal.execution.operators.one_to_one_operator import OneToOneOperator
from ray.data._internal.memory_tracing import trace_allocation
from ray.data._internal.stats import StatsDict
from ray.data.block import Block, BlockAccessor, BlockExecStats, BlockMetadata
Expand Down Expand Up @@ -282,13 +280,13 @@ def _handle_task_done(self, task: "_TaskState"):
if self._metrics.cur > self._metrics.peak:
self._metrics.peak = self._metrics.cur

def all_inputs_done(self):
def inputs_done(self):
self._block_ref_bundler.done_adding_bundles()
if self._block_ref_bundler.has_bundle():
# Handle any leftover bundles in the bundler.
bundle = self._block_ref_bundler.get_next_bundle()
self._add_bundled_input(bundle)
super().all_inputs_done()
super().inputs_done()

def has_next(self) -> bool:
assert self._started
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,35 @@

import ray
from ray.data._internal.execution.interfaces import PhysicalOperator, RefBundle
from ray.data._internal.execution.operators.base_physical_operator import (
OneToOneOperator,
)
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data._internal.stats import StatsDict
from ray.data.block import Block, BlockAccessor, BlockMetadata
from ray.types import ObjectRef


class OneToOneOperator(PhysicalOperator):
    """Physical operator that consumes exactly one upstream operator.

    Used as the base class for map, filter, limit, etc.
    """

    def __init__(self, name: str, input_op: PhysicalOperator):
        """Build the operator from a name and a single input operator.

        Args:
            name: The name of this operator.
            input_op: Operator generating input data for this op.
        """
        # Wrap the lone input in a list, as the base constructor expects.
        sole_input = [input_op]
        super().__init__(name, sole_input)

    @property
    def input_dependency(self) -> PhysicalOperator:
        """Return element 0 of ``self.input_dependencies``."""
        deps = self.input_dependencies
        return deps[0]


class LimitOperator(OneToOneOperator):
"""Physical operator for limit."""

Expand All @@ -29,7 +49,7 @@ def __init__(
self._cur_output_bundles = 0
super().__init__(self._name, input_op)
if self._limit <= 0:
self.all_inputs_done()
self.inputs_done()

def _limit_reached(self) -> bool:
return self._consumed_rows >= self._limit
Expand Down Expand Up @@ -79,7 +99,7 @@ def slice_fn(block, metadata, num_rows) -> Tuple[Block, BlockMetadata]:
)
self._buffer.append(out_refs)
if self._limit_reached():
self.all_inputs_done()
self.inputs_done()

def has_next(self) -> bool:
return len(self._buffer) > 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ def add_input(self, bundle, input_index) -> None:
self._buffer.append(bundle)
self._dispatch_bundles()

def all_inputs_done(self) -> None:
super().all_inputs_done()
def inputs_done(self) -> None:
super().inputs_done()
if not self._equal:
self._dispatch_bundles(dispatch_all=True)
assert not self._buffer, "Should have dispatched all bundles."
Expand Down
107 changes: 0 additions & 107 deletions python/ray/data/_internal/execution/operators/union_operator.py

This file was deleted.

4 changes: 2 additions & 2 deletions python/ray/data/_internal/execution/operators/zip_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ def add_input(self, refs: RefBundle, input_index: int) -> None:
else:
self._right_buffer.append(refs)

def all_inputs_done(self) -> None:
def inputs_done(self) -> None:
self._output_buffer, self._stats = self._zip(
self._left_buffer, self._right_buffer
)
self._left_buffer.clear()
self._right_buffer.clear()
super().all_inputs_done()
super().inputs_done()

def has_next(self) -> bool:
return len(self._output_buffer) > 0
Expand Down
3 changes: 0 additions & 3 deletions python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
build_streaming_topology,
process_completed_tasks,
select_operator_to_run,
update_operator_states,
)
from ray.data._internal.progress_bar import ProgressBar
from ray.data._internal.stats import DatasetStats
Expand Down Expand Up @@ -258,8 +257,6 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
autoscaling_state=self._autoscaling_state,
)

update_operator_states(topology)

# Update the progress bar to reflect scheduling decisions.
for op_state in topology.values():
op_state.refresh_progress_bar()
Expand Down
Loading

0 comments on commit fb9faa3

Please sign in to comment.