From 8591ac6b74f1194eeeb7a22ba373522246ddc933 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Thu, 2 Feb 2023 23:51:01 +0000 Subject: [PATCH 1/9] Propagate logical plan on ds.lazy() call. --- python/ray/data/dataset.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index bd61c92f645e2..37acc24d27220 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -3990,7 +3990,9 @@ def lazy(self) -> "Dataset[T]": ``.iter_batches()``, ``.to_torch()``, ``.to_tf()``, etc.) or execution is manually triggered via ``.fully_executed()``. """ - ds = Dataset(self._plan, self._epoch, lazy=True) + ds = Dataset( + self._plan, self._epoch, lazy=True, logical_plan=self._logical_plan + ) ds._set_uuid(self._get_uuid()) return ds From 8ea5f3069fcc8f753781469f6e7d9b6aeabe9342 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Wed, 8 Feb 2023 18:32:28 +0000 Subject: [PATCH 2/9] Move DatasetContext setting to cached_remote_fn, make sure its set for actor pool actors. --- .../execution/operators/actor_pool_map_operator.py | 6 +++++- python/ray/data/_internal/lazy_block_list.py | 6 ------ python/ray/data/_internal/remote_fn.py | 11 ++++++++++- python/ray/data/read_api.py | 6 +----- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index cc2700890de7d..a7a91c183e3f2 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -88,7 +88,8 @@ def start(self, options: ExecutionOptions): def _start_actor(self): """Start a new actor and add it to the actor pool as a pending actor.""" assert self._cls is not None - actor = self._cls.remote() + ctx = DatasetContext.get_current() + actor = self._cls.remote(ctx) self._actor_pool.add_pending_actor(actor, actor.get_location.remote()) def _add_bundled_input(self, bundle: RefBundle): @@ -279,6 +280,9 @@ def _apply_default_remote_args(ray_remote_args: Dict[str, Any]) -> Dict[str, Any class _MapWorker: """An actor worker for MapOperator.""" + def __init__(self, ctx: DatasetContext): + DatasetContext._set_current(ctx) + def get_location(self) -> NodeIdStr: return ray.get_runtime_context().get_node_id() diff --git a/python/ray/data/_internal/lazy_block_list.py b/python/ray/data/_internal/lazy_block_list.py index 8f4d384579112..551e3e3da7175 100644 --- a/python/ray/data/_internal/lazy_block_list.py +++ b/python/ray/data/_internal/lazy_block_list.py @@ -600,7 +600,6 @@ def _submit_task( .remote( i=task_idx, task=task, - context=DatasetContext.get_current(), stats_uuid=self._stats_uuid, stats_actor=stats_actor, ), @@ -613,7 +612,6 @@ def _submit_task( .remote( i=task_idx, task=task, - context=DatasetContext.get_current(), stats_uuid=self._stats_uuid, stats_actor=stats_actor, ) @@ -640,11 +638,9 @@ def _flatten_metadata( def _execute_read_task_nosplit( i: int, task: ReadTask, - context: DatasetContext, stats_uuid: str, stats_actor: ray.actor.ActorHandle, ) -> Tuple[Block, BlockMetadata]: - DatasetContext._set_current(context) stats = BlockExecStats.builder() # Execute the task. 
Expect only one block returned when dynamic block splitting is @@ -664,7 +660,6 @@ def _execute_read_task_nosplit( def _execute_read_task_split( i: int, task: ReadTask, - context: DatasetContext, stats_uuid: str, stats_actor: ray.actor.ActorHandle, ) -> Iterable[Union[Block, List[BlockMetadata]]]: @@ -674,7 +669,6 @@ def _execute_read_task_split( Example of return value for 3 blocks: (Block1, Block2, Block3, [BlockMetadata1, BlockMetadata2, BlockMetadata3]) """ - DatasetContext._set_current(context) # Execute the task. blocks = task() diff --git a/python/ray/data/_internal/remote_fn.py b/python/ray/data/_internal/remote_fn.py index 88528e04a34fc..d6c46ca689689 100644 --- a/python/ray/data/_internal/remote_fn.py +++ b/python/ray/data/_internal/remote_fn.py @@ -1,3 +1,4 @@ +import functools from typing import Any import ray @@ -20,7 +21,15 @@ def cached_remote_fn(fn: Any, **ray_remote_args) -> Any: "retry_exceptions": True, "scheduling_strategy": ctx.scheduling_strategy, } + + @functools.wraps(fn) + def wrapper(*args, **kwargs): + # Wrapper that sets the DatasetContext that existed on the driver/task + # submitter. + DatasetContext._set_current(ctx) + return fn(*args, **kwargs) + CACHED_FUNCTIONS[fn] = ray.remote( **{**default_ray_remote_args, **ray_remote_args} - )(fn) + )(wrapper) return CACHED_FUNCTIONS[fn] diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 25add1d6247b9..6c080ace55745 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -292,7 +292,7 @@ def read_datasource( if force_local: requested_parallelism, min_safe_parallelism, read_tasks = _get_read_tasks( - datasource, ctx, cur_pg, parallelism, local_uri, read_args + datasource, cur_pg, parallelism, local_uri, read_args ) else: # Prepare read in a remote task so that in Ray client mode, we aren't @@ -304,7 +304,6 @@ def read_datasource( requested_parallelism, min_safe_parallelism, read_tasks = ray.get( get_read_tasks.remote( datasource, - ctx, cur_pg, parallelism, local_uri, @@ -1585,7 +1584,6 @@ def _get_metadata(table: Union["pyarrow.Table", "pandas.DataFrame"]) -> BlockMet def _get_read_tasks( ds: Datasource, - ctx: DatasetContext, cur_pg: Optional[PlacementGroup], parallelism: int, local_uri: bool, @@ -1595,7 +1593,6 @@ def _get_read_tasks( Args: ds: Datasource to read from. - ctx: Dataset config to use. cur_pg: The current placement group, if any. parallelism: The user-requested parallelism, or -1 for autodetection. kwargs: Additional kwargs to pass to the reader. @@ -1607,7 +1604,6 @@ def _get_read_tasks( kwargs = _unwrap_arrow_serialization_workaround(kwargs) if local_uri: kwargs["local_uri"] = local_uri - DatasetContext._set_current(ctx) reader = ds.create_reader(**kwargs) requested_parallelism, min_safe_parallelism = _autodetect_parallelism( parallelism, cur_pg, DatasetContext.get_current(), reader From 8bcb07eb4d96a0610b3ba3d2d08012e87a50797b Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Wed, 8 Feb 2023 17:17:54 +0000 Subject: [PATCH 3/9] Don't ignore Pandas chained assignment warning with Pandas OptionContext. 
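Instead of wrapping the whole casting loop in pd.option_context("chained_assignment", None), the warning is suppressed per column inside the existing warnings.catch_warnings() block, with the SettingWithCopyWarning import resolved across Pandas versions. A minimal, self-contained sketch of that suppression pattern (the DataFrame here is a throwaway stand-in, not code from this repo):

    import warnings

    import pandas as pd

    try:
        # Pandas < 1.5.0
        SettingWithCopyWarning = pd.core.common.SettingWithCopyWarning
    except AttributeError:
        # SettingWithCopyWarning was moved to pd.errors in Pandas 1.5.0.
        SettingWithCopyWarning = pd.errors.SettingWithCopyWarning

    df = pd.DataFrame({"a": [1, 2, 3]})
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=FutureWarning)
        warnings.simplefilter("ignore", category=SettingWithCopyWarning)
        # Intentional in-place column assignment, mirroring the casting
        # helpers changed below.
        df.loc[:, "a"] = pd.Series([4, 5, 6])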
--- python/ray/air/util/data_batch_conversion.py | 79 ++++++++++++-------- 1 file changed, 48 insertions(+), 31 deletions(-) diff --git a/python/ray/air/util/data_batch_conversion.py b/python/ray/air/util/data_batch_conversion.py index 555a9483d3d59..229f6f2753da3 100644 --- a/python/ray/air/util/data_batch_conversion.py +++ b/python/ray/air/util/data_batch_conversion.py @@ -237,6 +237,12 @@ def _cast_ndarray_columns_to_tensor_extension(df: "pd.DataFrame") -> "pd.DataFra Cast all NumPy ndarray columns in df to our tensor extension type, TensorArray. """ pd = _lazy_import_pandas() + try: + SettingWithCopyWarning = pd.core.common.SettingWithCopyWarning + except AttributeError: + # SettingWithCopyWarning was moved to pd.errors in Pandas 1.5.0. + SettingWithCopyWarning = pd.errors.SettingWithCopyWarning + from ray.air.util.tensor_extensions.pandas import ( TensorArray, column_needs_tensor_extension, @@ -246,42 +252,53 @@ def _cast_ndarray_columns_to_tensor_extension(df: "pd.DataFrame") -> "pd.DataFra # TODO(Clark): Once Pandas supports registering extension types for type # inference on construction, implement as much for NumPy ndarrays and remove # this. See https://github.com/pandas-dev/pandas/issues/41848 - with pd.option_context("chained_assignment", None): - for col_name, col in df.items(): - if column_needs_tensor_extension(col): - try: - # Suppress Pandas warnings: - # https://github.com/ray-project/ray/issues/29270 - # We actually want in-place operations so we surpress this warning. - # https://stackoverflow.com/a/74193599 - with warnings.catch_warnings(): - warnings.simplefilter("ignore", category=FutureWarning) - df.loc[:, col_name] = TensorArray(col) - except Exception as e: - raise ValueError( - f"Tried to cast column {col_name} to the TensorArray tensor " - "extension type but the conversion failed. To disable " - "automatic casting to this tensor extension, set " - "ctx = DatasetContext.get_current(); " - "ctx.enable_tensor_extension_casting = False." - ) from e + # TODO(Clark): Optimize this with propagated DataFrame metadata containing a list of + # column names containing tensor columns, to make this an O(# of tensor columns) + # check rather than the current O(# of columns) check. + for col_name, col in df.items(): + if column_needs_tensor_extension(col): + try: + # Suppress Pandas warnings: + # https://github.com/ray-project/ray/issues/29270 + # We actually want in-place operations so we surpress this warning. + # https://stackoverflow.com/a/74193599 + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=FutureWarning) + warnings.simplefilter("ignore", category=SettingWithCopyWarning) + df.loc[:, col_name] = TensorArray(col) + except Exception as e: + raise ValueError( + f"Tried to cast column {col_name} to the TensorArray tensor " + "extension type but the conversion failed. To disable " + "automatic casting to this tensor extension, set " + "ctx = DatasetContext.get_current(); " + "ctx.enable_tensor_extension_casting = False." + ) from e return df def _cast_tensor_columns_to_ndarrays(df: "pd.DataFrame") -> "pd.DataFrame": """Cast all tensor extension columns in df to NumPy ndarrays.""" pd = _lazy_import_pandas() + try: + SettingWithCopyWarning = pd.core.common.SettingWithCopyWarning + except AttributeError: + # SettingWithCopyWarning was moved to pd.errors in Pandas 1.5.0. 
+ SettingWithCopyWarning = pd.errors.SettingWithCopyWarning from ray.air.util.tensor_extensions.pandas import TensorDtype - with pd.option_context("chained_assignment", None): - # Try to convert any tensor extension columns to ndarray columns. - for col_name, col in df.items(): - if isinstance(col.dtype, TensorDtype): - # Suppress Pandas warnings: - # https://github.com/ray-project/ray/issues/29270 - # We actually want in-place operations so we surpress this warning. - # https://stackoverflow.com/a/74193599 - with warnings.catch_warnings(): - warnings.simplefilter("ignore", category=FutureWarning) - df.loc[:, col_name] = pd.Series(list(col.to_numpy())) - return df + # Try to convert any tensor extension columns to ndarray columns. + # TODO(Clark): Optimize this with propagated DataFrame metadata containing a list of + # column names containing tensor columns, to make this an O(# of tensor columns) + # check rather than the current O(# of columns) check. + for col_name, col in df.items(): + if isinstance(col.dtype, TensorDtype): + # Suppress Pandas warnings: + # https://github.com/ray-project/ray/issues/29270 + # We actually want in-place operations so we surpress this warning. + # https://stackoverflow.com/a/74193599 + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=FutureWarning) + warnings.simplefilter("ignore", category=SettingWithCopyWarning) + df.loc[:, col_name] = pd.Series(list(col.to_numpy())) + return df From c394a7243a59c28ae8897c2e398b71e2f82d19eb Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Wed, 8 Feb 2023 21:57:20 +0000 Subject: [PATCH 4/9] Misc. perf improvements to data layer. --- python/ray/data/_internal/arrow_block.py | 6 +++++- python/ray/data/_internal/batcher.py | 9 ++++----- python/ray/data/_internal/block_builder.py | 4 ++++ .../_internal/delegating_block_builder.py | 5 +++++ python/ray/data/_internal/pandas_block.py | 12 +++++++---- python/ray/data/_internal/simple_block.py | 6 ++++-- python/ray/data/_internal/size_estimator.py | 14 ++++++++++++- python/ray/data/_internal/table_block.py | 20 ++++++++++++++++++- 8 files changed, 62 insertions(+), 14 deletions(-) diff --git a/python/ray/data/_internal/arrow_block.py b/python/ray/data/_internal/arrow_block.py index 0d25e97e9eaad..b199eb69cd106 100644 --- a/python/ray/data/_internal/arrow_block.py +++ b/python/ray/data/_internal/arrow_block.py @@ -126,6 +126,10 @@ def _table_from_pydict(columns: Dict[str, List[Any]]) -> Block: def _concat_tables(tables: List[Block]) -> Block: return transform_pyarrow.concat(tables) + @staticmethod + def _concat_would_copy() -> bool: + return False + @staticmethod def _empty_table() -> "pyarrow.Table": return pyarrow.Table.from_pydict({}) @@ -224,7 +228,7 @@ def schema(self) -> "pyarrow.lib.Schema": def to_pandas(self) -> "pandas.DataFrame": from ray.air.util.data_batch_conversion import _cast_tensor_columns_to_ndarrays - df = self._table.to_pandas() + df = self._table.to_pandas(use_threads=False) ctx = DatasetContext.get_current() if ctx.enable_tensor_extension_casting: df = _cast_tensor_columns_to_ndarrays(df) diff --git a/python/ray/data/_internal/batcher.py b/python/ray/data/_internal/batcher.py index 569e7aac47171..d1358990dfcbd 100644 --- a/python/ray/data/_internal/batcher.py +++ b/python/ray/data/_internal/batcher.py @@ -102,11 +102,12 @@ def next_batch(self) -> Block: A batch represented as a Block. """ assert self.has_batch() or (self._done_adding and self.has_any()) + needs_copy = self._ensure_copy # If no batch size, short-circuit. 
if self._batch_size is None: assert len(self._buffer) == 1 block = self._buffer[0] - if self._ensure_copy: + if needs_copy: # Copy block if needing to ensure fresh batch copy. block = BlockAccessor.for_block(block) block = block.slice(0, block.num_rows(), copy=True) @@ -139,13 +140,11 @@ def next_batch(self) -> Block: # blocks consumed on the next batch extraction. self._buffer = leftover self._buffer_size -= self._batch_size + needs_copy = needs_copy and not output.will_build_yield_copy() batch = output.build() - if self._ensure_copy: + if needs_copy: # Need to ensure that the batch is a fresh copy. batch = BlockAccessor.for_block(batch) - # TOOD(Clark): This copy will often be unnecessary, e.g. for pandas - # DataFrame batches that have required concatenation to construct, which - # always requires a copy. We should elide this copy in those cases. batch = batch.slice(0, batch.num_rows(), copy=True) return batch diff --git a/python/ray/data/_internal/block_builder.py b/python/ray/data/_internal/block_builder.py index d747125336cc4..0d64ddadb26f4 100644 --- a/python/ray/data/_internal/block_builder.py +++ b/python/ray/data/_internal/block_builder.py @@ -18,6 +18,10 @@ def add_block(self, block: Block) -> None: """Append an entire block to the block being built.""" raise NotImplementedError + def will_build_yield_copy(self) -> bool: + """Whether building this block will yield a new block copy.""" + raise NotImplementedError + def build(self) -> Block: """Build the block.""" raise NotImplementedError diff --git a/python/ray/data/_internal/delegating_block_builder.py b/python/ray/data/_internal/delegating_block_builder.py index 4610b4c4a0b44..9a5a734877974 100644 --- a/python/ray/data/_internal/delegating_block_builder.py +++ b/python/ray/data/_internal/delegating_block_builder.py @@ -55,6 +55,11 @@ def add_block(self, block: Block): self._builder = accessor.builder() self._builder.add_block(block) + def will_build_yield_copy(self) -> bool: + if self._builder is None: + return True + return self._builder.will_build_yield_copy() + def build(self) -> Block: if self._builder is None: if self._empty_block is not None: diff --git a/python/ray/data/_internal/pandas_block.py b/python/ray/data/_internal/pandas_block.py index 8d78306d9f979..f6b9b7df2211a 100644 --- a/python/ray/data/_internal/pandas_block.py +++ b/python/ray/data/_internal/pandas_block.py @@ -106,14 +106,18 @@ def _concat_tables(tables: List["pandas.DataFrame"]) -> "pandas.DataFrame": if len(tables) > 1: df = pandas.concat(tables, ignore_index=True) + df.reset_index(drop=True, inplace=True) + ctx = DatasetContext.get_current() + if ctx.enable_tensor_extension_casting: + df = _cast_ndarray_columns_to_tensor_extension(df) else: df = tables[0] - df.reset_index(drop=True, inplace=True) - ctx = DatasetContext.get_current() - if ctx.enable_tensor_extension_casting: - df = _cast_ndarray_columns_to_tensor_extension(df) return df + @staticmethod + def _concat_would_copy() -> bool: + return True + @staticmethod def _empty_table() -> "pandas.DataFrame": pandas = lazy_import_pandas() diff --git a/python/ray/data/_internal/simple_block.py b/python/ray/data/_internal/simple_block.py index 587f1efb47187..750c9d0360f73 100644 --- a/python/ray/data/_internal/simple_block.py +++ b/python/ray/data/_internal/simple_block.py @@ -46,12 +46,14 @@ def add_block(self, block: List[T]) -> None: f"{block}" ) self._items.extend(block) - for item in block: - self._size_estimator.add(item) + self._size_estimator.add_block(block) def num_rows(self) -> int: return 
len(self._items) + def will_build_yield_copy(self) -> bool: + return True + def build(self) -> Block: return list(self._items) diff --git a/python/ray/data/_internal/size_estimator.py b/python/ray/data/_internal/size_estimator.py index 7f5231c1cee5a..561c7b3ce224e 100644 --- a/python/ray/data/_internal/size_estimator.py +++ b/python/ray/data/_internal/size_estimator.py @@ -1,4 +1,4 @@ -from typing import Any +from typing import Any, List import ray from ray import cloudpickle @@ -27,6 +27,18 @@ def add(self, item: Any) -> None: elif self._count % 100 == 0: self._running_mean.add(self._real_size(item), weight=100) + def add_block(self, block: List[Any]) -> None: + self._count += len(block) + if self._count < 10: + for item in block[: 10 - self._count]: + self._running_mean.add(self._real_size(item), weight=1) + elif self._count < 100: + for item in block[self._count % 10 : 100 - self._count : 10]: + self._running_mean.add(self._real_size(item), weight=10) + elif (len(block) + (self._count % 100)) // 100 > 1: + for item in block[self._count % 100 :: 100]: + self._running_mean.add(self._real_size(item), weight=100) + def size_bytes(self) -> int: return int(self._running_mean.mean * self._count) diff --git a/python/ray/data/_internal/table_block.py b/python/ray/data/_internal/table_block.py index 6ecd3aa2d0ffd..1dce4e4e5e85d 100644 --- a/python/ray/data/_internal/table_block.py +++ b/python/ray/data/_internal/table_block.py @@ -29,6 +29,12 @@ def __init__(self, block_type): self._column_names = None # The set of compacted tables we have built so far. self._tables: List[Any] = [] + # Cursor into tables indicating up to which table we've accumulated table sizes. + # This is used to defer table size calculation, which can be expensive for e.g. + # Pandas DataFrames. + self._tables_size_cursor = 0 + # Accumulated table sizes, up to the table in _tables pointed to by + # _tables_size_cursor. self._tables_size_bytes = 0 # Size estimator for un-compacted table values. self._uncompacted_size = SizeEstimator() @@ -76,7 +82,6 @@ def add_block(self, block: Any) -> None: ) accessor = BlockAccessor.for_block(block) self._tables.append(block) - self._tables_size_bytes += accessor.size_bytes() self._num_rows += accessor.num_rows() @staticmethod @@ -91,6 +96,16 @@ def _concat_tables(tables: List[Block]) -> Block: def _empty_table() -> Any: raise NotImplementedError + @staticmethod + def _concat_would_copy() -> bool: + raise NotImplementedError + + def will_build_yield_copy(self) -> bool: + if self._columns: + # Building a table from a dict of list columns always creates a copy. + return True + return self._concat_would_copy() and len(self._tables) > 1 + def build(self) -> Block: if self._columns: tables = [self._table_from_pydict(self._columns)] @@ -108,6 +123,9 @@ def num_rows(self) -> int: def get_estimated_memory_usage(self) -> int: if self._num_rows == 0: return 0 + for table in self._tables[self._tables_size_cursor + 1 :]: + self._tables_size_bytes += BlockAccessor.for_block(table).size_bytes() + self._tables_size_cursor = len(self._tables) - 1 return self._tables_size_bytes + self._uncompacted_size.size_bytes() def _compact_if_needed(self) -> None: From 17e5a3e68e90a21515d8dfec145213373830ff94 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Wed, 22 Feb 2023 20:37:47 +0000 Subject: [PATCH 5/9] Revert undesired changes. 
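Two pieces of the previous commit are rolled back: Table.to_pandas(use_threads=False) returns to the default to_pandas(), and the tensor-extension cast in the Pandas _concat_tables() moves back out of the multi-table branch so that single-block builds are still cast. A rough sanity sketch of that restored single-block behavior (hypothetical data; assumes tensor-extension casting is left at its default of enabled):

    import numpy as np
    import pandas as pd

    from ray.data._internal.pandas_block import PandasBlockBuilder

    builder = PandasBlockBuilder()
    builder.add_block(pd.DataFrame({"tensor": [np.ones(4), np.zeros(4)]}))
    block = builder.build()
    # With the cast applied on the single-table path again, this should report
    # the tensor extension dtype rather than plain object.
    print(block["tensor"].dtype)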
--- python/ray/data/_internal/arrow_block.py | 2 +- python/ray/data/_internal/pandas_block.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/ray/data/_internal/arrow_block.py b/python/ray/data/_internal/arrow_block.py index b199eb69cd106..1557eded3df9c 100644 --- a/python/ray/data/_internal/arrow_block.py +++ b/python/ray/data/_internal/arrow_block.py @@ -228,7 +228,7 @@ def schema(self) -> "pyarrow.lib.Schema": def to_pandas(self) -> "pandas.DataFrame": from ray.air.util.data_batch_conversion import _cast_tensor_columns_to_ndarrays - df = self._table.to_pandas(use_threads=False) + df = self._table.to_pandas() ctx = DatasetContext.get_current() if ctx.enable_tensor_extension_casting: df = _cast_tensor_columns_to_ndarrays(df) diff --git a/python/ray/data/_internal/pandas_block.py b/python/ray/data/_internal/pandas_block.py index f6b9b7df2211a..61844543d358d 100644 --- a/python/ray/data/_internal/pandas_block.py +++ b/python/ray/data/_internal/pandas_block.py @@ -107,11 +107,11 @@ def _concat_tables(tables: List["pandas.DataFrame"]) -> "pandas.DataFrame": if len(tables) > 1: df = pandas.concat(tables, ignore_index=True) df.reset_index(drop=True, inplace=True) - ctx = DatasetContext.get_current() - if ctx.enable_tensor_extension_casting: - df = _cast_ndarray_columns_to_tensor_extension(df) else: df = tables[0] + ctx = DatasetContext.get_current() + if ctx.enable_tensor_extension_casting: + df = _cast_ndarray_columns_to_tensor_extension(df) return df @staticmethod From ae3c3cd640c3d129067c10036b6a6eb094a529b0 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Thu, 23 Feb 2023 17:38:47 +0000 Subject: [PATCH 6/9] Fix size estimator. --- python/ray/data/_internal/size_estimator.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/python/ray/data/_internal/size_estimator.py b/python/ray/data/_internal/size_estimator.py index 561c7b3ce224e..75714cc50b8d6 100644 --- a/python/ray/data/_internal/size_estimator.py +++ b/python/ray/data/_internal/size_estimator.py @@ -28,16 +28,18 @@ def add(self, item: Any) -> None: self._running_mean.add(self._real_size(item), weight=100) def add_block(self, block: List[Any]) -> None: - self._count += len(block) if self._count < 10: - for item in block[: 10 - self._count]: - self._running_mean.add(self._real_size(item), weight=1) - elif self._count < 100: - for item in block[self._count % 10 : 100 - self._count : 10]: - self._running_mean.add(self._real_size(item), weight=10) - elif (len(block) + (self._count % 100)) // 100 > 1: - for item in block[self._count % 100 :: 100]: - self._running_mean.add(self._real_size(item), weight=100) + for i in range(min(10 - self._count, len(block))): + self._running_mean.add(self._real_size(block[i]), weight=1) + if self._count < 100: + for i in range( + 10 - (self._count % 10), min(100 - self._count, len(block)), 10 + ): + self._running_mean.add(self._real_size(block[i]), weight=10) + if (len(block) + (self._count % 100)) // 100 > 1: + for i in range(100 - (self._count % 100), len(block), 100): + self._running_mean.add(self._real_size(block[i]), weight=100) + self._count += len(block) def size_bytes(self) -> int: return int(self._running_mean.mean * self._count) From d907b4beef1fa79ae087321d356e2e8ead3f2c36 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Thu, 23 Feb 2023 17:38:49 +0000 Subject: [PATCH 7/9] Revert "Move DatasetContext setting to cached_remote_fn, make sure its set for actor pool actors." 
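The wrapper-based context propagation added to cached_remote_fn is dropped in favor of the original approach: pass the DatasetContext to each task explicitly and re-install it on the worker. A minimal sketch of that explicit pattern (the task name and the printed field are illustrative only, not code from this repo):

    import ray

    from ray.data.context import DatasetContext


    @ray.remote
    def sample_read_task(context: DatasetContext) -> int:
        # Re-install the submitter's context inside the worker process.
        DatasetContext._set_current(context)
        return DatasetContext.get_current().target_max_block_size


    ray.init(ignore_reinit_error=True)
    ctx = DatasetContext.get_current()
    print(ray.get(sample_read_task.remote(ctx)))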
This reverts commit 8ea5f3069fcc8f753781469f6e7d9b6aeabe9342. --- .../execution/operators/actor_pool_map_operator.py | 6 +----- python/ray/data/_internal/lazy_block_list.py | 6 ++++++ python/ray/data/_internal/remote_fn.py | 11 +---------- python/ray/data/read_api.py | 6 +++++- 4 files changed, 13 insertions(+), 16 deletions(-) diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index a7a91c183e3f2..cc2700890de7d 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -88,8 +88,7 @@ def start(self, options: ExecutionOptions): def _start_actor(self): """Start a new actor and add it to the actor pool as a pending actor.""" assert self._cls is not None - ctx = DatasetContext.get_current() - actor = self._cls.remote(ctx) + actor = self._cls.remote() self._actor_pool.add_pending_actor(actor, actor.get_location.remote()) def _add_bundled_input(self, bundle: RefBundle): @@ -280,9 +279,6 @@ def _apply_default_remote_args(ray_remote_args: Dict[str, Any]) -> Dict[str, Any class _MapWorker: """An actor worker for MapOperator.""" - def __init__(self, ctx: DatasetContext): - DatasetContext._set_current(ctx) - def get_location(self) -> NodeIdStr: return ray.get_runtime_context().get_node_id() diff --git a/python/ray/data/_internal/lazy_block_list.py b/python/ray/data/_internal/lazy_block_list.py index 551e3e3da7175..8f4d384579112 100644 --- a/python/ray/data/_internal/lazy_block_list.py +++ b/python/ray/data/_internal/lazy_block_list.py @@ -600,6 +600,7 @@ def _submit_task( .remote( i=task_idx, task=task, + context=DatasetContext.get_current(), stats_uuid=self._stats_uuid, stats_actor=stats_actor, ), @@ -612,6 +613,7 @@ def _submit_task( .remote( i=task_idx, task=task, + context=DatasetContext.get_current(), stats_uuid=self._stats_uuid, stats_actor=stats_actor, ) @@ -638,9 +640,11 @@ def _flatten_metadata( def _execute_read_task_nosplit( i: int, task: ReadTask, + context: DatasetContext, stats_uuid: str, stats_actor: ray.actor.ActorHandle, ) -> Tuple[Block, BlockMetadata]: + DatasetContext._set_current(context) stats = BlockExecStats.builder() # Execute the task. Expect only one block returned when dynamic block splitting is @@ -660,6 +664,7 @@ def _execute_read_task_nosplit( def _execute_read_task_split( i: int, task: ReadTask, + context: DatasetContext, stats_uuid: str, stats_actor: ray.actor.ActorHandle, ) -> Iterable[Union[Block, List[BlockMetadata]]]: @@ -669,6 +674,7 @@ def _execute_read_task_split( Example of return value for 3 blocks: (Block1, Block2, Block3, [BlockMetadata1, BlockMetadata2, BlockMetadata3]) """ + DatasetContext._set_current(context) # Execute the task. blocks = task() diff --git a/python/ray/data/_internal/remote_fn.py b/python/ray/data/_internal/remote_fn.py index d6c46ca689689..88528e04a34fc 100644 --- a/python/ray/data/_internal/remote_fn.py +++ b/python/ray/data/_internal/remote_fn.py @@ -1,4 +1,3 @@ -import functools from typing import Any import ray @@ -21,15 +20,7 @@ def cached_remote_fn(fn: Any, **ray_remote_args) -> Any: "retry_exceptions": True, "scheduling_strategy": ctx.scheduling_strategy, } - - @functools.wraps(fn) - def wrapper(*args, **kwargs): - # Wrapper that sets the DatasetContext that existed on the driver/task - # submitter. 
- DatasetContext._set_current(ctx) - return fn(*args, **kwargs) - CACHED_FUNCTIONS[fn] = ray.remote( **{**default_ray_remote_args, **ray_remote_args} - )(wrapper) + )(fn) return CACHED_FUNCTIONS[fn] diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 6c080ace55745..25add1d6247b9 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -292,7 +292,7 @@ def read_datasource( if force_local: requested_parallelism, min_safe_parallelism, read_tasks = _get_read_tasks( - datasource, cur_pg, parallelism, local_uri, read_args + datasource, ctx, cur_pg, parallelism, local_uri, read_args ) else: # Prepare read in a remote task so that in Ray client mode, we aren't @@ -304,6 +304,7 @@ def read_datasource( requested_parallelism, min_safe_parallelism, read_tasks = ray.get( get_read_tasks.remote( datasource, + ctx, cur_pg, parallelism, local_uri, @@ -1584,6 +1585,7 @@ def _get_metadata(table: Union["pyarrow.Table", "pandas.DataFrame"]) -> BlockMet def _get_read_tasks( ds: Datasource, + ctx: DatasetContext, cur_pg: Optional[PlacementGroup], parallelism: int, local_uri: bool, @@ -1593,6 +1595,7 @@ def _get_read_tasks( Args: ds: Datasource to read from. + ctx: Dataset config to use. cur_pg: The current placement group, if any. parallelism: The user-requested parallelism, or -1 for autodetection. kwargs: Additional kwargs to pass to the reader. @@ -1604,6 +1607,7 @@ def _get_read_tasks( kwargs = _unwrap_arrow_serialization_workaround(kwargs) if local_uri: kwargs["local_uri"] = local_uri + DatasetContext._set_current(ctx) reader = ds.create_reader(**kwargs) requested_parallelism, min_safe_parallelism = _autodetect_parallelism( parallelism, cur_pg, DatasetContext.get_current(), reader From e0b385417f66bdb8c2fb85660852d45e3754c4a3 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Thu, 23 Feb 2023 17:40:17 +0000 Subject: [PATCH 8/9] Make sure DatasetContext is set for ActorPoolMapOperator. --- .../execution/operators/actor_pool_map_operator.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index cc2700890de7d..a7a91c183e3f2 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -88,7 +88,8 @@ def start(self, options: ExecutionOptions): def _start_actor(self): """Start a new actor and add it to the actor pool as a pending actor.""" assert self._cls is not None - actor = self._cls.remote() + ctx = DatasetContext.get_current() + actor = self._cls.remote(ctx) self._actor_pool.add_pending_actor(actor, actor.get_location.remote()) def _add_bundled_input(self, bundle: RefBundle): @@ -279,6 +280,9 @@ def _apply_default_remote_args(ray_remote_args: Dict[str, Any]) -> Dict[str, Any class _MapWorker: """An actor worker for MapOperator.""" + def __init__(self, ctx: DatasetContext): + DatasetContext._set_current(ctx) + def get_location(self) -> NodeIdStr: return ray.get_runtime_context().get_node_id() From d63d4aa9096e36723c382941bbd2de691ba7aab6 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Thu, 23 Feb 2023 17:54:53 +0000 Subject: [PATCH 9/9] Fix off-by-one error for table cursor. 
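The cursor is defined as the first table whose size has not been accumulated yet, so the accumulation loop must start at _tables_size_cursor (not one past it) and leave the cursor at len(self._tables). A small standalone illustration of the corrected bookkeeping, using plain integers as stand-ins for per-table byte sizes:

    def accumulate(sizes, cursor, total):
        # Accumulate every size from the cursor onward, then point the cursor
        # just past the last table seen.
        for size in sizes[cursor:]:
            total += size
        return len(sizes), total


    sizes = [10, 20, 30]
    cursor, total = accumulate(sizes, 0, 0)
    assert (cursor, total) == (3, 60)

    sizes.append(40)  # a new table shows up later
    cursor, total = accumulate(sizes, cursor, total)
    assert (cursor, total) == (4, 100)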
--- python/ray/data/_internal/table_block.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/ray/data/_internal/table_block.py b/python/ray/data/_internal/table_block.py index 1dce4e4e5e85d..d66a3f60d52d6 100644 --- a/python/ray/data/_internal/table_block.py +++ b/python/ray/data/_internal/table_block.py @@ -32,6 +32,8 @@ def __init__(self, block_type): # Cursor into tables indicating up to which table we've accumulated table sizes. # This is used to defer table size calculation, which can be expensive for e.g. # Pandas DataFrames. + # This cursor points to the first table for which we haven't accumulated a table + # size. self._tables_size_cursor = 0 # Accumulated table sizes, up to the table in _tables pointed to by # _tables_size_cursor. @@ -123,9 +125,9 @@ def num_rows(self) -> int: def get_estimated_memory_usage(self) -> int: if self._num_rows == 0: return 0 - for table in self._tables[self._tables_size_cursor + 1 :]: + for table in self._tables[self._tables_size_cursor :]: self._tables_size_bytes += BlockAccessor.for_block(table).size_bytes() - self._tables_size_cursor = len(self._tables) - 1 + self._tables_size_cursor = len(self._tables) return self._tables_size_bytes + self._uncompacted_size.size_bytes() def _compact_if_needed(self) -> None: