From d013a20cfc5807df45ff731f05a03647a4b7b0c2 Mon Sep 17 00:00:00 2001
From: Balaji Veeramani
Date: Thu, 7 Nov 2024 13:53:02 -0800
Subject: [PATCH] [Data] Fix `to_pandas` error when multiple block types
 (#48583)

Two issues caused `to_pandas` to fail when a dataset contained more than one
block type (for example, pandas blocks and Arrow blocks):

1. When the execution plan caches the dataset schema, it may call
   `unify_schemas` to produce a single schema from all of the bundles'
   schemas. That function expects every input schema to be an Arrow schema,
   but we only checked that at least one schema was an Arrow schema before
   calling it. The check now requires all of the schemas to be Arrow schemas.
2. `to_pandas` iterated over blocks and added them all to a single
   `DelegatingBlockBuilder`, which expects every input block to be of the
   same type. It now iterates over pandas batches and builds the result with
   a `PandasBlockBuilder` instead.
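For reference, a minimal reproduction, adapted from the regression test added
in this PR for https://github.com/ray-project/ray/issues/48575. Before this
change, the final `to_pandas` call raised because the combined dataset holds
both pandas and Arrow blocks:

```python
import pandas as pd
import pyarrow as pa

import ray

df = pd.DataFrame({"a": [0]})

# One dataset backed by a pandas block, another backed by an Arrow block.
ds_pandas = ray.data.from_pandas(df)
ds_arrow = ray.data.from_arrow(pa.Table.from_pandas(df))

# The union contains blocks of both types, so `to_pandas` must combine them.
print(ds_pandas.union(ds_arrow).to_pandas())
```

With this fix, the snippet prints a two-row DataFrame with a single column
`a`, matching the expected result in the new test.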
---------

Signed-off-by: Balaji Veeramani
Signed-off-by: mohitjain2504
---
 python/ray/data/_internal/util.py     |  2 +-
 python/ray/data/dataset.py            | 19 ++++++++++---------
 python/ray/data/tests/test_pandas.py  | 14 ++++++++++++++
 python/ray/data/tests/test_parquet.py |  4 +++-
 4 files changed, 28 insertions(+), 11 deletions(-)

diff --git a/python/ray/data/_internal/util.py b/python/ray/data/_internal/util.py
index 6e7d5a00a94a..9696074fe66d 100644
--- a/python/ray/data/_internal/util.py
+++ b/python/ray/data/_internal/util.py
@@ -699,7 +699,7 @@ def unify_block_metadata_schema(
     except ImportError:
         pa = None
     # If the result contains PyArrow schemas, unify them
-    if pa is not None and any(isinstance(s, pa.Schema) for s in schemas_to_unify):
+    if pa is not None and all(isinstance(s, pa.Schema) for s in schemas_to_unify):
         return unify_schemas(schemas_to_unify)
     # Otherwise, if the resulting schemas are simple types (e.g. int),
     # return the first schema.
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index 984dade4e73d..2fbb242b3395 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -42,7 +42,6 @@
 from ray.data._internal.datasource.sql_datasink import SQLDatasink
 from ray.data._internal.datasource.tfrecords_datasink import TFRecordDatasink
 from ray.data._internal.datasource.webdataset_datasink import WebDatasetDatasink
-from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
 from ray.data._internal.equalize import _equalize
 from ray.data._internal.execution.interfaces import RefBundle
 from ray.data._internal.execution.interfaces.ref_bundle import (
@@ -71,7 +70,7 @@
 from ray.data._internal.logical.operators.one_to_one_operator import Limit
 from ray.data._internal.logical.operators.write_operator import Write
 from ray.data._internal.logical.optimizers import LogicalPlan
-from ray.data._internal.pandas_block import PandasBlockSchema
+from ray.data._internal.pandas_block import PandasBlockBuilder, PandasBlockSchema
 from ray.data._internal.plan import ExecutionPlan
 from ray.data._internal.planner.exchange.sort_task_spec import SortKey
 from ray.data._internal.remote_fn import cached_remote_fn
@@ -4614,14 +4613,16 @@ def to_pandas(self, limit: int = None) -> "pandas.DataFrame":
                 f"{count} rows will fit in local memory, set "
                 "ds.to_pandas(limit=None) to disable limits."
             )
-        bundles = self.iter_internal_ref_bundles()
-        output = DelegatingBlockBuilder()
-        for bundle in bundles:
-            for block_ref in bundle.block_refs:
-                output.add_block(ray.get(block_ref))
-        block = output.build()
-        return _block_to_df(block)
+        builder = PandasBlockBuilder()
+        for batch in self.iter_batches(batch_format="pandas", batch_size=None):
+            builder.add_block(batch)
+        block = builder.build()
+
+        # `PandasBlockBuilder` creates a dataframe with internal extension types like
+        # 'TensorDtype'. We use the `to_pandas` method to convert these extension
+        # types to regular types.
+        return BlockAccessor.for_block(block).to_pandas()
 
     @ConsumptionAPI(pattern="Time complexity:")
     @DeveloperAPI
diff --git a/python/ray/data/tests/test_pandas.py b/python/ray/data/tests/test_pandas.py
index d115805e8a08..383d20f55851 100644
--- a/python/ray/data/tests/test_pandas.py
+++ b/python/ray/data/tests/test_pandas.py
@@ -120,6 +120,20 @@ def test_to_pandas(ray_start_regular_shared):
     assert df.equals(dfds)
 
 
+def test_to_pandas_different_block_types(ray_start_regular_shared):
+    # Test for https://github.com/ray-project/ray/issues/48575.
+    df = pd.DataFrame({"a": [0]})
+    ds1 = ray.data.from_pandas(df)
+
+    table = pa.Table.from_pandas(df)
+    ds2 = ray.data.from_arrow(table)
+
+    actual_df = ds1.union(ds2).to_pandas()
+
+    expected_df = pd.DataFrame({"a": [0, 0]})
+    pd.testing.assert_frame_equal(actual_df, expected_df)
+
+
 def test_to_pandas_refs(ray_start_regular_shared):
     n = 5
     df = pd.DataFrame({"id": list(range(n))})
diff --git a/python/ray/data/tests/test_parquet.py b/python/ray/data/tests/test_parquet.py
index 3a185fd59644..6ff18ed45d7d 100644
--- a/python/ray/data/tests/test_parquet.py
+++ b/python/ray/data/tests/test_parquet.py
@@ -1043,8 +1043,10 @@ def test_parquet_read_empty_file(ray_start_regular_shared, tmp_path):
     path = os.path.join(tmp_path, "data.parquet")
     table = pa.table({})
     pq.write_table(table, path)
+
     ds = ray.data.read_parquet(path)
-    pd.testing.assert_frame_equal(ds.to_pandas(), table.to_pandas())
+
+    assert ds.take_all() == []
 
 
 def test_parquet_reader_batch_size(ray_start_regular_shared, tmp_path):