[Arrow] Revisiting ChunkedArray combination protocol to avoid int32 offsets overflow (#48754)

## Why are these changes needed?

> NOTE: This is a follow-up for #48487

This PR addresses the challenge of handling batches larger than 2 GiB, which are known to overflow standard Arrow types that rely on int32 offsets.

Changes
---

- Unified all `ChunkedArray` combination attempts to call into `combine_chunked_array`
- Introduced `_try_combine_chunks_safe`, which properly handles the case when a single PA `Array` would exceed the 2 GiB threshold (see the sketch after this list)
- Added tests
- Minor cleanups
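
The core idea behind `_try_combine_chunks_safe` can be sketched as follows. This is a minimal, illustrative approximation based on the description above, not the exact implementation; the threshold constant and the "stay chunked" fallback are assumptions:

```python
from typing import Union

import pyarrow as pa

GiB = 1024**3
# int32 offsets (used by pa.string(), pa.binary(), pa.list_()) overflow past 2 GiB
INT32_OVERFLOW_THRESHOLD = 2 * GiB


def _try_combine_chunks_safe(
    chunked: pa.ChunkedArray,
) -> Union[pa.Array, pa.ChunkedArray]:
    """Combine chunks into a single contiguous Array only when it is safe.

    If the combined data could exceed the 2 GiB int32-offset limit, keep the
    data chunked rather than risk an offset overflow.
    """
    total_bytes = sum(chunk.nbytes for chunk in chunked.chunks)
    if total_bytes > INT32_OVERFLOW_THRESHOLD:
        # Combining could overflow int32 offsets; keep the chunked layout
        return chunked
    return chunked.combine_chunks()
```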

---------

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
alexeykudinkin authored Nov 21, 2024
1 parent 154915d commit 2dbd08a
Showing 9 changed files with 473 additions and 146 deletions.
8 changes: 6 additions & 2 deletions python/ray/air/data_batch_type.py
@@ -2,6 +2,10 @@

if TYPE_CHECKING:
import numpy
import pandas
import pandas # noqa: F401
import pyarrow

DataBatchType = Union["numpy.ndarray", "pandas.DataFrame", Dict[str, "numpy.ndarray"]]
# TODO de-dup with ray.data.block.DataBatch
DataBatchType = Union[
"numpy.ndarray", "pyarrow.Table" "pandas.DataFrame", Dict[str, "numpy.ndarray"]
]
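
As a quick illustration of the widened alias, each of the following values is a valid `DataBatchType` after this change (assuming `numpy`, `pandas`, and `pyarrow` are installed):

```python
import numpy as np
import pandas as pd
import pyarrow as pa

batches = [
    np.arange(6).reshape(3, 2),      # numpy.ndarray
    pd.DataFrame({"a": [1, 2, 3]}),  # pandas.DataFrame
    pa.table({"a": [1, 2, 3]}),      # pyarrow.Table (newly part of the alias)
    {"a": np.array([1, 2, 3])},      # Dict[str, numpy.ndarray]
]
```
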
51 changes: 21 additions & 30 deletions python/ray/air/util/data_batch_conversion.py
@@ -6,9 +6,6 @@

from ray.air.constants import TENSOR_COLUMN_NAME
from ray.air.data_batch_type import DataBatchType
from ray.air.util.tensor_extensions.arrow import (
get_arrow_extension_fixed_shape_tensor_types,
)
from ray.util.annotations import Deprecated, DeveloperAPI

if TYPE_CHECKING:
@@ -220,37 +217,31 @@ def _convert_batch_type_to_numpy(
)
return data
elif pyarrow is not None and isinstance(data, pyarrow.Table):
from ray.air.util.transform_pyarrow import (
_concatenate_extension_column,
_is_column_extension_type,
from ray.air.util.tensor_extensions.arrow import (
get_arrow_extension_fixed_shape_tensor_types,
)
from ray.data._internal.arrow_ops import transform_pyarrow

if data.column_names == [TENSOR_COLUMN_NAME] and (
isinstance(
data.schema.types[0], get_arrow_extension_fixed_shape_tensor_types()
column_values_ndarrays = []

for col in data.columns:
# Combine columnar values arrays to make these contiguous
# (making them compatible with numpy format)
combined_array = transform_pyarrow.combine_chunked_array(col)

column_values_ndarrays.append(
transform_pyarrow.to_numpy(combined_array, zero_copy_only=False)
)

arrow_fixed_shape_tensor_types = get_arrow_extension_fixed_shape_tensor_types()

# NOTE: This branch is here for backwards-compatibility
if data.column_names == [TENSOR_COLUMN_NAME] and (
isinstance(data.schema.types[0], arrow_fixed_shape_tensor_types)
):
# If representing a tensor dataset, return as a single numpy array.
# Example: ray.data.from_numpy(np.arange(12).reshape((3, 2, 2)))
# Arrow's incorrect concatenation of extension arrays:
# https://issues.apache.org/jira/browse/ARROW-16503
return _concatenate_extension_column(data[TENSOR_COLUMN_NAME]).to_numpy(
zero_copy_only=False
)
else:
output_dict = {}
for col_name in data.column_names:
col = data[col_name]
if col.num_chunks == 0:
col = pyarrow.array([], type=col.type)
elif _is_column_extension_type(col):
# Arrow's incorrect concatenation of extension arrays:
# https://issues.apache.org/jira/browse/ARROW-16503
col = _concatenate_extension_column(col)
else:
col = col.combine_chunks()
output_dict[col_name] = col.to_numpy(zero_copy_only=False)
return output_dict
return column_values_ndarrays[0]

return dict(zip(data.column_names, column_values_ndarrays))
elif isinstance(data, pd.DataFrame):
return _convert_pandas_to_batch_type(data, BatchFormat.NUMPY)
else:
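
For context, the refactored branch above converts every column the same way: combine the column's chunks via the new combination protocol, then convert to NumPy. A rough, self-contained approximation of that per-column behavior using plain PyArrow (rather than Ray's internal `transform_pyarrow` helpers) might look like this:

```python
from typing import Dict

import numpy as np
import pyarrow as pa


def table_to_numpy(table: pa.Table) -> Dict[str, np.ndarray]:
    """Convert each column of an Arrow Table into a NumPy ndarray."""
    out: Dict[str, np.ndarray] = {}
    for name in table.column_names:
        col = table[name]
        if col.num_chunks == 0:
            combined = pa.array([], type=col.type)
        else:
            # Make the column contiguous so it maps cleanly onto a single ndarray
            combined = col.combine_chunks()
        out[name] = combined.to_numpy(zero_copy_only=False)
    return out


arrays = table_to_numpy(pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]}))
# {"a": array([1, 2, 3]), "b": array(['x', 'y', 'z'], dtype=object)}
```
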
22 changes: 17 additions & 5 deletions python/ray/air/util/tensor_extensions/arrow.py
@@ -29,9 +29,15 @@
# Minimum version of Arrow that supports subclassable ExtensionScalars.
# TODO(Clark): Remove conditional definition once we only support Arrow 9.0.0+.
MIN_PYARROW_VERSION_SCALAR_SUBCLASS = parse_version("9.0.0")
# Minimum version supporting `zero_copy_only` flag in `ChunkedArray.to_numpy`
MIN_PYARROW_VERSION_CHUNKED_ARRAY_TO_NUMPY_ZERO_COPY_ONLY = parse_version("13.0.0")

NUM_BYTES_PER_UNICODE_CHAR = 4

# NOTE: Overflow threshold in bytes for most Arrow types using int32 as
# its offsets
INT32_OVERFLOW_THRESHOLD = 2 * GiB

logger = logging.getLogger(__name__)


@@ -212,7 +218,7 @@ def _infer_pyarrow_type(column_values: np.ndarray) -> Optional[pa.DataType]:

inferred_pa_dtype = pa.infer_type(column_values)

def _len_gt_2gb(obj: Any) -> bool:
def _len_gt_overflow_threshold(obj: Any) -> bool:
# NOTE: This utility could be seeing objects other than strings or bytes in
# cases when column contains non-scalar non-homogeneous object types as
# column values, therefore making Arrow unable to infer corresponding
@@ -221,16 +227,16 @@ def _len_gt_2gb(obj: Any) -> bool:
#
# Check out test cases for this method for an additional context.
if isinstance(obj, (str, bytes)):
return len(obj) > 2 * GiB
return len(obj) > INT32_OVERFLOW_THRESHOLD

return False

if pa.types.is_binary(inferred_pa_dtype) and any(
[_len_gt_2gb(v) for v in column_values]
[_len_gt_overflow_threshold(v) for v in column_values]
):
return pa.large_binary()
elif pa.types.is_string(inferred_pa_dtype) and any(
[_len_gt_2gb(v) for v in column_values]
[_len_gt_overflow_threshold(v) for v in column_values]
):
return pa.large_string()

@@ -569,7 +575,13 @@ def from_numpy(
# Stack ndarrays and pass through to ndarray handling logic below.
try:
arr = np.stack(arr, axis=0)
except ValueError:
except ValueError as ve:
logger.warning(
f"Failed to stack lists due to: {ve}; "
f"falling back to using np.array(..., dtype=object)",
exc_info=ve,
)

# ndarray stacking may fail if the arrays are heterogeneously-shaped.
arr = np.array(arr, dtype=object)
if not isinstance(arr, np.ndarray):
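
The type-inference change above matters because `pa.string()` and `pa.binary()` store element offsets as int32, so values whose lengths exceed the 2 GiB threshold need the int64-offset `large_string()` / `large_binary()` variants. A simplified sketch of that selection logic (not the exact `_infer_pyarrow_type` implementation):

```python
import pyarrow as pa

GiB = 1024**3
INT32_OVERFLOW_THRESHOLD = 2 * GiB


def _infer_string_type(values) -> pa.DataType:
    """Pick large_string when any single value could overflow int32 offsets."""
    if any(
        isinstance(v, str) and len(v) > INT32_OVERFLOW_THRESHOLD for v in values
    ):
        return pa.large_string()
    return pa.string()


# Columns with only small values keep the regular (int32-offset) string type
assert _infer_string_type(["short", "values"]) == pa.string()
```
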
4 changes: 2 additions & 2 deletions python/ray/data/BUILD
@@ -99,7 +99,7 @@ py_test(

py_test(
name = "test_arrow_block",
size = "small",
size = "medium",
srcs = ["tests/test_arrow_block.py"],
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
@@ -163,7 +163,7 @@ py_test(

py_test(
name = "test_binary",
size = "medium",
size = "small",
srcs = ["tests/test_binary.py"],
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
39 changes: 15 additions & 24 deletions python/ray/data/_internal/arrow_block.py
@@ -219,7 +219,7 @@ def _build_tensor_row(
def slice(self, start: int, end: int, copy: bool = False) -> "pyarrow.Table":
view = self._table.slice(start, end - start)
if copy:
view = _copy_table(view)
view = transform_pyarrow.combine_chunks(view)
return view

def random_shuffle(self, random_seed: Optional[int]) -> "pyarrow.Table":
@@ -245,11 +245,6 @@ def to_pandas(self) -> "pandas.DataFrame":
def to_numpy(
self, columns: Optional[Union[str, List[str]]] = None
) -> Union[np.ndarray, Dict[str, np.ndarray]]:
from ray.air.util.transform_pyarrow import (
_concatenate_extension_column,
_is_column_extension_type,
)

if columns is None:
columns = self._table.column_names
should_be_single_ndarray = False
@@ -267,23 +262,24 @@
f"{column_names_set}"
)

arrays = []
for column in columns:
array = self._table[column]
if _is_column_extension_type(array):
array = _concatenate_extension_column(array)
elif array.num_chunks == 0:
array = pyarrow.array([], type=array.type)
else:
array = array.combine_chunks()
arrays.append(array.to_numpy(zero_copy_only=False))
column_values_ndarrays = []

for col_name in columns:
col = self._table[col_name]

# Combine columnar values arrays to make these contiguous
# (making them compatible with numpy format)
combined_array = transform_pyarrow.combine_chunked_array(col)

column_values_ndarrays.append(
transform_pyarrow.to_numpy(combined_array, zero_copy_only=False)
)

if should_be_single_ndarray:
assert len(columns) == 1
arrays = arrays[0]
return column_values_ndarrays[0]
else:
arrays = dict(zip(columns, arrays))
return arrays
return dict(zip(columns, column_values_ndarrays))

def to_arrow(self) -> "pyarrow.Table":
return self._table
@@ -652,8 +648,3 @@ def gen():

def block_type(self) -> BlockType:
return BlockType.ARROW


def _copy_table(table: "pyarrow.Table") -> "pyarrow.Table":
"""Copy the provided Arrow table."""
return transform_pyarrow.combine_chunks(table)
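
On the `to_numpy` contract shown above: a single column name returns one ndarray, while a list of names (or `None`) returns a dict keyed by column name. A hypothetical usage sketch (the block contents here are made up):

```python
import pyarrow as pa
from ray.data.block import BlockAccessor

block = pa.table({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
acc = BlockAccessor.for_block(block)

single = acc.to_numpy("a")        # one np.ndarray: array([1, 2, 3])
multi = acc.to_numpy(["a", "b"])  # dict: {"a": ndarray, "b": ndarray}
```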