[data] fix reading multiple parquet files with ragged ndarrays (ray-project#47961)

## Why are these changes needed?

PyArrow infers the Parquet schema based only on the first file. This causes
errors when reading multiple files with ragged ndarrays.

This PR fixes the issue by not using the inferred schema for reading.
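
A minimal sketch reproducing the failure mode (the output path is hypothetical; assumes Ray Data and NumPy are installed):

```python
import numpy as np
import ray

# Two rows whose "data" ndarrays have different shapes ("ragged" tensors).
ds = ray.data.range(2).map(
    lambda row: {"id": row["id"], "data": np.zeros((row["id"] + 1, 2), dtype=np.int8)}
)

# repartition(2) writes two Parquet files whose "data" columns carry
# different tensor shapes, and hence different per-file schemas.
ds.repartition(2).write_parquet("/tmp/ragged_demo")

# Before this fix, the schema inferred from the first file was applied to
# all files, so this read could fail.
print(ray.data.read_parquet("/tmp/ragged_demo").take_all())
```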


## Related issue number
Fixes ray-project#47960

---------

Signed-off-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
raulchen authored and ujjawal-khare committed Oct 15, 2024
1 parent e93f8c8 commit baf0174
Showing 2 changed files with 34 additions and 0 deletions.
8 changes: 8 additions & 0 deletions python/ray/data/_internal/datasource/parquet_datasource.py
@@ -230,6 +230,14 @@ def __init__(
# duplicating the partition data, we disable PyArrow's partitioning.
dataset_kwargs["partitioning"] = None

# `read_schema` is the schema object that will be used to perform
# read operations.
# It should be None unless the user has specified the schema or columns.
# We don't use the inferred schema for reads, because PyArrow only infers
# the schema based on the first file. Thus, files with different schemas
# would end up producing blocks with the wrong schema.
# See https://github.com/ray-project/ray/issues/47960 for more context.
read_schema = schema
pq_ds = get_parquet_dataset(paths, filesystem, dataset_kwargs)

if schema is None:
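
For context, a standalone sketch (assuming only PyArrow; the /tmp paths are hypothetical) of why the inferred schema cannot be trusted across files: the pyarrow.dataset factory infers the dataset schema from the first file by default.

```python
import pyarrow as pa
import pyarrow.dataset as pds
import pyarrow.parquet as pq

# Two Parquet files whose "x" columns have different types.
pq.write_table(pa.table({"x": [[1, 2]]}), "/tmp/a.parquet")  # x: list<int64>
pq.write_table(pa.table({"x": [[1.5]]}), "/tmp/b.parquet")   # x: list<double>

dataset = pds.dataset(["/tmp/a.parquet", "/tmp/b.parquet"], format="parquet")
# The reported schema comes from the first file only; reading the second
# file against it can fail or force a cast.
print(dataset.schema)
```
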
26 changes: 26 additions & 0 deletions python/ray/data/tests/test_parquet.py
@@ -1291,6 +1291,32 @@ def _assert_equal(rows, expected):
_assert_equal(ds.take_all(), expected_tuples)


def test_multiple_files_with_ragged_arrays(ray_start_regular_shared, tmp_path):
# Test reading multiple parquet files, each of which has different-shaped
# ndarrays in the same column.
# See https://github.com/ray-project/ray/issues/47960 for more context.
num_rows = 3
ds = ray.data.range(num_rows)

def map(row):
id = row["id"] + 1
row["data"] = np.zeros((id * 100, id * 100), dtype=np.int8)
return row

# Write 3 parquet files with different-shaped ndarray values in the
# "data" column.
ds.map(map).repartition(num_rows).write_parquet(tmp_path)

# Read these 3 files, check that the result is correct.
ds2 = ray.data.read_parquet(tmp_path, override_num_blocks=1)
res = ds2.take_all()
res = sorted(res, key=lambda row: row["id"])
assert len(res) == num_rows
for index, item in enumerate(res):
assert item["id"] == index
assert item["data"].shape == (100 * (index + 1), 100 * (index + 1))


if __name__ == "__main__":
import sys

