[Datasets] [Operator Fusion - 2/N] Data layer performance/bug fixes and tweaks. #32744

Merged
79 changes: 48 additions & 31 deletions python/ray/air/util/data_batch_conversion.py
@@ -237,6 +237,12 @@ def _cast_ndarray_columns_to_tensor_extension(df: "pd.DataFrame") -> "pd.DataFra
Cast all NumPy ndarray columns in df to our tensor extension type, TensorArray.
"""
pd = _lazy_import_pandas()
try:
SettingWithCopyWarning = pd.core.common.SettingWithCopyWarning
except AttributeError:
# SettingWithCopyWarning was moved to pd.errors in Pandas 1.5.0.
SettingWithCopyWarning = pd.errors.SettingWithCopyWarning

from ray.air.util.tensor_extensions.pandas import (
TensorArray,
column_needs_tensor_extension,
@@ -246,42 +252,53 @@ def _cast_ndarray_columns_to_tensor_extension(df: "pd.DataFrame") -> "pd.DataFra
# TODO(Clark): Once Pandas supports registering extension types for type
# inference on construction, implement as much for NumPy ndarrays and remove
# this. See https://github.com/pandas-dev/pandas/issues/41848
with pd.option_context("chained_assignment", None):
for col_name, col in df.items():
if column_needs_tensor_extension(col):
try:
# Suppress Pandas warnings:
# https://github.com/ray-project/ray/issues/29270
# We actually want in-place operations so we suppress this warning.
# https://stackoverflow.com/a/74193599
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=FutureWarning)
df.loc[:, col_name] = TensorArray(col)
except Exception as e:
raise ValueError(
f"Tried to cast column {col_name} to the TensorArray tensor "
"extension type but the conversion failed. To disable "
"automatic casting to this tensor extension, set "
"ctx = DatasetContext.get_current(); "
"ctx.enable_tensor_extension_casting = False."
) from e
# TODO(Clark): Optimize this with propagated DataFrame metadata containing a list of
# column names containing tensor columns, to make this an O(# of tensor columns)
# check rather than the current O(# of columns) check.
for col_name, col in df.items():
if column_needs_tensor_extension(col):
try:
# Suppress Pandas warnings:
# https://github.com/ray-project/ray/issues/29270
# We actually want in-place operations so we suppress this warning.
# https://stackoverflow.com/a/74193599
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=FutureWarning)
warnings.simplefilter("ignore", category=SettingWithCopyWarning)
df.loc[:, col_name] = TensorArray(col)
except Exception as e:
raise ValueError(
f"Tried to cast column {col_name} to the TensorArray tensor "
"extension type but the conversion failed. To disable "
"automatic casting to this tensor extension, set "
"ctx = DatasetContext.get_current(); "
"ctx.enable_tensor_extension_casting = False."
) from e
return df


def _cast_tensor_columns_to_ndarrays(df: "pd.DataFrame") -> "pd.DataFrame":
"""Cast all tensor extension columns in df to NumPy ndarrays."""
pd = _lazy_import_pandas()
try:
SettingWithCopyWarning = pd.core.common.SettingWithCopyWarning
except AttributeError:
# SettingWithCopyWarning was moved to pd.errors in Pandas 1.5.0.
SettingWithCopyWarning = pd.errors.SettingWithCopyWarning
from ray.air.util.tensor_extensions.pandas import TensorDtype

with pd.option_context("chained_assignment", None):
# Try to convert any tensor extension columns to ndarray columns.
for col_name, col in df.items():
if isinstance(col.dtype, TensorDtype):
# Suppress Pandas warnings:
# https://github.com/ray-project/ray/issues/29270
# We actually want in-place operations so we suppress this warning.
# https://stackoverflow.com/a/74193599
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=FutureWarning)
df.loc[:, col_name] = pd.Series(list(col.to_numpy()))
return df
# Try to convert any tensor extension columns to ndarray columns.
# TODO(Clark): Optimize this with propagated DataFrame metadata containing a list of
# column names containing tensor columns, to make this an O(# of tensor columns)
# check rather than the current O(# of columns) check.
for col_name, col in df.items():
if isinstance(col.dtype, TensorDtype):
# Suppress Pandas warnings:
# https://github.com/ray-project/ray/issues/29270
# We actually want in-place operations so we suppress this warning.
# https://stackoverflow.com/a/74193599
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=FutureWarning)
warnings.simplefilter("ignore", category=SettingWithCopyWarning)
df.loc[:, col_name] = pd.Series(list(col.to_numpy()))
return df
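
For context, a minimal standalone sketch of the compatibility shim and warning suppression used in the hunks above (the toy numeric column update stands in for the TensorArray/ndarray casts; not part of the diff):

import warnings
import pandas as pd

try:
    SettingWithCopyWarning = pd.core.common.SettingWithCopyWarning
except AttributeError:
    # SettingWithCopyWarning was moved to pd.errors in Pandas 1.5.0.
    SettingWithCopyWarning = pd.errors.SettingWithCopyWarning

df = pd.DataFrame({"a": [1, 2, 3]})
with warnings.catch_warnings():
    # The column assignment below is intentionally in-place, so silence the
    # chained-assignment and deprecation warnings it can trigger.
    warnings.simplefilter("ignore", category=FutureWarning)
    warnings.simplefilter("ignore", category=SettingWithCopyWarning)
    df.loc[:, "a"] = df["a"] * 2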
4 changes: 4 additions & 0 deletions python/ray/data/_internal/arrow_block.py
@@ -126,6 +126,10 @@ def _table_from_pydict(columns: Dict[str, List[Any]]) -> Block:
def _concat_tables(tables: List[Block]) -> Block:
return transform_pyarrow.concat(tables)

@staticmethod
def _concat_would_copy() -> bool:
return False

@staticmethod
def _empty_table() -> "pyarrow.Table":
return pyarrow.Table.from_pydict({})
9 changes: 4 additions & 5 deletions python/ray/data/_internal/batcher.py
@@ -102,11 +102,12 @@ def next_batch(self) -> Block:
A batch represented as a Block.
"""
assert self.has_batch() or (self._done_adding and self.has_any())
needs_copy = self._ensure_copy
# If no batch size, short-circuit.
if self._batch_size is None:
assert len(self._buffer) == 1
block = self._buffer[0]
if self._ensure_copy:
if needs_copy:
# Copy block if needing to ensure fresh batch copy.
block = BlockAccessor.for_block(block)
block = block.slice(0, block.num_rows(), copy=True)
@@ -139,13 +140,11 @@ def next_batch(self) -> Block:
# blocks consumed on the next batch extraction.
self._buffer = leftover
self._buffer_size -= self._batch_size
needs_copy = needs_copy and not output.will_build_yield_copy()
batch = output.build()
if self._ensure_copy:
if needs_copy:
# Need to ensure that the batch is a fresh copy.
batch = BlockAccessor.for_block(batch)
# TODO(Clark): This copy will often be unnecessary, e.g. for pandas
# DataFrame batches that have required concatenation to construct, which
# always requires a copy. We should elide this copy in those cases.
batch = batch.slice(0, batch.num_rows(), copy=True)
return batch
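
A rough sketch of the copy-elision decision above (hypothetical helper name; the real logic lives in Batcher.next_batch): the explicit slice-copy is only needed when the caller asked for a fresh batch and building the output did not already yield a new block.

from ray.data.block import BlockAccessor

def build_batch(output, ensure_copy: bool):
    # `output` is a block builder exposing the will_build_yield_copy() hook
    # added in this PR.
    needs_copy = ensure_copy and not output.will_build_yield_copy()
    batch = output.build()
    if needs_copy:
        # Ensure the returned batch shares no buffers with the input blocks.
        accessor = BlockAccessor.for_block(batch)
        batch = accessor.slice(0, accessor.num_rows(), copy=True)
    return batch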

4 changes: 4 additions & 0 deletions python/ray/data/_internal/block_builder.py
@@ -18,6 +18,10 @@ def add_block(self, block: Block) -> None:
"""Append an entire block to the block being built."""
raise NotImplementedError

def will_build_yield_copy(self) -> bool:
"""Whether building this block will yield a new block copy."""
raise NotImplementedError

def build(self) -> Block:
"""Build the block."""
raise NotImplementedError
5 changes: 5 additions & 0 deletions python/ray/data/_internal/delegating_block_builder.py
@@ -55,6 +55,11 @@ def add_block(self, block: Block):
self._builder = accessor.builder()
self._builder.add_block(block)

def will_build_yield_copy(self) -> bool:
if self._builder is None:
return True
Contributor: Maybe False by default?

Contributor Author (@clarkzinzow, Feb 23, 2023): @ericl we'll technically create a new (empty) block in this case, which I think we should consider to be a "copy" in the sense that the returned block doesn't point to any old data buffers (this method is returning whether building will yield a new block, not whether building will copy data). The Batcher currently uses this method to determine whether we need to copy the built block in order to ensure that no old data buffers are still being referenced, so we can respect the zero_copy_batch=False, ensure_copy=True case.

return self._builder.will_build_yield_copy()

def build(self) -> Block:
if self._builder is None:
if self._empty_block is not None:
@@ -88,7 +88,8 @@ def start(self, options: ExecutionOptions):
def _start_actor(self):
"""Start a new actor and add it to the actor pool as a pending actor."""
assert self._cls is not None
actor = self._cls.remote()
ctx = DatasetContext.get_current()
actor = self._cls.remote(ctx)
self._actor_pool.add_pending_actor(actor, actor.get_location.remote())

def _add_bundled_input(self, bundle: RefBundle):
@@ -279,6 +280,9 @@ def _apply_default_remote_args(ray_remote_args: Dict[str, Any]) -> Dict[str, Any
class _MapWorker:
"""An actor worker for MapOperator."""

def __init__(self, ctx: DatasetContext):
DatasetContext._set_current(ctx)

def get_location(self) -> NodeIdStr:
return ray.get_runtime_context().get_node_id()

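
The worker actor runs in its own process, so the driver's DatasetContext isn't automatically visible there; passing it into the constructor and re-installing it with _set_current keeps settings such as enable_tensor_extension_casting consistent between driver and workers. A hedged sketch of the same pattern with a generic actor (illustrative class, not the PR's _MapWorker):

import ray
from ray.data.context import DatasetContext

@ray.remote
class Worker:
    def __init__(self, ctx: DatasetContext):
        # Re-install the driver's context inside the worker process.
        DatasetContext._set_current(ctx)

    def casting_enabled(self) -> bool:
        return DatasetContext.get_current().enable_tensor_extension_casting

ctx = DatasetContext.get_current()
worker = Worker.remote(ctx)
print(ray.get(worker.casting_enabled.remote()))  # matches the driver's setting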
6 changes: 5 additions & 1 deletion python/ray/data/_internal/pandas_block.py
@@ -106,14 +106,18 @@ def _concat_tables(tables: List["pandas.DataFrame"]) -> "pandas.DataFrame":

if len(tables) > 1:
df = pandas.concat(tables, ignore_index=True)
df.reset_index(drop=True, inplace=True)
else:
df = tables[0]
df.reset_index(drop=True, inplace=True)
ctx = DatasetContext.get_current()
if ctx.enable_tensor_extension_casting:
df = _cast_ndarray_columns_to_tensor_extension(df)
return df

@staticmethod
def _concat_would_copy() -> bool:
return True

@staticmethod
def _empty_table() -> "pandas.DataFrame":
pandas = lazy_import_pandas()
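
_concat_would_copy simply reflects library behavior: pandas.concat materializes a new DataFrame (hence True here), while pyarrow.concat_tables produces a table whose chunks reference the input buffers (hence False in arrow_block.py above). A small illustrative comparison, not part of the diff:

import pandas as pd
import pyarrow as pa

dfs = [pd.DataFrame({"x": [1, 2]}), pd.DataFrame({"x": [3, 4]})]
pd_out = pd.concat(dfs, ignore_index=True)  # copies row data into a new frame

tables = [pa.table({"x": [1, 2]}), pa.table({"x": [3, 4]})]
pa_out = pa.concat_tables(tables)  # zero-copy: output chunks reference the inputs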
6 changes: 4 additions & 2 deletions python/ray/data/_internal/simple_block.py
@@ -46,12 +46,14 @@ def add_block(self, block: List[T]) -> None:
f"{block}"
)
self._items.extend(block)
for item in block:
self._size_estimator.add(item)
self._size_estimator.add_block(block)

def num_rows(self) -> int:
return len(self._items)

def will_build_yield_copy(self) -> bool:
return True

def build(self) -> Block:
return list(self._items)

16 changes: 15 additions & 1 deletion python/ray/data/_internal/size_estimator.py
@@ -1,4 +1,4 @@
from typing import Any
from typing import Any, List

import ray
from ray import cloudpickle
@@ -27,6 +27,20 @@ def add(self, item: Any) -> None:
elif self._count % 100 == 0:
self._running_mean.add(self._real_size(item), weight=100)

def add_block(self, block: List[Any]) -> None:
if self._count < 10:
for i in range(min(10 - self._count, len(block))):
self._running_mean.add(self._real_size(block[i]), weight=1)
if self._count < 100:
for i in range(
10 - (self._count % 10), min(100 - self._count, len(block)), 10
):
self._running_mean.add(self._real_size(block[i]), weight=10)
if (len(block) + (self._count % 100)) // 100 > 1:
for i in range(100 - (self._count % 100), len(block), 100):
self._running_mean.add(self._real_size(block[i]), weight=100)
self._count += len(block)

def size_bytes(self) -> int:
return int(self._running_mean.mean * self._count)

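
add_block mirrors the sampling schedule of the per-item add() without measuring every element: all of the first 10 items, every 10th item up to 100, and every 100th item thereafter, each weighted accordingly in the running mean. A hedged usage sketch (internal module path taken from the file header above):

from ray.data._internal.size_estimator import SizeEstimator

est = SizeEstimator()
est.add_block(list(range(1000)))  # samples ~28 items instead of serializing all 1000
approx_bytes = est.size_bytes()   # running mean of sampled sizes * item count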
22 changes: 21 additions & 1 deletion python/ray/data/_internal/table_block.py
@@ -29,6 +29,14 @@ def __init__(self, block_type):
self._column_names = None
# The set of compacted tables we have built so far.
self._tables: List[Any] = []
# Cursor into tables indicating up to which table we've accumulated table sizes.
# This is used to defer table size calculation, which can be expensive for e.g.
# Pandas DataFrames.
# This cursor points to the first table for which we haven't accumulated a table
# size.
self._tables_size_cursor = 0
# Accumulated table sizes, up to the table in _tables pointed to by
# _tables_size_cursor.
self._tables_size_bytes = 0
# Size estimator for un-compacted table values.
self._uncompacted_size = SizeEstimator()
@@ -76,7 +84,6 @@ def add_block(self, block: Any) -> None:
)
accessor = BlockAccessor.for_block(block)
self._tables.append(block)
self._tables_size_bytes += accessor.size_bytes()
self._num_rows += accessor.num_rows()

@staticmethod
Expand All @@ -91,6 +98,16 @@ def _concat_tables(tables: List[Block]) -> Block:
def _empty_table() -> Any:
raise NotImplementedError

@staticmethod
def _concat_would_copy() -> bool:
raise NotImplementedError

def will_build_yield_copy(self) -> bool:
if self._columns:
# Building a table from a dict of list columns always creates a copy.
return True
return self._concat_would_copy() and len(self._tables) > 1

def build(self) -> Block:
if self._columns:
tables = [self._table_from_pydict(self._columns)]
@@ -108,6 +125,9 @@ def num_rows(self) -> int:
def get_estimated_memory_usage(self) -> int:
if self._num_rows == 0:
return 0
for table in self._tables[self._tables_size_cursor :]:
self._tables_size_bytes += BlockAccessor.for_block(table).size_bytes()
self._tables_size_cursor = len(self._tables)
return self._tables_size_bytes + self._uncompacted_size.size_bytes()

def _compact_if_needed(self) -> None:
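
The cursor defers the potentially expensive size_bytes() call from add_block to the first get_estimated_memory_usage call and ensures each appended table is measured exactly once. A stripped-down sketch of the same pattern (hypothetical class, not the Ray code):

class DeferredSizeAccumulator:
    def __init__(self):
        self._tables = []
        self._cursor = 0      # first table whose size hasn't been accumulated yet
        self._size_bytes = 0

    def add(self, table) -> None:
        # Cheap append on the hot path; no size computation here.
        self._tables.append(table)

    def total_size(self, size_of) -> int:
        # Accumulate sizes only for tables added since the last call.
        for table in self._tables[self._cursor:]:
            self._size_bytes += size_of(table)
        self._cursor = len(self._tables)
        return self._size_bytes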
4 changes: 3 additions & 1 deletion python/ray/data/dataset.py
@@ -3990,7 +3990,9 @@ def lazy(self) -> "Dataset[T]":
``.iter_batches()``, ``.to_torch()``, ``.to_tf()``, etc.) or execution is
manually triggered via ``.fully_executed()``.
"""
ds = Dataset(self._plan, self._epoch, lazy=True)
ds = Dataset(
self._plan, self._epoch, lazy=True, logical_plan=self._logical_plan
)
ds._set_uuid(self._get_uuid())
return ds

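
The one-line fix threads the logical plan through to the lazily-converted dataset so the operator graph built so far isn't dropped. A hedged usage sketch of the lazy/fully_executed flow described in the docstring above:

import ray

ds = ray.data.range(1000).lazy()          # defer execution
ds = ds.map_batches(lambda batch: batch)  # recorded on the plan, not executed yet
ds = ds.fully_executed()                  # triggers execution of the whole chain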