[Data] Remove ordering assumption and unnecessary `override_num_blocks` from `test_parquet_roundtrip` (#47550)

`test_parquet_roundtrip` specifies `override_num_blocks`, even though
the argument isn't required for the specific behavior under test (i.e.,
that you can write and then read back Parquet files). Also, the test
implicitly assumes that Ray Data reads files in a particular order.

To ensure we're testing behavior and not implementation, and to make the
test resilient to future code changes, this PR removes the
`override_num_blocks` argument and compares the written and read rows as
sets, so the assertion no longer depends on read order.
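
The difference between the old and new assertions can be sketched with pandas alone. This is a minimal illustration, not part of the test: `read_back` simulates a reader that happens to return the second file first, and `rows_as_set` is a hypothetical helper name introduced here.

```python
import pandas as pd

df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})

written = pd.concat([df1, df2], ignore_index=True)
# Assumption for illustration: the reader returned the files in reverse order.
read_back = pd.concat([df2, df1], ignore_index=True)


def rows_as_set(df):
    """Return the rows of df as a set of plain tuples, dropping order and index."""
    return set(df.itertuples(index=False, name=None))


# Old-style check: position-by-position comparison, so row order matters.
order_sensitive_match = written.equals(read_back)

# New-style check: compares row contents only, so row order is irrelevant.
order_insensitive_match = rows_as_set(written) == rows_as_set(read_back)
```

One trade-off of the set-based comparison is that it also ignores duplicate rows. That is harmless for this test's data, where every row is distinct.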

---------

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
bveeramani authored Sep 10, 2024
1 parent 27f505d commit feb21c9
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions python/ray/data/tests/test_parquet.py
@@ -1032,22 +1032,27 @@ def test_parquet_write_create_dir(
     ],
 )
 def test_parquet_roundtrip(ray_start_regular_shared, fs, data_path):
-    df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
-    df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
-    ds = ray.data.from_pandas([df1, df2])
-    ds._set_uuid("data")
     path = os.path.join(data_path, "test_parquet_dir")
     if fs is None:
         os.mkdir(path)
     else:
         fs.create_dir(_unwrap_protocol(path))
+
+    df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
+    df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
+    ds = ray.data.from_pandas([df1, df2])
     ds.write_parquet(path, filesystem=fs)
-    ds2 = ray.data.read_parquet(path, override_num_blocks=2, filesystem=fs)
-    ds2df = ds2.to_pandas()
-    assert pd.concat([df1, df2], ignore_index=True).equals(ds2df)
+
+    ds2 = ray.data.read_parquet(path, filesystem=fs)
+
+    read_data = set(ds2.to_pandas().itertuples(index=False))
+    written_data = set(pd.concat([df1, df2]).itertuples(index=False))
+    assert read_data == written_data
+
     # Test metadata ops.
     for block, meta in ds2._plan.execute().blocks:
         BlockAccessor.for_block(ray.get(block)).size_bytes() == meta.size_bytes
+
     if fs is None:
         shutil.rmtree(path)
     else: