[Data] Fix to_pandas error when multiple block types (#48583)
Two issues:
1. When the execution plan caches the dataset schema, it might call
`unify_schemas` to produce a single schema from all of the bundles'
schemas. The issue is that this function expects every input schema to be
an Arrow schema, but we only check that at least one schema is an Arrow
schema before calling it.
2. `to_pandas` iterates over blocks and adds them to a
`DelegatingBlockBuilder`. The issue is that `DelegatingBlockBuilder`
expects all input blocks to be of the same type, which a dataset with
mixed pandas and Arrow blocks violates (see the reproduction sketch
below).
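
For concreteness, a minimal reproduction sketch that mirrors the regression test added in this commit (it assumes a local Ray installation; before this fix, the final `to_pandas()` call fails because the unioned dataset mixes pandas and Arrow blocks):

```python
import pandas as pd
import pyarrow as pa
import ray

# Build one pandas-backed and one Arrow-backed dataset with the same schema.
df = pd.DataFrame({"a": [0]})
ds1 = ray.data.from_pandas(df)
ds2 = ray.data.from_arrow(pa.Table.from_pandas(df))

# The union holds blocks of two different types; converting it to pandas
# exercises both code paths touched by this commit.
print(ds1.union(ds2).to_pandas())
```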

---------

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
bveeramani authored Nov 7, 2024
1 parent b172af5 commit 1bc18a1
Showing 4 changed files with 28 additions and 11 deletions.
python/ray/data/_internal/util.py (1 addition, 1 deletion)
@@ -699,7 +699,7 @@ def unify_block_metadata_schema(
     except ImportError:
         pa = None
     # If the result contains PyArrow schemas, unify them
-    if pa is not None and any(isinstance(s, pa.Schema) for s in schemas_to_unify):
+    if pa is not None and all(isinstance(s, pa.Schema) for s in schemas_to_unify):
         return unify_schemas(schemas_to_unify)
     # Otherwise, if the resulting schemas are simple types (e.g. int),
     # return the first schema.
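
To illustrate the behavior the `any` to `all` change selects, here is a simplified, standalone sketch of the guard (the helper name `pick_schema` is made up for illustration, and `pa.unify_schemas` stands in for Ray's internal `unify_schemas` wrapper):

```python
import pyarrow as pa

def pick_schema(schemas_to_unify):
    # Only take the Arrow unification path when *every* schema is an Arrow
    # schema. A mix of Arrow and non-Arrow schemas (e.g. after unioning an
    # Arrow-backed and a pandas-backed dataset) now falls through and returns
    # the first schema instead of crashing inside the unification call.
    if all(isinstance(s, pa.Schema) for s in schemas_to_unify):
        return pa.unify_schemas(schemas_to_unify)
    return schemas_to_unify[0]
```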
python/ray/data/dataset.py (10 additions, 9 deletions)
@@ -42,7 +42,6 @@
 from ray.data._internal.datasource.sql_datasink import SQLDatasink
 from ray.data._internal.datasource.tfrecords_datasink import TFRecordDatasink
 from ray.data._internal.datasource.webdataset_datasink import WebDatasetDatasink
-from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
 from ray.data._internal.equalize import _equalize
 from ray.data._internal.execution.interfaces import RefBundle
 from ray.data._internal.execution.interfaces.ref_bundle import (
@@ -71,7 +70,7 @@
 from ray.data._internal.logical.operators.one_to_one_operator import Limit
 from ray.data._internal.logical.operators.write_operator import Write
 from ray.data._internal.logical.optimizers import LogicalPlan
-from ray.data._internal.pandas_block import PandasBlockSchema
+from ray.data._internal.pandas_block import PandasBlockBuilder, PandasBlockSchema
 from ray.data._internal.plan import ExecutionPlan
 from ray.data._internal.planner.exchange.sort_task_spec import SortKey
 from ray.data._internal.remote_fn import cached_remote_fn
@@ -4614,14 +4613,16 @@ def to_pandas(self, limit: int = None) -> "pandas.DataFrame":
                 f"{count} rows will fit in local memory, set "
                 "ds.to_pandas(limit=None) to disable limits."
             )
-        bundles = self.iter_internal_ref_bundles()
-        output = DelegatingBlockBuilder()
-
-        for bundle in bundles:
-            for block_ref in bundle.block_refs:
-                output.add_block(ray.get(block_ref))
-        block = output.build()
-        return _block_to_df(block)
+        builder = PandasBlockBuilder()
+        for batch in self.iter_batches(batch_format="pandas", batch_size=None):
+            builder.add_block(batch)
+        block = builder.build()
+
+        # `PandasBlockBuilder` creates a dataframe with internal extension types like
+        # 'TensorDtype'. We use the `to_pandas` method to convert these extension
+        # types to regular types.
+        return BlockAccessor.for_block(block).to_pandas()
 
     @ConsumptionAPI(pattern="Time complexity:")
     @DeveloperAPI
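
The rewritten `to_pandas` is roughly equivalent to concatenating pandas-format batches, which can be sketched with public APIs only (illustrative; the real implementation uses the internal `PandasBlockBuilder` and `BlockAccessor`, which also convert Ray's extension dtypes such as `TensorDtype` to regular pandas dtypes):

```python
import pandas as pd
import ray

# Any dataset works here, regardless of whether its blocks are Arrow or pandas.
ds = ray.data.from_items([{"a": 0}, {"a": 1}])

# Pull each block as a single pandas batch and concatenate them.
batches = list(ds.iter_batches(batch_format="pandas", batch_size=None))
df = pd.concat(batches, ignore_index=True)
print(df)
```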
python/ray/data/tests/test_pandas.py (14 additions, 0 deletions)
@@ -120,6 +120,20 @@ def test_to_pandas(ray_start_regular_shared):
     assert df.equals(dfds)
 
 
+def test_to_pandas_different_block_types(ray_start_regular_shared):
+    # Test for https://github.com/ray-project/ray/issues/48575.
+    df = pd.DataFrame({"a": [0]})
+    ds1 = ray.data.from_pandas(df)
+
+    table = pa.Table.from_pandas(df)
+    ds2 = ray.data.from_arrow(table)
+
+    actual_df = ds1.union(ds2).to_pandas()
+
+    expected_df = pd.DataFrame({"a": [0, 0]})
+    pd.testing.assert_frame_equal(actual_df, expected_df)
+
+
 def test_to_pandas_refs(ray_start_regular_shared):
     n = 5
     df = pd.DataFrame({"id": list(range(n))})
python/ray/data/tests/test_parquet.py (3 additions, 1 deletion)
@@ -1043,8 +1043,10 @@ def test_parquet_read_empty_file(ray_start_regular_shared, tmp_path):
     path = os.path.join(tmp_path, "data.parquet")
     table = pa.table({})
     pq.write_table(table, path)
+
     ds = ray.data.read_parquet(path)
-    pd.testing.assert_frame_equal(ds.to_pandas(), table.to_pandas())
+
+    assert ds.take_all() == []
 
 
 def test_parquet_reader_batch_size(ray_start_regular_shared, tmp_path):
