From dd5c8f5e58e71967db35b4571d86cc193f157bea Mon Sep 17 00:00:00 2001 From: Andrew Xue Date: Fri, 12 Jan 2024 13:27:19 -0500 Subject: [PATCH] [data] remove stages before/after snapshot (#42004) Removes `_stages_before/after_snapshot` from `ExecutionPlan`. This should be merged after #41747 and #41544 --------- Signed-off-by: Andrew Xue Signed-off-by: Scott Lee Co-authored-by: Scott Lee --- .../logical/operators/map_operator.py | 48 ++-- python/ray/data/_internal/plan.py | 211 +++++++++++------- python/ray/data/dataset.py | 17 +- python/ray/data/tests/test_consumption.py | 16 +- python/ray/data/tests/test_optimize.py | 4 - python/ray/data/tests/test_stats.py | 7 +- 6 files changed, 184 insertions(+), 119 deletions(-) diff --git a/python/ray/data/_internal/logical/operators/map_operator.py b/python/ray/data/_internal/logical/operators/map_operator.py index f3b1f5e6b464..a127e4b56aa3 100644 --- a/python/ray/data/_internal/logical/operators/map_operator.py +++ b/python/ray/data/_internal/logical/operators/map_operator.py @@ -7,6 +7,7 @@ from ray.data._internal.logical.operators.one_to_one_operator import AbstractOneToOne from ray.data.block import UserDefinedFunction from ray.data.context import DEFAULT_BATCH_SIZE +from ray.data.preprocessor import Preprocessor logger = DatasetLogger(__name__) @@ -35,25 +36,6 @@ def __init__( self._ray_remote_args = ray_remote_args or {} -def _get_udf_name(fn: UserDefinedFunction) -> str: - try: - if inspect.isclass(fn): - # callable class - return fn.__name__ - elif inspect.ismethod(fn): - # class method - return f"{fn.__self__.__class__.__name__}.{fn.__name__}" - elif inspect.isfunction(fn): - # normal function or lambda function. - return fn.__name__ - else: - # callable object. - return fn.__class__.__name__ - except AttributeError as e: - logger.get_logger().error("Failed to get name of UDF %s: %s", fn, e) - return "" - - class AbstractUDFMap(AbstractMap): """Abstract class for logical operators performing a UDF that should be converted to physical MapOperator. @@ -90,7 +72,7 @@ def __init__( tasks, or ``"actors"`` to use an autoscaling actor pool. ray_remote_args: Args to provide to ray.remote. """ - name = f"{name}({_get_udf_name(fn)})" + name = self._get_operator_name(name, fn) super().__init__(name, input_op, ray_remote_args=ray_remote_args) self._fn = fn self._fn_args = fn_args @@ -100,6 +82,32 @@ def __init__( self._min_rows_per_block = min_rows_per_block self._compute = compute or TaskPoolStrategy() + def _get_operator_name(self, op_name: str, fn: UserDefinedFunction): + """Gets the Operator name including the map `fn` UDF name.""" + # If the input `fn` is a Preprocessor, the + # name is simply the name of the Preprocessor class. + if inspect.ismethod(fn) and isinstance(fn.__self__, Preprocessor): + return fn.__self__.__class__.__name__ + + # Otherwise, it takes the form of `()`, + # e.g. `MapBatches(my_udf)`. + try: + if inspect.isclass(fn): + # callable class + return f"{op_name}({fn.__name__})" + elif inspect.ismethod(fn): + # class method + return f"{op_name}({fn.__self__.__class__.__name__}.{fn.__name__})" + elif inspect.isfunction(fn): + # normal function or lambda function. + return f"{op_name}({fn.__name__})" + else: + # callable object. 
+ return f"{op_name}({fn.__class__.__name__})" + except AttributeError as e: + logger.get_logger().error("Failed to get name of UDF %s: %s", fn, e) + return "" + class MapBatches(AbstractUDFMap): """Logical operator for map_batches.""" diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 4398e413355c..73ca49133789 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -29,15 +29,13 @@ from ray.data._internal.dataset_logger import DatasetLogger from ray.data._internal.execution.interfaces import TaskContext from ray.data._internal.lazy_block_list import LazyBlockList +from ray.data._internal.logical.interfaces.logical_operator import LogicalOperator +from ray.data._internal.logical.operators.from_operators import AbstractFrom from ray.data._internal.logical.operators.input_data_operator import InputData from ray.data._internal.logical.operators.read_operator import Read from ray.data._internal.logical.rules.operator_fusion import _are_remote_args_compatible from ray.data._internal.stats import DatasetStats, DatasetStatsSummary -from ray.data._internal.util import ( - capitalize, - create_dataset_tag, - unify_block_metadata_schema, -) +from ray.data._internal.util import create_dataset_tag, unify_block_metadata_schema from ray.data.block import Block, BlockMetadata from ray.data.context import DataContext from ray.types import ObjectRef @@ -47,6 +45,7 @@ import pyarrow from ray.data._internal.execution.interfaces import Executor + from ray.data._internal.logical.interfaces.logical_plan import LogicalPlan # Scheduling strategy can be inherited from prev stage if not specified. @@ -124,10 +123,8 @@ def __init__( self._in_stats = stats # A computed snapshot of some prefix of stages. self._snapshot_blocks = None + self._snapshot_operator: Optional[LogicalOperator] = None self._snapshot_stats = None - # Chains of stages. - self._stages_before_snapshot = [] - self._stages_after_snapshot = [] # Cache of optimized stages. self._last_optimized_stages = None # Cached schema. @@ -151,8 +148,7 @@ def __repr__(self) -> str: f"dataset_uuid={self._dataset_uuid}, " f"run_by_consumer={self._run_by_consumer}, " f"in_blocks={self._in_blocks}, " - f"stages_before_snapshot={self._stages_before_snapshot}, " - f"stages_after_snapshot={self._stages_after_snapshot}, " + f"snapshot_operator={self._snapshot_operator}" f"snapshot_blocks={self._snapshot_blocks})" ) @@ -169,28 +165,51 @@ def get_plan_as_string(self, classname: str) -> str: # Do not force execution for schema, as this method is expected to be very # cheap. plan_str = "" - num_stages = 0 + plan_max_depth = 0 dataset_blocks = None - if self._stages_after_snapshot: - # Get string representation of each stage in reverse order. - for stage in self._stages_after_snapshot[::-1]: - # Get name of each stage in pascal case. - # The stage representation should be in "(...)" format, - # e.g. "MapBatches(my_udf)". - # - # TODO(chengsu): create a class to represent stage name to make it less - # fragile to parse. 
- stage_str = stage.name.split("(") - stage_str[0] = capitalize(stage_str[0]) - stage_name = "(".join(stage_str) - if num_stages == 0: - plan_str += f"{stage_name}\n" + if ( + self._snapshot_blocks is None + or self._snapshot_operator != self._logical_plan.dag + ): + + def generate_logical_plan_string( + op: LogicalOperator, + curr_str: str = "", + depth: int = 0, + ): + """Traverse (DFS) the LogicalPlan DAG and + return a string representation of the operators.""" + if isinstance(op, (Read, InputData, AbstractFrom)): + return curr_str, depth + + curr_max_depth = depth + op_name = op.name + if depth == 0: + curr_str += f"{op_name}\n" else: - trailing_space = " " * ((num_stages - 1) * 3) - plan_str += f"{trailing_space}+- {stage_name}\n" - num_stages += 1 + trailing_space = " " * ((depth - 1) * 3) + curr_str += f"{trailing_space}+- {op_name}\n" + + for input in op.input_dependencies: + curr_str, input_max_depth = generate_logical_plan_string( + input, curr_str, depth + 1 + ) + curr_max_depth = max(curr_max_depth, input_max_depth) + return curr_str, curr_max_depth + + # generate_logical_plan_string(self._logical_plan.dag) + plan_str, plan_max_depth = generate_logical_plan_string( + self._logical_plan.dag + ) # Get schema of initial blocks. + if self.needs_eager_execution(): + # In the case where the plan contains only a Read/From operator, + # it is cheap to execute it. + # This allows us to get the most accurate estimates related + # to the dataset, after applying execution plan optimizer rules + # (e.g. number of blocks may change based on parallelism). + self.execute() if self._snapshot_blocks is not None: schema = self._get_unified_blocks_schema( self._snapshot_blocks, fetch_if_missing=False @@ -243,7 +262,8 @@ def get_plan_as_string(self, classname: str) -> str: SCHEMA_LINE_CHAR_LIMIT = 80 MIN_FIELD_LENGTH = 10 INDENT_STR = " " * 3 - trailing_space = " " * (max(num_stages, 0) * 3) + trailing_space = INDENT_STR * plan_max_depth + if len(dataset_str) > SCHEMA_LINE_CHAR_LIMIT: # If the resulting string representation exceeds the line char limit, # first try breaking up each `Dataset` parameter into its own line @@ -290,11 +310,10 @@ def get_plan_as_string(self, classname: str) -> str: f"\n{trailing_space})" ) - if num_stages == 0: - plan_str = dataset_str + if plan_max_depth == 0: + plan_str += dataset_str else: - trailing_space = " " * ((num_stages - 1) * 3) - plan_str += f"{trailing_space}+- {dataset_str}" + plan_str += f"{INDENT_STR * (plan_max_depth - 1)}+- {dataset_str}" return plan_str def with_stage(self, stage: "Stage") -> "ExecutionPlan": @@ -307,10 +326,9 @@ def with_stage(self, stage: "Stage") -> "ExecutionPlan": A new ExecutionPlan with this stage appended. """ copy = self.copy() - copy._stages_after_snapshot.append(stage) return copy - def link_logical_plan(self, logical_plan): + def link_logical_plan(self, logical_plan: "LogicalPlan"): """Link the logical plan into this execution plan. This is used for triggering execution for optimizer code path in this legacy @@ -336,9 +354,8 @@ def copy(self) -> "ExecutionPlan": if self._snapshot_blocks is not None: # Copy over the existing snapshot. 
plan_copy._snapshot_blocks = self._snapshot_blocks + plan_copy._snapshot_operator = self._snapshot_operator plan_copy._snapshot_stats = self._snapshot_stats - plan_copy._stages_before_snapshot = self._stages_before_snapshot.copy() - plan_copy._stages_after_snapshot = self._stages_after_snapshot.copy() plan_copy._dataset_name = self._dataset_name return plan_copy @@ -361,9 +378,8 @@ def deep_copy(self) -> "ExecutionPlan": if self._snapshot_blocks: # Copy over the existing snapshot. plan_copy._snapshot_blocks = self._snapshot_blocks.copy() + plan_copy._snapshot_operator = copy.copy(self._snapshot_operator) plan_copy._snapshot_stats = copy.copy(self._snapshot_stats) - plan_copy._stages_before_snapshot = self._stages_before_snapshot.copy() - plan_copy._stages_after_snapshot = self._stages_after_snapshot.copy() plan_copy._dataset_name = self._dataset_name return plan_copy @@ -382,42 +398,53 @@ def schema( Returns: The schema of the output dataset. """ - from ray.data._internal.stage_impl import RandomizeBlocksStage + from ray.data._internal.logical.operators.all_to_all_operator import ( + RandomizeBlocks, + ) if self._schema is not None: return self._schema - if self._stages_after_snapshot: + if self._snapshot_blocks is None or ( + self._snapshot_operator is not None + and self._snapshot_operator.output_dependencies + ): + # There remain some operators yet to be executed. + # Even if a schema is already previously known, it may change, + # so we try executing to get the most updated schema. + # TODO(swang): There are several other stage types that could # inherit the schema or we can compute the schema without having to # execute any of the dataset: limit, filter, map_batches for # add/drop columns, etc. if fetch_if_missing: - if isinstance(self._stages_after_snapshot[-1], RandomizeBlocksStage): + if isinstance(self._logical_plan.dag, RandomizeBlocks): # TODO(ekl): this is a hack to optimize the case where we have a # trailing randomize block stages. That stage has no effect and # so we don't need to execute all blocks to get the schema. - a = self._stages_after_snapshot.pop() + + randomize_blocks_op = self._logical_plan.dag + self._logical_plan._dag = randomize_blocks_op.input_dependencies[0] try: self.execute() finally: - self._stages_after_snapshot.append(a) + self._logical_plan = randomize_blocks_op else: self.execute() - elif len(self._stages_after_snapshot) == 1 and isinstance( - self._stages_after_snapshot[-1], RandomizeBlocksStage + elif self.needs_eager_execution() or ( + isinstance(self._logical_plan.dag, RandomizeBlocks) + and self.needs_eager_execution( + self._logical_plan.dag.input_dependencies[0] + ) ): - # If RandomizeBlocksStage is last stage, we execute it (regardless of - # the fetch_if_missing), since RandomizeBlocksStage is just changing - # the order of references (hence super cheap). + # If the plan is input/read only, we execute it, so snapshot has output. + # If RandomizeBlocks is the last operator preceded by a input/read + # only plan, we can also execute it (regardless of the fetch_if_missing) + # since RandomizeBlocksStage is just changing the order of references + # (hence super cheap). Calling execute does not trigger read tasks. self.execute() else: return None - elif self._in_blocks is not None and self._snapshot_blocks is None: - # If the plan only has input blocks, we execute it, so snapshot has output. - # This applies to newly created dataset. For example, initial dataset - # from read, and output datasets of Dataset.split(). 
- self.execute() # Snapshot is now guaranteed to be the output of the final stage or None. blocks = self._snapshot_blocks if not blocks: @@ -461,17 +488,15 @@ def _get_unified_blocks_schema( return None def meta_count(self) -> Optional[int]: - """Get the number of rows after applying all plan stages if possible. + """Get the number of rows after applying all plan optimizations, if possible. This method will never trigger any computation. Returns: The number of records of the result Dataset, or None. """ - if self._stages_after_snapshot: - return None - elif self._in_blocks is not None and self._snapshot_blocks is None: - # If the plan only has input blocks, we execute it, so snapshot has output. + if self.needs_eager_execution(): + # If the plan is input/read only, we execute it, so snapshot has output. # This applies to newly created dataset. For example, initial dataset # from read, and output datasets of Dataset.split(). self.execute() @@ -581,7 +606,7 @@ def execute( get_legacy_lazy_block_list_read_only, ) - if self._is_input_data_only(): + if self.is_from_in_memory_only(): # No need to execute MaterializedDatasets with only an InputData # operator, since the data is already materialized. This also avoids # recording unnecessary metrics for an empty plan execution. @@ -648,10 +673,9 @@ def collect_stats(cur_stats): # Set the snapshot to the output of the final stage. self._snapshot_blocks = blocks + self._snapshot_operator = self._logical_plan.dag self._snapshot_stats = stats self._snapshot_stats.dataset_uuid = self._dataset_uuid - self._stages_before_snapshot += self._stages_after_snapshot - self._stages_after_snapshot = [] # In the case of a read-only dataset, we replace the # input LazyBlockList with a copy that includes the @@ -664,6 +688,7 @@ def collect_stats(cur_stats): # The snapshot blocks after execution will contain the execution stats. self._snapshot_stats = self._snapshot_blocks.stats() self._snapshot_blocks = executed_blocks + self._snapshot_operator = self._logical_plan.dag # When force-read is enabled, we similarly update self._in_blocks. if self.is_read_only(): self._in_blocks = self._snapshot_blocks @@ -679,13 +704,8 @@ def clear_block_refs(self) -> None: def _clear_snapshot(self) -> None: """Clear the snapshot kept in the plan to the beginning state.""" self._snapshot_blocks = None + self._snapshot_operator = None self._snapshot_stats = None - # We're erasing the snapshot, so put all stages into the "after snapshot" - # bucket. - self._stages_after_snapshot = ( - self._stages_before_snapshot + self._stages_after_snapshot - ) - self._stages_before_snapshot = [] def stats(self) -> DatasetStats: """Return stats for this plan. @@ -703,16 +723,42 @@ def has_lazy_input(self) -> bool: """Return whether this plan has lazy input blocks.""" return _is_lazy(self._in_blocks) - def is_read_only(self) -> bool: - """Return whether the underlying logical plan contains only a Read op.""" - root_op = self._logical_plan.dag + def needs_eager_execution(self, root_op: Optional[LogicalOperator] = None) -> bool: + """Return whether the LogicalPlan corresponding to `root_op` + should be eagerly executed. By default, the last operator of + the LogicalPlan is used. 
+ + This is often useful for input/read-only plans, + where eager execution fetches accurate metadata for the dataset + without executing the underlying read tasks.""" + if root_op is None: + root_op = self._logical_plan.dag + # Since read tasks will not be scheduled until data is consumed or materialized, + # it is cheap to execute the plan (i.e. run the plan optimizer). + # In the case where the data is already in-memory (InputData, + # FromXXX operator), it is similarly also cheap to execute it. + return self.is_from_in_memory_only(root_op) or self.is_read_only(root_op) + + def is_read_only(self, root_op: Optional[LogicalOperator] = None) -> bool: + """Return whether the LogicalPlan corresponding to `root_op` + contains only a Read op. By default, the last operator of + the LogicalPlan is used.""" + if root_op is None: + root_op = self._logical_plan.dag return isinstance(root_op, Read) and len(root_op.input_dependencies) == 0 - def _is_input_data_only(self) -> bool: - """Return whether the underlying logical plan contains only an InputData op - (e.g. in the case of a :class:`~ray.data.MaterializedDataset`).""" - root_op = self._logical_plan.dag - return isinstance(root_op, InputData) and len(root_op.input_dependencies) == 0 + def is_from_in_memory_only(self, root_op: Optional[LogicalOperator] = None) -> bool: + """Return whether the LogicalPlan corresponding to `root_op` + contains only a read of already in-memory data (e.g. `FromXXX` + operators for `from_xxx` APIs, `InputData` operator for + :class:`~ray.data.MaterializedDataset`). By default, the last operator of + the LogicalPlan is used.""" + if root_op is None: + root_op = self._logical_plan.dag + return ( + isinstance(root_op, (InputData, AbstractFrom)) + and len(root_op.input_dependencies) == 0 + ) def has_computed_output(self) -> bool: """Whether this plan has a computed snapshot for the final stage, i.e. for the @@ -720,8 +766,8 @@ def has_computed_output(self) -> bool: """ return ( self._snapshot_blocks is not None - and not self._stages_after_snapshot and not self._snapshot_blocks.is_cleared() + and self._snapshot_operator == self._logical_plan.dag ) def _run_with_new_execution_backend(self) -> bool: @@ -731,13 +777,12 @@ def _run_with_new_execution_backend(self) -> bool: return self._context.new_execution_backend def require_preserve_order(self) -> bool: - """Whether this plan requires to preserve order when running with new - backend. - """ - from ray.data._internal.stage_impl import SortStage, ZipStage + """Whether this plan requires to preserve order.""" + from ray.data._internal.logical.operators.all_to_all_operator import Sort + from ray.data._internal.logical.operators.n_ary_operator import Zip - for stage in self._stages_after_snapshot: - if isinstance(stage, ZipStage) or isinstance(stage, SortStage): + for op in self._logical_plan.dag.post_order_iter(): + if isinstance(op, (Zip, Sort)): return True return False diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 5395e30e0b5d..90a38bc89ee6 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2678,10 +2678,19 @@ def schema(self, fetch_if_missing: bool = True) -> Optional["Schema"]: if not fetch_if_missing: return None - # Lazily execute only the first block to minimize computation. - # We achieve this by appending a Limit[1] operation to a copy - # of this Dataset, which we then execute to get its schema. 
- base_schema = self.limit(1)._plan.schema(fetch_if_missing=fetch_if_missing) + if self._plan.is_read_only(): + # For read-only plans, there is special logic for fetching the + # schema from already known metadata + # (see `get_legacy_lazy_block_list_read_only()`). This requires + # the underlying logical plan to be read-only, so we skip appending + # the Limit[1] operation as we do in the else case below. There is + # no downside in this case, since it doesn't execute any read tasks. + base_schema = self._plan.schema(fetch_if_missing=fetch_if_missing) + else: + # Lazily execute only the first block to minimize computation. + # We achieve this by appending a Limit[1] operation to a copy + # of this Dataset, which we then execute to get its schema. + base_schema = self.limit(1)._plan.schema(fetch_if_missing=fetch_if_missing) if base_schema: self._plan.cache_schema(base_schema) return Schema(base_schema) diff --git a/python/ray/data/tests/test_consumption.py b/python/ray/data/tests/test_consumption.py index cfb4f0ac2e62..8f22f31e2280 100644 --- a/python/ray/data/tests/test_consumption.py +++ b/python/ray/data/tests/test_consumption.py @@ -452,14 +452,14 @@ def test_dataset_repr(ray_start_regular_shared): ) ds = ds.filter(lambda x: x["id"] > 0) assert repr(ds) == ( - "Filter\n" + "Filter()\n" "+- MapBatches()\n" " +- Dataset(num_blocks=10, num_rows=10, schema={id: int64})" ) ds = ds.random_shuffle() assert repr(ds) == ( "RandomShuffle\n" - "+- Filter\n" + "+- Filter()\n" " +- MapBatches()\n" " +- Dataset(num_blocks=10, num_rows=10, schema={id: int64})" ) @@ -468,6 +468,7 @@ def test_dataset_repr(ray_start_regular_shared): repr(ds) == "MaterializedDataset(num_blocks=10, num_rows=9, schema={id: int64})" ) ds = ds.map_batches(lambda x: x) + assert repr(ds) == ( "MapBatches()\n" "+- Dataset(num_blocks=10, num_rows=9, schema={id: int64})" @@ -482,10 +483,17 @@ def test_dataset_repr(ray_start_regular_shared): "schema={id: int64})" ) ds3 = ds1.union(ds2) - assert repr(ds3) == "Dataset(num_blocks=10, num_rows=9, schema={id: int64})" + # TODO(scottjlee): include all of the input datasets to union() + # in the repr output, instead of only the resulting unioned dataset. 
+ assert repr(ds3) == ( + "Union\n+- Dataset(num_blocks=10, num_rows=9, schema={id: int64})" + ) ds = ds.zip(ds3) assert repr(ds) == ( - "Zip\n" "+- Dataset(num_blocks=10, num_rows=9, schema={id: int64})" + "Zip\n" + "+- MapBatches()\n" + "+- Union\n" + " +- Dataset(num_blocks=10, num_rows=9, schema={id: int64})" ) def my_dummy_fn(x): diff --git a/python/ray/data/tests/test_optimize.py b/python/ray/data/tests/test_optimize.py index c16e40905bd1..2a514d15cd4b 100644 --- a/python/ray/data/tests/test_optimize.py +++ b/python/ray/data/tests/test_optimize.py @@ -212,10 +212,6 @@ def test_spread_hint_inherit(ray_start_regular_shared): ds = ray.data.range(10) ds = ds.map(column_udf("id", lambda x: x + 1)) ds = ds.random_shuffle() - for s in ds._plan._stages_before_snapshot: - assert s.ray_remote_args == {}, s.ray_remote_args - for s in ds._plan._stages_after_snapshot: - assert s.ray_remote_args == {}, s.ray_remote_args shuffle_op = ds._plan._logical_plan.dag read_op = shuffle_op.input_dependencies[0].input_dependencies[0] diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index a11d91ba3752..1aa93371abf3 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -1242,10 +1242,9 @@ def test_dataset_name(): ds = ray.data.range(100, parallelism=20).map_batches(lambda x: x) ds._set_name("test_ds") assert ds._name == "test_ds" - assert ( - str(ds) - == """MapBatches() -+- Dataset(name=test_ds, num_blocks=20, num_rows=100, schema={id: int64})""" + assert str(ds) == ( + "MapBatches()\n" + "+- Dataset(name=test_ds, num_blocks=20, num_rows=100, schema={id: int64})" ) with patch_update_stats_actor() as update_fn: mds = ds.materialize()
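
As a reading aid rather than part of the patch, the sketch below models, under simplified assumptions, the bookkeeping this change moves to: a snapshot cached and keyed by the logical DAG root (the `_snapshot_operator` field) in place of the `_stages_before_snapshot`/`_stages_after_snapshot` lists, together with a DFS rendering similar to what `get_plan_as_string` now produces. The `Op`, `MiniPlan`, and `render` names are hypothetical stand-ins for illustration only, not Ray Data APIs.

# Illustrative sketch, not part of the patch. `Op`, `MiniPlan`, and `render`
# are hypothetical stand-ins; they only mirror the idea of keying the cached
# snapshot on the logical DAG root rather than on lists of pending stages.
from typing import List, Optional


class Op:
    """A toy logical operator with input dependencies."""

    def __init__(self, name: str, inputs: Optional[List["Op"]] = None):
        self.name = name
        self.input_dependencies = inputs or []


class MiniPlan:
    """Caches a snapshot keyed by the DAG root that produced it."""

    def __init__(self, dag: Op):
        self.dag = dag
        self._snapshot_blocks = None
        self._snapshot_operator: Optional[Op] = None

    def has_computed_output(self) -> bool:
        # The snapshot counts as final output only if it was produced by the
        # operator that is currently the root of the DAG.
        return (
            self._snapshot_blocks is not None
            and self._snapshot_operator is self.dag
        )

    def execute(self):
        if not self.has_computed_output():
            self._snapshot_blocks = f"blocks for {self.dag.name}"
            self._snapshot_operator = self.dag
        return self._snapshot_blocks


def render(op: Op, depth: int = 0) -> str:
    """DFS render of the DAG, using the same indentation style as repr(Dataset)."""
    prefix = "" if depth == 0 else " " * ((depth - 1) * 3) + "+- "
    lines = [f"{prefix}{op.name}"]
    for parent in op.input_dependencies:
        lines.append(render(parent, depth + 1))
    return "\n".join(lines)


if __name__ == "__main__":
    read = Op("Read")
    mapped = Op("MapBatches(my_udf)", [read])
    plan = MiniPlan(mapped)

    print(render(plan.dag))
    # MapBatches(my_udf)
    # +- Read

    plan.execute()
    assert plan.has_computed_output()

    # Appending another operator changes the DAG root, so the old snapshot is
    # no longer treated as the final output and execution is needed again.
    plan.dag = Op("Filter(keep_positive)", [mapped])
    assert not plan.has_computed_output()

Keying the snapshot on the DAG root reduces cache validity to a single identity check, which is what lets this PR delete the stage lists along with their copy and clear bookkeeping.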