[Data] Fix to_pandas error when multiple block types #48583

Merged (3 commits, Nov 7, 2024)

Changes from 2 commits
python/ray/data/_internal/util.py (2 changes: 1 addition & 1 deletion)
@@ -699,7 +699,7 @@ def unify_block_metadata_schema(
     except ImportError:
         pa = None
     # If the result contains PyArrow schemas, unify them
-    if pa is not None and any(isinstance(s, pa.Schema) for s in schemas_to_unify):
+    if pa is not None and all(isinstance(s, pa.Schema) for s in schemas_to_unify):
         return unify_schemas(schemas_to_unify)
     # Otherwise, if the resulting schemas are simple types (e.g. int),
     # return the first schema.
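The one-character fix above addresses how mixed schema lists are handled: when a dataset contains both Arrow and pandas blocks, schemas_to_unify holds both pyarrow.Schema and PandasBlockSchema objects. With any(), a single Arrow schema was enough to send the whole mixed list into unify_schemas, which only understands Arrow schemas; with all(), mixed lists fall through to the non-Arrow fallback instead. A minimal standalone sketch of the guard logic (simplified stand-ins, not the actual Ray implementation; pa.unify_schemas substitutes for Ray's internal unify_schemas, and FakePandasBlockSchema is a hypothetical placeholder):

import pyarrow as pa


class FakePandasBlockSchema:
    """Hypothetical stand-in for Ray's internal PandasBlockSchema."""

    def __init__(self, names):
        self.names = names


def unify(schemas_to_unify):
    # Fixed guard: unify only when every schema is an Arrow schema. With
    # any() instead of all(), a mixed list would reach pa.unify_schemas
    # and raise on the non-Arrow entry.
    if all(isinstance(s, pa.Schema) for s in schemas_to_unify):
        return pa.unify_schemas(schemas_to_unify)
    # Simplified fallback: return the first schema.
    return schemas_to_unify[0]


mixed = [pa.schema([("a", pa.int64())]), FakePandasBlockSchema(names=["a"])]
print(unify(mixed))  # Falls through to the fallback instead of raising.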
python/ray/data/dataset.py (19 changes: 10 additions & 9 deletions)
@@ -42,7 +42,6 @@
 from ray.data._internal.datasource.sql_datasink import SQLDatasink
 from ray.data._internal.datasource.tfrecords_datasink import TFRecordDatasink
 from ray.data._internal.datasource.webdataset_datasink import WebDatasetDatasink
-from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
 from ray.data._internal.equalize import _equalize
 from ray.data._internal.execution.interfaces import RefBundle
 from ray.data._internal.execution.interfaces.ref_bundle import (
@@ -71,7 +70,7 @@
 from ray.data._internal.logical.operators.one_to_one_operator import Limit
 from ray.data._internal.logical.operators.write_operator import Write
 from ray.data._internal.logical.optimizers import LogicalPlan
-from ray.data._internal.pandas_block import PandasBlockSchema
+from ray.data._internal.pandas_block import PandasBlockBuilder, PandasBlockSchema
 from ray.data._internal.plan import ExecutionPlan
 from ray.data._internal.planner.exchange.sort_task_spec import SortKey
 from ray.data._internal.remote_fn import cached_remote_fn
@@ -4614,14 +4613,16 @@ def to_pandas(self, limit: int = None) -> "pandas.DataFrame":
f"{count} rows will fit in local memory, set "
"ds.to_pandas(limit=None) to disable limits."
)
bundles = self.iter_internal_ref_bundles()
output = DelegatingBlockBuilder()

for bundle in bundles:
for block_ref in bundle.block_refs:
output.add_block(ray.get(block_ref))
block = output.build()
return _block_to_df(block)
builder = PandasBlockBuilder()
for batch in self.iter_batches(batch_format="pandas", batch_size=None):
builder.add_block(batch)
block = builder.build()

# `PandasBlockBuilder` creates a dataframe with internal extension types like
# 'TensorDtype'. We use the `to_pandas` method to convert these extension
# types to regular types.
return BlockAccessor.for_block(block).to_pandas()
Comment on lines +4617 to +4625
Contributor:

If I'm understanding correctly, this new code now adds an extra conversion from Arrow to a pandas batch. Is there any way we can avoid it?

the old code:
Arrow internal -> Arrow Block -> build to big Arrow block -> convert to pd DF

the new code:
Arrow internal -> Pandas batch -> build to big Arrow block -> convert to pd DF

Contributor:

There's no step 3 ("build to big Arrow block") in the new code, since we're using PandasBlockBuilder.

Contributor:

Ah, you're right. I mistakenly thought it converts to Arrow tables.


     @ConsumptionAPI(pattern="Time complexity:")
     @DeveloperAPI
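Taken together, the two changes make to_pandas robust to datasets whose blocks are a mix of Arrow tables and pandas DataFrames. A short repro of the failure mode this PR fixes, mirroring the new test below (from issue https://github.com/ray-project/ray/issues/48575):

import pandas as pd
import pyarrow as pa
import ray

df = pd.DataFrame({"a": [0]})

# One dataset backed by pandas blocks, one backed by Arrow blocks.
ds_pandas = ray.data.from_pandas(df)
ds_arrow = ray.data.from_arrow(pa.Table.from_pandas(df))

# The union contains two different block types. Before this PR,
# to_pandas() failed while unifying the mixed block schemas.
print(ds_pandas.union(ds_arrow).to_pandas())
#    a
# 0  0
# 1  0

Because the new implementation iterates batches with batch_format="pandas", each Arrow block is converted to pandas once and appended to a single PandasBlockBuilder, so no large intermediate Arrow table is built (the point settled in the review thread above).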
python/ray/data/tests/test_pandas.py (14 changes: 14 additions & 0 deletions)
@@ -120,6 +120,20 @@ def test_to_pandas(ray_start_regular_shared):
     assert df.equals(dfds)


+def test_to_pandas_different_block_types(ray_start_regular_shared):
+    # Test for https://github.com/ray-project/ray/issues/48575.
+    df = pd.DataFrame({"a": [0]})
+    ds1 = ray.data.from_pandas(df)
+
+    table = pa.Table.from_pandas(df)
+    ds2 = ray.data.from_arrow(table)
+
+    actual_df = ds1.union(ds2).to_pandas()
+
+    expected_df = pd.DataFrame({"a": [0, 0]})
+    pd.testing.assert_frame_equal(actual_df, expected_df)
+
+
 def test_to_pandas_refs(ray_start_regular_shared):
     n = 5
     df = pd.DataFrame({"id": list(range(n))})
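One more note on the final BlockAccessor.for_block(block).to_pandas() step: per the comment added in dataset.py, PandasBlockBuilder can produce columns typed with Ray's internal TensorDtype extension, and the final to_pandas call converts those back to regular pandas dtypes. A hedged way to observe this (a sketch only; exact dtypes may vary by Ray version):

import numpy as np
import ray

# Tensor columns are represented with Ray's internal extension types.
ds = ray.data.from_items([{"image": np.zeros((2, 2))} for _ in range(3)])

df = ds.to_pandas()
# Per the comment in the new to_pandas body, internal extension types such
# as TensorDtype should surface here as regular pandas dtypes.
print(df.dtypes)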