Skip to content

Commit

Permalink
[data] remove stages before/after snapshot (ray-project#42004)
Browse files Browse the repository at this point in the history
Removes `_stages_before/after_snapshot` from `ExecutionPlan`.

This should be merged after ray-project#41747 and ray-project#41544
---------

Signed-off-by: Andrew Xue <andewzxue@gmail.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Co-authored-by: Scott Lee <sjl@anyscale.com>
  • Loading branch information
2 people authored and vickytsang committed Jan 12, 2024
1 parent a5657b2 commit dd5c8f5
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 119 deletions.
48 changes: 28 additions & 20 deletions python/ray/data/_internal/logical/operators/map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from ray.data._internal.logical.operators.one_to_one_operator import AbstractOneToOne
from ray.data.block import UserDefinedFunction
from ray.data.context import DEFAULT_BATCH_SIZE
from ray.data.preprocessor import Preprocessor

logger = DatasetLogger(__name__)

Expand Down Expand Up @@ -35,25 +36,6 @@ def __init__(
self._ray_remote_args = ray_remote_args or {}


def _get_udf_name(fn: UserDefinedFunction) -> str:
try:
if inspect.isclass(fn):
# callable class
return fn.__name__
elif inspect.ismethod(fn):
# class method
return f"{fn.__self__.__class__.__name__}.{fn.__name__}"
elif inspect.isfunction(fn):
# normal function or lambda function.
return fn.__name__
else:
# callable object.
return fn.__class__.__name__
except AttributeError as e:
logger.get_logger().error("Failed to get name of UDF %s: %s", fn, e)
return "<unknown>"


class AbstractUDFMap(AbstractMap):
"""Abstract class for logical operators performing a UDF that should be converted
to physical MapOperator.
Expand Down Expand Up @@ -90,7 +72,7 @@ def __init__(
tasks, or ``"actors"`` to use an autoscaling actor pool.
ray_remote_args: Args to provide to ray.remote.
"""
name = f"{name}({_get_udf_name(fn)})"
name = self._get_operator_name(name, fn)
super().__init__(name, input_op, ray_remote_args=ray_remote_args)
self._fn = fn
self._fn_args = fn_args
Expand All @@ -100,6 +82,32 @@ def __init__(
self._min_rows_per_block = min_rows_per_block
self._compute = compute or TaskPoolStrategy()

def _get_operator_name(self, op_name: str, fn: UserDefinedFunction):
"""Gets the Operator name including the map `fn` UDF name."""
# If the input `fn` is a Preprocessor, the
# name is simply the name of the Preprocessor class.
if inspect.ismethod(fn) and isinstance(fn.__self__, Preprocessor):
return fn.__self__.__class__.__name__

# Otherwise, it takes the form of `<MapOperator class>(<UDF name>)`,
# e.g. `MapBatches(my_udf)`.
try:
if inspect.isclass(fn):
# callable class
return f"{op_name}({fn.__name__})"
elif inspect.ismethod(fn):
# class method
return f"{op_name}({fn.__self__.__class__.__name__}.{fn.__name__})"
elif inspect.isfunction(fn):
# normal function or lambda function.
return f"{op_name}({fn.__name__})"
else:
# callable object.
return f"{op_name}({fn.__class__.__name__})"
except AttributeError as e:
logger.get_logger().error("Failed to get name of UDF %s: %s", fn, e)
return "<unknown>"


class MapBatches(AbstractUDFMap):
"""Logical operator for map_batches."""
Expand Down
Loading

0 comments on commit dd5c8f5

Please sign in to comment.