[Ray Data] fix problem: to_pandas failed on datasets returned by from_spark #32968

Merged
merged 10 commits into from
Mar 28, 2023
17 changes: 17 additions & 0 deletions python/ray/data/_internal/arrow_block.py
@@ -111,6 +111,23 @@ def __init__(self):
            raise ImportError("Run `pip install pyarrow` for Arrow support")
        super().__init__(pyarrow.Table)

    def add_block(self, block: Any) -> None:
        if not isinstance(block, pyarrow.Table) and not isinstance(block, bytes):
            raise TypeError(
                f"Got a block of type {type(block)}, expected {self._block_type}. "
                "If you are mapping a function, ensure it returns an "
                "object with the expected type. Block:\n"
                f"{block}"
            )
        accessor = BlockAccessor.for_block(block)
        if isinstance(block, bytes):
            # Serialized blocks arrive as bytes; decode them as an Arrow IPC stream.
            reader = pyarrow.ipc.open_stream(block)
            self._tables.append(reader.read_all())
        else:
            self._tables.append(block)
        self._tables_size_bytes += accessor.size_bytes()
        self._num_rows += accessor.num_rows()

@clarkzinzow (Contributor), Mar 2, 2023

@kira-lin Why is this necessary? The existing TableBlockBuilder.add_block() logic should already convert the bytes into an Arrow table through its delegation and conversion steps.

Is this an issue with the instance check in TableBlockBuilder.add_block()? If so, the ArrowBlockBuilder constructor can pass a (pyarrow.Table, bytes) tuple to its parent's constructor as the block type, since isinstance() checks work with a tuple of types and TableBlockBuilder._block_type is only used for that instance check.
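
The suggestion above can be sketched roughly as follows. This is a minimal, standard-library-only illustration, not Ray's actual code: `FakeTable`, `TableBlockBuilderSketch`, `ArrowBlockBuilderSketch`, and `num_blocks()` are hypothetical stand-ins for `pyarrow.Table` and the real builder classes. It only shows that a tuple passed as the block type satisfies a single `isinstance()` check in the parent.

```python
from typing import Any, List, Tuple, Type, Union


class FakeTable:
    """Hypothetical stand-in for pyarrow.Table (keeps the sketch dependency-free)."""


class TableBlockBuilderSketch:
    """Hypothetical parent builder that only type-checks and stores blocks."""

    def __init__(self, block_type: Union[Type, Tuple[Type, ...]]) -> None:
        # Used only for the isinstance() check below, so a tuple is fine.
        self._block_type = block_type
        self._blocks: List[Any] = []

    def add_block(self, block: Any) -> None:
        # isinstance() accepts either a single type or a tuple of types.
        if not isinstance(block, self._block_type):
            raise TypeError(
                f"Got a block of type {type(block)}, "
                f"expected {self._block_type}."
            )
        self._blocks.append(block)

    def num_blocks(self) -> int:
        return len(self._blocks)


class ArrowBlockBuilderSketch(TableBlockBuilderSketch):
    """Hypothetical child builder that accepts tables and serialized bytes."""

    def __init__(self) -> None:
        # Pass a tuple so the parent's check admits both block types.
        super().__init__((FakeTable, bytes))
```

With this shape, the child class needs no `add_block()` override just to widen the accepted types; whether the parent then handles the bytes correctly is a separate question, as the reply below notes.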

Contributor Author

Oh, I see. I tried Union, but it didn't work. This change alone is not enough to solve the problem, though. Please see the update.


    @staticmethod
    def _table_from_pydict(columns: Dict[str, List[Any]]) -> Block:
        for col_name, col in columns.items():
9 changes: 9 additions & 0 deletions python/ray/data/tests/test_raydp_dataset.py
@@ -2,6 +2,7 @@
import ray
import raydp
import torch
import pandas


@pytest.fixture(scope="function")
@@ -50,6 +51,14 @@ def test_raydp_to_torch_iter(spark):
    assert torch.equal(data_features, features) and torch.equal(data_labels, labels)


def test_to_pandas(spark):
    df = spark.range(100)
    ds = ray.data.from_spark(df)
    pdf = ds.to_pandas()
    pdf2 = df.toPandas()
    pandas.testing.assert_frame_equal(pdf, pdf2)


if __name__ == "__main__":
    import sys
